Skip to main content

parquet/arrow/schema/
complex.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::HashMap;
19use std::sync::Arc;
20
21use crate::arrow::schema::extension::try_add_extension_type;
22use crate::arrow::schema::primitive::convert_primitive;
23use crate::arrow::schema::virtual_type::{RowGroupIndex, RowNumber};
24use crate::arrow::{PARQUET_FIELD_ID_META_KEY, ProjectionMask};
25use crate::basic::{ConvertedType, Repetition};
26use crate::errors::ParquetError;
27use crate::errors::Result;
28use crate::schema::types::{SchemaDescriptor, Type, TypePtr};
29use arrow_schema::{DataType, Field, Fields, SchemaBuilder, extension::ExtensionType};
30
31fn get_repetition(t: &Type) -> Repetition {
32    let info = t.get_basic_info();
33    match info.has_repetition() {
34        true => info.repetition(),
35        false => Repetition::REQUIRED,
36    }
37}
38
/// Representation of a parquet schema element, in terms of arrow schema elements
///
/// The conversion in this module maps each kind of element onto a variant of
/// [`ParquetFieldType`]:
/// - leaf columns become [`ParquetFieldType::Primitive`];
/// - structs, lists and maps become [`ParquetFieldType::Group`];
/// - columns computed at read time become [`ParquetFieldType::Virtual`].
#[derive(Debug, Clone)]
pub struct ParquetField {
    /// The level which represents an insertion into the current list
    /// i.e. guaranteed to be > 0 for an element of list type
    pub rep_level: i16,
    /// The level at which this field is fully defined,
    /// i.e. guaranteed to be > 0 for a nullable type or child of a
    /// nullable type
    pub def_level: i16,
    /// Whether this field is nullable
    pub nullable: bool,
    /// The arrow type of the column data
    ///
    /// Note: In certain cases the data stored in parquet may have been coerced
    /// to a different type and will require conversion on read (e.g. Date64 and Interval)
    pub arrow_type: DataType,
    /// Whether this field is a primitive leaf column, a group of child fields,
    /// or a virtual column
    pub field_type: ParquetFieldType,
}
59
60impl ParquetField {
61    /// Converts `self` into an arrow list, with its current type as the field type
62    ///
63    /// This is used to convert repeated columns, into their arrow representation
64    fn into_list(self, name: &str) -> Self {
65        ParquetField {
66            rep_level: self.rep_level,
67            def_level: self.def_level,
68            nullable: false,
69            arrow_type: DataType::List(Arc::new(Field::new(name, self.arrow_type.clone(), false))),
70            field_type: ParquetFieldType::Group {
71                children: vec![self],
72            },
73        }
74    }
75
76    /// Returns a list of [`ParquetField`] children if this is a group type
77    pub fn children(&self) -> Option<&[Self]> {
78        match &self.field_type {
79            ParquetFieldType::Primitive { .. } => None,
80            ParquetFieldType::Group { children } => Some(children),
81            ParquetFieldType::Virtual(_) => None,
82        }
83    }
84}
85
/// Types of virtual columns that can be computed at read time
///
/// `convert_virtual_field` selects the variant from the arrow field's extension type name.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum VirtualColumnType {
    /// Row number within the file, selected by the [`RowNumber`] extension type
    RowNumber,
    /// Row group index, selected by the [`RowGroupIndex`] extension type
    RowGroupIndex,
}
94
/// The kind of a [`ParquetField`].
///
/// A field is either a leaf column stored in the file, a group of child fields,
/// or a virtual column computed at read time.
#[derive(Debug, Clone)]
pub enum ParquetFieldType {
    /// A leaf column stored in the parquet file
    Primitive {
        /// The index of the column in parquet
        col_idx: usize,
        /// The type of the column in parquet
        primitive_type: TypePtr,
    },
    /// A group of child fields, e.g. the arrow representation of a struct, list or map
    Group {
        /// The converted children that survived projection, in parquet schema order
        children: Vec<ParquetField>,
    },
    /// Virtual column that doesn't exist in the parquet file
    /// but is computed at read time (e.g., row_number)
    Virtual(VirtualColumnType),
}
110
/// Encodes the context of the parent of the field currently under consideration
struct VisitorContext {
    /// Repetition level of the parent; a `repeated` child adds one to it
    rep_level: i16,
    /// Definition level of the parent; an `optional` or `repeated` child adds one to it
    def_level: i16,
    /// An optional [`DataType`] sourced from the embedded arrow schema
    data_type: Option<DataType>,
}
118
119impl VisitorContext {
120    /// Compute the resulting definition level, repetition level and nullability
121    /// for a child field with the given [`Repetition`]
122    fn levels(&self, repetition: Repetition) -> (i16, i16, bool) {
123        match repetition {
124            Repetition::OPTIONAL => (self.def_level + 1, self.rep_level, true),
125            Repetition::REQUIRED => (self.def_level, self.rep_level, false),
126            Repetition::REPEATED => (self.def_level + 1, self.rep_level + 1, false),
127        }
128    }
129}
130
/// Walks the parquet schema in a depth-first fashion in order to map it to arrow data structures
///
/// See [Logical Types] for more information on the conversion algorithm
///
/// [Logical Types]: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md
struct Visitor {
    /// The column index of the next leaf column
    ///
    /// Incremented for every leaf visited, whether or not the projection mask includes it
    next_col_idx: usize,

    /// Mask of columns to include
    mask: ProjectionMask,
}
143
144impl Visitor {
145    fn visit_primitive(
146        &mut self,
147        primitive_type: &TypePtr,
148        context: VisitorContext,
149    ) -> Result<Option<ParquetField>> {
150        let col_idx = self.next_col_idx;
151        self.next_col_idx += 1;
152
153        if !self.mask.leaf_included(col_idx) {
154            return Ok(None);
155        }
156
157        let repetition = get_repetition(primitive_type);
158        let (def_level, rep_level, nullable) = context.levels(repetition);
159
160        let arrow_type = convert_primitive(primitive_type, context.data_type)?;
161
162        let primitive_field = ParquetField {
163            rep_level,
164            def_level,
165            nullable,
166            arrow_type,
167            field_type: ParquetFieldType::Primitive {
168                primitive_type: primitive_type.clone(),
169                col_idx,
170            },
171        };
172
173        Ok(Some(match repetition {
174            Repetition::REPEATED => primitive_field.into_list(primitive_type.name()),
175            _ => primitive_field,
176        }))
177    }
178
179    fn visit_struct(
180        &mut self,
181        struct_type: &TypePtr,
182        context: VisitorContext,
183    ) -> Result<Option<ParquetField>> {
184        // The root type will not have a repetition level
185        let repetition = get_repetition(struct_type);
186        let (def_level, rep_level, nullable) = context.levels(repetition);
187
188        let parquet_fields = struct_type.get_fields();
189
190        // Extract any arrow fields from the hints
191        let arrow_fields = match &context.data_type {
192            Some(DataType::Struct(fields)) => {
193                if fields.len() != parquet_fields.len() {
194                    return Err(arrow_err!(
195                        "incompatible arrow schema, expected {} struct fields got {}",
196                        parquet_fields.len(),
197                        fields.len()
198                    ));
199                }
200                Some(fields)
201            }
202            Some(d) => {
203                return Err(arrow_err!(
204                    "incompatible arrow schema, expected struct got {}",
205                    d
206                ));
207            }
208            None => None,
209        };
210
211        let mut child_fields = SchemaBuilder::with_capacity(parquet_fields.len());
212        let mut children = Vec::with_capacity(parquet_fields.len());
213
214        // Perform a DFS of children
215        for (idx, parquet_field) in parquet_fields.iter().enumerate() {
216            let data_type = match arrow_fields {
217                Some(fields) => {
218                    let field = &fields[idx];
219                    if field.name() != parquet_field.name() {
220                        return Err(arrow_err!(
221                            "incompatible arrow schema, expected field named {} got {}",
222                            parquet_field.name(),
223                            field.name()
224                        ));
225                    }
226                    Some(field.data_type().clone())
227                }
228                None => None,
229            };
230
231            let arrow_field = arrow_fields.map(|x| &*x[idx]);
232            let child_ctx = VisitorContext {
233                rep_level,
234                def_level,
235                data_type,
236            };
237
238            if let Some(mut child) = self.dispatch(parquet_field, child_ctx)? {
239                // The child type returned may be different from what is encoded in the arrow
240                // schema in the event of a mismatch or a projection
241                child_fields.push(convert_field(parquet_field, &mut child, arrow_field)?);
242                children.push(child);
243            }
244        }
245
246        if children.is_empty() {
247            return Ok(None);
248        }
249
250        let struct_field = ParquetField {
251            rep_level,
252            def_level,
253            nullable,
254            arrow_type: DataType::Struct(child_fields.finish().fields),
255            field_type: ParquetFieldType::Group { children },
256        };
257
258        Ok(Some(match repetition {
259            Repetition::REPEATED => struct_field.into_list(struct_type.name()),
260            _ => struct_field,
261        }))
262    }
263
264    fn visit_map(
265        &mut self,
266        map_type: &TypePtr,
267        context: VisitorContext,
268    ) -> Result<Option<ParquetField>> {
269        let rep_level = context.rep_level + 1;
270        let (def_level, nullable) = match get_repetition(map_type) {
271            Repetition::REQUIRED => (context.def_level + 1, false),
272            Repetition::OPTIONAL => (context.def_level + 2, true),
273            Repetition::REPEATED => return Err(arrow_err!("Map cannot be repeated")),
274        };
275
276        if map_type.get_fields().len() != 1 {
277            return Err(arrow_err!(
278                "Map field must have exactly one key_value child, found {}",
279                map_type.get_fields().len()
280            ));
281        }
282
283        // Add map entry (key_value) to context
284        let map_key_value = &map_type.get_fields()[0];
285        if map_key_value.get_basic_info().repetition() != Repetition::REPEATED {
286            return Err(arrow_err!("Child of map field must be repeated"));
287        }
288
289        // According to the specification the values are optional (#1642).
290        // In this case, return the keys as a list.
291        if map_key_value.get_fields().len() == 1 {
292            return self.visit_list(map_type, context);
293        }
294
295        if map_key_value.get_fields().len() != 2 {
296            return Err(arrow_err!(
297                "Child of map field must have two children, found {}",
298                map_key_value.get_fields().len()
299            ));
300        }
301
302        // Get key and value, and create context for each
303        let map_key = &map_key_value.get_fields()[0];
304        let map_value = &map_key_value.get_fields()[1];
305
306        match map_key.get_basic_info().repetition() {
307            Repetition::REPEATED => {
308                return Err(arrow_err!("Map keys cannot be repeated"));
309            }
310            Repetition::REQUIRED | Repetition::OPTIONAL => {
311                // Relaxed check for having repetition REQUIRED as there exists
312                // parquet writers and files that do not conform to this standard.
313                // This allows us to consume a broader range of existing files even
314                // if they are out of spec.
315            }
316        }
317
318        if map_value.get_basic_info().repetition() == Repetition::REPEATED {
319            return Err(arrow_err!("Map values cannot be repeated"));
320        }
321
322        // Extract the arrow fields
323        let (arrow_map, arrow_key, arrow_value, sorted) = match &context.data_type {
324            Some(DataType::Map(field, sorted)) => match field.data_type() {
325                DataType::Struct(fields) => {
326                    if fields.len() != 2 {
327                        return Err(arrow_err!(
328                            "Map data type should contain struct with two children, got {}",
329                            fields.len()
330                        ));
331                    }
332
333                    (Some(field), Some(&*fields[0]), Some(&*fields[1]), *sorted)
334                }
335                d => {
336                    return Err(arrow_err!("Map data type should contain struct got {}", d));
337                }
338            },
339            Some(d) => {
340                return Err(arrow_err!(
341                    "incompatible arrow schema, expected map got {}",
342                    d
343                ));
344            }
345            None => (None, None, None, false),
346        };
347
348        let maybe_key = {
349            let context = VisitorContext {
350                rep_level,
351                def_level,
352                data_type: arrow_key.map(|x| x.data_type().clone()),
353            };
354
355            self.dispatch(map_key, context)?
356        };
357
358        let maybe_value = {
359            let context = VisitorContext {
360                rep_level,
361                def_level,
362                data_type: arrow_value.map(|x| x.data_type().clone()),
363            };
364
365            self.dispatch(map_value, context)?
366        };
367
368        // Need both columns to be projected
369        match (maybe_key, maybe_value) {
370            (Some(mut key), Some(mut value)) => {
371                let key_field = Arc::new(
372                    convert_field(map_key, &mut key, arrow_key)?
373                        // The key is always non-nullable (#5630)
374                        .with_nullable(false),
375                );
376                let value_field = Arc::new(convert_field(map_value, &mut value, arrow_value)?);
377                let field_metadata = match arrow_map {
378                    Some(field) => field.metadata().clone(),
379                    _ => HashMap::default(),
380                };
381
382                let map_field = Field::new_struct(
383                    map_key_value.name(),
384                    [key_field, value_field],
385                    false, // The inner map field is always non-nullable (#1697)
386                )
387                .with_metadata(field_metadata);
388
389                Ok(Some(ParquetField {
390                    rep_level,
391                    def_level,
392                    nullable,
393                    arrow_type: DataType::Map(Arc::new(map_field), sorted),
394                    field_type: ParquetFieldType::Group {
395                        children: vec![key, value],
396                    },
397                }))
398            }
399            _ => Ok(None),
400        }
401    }
402
403    fn visit_list(
404        &mut self,
405        list_type: &TypePtr,
406        context: VisitorContext,
407    ) -> Result<Option<ParquetField>> {
408        if list_type.is_primitive() {
409            return Err(arrow_err!(
410                "{:?} is a list type and can't be processed as primitive.",
411                list_type
412            ));
413        }
414
415        let fields = list_type.get_fields();
416        if fields.len() != 1 {
417            return Err(arrow_err!(
418                "list type must have a single child, found {}",
419                fields.len()
420            ));
421        }
422
423        let repeated_field = &fields[0];
424        if get_repetition(repeated_field) != Repetition::REPEATED {
425            return Err(arrow_err!("List child must be repeated"));
426        }
427
428        // If the list is nullable
429        let (def_level, nullable) = match list_type.get_basic_info().repetition() {
430            Repetition::REQUIRED => (context.def_level, false),
431            Repetition::OPTIONAL => (context.def_level + 1, true),
432            Repetition::REPEATED => return Err(arrow_err!("List type cannot be repeated")),
433        };
434
435        let arrow_field = match &context.data_type {
436            Some(DataType::List(f)) => Some(f.as_ref()),
437            Some(DataType::LargeList(f)) => Some(f.as_ref()),
438            Some(DataType::FixedSizeList(f, _)) => Some(f.as_ref()),
439            Some(DataType::ListView(f)) => Some(f.as_ref()),
440            Some(DataType::LargeListView(f)) => Some(f.as_ref()),
441            Some(d) => {
442                return Err(arrow_err!(
443                    "incompatible arrow schema, expected list got {}",
444                    d
445                ));
446            }
447            None => None,
448        };
449
450        if repeated_field.is_primitive() {
451            // If the repeated field is not a group, then its type is the element type and elements are required.
452            //
453            // required/optional group my_list (LIST) {
454            //   repeated int32 element;
455            // }
456            //
457            let context = VisitorContext {
458                rep_level: context.rep_level,
459                def_level,
460                data_type: arrow_field.map(|f| f.data_type().clone()),
461            };
462
463            return match self.visit_primitive(repeated_field, context) {
464                Ok(Some(mut field)) => {
465                    // visit_primitive will infer a non-nullable list, update if necessary
466                    field.nullable = nullable;
467                    Ok(Some(field))
468                }
469                r => r,
470            };
471        }
472
473        // test to see if the repeated field is a struct or one-tuple
474        let items = repeated_field.get_fields();
475        if items.len() != 1
476            || (!repeated_field.is_list()
477                && !repeated_field.has_single_repeated_child()
478                && (repeated_field.name() == "array"
479                    || repeated_field.name() == format!("{}_tuple", list_type.name())))
480        {
481            // If the repeated field is a group with multiple fields, then its type is the element
482            // type and elements are required.
483            //
484            // If the repeated field is a group with one field and is named either array or uses
485            // the LIST-annotated group's name with _tuple appended then the repeated type is the
486            // element type and elements are required. But this rule only applies if the
487            // repeated field is not annotated, and the single child field is not `repeated`.
488            let context = VisitorContext {
489                rep_level: context.rep_level,
490                def_level,
491                data_type: arrow_field.map(|f| f.data_type().clone()),
492            };
493
494            return match self.visit_struct(repeated_field, context) {
495                Ok(Some(mut field)) => {
496                    field.nullable = nullable;
497                    Ok(Some(field))
498                }
499                r => r,
500            };
501        }
502
503        // Regular list handling logic
504        let item_type = &items[0];
505        let rep_level = context.rep_level + 1;
506        let def_level = def_level + 1;
507
508        let new_context = VisitorContext {
509            def_level,
510            rep_level,
511            data_type: arrow_field.map(|f| f.data_type().clone()),
512        };
513
514        match self.dispatch(item_type, new_context) {
515            Ok(Some(mut item)) => {
516                let item_field = Arc::new(convert_field(item_type, &mut item, arrow_field)?);
517
518                // Use arrow type as hint for index size
519                let arrow_type = match context.data_type {
520                    Some(DataType::LargeList(_)) => DataType::LargeList(item_field),
521                    Some(DataType::FixedSizeList(_, len)) => {
522                        DataType::FixedSizeList(item_field, len)
523                    }
524                    Some(DataType::ListView(_)) => DataType::ListView(item_field),
525                    Some(DataType::LargeListView(_)) => DataType::LargeListView(item_field),
526                    _ => DataType::List(item_field),
527                };
528
529                Ok(Some(ParquetField {
530                    rep_level,
531                    def_level,
532                    nullable,
533                    arrow_type,
534                    field_type: ParquetFieldType::Group {
535                        children: vec![item],
536                    },
537                }))
538            }
539            r => r,
540        }
541    }
542
543    fn dispatch(
544        &mut self,
545        cur_type: &TypePtr,
546        context: VisitorContext,
547    ) -> Result<Option<ParquetField>> {
548        if cur_type.is_primitive() {
549            self.visit_primitive(cur_type, context)
550        } else {
551            match cur_type.get_basic_info().converted_type() {
552                ConvertedType::LIST => self.visit_list(cur_type, context),
553                ConvertedType::MAP | ConvertedType::MAP_KEY_VALUE => {
554                    self.visit_map(cur_type, context)
555                }
556                _ => self.visit_struct(cur_type, context),
557            }
558        }
559    }
560}
561
562/// Converts a virtual Arrow [`Field`] to a [`ParquetField`]
563///
564/// Virtual fields don't correspond to any data in the parquet file,
565/// but are computed at read time (e.g., row_number)
566///
567/// The levels are computed based on the parent context:
568/// - If nullable: def_level = parent_def_level + 1
569/// - If required: def_level = parent_def_level
570/// - rep_level = parent_rep_level (virtual fields are not repeated)
571pub(super) fn convert_virtual_field(
572    arrow_field: &Field,
573    parent_rep_level: i16,
574    parent_def_level: i16,
575) -> Result<ParquetField> {
576    let nullable = arrow_field.is_nullable();
577    let def_level = if nullable {
578        parent_def_level + 1
579    } else {
580        parent_def_level
581    };
582
583    // Determine the virtual column type based on the extension type name
584    let extension_name = arrow_field.extension_type_name().ok_or_else(|| {
585        ParquetError::ArrowError(format!(
586            "virtual column field '{}' must have an extension type",
587            arrow_field.name()
588        ))
589    })?;
590
591    let virtual_type = match extension_name {
592        RowNumber::NAME => VirtualColumnType::RowNumber,
593        RowGroupIndex::NAME => VirtualColumnType::RowGroupIndex,
594        _ => {
595            return Err(ParquetError::ArrowError(format!(
596                "unsupported virtual column type '{}' for field '{}'",
597                extension_name,
598                arrow_field.name()
599            )));
600        }
601    };
602
603    Ok(ParquetField {
604        rep_level: parent_rep_level,
605        def_level,
606        nullable,
607        arrow_type: arrow_field.data_type().clone(),
608        field_type: ParquetFieldType::Virtual(virtual_type),
609    })
610}
611
612/// Computes the Arrow [`Field`] for a child column
613///
614/// The resulting Arrow [`Field`] will have the type dictated by the Parquet `field`, a name
615/// dictated by the `parquet_type`, and any metadata from `arrow_hint`
616fn convert_field(
617    parquet_type: &Type,
618    field: &mut ParquetField,
619    arrow_hint: Option<&Field>,
620) -> Result<Field, ParquetError> {
621    let name = parquet_type.name();
622    let data_type = field.arrow_type.clone();
623    let nullable = field.nullable;
624
625    match arrow_hint {
626        Some(hint) => {
627            // If the inferred type is a dictionary, preserve dictionary metadata
628            #[allow(deprecated)]
629            let field = match (&data_type, hint.dict_id(), hint.dict_is_ordered()) {
630                (DataType::Dictionary(_, _), Some(id), Some(ordered)) =>
631                {
632                    #[allow(deprecated)]
633                    Field::new_dict(name, data_type, nullable, id, ordered)
634                }
635                _ => Field::new(name, data_type, nullable),
636            };
637
638            Ok(field.with_metadata(hint.metadata().clone()))
639        }
640        None => {
641            let mut ret = Field::new(name, data_type, nullable);
642            let basic_info = parquet_type.get_basic_info();
643            if basic_info.has_id() {
644                let mut meta = HashMap::with_capacity(1);
645                meta.insert(
646                    PARQUET_FIELD_ID_META_KEY.to_string(),
647                    basic_info.id().to_string(),
648                );
649                ret.set_metadata(meta);
650            }
651            try_add_extension_type(ret, parquet_type)
652        }
653    }
654}
655
656/// Computes the [`ParquetField`] for the provided [`SchemaDescriptor`] with `leaf_columns` listing
657/// the indexes of leaf columns to project, and `embedded_arrow_schema` the optional
658/// [`Fields`] embedded in the parquet metadata
659///
660/// Note: This does not support out of order column projection
661pub fn convert_schema(
662    schema: &SchemaDescriptor,
663    mask: ProjectionMask,
664    embedded_arrow_schema: Option<&Fields>,
665) -> Result<Option<ParquetField>> {
666    let mut visitor = Visitor {
667        next_col_idx: 0,
668        mask,
669    };
670
671    let context = VisitorContext {
672        rep_level: 0,
673        def_level: 0,
674        data_type: embedded_arrow_schema.map(|fields| DataType::Struct(fields.clone())),
675    };
676
677    visitor.dispatch(&schema.root_schema_ptr(), context)
678}
679
680/// Computes the [`ParquetField`] for the provided `parquet_type`
681pub fn convert_type(parquet_type: &TypePtr) -> Result<ParquetField> {
682    let mut visitor = Visitor {
683        next_col_idx: 0,
684        mask: ProjectionMask::all(),
685    };
686
687    let context = VisitorContext {
688        rep_level: 0,
689        def_level: 0,
690        data_type: None,
691    };
692
693    Ok(visitor.dispatch(parquet_type, context)?.unwrap())
694}