parquet/arrow/schema/
complex.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::HashMap;
19use std::sync::Arc;
20
21use crate::arrow::schema::primitive::convert_primitive;
22use crate::arrow::{ProjectionMask, PARQUET_FIELD_ID_META_KEY};
23use crate::basic::{ConvertedType, Repetition};
24use crate::errors::ParquetError;
25use crate::errors::Result;
26use crate::schema::types::{SchemaDescriptor, Type, TypePtr};
27use arrow_schema::{DataType, Field, Fields, SchemaBuilder};
28
29fn get_repetition(t: &Type) -> Repetition {
30    let info = t.get_basic_info();
31    match info.has_repetition() {
32        true => info.repetition(),
33        false => Repetition::REQUIRED,
34    }
35}
36
/// Representation of a parquet schema element, in terms of arrow schema elements
///
/// Values are produced by the schema visitor in this module and form a tree:
/// group fields hold their children inside [`ParquetFieldType::Group`]
#[derive(Debug, Clone)]
pub struct ParquetField {
    /// The level which represents an insertion into the current list
    /// i.e. guaranteed to be > 0 for an element of list type
    pub rep_level: i16,
    /// The level at which this field is fully defined,
    /// i.e. guaranteed to be > 0 for a nullable type or child of a
    /// nullable type
    pub def_level: i16,
    /// Whether this field is nullable
    pub nullable: bool,
    /// The arrow type of the column data
    ///
    /// Note: In certain cases the data stored in parquet may have been coerced
    /// to a different type and will require conversion on read (e.g. Date64 and Interval)
    pub arrow_type: DataType,
    /// The type of this field: either a primitive column or a group of child fields
    pub field_type: ParquetFieldType,
}
57
58impl ParquetField {
59    /// Converts `self` into an arrow list, with its current type as the field type
60    ///
61    /// This is used to convert repeated columns, into their arrow representation
62    fn into_list(self, name: &str) -> Self {
63        ParquetField {
64            rep_level: self.rep_level,
65            def_level: self.def_level,
66            nullable: false,
67            arrow_type: DataType::List(Arc::new(Field::new(name, self.arrow_type.clone(), false))),
68            field_type: ParquetFieldType::Group {
69                children: vec![self],
70            },
71        }
72    }
73
74    /// Returns a list of [`ParquetField`] children if this is a group type
75    pub fn children(&self) -> Option<&[Self]> {
76        match &self.field_type {
77            ParquetFieldType::Primitive { .. } => None,
78            ParquetFieldType::Group { children } => Some(children),
79        }
80    }
81}
82
/// The kind of a [`ParquetField`]: a primitive column or a group of child fields
#[derive(Debug, Clone)]
pub enum ParquetFieldType {
    /// A primitive column
    Primitive {
        /// The index of the column in parquet
        col_idx: usize,
        /// The type of the column in parquet
        primitive_type: TypePtr,
    },
    /// A group of child fields; used in this module for structs, lists and maps
    Group {
        /// The child fields of this group
        children: Vec<ParquetField>,
    },
}
95
/// Encodes the context of the parent of the field currently under consideration
struct VisitorContext {
    /// The repetition level of the parent
    rep_level: i16,
    /// The definition level of the parent
    def_level: i16,
    /// An optional [`DataType`] sourced from the embedded arrow schema
    data_type: Option<DataType>,
}
103
104impl VisitorContext {
105    /// Compute the resulting definition level, repetition level and nullability
106    /// for a child field with the given [`Repetition`]
107    fn levels(&self, repetition: Repetition) -> (i16, i16, bool) {
108        match repetition {
109            Repetition::OPTIONAL => (self.def_level + 1, self.rep_level, true),
110            Repetition::REQUIRED => (self.def_level, self.rep_level, false),
111            Repetition::REPEATED => (self.def_level + 1, self.rep_level + 1, false),
112        }
113    }
114}
115
/// Walks the parquet schema in a depth-first fashion in order to map it to arrow data structures
///
/// See [Logical Types] for more information on the conversion algorithm
///
/// [Logical Types]: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md
struct Visitor {
    /// The column index of the next leaf column
    ///
    /// Incremented for every primitive that is visited, including those
    /// excluded by `mask`
    next_col_idx: usize,

    /// Mask of columns to include
    mask: ProjectionMask,
}
128
129impl Visitor {
130    fn visit_primitive(
131        &mut self,
132        primitive_type: &TypePtr,
133        context: VisitorContext,
134    ) -> Result<Option<ParquetField>> {
135        let col_idx = self.next_col_idx;
136        self.next_col_idx += 1;
137
138        if !self.mask.leaf_included(col_idx) {
139            return Ok(None);
140        }
141
142        let repetition = get_repetition(primitive_type);
143        let (def_level, rep_level, nullable) = context.levels(repetition);
144
145        let arrow_type = convert_primitive(primitive_type, context.data_type)?;
146
147        let primitive_field = ParquetField {
148            rep_level,
149            def_level,
150            nullable,
151            arrow_type,
152            field_type: ParquetFieldType::Primitive {
153                primitive_type: primitive_type.clone(),
154                col_idx,
155            },
156        };
157
158        Ok(Some(match repetition {
159            Repetition::REPEATED => primitive_field.into_list(primitive_type.name()),
160            _ => primitive_field,
161        }))
162    }
163
164    fn visit_struct(
165        &mut self,
166        struct_type: &TypePtr,
167        context: VisitorContext,
168    ) -> Result<Option<ParquetField>> {
169        // The root type will not have a repetition level
170        let repetition = get_repetition(struct_type);
171        let (def_level, rep_level, nullable) = context.levels(repetition);
172
173        let parquet_fields = struct_type.get_fields();
174
175        // Extract the arrow fields
176        let arrow_fields = match &context.data_type {
177            Some(DataType::Struct(fields)) => {
178                if fields.len() != parquet_fields.len() {
179                    return Err(arrow_err!(
180                        "incompatible arrow schema, expected {} struct fields got {}",
181                        parquet_fields.len(),
182                        fields.len()
183                    ));
184                }
185                Some(fields)
186            }
187            Some(d) => {
188                return Err(arrow_err!(
189                    "incompatible arrow schema, expected struct got {}",
190                    d
191                ))
192            }
193            None => None,
194        };
195
196        let mut child_fields = SchemaBuilder::with_capacity(parquet_fields.len());
197        let mut children = Vec::with_capacity(parquet_fields.len());
198
199        // Perform a DFS of children
200        for (idx, parquet_field) in parquet_fields.iter().enumerate() {
201            let data_type = match arrow_fields {
202                Some(fields) => {
203                    let field = &fields[idx];
204                    if field.name() != parquet_field.name() {
205                        return Err(arrow_err!(
206                            "incompatible arrow schema, expected field named {} got {}",
207                            parquet_field.name(),
208                            field.name()
209                        ));
210                    }
211                    Some(field.data_type().clone())
212                }
213                None => None,
214            };
215
216            let arrow_field = arrow_fields.map(|x| &*x[idx]);
217            let child_ctx = VisitorContext {
218                rep_level,
219                def_level,
220                data_type,
221            };
222
223            if let Some(child) = self.dispatch(parquet_field, child_ctx)? {
224                // The child type returned may be different from what is encoded in the arrow
225                // schema in the event of a mismatch or a projection
226                child_fields.push(convert_field(parquet_field, &child, arrow_field));
227                children.push(child);
228            }
229        }
230
231        if children.is_empty() {
232            return Ok(None);
233        }
234
235        let struct_field = ParquetField {
236            rep_level,
237            def_level,
238            nullable,
239            arrow_type: DataType::Struct(child_fields.finish().fields),
240            field_type: ParquetFieldType::Group { children },
241        };
242
243        Ok(Some(match repetition {
244            Repetition::REPEATED => struct_field.into_list(struct_type.name()),
245            _ => struct_field,
246        }))
247    }
248
249    fn visit_map(
250        &mut self,
251        map_type: &TypePtr,
252        context: VisitorContext,
253    ) -> Result<Option<ParquetField>> {
254        let rep_level = context.rep_level + 1;
255        let (def_level, nullable) = match get_repetition(map_type) {
256            Repetition::REQUIRED => (context.def_level + 1, false),
257            Repetition::OPTIONAL => (context.def_level + 2, true),
258            Repetition::REPEATED => return Err(arrow_err!("Map cannot be repeated")),
259        };
260
261        if map_type.get_fields().len() != 1 {
262            return Err(arrow_err!(
263                "Map field must have exactly one key_value child, found {}",
264                map_type.get_fields().len()
265            ));
266        }
267
268        // Add map entry (key_value) to context
269        let map_key_value = &map_type.get_fields()[0];
270        if map_key_value.get_basic_info().repetition() != Repetition::REPEATED {
271            return Err(arrow_err!("Child of map field must be repeated"));
272        }
273
274        // According to the specification the values are optional (#1642).
275        // In this case, return the keys as a list.
276        if map_key_value.get_fields().len() == 1 {
277            return self.visit_list(map_type, context);
278        }
279
280        if map_key_value.get_fields().len() != 2 {
281            return Err(arrow_err!(
282                "Child of map field must have two children, found {}",
283                map_key_value.get_fields().len()
284            ));
285        }
286
287        // Get key and value, and create context for each
288        let map_key = &map_key_value.get_fields()[0];
289        let map_value = &map_key_value.get_fields()[1];
290
291        match map_key.get_basic_info().repetition() {
292            Repetition::REPEATED => {
293                return Err(arrow_err!("Map keys cannot be repeated"));
294            }
295            Repetition::REQUIRED | Repetition::OPTIONAL => {
296                // Relaxed check for having repetition REQUIRED as there exists
297                // parquet writers and files that do not conform to this standard.
298                // This allows us to consume a broader range of existing files even
299                // if they are out of spec.
300            }
301        }
302
303        if map_value.get_basic_info().repetition() == Repetition::REPEATED {
304            return Err(arrow_err!("Map values cannot be repeated"));
305        }
306
307        // Extract the arrow fields
308        let (arrow_map, arrow_key, arrow_value, sorted) = match &context.data_type {
309            Some(DataType::Map(field, sorted)) => match field.data_type() {
310                DataType::Struct(fields) => {
311                    if fields.len() != 2 {
312                        return Err(arrow_err!(
313                            "Map data type should contain struct with two children, got {}",
314                            fields.len()
315                        ));
316                    }
317
318                    (Some(field), Some(&*fields[0]), Some(&*fields[1]), *sorted)
319                }
320                d => {
321                    return Err(arrow_err!("Map data type should contain struct got {}", d));
322                }
323            },
324            Some(d) => {
325                return Err(arrow_err!(
326                    "incompatible arrow schema, expected map got {}",
327                    d
328                ))
329            }
330            None => (None, None, None, false),
331        };
332
333        let maybe_key = {
334            let context = VisitorContext {
335                rep_level,
336                def_level,
337                data_type: arrow_key.map(|x| x.data_type().clone()),
338            };
339
340            self.dispatch(map_key, context)?
341        };
342
343        let maybe_value = {
344            let context = VisitorContext {
345                rep_level,
346                def_level,
347                data_type: arrow_value.map(|x| x.data_type().clone()),
348            };
349
350            self.dispatch(map_value, context)?
351        };
352
353        // Need both columns to be projected
354        match (maybe_key, maybe_value) {
355            (Some(key), Some(value)) => {
356                let key_field = Arc::new(
357                    convert_field(map_key, &key, arrow_key)
358                        // The key is always non-nullable (#5630)
359                        .with_nullable(false),
360                );
361                let value_field = Arc::new(convert_field(map_value, &value, arrow_value));
362                let field_metadata = match arrow_map {
363                    Some(field) => field.metadata().clone(),
364                    _ => HashMap::default(),
365                };
366
367                let map_field = Field::new_struct(
368                    map_key_value.name(),
369                    [key_field, value_field],
370                    false, // The inner map field is always non-nullable (#1697)
371                )
372                .with_metadata(field_metadata);
373
374                Ok(Some(ParquetField {
375                    rep_level,
376                    def_level,
377                    nullable,
378                    arrow_type: DataType::Map(Arc::new(map_field), sorted),
379                    field_type: ParquetFieldType::Group {
380                        children: vec![key, value],
381                    },
382                }))
383            }
384            _ => Ok(None),
385        }
386    }
387
388    fn visit_list(
389        &mut self,
390        list_type: &TypePtr,
391        context: VisitorContext,
392    ) -> Result<Option<ParquetField>> {
393        if list_type.is_primitive() {
394            return Err(arrow_err!(
395                "{:?} is a list type and can't be processed as primitive.",
396                list_type
397            ));
398        }
399
400        let fields = list_type.get_fields();
401        if fields.len() != 1 {
402            return Err(arrow_err!(
403                "list type must have a single child, found {}",
404                fields.len()
405            ));
406        }
407
408        let repeated_field = &fields[0];
409        if get_repetition(repeated_field) != Repetition::REPEATED {
410            return Err(arrow_err!("List child must be repeated"));
411        }
412
413        // If the list is nullable
414        let (def_level, nullable) = match list_type.get_basic_info().repetition() {
415            Repetition::REQUIRED => (context.def_level, false),
416            Repetition::OPTIONAL => (context.def_level + 1, true),
417            Repetition::REPEATED => return Err(arrow_err!("List type cannot be repeated")),
418        };
419
420        let arrow_field = match &context.data_type {
421            Some(DataType::List(f)) => Some(f.as_ref()),
422            Some(DataType::LargeList(f)) => Some(f.as_ref()),
423            Some(DataType::FixedSizeList(f, _)) => Some(f.as_ref()),
424            Some(d) => {
425                return Err(arrow_err!(
426                    "incompatible arrow schema, expected list got {}",
427                    d
428                ))
429            }
430            None => None,
431        };
432
433        if repeated_field.is_primitive() {
434            // If the repeated field is not a group, then its type is the element type and elements are required.
435            //
436            // required/optional group my_list (LIST) {
437            //   repeated int32 element;
438            // }
439            //
440            let context = VisitorContext {
441                rep_level: context.rep_level,
442                def_level,
443                data_type: arrow_field.map(|f| f.data_type().clone()),
444            };
445
446            return match self.visit_primitive(repeated_field, context) {
447                Ok(Some(mut field)) => {
448                    // visit_primitive will infer a non-nullable list, update if necessary
449                    field.nullable = nullable;
450                    Ok(Some(field))
451                }
452                r => r,
453            };
454        }
455
456        // test to see if the repeated field is a struct or one-tuple
457        let items = repeated_field.get_fields();
458        if items.len() != 1
459            || (!repeated_field.is_list()
460                && !repeated_field.has_single_repeated_child()
461                && (repeated_field.name() == "array"
462                    || repeated_field.name() == format!("{}_tuple", list_type.name())))
463        {
464            // If the repeated field is a group with multiple fields, then its type is the element
465            // type and elements are required.
466            //
467            // If the repeated field is a group with one field and is named either array or uses
468            // the LIST-annotated group's name with _tuple appended then the repeated type is the
469            // element type and elements are required. But this rule only applies if the
470            // repeated field is not annotated, and the single child field is not `repeated`.
471            let context = VisitorContext {
472                rep_level: context.rep_level,
473                def_level,
474                data_type: arrow_field.map(|f| f.data_type().clone()),
475            };
476
477            return match self.visit_struct(repeated_field, context) {
478                Ok(Some(mut field)) => {
479                    field.nullable = nullable;
480                    Ok(Some(field))
481                }
482                r => r,
483            };
484        }
485
486        // Regular list handling logic
487        let item_type = &items[0];
488        let rep_level = context.rep_level + 1;
489        let def_level = def_level + 1;
490
491        let new_context = VisitorContext {
492            def_level,
493            rep_level,
494            data_type: arrow_field.map(|f| f.data_type().clone()),
495        };
496
497        match self.dispatch(item_type, new_context) {
498            Ok(Some(item)) => {
499                let item_field = Arc::new(convert_field(item_type, &item, arrow_field));
500
501                // Use arrow type as hint for index size
502                let arrow_type = match context.data_type {
503                    Some(DataType::LargeList(_)) => DataType::LargeList(item_field),
504                    Some(DataType::FixedSizeList(_, len)) => {
505                        DataType::FixedSizeList(item_field, len)
506                    }
507                    _ => DataType::List(item_field),
508                };
509
510                Ok(Some(ParquetField {
511                    rep_level,
512                    def_level,
513                    nullable,
514                    arrow_type,
515                    field_type: ParquetFieldType::Group {
516                        children: vec![item],
517                    },
518                }))
519            }
520            r => r,
521        }
522    }
523
524    fn dispatch(
525        &mut self,
526        cur_type: &TypePtr,
527        context: VisitorContext,
528    ) -> Result<Option<ParquetField>> {
529        if cur_type.is_primitive() {
530            self.visit_primitive(cur_type, context)
531        } else {
532            match cur_type.get_basic_info().converted_type() {
533                ConvertedType::LIST => self.visit_list(cur_type, context),
534                ConvertedType::MAP | ConvertedType::MAP_KEY_VALUE => {
535                    self.visit_map(cur_type, context)
536                }
537                _ => self.visit_struct(cur_type, context),
538            }
539        }
540    }
541}
542
543/// Computes the [`Field`] for a child column
544///
545/// The resulting [`Field`] will have the type dictated by `field`, a name
546/// dictated by the `parquet_type`, and any metadata from `arrow_hint`
547fn convert_field(parquet_type: &Type, field: &ParquetField, arrow_hint: Option<&Field>) -> Field {
548    let name = parquet_type.name();
549    let data_type = field.arrow_type.clone();
550    let nullable = field.nullable;
551
552    match arrow_hint {
553        Some(hint) => {
554            // If the inferred type is a dictionary, preserve dictionary metadata
555            #[allow(deprecated)]
556            let field = match (&data_type, hint.dict_id(), hint.dict_is_ordered()) {
557                (DataType::Dictionary(_, _), Some(id), Some(ordered)) =>
558                {
559                    #[allow(deprecated)]
560                    Field::new_dict(name, data_type, nullable, id, ordered)
561                }
562                _ => Field::new(name, data_type, nullable),
563            };
564
565            field.with_metadata(hint.metadata().clone())
566        }
567        None => {
568            let mut ret = Field::new(name, data_type, nullable);
569            let basic_info = parquet_type.get_basic_info();
570            if basic_info.has_id() {
571                let mut meta = HashMap::with_capacity(1);
572                meta.insert(
573                    PARQUET_FIELD_ID_META_KEY.to_string(),
574                    basic_info.id().to_string(),
575                );
576                ret.set_metadata(meta);
577            }
578            ret
579        }
580    }
581}
582
583/// Computes the [`ParquetField`] for the provided [`SchemaDescriptor`] with `leaf_columns` listing
584/// the indexes of leaf columns to project, and `embedded_arrow_schema` the optional
585/// [`Fields`] embedded in the parquet metadata
586///
587/// Note: This does not support out of order column projection
588pub fn convert_schema(
589    schema: &SchemaDescriptor,
590    mask: ProjectionMask,
591    embedded_arrow_schema: Option<&Fields>,
592) -> Result<Option<ParquetField>> {
593    let mut visitor = Visitor {
594        next_col_idx: 0,
595        mask,
596    };
597
598    let context = VisitorContext {
599        rep_level: 0,
600        def_level: 0,
601        data_type: embedded_arrow_schema.map(|fields| DataType::Struct(fields.clone())),
602    };
603
604    visitor.dispatch(&schema.root_schema_ptr(), context)
605}
606
607/// Computes the [`ParquetField`] for the provided `parquet_type`
608pub fn convert_type(parquet_type: &TypePtr) -> Result<ParquetField> {
609    let mut visitor = Visitor {
610        next_col_idx: 0,
611        mask: ProjectionMask::all(),
612    };
613
614    let context = VisitorContext {
615        rep_level: 0,
616        def_level: 0,
617        data_type: None,
618    };
619
620    Ok(visitor.dispatch(parquet_type, context)?.unwrap())
621}