// parquet/arrow/schema/complex.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
17
18use std::collections::HashMap;
19use std::sync::Arc;
20
21use crate::arrow::schema::extension::try_add_extension_type;
22use crate::arrow::schema::primitive::convert_primitive;
23use crate::arrow::schema::virtual_type::{RowGroupIndex, RowNumber};
24use crate::arrow::{PARQUET_FIELD_ID_META_KEY, ProjectionMask};
25use crate::basic::{ConvertedType, Repetition};
26use crate::errors::ParquetError;
27use crate::errors::Result;
28use crate::schema::types::{SchemaDescriptor, Type, TypePtr};
29use arrow_schema::{DataType, Field, Fields, SchemaBuilder, extension::ExtensionType};
30
31fn get_repetition(t: &Type) -> Repetition {
32    let info = t.get_basic_info();
33    match info.has_repetition() {
34        true => info.repetition(),
35        false => Repetition::REQUIRED,
36    }
37}
38
/// Representation of a parquet schema element, in terms of arrow schema elements
///
/// A tree of `ParquetField`s mirrors the (projected) parquet schema: groups hold their
/// converted children, leaves record the parquet column they read from.
#[derive(Debug, Clone)]
pub struct ParquetField {
    /// The level which represents an insertion into the current list
    /// i.e. guaranteed to be > 0 for an element of list type
    pub rep_level: i16,
    /// The level at which this field is fully defined,
    /// i.e. guaranteed to be > 0 for a nullable type or child of a
    /// nullable type
    pub def_level: i16,
    /// Whether this field is nullable
    pub nullable: bool,
    /// The arrow type of the column data
    ///
    /// Note: In certain cases the data stored in parquet may have been coerced
    /// to a different type and will require conversion on read (e.g. Date64 and Interval)
    pub arrow_type: DataType,
    /// The type of this field: a leaf column, a group of children, or a virtual column
    pub field_type: ParquetFieldType,
}
59
60impl ParquetField {
61    /// Converts `self` into an arrow list, with its current type as the field type
62    ///
63    /// This is used to convert repeated columns, into their arrow representation
64    fn into_list(self, name: &str) -> Self {
65        ParquetField {
66            rep_level: self.rep_level,
67            def_level: self.def_level,
68            nullable: false,
69            arrow_type: DataType::List(Arc::new(Field::new(name, self.arrow_type.clone(), false))),
70            field_type: ParquetFieldType::Group {
71                children: vec![self],
72            },
73        }
74    }
75
76    /// Returns a list of [`ParquetField`] children if this is a group type
77    pub fn children(&self) -> Option<&[Self]> {
78        match &self.field_type {
79            ParquetFieldType::Primitive { .. } => None,
80            ParquetFieldType::Group { children } => Some(children),
81            ParquetFieldType::Virtual(_) => None,
82        }
83    }
84}
85
/// Types of virtual columns that can be computed at read time
///
/// The variant for a given arrow field is chosen from its extension type name
/// in `convert_virtual_field`.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum VirtualColumnType {
    /// Row number within the file
    RowNumber,
    /// Row group index
    RowGroupIndex,
}
94
/// The kind of a [`ParquetField`]: a leaf column stored in parquet, a group of
/// child fields, or a virtual column computed at read time
#[derive(Debug, Clone)]
pub enum ParquetFieldType {
    /// A leaf column stored in the parquet file
    Primitive {
        /// The index of the column in parquet
        col_idx: usize,
        /// The type of the column in parquet
        primitive_type: TypePtr,
    },
    /// A nested arrow type (struct, list or map) built from child fields
    Group {
        /// The converted children, in parquet schema order; children excluded by the
        /// projection are omitted
        children: Vec<ParquetField>,
    },
    /// Virtual column that doesn't exist in the parquet file
    /// but is computed at read time (e.g., row_number)
    Virtual(VirtualColumnType),
}
110
/// Encodes the context of the parent of the field currently under consideration
struct VisitorContext {
    /// Repetition level of the parent; children add to this if they are repeated
    rep_level: i16,
    /// Definition level of the parent; children add to this if they are not required
    def_level: i16,
    /// An optional [`DataType`] sourced from the embedded arrow schema
    data_type: Option<DataType>,
}
118
119impl VisitorContext {
120    /// Compute the resulting definition level, repetition level and nullability
121    /// for a child field with the given [`Repetition`]
122    fn levels(&self, repetition: Repetition) -> (i16, i16, bool) {
123        match repetition {
124            Repetition::OPTIONAL => (self.def_level + 1, self.rep_level, true),
125            Repetition::REQUIRED => (self.def_level, self.rep_level, false),
126            Repetition::REPEATED => (self.def_level + 1, self.rep_level + 1, false),
127        }
128    }
129}
130
/// Walks the parquet schema in a depth-first fashion in order to map it to arrow data structures
///
/// See [Logical Types] for more information on the conversion algorithm
///
/// [Logical Types]: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md
struct Visitor {
    /// The column index of the next leaf column
    ///
    /// Incremented for every leaf visited, projected or not, so that it tracks the
    /// leaf's position in the parquet schema
    next_col_idx: usize,

    /// Mask of columns to include
    mask: ProjectionMask,
}
143
144impl Visitor {
145    fn visit_primitive(
146        &mut self,
147        primitive_type: &TypePtr,
148        context: VisitorContext,
149    ) -> Result<Option<ParquetField>> {
150        let col_idx = self.next_col_idx;
151        self.next_col_idx += 1;
152
153        if !self.mask.leaf_included(col_idx) {
154            return Ok(None);
155        }
156
157        let repetition = get_repetition(primitive_type);
158        let (def_level, rep_level, nullable) = context.levels(repetition);
159
160        let arrow_type = convert_primitive(primitive_type, context.data_type)?;
161
162        let primitive_field = ParquetField {
163            rep_level,
164            def_level,
165            nullable,
166            arrow_type,
167            field_type: ParquetFieldType::Primitive {
168                primitive_type: primitive_type.clone(),
169                col_idx,
170            },
171        };
172
173        Ok(Some(match repetition {
174            Repetition::REPEATED => primitive_field.into_list(primitive_type.name()),
175            _ => primitive_field,
176        }))
177    }
178
179    fn visit_struct(
180        &mut self,
181        struct_type: &TypePtr,
182        context: VisitorContext,
183    ) -> Result<Option<ParquetField>> {
184        // The root type will not have a repetition level
185        let repetition = get_repetition(struct_type);
186        let (def_level, rep_level, nullable) = context.levels(repetition);
187
188        let parquet_fields = struct_type.get_fields();
189
190        // Extract any arrow fields from the hints
191        let arrow_fields = match &context.data_type {
192            Some(DataType::Struct(fields)) => {
193                if fields.len() != parquet_fields.len() {
194                    return Err(arrow_err!(
195                        "incompatible arrow schema, expected {} struct fields got {}",
196                        parquet_fields.len(),
197                        fields.len()
198                    ));
199                }
200                Some(fields)
201            }
202            Some(d) => {
203                return Err(arrow_err!(
204                    "incompatible arrow schema, expected struct got {}",
205                    d
206                ));
207            }
208            None => None,
209        };
210
211        let mut child_fields = SchemaBuilder::with_capacity(parquet_fields.len());
212        let mut children = Vec::with_capacity(parquet_fields.len());
213
214        // Perform a DFS of children
215        for (idx, parquet_field) in parquet_fields.iter().enumerate() {
216            let data_type = match arrow_fields {
217                Some(fields) => {
218                    let field = &fields[idx];
219                    if field.name() != parquet_field.name() {
220                        return Err(arrow_err!(
221                            "incompatible arrow schema, expected field named {} got {}",
222                            parquet_field.name(),
223                            field.name()
224                        ));
225                    }
226                    Some(field.data_type().clone())
227                }
228                None => None,
229            };
230
231            let arrow_field = arrow_fields.map(|x| &*x[idx]);
232            let child_ctx = VisitorContext {
233                rep_level,
234                def_level,
235                data_type,
236            };
237
238            if let Some(mut child) = self.dispatch(parquet_field, child_ctx)? {
239                // The child type returned may be different from what is encoded in the arrow
240                // schema in the event of a mismatch or a projection
241                child_fields.push(convert_field(parquet_field, &mut child, arrow_field)?);
242                children.push(child);
243            }
244        }
245
246        if children.is_empty() {
247            return Ok(None);
248        }
249
250        let struct_field = ParquetField {
251            rep_level,
252            def_level,
253            nullable,
254            arrow_type: DataType::Struct(child_fields.finish().fields),
255            field_type: ParquetFieldType::Group { children },
256        };
257
258        Ok(Some(match repetition {
259            Repetition::REPEATED => struct_field.into_list(struct_type.name()),
260            _ => struct_field,
261        }))
262    }
263
264    fn visit_map(
265        &mut self,
266        map_type: &TypePtr,
267        context: VisitorContext,
268    ) -> Result<Option<ParquetField>> {
269        let rep_level = context.rep_level + 1;
270        let (def_level, nullable) = match get_repetition(map_type) {
271            Repetition::REQUIRED => (context.def_level + 1, false),
272            Repetition::OPTIONAL => (context.def_level + 2, true),
273            Repetition::REPEATED => return Err(arrow_err!("Map cannot be repeated")),
274        };
275
276        if map_type.get_fields().len() != 1 {
277            return Err(arrow_err!(
278                "Map field must have exactly one key_value child, found {}",
279                map_type.get_fields().len()
280            ));
281        }
282
283        // Add map entry (key_value) to context
284        let map_key_value = &map_type.get_fields()[0];
285        if map_key_value.get_basic_info().repetition() != Repetition::REPEATED {
286            return Err(arrow_err!("Child of map field must be repeated"));
287        }
288
289        // According to the specification the values are optional (#1642).
290        // In this case, return the keys as a list.
291        if map_key_value.get_fields().len() == 1 {
292            return self.visit_list(map_type, context);
293        }
294
295        if map_key_value.get_fields().len() != 2 {
296            return Err(arrow_err!(
297                "Child of map field must have two children, found {}",
298                map_key_value.get_fields().len()
299            ));
300        }
301
302        // Get key and value, and create context for each
303        let map_key = &map_key_value.get_fields()[0];
304        let map_value = &map_key_value.get_fields()[1];
305
306        match map_key.get_basic_info().repetition() {
307            Repetition::REPEATED => {
308                return Err(arrow_err!("Map keys cannot be repeated"));
309            }
310            Repetition::REQUIRED | Repetition::OPTIONAL => {
311                // Relaxed check for having repetition REQUIRED as there exists
312                // parquet writers and files that do not conform to this standard.
313                // This allows us to consume a broader range of existing files even
314                // if they are out of spec.
315            }
316        }
317
318        if map_value.get_basic_info().repetition() == Repetition::REPEATED {
319            return Err(arrow_err!("Map values cannot be repeated"));
320        }
321
322        // Extract the arrow fields
323        let (arrow_map, arrow_key, arrow_value, sorted) = match &context.data_type {
324            Some(DataType::Map(field, sorted)) => match field.data_type() {
325                DataType::Struct(fields) => {
326                    if fields.len() != 2 {
327                        return Err(arrow_err!(
328                            "Map data type should contain struct with two children, got {}",
329                            fields.len()
330                        ));
331                    }
332
333                    (Some(field), Some(&*fields[0]), Some(&*fields[1]), *sorted)
334                }
335                d => {
336                    return Err(arrow_err!("Map data type should contain struct got {}", d));
337                }
338            },
339            Some(d) => {
340                return Err(arrow_err!(
341                    "incompatible arrow schema, expected map got {}",
342                    d
343                ));
344            }
345            None => (None, None, None, false),
346        };
347
348        let maybe_key = {
349            let context = VisitorContext {
350                rep_level,
351                def_level,
352                data_type: arrow_key.map(|x| x.data_type().clone()),
353            };
354
355            self.dispatch(map_key, context)?
356        };
357
358        let maybe_value = {
359            let context = VisitorContext {
360                rep_level,
361                def_level,
362                data_type: arrow_value.map(|x| x.data_type().clone()),
363            };
364
365            self.dispatch(map_value, context)?
366        };
367
368        // Need both columns to be projected
369        match (maybe_key, maybe_value) {
370            (Some(mut key), Some(mut value)) => {
371                let key_field = Arc::new(
372                    convert_field(map_key, &mut key, arrow_key)?
373                        // The key is always non-nullable (#5630)
374                        .with_nullable(false),
375                );
376                let value_field = Arc::new(convert_field(map_value, &mut value, arrow_value)?);
377                let field_metadata = match arrow_map {
378                    Some(field) => field.metadata().clone(),
379                    _ => HashMap::default(),
380                };
381
382                let map_field = Field::new_struct(
383                    map_key_value.name(),
384                    [key_field, value_field],
385                    false, // The inner map field is always non-nullable (#1697)
386                )
387                .with_metadata(field_metadata);
388
389                Ok(Some(ParquetField {
390                    rep_level,
391                    def_level,
392                    nullable,
393                    arrow_type: DataType::Map(Arc::new(map_field), sorted),
394                    field_type: ParquetFieldType::Group {
395                        children: vec![key, value],
396                    },
397                }))
398            }
399            _ => Ok(None),
400        }
401    }
402
403    fn visit_list(
404        &mut self,
405        list_type: &TypePtr,
406        context: VisitorContext,
407    ) -> Result<Option<ParquetField>> {
408        if list_type.is_primitive() {
409            return Err(arrow_err!(
410                "{:?} is a list type and can't be processed as primitive.",
411                list_type
412            ));
413        }
414
415        let fields = list_type.get_fields();
416        if fields.len() != 1 {
417            return Err(arrow_err!(
418                "list type must have a single child, found {}",
419                fields.len()
420            ));
421        }
422
423        let repeated_field = &fields[0];
424        if get_repetition(repeated_field) != Repetition::REPEATED {
425            return Err(arrow_err!("List child must be repeated"));
426        }
427
428        // If the list is nullable
429        let (def_level, nullable) = match list_type.get_basic_info().repetition() {
430            Repetition::REQUIRED => (context.def_level, false),
431            Repetition::OPTIONAL => (context.def_level + 1, true),
432            Repetition::REPEATED => return Err(arrow_err!("List type cannot be repeated")),
433        };
434
435        let arrow_field = match &context.data_type {
436            Some(DataType::List(f)) => Some(f.as_ref()),
437            Some(DataType::LargeList(f)) => Some(f.as_ref()),
438            Some(DataType::FixedSizeList(f, _)) => Some(f.as_ref()),
439            Some(d) => {
440                return Err(arrow_err!(
441                    "incompatible arrow schema, expected list got {}",
442                    d
443                ));
444            }
445            None => None,
446        };
447
448        if repeated_field.is_primitive() {
449            // If the repeated field is not a group, then its type is the element type and elements are required.
450            //
451            // required/optional group my_list (LIST) {
452            //   repeated int32 element;
453            // }
454            //
455            let context = VisitorContext {
456                rep_level: context.rep_level,
457                def_level,
458                data_type: arrow_field.map(|f| f.data_type().clone()),
459            };
460
461            return match self.visit_primitive(repeated_field, context) {
462                Ok(Some(mut field)) => {
463                    // visit_primitive will infer a non-nullable list, update if necessary
464                    field.nullable = nullable;
465                    Ok(Some(field))
466                }
467                r => r,
468            };
469        }
470
471        // test to see if the repeated field is a struct or one-tuple
472        let items = repeated_field.get_fields();
473        if items.len() != 1
474            || (!repeated_field.is_list()
475                && !repeated_field.has_single_repeated_child()
476                && (repeated_field.name() == "array"
477                    || repeated_field.name() == format!("{}_tuple", list_type.name())))
478        {
479            // If the repeated field is a group with multiple fields, then its type is the element
480            // type and elements are required.
481            //
482            // If the repeated field is a group with one field and is named either array or uses
483            // the LIST-annotated group's name with _tuple appended then the repeated type is the
484            // element type and elements are required. But this rule only applies if the
485            // repeated field is not annotated, and the single child field is not `repeated`.
486            let context = VisitorContext {
487                rep_level: context.rep_level,
488                def_level,
489                data_type: arrow_field.map(|f| f.data_type().clone()),
490            };
491
492            return match self.visit_struct(repeated_field, context) {
493                Ok(Some(mut field)) => {
494                    field.nullable = nullable;
495                    Ok(Some(field))
496                }
497                r => r,
498            };
499        }
500
501        // Regular list handling logic
502        let item_type = &items[0];
503        let rep_level = context.rep_level + 1;
504        let def_level = def_level + 1;
505
506        let new_context = VisitorContext {
507            def_level,
508            rep_level,
509            data_type: arrow_field.map(|f| f.data_type().clone()),
510        };
511
512        match self.dispatch(item_type, new_context) {
513            Ok(Some(mut item)) => {
514                let item_field = Arc::new(convert_field(item_type, &mut item, arrow_field)?);
515
516                // Use arrow type as hint for index size
517                let arrow_type = match context.data_type {
518                    Some(DataType::LargeList(_)) => DataType::LargeList(item_field),
519                    Some(DataType::FixedSizeList(_, len)) => {
520                        DataType::FixedSizeList(item_field, len)
521                    }
522                    _ => DataType::List(item_field),
523                };
524
525                Ok(Some(ParquetField {
526                    rep_level,
527                    def_level,
528                    nullable,
529                    arrow_type,
530                    field_type: ParquetFieldType::Group {
531                        children: vec![item],
532                    },
533                }))
534            }
535            r => r,
536        }
537    }
538
539    fn dispatch(
540        &mut self,
541        cur_type: &TypePtr,
542        context: VisitorContext,
543    ) -> Result<Option<ParquetField>> {
544        if cur_type.is_primitive() {
545            self.visit_primitive(cur_type, context)
546        } else {
547            match cur_type.get_basic_info().converted_type() {
548                ConvertedType::LIST => self.visit_list(cur_type, context),
549                ConvertedType::MAP | ConvertedType::MAP_KEY_VALUE => {
550                    self.visit_map(cur_type, context)
551                }
552                _ => self.visit_struct(cur_type, context),
553            }
554        }
555    }
556}
557
558/// Converts a virtual Arrow [`Field`] to a [`ParquetField`]
559///
560/// Virtual fields don't correspond to any data in the parquet file,
561/// but are computed at read time (e.g., row_number)
562///
563/// The levels are computed based on the parent context:
564/// - If nullable: def_level = parent_def_level + 1
565/// - If required: def_level = parent_def_level
566/// - rep_level = parent_rep_level (virtual fields are not repeated)
567pub(super) fn convert_virtual_field(
568    arrow_field: &Field,
569    parent_rep_level: i16,
570    parent_def_level: i16,
571) -> Result<ParquetField> {
572    let nullable = arrow_field.is_nullable();
573    let def_level = if nullable {
574        parent_def_level + 1
575    } else {
576        parent_def_level
577    };
578
579    // Determine the virtual column type based on the extension type name
580    let extension_name = arrow_field.extension_type_name().ok_or_else(|| {
581        ParquetError::ArrowError(format!(
582            "virtual column field '{}' must have an extension type",
583            arrow_field.name()
584        ))
585    })?;
586
587    let virtual_type = match extension_name {
588        RowNumber::NAME => VirtualColumnType::RowNumber,
589        RowGroupIndex::NAME => VirtualColumnType::RowGroupIndex,
590        _ => {
591            return Err(ParquetError::ArrowError(format!(
592                "unsupported virtual column type '{}' for field '{}'",
593                extension_name,
594                arrow_field.name()
595            )));
596        }
597    };
598
599    Ok(ParquetField {
600        rep_level: parent_rep_level,
601        def_level,
602        nullable,
603        arrow_type: arrow_field.data_type().clone(),
604        field_type: ParquetFieldType::Virtual(virtual_type),
605    })
606}
607
608/// Computes the Arrow [`Field`] for a child column
609///
610/// The resulting Arrow [`Field`] will have the type dictated by the Parquet `field`, a name
611/// dictated by the `parquet_type`, and any metadata from `arrow_hint`
612fn convert_field(
613    parquet_type: &Type,
614    field: &mut ParquetField,
615    arrow_hint: Option<&Field>,
616) -> Result<Field, ParquetError> {
617    let name = parquet_type.name();
618    let data_type = field.arrow_type.clone();
619    let nullable = field.nullable;
620
621    match arrow_hint {
622        Some(hint) => {
623            // If the inferred type is a dictionary, preserve dictionary metadata
624            #[allow(deprecated)]
625            let field = match (&data_type, hint.dict_id(), hint.dict_is_ordered()) {
626                (DataType::Dictionary(_, _), Some(id), Some(ordered)) =>
627                {
628                    #[allow(deprecated)]
629                    Field::new_dict(name, data_type, nullable, id, ordered)
630                }
631                _ => Field::new(name, data_type, nullable),
632            };
633
634            Ok(field.with_metadata(hint.metadata().clone()))
635        }
636        None => {
637            let mut ret = Field::new(name, data_type, nullable);
638            let basic_info = parquet_type.get_basic_info();
639            if basic_info.has_id() {
640                let mut meta = HashMap::with_capacity(1);
641                meta.insert(
642                    PARQUET_FIELD_ID_META_KEY.to_string(),
643                    basic_info.id().to_string(),
644                );
645                ret.set_metadata(meta);
646            }
647            try_add_extension_type(ret, parquet_type)
648        }
649    }
650}
651
652/// Computes the [`ParquetField`] for the provided [`SchemaDescriptor`] with `leaf_columns` listing
653/// the indexes of leaf columns to project, and `embedded_arrow_schema` the optional
654/// [`Fields`] embedded in the parquet metadata
655///
656/// Note: This does not support out of order column projection
657pub fn convert_schema(
658    schema: &SchemaDescriptor,
659    mask: ProjectionMask,
660    embedded_arrow_schema: Option<&Fields>,
661) -> Result<Option<ParquetField>> {
662    let mut visitor = Visitor {
663        next_col_idx: 0,
664        mask,
665    };
666
667    let context = VisitorContext {
668        rep_level: 0,
669        def_level: 0,
670        data_type: embedded_arrow_schema.map(|fields| DataType::Struct(fields.clone())),
671    };
672
673    visitor.dispatch(&schema.root_schema_ptr(), context)
674}
675
676/// Computes the [`ParquetField`] for the provided `parquet_type`
677pub fn convert_type(parquet_type: &TypePtr) -> Result<ParquetField> {
678    let mut visitor = Visitor {
679        next_col_idx: 0,
680        mask: ProjectionMask::all(),
681    };
682
683    let context = VisitorContext {
684        rep_level: 0,
685        def_level: 0,
686        data_type: None,
687    };
688
689    Ok(visitor.dispatch(parquet_type, context)?.unwrap())
690}