1use std::collections::HashMap;
19use std::sync::Arc;
20
21use crate::arrow::schema::primitive::convert_primitive;
22use crate::arrow::{ProjectionMask, PARQUET_FIELD_ID_META_KEY};
23use crate::basic::{ConvertedType, Repetition};
24use crate::errors::ParquetError;
25use crate::errors::Result;
26use crate::schema::types::{SchemaDescriptor, Type, TypePtr};
27use arrow_schema::{DataType, Field, Fields, SchemaBuilder};
28
29fn get_repetition(t: &Type) -> Repetition {
30 let info = t.get_basic_info();
31 match info.has_repetition() {
32 true => info.repetition(),
33 false => Repetition::REQUIRED,
34 }
35}
36
/// A node of a parquet schema annotated with the arrow type it converts to,
/// together with the levels computed for it while visiting the schema.
#[derive(Debug, Clone)]
pub struct ParquetField {
    /// Repetition level computed for this node during conversion
    /// (see `VisitorContext::levels` and the list/map visitors).
    pub rep_level: i16,
    /// Definition level computed for this node during conversion
    /// (see `VisitorContext::levels` and the list/map visitors).
    pub def_level: i16,
    /// Whether the resulting arrow field is nullable.
    pub nullable: bool,
    /// The arrow [`DataType`] this node converts to.
    pub arrow_type: DataType,
    /// Whether this node is a leaf column or a group of child fields.
    pub field_type: ParquetFieldType,
}
57
58impl ParquetField {
59 fn into_list(self, name: &str) -> Self {
63 ParquetField {
64 rep_level: self.rep_level,
65 def_level: self.def_level,
66 nullable: false,
67 arrow_type: DataType::List(Arc::new(Field::new(name, self.arrow_type.clone(), false))),
68 field_type: ParquetFieldType::Group {
69 children: vec![self],
70 },
71 }
72 }
73
74 pub fn children(&self) -> Option<&[Self]> {
76 match &self.field_type {
77 ParquetFieldType::Primitive { .. } => None,
78 ParquetFieldType::Group { children } => Some(children),
79 }
80 }
81}
82
/// Distinguishes leaf columns from groups in a [`ParquetField`] tree.
#[derive(Debug, Clone)]
pub enum ParquetFieldType {
    /// A leaf (primitive) column.
    Primitive {
        /// Position of this leaf among the primitive types visited, counted in
        /// depth-first visit order (excluded leaves also consume an index).
        col_idx: usize,
        /// The parquet type of this leaf column.
        primitive_type: TypePtr,
    },
    /// A group holding converted child fields.
    Group {
        /// The converted children; children excluded by a projection are omitted.
        children: Vec<ParquetField>,
    },
}
95
/// State handed from an enclosing node to the child being visited.
struct VisitorContext {
    /// Repetition level inherited from the enclosing node.
    rep_level: i16,
    /// Definition level inherited from the enclosing node.
    def_level: i16,
    /// Optional arrow type hint for the child (e.g. from an embedded arrow schema).
    data_type: Option<DataType>,
}
103
104impl VisitorContext {
105 fn levels(&self, repetition: Repetition) -> (i16, i16, bool) {
108 match repetition {
109 Repetition::OPTIONAL => (self.def_level + 1, self.rep_level, true),
110 Repetition::REQUIRED => (self.def_level, self.rep_level, false),
111 Repetition::REPEATED => (self.def_level + 1, self.rep_level + 1, false),
112 }
113 }
114}
115
/// Walks a parquet schema depth-first, converting it into a [`ParquetField`] tree.
struct Visitor {
    /// Index to assign to the next primitive (leaf) column encountered.
    next_col_idx: usize,

    /// Selects which leaf columns (by index) are included in the output.
    mask: ProjectionMask,
}
128
129impl Visitor {
130 fn visit_primitive(
131 &mut self,
132 primitive_type: &TypePtr,
133 context: VisitorContext,
134 ) -> Result<Option<ParquetField>> {
135 let col_idx = self.next_col_idx;
136 self.next_col_idx += 1;
137
138 if !self.mask.leaf_included(col_idx) {
139 return Ok(None);
140 }
141
142 let repetition = get_repetition(primitive_type);
143 let (def_level, rep_level, nullable) = context.levels(repetition);
144
145 let arrow_type = convert_primitive(primitive_type, context.data_type)?;
146
147 let primitive_field = ParquetField {
148 rep_level,
149 def_level,
150 nullable,
151 arrow_type,
152 field_type: ParquetFieldType::Primitive {
153 primitive_type: primitive_type.clone(),
154 col_idx,
155 },
156 };
157
158 Ok(Some(match repetition {
159 Repetition::REPEATED => primitive_field.into_list(primitive_type.name()),
160 _ => primitive_field,
161 }))
162 }
163
164 fn visit_struct(
165 &mut self,
166 struct_type: &TypePtr,
167 context: VisitorContext,
168 ) -> Result<Option<ParquetField>> {
169 let repetition = get_repetition(struct_type);
171 let (def_level, rep_level, nullable) = context.levels(repetition);
172
173 let parquet_fields = struct_type.get_fields();
174
175 let arrow_fields = match &context.data_type {
177 Some(DataType::Struct(fields)) => {
178 if fields.len() != parquet_fields.len() {
179 return Err(arrow_err!(
180 "incompatible arrow schema, expected {} struct fields got {}",
181 parquet_fields.len(),
182 fields.len()
183 ));
184 }
185 Some(fields)
186 }
187 Some(d) => {
188 return Err(arrow_err!(
189 "incompatible arrow schema, expected struct got {}",
190 d
191 ))
192 }
193 None => None,
194 };
195
196 let mut child_fields = SchemaBuilder::with_capacity(parquet_fields.len());
197 let mut children = Vec::with_capacity(parquet_fields.len());
198
199 for (idx, parquet_field) in parquet_fields.iter().enumerate() {
201 let data_type = match arrow_fields {
202 Some(fields) => {
203 let field = &fields[idx];
204 if field.name() != parquet_field.name() {
205 return Err(arrow_err!(
206 "incompatible arrow schema, expected field named {} got {}",
207 parquet_field.name(),
208 field.name()
209 ));
210 }
211 Some(field.data_type().clone())
212 }
213 None => None,
214 };
215
216 let arrow_field = arrow_fields.map(|x| &*x[idx]);
217 let child_ctx = VisitorContext {
218 rep_level,
219 def_level,
220 data_type,
221 };
222
223 if let Some(child) = self.dispatch(parquet_field, child_ctx)? {
224 child_fields.push(convert_field(parquet_field, &child, arrow_field));
227 children.push(child);
228 }
229 }
230
231 if children.is_empty() {
232 return Ok(None);
233 }
234
235 let struct_field = ParquetField {
236 rep_level,
237 def_level,
238 nullable,
239 arrow_type: DataType::Struct(child_fields.finish().fields),
240 field_type: ParquetFieldType::Group { children },
241 };
242
243 Ok(Some(match repetition {
244 Repetition::REPEATED => struct_field.into_list(struct_type.name()),
245 _ => struct_field,
246 }))
247 }
248
249 fn visit_map(
250 &mut self,
251 map_type: &TypePtr,
252 context: VisitorContext,
253 ) -> Result<Option<ParquetField>> {
254 let rep_level = context.rep_level + 1;
255 let (def_level, nullable) = match get_repetition(map_type) {
256 Repetition::REQUIRED => (context.def_level + 1, false),
257 Repetition::OPTIONAL => (context.def_level + 2, true),
258 Repetition::REPEATED => return Err(arrow_err!("Map cannot be repeated")),
259 };
260
261 if map_type.get_fields().len() != 1 {
262 return Err(arrow_err!(
263 "Map field must have exactly one key_value child, found {}",
264 map_type.get_fields().len()
265 ));
266 }
267
268 let map_key_value = &map_type.get_fields()[0];
270 if map_key_value.get_basic_info().repetition() != Repetition::REPEATED {
271 return Err(arrow_err!("Child of map field must be repeated"));
272 }
273
274 if map_key_value.get_fields().len() == 1 {
277 return self.visit_list(map_type, context);
278 }
279
280 if map_key_value.get_fields().len() != 2 {
281 return Err(arrow_err!(
282 "Child of map field must have two children, found {}",
283 map_key_value.get_fields().len()
284 ));
285 }
286
287 let map_key = &map_key_value.get_fields()[0];
289 let map_value = &map_key_value.get_fields()[1];
290
291 match map_key.get_basic_info().repetition() {
292 Repetition::REPEATED => {
293 return Err(arrow_err!("Map keys cannot be repeated"));
294 }
295 Repetition::REQUIRED | Repetition::OPTIONAL => {
296 }
301 }
302
303 if map_value.get_basic_info().repetition() == Repetition::REPEATED {
304 return Err(arrow_err!("Map values cannot be repeated"));
305 }
306
307 let (arrow_map, arrow_key, arrow_value, sorted) = match &context.data_type {
309 Some(DataType::Map(field, sorted)) => match field.data_type() {
310 DataType::Struct(fields) => {
311 if fields.len() != 2 {
312 return Err(arrow_err!(
313 "Map data type should contain struct with two children, got {}",
314 fields.len()
315 ));
316 }
317
318 (Some(field), Some(&*fields[0]), Some(&*fields[1]), *sorted)
319 }
320 d => {
321 return Err(arrow_err!("Map data type should contain struct got {}", d));
322 }
323 },
324 Some(d) => {
325 return Err(arrow_err!(
326 "incompatible arrow schema, expected map got {}",
327 d
328 ))
329 }
330 None => (None, None, None, false),
331 };
332
333 let maybe_key = {
334 let context = VisitorContext {
335 rep_level,
336 def_level,
337 data_type: arrow_key.map(|x| x.data_type().clone()),
338 };
339
340 self.dispatch(map_key, context)?
341 };
342
343 let maybe_value = {
344 let context = VisitorContext {
345 rep_level,
346 def_level,
347 data_type: arrow_value.map(|x| x.data_type().clone()),
348 };
349
350 self.dispatch(map_value, context)?
351 };
352
353 match (maybe_key, maybe_value) {
355 (Some(key), Some(value)) => {
356 let key_field = Arc::new(
357 convert_field(map_key, &key, arrow_key)
358 .with_nullable(false),
360 );
361 let value_field = Arc::new(convert_field(map_value, &value, arrow_value));
362 let field_metadata = match arrow_map {
363 Some(field) => field.metadata().clone(),
364 _ => HashMap::default(),
365 };
366
367 let map_field = Field::new_struct(
368 map_key_value.name(),
369 [key_field, value_field],
370 false, )
372 .with_metadata(field_metadata);
373
374 Ok(Some(ParquetField {
375 rep_level,
376 def_level,
377 nullable,
378 arrow_type: DataType::Map(Arc::new(map_field), sorted),
379 field_type: ParquetFieldType::Group {
380 children: vec![key, value],
381 },
382 }))
383 }
384 _ => Ok(None),
385 }
386 }
387
388 fn visit_list(
389 &mut self,
390 list_type: &TypePtr,
391 context: VisitorContext,
392 ) -> Result<Option<ParquetField>> {
393 if list_type.is_primitive() {
394 return Err(arrow_err!(
395 "{:?} is a list type and can't be processed as primitive.",
396 list_type
397 ));
398 }
399
400 let fields = list_type.get_fields();
401 if fields.len() != 1 {
402 return Err(arrow_err!(
403 "list type must have a single child, found {}",
404 fields.len()
405 ));
406 }
407
408 let repeated_field = &fields[0];
409 if get_repetition(repeated_field) != Repetition::REPEATED {
410 return Err(arrow_err!("List child must be repeated"));
411 }
412
413 let (def_level, nullable) = match list_type.get_basic_info().repetition() {
415 Repetition::REQUIRED => (context.def_level, false),
416 Repetition::OPTIONAL => (context.def_level + 1, true),
417 Repetition::REPEATED => return Err(arrow_err!("List type cannot be repeated")),
418 };
419
420 let arrow_field = match &context.data_type {
421 Some(DataType::List(f)) => Some(f.as_ref()),
422 Some(DataType::LargeList(f)) => Some(f.as_ref()),
423 Some(DataType::FixedSizeList(f, _)) => Some(f.as_ref()),
424 Some(d) => {
425 return Err(arrow_err!(
426 "incompatible arrow schema, expected list got {}",
427 d
428 ))
429 }
430 None => None,
431 };
432
433 if repeated_field.is_primitive() {
434 let context = VisitorContext {
441 rep_level: context.rep_level,
442 def_level,
443 data_type: arrow_field.map(|f| f.data_type().clone()),
444 };
445
446 return match self.visit_primitive(repeated_field, context) {
447 Ok(Some(mut field)) => {
448 field.nullable = nullable;
450 Ok(Some(field))
451 }
452 r => r,
453 };
454 }
455
456 let items = repeated_field.get_fields();
458 if items.len() != 1
459 || (!repeated_field.is_list()
460 && !repeated_field.has_single_repeated_child()
461 && (repeated_field.name() == "array"
462 || repeated_field.name() == format!("{}_tuple", list_type.name())))
463 {
464 let context = VisitorContext {
472 rep_level: context.rep_level,
473 def_level,
474 data_type: arrow_field.map(|f| f.data_type().clone()),
475 };
476
477 return match self.visit_struct(repeated_field, context) {
478 Ok(Some(mut field)) => {
479 field.nullable = nullable;
480 Ok(Some(field))
481 }
482 r => r,
483 };
484 }
485
486 let item_type = &items[0];
488 let rep_level = context.rep_level + 1;
489 let def_level = def_level + 1;
490
491 let new_context = VisitorContext {
492 def_level,
493 rep_level,
494 data_type: arrow_field.map(|f| f.data_type().clone()),
495 };
496
497 match self.dispatch(item_type, new_context) {
498 Ok(Some(item)) => {
499 let item_field = Arc::new(convert_field(item_type, &item, arrow_field));
500
501 let arrow_type = match context.data_type {
503 Some(DataType::LargeList(_)) => DataType::LargeList(item_field),
504 Some(DataType::FixedSizeList(_, len)) => {
505 DataType::FixedSizeList(item_field, len)
506 }
507 _ => DataType::List(item_field),
508 };
509
510 Ok(Some(ParquetField {
511 rep_level,
512 def_level,
513 nullable,
514 arrow_type,
515 field_type: ParquetFieldType::Group {
516 children: vec![item],
517 },
518 }))
519 }
520 r => r,
521 }
522 }
523
524 fn dispatch(
525 &mut self,
526 cur_type: &TypePtr,
527 context: VisitorContext,
528 ) -> Result<Option<ParquetField>> {
529 if cur_type.is_primitive() {
530 self.visit_primitive(cur_type, context)
531 } else {
532 match cur_type.get_basic_info().converted_type() {
533 ConvertedType::LIST => self.visit_list(cur_type, context),
534 ConvertedType::MAP | ConvertedType::MAP_KEY_VALUE => {
535 self.visit_map(cur_type, context)
536 }
537 _ => self.visit_struct(cur_type, context),
538 }
539 }
540 }
541}
542
543fn convert_field(parquet_type: &Type, field: &ParquetField, arrow_hint: Option<&Field>) -> Field {
548 let name = parquet_type.name();
549 let data_type = field.arrow_type.clone();
550 let nullable = field.nullable;
551
552 match arrow_hint {
553 Some(hint) => {
554 #[allow(deprecated)]
556 let field = match (&data_type, hint.dict_id(), hint.dict_is_ordered()) {
557 (DataType::Dictionary(_, _), Some(id), Some(ordered)) =>
558 {
559 #[allow(deprecated)]
560 Field::new_dict(name, data_type, nullable, id, ordered)
561 }
562 _ => Field::new(name, data_type, nullable),
563 };
564
565 field.with_metadata(hint.metadata().clone())
566 }
567 None => {
568 let mut ret = Field::new(name, data_type, nullable);
569 let basic_info = parquet_type.get_basic_info();
570 if basic_info.has_id() {
571 let mut meta = HashMap::with_capacity(1);
572 meta.insert(
573 PARQUET_FIELD_ID_META_KEY.to_string(),
574 basic_info.id().to_string(),
575 );
576 ret.set_metadata(meta);
577 }
578 ret
579 }
580 }
581}
582
583pub fn convert_schema(
589 schema: &SchemaDescriptor,
590 mask: ProjectionMask,
591 embedded_arrow_schema: Option<&Fields>,
592) -> Result<Option<ParquetField>> {
593 let mut visitor = Visitor {
594 next_col_idx: 0,
595 mask,
596 };
597
598 let context = VisitorContext {
599 rep_level: 0,
600 def_level: 0,
601 data_type: embedded_arrow_schema.map(|fields| DataType::Struct(fields.clone())),
602 };
603
604 visitor.dispatch(&schema.root_schema_ptr(), context)
605}
606
607pub fn convert_type(parquet_type: &TypePtr) -> Result<ParquetField> {
609 let mut visitor = Visitor {
610 next_col_idx: 0,
611 mask: ProjectionMask::all(),
612 };
613
614 let context = VisitorContext {
615 rep_level: 0,
616 def_level: 0,
617 data_type: None,
618 };
619
620 Ok(visitor.dispatch(parquet_type, context)?.unwrap())
621}