parquet/arrow/schema/complex.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
17
18use std::collections::HashMap;
19use std::sync::Arc;
20
21use crate::arrow::schema::extension::try_add_extension_type;
22use crate::arrow::schema::primitive::convert_primitive;
23use crate::arrow::schema::virtual_type::RowNumber;
24use crate::arrow::{PARQUET_FIELD_ID_META_KEY, ProjectionMask};
25use crate::basic::{ConvertedType, Repetition};
26use crate::errors::ParquetError;
27use crate::errors::Result;
28use crate::schema::types::{SchemaDescriptor, Type, TypePtr};
29use arrow_schema::{DataType, Field, Fields, SchemaBuilder, extension::ExtensionType};
30
31fn get_repetition(t: &Type) -> Repetition {
32    let info = t.get_basic_info();
33    match info.has_repetition() {
34        true => info.repetition(),
35        false => Repetition::REQUIRED,
36    }
37}
38
39/// Representation of a parquet schema element, in terms of arrow schema elements
40#[derive(Debug, Clone)]
41pub struct ParquetField {
42    /// The level which represents an insertion into the current list
43    /// i.e. guaranteed to be > 0 for an element of list type
44    pub rep_level: i16,
45    /// The level at which this field is fully defined,
46    /// i.e. guaranteed to be > 0 for a nullable type or child of a
47    /// nullable type
48    pub def_level: i16,
49    /// Whether this field is nullable
50    pub nullable: bool,
51    /// The arrow type of the column data
52    ///
53    /// Note: In certain cases the data stored in parquet may have been coerced
54    /// to a different type and will require conversion on read (e.g. Date64 and Interval)
55    pub arrow_type: DataType,
56    /// The type of this field
57    pub field_type: ParquetFieldType,
58}
59
60impl ParquetField {
61    /// Converts `self` into an arrow list, with its current type as the field type
62    ///
63    /// This is used to convert repeated columns, into their arrow representation
64    fn into_list(self, name: &str) -> Self {
65        ParquetField {
66            rep_level: self.rep_level,
67            def_level: self.def_level,
68            nullable: false,
69            arrow_type: DataType::List(Arc::new(Field::new(name, self.arrow_type.clone(), false))),
70            field_type: ParquetFieldType::Group {
71                children: vec![self],
72            },
73        }
74    }
75
76    /// Returns a list of [`ParquetField`] children if this is a group type
77    pub fn children(&self) -> Option<&[Self]> {
78        match &self.field_type {
79            ParquetFieldType::Primitive { .. } => None,
80            ParquetFieldType::Group { children } => Some(children),
81            ParquetFieldType::Virtual(_) => None,
82        }
83    }
84}
85
/// Types of virtual columns that can be computed at read time
///
/// This is a fieldless `Copy` enum, so it also derives `Eq` and `Hash` and can be used
/// as a key in hashed collections.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum VirtualColumnType {
    /// Row number within the file
    RowNumber,
}
92
93#[derive(Debug, Clone)]
94pub enum ParquetFieldType {
95    Primitive {
96        /// The index of the column in parquet
97        col_idx: usize,
98        /// The type of the column in parquet
99        primitive_type: TypePtr,
100    },
101    Group {
102        children: Vec<ParquetField>,
103    },
104    /// Virtual column that doesn't exist in the parquet file
105    /// but is computed at read time (e.g., row_number)
106    Virtual(VirtualColumnType),
107}
108
109/// Encodes the context of the parent of the field currently under consideration
110struct VisitorContext {
111    rep_level: i16,
112    def_level: i16,
113    /// An optional [`DataType`] sourced from the embedded arrow schema
114    data_type: Option<DataType>,
115}
116
117impl VisitorContext {
118    /// Compute the resulting definition level, repetition level and nullability
119    /// for a child field with the given [`Repetition`]
120    fn levels(&self, repetition: Repetition) -> (i16, i16, bool) {
121        match repetition {
122            Repetition::OPTIONAL => (self.def_level + 1, self.rep_level, true),
123            Repetition::REQUIRED => (self.def_level, self.rep_level, false),
124            Repetition::REPEATED => (self.def_level + 1, self.rep_level + 1, false),
125        }
126    }
127}
128
129/// Walks the parquet schema in a depth-first fashion in order to map it to arrow data structures
130///
131/// See [Logical Types] for more information on the conversion algorithm
132///
133/// [Logical Types]: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md
134struct Visitor {
135    /// The column index of the next leaf column
136    next_col_idx: usize,
137
138    /// Mask of columns to include
139    mask: ProjectionMask,
140}
141
142impl Visitor {
143    fn visit_primitive(
144        &mut self,
145        primitive_type: &TypePtr,
146        context: VisitorContext,
147    ) -> Result<Option<ParquetField>> {
148        let col_idx = self.next_col_idx;
149        self.next_col_idx += 1;
150
151        if !self.mask.leaf_included(col_idx) {
152            return Ok(None);
153        }
154
155        let repetition = get_repetition(primitive_type);
156        let (def_level, rep_level, nullable) = context.levels(repetition);
157
158        let arrow_type = convert_primitive(primitive_type, context.data_type)?;
159
160        let primitive_field = ParquetField {
161            rep_level,
162            def_level,
163            nullable,
164            arrow_type,
165            field_type: ParquetFieldType::Primitive {
166                primitive_type: primitive_type.clone(),
167                col_idx,
168            },
169        };
170
171        Ok(Some(match repetition {
172            Repetition::REPEATED => primitive_field.into_list(primitive_type.name()),
173            _ => primitive_field,
174        }))
175    }
176
177    fn visit_struct(
178        &mut self,
179        struct_type: &TypePtr,
180        context: VisitorContext,
181    ) -> Result<Option<ParquetField>> {
182        // The root type will not have a repetition level
183        let repetition = get_repetition(struct_type);
184        let (def_level, rep_level, nullable) = context.levels(repetition);
185
186        let parquet_fields = struct_type.get_fields();
187
188        // Extract any arrow fields from the hints
189        let arrow_fields = match &context.data_type {
190            Some(DataType::Struct(fields)) => {
191                if fields.len() != parquet_fields.len() {
192                    return Err(arrow_err!(
193                        "incompatible arrow schema, expected {} struct fields got {}",
194                        parquet_fields.len(),
195                        fields.len()
196                    ));
197                }
198                Some(fields)
199            }
200            Some(d) => {
201                return Err(arrow_err!(
202                    "incompatible arrow schema, expected struct got {}",
203                    d
204                ));
205            }
206            None => None,
207        };
208
209        let mut child_fields = SchemaBuilder::with_capacity(parquet_fields.len());
210        let mut children = Vec::with_capacity(parquet_fields.len());
211
212        // Perform a DFS of children
213        for (idx, parquet_field) in parquet_fields.iter().enumerate() {
214            let data_type = match arrow_fields {
215                Some(fields) => {
216                    let field = &fields[idx];
217                    if field.name() != parquet_field.name() {
218                        return Err(arrow_err!(
219                            "incompatible arrow schema, expected field named {} got {}",
220                            parquet_field.name(),
221                            field.name()
222                        ));
223                    }
224                    Some(field.data_type().clone())
225                }
226                None => None,
227            };
228
229            let arrow_field = arrow_fields.map(|x| &*x[idx]);
230            let child_ctx = VisitorContext {
231                rep_level,
232                def_level,
233                data_type,
234            };
235
236            if let Some(mut child) = self.dispatch(parquet_field, child_ctx)? {
237                // The child type returned may be different from what is encoded in the arrow
238                // schema in the event of a mismatch or a projection
239                child_fields.push(convert_field(parquet_field, &mut child, arrow_field)?);
240                children.push(child);
241            }
242        }
243
244        if children.is_empty() {
245            return Ok(None);
246        }
247
248        let struct_field = ParquetField {
249            rep_level,
250            def_level,
251            nullable,
252            arrow_type: DataType::Struct(child_fields.finish().fields),
253            field_type: ParquetFieldType::Group { children },
254        };
255
256        Ok(Some(match repetition {
257            Repetition::REPEATED => struct_field.into_list(struct_type.name()),
258            _ => struct_field,
259        }))
260    }
261
262    fn visit_map(
263        &mut self,
264        map_type: &TypePtr,
265        context: VisitorContext,
266    ) -> Result<Option<ParquetField>> {
267        let rep_level = context.rep_level + 1;
268        let (def_level, nullable) = match get_repetition(map_type) {
269            Repetition::REQUIRED => (context.def_level + 1, false),
270            Repetition::OPTIONAL => (context.def_level + 2, true),
271            Repetition::REPEATED => return Err(arrow_err!("Map cannot be repeated")),
272        };
273
274        if map_type.get_fields().len() != 1 {
275            return Err(arrow_err!(
276                "Map field must have exactly one key_value child, found {}",
277                map_type.get_fields().len()
278            ));
279        }
280
281        // Add map entry (key_value) to context
282        let map_key_value = &map_type.get_fields()[0];
283        if map_key_value.get_basic_info().repetition() != Repetition::REPEATED {
284            return Err(arrow_err!("Child of map field must be repeated"));
285        }
286
287        // According to the specification the values are optional (#1642).
288        // In this case, return the keys as a list.
289        if map_key_value.get_fields().len() == 1 {
290            return self.visit_list(map_type, context);
291        }
292
293        if map_key_value.get_fields().len() != 2 {
294            return Err(arrow_err!(
295                "Child of map field must have two children, found {}",
296                map_key_value.get_fields().len()
297            ));
298        }
299
300        // Get key and value, and create context for each
301        let map_key = &map_key_value.get_fields()[0];
302        let map_value = &map_key_value.get_fields()[1];
303
304        match map_key.get_basic_info().repetition() {
305            Repetition::REPEATED => {
306                return Err(arrow_err!("Map keys cannot be repeated"));
307            }
308            Repetition::REQUIRED | Repetition::OPTIONAL => {
309                // Relaxed check for having repetition REQUIRED as there exists
310                // parquet writers and files that do not conform to this standard.
311                // This allows us to consume a broader range of existing files even
312                // if they are out of spec.
313            }
314        }
315
316        if map_value.get_basic_info().repetition() == Repetition::REPEATED {
317            return Err(arrow_err!("Map values cannot be repeated"));
318        }
319
320        // Extract the arrow fields
321        let (arrow_map, arrow_key, arrow_value, sorted) = match &context.data_type {
322            Some(DataType::Map(field, sorted)) => match field.data_type() {
323                DataType::Struct(fields) => {
324                    if fields.len() != 2 {
325                        return Err(arrow_err!(
326                            "Map data type should contain struct with two children, got {}",
327                            fields.len()
328                        ));
329                    }
330
331                    (Some(field), Some(&*fields[0]), Some(&*fields[1]), *sorted)
332                }
333                d => {
334                    return Err(arrow_err!("Map data type should contain struct got {}", d));
335                }
336            },
337            Some(d) => {
338                return Err(arrow_err!(
339                    "incompatible arrow schema, expected map got {}",
340                    d
341                ));
342            }
343            None => (None, None, None, false),
344        };
345
346        let maybe_key = {
347            let context = VisitorContext {
348                rep_level,
349                def_level,
350                data_type: arrow_key.map(|x| x.data_type().clone()),
351            };
352
353            self.dispatch(map_key, context)?
354        };
355
356        let maybe_value = {
357            let context = VisitorContext {
358                rep_level,
359                def_level,
360                data_type: arrow_value.map(|x| x.data_type().clone()),
361            };
362
363            self.dispatch(map_value, context)?
364        };
365
366        // Need both columns to be projected
367        match (maybe_key, maybe_value) {
368            (Some(mut key), Some(mut value)) => {
369                let key_field = Arc::new(
370                    convert_field(map_key, &mut key, arrow_key)?
371                        // The key is always non-nullable (#5630)
372                        .with_nullable(false),
373                );
374                let value_field = Arc::new(convert_field(map_value, &mut value, arrow_value)?);
375                let field_metadata = match arrow_map {
376                    Some(field) => field.metadata().clone(),
377                    _ => HashMap::default(),
378                };
379
380                let map_field = Field::new_struct(
381                    map_key_value.name(),
382                    [key_field, value_field],
383                    false, // The inner map field is always non-nullable (#1697)
384                )
385                .with_metadata(field_metadata);
386
387                Ok(Some(ParquetField {
388                    rep_level,
389                    def_level,
390                    nullable,
391                    arrow_type: DataType::Map(Arc::new(map_field), sorted),
392                    field_type: ParquetFieldType::Group {
393                        children: vec![key, value],
394                    },
395                }))
396            }
397            _ => Ok(None),
398        }
399    }
400
401    fn visit_list(
402        &mut self,
403        list_type: &TypePtr,
404        context: VisitorContext,
405    ) -> Result<Option<ParquetField>> {
406        if list_type.is_primitive() {
407            return Err(arrow_err!(
408                "{:?} is a list type and can't be processed as primitive.",
409                list_type
410            ));
411        }
412
413        let fields = list_type.get_fields();
414        if fields.len() != 1 {
415            return Err(arrow_err!(
416                "list type must have a single child, found {}",
417                fields.len()
418            ));
419        }
420
421        let repeated_field = &fields[0];
422        if get_repetition(repeated_field) != Repetition::REPEATED {
423            return Err(arrow_err!("List child must be repeated"));
424        }
425
426        // If the list is nullable
427        let (def_level, nullable) = match list_type.get_basic_info().repetition() {
428            Repetition::REQUIRED => (context.def_level, false),
429            Repetition::OPTIONAL => (context.def_level + 1, true),
430            Repetition::REPEATED => return Err(arrow_err!("List type cannot be repeated")),
431        };
432
433        let arrow_field = match &context.data_type {
434            Some(DataType::List(f)) => Some(f.as_ref()),
435            Some(DataType::LargeList(f)) => Some(f.as_ref()),
436            Some(DataType::FixedSizeList(f, _)) => Some(f.as_ref()),
437            Some(d) => {
438                return Err(arrow_err!(
439                    "incompatible arrow schema, expected list got {}",
440                    d
441                ));
442            }
443            None => None,
444        };
445
446        if repeated_field.is_primitive() {
447            // If the repeated field is not a group, then its type is the element type and elements are required.
448            //
449            // required/optional group my_list (LIST) {
450            //   repeated int32 element;
451            // }
452            //
453            let context = VisitorContext {
454                rep_level: context.rep_level,
455                def_level,
456                data_type: arrow_field.map(|f| f.data_type().clone()),
457            };
458
459            return match self.visit_primitive(repeated_field, context) {
460                Ok(Some(mut field)) => {
461                    // visit_primitive will infer a non-nullable list, update if necessary
462                    field.nullable = nullable;
463                    Ok(Some(field))
464                }
465                r => r,
466            };
467        }
468
469        // test to see if the repeated field is a struct or one-tuple
470        let items = repeated_field.get_fields();
471        if items.len() != 1
472            || (!repeated_field.is_list()
473                && !repeated_field.has_single_repeated_child()
474                && (repeated_field.name() == "array"
475                    || repeated_field.name() == format!("{}_tuple", list_type.name())))
476        {
477            // If the repeated field is a group with multiple fields, then its type is the element
478            // type and elements are required.
479            //
480            // If the repeated field is a group with one field and is named either array or uses
481            // the LIST-annotated group's name with _tuple appended then the repeated type is the
482            // element type and elements are required. But this rule only applies if the
483            // repeated field is not annotated, and the single child field is not `repeated`.
484            let context = VisitorContext {
485                rep_level: context.rep_level,
486                def_level,
487                data_type: arrow_field.map(|f| f.data_type().clone()),
488            };
489
490            return match self.visit_struct(repeated_field, context) {
491                Ok(Some(mut field)) => {
492                    field.nullable = nullable;
493                    Ok(Some(field))
494                }
495                r => r,
496            };
497        }
498
499        // Regular list handling logic
500        let item_type = &items[0];
501        let rep_level = context.rep_level + 1;
502        let def_level = def_level + 1;
503
504        let new_context = VisitorContext {
505            def_level,
506            rep_level,
507            data_type: arrow_field.map(|f| f.data_type().clone()),
508        };
509
510        match self.dispatch(item_type, new_context) {
511            Ok(Some(mut item)) => {
512                let item_field = Arc::new(convert_field(item_type, &mut item, arrow_field)?);
513
514                // Use arrow type as hint for index size
515                let arrow_type = match context.data_type {
516                    Some(DataType::LargeList(_)) => DataType::LargeList(item_field),
517                    Some(DataType::FixedSizeList(_, len)) => {
518                        DataType::FixedSizeList(item_field, len)
519                    }
520                    _ => DataType::List(item_field),
521                };
522
523                Ok(Some(ParquetField {
524                    rep_level,
525                    def_level,
526                    nullable,
527                    arrow_type,
528                    field_type: ParquetFieldType::Group {
529                        children: vec![item],
530                    },
531                }))
532            }
533            r => r,
534        }
535    }
536
537    fn dispatch(
538        &mut self,
539        cur_type: &TypePtr,
540        context: VisitorContext,
541    ) -> Result<Option<ParquetField>> {
542        if cur_type.is_primitive() {
543            self.visit_primitive(cur_type, context)
544        } else {
545            match cur_type.get_basic_info().converted_type() {
546                ConvertedType::LIST => self.visit_list(cur_type, context),
547                ConvertedType::MAP | ConvertedType::MAP_KEY_VALUE => {
548                    self.visit_map(cur_type, context)
549                }
550                _ => self.visit_struct(cur_type, context),
551            }
552        }
553    }
554}
555
556/// Converts a virtual Arrow [`Field`] to a [`ParquetField`]
557///
558/// Virtual fields don't correspond to any data in the parquet file,
559/// but are computed at read time (e.g., row_number)
560///
561/// The levels are computed based on the parent context:
562/// - If nullable: def_level = parent_def_level + 1
563/// - If required: def_level = parent_def_level
564/// - rep_level = parent_rep_level (virtual fields are not repeated)
565pub(super) fn convert_virtual_field(
566    arrow_field: &Field,
567    parent_rep_level: i16,
568    parent_def_level: i16,
569) -> Result<ParquetField> {
570    let nullable = arrow_field.is_nullable();
571    let def_level = if nullable {
572        parent_def_level + 1
573    } else {
574        parent_def_level
575    };
576
577    // Determine the virtual column type based on the extension type name
578    let extension_name = arrow_field.extension_type_name().ok_or_else(|| {
579        ParquetError::ArrowError(format!(
580            "virtual column field '{}' must have an extension type",
581            arrow_field.name()
582        ))
583    })?;
584
585    let virtual_type = match extension_name {
586        RowNumber::NAME => VirtualColumnType::RowNumber,
587        _ => {
588            return Err(ParquetError::ArrowError(format!(
589                "unsupported virtual column type '{}' for field '{}'",
590                extension_name,
591                arrow_field.name()
592            )));
593        }
594    };
595
596    Ok(ParquetField {
597        rep_level: parent_rep_level,
598        def_level,
599        nullable,
600        arrow_type: arrow_field.data_type().clone(),
601        field_type: ParquetFieldType::Virtual(virtual_type),
602    })
603}
604
605/// Computes the Arrow [`Field`] for a child column
606///
607/// The resulting Arrow [`Field`] will have the type dictated by the Parquet `field`, a name
608/// dictated by the `parquet_type`, and any metadata from `arrow_hint`
609fn convert_field(
610    parquet_type: &Type,
611    field: &mut ParquetField,
612    arrow_hint: Option<&Field>,
613) -> Result<Field, ParquetError> {
614    let name = parquet_type.name();
615    let data_type = field.arrow_type.clone();
616    let nullable = field.nullable;
617
618    match arrow_hint {
619        Some(hint) => {
620            // If the inferred type is a dictionary, preserve dictionary metadata
621            #[allow(deprecated)]
622            let field = match (&data_type, hint.dict_id(), hint.dict_is_ordered()) {
623                (DataType::Dictionary(_, _), Some(id), Some(ordered)) =>
624                {
625                    #[allow(deprecated)]
626                    Field::new_dict(name, data_type, nullable, id, ordered)
627                }
628                _ => Field::new(name, data_type, nullable),
629            };
630
631            Ok(field.with_metadata(hint.metadata().clone()))
632        }
633        None => {
634            let mut ret = Field::new(name, data_type, nullable);
635            let basic_info = parquet_type.get_basic_info();
636            if basic_info.has_id() {
637                let mut meta = HashMap::with_capacity(1);
638                meta.insert(
639                    PARQUET_FIELD_ID_META_KEY.to_string(),
640                    basic_info.id().to_string(),
641                );
642                ret.set_metadata(meta);
643            }
644            try_add_extension_type(ret, parquet_type)
645        }
646    }
647}
648
649/// Computes the [`ParquetField`] for the provided [`SchemaDescriptor`] with `leaf_columns` listing
650/// the indexes of leaf columns to project, and `embedded_arrow_schema` the optional
651/// [`Fields`] embedded in the parquet metadata
652///
653/// Note: This does not support out of order column projection
654pub fn convert_schema(
655    schema: &SchemaDescriptor,
656    mask: ProjectionMask,
657    embedded_arrow_schema: Option<&Fields>,
658) -> Result<Option<ParquetField>> {
659    let mut visitor = Visitor {
660        next_col_idx: 0,
661        mask,
662    };
663
664    let context = VisitorContext {
665        rep_level: 0,
666        def_level: 0,
667        data_type: embedded_arrow_schema.map(|fields| DataType::Struct(fields.clone())),
668    };
669
670    visitor.dispatch(&schema.root_schema_ptr(), context)
671}
672
673/// Computes the [`ParquetField`] for the provided `parquet_type`
674pub fn convert_type(parquet_type: &TypePtr) -> Result<ParquetField> {
675    let mut visitor = Visitor {
676        next_col_idx: 0,
677        mask: ProjectionMask::all(),
678    };
679
680    let context = VisitorContext {
681        rep_level: 0,
682        def_level: 0,
683        data_type: None,
684    };
685
686    Ok(visitor.dispatch(parquet_type, context)?.unwrap())
687}