1use std::collections::HashMap;
19use std::sync::Arc;
20
21use crate::arrow::schema::extension::try_add_extension_type;
22use crate::arrow::schema::primitive::convert_primitive;
23use crate::arrow::{PARQUET_FIELD_ID_META_KEY, ProjectionMask};
24use crate::basic::{ConvertedType, Repetition};
25use crate::errors::ParquetError;
26use crate::errors::Result;
27use crate::schema::types::{SchemaDescriptor, Type, TypePtr};
28use arrow_schema::{DataType, Field, Fields, SchemaBuilder};
29
30fn get_repetition(t: &Type) -> Repetition {
31 let info = t.get_basic_info();
32 match info.has_repetition() {
33 true => info.repetition(),
34 false => Repetition::REQUIRED,
35 }
36}
37
/// A parquet schema node together with the arrow type it is converted to.
#[derive(Debug, Clone)]
pub struct ParquetField {
    /// Repetition level computed for this field during schema conversion.
    pub rep_level: i16,
    /// Definition level computed for this field during schema conversion.
    pub def_level: i16,
    /// Whether this field is nullable.
    pub nullable: bool,
    /// The arrow data type selected for this field.
    pub arrow_type: DataType,
    /// Whether this field is a leaf column or a group of child fields.
    pub field_type: ParquetFieldType,
}
58
59impl ParquetField {
60 fn into_list(self, name: &str) -> Self {
64 ParquetField {
65 rep_level: self.rep_level,
66 def_level: self.def_level,
67 nullable: false,
68 arrow_type: DataType::List(Arc::new(Field::new(name, self.arrow_type.clone(), false))),
69 field_type: ParquetFieldType::Group {
70 children: vec![self],
71 },
72 }
73 }
74
75 pub fn children(&self) -> Option<&[Self]> {
77 match &self.field_type {
78 ParquetFieldType::Primitive { .. } => None,
79 ParquetFieldType::Group { children } => Some(children),
80 }
81 }
82}
83
/// The kind of a [`ParquetField`]: a leaf column or a group of children.
#[derive(Debug, Clone)]
pub enum ParquetFieldType {
    /// A leaf column.
    Primitive {
        /// Index of this leaf among all primitive leaves, in visiting order.
        /// Leaves excluded by the projection still consume an index.
        col_idx: usize,
        /// The parquet type of the leaf.
        primitive_type: TypePtr,
    },
    /// A field that owns child fields. Structs, lists and maps are all
    /// represented as groups by the visitor in this file.
    Group {
        /// The converted child fields.
        children: Vec<ParquetField>,
    },
}
96
/// State handed from a parent schema node to the node being visited.
struct VisitorContext {
    /// Repetition level of the parent.
    rep_level: i16,
    /// Definition level of the parent.
    def_level: i16,
    /// Optional arrow type hint for the node being visited; it is checked
    /// against the parquet structure and forwarded to the children.
    data_type: Option<DataType>,
}
104
105impl VisitorContext {
106 fn levels(&self, repetition: Repetition) -> (i16, i16, bool) {
109 match repetition {
110 Repetition::OPTIONAL => (self.def_level + 1, self.rep_level, true),
111 Repetition::REQUIRED => (self.def_level, self.rep_level, false),
112 Repetition::REPEATED => (self.def_level + 1, self.rep_level + 1, false),
113 }
114 }
115}
116
/// Walks a parquet schema and builds the matching [`ParquetField`] tree.
struct Visitor {
    /// Column index that the next visited primitive leaf will receive.
    next_col_idx: usize,

    /// Projection that decides which leaf columns are kept.
    mask: ProjectionMask,
}
129
130impl Visitor {
131 fn visit_primitive(
132 &mut self,
133 primitive_type: &TypePtr,
134 context: VisitorContext,
135 ) -> Result<Option<ParquetField>> {
136 let col_idx = self.next_col_idx;
137 self.next_col_idx += 1;
138
139 if !self.mask.leaf_included(col_idx) {
140 return Ok(None);
141 }
142
143 let repetition = get_repetition(primitive_type);
144 let (def_level, rep_level, nullable) = context.levels(repetition);
145
146 let arrow_type = convert_primitive(primitive_type, context.data_type)?;
147
148 let primitive_field = ParquetField {
149 rep_level,
150 def_level,
151 nullable,
152 arrow_type,
153 field_type: ParquetFieldType::Primitive {
154 primitive_type: primitive_type.clone(),
155 col_idx,
156 },
157 };
158
159 Ok(Some(match repetition {
160 Repetition::REPEATED => primitive_field.into_list(primitive_type.name()),
161 _ => primitive_field,
162 }))
163 }
164
165 fn visit_struct(
166 &mut self,
167 struct_type: &TypePtr,
168 context: VisitorContext,
169 ) -> Result<Option<ParquetField>> {
170 let repetition = get_repetition(struct_type);
172 let (def_level, rep_level, nullable) = context.levels(repetition);
173
174 let parquet_fields = struct_type.get_fields();
175
176 let arrow_fields = match &context.data_type {
178 Some(DataType::Struct(fields)) => {
179 if fields.len() != parquet_fields.len() {
180 return Err(arrow_err!(
181 "incompatible arrow schema, expected {} struct fields got {}",
182 parquet_fields.len(),
183 fields.len()
184 ));
185 }
186 Some(fields)
187 }
188 Some(d) => {
189 return Err(arrow_err!(
190 "incompatible arrow schema, expected struct got {}",
191 d
192 ));
193 }
194 None => None,
195 };
196
197 let mut child_fields = SchemaBuilder::with_capacity(parquet_fields.len());
198 let mut children = Vec::with_capacity(parquet_fields.len());
199
200 for (idx, parquet_field) in parquet_fields.iter().enumerate() {
202 let data_type = match arrow_fields {
203 Some(fields) => {
204 let field = &fields[idx];
205 if field.name() != parquet_field.name() {
206 return Err(arrow_err!(
207 "incompatible arrow schema, expected field named {} got {}",
208 parquet_field.name(),
209 field.name()
210 ));
211 }
212 Some(field.data_type().clone())
213 }
214 None => None,
215 };
216
217 let arrow_field = arrow_fields.map(|x| &*x[idx]);
218 let child_ctx = VisitorContext {
219 rep_level,
220 def_level,
221 data_type,
222 };
223
224 if let Some(mut child) = self.dispatch(parquet_field, child_ctx)? {
225 child_fields.push(convert_field(parquet_field, &mut child, arrow_field)?);
228 children.push(child);
229 }
230 }
231
232 if children.is_empty() {
233 return Ok(None);
234 }
235
236 let struct_field = ParquetField {
237 rep_level,
238 def_level,
239 nullable,
240 arrow_type: DataType::Struct(child_fields.finish().fields),
241 field_type: ParquetFieldType::Group { children },
242 };
243
244 Ok(Some(match repetition {
245 Repetition::REPEATED => struct_field.into_list(struct_type.name()),
246 _ => struct_field,
247 }))
248 }
249
250 fn visit_map(
251 &mut self,
252 map_type: &TypePtr,
253 context: VisitorContext,
254 ) -> Result<Option<ParquetField>> {
255 let rep_level = context.rep_level + 1;
256 let (def_level, nullable) = match get_repetition(map_type) {
257 Repetition::REQUIRED => (context.def_level + 1, false),
258 Repetition::OPTIONAL => (context.def_level + 2, true),
259 Repetition::REPEATED => return Err(arrow_err!("Map cannot be repeated")),
260 };
261
262 if map_type.get_fields().len() != 1 {
263 return Err(arrow_err!(
264 "Map field must have exactly one key_value child, found {}",
265 map_type.get_fields().len()
266 ));
267 }
268
269 let map_key_value = &map_type.get_fields()[0];
271 if map_key_value.get_basic_info().repetition() != Repetition::REPEATED {
272 return Err(arrow_err!("Child of map field must be repeated"));
273 }
274
275 if map_key_value.get_fields().len() == 1 {
278 return self.visit_list(map_type, context);
279 }
280
281 if map_key_value.get_fields().len() != 2 {
282 return Err(arrow_err!(
283 "Child of map field must have two children, found {}",
284 map_key_value.get_fields().len()
285 ));
286 }
287
288 let map_key = &map_key_value.get_fields()[0];
290 let map_value = &map_key_value.get_fields()[1];
291
292 match map_key.get_basic_info().repetition() {
293 Repetition::REPEATED => {
294 return Err(arrow_err!("Map keys cannot be repeated"));
295 }
296 Repetition::REQUIRED | Repetition::OPTIONAL => {
297 }
302 }
303
304 if map_value.get_basic_info().repetition() == Repetition::REPEATED {
305 return Err(arrow_err!("Map values cannot be repeated"));
306 }
307
308 let (arrow_map, arrow_key, arrow_value, sorted) = match &context.data_type {
310 Some(DataType::Map(field, sorted)) => match field.data_type() {
311 DataType::Struct(fields) => {
312 if fields.len() != 2 {
313 return Err(arrow_err!(
314 "Map data type should contain struct with two children, got {}",
315 fields.len()
316 ));
317 }
318
319 (Some(field), Some(&*fields[0]), Some(&*fields[1]), *sorted)
320 }
321 d => {
322 return Err(arrow_err!("Map data type should contain struct got {}", d));
323 }
324 },
325 Some(d) => {
326 return Err(arrow_err!(
327 "incompatible arrow schema, expected map got {}",
328 d
329 ));
330 }
331 None => (None, None, None, false),
332 };
333
334 let maybe_key = {
335 let context = VisitorContext {
336 rep_level,
337 def_level,
338 data_type: arrow_key.map(|x| x.data_type().clone()),
339 };
340
341 self.dispatch(map_key, context)?
342 };
343
344 let maybe_value = {
345 let context = VisitorContext {
346 rep_level,
347 def_level,
348 data_type: arrow_value.map(|x| x.data_type().clone()),
349 };
350
351 self.dispatch(map_value, context)?
352 };
353
354 match (maybe_key, maybe_value) {
356 (Some(mut key), Some(mut value)) => {
357 let key_field = Arc::new(
358 convert_field(map_key, &mut key, arrow_key)?
359 .with_nullable(false),
361 );
362 let value_field = Arc::new(convert_field(map_value, &mut value, arrow_value)?);
363 let field_metadata = match arrow_map {
364 Some(field) => field.metadata().clone(),
365 _ => HashMap::default(),
366 };
367
368 let map_field = Field::new_struct(
369 map_key_value.name(),
370 [key_field, value_field],
371 false, )
373 .with_metadata(field_metadata);
374
375 Ok(Some(ParquetField {
376 rep_level,
377 def_level,
378 nullable,
379 arrow_type: DataType::Map(Arc::new(map_field), sorted),
380 field_type: ParquetFieldType::Group {
381 children: vec![key, value],
382 },
383 }))
384 }
385 _ => Ok(None),
386 }
387 }
388
389 fn visit_list(
390 &mut self,
391 list_type: &TypePtr,
392 context: VisitorContext,
393 ) -> Result<Option<ParquetField>> {
394 if list_type.is_primitive() {
395 return Err(arrow_err!(
396 "{:?} is a list type and can't be processed as primitive.",
397 list_type
398 ));
399 }
400
401 let fields = list_type.get_fields();
402 if fields.len() != 1 {
403 return Err(arrow_err!(
404 "list type must have a single child, found {}",
405 fields.len()
406 ));
407 }
408
409 let repeated_field = &fields[0];
410 if get_repetition(repeated_field) != Repetition::REPEATED {
411 return Err(arrow_err!("List child must be repeated"));
412 }
413
414 let (def_level, nullable) = match list_type.get_basic_info().repetition() {
416 Repetition::REQUIRED => (context.def_level, false),
417 Repetition::OPTIONAL => (context.def_level + 1, true),
418 Repetition::REPEATED => return Err(arrow_err!("List type cannot be repeated")),
419 };
420
421 let arrow_field = match &context.data_type {
422 Some(DataType::List(f)) => Some(f.as_ref()),
423 Some(DataType::LargeList(f)) => Some(f.as_ref()),
424 Some(DataType::FixedSizeList(f, _)) => Some(f.as_ref()),
425 Some(d) => {
426 return Err(arrow_err!(
427 "incompatible arrow schema, expected list got {}",
428 d
429 ));
430 }
431 None => None,
432 };
433
434 if repeated_field.is_primitive() {
435 let context = VisitorContext {
442 rep_level: context.rep_level,
443 def_level,
444 data_type: arrow_field.map(|f| f.data_type().clone()),
445 };
446
447 return match self.visit_primitive(repeated_field, context) {
448 Ok(Some(mut field)) => {
449 field.nullable = nullable;
451 Ok(Some(field))
452 }
453 r => r,
454 };
455 }
456
457 let items = repeated_field.get_fields();
459 if items.len() != 1
460 || (!repeated_field.is_list()
461 && !repeated_field.has_single_repeated_child()
462 && (repeated_field.name() == "array"
463 || repeated_field.name() == format!("{}_tuple", list_type.name())))
464 {
465 let context = VisitorContext {
473 rep_level: context.rep_level,
474 def_level,
475 data_type: arrow_field.map(|f| f.data_type().clone()),
476 };
477
478 return match self.visit_struct(repeated_field, context) {
479 Ok(Some(mut field)) => {
480 field.nullable = nullable;
481 Ok(Some(field))
482 }
483 r => r,
484 };
485 }
486
487 let item_type = &items[0];
489 let rep_level = context.rep_level + 1;
490 let def_level = def_level + 1;
491
492 let new_context = VisitorContext {
493 def_level,
494 rep_level,
495 data_type: arrow_field.map(|f| f.data_type().clone()),
496 };
497
498 match self.dispatch(item_type, new_context) {
499 Ok(Some(mut item)) => {
500 let item_field = Arc::new(convert_field(item_type, &mut item, arrow_field)?);
501
502 let arrow_type = match context.data_type {
504 Some(DataType::LargeList(_)) => DataType::LargeList(item_field),
505 Some(DataType::FixedSizeList(_, len)) => {
506 DataType::FixedSizeList(item_field, len)
507 }
508 _ => DataType::List(item_field),
509 };
510
511 Ok(Some(ParquetField {
512 rep_level,
513 def_level,
514 nullable,
515 arrow_type,
516 field_type: ParquetFieldType::Group {
517 children: vec![item],
518 },
519 }))
520 }
521 r => r,
522 }
523 }
524
525 fn dispatch(
526 &mut self,
527 cur_type: &TypePtr,
528 context: VisitorContext,
529 ) -> Result<Option<ParquetField>> {
530 if cur_type.is_primitive() {
531 self.visit_primitive(cur_type, context)
532 } else {
533 match cur_type.get_basic_info().converted_type() {
534 ConvertedType::LIST => self.visit_list(cur_type, context),
535 ConvertedType::MAP | ConvertedType::MAP_KEY_VALUE => {
536 self.visit_map(cur_type, context)
537 }
538 _ => self.visit_struct(cur_type, context),
539 }
540 }
541 }
542}
543
544fn convert_field(
549 parquet_type: &Type,
550 field: &mut ParquetField,
551 arrow_hint: Option<&Field>,
552) -> Result<Field, ParquetError> {
553 let name = parquet_type.name();
554 let data_type = field.arrow_type.clone();
555 let nullable = field.nullable;
556
557 match arrow_hint {
558 Some(hint) => {
559 #[allow(deprecated)]
561 let field = match (&data_type, hint.dict_id(), hint.dict_is_ordered()) {
562 (DataType::Dictionary(_, _), Some(id), Some(ordered)) =>
563 {
564 #[allow(deprecated)]
565 Field::new_dict(name, data_type, nullable, id, ordered)
566 }
567 _ => Field::new(name, data_type, nullable),
568 };
569
570 Ok(field.with_metadata(hint.metadata().clone()))
571 }
572 None => {
573 let mut ret = Field::new(name, data_type, nullable);
574 let basic_info = parquet_type.get_basic_info();
575 if basic_info.has_id() {
576 let mut meta = HashMap::with_capacity(1);
577 meta.insert(
578 PARQUET_FIELD_ID_META_KEY.to_string(),
579 basic_info.id().to_string(),
580 );
581 ret.set_metadata(meta);
582 }
583 try_add_extension_type(ret, parquet_type)
584 }
585 }
586}
587
588pub fn convert_schema(
594 schema: &SchemaDescriptor,
595 mask: ProjectionMask,
596 embedded_arrow_schema: Option<&Fields>,
597) -> Result<Option<ParquetField>> {
598 let mut visitor = Visitor {
599 next_col_idx: 0,
600 mask,
601 };
602
603 let context = VisitorContext {
604 rep_level: 0,
605 def_level: 0,
606 data_type: embedded_arrow_schema.map(|fields| DataType::Struct(fields.clone())),
607 };
608
609 visitor.dispatch(&schema.root_schema_ptr(), context)
610}
611
612pub fn convert_type(parquet_type: &TypePtr) -> Result<ParquetField> {
614 let mut visitor = Visitor {
615 next_col_idx: 0,
616 mask: ProjectionMask::all(),
617 };
618
619 let context = VisitorContext {
620 rep_level: 0,
621 def_level: 0,
622 data_type: None,
623 };
624
625 Ok(visitor.dispatch(parquet_type, context)?.unwrap())
626}