parquet/arrow/schema/
complex.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::HashMap;
19use std::sync::Arc;
20
21use crate::arrow::schema::extension::try_add_extension_type;
22use crate::arrow::schema::primitive::convert_primitive;
23use crate::arrow::{PARQUET_FIELD_ID_META_KEY, ProjectionMask};
24use crate::basic::{ConvertedType, Repetition};
25use crate::errors::ParquetError;
26use crate::errors::Result;
27use crate::schema::types::{SchemaDescriptor, Type, TypePtr};
28use arrow_schema::{DataType, Field, Fields, SchemaBuilder};
29
30fn get_repetition(t: &Type) -> Repetition {
31    let info = t.get_basic_info();
32    match info.has_repetition() {
33        true => info.repetition(),
34        false => Repetition::REQUIRED,
35    }
36}
37
/// Representation of a parquet schema element, in terms of arrow schema elements
///
/// Values of this type are produced by the schema visitor in this module; a
/// field is either a leaf column or a group of child fields, see
/// [`ParquetFieldType`]
#[derive(Debug, Clone)]
pub struct ParquetField {
    /// The level which represents an insertion into the current list
    /// i.e. guaranteed to be > 0 for an element of list type
    pub rep_level: i16,
    /// The level at which this field is fully defined,
    /// i.e. guaranteed to be > 0 for a nullable type or child of a
    /// nullable type
    pub def_level: i16,
    /// Whether this field is nullable
    pub nullable: bool,
    /// The arrow type of the column data
    ///
    /// Note: In certain cases the data stored in parquet may have been coerced
    /// to a different type and will require conversion on read (e.g. Date64 and Interval)
    pub arrow_type: DataType,
    /// The type of this field, i.e. whether it is a primitive leaf column or a
    /// group with children
    pub field_type: ParquetFieldType,
}
58
59impl ParquetField {
60    /// Converts `self` into an arrow list, with its current type as the field type
61    ///
62    /// This is used to convert repeated columns, into their arrow representation
63    fn into_list(self, name: &str) -> Self {
64        ParquetField {
65            rep_level: self.rep_level,
66            def_level: self.def_level,
67            nullable: false,
68            arrow_type: DataType::List(Arc::new(Field::new(name, self.arrow_type.clone(), false))),
69            field_type: ParquetFieldType::Group {
70                children: vec![self],
71            },
72        }
73    }
74
75    /// Returns a list of [`ParquetField`] children if this is a group type
76    pub fn children(&self) -> Option<&[Self]> {
77        match &self.field_type {
78            ParquetFieldType::Primitive { .. } => None,
79            ParquetFieldType::Group { children } => Some(children),
80        }
81    }
82}
83
/// The kind of a [`ParquetField`]: either a leaf column or a group of fields
#[derive(Debug, Clone)]
pub enum ParquetFieldType {
    /// A leaf column of the parquet schema
    Primitive {
        /// The index of the column in parquet
        col_idx: usize,
        /// The type of the column in parquet
        primitive_type: TypePtr,
    },
    /// A field made up of child fields, as produced for structs, lists and maps
    Group {
        /// The converted children of this group
        children: Vec<ParquetField>,
    },
}
96
/// Encodes the context of the parent of the field currently under consideration
struct VisitorContext {
    /// The repetition level of the parent
    rep_level: i16,
    /// The definition level of the parent
    def_level: i16,
    /// An optional [`DataType`] sourced from the embedded arrow schema
    ///
    /// When present it acts as a hint for the field being visited
    data_type: Option<DataType>,
}
104
105impl VisitorContext {
106    /// Compute the resulting definition level, repetition level and nullability
107    /// for a child field with the given [`Repetition`]
108    fn levels(&self, repetition: Repetition) -> (i16, i16, bool) {
109        match repetition {
110            Repetition::OPTIONAL => (self.def_level + 1, self.rep_level, true),
111            Repetition::REQUIRED => (self.def_level, self.rep_level, false),
112            Repetition::REPEATED => (self.def_level + 1, self.rep_level + 1, false),
113        }
114    }
115}
116
/// Walks the parquet schema in a depth-first fashion in order to map it to arrow data structures
///
/// See [Logical Types] for more information on the conversion algorithm
///
/// [Logical Types]: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md
struct Visitor {
    /// The column index of the next leaf column
    ///
    /// Advanced for every primitive visited, including those excluded by `mask`
    next_col_idx: usize,

    /// Mask of columns to include
    mask: ProjectionMask,
}
129
130impl Visitor {
131    fn visit_primitive(
132        &mut self,
133        primitive_type: &TypePtr,
134        context: VisitorContext,
135    ) -> Result<Option<ParquetField>> {
136        let col_idx = self.next_col_idx;
137        self.next_col_idx += 1;
138
139        if !self.mask.leaf_included(col_idx) {
140            return Ok(None);
141        }
142
143        let repetition = get_repetition(primitive_type);
144        let (def_level, rep_level, nullable) = context.levels(repetition);
145
146        let arrow_type = convert_primitive(primitive_type, context.data_type)?;
147
148        let primitive_field = ParquetField {
149            rep_level,
150            def_level,
151            nullable,
152            arrow_type,
153            field_type: ParquetFieldType::Primitive {
154                primitive_type: primitive_type.clone(),
155                col_idx,
156            },
157        };
158
159        Ok(Some(match repetition {
160            Repetition::REPEATED => primitive_field.into_list(primitive_type.name()),
161            _ => primitive_field,
162        }))
163    }
164
165    fn visit_struct(
166        &mut self,
167        struct_type: &TypePtr,
168        context: VisitorContext,
169    ) -> Result<Option<ParquetField>> {
170        // The root type will not have a repetition level
171        let repetition = get_repetition(struct_type);
172        let (def_level, rep_level, nullable) = context.levels(repetition);
173
174        let parquet_fields = struct_type.get_fields();
175
176        // Extract any arrow fields from the hints
177        let arrow_fields = match &context.data_type {
178            Some(DataType::Struct(fields)) => {
179                if fields.len() != parquet_fields.len() {
180                    return Err(arrow_err!(
181                        "incompatible arrow schema, expected {} struct fields got {}",
182                        parquet_fields.len(),
183                        fields.len()
184                    ));
185                }
186                Some(fields)
187            }
188            Some(d) => {
189                return Err(arrow_err!(
190                    "incompatible arrow schema, expected struct got {}",
191                    d
192                ));
193            }
194            None => None,
195        };
196
197        let mut child_fields = SchemaBuilder::with_capacity(parquet_fields.len());
198        let mut children = Vec::with_capacity(parquet_fields.len());
199
200        // Perform a DFS of children
201        for (idx, parquet_field) in parquet_fields.iter().enumerate() {
202            let data_type = match arrow_fields {
203                Some(fields) => {
204                    let field = &fields[idx];
205                    if field.name() != parquet_field.name() {
206                        return Err(arrow_err!(
207                            "incompatible arrow schema, expected field named {} got {}",
208                            parquet_field.name(),
209                            field.name()
210                        ));
211                    }
212                    Some(field.data_type().clone())
213                }
214                None => None,
215            };
216
217            let arrow_field = arrow_fields.map(|x| &*x[idx]);
218            let child_ctx = VisitorContext {
219                rep_level,
220                def_level,
221                data_type,
222            };
223
224            if let Some(mut child) = self.dispatch(parquet_field, child_ctx)? {
225                // The child type returned may be different from what is encoded in the arrow
226                // schema in the event of a mismatch or a projection
227                child_fields.push(convert_field(parquet_field, &mut child, arrow_field)?);
228                children.push(child);
229            }
230        }
231
232        if children.is_empty() {
233            return Ok(None);
234        }
235
236        let struct_field = ParquetField {
237            rep_level,
238            def_level,
239            nullable,
240            arrow_type: DataType::Struct(child_fields.finish().fields),
241            field_type: ParquetFieldType::Group { children },
242        };
243
244        Ok(Some(match repetition {
245            Repetition::REPEATED => struct_field.into_list(struct_type.name()),
246            _ => struct_field,
247        }))
248    }
249
250    fn visit_map(
251        &mut self,
252        map_type: &TypePtr,
253        context: VisitorContext,
254    ) -> Result<Option<ParquetField>> {
255        let rep_level = context.rep_level + 1;
256        let (def_level, nullable) = match get_repetition(map_type) {
257            Repetition::REQUIRED => (context.def_level + 1, false),
258            Repetition::OPTIONAL => (context.def_level + 2, true),
259            Repetition::REPEATED => return Err(arrow_err!("Map cannot be repeated")),
260        };
261
262        if map_type.get_fields().len() != 1 {
263            return Err(arrow_err!(
264                "Map field must have exactly one key_value child, found {}",
265                map_type.get_fields().len()
266            ));
267        }
268
269        // Add map entry (key_value) to context
270        let map_key_value = &map_type.get_fields()[0];
271        if map_key_value.get_basic_info().repetition() != Repetition::REPEATED {
272            return Err(arrow_err!("Child of map field must be repeated"));
273        }
274
275        // According to the specification the values are optional (#1642).
276        // In this case, return the keys as a list.
277        if map_key_value.get_fields().len() == 1 {
278            return self.visit_list(map_type, context);
279        }
280
281        if map_key_value.get_fields().len() != 2 {
282            return Err(arrow_err!(
283                "Child of map field must have two children, found {}",
284                map_key_value.get_fields().len()
285            ));
286        }
287
288        // Get key and value, and create context for each
289        let map_key = &map_key_value.get_fields()[0];
290        let map_value = &map_key_value.get_fields()[1];
291
292        match map_key.get_basic_info().repetition() {
293            Repetition::REPEATED => {
294                return Err(arrow_err!("Map keys cannot be repeated"));
295            }
296            Repetition::REQUIRED | Repetition::OPTIONAL => {
297                // Relaxed check for having repetition REQUIRED as there exists
298                // parquet writers and files that do not conform to this standard.
299                // This allows us to consume a broader range of existing files even
300                // if they are out of spec.
301            }
302        }
303
304        if map_value.get_basic_info().repetition() == Repetition::REPEATED {
305            return Err(arrow_err!("Map values cannot be repeated"));
306        }
307
308        // Extract the arrow fields
309        let (arrow_map, arrow_key, arrow_value, sorted) = match &context.data_type {
310            Some(DataType::Map(field, sorted)) => match field.data_type() {
311                DataType::Struct(fields) => {
312                    if fields.len() != 2 {
313                        return Err(arrow_err!(
314                            "Map data type should contain struct with two children, got {}",
315                            fields.len()
316                        ));
317                    }
318
319                    (Some(field), Some(&*fields[0]), Some(&*fields[1]), *sorted)
320                }
321                d => {
322                    return Err(arrow_err!("Map data type should contain struct got {}", d));
323                }
324            },
325            Some(d) => {
326                return Err(arrow_err!(
327                    "incompatible arrow schema, expected map got {}",
328                    d
329                ));
330            }
331            None => (None, None, None, false),
332        };
333
334        let maybe_key = {
335            let context = VisitorContext {
336                rep_level,
337                def_level,
338                data_type: arrow_key.map(|x| x.data_type().clone()),
339            };
340
341            self.dispatch(map_key, context)?
342        };
343
344        let maybe_value = {
345            let context = VisitorContext {
346                rep_level,
347                def_level,
348                data_type: arrow_value.map(|x| x.data_type().clone()),
349            };
350
351            self.dispatch(map_value, context)?
352        };
353
354        // Need both columns to be projected
355        match (maybe_key, maybe_value) {
356            (Some(mut key), Some(mut value)) => {
357                let key_field = Arc::new(
358                    convert_field(map_key, &mut key, arrow_key)?
359                        // The key is always non-nullable (#5630)
360                        .with_nullable(false),
361                );
362                let value_field = Arc::new(convert_field(map_value, &mut value, arrow_value)?);
363                let field_metadata = match arrow_map {
364                    Some(field) => field.metadata().clone(),
365                    _ => HashMap::default(),
366                };
367
368                let map_field = Field::new_struct(
369                    map_key_value.name(),
370                    [key_field, value_field],
371                    false, // The inner map field is always non-nullable (#1697)
372                )
373                .with_metadata(field_metadata);
374
375                Ok(Some(ParquetField {
376                    rep_level,
377                    def_level,
378                    nullable,
379                    arrow_type: DataType::Map(Arc::new(map_field), sorted),
380                    field_type: ParquetFieldType::Group {
381                        children: vec![key, value],
382                    },
383                }))
384            }
385            _ => Ok(None),
386        }
387    }
388
389    fn visit_list(
390        &mut self,
391        list_type: &TypePtr,
392        context: VisitorContext,
393    ) -> Result<Option<ParquetField>> {
394        if list_type.is_primitive() {
395            return Err(arrow_err!(
396                "{:?} is a list type and can't be processed as primitive.",
397                list_type
398            ));
399        }
400
401        let fields = list_type.get_fields();
402        if fields.len() != 1 {
403            return Err(arrow_err!(
404                "list type must have a single child, found {}",
405                fields.len()
406            ));
407        }
408
409        let repeated_field = &fields[0];
410        if get_repetition(repeated_field) != Repetition::REPEATED {
411            return Err(arrow_err!("List child must be repeated"));
412        }
413
414        // If the list is nullable
415        let (def_level, nullable) = match list_type.get_basic_info().repetition() {
416            Repetition::REQUIRED => (context.def_level, false),
417            Repetition::OPTIONAL => (context.def_level + 1, true),
418            Repetition::REPEATED => return Err(arrow_err!("List type cannot be repeated")),
419        };
420
421        let arrow_field = match &context.data_type {
422            Some(DataType::List(f)) => Some(f.as_ref()),
423            Some(DataType::LargeList(f)) => Some(f.as_ref()),
424            Some(DataType::FixedSizeList(f, _)) => Some(f.as_ref()),
425            Some(d) => {
426                return Err(arrow_err!(
427                    "incompatible arrow schema, expected list got {}",
428                    d
429                ));
430            }
431            None => None,
432        };
433
434        if repeated_field.is_primitive() {
435            // If the repeated field is not a group, then its type is the element type and elements are required.
436            //
437            // required/optional group my_list (LIST) {
438            //   repeated int32 element;
439            // }
440            //
441            let context = VisitorContext {
442                rep_level: context.rep_level,
443                def_level,
444                data_type: arrow_field.map(|f| f.data_type().clone()),
445            };
446
447            return match self.visit_primitive(repeated_field, context) {
448                Ok(Some(mut field)) => {
449                    // visit_primitive will infer a non-nullable list, update if necessary
450                    field.nullable = nullable;
451                    Ok(Some(field))
452                }
453                r => r,
454            };
455        }
456
457        // test to see if the repeated field is a struct or one-tuple
458        let items = repeated_field.get_fields();
459        if items.len() != 1
460            || (!repeated_field.is_list()
461                && !repeated_field.has_single_repeated_child()
462                && (repeated_field.name() == "array"
463                    || repeated_field.name() == format!("{}_tuple", list_type.name())))
464        {
465            // If the repeated field is a group with multiple fields, then its type is the element
466            // type and elements are required.
467            //
468            // If the repeated field is a group with one field and is named either array or uses
469            // the LIST-annotated group's name with _tuple appended then the repeated type is the
470            // element type and elements are required. But this rule only applies if the
471            // repeated field is not annotated, and the single child field is not `repeated`.
472            let context = VisitorContext {
473                rep_level: context.rep_level,
474                def_level,
475                data_type: arrow_field.map(|f| f.data_type().clone()),
476            };
477
478            return match self.visit_struct(repeated_field, context) {
479                Ok(Some(mut field)) => {
480                    field.nullable = nullable;
481                    Ok(Some(field))
482                }
483                r => r,
484            };
485        }
486
487        // Regular list handling logic
488        let item_type = &items[0];
489        let rep_level = context.rep_level + 1;
490        let def_level = def_level + 1;
491
492        let new_context = VisitorContext {
493            def_level,
494            rep_level,
495            data_type: arrow_field.map(|f| f.data_type().clone()),
496        };
497
498        match self.dispatch(item_type, new_context) {
499            Ok(Some(mut item)) => {
500                let item_field = Arc::new(convert_field(item_type, &mut item, arrow_field)?);
501
502                // Use arrow type as hint for index size
503                let arrow_type = match context.data_type {
504                    Some(DataType::LargeList(_)) => DataType::LargeList(item_field),
505                    Some(DataType::FixedSizeList(_, len)) => {
506                        DataType::FixedSizeList(item_field, len)
507                    }
508                    _ => DataType::List(item_field),
509                };
510
511                Ok(Some(ParquetField {
512                    rep_level,
513                    def_level,
514                    nullable,
515                    arrow_type,
516                    field_type: ParquetFieldType::Group {
517                        children: vec![item],
518                    },
519                }))
520            }
521            r => r,
522        }
523    }
524
525    fn dispatch(
526        &mut self,
527        cur_type: &TypePtr,
528        context: VisitorContext,
529    ) -> Result<Option<ParquetField>> {
530        if cur_type.is_primitive() {
531            self.visit_primitive(cur_type, context)
532        } else {
533            match cur_type.get_basic_info().converted_type() {
534                ConvertedType::LIST => self.visit_list(cur_type, context),
535                ConvertedType::MAP | ConvertedType::MAP_KEY_VALUE => {
536                    self.visit_map(cur_type, context)
537                }
538                _ => self.visit_struct(cur_type, context),
539            }
540        }
541    }
542}
543
544/// Computes the Arrow [`Field`] for a child column
545///
546/// The resulting Arrow [`Field`] will have the type dictated by the Parquet `field`, a name
547/// dictated by the `parquet_type`, and any metadata from `arrow_hint`
548fn convert_field(
549    parquet_type: &Type,
550    field: &mut ParquetField,
551    arrow_hint: Option<&Field>,
552) -> Result<Field, ParquetError> {
553    let name = parquet_type.name();
554    let data_type = field.arrow_type.clone();
555    let nullable = field.nullable;
556
557    match arrow_hint {
558        Some(hint) => {
559            // If the inferred type is a dictionary, preserve dictionary metadata
560            #[allow(deprecated)]
561            let field = match (&data_type, hint.dict_id(), hint.dict_is_ordered()) {
562                (DataType::Dictionary(_, _), Some(id), Some(ordered)) =>
563                {
564                    #[allow(deprecated)]
565                    Field::new_dict(name, data_type, nullable, id, ordered)
566                }
567                _ => Field::new(name, data_type, nullable),
568            };
569
570            Ok(field.with_metadata(hint.metadata().clone()))
571        }
572        None => {
573            let mut ret = Field::new(name, data_type, nullable);
574            let basic_info = parquet_type.get_basic_info();
575            if basic_info.has_id() {
576                let mut meta = HashMap::with_capacity(1);
577                meta.insert(
578                    PARQUET_FIELD_ID_META_KEY.to_string(),
579                    basic_info.id().to_string(),
580                );
581                ret.set_metadata(meta);
582            }
583            try_add_extension_type(ret, parquet_type)
584        }
585    }
586}
587
588/// Computes the [`ParquetField`] for the provided [`SchemaDescriptor`] with `leaf_columns` listing
589/// the indexes of leaf columns to project, and `embedded_arrow_schema` the optional
590/// [`Fields`] embedded in the parquet metadata
591///
592/// Note: This does not support out of order column projection
593pub fn convert_schema(
594    schema: &SchemaDescriptor,
595    mask: ProjectionMask,
596    embedded_arrow_schema: Option<&Fields>,
597) -> Result<Option<ParquetField>> {
598    let mut visitor = Visitor {
599        next_col_idx: 0,
600        mask,
601    };
602
603    let context = VisitorContext {
604        rep_level: 0,
605        def_level: 0,
606        data_type: embedded_arrow_schema.map(|fields| DataType::Struct(fields.clone())),
607    };
608
609    visitor.dispatch(&schema.root_schema_ptr(), context)
610}
611
612/// Computes the [`ParquetField`] for the provided `parquet_type`
613pub fn convert_type(parquet_type: &TypePtr) -> Result<ParquetField> {
614    let mut visitor = Visitor {
615        next_col_idx: 0,
616        mask: ProjectionMask::all(),
617    };
618
619    let context = VisitorContext {
620        rep_level: 0,
621        def_level: 0,
622        data_type: None,
623    };
624
625    Ok(visitor.dispatch(parquet_type, context)?.unwrap())
626}