parquet/arrow/schema/mod.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Converting Parquet schema <--> Arrow schema: [`ArrowSchemaConverter`] and [`parquet_to_arrow_schema`]

use base64::prelude::BASE64_STANDARD;
use base64::Engine;
use std::collections::HashMap;
use std::sync::Arc;

use arrow_ipc::writer;
#[cfg(feature = "arrow_canonical_extension_types")]
use arrow_schema::extension::{Json, Uuid};
use arrow_schema::{DataType, Field, Fields, Schema, TimeUnit};

use crate::basic::{
    ConvertedType, LogicalType, Repetition, TimeUnit as ParquetTimeUnit, Type as PhysicalType,
};
use crate::errors::{ParquetError, Result};
use crate::file::{metadata::KeyValue, properties::WriterProperties};
use crate::schema::types::{ColumnDescriptor, SchemaDescriptor, Type};

mod complex;
mod primitive;

use crate::arrow::ProjectionMask;
pub(crate) use complex::{ParquetField, ParquetFieldType};

use super::PARQUET_FIELD_ID_META_KEY;

/// Convert Parquet schema to Arrow schema including optional metadata
///
/// Attempts to decode any existing Arrow schema metadata, falling back
/// to converting the Parquet schema column-wise
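///
/// # Example
///
/// A minimal sketch converting a hand-parsed Parquet schema (the message
/// type and field names below are purely illustrative):
/// ```
/// # use std::sync::Arc;
/// # use parquet::arrow::parquet_to_arrow_schema;
/// # use parquet::schema::{parser::parse_message_type, types::SchemaDescriptor};
/// let message_type = "message schema { REQUIRED INT32 id; OPTIONAL BINARY name (UTF8); }";
/// let parquet_schema =
///     SchemaDescriptor::new(Arc::new(parse_message_type(message_type).unwrap()));
/// // No key-value metadata is supplied, so the schema is converted column-wise
/// let arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
/// assert_eq!(arrow_schema.fields().len(), 2);
/// ```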
pub fn parquet_to_arrow_schema(
    parquet_schema: &SchemaDescriptor,
    key_value_metadata: Option<&Vec<KeyValue>>,
) -> Result<Schema> {
    parquet_to_arrow_schema_by_columns(parquet_schema, ProjectionMask::all(), key_value_metadata)
}

/// Convert Parquet schema to Arrow schema including optional metadata,
/// only preserving some leaf columns.
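///
/// A minimal sketch selecting a single leaf column by name (the schema and
/// column names below are purely illustrative):
/// ```
/// # use std::sync::Arc;
/// # use parquet::arrow::{parquet_to_arrow_schema_by_columns, ProjectionMask};
/// # use parquet::schema::{parser::parse_message_type, types::SchemaDescriptor};
/// let parquet_schema = SchemaDescriptor::new(Arc::new(
///     parse_message_type("message s { REQUIRED INT32 a; REQUIRED INT64 b; }").unwrap(),
/// ));
/// let mask = ProjectionMask::columns(&parquet_schema, ["b"]);
/// let schema = parquet_to_arrow_schema_by_columns(&parquet_schema, mask, None).unwrap();
/// assert_eq!(schema.fields().len(), 1);
/// ```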
pub fn parquet_to_arrow_schema_by_columns(
    parquet_schema: &SchemaDescriptor,
    mask: ProjectionMask,
    key_value_metadata: Option<&Vec<KeyValue>>,
) -> Result<Schema> {
    Ok(parquet_to_arrow_schema_and_fields(parquet_schema, mask, key_value_metadata)?.0)
}

/// Extract the Arrow schema and [`ParquetField`] level information from the Parquet metadata
pub(crate) fn parquet_to_arrow_schema_and_fields(
    parquet_schema: &SchemaDescriptor,
    mask: ProjectionMask,
    key_value_metadata: Option<&Vec<KeyValue>>,
) -> Result<(Schema, Option<ParquetField>)> {
    let mut metadata = parse_key_value_metadata(key_value_metadata).unwrap_or_default();
    let maybe_schema = metadata
        .remove(super::ARROW_SCHEMA_META_KEY)
        .map(|value| get_arrow_schema_from_metadata(&value))
        .transpose()?;

    // Add the Arrow metadata to the Parquet metadata, skipping keys that collide
    if let Some(arrow_schema) = &maybe_schema {
        arrow_schema.metadata().iter().for_each(|(k, v)| {
            metadata.entry(k.clone()).or_insert_with(|| v.clone());
        });
    }

    let hint = maybe_schema.as_ref().map(|s| s.fields());
    let field_levels = parquet_to_arrow_field_levels(parquet_schema, mask, hint)?;
    let schema = Schema::new_with_metadata(field_levels.fields, metadata);
    Ok((schema, field_levels.levels))
}

/// Schema information necessary to decode a parquet file as arrow [`Fields`]
///
/// In particular this stores the dremel-level information necessary to correctly
/// interpret the encoded definition and repetition levels
///
/// Note: this is an opaque container intended to be used with lower-level APIs
/// within this crate
#[derive(Debug, Clone)]
pub struct FieldLevels {
    pub(crate) fields: Fields,
    pub(crate) levels: Option<ParquetField>,
}

/// Convert a parquet [`SchemaDescriptor`] to [`FieldLevels`]
///
/// Columns not included within [`ProjectionMask`] will be ignored.
///
/// The optional `hint` parameter is the desired Arrow schema. See the
/// [`arrow`] module documentation for more information.
///
/// [`arrow`]: crate::arrow
///
/// # Notes:
/// Where a field type in `hint` is compatible with the corresponding parquet type in `schema`, it
/// will be used, otherwise the default arrow type for the given parquet column type will be used.
///
/// This is to accommodate arrow types that cannot be round-tripped through parquet natively.
/// Depending on the parquet writer, this can lead to a mismatch between a file's parquet schema
/// and its embedded arrow schema. The parquet `schema` must be treated as authoritative in such
/// an event. See [#1663](https://github.com/apache/arrow-rs/issues/1663) for more information
///
/// Note: this is a low-level API, most users will want to make use of the higher-level
/// [`parquet_to_arrow_schema`] for decoding metadata from a parquet file.
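///
/// # Example
///
/// A minimal sketch projecting the first leaf column (schema contents are
/// purely illustrative):
/// ```
/// # use std::sync::Arc;
/// # use parquet::arrow::{parquet_to_arrow_field_levels, ProjectionMask};
/// # use parquet::schema::{parser::parse_message_type, types::SchemaDescriptor};
/// let parquet_schema = SchemaDescriptor::new(Arc::new(
///     parse_message_type("message s { REQUIRED INT32 a; REQUIRED INT64 b; }").unwrap(),
/// ));
/// let mask = ProjectionMask::leaves(&parquet_schema, [0]);
/// // The resulting FieldLevels can be passed to this crate's lower-level reader APIs
/// let _levels = parquet_to_arrow_field_levels(&parquet_schema, mask, None).unwrap();
/// ```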
pub fn parquet_to_arrow_field_levels(
    schema: &SchemaDescriptor,
    mask: ProjectionMask,
    hint: Option<&Fields>,
) -> Result<FieldLevels> {
    match complex::convert_schema(schema, mask, hint)? {
        Some(field) => match &field.arrow_type {
            DataType::Struct(fields) => Ok(FieldLevels {
                fields: fields.clone(),
                levels: Some(field),
            }),
            _ => unreachable!(),
        },
        None => Ok(FieldLevels {
            fields: Fields::empty(),
            levels: None,
        }),
    }
}

/// Try to convert Arrow schema metadata into a schema
fn get_arrow_schema_from_metadata(encoded_meta: &str) -> Result<Schema> {
    let decoded = BASE64_STANDARD.decode(encoded_meta);
    match decoded {
        Ok(bytes) => {
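            // If present, skip the legacy 8-byte prefix (a 0xFFFFFFFF continuation
            // marker followed by a little-endian message length); see
            // `encode_arrow_schema` below for the writing side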
            let slice = if bytes.len() > 8 && bytes[0..4] == [255u8; 4] {
                &bytes[8..]
            } else {
                bytes.as_slice()
            };
            match arrow_ipc::root_as_message(slice) {
                Ok(message) => message
                    .header_as_schema()
                    .map(arrow_ipc::convert::fb_to_schema)
                    .ok_or_else(|| arrow_err!("the message is not Arrow Schema")),
                Err(err) => {
                    // The flatbuffers implementation returns an error on verification error.
                    Err(arrow_err!(
                        "Unable to get root as message stored in {}: {:?}",
                        super::ARROW_SCHEMA_META_KEY,
                        err
                    ))
                }
            }
        }
        Err(err) => {
            // The C++ implementation returns an error if the schema can't be parsed.
            Err(arrow_err!(
                "Unable to decode the encoded schema stored in {}, {:?}",
                super::ARROW_SCHEMA_META_KEY,
                err
            ))
        }
    }
}

/// Encodes the Arrow schema into the IPC format, and base64 encodes it
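///
/// The result is suitable for storing under [`ARROW_SCHEMA_META_KEY`] in the
/// Parquet key-value metadata.
///
/// A brief sketch of its use (fenced `ignore` since the public re-export path
/// of this function may differ between crate versions):
/// ```ignore
/// let schema = Schema::new(vec![Field::new("a", DataType::Int64, false)]);
/// // base64-encoded IPC bytes, prefixed with the legacy length header
/// let encoded: String = encode_arrow_schema(&schema);
/// assert!(!encoded.is_empty());
/// ```
///
/// [`ARROW_SCHEMA_META_KEY`]: crate::arrow::ARROW_SCHEMA_META_KEY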
pub fn encode_arrow_schema(schema: &Schema) -> String {
    let options = writer::IpcWriteOptions::default();
    #[allow(deprecated)]
    let mut dictionary_tracker =
        writer::DictionaryTracker::new_with_preserve_dict_id(true, options.preserve_dict_id());
    let data_gen = writer::IpcDataGenerator::default();
    let mut serialized_schema =
        data_gen.schema_to_bytes_with_dictionary_tracker(schema, &mut dictionary_tracker, &options);

    // Manually prepend the length to the schema, as Arrow uses the legacy IPC format
    // TODO: change after addressing ARROW-9777
    let schema_len = serialized_schema.ipc_message.len();
    let mut len_prefix_schema = Vec::with_capacity(schema_len + 8);
    len_prefix_schema.append(&mut vec![255u8, 255, 255, 255]);
    len_prefix_schema.append((schema_len as u32).to_le_bytes().to_vec().as_mut());
    len_prefix_schema.append(&mut serialized_schema.ipc_message);

    BASE64_STANDARD.encode(&len_prefix_schema)
}

/// Mutates writer metadata by storing the encoded Arrow schema hint in
/// [`ARROW_SCHEMA_META_KEY`].
///
/// If Arrow schema metadata already exists, it is replaced.
///
/// [`ARROW_SCHEMA_META_KEY`]: crate::arrow::ARROW_SCHEMA_META_KEY
pub fn add_encoded_arrow_schema_to_metadata(schema: &Schema, props: &mut WriterProperties) {
    let encoded = encode_arrow_schema(schema);

    let schema_kv = KeyValue {
        key: super::ARROW_SCHEMA_META_KEY.to_string(),
        value: Some(encoded),
    };

    let meta = props
        .key_value_metadata
        .get_or_insert_with(Default::default);

    // check if ARROW:schema exists, and overwrite it
    let schema_meta = meta
        .iter()
        .enumerate()
        .find(|(_, kv)| kv.key.as_str() == super::ARROW_SCHEMA_META_KEY);
    match schema_meta {
        Some((i, _)) => {
            meta.remove(i);
            meta.push(schema_kv);
        }
        None => {
            meta.push(schema_kv);
        }
    }
}

/// Converter for Arrow schema to Parquet schema
///
/// See the documentation on the [`arrow`] module for background
/// information on how Arrow schema is represented in Parquet.
///
/// [`arrow`]: crate::arrow
///
/// # Example:
/// ```
/// # use std::sync::Arc;
/// # use arrow_schema::{Field, Schema, DataType};
/// # use parquet::arrow::ArrowSchemaConverter;
/// use parquet::schema::types::{SchemaDescriptor, Type};
/// use parquet::basic; // note there are two `Type`s in the following example
/// // create an Arrow Schema
/// let arrow_schema = Schema::new(vec![
///   Field::new("a", DataType::Int64, true),
///   Field::new("b", DataType::Date32, true),
/// ]);
/// // convert the Arrow schema to a Parquet schema
/// let parquet_schema = ArrowSchemaConverter::new()
///   .convert(&arrow_schema)
///   .unwrap();
///
/// let expected_parquet_schema = SchemaDescriptor::new(
///   Arc::new(
///     Type::group_type_builder("arrow_schema")
///       .with_fields(vec![
///         Arc::new(
///          Type::primitive_type_builder("a", basic::Type::INT64)
///           .build().unwrap()
///         ),
///         Arc::new(
///          Type::primitive_type_builder("b", basic::Type::INT32)
///           .with_converted_type(basic::ConvertedType::DATE)
///           .with_logical_type(Some(basic::LogicalType::Date))
///           .build().unwrap()
///         ),
///      ])
///      .build().unwrap()
///   )
/// );
/// assert_eq!(parquet_schema, expected_parquet_schema);
/// ```
#[derive(Debug)]
pub struct ArrowSchemaConverter<'a> {
    /// Name of the root schema in Parquet
    schema_root: &'a str,
    /// Should we coerce Arrow types to compatible Parquet types?
    ///
    /// See docs on [`Self::with_coerce_types`]
    coerce_types: bool,
}

impl Default for ArrowSchemaConverter<'_> {
    fn default() -> Self {
        Self::new()
    }
}

impl<'a> ArrowSchemaConverter<'a> {
    /// Create a new converter
    pub fn new() -> Self {
        Self {
            schema_root: "arrow_schema",
            coerce_types: false,
        }
    }

    /// Should Arrow types be coerced into Parquet native types (default `false`).
    ///
    /// Setting this option to `true` will result in Parquet files that can be
    /// read by more readers, but may lose precision for Arrow types such as
    /// [`DataType::Date64`] which have no direct [corresponding Parquet type].
    ///
    /// By default, this converter does not coerce to native Parquet types. Enabling type
    /// coercion allows for meaningful representations that do not require
    /// downstream readers to consider the embedded Arrow schema, and can allow
    /// for greater compatibility with other Parquet implementations. However,
    /// type coercion also prevents data from being losslessly round-tripped.
    ///
    /// # Discussion
    ///
    /// Some Arrow types such as `Date64`, `Timestamp` and `Interval` have no
    /// corresponding Parquet logical type. Thus, they cannot be losslessly
    /// round-tripped when stored using the appropriate Parquet logical type.
    /// For example, some `Date64` values may be truncated when stored with
    /// Parquet's native 32-bit date type.
    ///
    /// For [`List`] and [`Map`] types, some Parquet readers expect certain
    /// schema elements to have specific names (earlier versions of the spec
    /// were somewhat ambiguous on this point). Type coercion will use the names
    /// prescribed by the Parquet specification, potentially losing naming
    /// metadata from the Arrow schema.
    ///
    /// [`List`]: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists
    /// [`Map`]: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#maps
    /// [corresponding Parquet type]: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#date
    ///
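    /// # Example
    ///
    /// A small sketch showing `Date64` being coerced to Parquet's 32-bit
    /// `DATE` type:
    /// ```
    /// # use arrow_schema::{DataType, Field, Schema};
    /// # use parquet::arrow::ArrowSchemaConverter;
    /// use parquet::basic;
    /// let schema = Schema::new(vec![Field::new("d", DataType::Date64, false)]);
    /// let parquet_schema = ArrowSchemaConverter::new()
    ///     .with_coerce_types(true)
    ///     .convert(&schema)
    ///     .unwrap();
    /// // With coercion enabled, the column is stored as a 32-bit DATE
    /// assert_eq!(parquet_schema.column(0).physical_type(), basic::Type::INT32);
    /// ```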
    pub fn with_coerce_types(mut self, coerce_types: bool) -> Self {
        self.coerce_types = coerce_types;
        self
    }

    /// Set the root schema element name (defaults to `"arrow_schema"`).
    pub fn schema_root(mut self, schema_root: &'a str) -> Self {
        self.schema_root = schema_root;
        self
    }

    /// Convert the specified Arrow [`Schema`] to the desired Parquet [`SchemaDescriptor`]
    ///
    /// See example in [`ArrowSchemaConverter`]
    pub fn convert(&self, schema: &Schema) -> Result<SchemaDescriptor> {
        let fields = schema
            .fields()
            .iter()
            .map(|field| arrow_to_parquet_type(field, self.coerce_types).map(Arc::new))
            .collect::<Result<_>>()?;
        let group = Type::group_type_builder(self.schema_root)
            .with_fields(fields)
            .build()?;
        Ok(SchemaDescriptor::new(Arc::new(group)))
    }
}

fn parse_key_value_metadata(
    key_value_metadata: Option<&Vec<KeyValue>>,
) -> Option<HashMap<String, String>> {
    match key_value_metadata {
        Some(key_values) => {
            let map: HashMap<String, String> = key_values
                .iter()
                .filter_map(|kv| {
                    kv.value
                        .as_ref()
                        .map(|value| (kv.key.clone(), value.clone()))
                })
                .collect();

            if map.is_empty() {
                None
            } else {
                Some(map)
            }
        }
        None => None,
    }
}

/// Convert parquet column schema to arrow field.
pub fn parquet_to_arrow_field(parquet_column: &ColumnDescriptor) -> Result<Field> {
    let field = complex::convert_type(&parquet_column.self_type_ptr())?;
    let mut ret = Field::new(parquet_column.name(), field.arrow_type, field.nullable);

    let basic_info = parquet_column.self_type().get_basic_info();
    let mut meta = HashMap::with_capacity(if cfg!(feature = "arrow_canonical_extension_types") {
        2
    } else {
        1
    });
    if basic_info.has_id() {
        meta.insert(
            PARQUET_FIELD_ID_META_KEY.to_string(),
            basic_info.id().to_string(),
        );
    }
    #[cfg(feature = "arrow_canonical_extension_types")]
    if let Some(logical_type) = basic_info.logical_type() {
        match logical_type {
            LogicalType::Uuid => ret.try_with_extension_type(Uuid)?,
            LogicalType::Json => ret.try_with_extension_type(Json::default())?,
            _ => {}
        }
    }
    if !meta.is_empty() {
        ret.set_metadata(meta);
    }

    Ok(ret)
}

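/// Compute the number of bytes a `FIXED_LEN_BYTE_ARRAY` must have to hold a
/// decimal of the given precision, per the Parquet logical-type rules.
///
/// For example (values follow from the derivation in the function body):
/// precision 9 requires 4 bytes, precision 18 requires 8 bytes, and
/// precision 38 (the maximum for `Decimal128`) requires 16 bytes.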
pub fn decimal_length_from_precision(precision: u8) -> usize {
    // Parquet's logical types spec defines the precision of an n-byte
    // FIXED_LEN_BYTE_ARRAY decimal as floor(log_10(2^(8*n - 1) - 1)).
    // Solving for the smallest n that can hold `precision` digits:
    //   precision <= log_10(2^(8*n - 1) - 1)
    //   10^precision + 1 <= 2^(8*n - 1)
    //   log2(10^precision + 1) + 1 <= 8*n
    //   n >= (log2(10^precision + 1) + 1) / 8
    (((10.0_f64.powi(precision as i32) + 1.0).log2() + 1.0) / 8.0).ceil() as usize
}

/// Convert an arrow field to a parquet `Type`
fn arrow_to_parquet_type(field: &Field, coerce_types: bool) -> Result<Type> {
    const PARQUET_LIST_ELEMENT_NAME: &str = "element";
    const PARQUET_MAP_STRUCT_NAME: &str = "key_value";
    const PARQUET_KEY_FIELD_NAME: &str = "key";
    const PARQUET_VALUE_FIELD_NAME: &str = "value";

    let name = field.name().as_str();
    let repetition = if field.is_nullable() {
        Repetition::OPTIONAL
    } else {
        Repetition::REQUIRED
    };
    let id = field_id(field);
    // create type from field
    match field.data_type() {
        DataType::Null => Type::primitive_type_builder(name, PhysicalType::INT32)
            .with_logical_type(Some(LogicalType::Unknown))
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::Boolean => Type::primitive_type_builder(name, PhysicalType::BOOLEAN)
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::Int8 => Type::primitive_type_builder(name, PhysicalType::INT32)
            .with_logical_type(Some(LogicalType::Integer {
                bit_width: 8,
                is_signed: true,
            }))
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::Int16 => Type::primitive_type_builder(name, PhysicalType::INT32)
            .with_logical_type(Some(LogicalType::Integer {
                bit_width: 16,
                is_signed: true,
            }))
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::Int32 => Type::primitive_type_builder(name, PhysicalType::INT32)
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::Int64 => Type::primitive_type_builder(name, PhysicalType::INT64)
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::UInt8 => Type::primitive_type_builder(name, PhysicalType::INT32)
            .with_logical_type(Some(LogicalType::Integer {
                bit_width: 8,
                is_signed: false,
            }))
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::UInt16 => Type::primitive_type_builder(name, PhysicalType::INT32)
            .with_logical_type(Some(LogicalType::Integer {
                bit_width: 16,
                is_signed: false,
            }))
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::UInt32 => Type::primitive_type_builder(name, PhysicalType::INT32)
            .with_logical_type(Some(LogicalType::Integer {
                bit_width: 32,
                is_signed: false,
            }))
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::UInt64 => Type::primitive_type_builder(name, PhysicalType::INT64)
            .with_logical_type(Some(LogicalType::Integer {
                bit_width: 64,
                is_signed: false,
            }))
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::Float16 => Type::primitive_type_builder(name, PhysicalType::FIXED_LEN_BYTE_ARRAY)
            .with_repetition(repetition)
            .with_id(id)
            .with_logical_type(Some(LogicalType::Float16))
            .with_length(2)
            .build(),
        DataType::Float32 => Type::primitive_type_builder(name, PhysicalType::FLOAT)
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::Float64 => Type::primitive_type_builder(name, PhysicalType::DOUBLE)
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::Timestamp(TimeUnit::Second, _) => {
            // Cannot represent seconds in LogicalType
            Type::primitive_type_builder(name, PhysicalType::INT64)
                .with_repetition(repetition)
                .with_id(id)
                .build()
        }
        DataType::Timestamp(time_unit, tz) => {
            Type::primitive_type_builder(name, PhysicalType::INT64)
                .with_logical_type(Some(LogicalType::Timestamp {
                    // If timezone set, values are normalized to UTC timezone
                    is_adjusted_to_u_t_c: matches!(tz, Some(z) if !z.as_ref().is_empty()),
                    unit: match time_unit {
                        TimeUnit::Second => unreachable!(),
                        TimeUnit::Millisecond => ParquetTimeUnit::MILLIS(Default::default()),
                        TimeUnit::Microsecond => ParquetTimeUnit::MICROS(Default::default()),
                        TimeUnit::Nanosecond => ParquetTimeUnit::NANOS(Default::default()),
                    },
                }))
                .with_repetition(repetition)
                .with_id(id)
                .build()
        }
        DataType::Date32 => Type::primitive_type_builder(name, PhysicalType::INT32)
            .with_logical_type(Some(LogicalType::Date))
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::Date64 => {
            if coerce_types {
                Type::primitive_type_builder(name, PhysicalType::INT32)
                    .with_logical_type(Some(LogicalType::Date))
                    .with_repetition(repetition)
                    .with_id(id)
                    .build()
            } else {
                Type::primitive_type_builder(name, PhysicalType::INT64)
                    .with_repetition(repetition)
                    .with_id(id)
                    .build()
            }
        }
        DataType::Time32(TimeUnit::Second) => {
            // Cannot represent seconds in LogicalType
            Type::primitive_type_builder(name, PhysicalType::INT32)
                .with_repetition(repetition)
                .with_id(id)
                .build()
        }
        DataType::Time32(unit) => Type::primitive_type_builder(name, PhysicalType::INT32)
            .with_logical_type(Some(LogicalType::Time {
                is_adjusted_to_u_t_c: field.metadata().contains_key("adjusted_to_utc"),
                unit: match unit {
                    TimeUnit::Millisecond => ParquetTimeUnit::MILLIS(Default::default()),
                    u => unreachable!("Invalid unit for Time32: {:?}", u),
                },
            }))
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::Time64(unit) => Type::primitive_type_builder(name, PhysicalType::INT64)
            .with_logical_type(Some(LogicalType::Time {
                is_adjusted_to_u_t_c: field.metadata().contains_key("adjusted_to_utc"),
                unit: match unit {
                    TimeUnit::Microsecond => ParquetTimeUnit::MICROS(Default::default()),
                    TimeUnit::Nanosecond => ParquetTimeUnit::NANOS(Default::default()),
                    u => unreachable!("Invalid unit for Time64: {:?}", u),
                },
            }))
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::Duration(_) => Type::primitive_type_builder(name, PhysicalType::INT64)
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::Interval(_) => {
            Type::primitive_type_builder(name, PhysicalType::FIXED_LEN_BYTE_ARRAY)
                .with_converted_type(ConvertedType::INTERVAL)
                .with_repetition(repetition)
                .with_id(id)
                .with_length(12)
                .build()
        }
        DataType::Binary | DataType::LargeBinary => {
            Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
                .with_repetition(repetition)
                .with_id(id)
                .build()
        }
        DataType::FixedSizeBinary(length) => {
            Type::primitive_type_builder(name, PhysicalType::FIXED_LEN_BYTE_ARRAY)
                .with_repetition(repetition)
                .with_id(id)
                .with_length(*length)
                .with_logical_type(
                    #[cfg(feature = "arrow_canonical_extension_types")]
                    // If set, map arrow uuid extension type to parquet uuid logical type.
                    field
                        .try_extension_type::<Uuid>()
                        .ok()
                        .map(|_| LogicalType::Uuid),
                    #[cfg(not(feature = "arrow_canonical_extension_types"))]
                    None,
                )
                .build()
        }
        DataType::BinaryView => Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::Decimal32(precision, scale)
        | DataType::Decimal64(precision, scale)
        | DataType::Decimal128(precision, scale)
        | DataType::Decimal256(precision, scale) => {
            // Decimal precision determines the Parquet physical type to use,
            // following https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#decimal
            let (physical_type, length) = if *precision > 1 && *precision <= 9 {
                (PhysicalType::INT32, -1)
            } else if *precision <= 18 {
                (PhysicalType::INT64, -1)
            } else {
                (
                    PhysicalType::FIXED_LEN_BYTE_ARRAY,
                    decimal_length_from_precision(*precision) as i32,
                )
            };
            Type::primitive_type_builder(name, physical_type)
                .with_repetition(repetition)
                .with_id(id)
                .with_length(length)
                .with_logical_type(Some(LogicalType::Decimal {
                    scale: *scale as i32,
                    precision: *precision as i32,
                }))
                .with_precision(*precision as i32)
                .with_scale(*scale as i32)
                .build()
        }
        DataType::Utf8 | DataType::LargeUtf8 => {
            Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
                .with_logical_type({
                    #[cfg(feature = "arrow_canonical_extension_types")]
                    {
                        // Use the Json logical type if the canonical Json
                        // extension type is set on this field.
                        field
                            .try_extension_type::<Json>()
                            .map_or(Some(LogicalType::String), |_| Some(LogicalType::Json))
                    }
                    #[cfg(not(feature = "arrow_canonical_extension_types"))]
                    Some(LogicalType::String)
                })
                .with_repetition(repetition)
                .with_id(id)
                .build()
        }
        DataType::Utf8View => Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
            .with_logical_type({
                #[cfg(feature = "arrow_canonical_extension_types")]
                {
                    // Use the Json logical type if the canonical Json
                    // extension type is set on this field.
                    field
                        .try_extension_type::<Json>()
                        .map_or(Some(LogicalType::String), |_| Some(LogicalType::Json))
                }
                #[cfg(not(feature = "arrow_canonical_extension_types"))]
                Some(LogicalType::String)
            })
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::List(f) | DataType::FixedSizeList(f, _) | DataType::LargeList(f) => {
            let field_ref = if coerce_types && f.name() != PARQUET_LIST_ELEMENT_NAME {
                // Ensure proper naming per the Parquet specification
                let ff = f.as_ref().clone().with_name(PARQUET_LIST_ELEMENT_NAME);
                Arc::new(arrow_to_parquet_type(&ff, coerce_types)?)
            } else {
                Arc::new(arrow_to_parquet_type(f, coerce_types)?)
            };

            Type::group_type_builder(name)
                .with_fields(vec![Arc::new(
                    Type::group_type_builder("list")
                        .with_fields(vec![field_ref])
                        .with_repetition(Repetition::REPEATED)
                        .build()?,
                )])
                .with_logical_type(Some(LogicalType::List))
                .with_repetition(repetition)
                .with_id(id)
                .build()
        }
        DataType::ListView(_) | DataType::LargeListView(_) => {
            unimplemented!("ListView/LargeListView not implemented")
        }
        DataType::Struct(fields) => {
            if fields.is_empty() {
                return Err(arrow_err!("Parquet does not support writing empty structs",));
            }
            // recursively convert children to types/nodes
            let fields = fields
                .iter()
                .map(|f| arrow_to_parquet_type(f, coerce_types).map(Arc::new))
                .collect::<Result<_>>()?;
            Type::group_type_builder(name)
                .with_fields(fields)
                .with_repetition(repetition)
                .with_id(id)
                .build()
        }
        DataType::Map(field, _) => {
            if let DataType::Struct(struct_fields) = field.data_type() {
                // If coercing then set inner struct name to "key_value"
                let map_struct_name = if coerce_types {
                    PARQUET_MAP_STRUCT_NAME
                } else {
                    field.name()
                };

                // If coercing then ensure struct fields are named "key" and "value"
                let fix_map_field = |name: &str, fld: &Arc<Field>| -> Result<Arc<Type>> {
                    if coerce_types && fld.name() != name {
                        let f = fld.as_ref().clone().with_name(name);
                        Ok(Arc::new(arrow_to_parquet_type(&f, coerce_types)?))
                    } else {
                        Ok(Arc::new(arrow_to_parquet_type(fld, coerce_types)?))
                    }
                };
                let key_field = fix_map_field(PARQUET_KEY_FIELD_NAME, &struct_fields[0])?;
                let val_field = fix_map_field(PARQUET_VALUE_FIELD_NAME, &struct_fields[1])?;

                Type::group_type_builder(name)
                    .with_fields(vec![Arc::new(
                        Type::group_type_builder(map_struct_name)
                            .with_fields(vec![key_field, val_field])
                            .with_repetition(Repetition::REPEATED)
                            .build()?,
                    )])
                    .with_logical_type(Some(LogicalType::Map))
                    .with_repetition(repetition)
                    .with_id(id)
                    .build()
            } else {
                Err(arrow_err!(
                    "DataType::Map should contain a struct field child",
                ))
            }
        }
        DataType::Union(_, _) => unimplemented!("See ARROW-8817."),
        DataType::Dictionary(_, ref value) => {
            // Dictionary encoding not handled at the schema level
            let dict_field = field.clone().with_data_type(value.as_ref().clone());
            arrow_to_parquet_type(&dict_field, coerce_types)
        }
        DataType::RunEndEncoded(_, _) => Err(arrow_err!(
            "Converting RunEndEncodedType to parquet not supported",
        )),
    }
}

fn field_id(field: &Field) -> Option<i32> {
    let value = field.metadata().get(super::PARQUET_FIELD_ID_META_KEY)?;
    value.parse().ok() // Fail quietly if not a valid integer
}

#[cfg(test)]
mod tests {
    use super::*;

    use std::{collections::HashMap, sync::Arc};

    use arrow::datatypes::{DataType, Field, IntervalUnit, TimeUnit};

    use crate::arrow::PARQUET_FIELD_ID_META_KEY;
    use crate::file::metadata::KeyValue;
    use crate::file::reader::FileReader;
    use crate::{
        arrow::{arrow_reader::ParquetRecordBatchReaderBuilder, ArrowWriter},
        schema::{parser::parse_message_type, types::SchemaDescriptor},
    };

    #[test]
    fn test_flat_primitives() {
        let message_type = "
        message test_schema {
            REQUIRED BOOLEAN boolean;
            REQUIRED INT32   int8  (INT_8);
            REQUIRED INT32   int16 (INT_16);
            REQUIRED INT32   uint8 (INTEGER(8,false));
            REQUIRED INT32   uint16 (INTEGER(16,false));
            REQUIRED INT32   int32;
            REQUIRED INT64   int64;
            OPTIONAL DOUBLE  double;
            OPTIONAL FLOAT   float;
            OPTIONAL FIXED_LEN_BYTE_ARRAY (2) float16 (FLOAT16);
            OPTIONAL BINARY  string (UTF8);
            OPTIONAL BINARY  string_2 (STRING);
            OPTIONAL BINARY  json (JSON);
        }
        ";
        let parquet_group_type = parse_message_type(message_type).unwrap();

        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();

        let arrow_fields = Fields::from(vec![
            Field::new("boolean", DataType::Boolean, false),
            Field::new("int8", DataType::Int8, false),
            Field::new("int16", DataType::Int16, false),
            Field::new("uint8", DataType::UInt8, false),
            Field::new("uint16", DataType::UInt16, false),
            Field::new("int32", DataType::Int32, false),
            Field::new("int64", DataType::Int64, false),
            Field::new("double", DataType::Float64, true),
            Field::new("float", DataType::Float32, true),
            Field::new("float16", DataType::Float16, true),
            Field::new("string", DataType::Utf8, true),
            Field::new("string_2", DataType::Utf8, true),
            Field::new("json", DataType::Utf8, true),
        ]);

        assert_eq!(&arrow_fields, converted_arrow_schema.fields());
    }

    #[test]
    fn test_decimal_fields() {
        let message_type = "
        message test_schema {
                    REQUIRED INT32 decimal1 (DECIMAL(4,2));
                    REQUIRED INT64 decimal2 (DECIMAL(12,2));
                    REQUIRED FIXED_LEN_BYTE_ARRAY (16) decimal3 (DECIMAL(30,2));
                    REQUIRED BYTE_ARRAY decimal4 (DECIMAL(33,2));
                    REQUIRED BYTE_ARRAY decimal5 (DECIMAL(38,2));
                    REQUIRED FIXED_LEN_BYTE_ARRAY (17) decimal6 (DECIMAL(39,2));
                    REQUIRED BYTE_ARRAY decimal7 (DECIMAL(39,2));
        }
        ";

        let parquet_group_type = parse_message_type(message_type).unwrap();

        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();

        let arrow_fields = Fields::from(vec![
            Field::new("decimal1", DataType::Decimal128(4, 2), false),
            Field::new("decimal2", DataType::Decimal128(12, 2), false),
            Field::new("decimal3", DataType::Decimal128(30, 2), false),
            Field::new("decimal4", DataType::Decimal128(33, 2), false),
            Field::new("decimal5", DataType::Decimal128(38, 2), false),
            Field::new("decimal6", DataType::Decimal256(39, 2), false),
            Field::new("decimal7", DataType::Decimal256(39, 2), false),
        ]);
        assert_eq!(&arrow_fields, converted_arrow_schema.fields());
    }

    #[test]
    fn test_byte_array_fields() {
        let message_type = "
        message test_schema {
            REQUIRED BYTE_ARRAY binary;
            REQUIRED FIXED_LEN_BYTE_ARRAY (20) fixed_binary;
        }
        ";

        let parquet_group_type = parse_message_type(message_type).unwrap();

        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();

        let arrow_fields = Fields::from(vec![
            Field::new("binary", DataType::Binary, false),
            Field::new("fixed_binary", DataType::FixedSizeBinary(20), false),
        ]);
        assert_eq!(&arrow_fields, converted_arrow_schema.fields());
    }

    #[test]
    fn test_duplicate_fields() {
        let message_type = "
        message test_schema {
            REQUIRED BOOLEAN boolean;
            REQUIRED INT32 int8 (INT_8);
        }
        ";

        let parquet_group_type = parse_message_type(message_type).unwrap();

        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();

        let arrow_fields = Fields::from(vec![
            Field::new("boolean", DataType::Boolean, false),
            Field::new("int8", DataType::Int8, false),
        ]);
        assert_eq!(&arrow_fields, converted_arrow_schema.fields());

        let converted_arrow_schema =
            parquet_to_arrow_schema_by_columns(&parquet_schema, ProjectionMask::all(), None)
                .unwrap();
        assert_eq!(&arrow_fields, converted_arrow_schema.fields());
    }

    #[test]
    fn test_parquet_lists() {
        let mut arrow_fields = Vec::new();

        // LIST encoding example taken from parquet-format/LogicalTypes.md
        let message_type = "
        message test_schema {
          REQUIRED GROUP my_list (LIST) {
            REPEATED GROUP list {
              OPTIONAL BINARY element (UTF8);
            }
          }
          OPTIONAL GROUP my_list (LIST) {
            REPEATED GROUP list {
              REQUIRED BINARY element (UTF8);
            }
          }
          OPTIONAL GROUP array_of_arrays (LIST) {
            REPEATED GROUP list {
              REQUIRED GROUP element (LIST) {
                REPEATED GROUP list {
                  REQUIRED INT32 element;
                }
              }
            }
          }
          OPTIONAL GROUP my_list (LIST) {
            REPEATED GROUP element {
              REQUIRED BINARY str (UTF8);
            }
          }
          OPTIONAL GROUP my_list (LIST) {
            REPEATED INT32 element;
          }
          OPTIONAL GROUP my_list (LIST) {
            REPEATED GROUP element {
              REQUIRED BINARY str (UTF8);
              REQUIRED INT32 num;
            }
          }
          OPTIONAL GROUP my_list (LIST) {
            REPEATED GROUP array {
              REQUIRED BINARY str (UTF8);
            }

          }
          OPTIONAL GROUP my_list (LIST) {
            REPEATED GROUP my_list_tuple {
              REQUIRED BINARY str (UTF8);
            }
          }
          REPEATED INT32 name;
        }
        ";

        // // List<String> (list non-null, elements nullable)
        // required group my_list (LIST) {
        //   repeated group list {
        //     optional binary element (UTF8);
        //   }
        // }
        {
            arrow_fields.push(Field::new_list(
                "my_list",
                Field::new("element", DataType::Utf8, true),
                false,
            ));
        }

        // // List<String> (list nullable, elements non-null)
        // optional group my_list (LIST) {
        //   repeated group list {
        //     required binary element (UTF8);
        //   }
        // }
        {
            arrow_fields.push(Field::new_list(
                "my_list",
                Field::new("element", DataType::Utf8, false),
                true,
            ));
        }

        // Element types can be nested structures. For example, a list of lists:
        //
        // // List<List<Integer>>
        // optional group array_of_arrays (LIST) {
        //   repeated group list {
        //     required group element (LIST) {
        //       repeated group list {
        //         required int32 element;
        //       }
        //     }
        //   }
        // }
        {
            let arrow_inner_list = Field::new("element", DataType::Int32, false);
            arrow_fields.push(Field::new_list(
                "array_of_arrays",
                Field::new_list("element", arrow_inner_list, false),
                true,
            ));
        }

        // // List<String> (list nullable, elements non-null)
        // optional group my_list (LIST) {
        //   repeated group element {
        //     required binary str (UTF8);
        //   };
        // }
        {
            arrow_fields.push(Field::new_list(
                "my_list",
                Field::new("str", DataType::Utf8, false),
                true,
            ));
        }

        // // List<Integer> (nullable list, non-null elements)
        // optional group my_list (LIST) {
        //   repeated int32 element;
        // }
        {
            arrow_fields.push(Field::new_list(
                "my_list",
                Field::new("element", DataType::Int32, false),
                true,
            ));
        }

        // // List<Tuple<String, Integer>> (nullable list, non-null elements)
        // optional group my_list (LIST) {
        //   repeated group element {
        //     required binary str (UTF8);
        //     required int32 num;
        //   };
        // }
        {
            let fields = vec![
                Field::new("str", DataType::Utf8, false),
                Field::new("num", DataType::Int32, false),
            ];
            arrow_fields.push(Field::new_list(
                "my_list",
                Field::new_struct("element", fields, false),
                true,
            ));
        }

        // // List<OneTuple<String>> (nullable list, non-null elements)
        // optional group my_list (LIST) {
        //   repeated group array {
        //     required binary str (UTF8);
        //   };
        // }
        // Special case: group is named array
        {
            let fields = vec![Field::new("str", DataType::Utf8, false)];
            arrow_fields.push(Field::new_list(
                "my_list",
                Field::new_struct("array", fields, false),
                true,
            ));
        }

        // // List<OneTuple<String>> (nullable list, non-null elements)
        // optional group my_list (LIST) {
        //   repeated group my_list_tuple {
        //     required binary str (UTF8);
        //   };
        // }
        // Special case: group name ends in _tuple
        {
            let fields = vec![Field::new("str", DataType::Utf8, false)];
            arrow_fields.push(Field::new_list(
                "my_list",
                Field::new_struct("my_list_tuple", fields, false),
                true,
            ));
        }

        // One-level encoding: Only allows required lists with required cells
        //   repeated value_type name
        {
            arrow_fields.push(Field::new_list(
                "name",
                Field::new("name", DataType::Int32, false),
                false,
            ));
        }

        let parquet_group_type = parse_message_type(message_type).unwrap();

        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
        let converted_fields = converted_arrow_schema.fields();

        assert_eq!(arrow_fields.len(), converted_fields.len());
        for i in 0..arrow_fields.len() {
            assert_eq!(&arrow_fields[i], converted_fields[i].as_ref(), "{i}");
        }
    }

    #[test]
    fn test_parquet_list_nullable() {
        let mut arrow_fields = Vec::new();

        let message_type = "
        message test_schema {
          REQUIRED GROUP my_list1 (LIST) {
            REPEATED GROUP list {
              OPTIONAL BINARY element (UTF8);
            }
          }
          OPTIONAL GROUP my_list2 (LIST) {
            REPEATED GROUP list {
              REQUIRED BINARY element (UTF8);
            }
          }
          REQUIRED GROUP my_list3 (LIST) {
            REPEATED GROUP list {
              REQUIRED BINARY element (UTF8);
            }
          }
        }
        ";

        // // List<String> (list non-null, elements nullable)
        // required group my_list1 (LIST) {
        //   repeated group list {
        //     optional binary element (UTF8);
        //   }
        // }
        {
            arrow_fields.push(Field::new_list(
                "my_list1",
                Field::new("element", DataType::Utf8, true),
                false,
            ));
        }

        // // List<String> (list nullable, elements non-null)
        // optional group my_list2 (LIST) {
        //   repeated group list {
        //     required binary element (UTF8);
        //   }
        // }
        {
            arrow_fields.push(Field::new_list(
                "my_list2",
                Field::new("element", DataType::Utf8, false),
                true,
            ));
        }

        // // List<String> (list non-null, elements non-null)
        // repeated group my_list3 (LIST) {
        //   repeated group list {
        //     required binary element (UTF8);
        //   }
        // }
        {
            arrow_fields.push(Field::new_list(
                "my_list3",
                Field::new("element", DataType::Utf8, false),
                false,
            ));
        }

        let parquet_group_type = parse_message_type(message_type).unwrap();

        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
        let converted_fields = converted_arrow_schema.fields();

        assert_eq!(arrow_fields.len(), converted_fields.len());
        for i in 0..arrow_fields.len() {
            assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
        }
    }

    #[test]
    fn test_parquet_maps() {
        let mut arrow_fields = Vec::new();

        // MAP encoding example taken from parquet-format/LogicalTypes.md
        let message_type = "
        message test_schema {
          REQUIRED group my_map1 (MAP) {
            REPEATED group key_value {
              REQUIRED binary key (UTF8);
              OPTIONAL int32 value;
            }
          }
          OPTIONAL group my_map2 (MAP) {
            REPEATED group map {
              REQUIRED binary str (UTF8);
              REQUIRED int32 num;
            }
          }
          OPTIONAL group my_map3 (MAP_KEY_VALUE) {
            REPEATED group map {
              REQUIRED binary key (UTF8);
              OPTIONAL int32 value;
            }
          }
          REQUIRED group my_map4 (MAP) {
            REPEATED group map {
              OPTIONAL binary key (UTF8);
              REQUIRED int32 value;
            }
          }
        }
        ";

        // // Map<String, Integer>
        // required group my_map (MAP) {
        //   repeated group key_value {
        //     required binary key (UTF8);
        //     optional int32 value;
        //   }
        // }
        {
            arrow_fields.push(Field::new_map(
                "my_map1",
                "key_value",
                Field::new("key", DataType::Utf8, false),
                Field::new("value", DataType::Int32, true),
                false,
                false,
            ));
        }

        // // Map<String, Integer> (nullable map, non-null values)
        // optional group my_map (MAP) {
        //   repeated group map {
        //     required binary str (UTF8);
        //     required int32 num;
        //   }
        // }
        {
            arrow_fields.push(Field::new_map(
                "my_map2",
                "map",
                Field::new("str", DataType::Utf8, false),
                Field::new("num", DataType::Int32, false),
                false,
                true,
            ));
        }

        // // Map<String, Integer> (nullable map, nullable values)
        // optional group my_map (MAP_KEY_VALUE) {
        //   repeated group map {
        //     required binary key (UTF8);
        //     optional int32 value;
        //   }
        // }
        {
            arrow_fields.push(Field::new_map(
                "my_map3",
                "map",
                Field::new("key", DataType::Utf8, false),
                Field::new("value", DataType::Int32, true),
                false,
                true,
            ));
        }

        // // Map<String, Integer> (non-compliant nullable key)
        // required group my_map (MAP) {
        //   repeated group map {
        //     optional binary key (UTF8);
        //     required int32 value;
        //   }
        // }
1303        {
1304            arrow_fields.push(Field::new_map(
1305                "my_map4",
1306                "map",
1307                Field::new("key", DataType::Utf8, false), // The key is always non-nullable (#5630)
1308                Field::new("value", DataType::Int32, false),
1309                false,
1310                false,
1311            ));
1312        }
1313
1314        let parquet_group_type = parse_message_type(message_type).unwrap();
1315
1316        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1317        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
1318        let converted_fields = converted_arrow_schema.fields();
1319
1320        assert_eq!(arrow_fields.len(), converted_fields.len());
1321        for i in 0..arrow_fields.len() {
1322            assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
1323        }
1324    }
1325
1326    #[test]
1327    fn test_nested_schema() {
1328        let mut arrow_fields = Vec::new();
1329        {
1330            let group1_fields = Fields::from(vec![
1331                Field::new("leaf1", DataType::Boolean, false),
1332                Field::new("leaf2", DataType::Int32, false),
1333            ]);
1334            let group1_struct = Field::new("group1", DataType::Struct(group1_fields), false);
1335            arrow_fields.push(group1_struct);
1336
1337            let leaf3_field = Field::new("leaf3", DataType::Int64, false);
1338            arrow_fields.push(leaf3_field);
1339        }
1340
1341        let message_type = "
1342        message test_schema {
1343          REQUIRED GROUP group1 {
1344            REQUIRED BOOLEAN leaf1;
1345            REQUIRED INT32 leaf2;
1346          }
1347          REQUIRED INT64 leaf3;
1348        }
1349        ";
1350        let parquet_group_type = parse_message_type(message_type).unwrap();
1351
1352        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1353        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
1354        let converted_fields = converted_arrow_schema.fields();
1355
1356        assert_eq!(arrow_fields.len(), converted_fields.len());
1357        for i in 0..arrow_fields.len() {
1358            assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
1359        }
1360    }
1361
1362    #[test]
1363    fn test_nested_schema_partial() {
1364        let mut arrow_fields = Vec::new();
1365        {
1366            let group1_fields = vec![Field::new("leaf1", DataType::Int64, false)].into();
1367            let group1 = Field::new("group1", DataType::Struct(group1_fields), false);
1368            arrow_fields.push(group1);
1369
1370            let group2_fields = vec![Field::new("leaf4", DataType::Int64, false)].into();
1371            let group2 = Field::new("group2", DataType::Struct(group2_fields), false);
1372            arrow_fields.push(group2);
1373
1374            arrow_fields.push(Field::new("leaf5", DataType::Int64, false));
1375        }
1376
1377        let message_type = "
1378        message test_schema {
1379          REQUIRED GROUP group1 {
1380            REQUIRED INT64 leaf1;
1381            REQUIRED INT64 leaf2;
1382          }
1383          REQUIRED  GROUP group2 {
1384            REQUIRED INT64 leaf3;
1385            REQUIRED INT64 leaf4;
1386          }
1387          REQUIRED INT64 leaf5;
1388        }
1389        ";
1390        let parquet_group_type = parse_message_type(message_type).unwrap();
1391
1392        // Expected partial arrow schema (columns 0, 3, 4):
1393        // required group group1 {
1394        //   required int64 leaf1;
1395        // }
1396        // required group group2 {
1397        //   required int64 leaf4;
1398        // }
1399        // required int64 leaf5;
1400
1401        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1402        let mask = ProjectionMask::leaves(&parquet_schema, [3, 0, 4, 4]);
1403        let converted_arrow_schema =
1404            parquet_to_arrow_schema_by_columns(&parquet_schema, mask, None).unwrap();
1405        let converted_fields = converted_arrow_schema.fields();
1406
1407        assert_eq!(arrow_fields.len(), converted_fields.len());
1408        for i in 0..arrow_fields.len() {
1409            assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
1410        }
1411    }
1412
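    // The order of leaf indices in the mask must not affect the converted
    // schema: the output follows Parquet schema order. The same projection
    // is also exercised by name via ProjectionMask::columns below.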
1413    #[test]
1414    fn test_nested_schema_partial_ordering() {
1415        let mut arrow_fields = Vec::new();
1416        {
1417            let group1_fields = vec![Field::new("leaf1", DataType::Int64, false)].into();
1418            let group1 = Field::new("group1", DataType::Struct(group1_fields), false);
1419            arrow_fields.push(group1);
1420
1421            let group2_fields = vec![Field::new("leaf4", DataType::Int64, false)].into();
1422            let group2 = Field::new("group2", DataType::Struct(group2_fields), false);
1423            arrow_fields.push(group2);
1424
1425            arrow_fields.push(Field::new("leaf5", DataType::Int64, false));
1426        }
1427
1428        let message_type = "
1429        message test_schema {
1430          REQUIRED GROUP group1 {
1431            REQUIRED INT64 leaf1;
1432            REQUIRED INT64 leaf2;
1433          }
1434          REQUIRED GROUP group2 {
1435            REQUIRED INT64 leaf3;
1436            REQUIRED INT64 leaf4;
1437          }
1438          REQUIRED INT64 leaf5;
1439        }
1440        ";
1441        let parquet_group_type = parse_message_type(message_type).unwrap();
1442
1443        // Expected partial arrow schema (leaf columns 3, 0, 4; the output follows schema order, not mask order):
1444        // required group group1 {
1445        //   required int64 leaf1;
1446        // }
1447        // required group group2 {
1448        //   required int64 leaf4;
1449        // }
1450        // required int64 leaf5;
1451
1452        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1453        let mask = ProjectionMask::leaves(&parquet_schema, [3, 0, 4]);
1454        let converted_arrow_schema =
1455            parquet_to_arrow_schema_by_columns(&parquet_schema, mask, None).unwrap();
1456        let converted_fields = converted_arrow_schema.fields();
1457
1458        assert_eq!(arrow_fields.len(), converted_fields.len());
1459        for i in 0..arrow_fields.len() {
1460            assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
1461        }
1462
1463        let mask =
1464            ProjectionMask::columns(&parquet_schema, ["group2.leaf4", "group1.leaf1", "leaf5"]);
1465        let converted_arrow_schema =
1466            parquet_to_arrow_schema_by_columns(&parquet_schema, mask, None).unwrap();
1467        let converted_fields = converted_arrow_schema.fields();
1468
1469        assert_eq!(arrow_fields.len(), converted_fields.len());
1470        for i in 0..arrow_fields.len() {
1471            assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
1472        }
1473    }
1474
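    // Per Parquet's backward-compatibility rules, a REPEATED group that is
    // not annotated as LIST converts to a non-nullable Arrow list of
    // non-nullable structs.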
1475    #[test]
1476    fn test_repeated_nested_schema() {
1477        let mut arrow_fields = Vec::new();
1478        {
1479            arrow_fields.push(Field::new("leaf1", DataType::Int32, true));
1480
1481            let inner_group_list = Field::new_list(
1482                "innerGroup",
1483                Field::new_struct(
1484                    "innerGroup",
1485                    vec![Field::new("leaf3", DataType::Int32, true)],
1486                    false,
1487                ),
1488                false,
1489            );
1490
1491            let outer_group_list = Field::new_list(
1492                "outerGroup",
1493                Field::new_struct(
1494                    "outerGroup",
1495                    vec![Field::new("leaf2", DataType::Int32, true), inner_group_list],
1496                    false,
1497                ),
1498                false,
1499            );
1500            arrow_fields.push(outer_group_list);
1501        }
1502
1503        let message_type = "
1504        message test_schema {
1505          OPTIONAL INT32 leaf1;
1506          REPEATED GROUP outerGroup {
1507            OPTIONAL INT32 leaf2;
1508            REPEATED GROUP innerGroup {
1509              OPTIONAL INT32 leaf3;
1510            }
1511          }
1512        }
1513        ";
1514        let parquet_group_type = parse_message_type(message_type).unwrap();
1515
1516        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1517        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
1518        let converted_fields = converted_arrow_schema.fields();
1519
1520        assert_eq!(arrow_fields.len(), converted_fields.len());
1521        for i in 0..arrow_fields.len() {
1522            assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
1523        }
1524    }
1525
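    // Exercises per-leaf conversion via parquet_to_arrow_field, covering both
    // legacy converted types (INT_8, UTF8, TIME_MILLIS, ...) and logical
    // types (INTEGER, TIME, TIMESTAMP, DECIMAL).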
1526    #[test]
1527    fn test_column_desc_to_field() {
1528        let message_type = "
1529        message test_schema {
1530            REQUIRED BOOLEAN boolean;
1531            REQUIRED INT32   int8  (INT_8);
1532            REQUIRED INT32   uint8 (INTEGER(8,false));
1533            REQUIRED INT32   int16 (INT_16);
1534            REQUIRED INT32   uint16 (INTEGER(16,false));
1535            REQUIRED INT32   int32;
1536            REQUIRED INT64   int64;
1537            OPTIONAL DOUBLE  double;
1538            OPTIONAL FLOAT   float;
1539            OPTIONAL FIXED_LEN_BYTE_ARRAY (2) float16 (FLOAT16);
1540            OPTIONAL BINARY  string (UTF8);
1541            REPEATED BOOLEAN bools;
1542            OPTIONAL INT32   date       (DATE);
1543            OPTIONAL INT32   time_milli (TIME_MILLIS);
1544            OPTIONAL INT64   time_micro (TIME_MICROS);
1545            OPTIONAL INT64   time_nano (TIME(NANOS,false));
1546            OPTIONAL INT64   ts_milli (TIMESTAMP_MILLIS);
1547            REQUIRED INT64   ts_micro (TIMESTAMP_MICROS);
1548            REQUIRED INT64   ts_nano (TIMESTAMP(NANOS,true));
1549            REPEATED INT32   int_list;
1550            REPEATED BINARY  byte_list;
1551            REPEATED BINARY  string_list (UTF8);
1552            REQUIRED INT32 decimal_int32 (DECIMAL(8,2));
1553            REQUIRED INT64 decimal_int64 (DECIMAL(16,2));
1554            REQUIRED FIXED_LEN_BYTE_ARRAY (13) decimal_fix_length (DECIMAL(30,2));
1555        }
1556        ";
1557        let parquet_group_type = parse_message_type(message_type).unwrap();
1558
1559        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1560        let converted_arrow_fields = parquet_schema
1561            .columns()
1562            .iter()
1563            .map(|c| parquet_to_arrow_field(c).unwrap())
1564            .collect::<Vec<Field>>();
1565
1566        let arrow_fields = vec![
1567            Field::new("boolean", DataType::Boolean, false),
1568            Field::new("int8", DataType::Int8, false),
1569            Field::new("uint8", DataType::UInt8, false),
1570            Field::new("int16", DataType::Int16, false),
1571            Field::new("uint16", DataType::UInt16, false),
1572            Field::new("int32", DataType::Int32, false),
1573            Field::new("int64", DataType::Int64, false),
1574            Field::new("double", DataType::Float64, true),
1575            Field::new("float", DataType::Float32, true),
1576            Field::new("float16", DataType::Float16, true),
1577            Field::new("string", DataType::Utf8, true),
1578            Field::new_list(
1579                "bools",
1580                Field::new("bools", DataType::Boolean, false),
1581                false,
1582            ),
1583            Field::new("date", DataType::Date32, true),
1584            Field::new("time_milli", DataType::Time32(TimeUnit::Millisecond), true),
1585            Field::new("time_micro", DataType::Time64(TimeUnit::Microsecond), true),
1586            Field::new("time_nano", DataType::Time64(TimeUnit::Nanosecond), true),
1587            Field::new(
1588                "ts_milli",
1589                DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
1590                true,
1591            ),
1592            Field::new(
1593                "ts_micro",
1594                DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
1595                false,
1596            ),
1597            Field::new(
1598                "ts_nano",
1599                DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())),
1600                false,
1601            ),
1602            Field::new_list(
1603                "int_list",
1604                Field::new("int_list", DataType::Int32, false),
1605                false,
1606            ),
1607            Field::new_list(
1608                "byte_list",
1609                Field::new("byte_list", DataType::Binary, false),
1610                false,
1611            ),
1612            Field::new_list(
1613                "string_list",
1614                Field::new("string_list", DataType::Utf8, false),
1615                false,
1616            ),
1617            Field::new("decimal_int32", DataType::Decimal128(8, 2), false),
1618            Field::new("decimal_int64", DataType::Decimal128(16, 2), false),
1619            Field::new("decimal_fix_length", DataType::Decimal128(30, 2), false),
1620        ];
1621
1622        assert_eq!(arrow_fields, converted_arrow_fields);
1623    }
1624
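    // with_coerce_types(true) should rewrite list/map child names to the
    // Parquet spec names ("list"/"element", "key_value"/"key"/"value"),
    // while with_coerce_types(false) should keep the Arrow field names.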
1625    #[test]
1626    fn test_coerced_map_list() {
1627        // Create an Arrow schema whose list/map child names do not follow the Parquet spec
1628        let arrow_fields = vec![
1629            Field::new_list(
1630                "my_list",
1631                Field::new("item", DataType::Boolean, true),
1632                false,
1633            ),
1634            Field::new_map(
1635                "my_map",
1636                "entries",
1637                Field::new("keys", DataType::Utf8, false),
1638                Field::new("values", DataType::Int32, true),
1639                false,
1640                true,
1641            ),
1642        ];
1643        let arrow_schema = Schema::new(arrow_fields);
1644
1645        // Create Parquet schema with coerced names
1646        let message_type = "
1647        message parquet_schema {
1648            REQUIRED GROUP my_list (LIST) {
1649                REPEATED GROUP list {
1650                    OPTIONAL BOOLEAN element;
1651                }
1652            }
1653            OPTIONAL GROUP my_map (MAP) {
1654                REPEATED GROUP key_value {
1655                    REQUIRED BINARY key (STRING);
1656                    OPTIONAL INT32 value;
1657                }
1658            }
1659        }
1660        ";
1661        let parquet_group_type = parse_message_type(message_type).unwrap();
1662        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1663        let converted_arrow_schema = ArrowSchemaConverter::new()
1664            .with_coerce_types(true)
1665            .convert(&arrow_schema)
1666            .unwrap();
1667        assert_eq!(
1668            parquet_schema.columns().len(),
1669            converted_arrow_schema.columns().len()
1670        );
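        // Note: only the column counts are compared here. A fuller check
        // could zip the two column lists and compare leaf names, subject to
        // the logical-type caveat noted in test_field_to_column_desc below,
        // e.g.:
        //   parquet_schema.columns().iter()
        //       .zip(converted_arrow_schema.columns())
        //       .for_each(|(a, b)| assert_eq!(a.name(), b.name()));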
1671
1672        // Create Parquet schema without coerced names
1673        let message_type = "
1674        message parquet_schema {
1675            REQUIRED GROUP my_list (LIST) {
1676                REPEATED GROUP list {
1677                    OPTIONAL BOOLEAN item;
1678                }
1679            }
1680            OPTIONAL GROUP my_map (MAP) {
1681                REPEATED GROUP entries {
1682                    REQUIRED BINARY keys (STRING);
1683                    OPTIONAL INT32 values;
1684                }
1685            }
1686        }
1687        ";
1688        let parquet_group_type = parse_message_type(message_type).unwrap();
1689        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1690        let converted_arrow_schema = ArrowSchemaConverter::new()
1691            .with_coerce_types(false)
1692            .convert(&arrow_schema)
1693            .unwrap();
1694        assert_eq!(
1695            parquet_schema.columns().len(),
1696            converted_arrow_schema.columns().len()
1697        );
1698    }
1699
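    // The reverse direction: convert an Arrow schema to Parquet with
    // ArrowSchemaConverter and compare the resulting column descriptors
    // against a hand-written message type.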
1700    #[test]
1701    fn test_field_to_column_desc() {
1702        let message_type = "
1703        message arrow_schema {
1704            REQUIRED BOOLEAN boolean;
1705            REQUIRED INT32   int8  (INT_8);
1706            REQUIRED INT32   int16 (INTEGER(16,true));
1707            REQUIRED INT32   int32;
1708            REQUIRED INT64   int64;
1709            OPTIONAL DOUBLE  double;
1710            OPTIONAL FLOAT   float;
1711            OPTIONAL FIXED_LEN_BYTE_ARRAY (2) float16 (FLOAT16);
1712            OPTIONAL BINARY  string (STRING);
1713            OPTIONAL GROUP   bools (LIST) {
1714                REPEATED GROUP list {
1715                    OPTIONAL BOOLEAN element;
1716                }
1717            }
1718            REQUIRED GROUP   bools_non_null (LIST) {
1719                REPEATED GROUP list {
1720                    REQUIRED BOOLEAN element;
1721                }
1722            }
1723            OPTIONAL INT32   date       (DATE);
1724            OPTIONAL INT32   time_milli (TIME(MILLIS,false));
1725            OPTIONAL INT32   time_milli_utc (TIME(MILLIS,true));
1726            OPTIONAL INT64   time_micro (TIME_MICROS);
1727            OPTIONAL INT64   time_micro_utc (TIME(MICROS, true));
1728            OPTIONAL INT64   ts_milli (TIMESTAMP_MILLIS);
1729            REQUIRED INT64   ts_micro (TIMESTAMP(MICROS,false));
1730            REQUIRED INT64   ts_seconds;
1731            REQUIRED INT64   ts_micro_utc (TIMESTAMP(MICROS, true));
1732            REQUIRED INT64   ts_millis_zero_offset (TIMESTAMP(MILLIS, true));
1733            REQUIRED INT64   ts_millis_zero_negative_offset (TIMESTAMP(MILLIS, true));
1734            REQUIRED INT64   ts_micro_non_utc (TIMESTAMP(MICROS, true));
1735            REQUIRED GROUP struct {
1736                REQUIRED BOOLEAN bools;
1737                REQUIRED INT32 uint32 (INTEGER(32,false));
1738                REQUIRED GROUP   int32 (LIST) {
1739                    REPEATED GROUP list {
1740                        OPTIONAL INT32 element;
1741                    }
1742                }
1743            }
1744            REQUIRED BINARY  dictionary_strings (STRING);
1745            REQUIRED INT32 decimal_int32 (DECIMAL(8,2));
1746            REQUIRED INT64 decimal_int64 (DECIMAL(16,2));
1747            REQUIRED FIXED_LEN_BYTE_ARRAY (13) decimal_fix_length (DECIMAL(30,2));
1748            REQUIRED FIXED_LEN_BYTE_ARRAY (16) decimal128 (DECIMAL(38,2));
1749            REQUIRED FIXED_LEN_BYTE_ARRAY (17) decimal256 (DECIMAL(39,2));
1750        }
1751        ";
1752        let parquet_group_type = parse_message_type(message_type).unwrap();
1753
1754        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1755
1756        let arrow_fields = vec![
1757            Field::new("boolean", DataType::Boolean, false),
1758            Field::new("int8", DataType::Int8, false),
1759            Field::new("int16", DataType::Int16, false),
1760            Field::new("int32", DataType::Int32, false),
1761            Field::new("int64", DataType::Int64, false),
1762            Field::new("double", DataType::Float64, true),
1763            Field::new("float", DataType::Float32, true),
1764            Field::new("float16", DataType::Float16, true),
1765            Field::new("string", DataType::Utf8, true),
1766            Field::new_list(
1767                "bools",
1768                Field::new("element", DataType::Boolean, true),
1769                true,
1770            ),
1771            Field::new_list(
1772                "bools_non_null",
1773                Field::new("element", DataType::Boolean, false),
1774                false,
1775            ),
1776            Field::new("date", DataType::Date32, true),
1777            Field::new("time_milli", DataType::Time32(TimeUnit::Millisecond), true),
1778            Field::new(
1779                "time_milli_utc",
1780                DataType::Time32(TimeUnit::Millisecond),
1781                true,
1782            )
1783            .with_metadata(HashMap::from_iter(vec![(
1784                "adjusted_to_utc".to_string(),
1785                "".to_string(),
1786            )])),
1787            Field::new("time_micro", DataType::Time64(TimeUnit::Microsecond), true),
1788            Field::new(
1789                "time_micro_utc",
1790                DataType::Time64(TimeUnit::Microsecond),
1791                true,
1792            )
1793            .with_metadata(HashMap::from_iter(vec![(
1794                "adjusted_to_utc".to_string(),
1795                "".to_string(),
1796            )])),
1797            Field::new(
1798                "ts_milli",
1799                DataType::Timestamp(TimeUnit::Millisecond, None),
1800                true,
1801            ),
1802            Field::new(
1803                "ts_micro",
1804                DataType::Timestamp(TimeUnit::Microsecond, None),
1805                false,
1806            ),
1807            Field::new(
1808                "ts_seconds",
1809                DataType::Timestamp(TimeUnit::Second, Some("UTC".into())),
1810                false,
1811            ),
1812            Field::new(
1813                "ts_micro_utc",
1814                DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
1815                false,
1816            ),
1817            Field::new(
1818                "ts_millis_zero_offset",
1819                DataType::Timestamp(TimeUnit::Millisecond, Some("+00:00".into())),
1820                false,
1821            ),
1822            Field::new(
1823                "ts_millis_zero_negative_offset",
1824                DataType::Timestamp(TimeUnit::Millisecond, Some("-00:00".into())),
1825                false,
1826            ),
1827            Field::new(
1828                "ts_micro_non_utc",
1829                DataType::Timestamp(TimeUnit::Microsecond, Some("+01:00".into())),
1830                false,
1831            ),
1832            Field::new_struct(
1833                "struct",
1834                vec![
1835                    Field::new("bools", DataType::Boolean, false),
1836                    Field::new("uint32", DataType::UInt32, false),
1837                    Field::new_list("int32", Field::new("element", DataType::Int32, true), false),
1838                ],
1839                false,
1840            ),
1841            Field::new_dictionary("dictionary_strings", DataType::Int32, DataType::Utf8, false),
1842            Field::new("decimal_int32", DataType::Decimal128(8, 2), false),
1843            Field::new("decimal_int64", DataType::Decimal128(16, 2), false),
1844            Field::new("decimal_fix_length", DataType::Decimal128(30, 2), false),
1845            Field::new("decimal128", DataType::Decimal128(38, 2), false),
1846            Field::new("decimal256", DataType::Decimal256(39, 2), false),
1847        ];
1848        let arrow_schema = Schema::new(arrow_fields);
1849        let converted_arrow_schema = ArrowSchemaConverter::new().convert(&arrow_schema).unwrap();
1850
1851        assert_eq!(
1852            parquet_schema.columns().len(),
1853            converted_arrow_schema.columns().len()
1854        );
1855        parquet_schema
1856            .columns()
1857            .iter()
1858            .zip(converted_arrow_schema.columns())
1859            .for_each(|(a, b)| {
1860                // Only check the logical type if it is set on the Parquet side:
1861                // the Arrow conversion always sets a logical type, even when the
1862                // original Parquet schema had none, so full equality would be too
1863                // strict for this test.
1864                match a.logical_type() {
1865                    Some(_) => {
1866                        assert_eq!(a, b)
1867                    }
1868                    None => {
1869                        assert_eq!(a.name(), b.name());
1870                        assert_eq!(a.physical_type(), b.physical_type());
1871                        assert_eq!(a.converted_type(), b.converted_type());
1872                    }
1873                };
1874            });
1875    }
1876
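    // A Parquet group must have at least one child, so converting an Arrow
    // struct with no fields should fail.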
1877    #[test]
1878    #[should_panic(expected = "Parquet does not support writing empty structs")]
1879    fn test_empty_struct_field() {
1880        let arrow_fields = vec![Field::new(
1881            "struct",
1882            DataType::Struct(Fields::empty()),
1883            false,
1884        )];
1885        let arrow_schema = Schema::new(arrow_fields);
1886        let converted_arrow_schema = ArrowSchemaConverter::new()
1887            .with_coerce_types(true)
1888            .convert(&arrow_schema);
1889
1890        converted_arrow_schema.unwrap();
1891    }
1892
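    // Key-value metadata entries with no value ("baz" below) should be
    // dropped from the converted Arrow schema metadata.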
1893    #[test]
1894    fn test_metadata() {
1895        let message_type = "
1896        message test_schema {
1897            OPTIONAL BINARY  string (STRING);
1898        }
1899        ";
1900        let parquet_group_type = parse_message_type(message_type).unwrap();
1901
1902        let key_value_metadata = vec![
1903            KeyValue::new("foo".to_owned(), Some("bar".to_owned())),
1904            KeyValue::new("baz".to_owned(), None),
1905        ];
1906
1907        let mut expected_metadata: HashMap<String, String> = HashMap::new();
1908        expected_metadata.insert("foo".to_owned(), "bar".to_owned());
1909
1910        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1911        let converted_arrow_schema =
1912            parquet_to_arrow_schema(&parquet_schema, Some(&key_value_metadata)).unwrap();
1913
1914        assert_eq!(converted_arrow_schema.metadata(), &expected_metadata);
1915    }
1916
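    // Writing a file embeds the serialized Arrow schema in the Parquet
    // key-value metadata; reading the file back should reproduce the schema
    // exactly, including field metadata and PARQUET:field_id values.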
1917    #[test]
1918    fn test_arrow_schema_roundtrip() -> Result<()> {
1919        let meta = |a: &[(&str, &str)]| -> HashMap<String, String> {
1920            a.iter()
1921                .map(|(a, b)| (a.to_string(), b.to_string()))
1922                .collect()
1923        };
1924
1925        let schema = Schema::new_with_metadata(
1926            vec![
1927                Field::new("c1", DataType::Utf8, false)
1928                    .with_metadata(meta(&[("Key", "Foo"), (PARQUET_FIELD_ID_META_KEY, "2")])),
1929                Field::new("c2", DataType::Binary, false),
1930                Field::new("c3", DataType::FixedSizeBinary(3), false),
1931                Field::new("c4", DataType::Boolean, false),
1932                Field::new("c5", DataType::Date32, false),
1933                Field::new("c6", DataType::Date64, false),
1934                Field::new("c7", DataType::Time32(TimeUnit::Second), false),
1935                Field::new("c8", DataType::Time32(TimeUnit::Millisecond), false),
1936                Field::new("c13", DataType::Time64(TimeUnit::Microsecond), false),
1937                Field::new("c14", DataType::Time64(TimeUnit::Nanosecond), false),
1938                Field::new("c15", DataType::Timestamp(TimeUnit::Second, None), false),
1939                Field::new(
1940                    "c16",
1941                    DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
1942                    false,
1943                ),
1944                Field::new(
1945                    "c17",
1946                    DataType::Timestamp(TimeUnit::Microsecond, Some("Africa/Johannesburg".into())),
1947                    false,
1948                ),
1949                Field::new(
1950                    "c18",
1951                    DataType::Timestamp(TimeUnit::Nanosecond, None),
1952                    false,
1953                ),
1954                Field::new("c19", DataType::Interval(IntervalUnit::DayTime), false),
1955                Field::new("c20", DataType::Interval(IntervalUnit::YearMonth), false),
1956                Field::new_list(
1957                    "c21",
1958                    Field::new_list_field(DataType::Boolean, true)
1959                        .with_metadata(meta(&[("Key", "Bar"), (PARQUET_FIELD_ID_META_KEY, "5")])),
1960                    false,
1961                )
1962                .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "4")])),
1963                Field::new(
1964                    "c22",
1965                    DataType::FixedSizeList(
1966                        Arc::new(Field::new_list_field(DataType::Boolean, true)),
1967                        5,
1968                    ),
1969                    false,
1970                ),
1971                Field::new_list(
1972                    "c23",
1973                    Field::new_large_list(
1974                        "inner",
1975                        Field::new_list_field(
1976                            DataType::Struct(
1977                                vec![
1978                                    Field::new("a", DataType::Int16, true),
1979                                    Field::new("b", DataType::Float64, false),
1980                                    Field::new("c", DataType::Float32, false),
1981                                    Field::new("d", DataType::Float16, false),
1982                                ]
1983                                .into(),
1984                            ),
1985                            false,
1986                        ),
1987                        true,
1988                    ),
1989                    false,
1990                ),
1991                Field::new(
1992                    "c24",
1993                    DataType::Struct(Fields::from(vec![
1994                        Field::new("a", DataType::Utf8, false),
1995                        Field::new("b", DataType::UInt16, false),
1996                    ])),
1997                    false,
1998                ),
1999                Field::new("c25", DataType::Interval(IntervalUnit::YearMonth), true),
2000                Field::new("c26", DataType::Interval(IntervalUnit::DayTime), true),
2001                // Duration types not supported
2002                // Field::new("c27", DataType::Duration(TimeUnit::Second), false),
2003                // Field::new("c28", DataType::Duration(TimeUnit::Millisecond), false),
2004                // Field::new("c29", DataType::Duration(TimeUnit::Microsecond), false),
2005                // Field::new("c30", DataType::Duration(TimeUnit::Nanosecond), false),
2006                #[allow(deprecated)]
2007                Field::new_dict(
2008                    "c31",
2009                    DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
2010                    true,
2011                    123,
2012                    true,
2013                )
2014                .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "6")])),
2015                Field::new("c32", DataType::LargeBinary, true),
2016                Field::new("c33", DataType::LargeUtf8, true),
2017                Field::new_large_list(
2018                    "c34",
2019                    Field::new_list(
2020                        "inner",
2021                        Field::new_list_field(
2022                            DataType::Struct(
2023                                vec![
2024                                    Field::new("a", DataType::Int16, true),
2025                                    Field::new("b", DataType::Float64, true),
2026                                ]
2027                                .into(),
2028                            ),
2029                            true,
2030                        ),
2031                        true,
2032                    ),
2033                    true,
2034                ),
2035                Field::new("c35", DataType::Null, true),
2036                Field::new("c36", DataType::Decimal128(2, 1), false),
2037                Field::new("c37", DataType::Decimal256(50, 20), false),
2038                Field::new("c38", DataType::Decimal128(18, 12), true),
2039                Field::new_map(
2040                    "c39",
2041                    "key_value",
2042                    Field::new("key", DataType::Utf8, false),
2043                    Field::new_list("value", Field::new("element", DataType::Utf8, true), true),
2044                    false, // keys_sorted is not preserved on roundtrip, so use false
2045                    true,
2046                ),
2047                Field::new_map(
2048                    "c40",
2049                    "my_entries",
2050                    Field::new("my_key", DataType::Utf8, false)
2051                        .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "8")])),
2052                    Field::new_list(
2053                        "my_value",
2054                        Field::new_list_field(DataType::Utf8, true)
2055                            .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "10")])),
2056                        true,
2057                    )
2058                    .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "9")])),
2059                    false, // keys_sorted is not preserved on roundtrip, so use false
2060                    true,
2061                )
2062                .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "7")])),
2063                Field::new_map(
2064                    "c41",
2065                    "my_entries",
2066                    Field::new("my_key", DataType::Utf8, false),
2067                    Field::new_list(
2068                        "my_value",
2069                        Field::new_list_field(DataType::Utf8, true)
2070                            .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "11")])),
2071                        true,
2072                    ),
2073                    false, // keys_sorted is not preserved on roundtrip, so use false
2074                    false,
2075                ),
2076            ],
2077            meta(&[("Key", "Value")]),
2078        );
2079
2080        // Write an empty parquet file (no batches) so the schema is serialized into the file metadata
2081        let file = tempfile::tempfile().unwrap();
2082        let writer =
2083            ArrowWriter::try_new(file.try_clone().unwrap(), Arc::new(schema.clone()), None)?;
2084        writer.close()?;
2085
2086        // read file back
2087        let arrow_reader = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
2088
2089        // Check arrow schema
2090        let read_schema = arrow_reader.schema();
2091        assert_eq!(&schema, read_schema.as_ref());
2092
2093        // Walk the Parquet schema depth-first, recording "path -> id" for every node with a field ID
2094        let mut stack = Vec::with_capacity(10);
2095        let mut out = Vec::with_capacity(10);
2096
2097        let root = arrow_reader.parquet_schema().root_schema_ptr();
2098        stack.push((root.name().to_string(), root));
2099
2100        while let Some((p, t)) = stack.pop() {
2101            if t.is_group() {
2102                for f in t.get_fields() {
2103                    stack.push((format!("{p}.{}", f.name()), f.clone()))
2104                }
2105            }
2106
2107            let info = t.get_basic_info();
2108            if info.has_id() {
2109                out.push(format!("{p} -> {}", info.id()))
2110            }
2111        }
2112        out.sort_unstable();
2113        let out: Vec<_> = out.iter().map(|x| x.as_str()).collect();
2114
2115        assert_eq!(
2116            &out,
2117            &[
2118                "arrow_schema.c1 -> 2",
2119                "arrow_schema.c21 -> 4",
2120                "arrow_schema.c21.list.item -> 5",
2121                "arrow_schema.c31 -> 6",
2122                "arrow_schema.c40 -> 7",
2123                "arrow_schema.c40.my_entries.my_key -> 8",
2124                "arrow_schema.c40.my_entries.my_value -> 9",
2125                "arrow_schema.c40.my_entries.my_value.list.item -> 10",
2126                "arrow_schema.c41.my_entries.my_value.list.item -> 11",
2127            ]
2128        );
2129
2130        Ok(())
2131    }
2132
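    // Field IDs are also stored natively on Parquet's SchemaElements. The
    // Arrow metadata is deliberately not passed to parquet_to_arrow_schema,
    // so the IDs must come from Parquet itself, and must then survive a
    // further Arrow -> Parquet conversion.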
2133    #[test]
2134    fn test_read_parquet_field_ids_raw() -> Result<()> {
2135        let meta = |a: &[(&str, &str)]| -> HashMap<String, String> {
2136            a.iter()
2137                .map(|(a, b)| (a.to_string(), b.to_string()))
2138                .collect()
2139        };
2140        let schema = Schema::new_with_metadata(
2141            vec![
2142                Field::new("c1", DataType::Utf8, true)
2143                    .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "1")])),
2144                Field::new("c2", DataType::Utf8, true)
2145                    .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "2")])),
2146            ],
2147            HashMap::new(),
2148        );
2149
2150        let writer = ArrowWriter::try_new(vec![], Arc::new(schema.clone()), None)?;
2151        let parquet_bytes = writer.into_inner()?;
2152
2153        let reader =
2154            crate::file::reader::SerializedFileReader::new(bytes::Bytes::from(parquet_bytes))?;
2155        let schema_descriptor = reader.metadata().file_metadata().schema_descr_ptr();
2156
2157        // don't pass metadata so field ids are read from Parquet and not from serialized Arrow schema
2158        let arrow_schema = crate::arrow::parquet_to_arrow_schema(&schema_descriptor, None)?;
2159
2160        let parq_schema_descr = ArrowSchemaConverter::new()
2161            .with_coerce_types(true)
2162            .convert(&arrow_schema)?;
2163        let parq_fields = parq_schema_descr.root_schema().get_fields();
2164        assert_eq!(parq_fields.len(), 2);
2165        assert_eq!(parq_fields[0].get_basic_info().id(), 1);
2166        assert_eq!(parq_fields[1].get_basic_info().id(), 2);
2167
2168        Ok(())
2169    }
2170
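    // List fields with non-standard child names ("array", "items") should
    // still roundtrip, since the original Arrow schema is embedded in the
    // file metadata.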
2171    #[test]
2172    fn test_arrow_schema_roundtrip_lists() -> Result<()> {
2173        let metadata: HashMap<String, String> = [("Key".to_string(), "Value".to_string())]
2174            .iter()
2175            .cloned()
2176            .collect();
2177
2178        let schema = Schema::new_with_metadata(
2179            vec![
2180                Field::new_list("c21", Field::new("array", DataType::Boolean, true), false),
2181                Field::new(
2182                    "c22",
2183                    DataType::FixedSizeList(
2184                        Arc::new(Field::new("items", DataType::Boolean, false)),
2185                        5,
2186                    ),
2187                    false,
2188                ),
2189                Field::new_list(
2190                    "c23",
2191                    Field::new_large_list(
2192                        "items",
2193                        Field::new_struct(
2194                            "items",
2195                            vec![
2196                                Field::new("a", DataType::Int16, true),
2197                                Field::new("b", DataType::Float64, false),
2198                            ],
2199                            true,
2200                        ),
2201                        true,
2202                    ),
2203                    true,
2204                ),
2205            ],
2206            metadata,
2207        );
2208
2209        // Write an empty parquet file (no batches) so the schema is serialized into the file metadata
2210        let file = tempfile::tempfile().unwrap();
2211        let writer =
2212            ArrowWriter::try_new(file.try_clone().unwrap(), Arc::new(schema.clone()), None)?;
2213        writer.close()?;
2214
2215        // read file back
2216        let arrow_reader = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
2217        let read_schema = arrow_reader.schema();
2218        assert_eq!(&schema, read_schema.as_ref());
2219        Ok(())
2220    }
2221
2222    #[test]
2223    fn test_get_arrow_schema_from_metadata() {
2224        assert!(get_arrow_schema_from_metadata("").is_err());
2225    }
2226
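    // With the arrow_canonical_extension_types feature, the canonical Uuid
    // extension type on FixedSizeBinary(16) should map to the Parquet Uuid
    // logical type.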
2227    #[test]
2228    #[cfg(feature = "arrow_canonical_extension_types")]
2229    fn arrow_uuid_to_parquet_uuid() -> Result<()> {
2230        let arrow_schema = Schema::new(vec![Field::new(
2231            "uuid",
2232            DataType::FixedSizeBinary(16),
2233            false,
2234        )
2235        .with_extension_type(Uuid)]);
2236
2237        let parquet_schema = ArrowSchemaConverter::new().convert(&arrow_schema)?;
2238
2239        assert_eq!(
2240            parquet_schema.column(0).logical_type(),
2241            Some(LogicalType::Uuid)
2242        );
2243
2244        // TODO: roundtrip
2245        // let arrow_schema = parquet_to_arrow_schema(&parquet_schema, None)?;
2246        // assert_eq!(arrow_schema.field(0).try_extension_type::<Uuid>()?, Uuid);
2247
2248        Ok(())
2249    }
2250
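    // Likewise, the canonical Json extension type on a Utf8 field should map
    // to the Parquet Json logical type.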
2251    #[test]
2252    #[cfg(feature = "arrow_canonical_extension_types")]
2253    fn arrow_json_to_parquet_json() -> Result<()> {
2254        let arrow_schema = Schema::new(vec![
2255            Field::new("json", DataType::Utf8, false).with_extension_type(Json::default())
2256        ]);
2257
2258        let parquet_schema = ArrowSchemaConverter::new().convert(&arrow_schema)?;
2259
2260        assert_eq!(
2261            parquet_schema.column(0).logical_type(),
2262            Some(LogicalType::Json)
2263        );
2264
2265        // TODO: roundtrip
2266        // https://github.com/apache/arrow-rs/issues/7063
2267        // let arrow_schema = parquet_to_arrow_schema(&parquet_schema, None)?;
2268        // assert_eq!(
2269        //     arrow_schema.field(0).try_extension_type::<Json>()?,
2270        //     Json::default()
2271        // );
2272
2273        Ok(())
2274    }
2275}