parquet/arrow/schema/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Converting Parquet schema <--> Arrow schema: [`ArrowSchemaConverter`] and [parquet_to_arrow_schema]
19
20use base64::prelude::BASE64_STANDARD;
21use base64::Engine;
22use std::collections::HashMap;
23use std::sync::Arc;
24
25use arrow_ipc::writer;
26use arrow_schema::{DataType, Field, Fields, Schema, TimeUnit};
27
28use crate::basic::{
29    ConvertedType, LogicalType, Repetition, TimeUnit as ParquetTimeUnit, Type as PhysicalType,
30};
31use crate::errors::{ParquetError, Result};
32use crate::file::{metadata::KeyValue, properties::WriterProperties};
33use crate::schema::types::{ColumnDescriptor, SchemaDescriptor, Type};
34
35mod complex;
36mod extension;
37mod primitive;
38
39use super::PARQUET_FIELD_ID_META_KEY;
40use crate::arrow::schema::extension::{
41    has_extension_type, logical_type_for_fixed_size_binary, logical_type_for_string,
42    logical_type_for_struct, try_add_extension_type,
43};
44use crate::arrow::ProjectionMask;
45pub(crate) use complex::{ParquetField, ParquetFieldType};
46
47/// Convert Parquet schema to Arrow schema including optional metadata
48///
49/// Attempts to decode any existing Arrow schema metadata, falling back
50/// to converting the Parquet schema column-wise
51pub fn parquet_to_arrow_schema(
52    parquet_schema: &SchemaDescriptor,
53    key_value_metadata: Option<&Vec<KeyValue>>,
54) -> Result<Schema> {
55    parquet_to_arrow_schema_by_columns(parquet_schema, ProjectionMask::all(), key_value_metadata)
56}
57
58/// Convert parquet schema to arrow schema including optional metadata,
59/// only preserving some leaf columns.
60pub fn parquet_to_arrow_schema_by_columns(
61    parquet_schema: &SchemaDescriptor,
62    mask: ProjectionMask,
63    key_value_metadata: Option<&Vec<KeyValue>>,
64) -> Result<Schema> {
65    Ok(parquet_to_arrow_schema_and_fields(parquet_schema, mask, key_value_metadata)?.0)
66}
67
68/// Determines the Arrow Schema from a Parquet schema
69///
70/// Looks for an Arrow schema metadata "hint" (see
71/// [`parquet_to_arrow_field_levels`]), and uses it if present to ensure
72/// lossless round trips.
73pub(crate) fn parquet_to_arrow_schema_and_fields(
74    parquet_schema: &SchemaDescriptor,
75    mask: ProjectionMask,
76    key_value_metadata: Option<&Vec<KeyValue>>,
77) -> Result<(Schema, Option<ParquetField>)> {
78    let mut metadata = parse_key_value_metadata(key_value_metadata).unwrap_or_default();
79    let maybe_schema = metadata
80        .remove(super::ARROW_SCHEMA_META_KEY)
81        .map(|value| get_arrow_schema_from_metadata(&value))
82        .transpose()?;
83
84    // Add the Arrow metadata to the Parquet metadata skipping keys that collide
85    if let Some(arrow_schema) = &maybe_schema {
86        arrow_schema.metadata().iter().for_each(|(k, v)| {
87            metadata.entry(k.clone()).or_insert_with(|| v.clone());
88        });
89    }
90
91    let hint = maybe_schema.as_ref().map(|s| s.fields());
92    let field_levels = parquet_to_arrow_field_levels(parquet_schema, mask, hint)?;
93    let schema = Schema::new_with_metadata(field_levels.fields, metadata);
94    Ok((schema, field_levels.levels))
95}
96
97/// Schema information necessary to decode a parquet file as arrow [`Fields`]
98///
99/// In particular this stores the dremel-level information necessary to correctly
100/// interpret the encoded definition and repetition levels
101///
102/// Note: this is an opaque container intended to be used with lower-level APIs
103/// within this crate
104#[derive(Debug, Clone)]
105pub struct FieldLevels {
106    pub(crate) fields: Fields,
107    pub(crate) levels: Option<ParquetField>,
108}
109
110/// Convert a parquet [`SchemaDescriptor`] to [`FieldLevels`]
111///
112/// Columns not included within [`ProjectionMask`] will be ignored.
113///
114/// The optional `hint` parameter is the desired Arrow schema. See the
115/// [`arrow`] module documentation for more information.
116///
117/// [`arrow`]: crate::arrow
118///
119/// # Notes:
120/// Where a field type in `hint` is compatible with the corresponding parquet type in `schema`, it
121/// will be used, otherwise the default arrow type for the given parquet column type will be used.
122///
123/// This is to accommodate arrow types that cannot be round-tripped through parquet natively.
124/// Depending on the parquet writer, this can lead to a mismatch between a file's parquet schema
125/// and its embedded arrow schema. The parquet `schema` must be treated as authoritative in such
126/// an event. See [#1663](https://github.com/apache/arrow-rs/issues/1663) for more information
127///
128/// Note: this is a low-level API, most users will want to make use of the higher-level
129/// [`parquet_to_arrow_schema`] for decoding metadata from a parquet file.
130pub fn parquet_to_arrow_field_levels(
131    schema: &SchemaDescriptor,
132    mask: ProjectionMask,
133    hint: Option<&Fields>,
134) -> Result<FieldLevels> {
135    match complex::convert_schema(schema, mask, hint)? {
136        Some(field) => match &field.arrow_type {
137            DataType::Struct(fields) => Ok(FieldLevels {
138                fields: fields.clone(),
139                levels: Some(field),
140            }),
141            _ => unreachable!(),
142        },
143        None => Ok(FieldLevels {
144            fields: Fields::empty(),
145            levels: None,
146        }),
147    }
148}
149
150/// Try to convert Arrow schema metadata into a schema
151fn get_arrow_schema_from_metadata(encoded_meta: &str) -> Result<Schema> {
152    let decoded = BASE64_STANDARD.decode(encoded_meta);
153    match decoded {
154        Ok(bytes) => {
155            let slice = if bytes.len() > 8 && bytes[0..4] == [255u8; 4] {
156                &bytes[8..]
157            } else {
158                bytes.as_slice()
159            };
160            match arrow_ipc::root_as_message(slice) {
161                Ok(message) => message
162                    .header_as_schema()
163                    .map(arrow_ipc::convert::fb_to_schema)
164                    .ok_or_else(|| arrow_err!("the message is not Arrow Schema")),
165                Err(err) => {
166                    // The flatbuffers implementation returns an error on verification error.
167                    Err(arrow_err!(
168                        "Unable to get root as message stored in {}: {:?}",
169                        super::ARROW_SCHEMA_META_KEY,
170                        err
171                    ))
172                }
173            }
174        }
175        Err(err) => {
176            // The C++ implementation returns an error if the schema can't be parsed.
177            Err(arrow_err!(
178                "Unable to decode the encoded schema stored in {}, {:?}",
179                super::ARROW_SCHEMA_META_KEY,
180                err
181            ))
182        }
183    }
184}
185
186/// Encodes the Arrow schema into the IPC format, and base64 encodes it
187pub fn encode_arrow_schema(schema: &Schema) -> String {
188    let options = writer::IpcWriteOptions::default();
189    let mut dictionary_tracker = writer::DictionaryTracker::new(true);
190    let data_gen = writer::IpcDataGenerator::default();
191    let mut serialized_schema =
192        data_gen.schema_to_bytes_with_dictionary_tracker(schema, &mut dictionary_tracker, &options);
193
194    // manually prepending the length to the schema as arrow uses the legacy IPC format
195    // TODO: change after addressing ARROW-9777
196    let schema_len = serialized_schema.ipc_message.len();
197    let mut len_prefix_schema = Vec::with_capacity(schema_len + 8);
198    len_prefix_schema.append(&mut vec![255u8, 255, 255, 255]);
199    len_prefix_schema.append((schema_len as u32).to_le_bytes().to_vec().as_mut());
200    len_prefix_schema.append(&mut serialized_schema.ipc_message);
201
202    BASE64_STANDARD.encode(&len_prefix_schema)
203}
204
205/// Mutates writer metadata by storing the encoded Arrow schema hint in
206/// [`ARROW_SCHEMA_META_KEY`].
207///
208/// If there is an existing Arrow schema metadata, it is replaced.
209///
210/// [`ARROW_SCHEMA_META_KEY`]: crate::arrow::ARROW_SCHEMA_META_KEY
211pub fn add_encoded_arrow_schema_to_metadata(schema: &Schema, props: &mut WriterProperties) {
212    let encoded = encode_arrow_schema(schema);
213
214    let schema_kv = KeyValue {
215        key: super::ARROW_SCHEMA_META_KEY.to_string(),
216        value: Some(encoded),
217    };
218
219    let meta = props
220        .key_value_metadata
221        .get_or_insert_with(Default::default);
222
223    // check if ARROW:schema exists, and overwrite it
224    let schema_meta = meta
225        .iter()
226        .enumerate()
227        .find(|(_, kv)| kv.key.as_str() == super::ARROW_SCHEMA_META_KEY);
228    match schema_meta {
229        Some((i, _)) => {
230            meta.remove(i);
231            meta.push(schema_kv);
232        }
233        None => {
234            meta.push(schema_kv);
235        }
236    }
237}
238
239/// Converter for Arrow schema to Parquet schema
240///
241/// See the documentation on the [`arrow`] module for background
242/// information on how Arrow schema is represented in Parquet.
243///
244/// [`arrow`]: crate::arrow
245///
246/// # Example:
247/// ```
248/// # use std::sync::Arc;
249/// # use arrow_schema::{Field, Schema, DataType};
250/// # use parquet::arrow::ArrowSchemaConverter;
251/// use parquet::schema::types::{SchemaDescriptor, Type};
252/// use parquet::basic; // note there are two `Type`s in the following example
253/// // create an Arrow Schema
254/// let arrow_schema = Schema::new(vec![
255///   Field::new("a", DataType::Int64, true),
256///   Field::new("b", DataType::Date32, true),
257/// ]);
258/// // convert the Arrow schema to a Parquet schema
259/// let parquet_schema = ArrowSchemaConverter::new()
260///   .convert(&arrow_schema)
261///   .unwrap();
262///
263/// let expected_parquet_schema = SchemaDescriptor::new(
264///   Arc::new(
265///     Type::group_type_builder("arrow_schema")
266///       .with_fields(vec![
267///         Arc::new(
268///          Type::primitive_type_builder("a", basic::Type::INT64)
269///           .build().unwrap()
270///         ),
271///         Arc::new(
272///          Type::primitive_type_builder("b", basic::Type::INT32)
273///           .with_converted_type(basic::ConvertedType::DATE)
274///           .with_logical_type(Some(basic::LogicalType::Date))
275///           .build().unwrap()
276///         ),
277///      ])
278///      .build().unwrap()
279///   )
280/// );
281/// assert_eq!(parquet_schema, expected_parquet_schema);
282/// ```
283#[derive(Debug)]
284pub struct ArrowSchemaConverter<'a> {
285    /// Name of the root schema in Parquet
286    schema_root: &'a str,
287    /// Should we coerce Arrow types to compatible Parquet types?
288    ///
289    /// See docs on [Self::with_coerce_types]`
290    coerce_types: bool,
291}
292
293impl Default for ArrowSchemaConverter<'_> {
294    fn default() -> Self {
295        Self::new()
296    }
297}
298
299impl<'a> ArrowSchemaConverter<'a> {
300    /// Create a new converter
301    pub fn new() -> Self {
302        Self {
303            schema_root: "arrow_schema",
304            coerce_types: false,
305        }
306    }
307
308    /// Should Arrow types be coerced into Parquet native types (default `false`).
309    ///
310    /// Setting this option to `true` will result in Parquet files that can be
311    /// read by more readers, but may lose precision for Arrow types such as
312    /// [`DataType::Date64`] which have no direct [corresponding Parquet type].
313    ///
314    /// By default, this converter does not coerce to native Parquet types. Enabling type
315    /// coercion allows for meaningful representations that do not require
316    /// downstream readers to consider the embedded Arrow schema, and can allow
317    /// for greater compatibility with other Parquet implementations. However,
318    /// type coercion also prevents data from being losslessly round-tripped.
319    ///
320    /// # Discussion
321    ///
322    /// Some Arrow types such as `Date64`, `Timestamp` and `Interval` have no
323    /// corresponding Parquet logical type. Thus, they can not be losslessly
324    /// round-tripped when stored using the appropriate Parquet logical type.
325    /// For example, some Date64 values may be truncated when stored with
326    /// parquet's native 32 bit date type.
327    ///
328    /// For [`List`] and [`Map`] types, some Parquet readers expect certain
329    /// schema elements to have specific names (earlier versions of the spec
330    /// were somewhat ambiguous on this point). Type coercion will use the names
331    /// prescribed by the Parquet specification, potentially losing naming
332    /// metadata from the Arrow schema.
333    ///
334    /// [`List`]: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists
335    /// [`Map`]: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#maps
336    /// [corresponding Parquet type]: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#date
337    ///
338    pub fn with_coerce_types(mut self, coerce_types: bool) -> Self {
339        self.coerce_types = coerce_types;
340        self
341    }
342
343    /// Set the root schema element name (defaults to `"arrow_schema"`).
344    pub fn schema_root(mut self, schema_root: &'a str) -> Self {
345        self.schema_root = schema_root;
346        self
347    }
348
349    /// Convert the specified Arrow [`Schema`] to the desired Parquet [`SchemaDescriptor`]
350    ///
351    /// See example in [`ArrowSchemaConverter`]
352    pub fn convert(&self, schema: &Schema) -> Result<SchemaDescriptor> {
353        let fields = schema
354            .fields()
355            .iter()
356            .map(|field| arrow_to_parquet_type(field, self.coerce_types).map(Arc::new))
357            .collect::<Result<_>>()?;
358        let group = Type::group_type_builder(self.schema_root)
359            .with_fields(fields)
360            .build()?;
361        Ok(SchemaDescriptor::new(Arc::new(group)))
362    }
363}
364
365fn parse_key_value_metadata(
366    key_value_metadata: Option<&Vec<KeyValue>>,
367) -> Option<HashMap<String, String>> {
368    match key_value_metadata {
369        Some(key_values) => {
370            let map: HashMap<String, String> = key_values
371                .iter()
372                .filter_map(|kv| {
373                    kv.value
374                        .as_ref()
375                        .map(|value| (kv.key.clone(), value.clone()))
376                })
377                .collect();
378
379            if map.is_empty() {
380                None
381            } else {
382                Some(map)
383            }
384        }
385        None => None,
386    }
387}
388
389/// Convert parquet column schema to arrow field.
390pub fn parquet_to_arrow_field(parquet_column: &ColumnDescriptor) -> Result<Field> {
391    let field = complex::convert_type(&parquet_column.self_type_ptr())?;
392    let mut ret = Field::new(parquet_column.name(), field.arrow_type, field.nullable);
393
394    let parquet_type = parquet_column.self_type();
395    let basic_info = parquet_type.get_basic_info();
396
397    let mut hash_map_size = 0;
398    if basic_info.has_id() {
399        hash_map_size += 1;
400    }
401    if has_extension_type(parquet_type) {
402        hash_map_size += 1;
403    }
404    if hash_map_size == 0 {
405        return Ok(ret);
406    }
407    ret.set_metadata(HashMap::with_capacity(hash_map_size));
408    if basic_info.has_id() {
409        ret.metadata_mut().insert(
410            PARQUET_FIELD_ID_META_KEY.to_string(),
411            basic_info.id().to_string(),
412        );
413    }
414    try_add_extension_type(ret, parquet_column.self_type())
415}
416
417pub fn decimal_length_from_precision(precision: u8) -> usize {
418    // digits = floor(log_10(2^(8*n - 1) - 1))  // definition in parquet's logical types
419    // ceil(digits) = log10(2^(8*n - 1) - 1)
420    // 10^ceil(digits) = 2^(8*n - 1) - 1
421    // 10^ceil(digits) + 1 = 2^(8*n - 1)
422    // log2(10^ceil(digits) + 1) = (8*n - 1)
423    // log2(10^ceil(digits) + 1) + 1 = 8*n
424    // (log2(10^ceil(a) + 1) + 1) / 8 = n
425    (((10.0_f64.powi(precision as i32) + 1.0).log2() + 1.0) / 8.0).ceil() as usize
426}
427
428/// Convert an arrow field to a parquet `Type`
429fn arrow_to_parquet_type(field: &Field, coerce_types: bool) -> Result<Type> {
430    const PARQUET_LIST_ELEMENT_NAME: &str = "element";
431    const PARQUET_MAP_STRUCT_NAME: &str = "key_value";
432    const PARQUET_KEY_FIELD_NAME: &str = "key";
433    const PARQUET_VALUE_FIELD_NAME: &str = "value";
434
435    let name = field.name().as_str();
436    let repetition = if field.is_nullable() {
437        Repetition::OPTIONAL
438    } else {
439        Repetition::REQUIRED
440    };
441    let id = field_id(field);
442    // create type from field
443    match field.data_type() {
444        DataType::Null => Type::primitive_type_builder(name, PhysicalType::INT32)
445            .with_logical_type(Some(LogicalType::Unknown))
446            .with_repetition(repetition)
447            .with_id(id)
448            .build(),
449        DataType::Boolean => Type::primitive_type_builder(name, PhysicalType::BOOLEAN)
450            .with_repetition(repetition)
451            .with_id(id)
452            .build(),
453        DataType::Int8 => Type::primitive_type_builder(name, PhysicalType::INT32)
454            .with_logical_type(Some(LogicalType::Integer {
455                bit_width: 8,
456                is_signed: true,
457            }))
458            .with_repetition(repetition)
459            .with_id(id)
460            .build(),
461        DataType::Int16 => Type::primitive_type_builder(name, PhysicalType::INT32)
462            .with_logical_type(Some(LogicalType::Integer {
463                bit_width: 16,
464                is_signed: true,
465            }))
466            .with_repetition(repetition)
467            .with_id(id)
468            .build(),
469        DataType::Int32 => Type::primitive_type_builder(name, PhysicalType::INT32)
470            .with_repetition(repetition)
471            .with_id(id)
472            .build(),
473        DataType::Int64 => Type::primitive_type_builder(name, PhysicalType::INT64)
474            .with_repetition(repetition)
475            .with_id(id)
476            .build(),
477        DataType::UInt8 => Type::primitive_type_builder(name, PhysicalType::INT32)
478            .with_logical_type(Some(LogicalType::Integer {
479                bit_width: 8,
480                is_signed: false,
481            }))
482            .with_repetition(repetition)
483            .with_id(id)
484            .build(),
485        DataType::UInt16 => Type::primitive_type_builder(name, PhysicalType::INT32)
486            .with_logical_type(Some(LogicalType::Integer {
487                bit_width: 16,
488                is_signed: false,
489            }))
490            .with_repetition(repetition)
491            .with_id(id)
492            .build(),
493        DataType::UInt32 => Type::primitive_type_builder(name, PhysicalType::INT32)
494            .with_logical_type(Some(LogicalType::Integer {
495                bit_width: 32,
496                is_signed: false,
497            }))
498            .with_repetition(repetition)
499            .with_id(id)
500            .build(),
501        DataType::UInt64 => Type::primitive_type_builder(name, PhysicalType::INT64)
502            .with_logical_type(Some(LogicalType::Integer {
503                bit_width: 64,
504                is_signed: false,
505            }))
506            .with_repetition(repetition)
507            .with_id(id)
508            .build(),
509        DataType::Float16 => Type::primitive_type_builder(name, PhysicalType::FIXED_LEN_BYTE_ARRAY)
510            .with_repetition(repetition)
511            .with_id(id)
512            .with_logical_type(Some(LogicalType::Float16))
513            .with_length(2)
514            .build(),
515        DataType::Float32 => Type::primitive_type_builder(name, PhysicalType::FLOAT)
516            .with_repetition(repetition)
517            .with_id(id)
518            .build(),
519        DataType::Float64 => Type::primitive_type_builder(name, PhysicalType::DOUBLE)
520            .with_repetition(repetition)
521            .with_id(id)
522            .build(),
523        DataType::Timestamp(TimeUnit::Second, _) => {
524            // Cannot represent seconds in LogicalType
525            Type::primitive_type_builder(name, PhysicalType::INT64)
526                .with_repetition(repetition)
527                .with_id(id)
528                .build()
529        }
530        DataType::Timestamp(time_unit, tz) => {
531            Type::primitive_type_builder(name, PhysicalType::INT64)
532                .with_logical_type(Some(LogicalType::Timestamp {
533                    // If timezone set, values are normalized to UTC timezone
534                    is_adjusted_to_u_t_c: matches!(tz, Some(z) if !z.as_ref().is_empty()),
535                    unit: match time_unit {
536                        TimeUnit::Second => unreachable!(),
537                        TimeUnit::Millisecond => ParquetTimeUnit::MILLIS,
538                        TimeUnit::Microsecond => ParquetTimeUnit::MICROS,
539                        TimeUnit::Nanosecond => ParquetTimeUnit::NANOS,
540                    },
541                }))
542                .with_repetition(repetition)
543                .with_id(id)
544                .build()
545        }
546        DataType::Date32 => Type::primitive_type_builder(name, PhysicalType::INT32)
547            .with_logical_type(Some(LogicalType::Date))
548            .with_repetition(repetition)
549            .with_id(id)
550            .build(),
551        DataType::Date64 => {
552            if coerce_types {
553                Type::primitive_type_builder(name, PhysicalType::INT32)
554                    .with_logical_type(Some(LogicalType::Date))
555                    .with_repetition(repetition)
556                    .with_id(id)
557                    .build()
558            } else {
559                Type::primitive_type_builder(name, PhysicalType::INT64)
560                    .with_repetition(repetition)
561                    .with_id(id)
562                    .build()
563            }
564        }
565        DataType::Time32(TimeUnit::Second) => {
566            // Cannot represent seconds in LogicalType
567            Type::primitive_type_builder(name, PhysicalType::INT32)
568                .with_repetition(repetition)
569                .with_id(id)
570                .build()
571        }
572        DataType::Time32(unit) => Type::primitive_type_builder(name, PhysicalType::INT32)
573            .with_logical_type(Some(LogicalType::Time {
574                is_adjusted_to_u_t_c: field.metadata().contains_key("adjusted_to_utc"),
575                unit: match unit {
576                    TimeUnit::Millisecond => ParquetTimeUnit::MILLIS,
577                    u => unreachable!("Invalid unit for Time32: {:?}", u),
578                },
579            }))
580            .with_repetition(repetition)
581            .with_id(id)
582            .build(),
583        DataType::Time64(unit) => Type::primitive_type_builder(name, PhysicalType::INT64)
584            .with_logical_type(Some(LogicalType::Time {
585                is_adjusted_to_u_t_c: field.metadata().contains_key("adjusted_to_utc"),
586                unit: match unit {
587                    TimeUnit::Microsecond => ParquetTimeUnit::MICROS,
588                    TimeUnit::Nanosecond => ParquetTimeUnit::NANOS,
589                    u => unreachable!("Invalid unit for Time64: {:?}", u),
590                },
591            }))
592            .with_repetition(repetition)
593            .with_id(id)
594            .build(),
595        DataType::Duration(_) => Type::primitive_type_builder(name, PhysicalType::INT64)
596            .with_repetition(repetition)
597            .with_id(id)
598            .build(),
599        DataType::Interval(_) => {
600            Type::primitive_type_builder(name, PhysicalType::FIXED_LEN_BYTE_ARRAY)
601                .with_converted_type(ConvertedType::INTERVAL)
602                .with_repetition(repetition)
603                .with_id(id)
604                .with_length(12)
605                .build()
606        }
607        DataType::Binary | DataType::LargeBinary => {
608            Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
609                .with_repetition(repetition)
610                .with_id(id)
611                .build()
612        }
613        DataType::FixedSizeBinary(length) => {
614            Type::primitive_type_builder(name, PhysicalType::FIXED_LEN_BYTE_ARRAY)
615                .with_repetition(repetition)
616                .with_id(id)
617                .with_length(*length)
618                .with_logical_type(logical_type_for_fixed_size_binary(field))
619                .build()
620        }
621        DataType::BinaryView => Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
622            .with_repetition(repetition)
623            .with_id(id)
624            .build(),
625        DataType::Decimal32(precision, scale)
626        | DataType::Decimal64(precision, scale)
627        | DataType::Decimal128(precision, scale)
628        | DataType::Decimal256(precision, scale) => {
629            // Decimal precision determines the Parquet physical type to use.
630            // Following the: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#decimal
631            let (physical_type, length) = if *precision > 1 && *precision <= 9 {
632                (PhysicalType::INT32, -1)
633            } else if *precision <= 18 {
634                (PhysicalType::INT64, -1)
635            } else {
636                (
637                    PhysicalType::FIXED_LEN_BYTE_ARRAY,
638                    decimal_length_from_precision(*precision) as i32,
639                )
640            };
641            Type::primitive_type_builder(name, physical_type)
642                .with_repetition(repetition)
643                .with_id(id)
644                .with_length(length)
645                .with_logical_type(Some(LogicalType::Decimal {
646                    scale: *scale as i32,
647                    precision: *precision as i32,
648                }))
649                .with_precision(*precision as i32)
650                .with_scale(*scale as i32)
651                .build()
652        }
653        DataType::Utf8 | DataType::LargeUtf8 => {
654            Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
655                .with_logical_type(logical_type_for_string(field))
656                .with_repetition(repetition)
657                .with_id(id)
658                .build()
659        }
660        DataType::Utf8View => Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
661            .with_logical_type(logical_type_for_string(field))
662            .with_repetition(repetition)
663            .with_id(id)
664            .build(),
665        DataType::List(f) | DataType::FixedSizeList(f, _) | DataType::LargeList(f) => {
666            let field_ref = if coerce_types && f.name() != PARQUET_LIST_ELEMENT_NAME {
667                // Ensure proper naming per the Parquet specification
668                let ff = f.as_ref().clone().with_name(PARQUET_LIST_ELEMENT_NAME);
669                Arc::new(arrow_to_parquet_type(&ff, coerce_types)?)
670            } else {
671                Arc::new(arrow_to_parquet_type(f, coerce_types)?)
672            };
673
674            Type::group_type_builder(name)
675                .with_fields(vec![Arc::new(
676                    Type::group_type_builder("list")
677                        .with_fields(vec![field_ref])
678                        .with_repetition(Repetition::REPEATED)
679                        .build()?,
680                )])
681                .with_logical_type(Some(LogicalType::List))
682                .with_repetition(repetition)
683                .with_id(id)
684                .build()
685        }
686        DataType::ListView(_) | DataType::LargeListView(_) => {
687            unimplemented!("ListView/LargeListView not implemented")
688        }
689        DataType::Struct(fields) => {
690            if fields.is_empty() {
691                return Err(arrow_err!("Parquet does not support writing empty structs",));
692            }
693            // recursively convert children to types/nodes
694            let fields = fields
695                .iter()
696                .map(|f| arrow_to_parquet_type(f, coerce_types).map(Arc::new))
697                .collect::<Result<_>>()?;
698            Type::group_type_builder(name)
699                .with_fields(fields)
700                .with_repetition(repetition)
701                .with_id(id)
702                .with_logical_type(logical_type_for_struct(field))
703                .build()
704        }
705        DataType::Map(field, _) => {
706            if let DataType::Struct(struct_fields) = field.data_type() {
707                // If coercing then set inner struct name to "key_value"
708                let map_struct_name = if coerce_types {
709                    PARQUET_MAP_STRUCT_NAME
710                } else {
711                    field.name()
712                };
713
714                // If coercing then ensure struct fields are named "key" and "value"
715                let fix_map_field = |name: &str, fld: &Arc<Field>| -> Result<Arc<Type>> {
716                    if coerce_types && fld.name() != name {
717                        let f = fld.as_ref().clone().with_name(name);
718                        Ok(Arc::new(arrow_to_parquet_type(&f, coerce_types)?))
719                    } else {
720                        Ok(Arc::new(arrow_to_parquet_type(fld, coerce_types)?))
721                    }
722                };
723                let key_field = fix_map_field(PARQUET_KEY_FIELD_NAME, &struct_fields[0])?;
724                let val_field = fix_map_field(PARQUET_VALUE_FIELD_NAME, &struct_fields[1])?;
725
726                Type::group_type_builder(name)
727                    .with_fields(vec![Arc::new(
728                        Type::group_type_builder(map_struct_name)
729                            .with_fields(vec![key_field, val_field])
730                            .with_repetition(Repetition::REPEATED)
731                            .build()?,
732                    )])
733                    .with_logical_type(Some(LogicalType::Map))
734                    .with_repetition(repetition)
735                    .with_id(id)
736                    .build()
737            } else {
738                Err(arrow_err!(
739                    "DataType::Map should contain a struct field child",
740                ))
741            }
742        }
743        DataType::Union(_, _) => unimplemented!("See ARROW-8817."),
744        DataType::Dictionary(_, ref value) => {
745            // Dictionary encoding not handled at the schema level
746            let dict_field = field.clone().with_data_type(value.as_ref().clone());
747            arrow_to_parquet_type(&dict_field, coerce_types)
748        }
749        DataType::RunEndEncoded(_, _) => Err(arrow_err!(
750            "Converting RunEndEncodedType to parquet not supported",
751        )),
752    }
753}
754
755fn field_id(field: &Field) -> Option<i32> {
756    let value = field.metadata().get(super::PARQUET_FIELD_ID_META_KEY)?;
757    value.parse().ok() // Fail quietly if not a valid integer
758}
759
760#[cfg(test)]
761mod tests {
762    use super::*;
763
764    use std::{collections::HashMap, sync::Arc};
765
766    use crate::arrow::PARQUET_FIELD_ID_META_KEY;
767    use crate::file::metadata::KeyValue;
768    use crate::file::reader::FileReader;
769    use crate::{
770        arrow::{arrow_reader::ParquetRecordBatchReaderBuilder, ArrowWriter},
771        schema::{parser::parse_message_type, types::SchemaDescriptor},
772    };
773    use arrow::datatypes::{DataType, Field, IntervalUnit, TimeUnit};
774
775    #[test]
776    fn test_flat_primitives() {
777        let message_type = "
778        message test_schema {
779            REQUIRED BOOLEAN boolean;
780            REQUIRED INT32   int8  (INT_8);
781            REQUIRED INT32   int16 (INT_16);
782            REQUIRED INT32   uint8 (INTEGER(8,false));
783            REQUIRED INT32   uint16 (INTEGER(16,false));
784            REQUIRED INT32   int32;
785            REQUIRED INT64   int64;
786            OPTIONAL DOUBLE  double;
787            OPTIONAL FLOAT   float;
788            OPTIONAL FIXED_LEN_BYTE_ARRAY (2) float16 (FLOAT16);
789            OPTIONAL BINARY  string (UTF8);
790            OPTIONAL BINARY  string_2 (STRING);
791            OPTIONAL BINARY  json (JSON);
792        }
793        ";
794        let parquet_group_type = parse_message_type(message_type).unwrap();
795
796        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
797        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
798
799        let arrow_fields = Fields::from(vec![
800            Field::new("boolean", DataType::Boolean, false),
801            Field::new("int8", DataType::Int8, false),
802            Field::new("int16", DataType::Int16, false),
803            Field::new("uint8", DataType::UInt8, false),
804            Field::new("uint16", DataType::UInt16, false),
805            Field::new("int32", DataType::Int32, false),
806            Field::new("int64", DataType::Int64, false),
807            Field::new("double", DataType::Float64, true),
808            Field::new("float", DataType::Float32, true),
809            Field::new("float16", DataType::Float16, true),
810            Field::new("string", DataType::Utf8, true),
811            Field::new("string_2", DataType::Utf8, true),
812            json_field(),
813        ]);
814
815        assert_eq!(&arrow_fields, converted_arrow_schema.fields());
816    }
817
818    /// Return the expected Field for a Parquet column annotated with
819    /// the JSON logical type.
820    fn json_field() -> Field {
821        #[cfg(feature = "arrow_canonical_extension_types")]
822        {
823            Field::new("json", DataType::Utf8, true)
824                .with_extension_type(arrow_schema::extension::Json::default())
825        }
826        #[cfg(not(feature = "arrow_canonical_extension_types"))]
827        {
828            Field::new("json", DataType::Utf8, true)
829        }
830    }
831
832    #[test]
833    fn test_decimal_fields() {
834        let message_type = "
835        message test_schema {
836                    REQUIRED INT32 decimal1 (DECIMAL(4,2));
837                    REQUIRED INT64 decimal2 (DECIMAL(12,2));
838                    REQUIRED FIXED_LEN_BYTE_ARRAY (16) decimal3 (DECIMAL(30,2));
839                    REQUIRED BYTE_ARRAY decimal4 (DECIMAL(33,2));
840                    REQUIRED BYTE_ARRAY decimal5 (DECIMAL(38,2));
841                    REQUIRED FIXED_LEN_BYTE_ARRAY (17) decimal6 (DECIMAL(39,2));
842                    REQUIRED BYTE_ARRAY decimal7 (DECIMAL(39,2));
843        }
844        ";
845
846        let parquet_group_type = parse_message_type(message_type).unwrap();
847
848        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
849        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
850
851        let arrow_fields = Fields::from(vec![
852            Field::new("decimal1", DataType::Decimal128(4, 2), false),
853            Field::new("decimal2", DataType::Decimal128(12, 2), false),
854            Field::new("decimal3", DataType::Decimal128(30, 2), false),
855            Field::new("decimal4", DataType::Decimal128(33, 2), false),
856            Field::new("decimal5", DataType::Decimal128(38, 2), false),
857            Field::new("decimal6", DataType::Decimal256(39, 2), false),
858            Field::new("decimal7", DataType::Decimal256(39, 2), false),
859        ]);
860        assert_eq!(&arrow_fields, converted_arrow_schema.fields());
861    }
862
863    #[test]
864    fn test_byte_array_fields() {
865        let message_type = "
866        message test_schema {
867            REQUIRED BYTE_ARRAY binary;
868            REQUIRED FIXED_LEN_BYTE_ARRAY (20) fixed_binary;
869        }
870        ";
871
872        let parquet_group_type = parse_message_type(message_type).unwrap();
873
874        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
875        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
876
877        let arrow_fields = Fields::from(vec![
878            Field::new("binary", DataType::Binary, false),
879            Field::new("fixed_binary", DataType::FixedSizeBinary(20), false),
880        ]);
881        assert_eq!(&arrow_fields, converted_arrow_schema.fields());
882    }
883
884    #[test]
885    fn test_duplicate_fields() {
886        let message_type = "
887        message test_schema {
888            REQUIRED BOOLEAN boolean;
889            REQUIRED INT32 int8 (INT_8);
890        }
891        ";
892
893        let parquet_group_type = parse_message_type(message_type).unwrap();
894
895        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
896        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
897
898        let arrow_fields = Fields::from(vec![
899            Field::new("boolean", DataType::Boolean, false),
900            Field::new("int8", DataType::Int8, false),
901        ]);
902        assert_eq!(&arrow_fields, converted_arrow_schema.fields());
903
904        let converted_arrow_schema =
905            parquet_to_arrow_schema_by_columns(&parquet_schema, ProjectionMask::all(), None)
906                .unwrap();
907        assert_eq!(&arrow_fields, converted_arrow_schema.fields());
908    }
909
910    #[test]
911    fn test_parquet_lists() {
912        let mut arrow_fields = Vec::new();
913
914        // LIST encoding example taken from parquet-format/LogicalTypes.md
915        let message_type = "
916        message test_schema {
917          REQUIRED GROUP my_list (LIST) {
918            REPEATED GROUP list {
919              OPTIONAL BINARY element (UTF8);
920            }
921          }
922          OPTIONAL GROUP my_list (LIST) {
923            REPEATED GROUP list {
924              REQUIRED BINARY element (UTF8);
925            }
926          }
927          OPTIONAL GROUP array_of_arrays (LIST) {
928            REPEATED GROUP list {
929              REQUIRED GROUP element (LIST) {
930                REPEATED GROUP list {
931                  REQUIRED INT32 element;
932                }
933              }
934            }
935          }
936          OPTIONAL GROUP my_list (LIST) {
937            REPEATED GROUP element {
938              REQUIRED BINARY str (UTF8);
939            }
940          }
941          OPTIONAL GROUP my_list (LIST) {
942            REPEATED INT32 element;
943          }
944          OPTIONAL GROUP my_list (LIST) {
945            REPEATED GROUP element {
946              REQUIRED BINARY str (UTF8);
947              REQUIRED INT32 num;
948            }
949          }
950          OPTIONAL GROUP my_list (LIST) {
951            REPEATED GROUP array {
952              REQUIRED BINARY str (UTF8);
953            }
954
955          }
956          OPTIONAL GROUP my_list (LIST) {
957            REPEATED GROUP my_list_tuple {
958              REQUIRED BINARY str (UTF8);
959            }
960          }
961          REPEATED INT32 name;
962        }
963        ";
964
965        // // List<String> (list non-null, elements nullable)
966        // required group my_list (LIST) {
967        //   repeated group list {
968        //     optional binary element (UTF8);
969        //   }
970        // }
971        {
972            arrow_fields.push(Field::new_list(
973                "my_list",
974                Field::new("element", DataType::Utf8, true),
975                false,
976            ));
977        }
978
979        // // List<String> (list nullable, elements non-null)
980        // optional group my_list (LIST) {
981        //   repeated group list {
982        //     required binary element (UTF8);
983        //   }
984        // }
985        {
986            arrow_fields.push(Field::new_list(
987                "my_list",
988                Field::new("element", DataType::Utf8, false),
989                true,
990            ));
991        }
992
993        // Element types can be nested structures. For example, a list of lists:
994        //
995        // // List<List<Integer>>
996        // optional group array_of_arrays (LIST) {
997        //   repeated group list {
998        //     required group element (LIST) {
999        //       repeated group list {
1000        //         required int32 element;
1001        //       }
1002        //     }
1003        //   }
1004        // }
1005        {
1006            let arrow_inner_list = Field::new("element", DataType::Int32, false);
1007            arrow_fields.push(Field::new_list(
1008                "array_of_arrays",
1009                Field::new_list("element", arrow_inner_list, false),
1010                true,
1011            ));
1012        }
1013
1014        // // List<String> (list nullable, elements non-null)
1015        // optional group my_list (LIST) {
1016        //   repeated group element {
1017        //     required binary str (UTF8);
1018        //   };
1019        // }
1020        {
1021            arrow_fields.push(Field::new_list(
1022                "my_list",
1023                Field::new("str", DataType::Utf8, false),
1024                true,
1025            ));
1026        }
1027
1028        // // List<Integer> (nullable list, non-null elements)
1029        // optional group my_list (LIST) {
1030        //   repeated int32 element;
1031        // }
1032        {
1033            arrow_fields.push(Field::new_list(
1034                "my_list",
1035                Field::new("element", DataType::Int32, false),
1036                true,
1037            ));
1038        }
1039
1040        // // List<Tuple<String, Integer>> (nullable list, non-null elements)
1041        // optional group my_list (LIST) {
1042        //   repeated group element {
1043        //     required binary str (UTF8);
1044        //     required int32 num;
1045        //   };
1046        // }
1047        {
1048            let fields = vec![
1049                Field::new("str", DataType::Utf8, false),
1050                Field::new("num", DataType::Int32, false),
1051            ];
1052            arrow_fields.push(Field::new_list(
1053                "my_list",
1054                Field::new_struct("element", fields, false),
1055                true,
1056            ));
1057        }
1058
1059        // // List<OneTuple<String>> (nullable list, non-null elements)
1060        // optional group my_list (LIST) {
1061        //   repeated group array {
1062        //     required binary str (UTF8);
1063        //   };
1064        // }
1065        // Special case: group is named array
1066        {
1067            let fields = vec![Field::new("str", DataType::Utf8, false)];
1068            arrow_fields.push(Field::new_list(
1069                "my_list",
1070                Field::new_struct("array", fields, false),
1071                true,
1072            ));
1073        }
1074
1075        // // List<OneTuple<String>> (nullable list, non-null elements)
1076        // optional group my_list (LIST) {
1077        //   repeated group my_list_tuple {
1078        //     required binary str (UTF8);
1079        //   };
1080        // }
1081        // Special case: group named ends in _tuple
1082        {
1083            let fields = vec![Field::new("str", DataType::Utf8, false)];
1084            arrow_fields.push(Field::new_list(
1085                "my_list",
1086                Field::new_struct("my_list_tuple", fields, false),
1087                true,
1088            ));
1089        }
1090
1091        // One-level encoding: Only allows required lists with required cells
1092        //   repeated value_type name
1093        {
1094            arrow_fields.push(Field::new_list(
1095                "name",
1096                Field::new("name", DataType::Int32, false),
1097                false,
1098            ));
1099        }
1100
1101        let parquet_group_type = parse_message_type(message_type).unwrap();
1102
1103        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1104        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
1105        let converted_fields = converted_arrow_schema.fields();
1106
1107        assert_eq!(arrow_fields.len(), converted_fields.len());
1108        for i in 0..arrow_fields.len() {
1109            assert_eq!(&arrow_fields[i], converted_fields[i].as_ref(), "{i}");
1110        }
1111    }
1112
1113    #[test]
1114    fn test_parquet_list_nullable() {
1115        let mut arrow_fields = Vec::new();
1116
1117        let message_type = "
1118        message test_schema {
1119          REQUIRED GROUP my_list1 (LIST) {
1120            REPEATED GROUP list {
1121              OPTIONAL BINARY element (UTF8);
1122            }
1123          }
1124          OPTIONAL GROUP my_list2 (LIST) {
1125            REPEATED GROUP list {
1126              REQUIRED BINARY element (UTF8);
1127            }
1128          }
1129          REQUIRED GROUP my_list3 (LIST) {
1130            REPEATED GROUP list {
1131              REQUIRED BINARY element (UTF8);
1132            }
1133          }
1134        }
1135        ";
1136
1137        // // List<String> (list non-null, elements nullable)
1138        // required group my_list1 (LIST) {
1139        //   repeated group list {
1140        //     optional binary element (UTF8);
1141        //   }
1142        // }
1143        {
1144            arrow_fields.push(Field::new_list(
1145                "my_list1",
1146                Field::new("element", DataType::Utf8, true),
1147                false,
1148            ));
1149        }
1150
1151        // // List<String> (list nullable, elements non-null)
1152        // optional group my_list2 (LIST) {
1153        //   repeated group list {
1154        //     required binary element (UTF8);
1155        //   }
1156        // }
1157        {
1158            arrow_fields.push(Field::new_list(
1159                "my_list2",
1160                Field::new("element", DataType::Utf8, false),
1161                true,
1162            ));
1163        }
1164
1165        // // List<String> (list non-null, elements non-null)
1166        // repeated group my_list3 (LIST) {
1167        //   repeated group list {
1168        //     required binary element (UTF8);
1169        //   }
1170        // }
1171        {
1172            arrow_fields.push(Field::new_list(
1173                "my_list3",
1174                Field::new("element", DataType::Utf8, false),
1175                false,
1176            ));
1177        }
1178
1179        let parquet_group_type = parse_message_type(message_type).unwrap();
1180
1181        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1182        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
1183        let converted_fields = converted_arrow_schema.fields();
1184
1185        assert_eq!(arrow_fields.len(), converted_fields.len());
1186        for i in 0..arrow_fields.len() {
1187            assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
1188        }
1189    }
1190
1191    #[test]
1192    fn test_parquet_maps() {
1193        let mut arrow_fields = Vec::new();
1194
1195        // LIST encoding example taken from parquet-format/LogicalTypes.md
1196        let message_type = "
1197        message test_schema {
1198          REQUIRED group my_map1 (MAP) {
1199            REPEATED group key_value {
1200              REQUIRED binary key (UTF8);
1201              OPTIONAL int32 value;
1202            }
1203          }
1204          OPTIONAL group my_map2 (MAP) {
1205            REPEATED group map {
1206              REQUIRED binary str (UTF8);
1207              REQUIRED int32 num;
1208            }
1209          }
1210          OPTIONAL group my_map3 (MAP_KEY_VALUE) {
1211            REPEATED group map {
1212              REQUIRED binary key (UTF8);
1213              OPTIONAL int32 value;
1214            }
1215          }
1216          REQUIRED group my_map4 (MAP) {
1217            REPEATED group map {
1218              OPTIONAL binary key (UTF8);
1219              REQUIRED int32 value;
1220            }
1221          }
1222        }
1223        ";
1224
1225        // // Map<String, Integer>
1226        // required group my_map (MAP) {
1227        //   repeated group key_value {
1228        //     required binary key (UTF8);
1229        //     optional int32 value;
1230        //   }
1231        // }
1232        {
1233            arrow_fields.push(Field::new_map(
1234                "my_map1",
1235                "key_value",
1236                Field::new("key", DataType::Utf8, false),
1237                Field::new("value", DataType::Int32, true),
1238                false,
1239                false,
1240            ));
1241        }
1242
1243        // // Map<String, Integer> (nullable map, non-null values)
1244        // optional group my_map (MAP) {
1245        //   repeated group map {
1246        //     required binary str (UTF8);
1247        //     required int32 num;
1248        //   }
1249        // }
1250        {
1251            arrow_fields.push(Field::new_map(
1252                "my_map2",
1253                "map",
1254                Field::new("str", DataType::Utf8, false),
1255                Field::new("num", DataType::Int32, false),
1256                false,
1257                true,
1258            ));
1259        }
1260
1261        // // Map<String, Integer> (nullable map, nullable values)
1262        // optional group my_map (MAP_KEY_VALUE) {
1263        //   repeated group map {
1264        //     required binary key (UTF8);
1265        //     optional int32 value;
1266        //   }
1267        // }
1268        {
1269            arrow_fields.push(Field::new_map(
1270                "my_map3",
1271                "map",
1272                Field::new("key", DataType::Utf8, false),
1273                Field::new("value", DataType::Int32, true),
1274                false,
1275                true,
1276            ));
1277        }
1278
1279        // // Map<String, Integer> (non-compliant nullable key)
1280        // group my_map (MAP_KEY_VALUE) {
1281        //   repeated group map {
1282        //     optional binary key (UTF8);
1283        //     required int32 value;
1284        //   }
1285        // }
1286        {
1287            arrow_fields.push(Field::new_map(
1288                "my_map4",
1289                "map",
1290                Field::new("key", DataType::Utf8, false), // The key is always non-nullable (#5630)
1291                Field::new("value", DataType::Int32, false),
1292                false,
1293                false,
1294            ));
1295        }
1296
1297        let parquet_group_type = parse_message_type(message_type).unwrap();
1298
1299        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1300        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
1301        let converted_fields = converted_arrow_schema.fields();
1302
1303        assert_eq!(arrow_fields.len(), converted_fields.len());
1304        for i in 0..arrow_fields.len() {
1305            assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
1306        }
1307    }
1308
1309    #[test]
1310    fn test_nested_schema() {
1311        let mut arrow_fields = Vec::new();
1312        {
1313            let group1_fields = Fields::from(vec![
1314                Field::new("leaf1", DataType::Boolean, false),
1315                Field::new("leaf2", DataType::Int32, false),
1316            ]);
1317            let group1_struct = Field::new("group1", DataType::Struct(group1_fields), false);
1318            arrow_fields.push(group1_struct);
1319
1320            let leaf3_field = Field::new("leaf3", DataType::Int64, false);
1321            arrow_fields.push(leaf3_field);
1322        }
1323
1324        let message_type = "
1325        message test_schema {
1326          REQUIRED GROUP group1 {
1327            REQUIRED BOOLEAN leaf1;
1328            REQUIRED INT32 leaf2;
1329          }
1330          REQUIRED INT64 leaf3;
1331        }
1332        ";
1333        let parquet_group_type = parse_message_type(message_type).unwrap();
1334
1335        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1336        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
1337        let converted_fields = converted_arrow_schema.fields();
1338
1339        assert_eq!(arrow_fields.len(), converted_fields.len());
1340        for i in 0..arrow_fields.len() {
1341            assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
1342        }
1343    }
1344
1345    #[test]
1346    fn test_nested_schema_partial() {
1347        let mut arrow_fields = Vec::new();
1348        {
1349            let group1_fields = vec![Field::new("leaf1", DataType::Int64, false)].into();
1350            let group1 = Field::new("group1", DataType::Struct(group1_fields), false);
1351            arrow_fields.push(group1);
1352
1353            let group2_fields = vec![Field::new("leaf4", DataType::Int64, false)].into();
1354            let group2 = Field::new("group2", DataType::Struct(group2_fields), false);
1355            arrow_fields.push(group2);
1356
1357            arrow_fields.push(Field::new("leaf5", DataType::Int64, false));
1358        }
1359
1360        let message_type = "
1361        message test_schema {
1362          REQUIRED GROUP group1 {
1363            REQUIRED INT64 leaf1;
1364            REQUIRED INT64 leaf2;
1365          }
1366          REQUIRED  GROUP group2 {
1367            REQUIRED INT64 leaf3;
1368            REQUIRED INT64 leaf4;
1369          }
1370          REQUIRED INT64 leaf5;
1371        }
1372        ";
1373        let parquet_group_type = parse_message_type(message_type).unwrap();
1374
1375        // Expected partial arrow schema (columns 0, 3, 4):
1376        // required group group1 {
1377        //   required int64 leaf1;
1378        // }
1379        // required group group2 {
1380        //   required int64 leaf4;
1381        // }
1382        // required int64 leaf5;
1383
1384        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1385        let mask = ProjectionMask::leaves(&parquet_schema, [3, 0, 4, 4]);
1386        let converted_arrow_schema =
1387            parquet_to_arrow_schema_by_columns(&parquet_schema, mask, None).unwrap();
1388        let converted_fields = converted_arrow_schema.fields();
1389
1390        assert_eq!(arrow_fields.len(), converted_fields.len());
1391        for i in 0..arrow_fields.len() {
1392            assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
1393        }
1394    }
1395
1396    #[test]
1397    fn test_nested_schema_partial_ordering() {
1398        let mut arrow_fields = Vec::new();
1399        {
1400            let group1_fields = vec![Field::new("leaf1", DataType::Int64, false)].into();
1401            let group1 = Field::new("group1", DataType::Struct(group1_fields), false);
1402            arrow_fields.push(group1);
1403
1404            let group2_fields = vec![Field::new("leaf4", DataType::Int64, false)].into();
1405            let group2 = Field::new("group2", DataType::Struct(group2_fields), false);
1406            arrow_fields.push(group2);
1407
1408            arrow_fields.push(Field::new("leaf5", DataType::Int64, false));
1409        }
1410
1411        let message_type = "
1412        message test_schema {
1413          REQUIRED GROUP group1 {
1414            REQUIRED INT64 leaf1;
1415            REQUIRED INT64 leaf2;
1416          }
1417          REQUIRED  GROUP group2 {
1418            REQUIRED INT64 leaf3;
1419            REQUIRED INT64 leaf4;
1420          }
1421          REQUIRED INT64 leaf5;
1422        }
1423        ";
1424        let parquet_group_type = parse_message_type(message_type).unwrap();
1425
1426        // Expected partial arrow schema (columns 3, 4, 0):
1427        // required group group1 {
1428        //   required int64 leaf1;
1429        // }
1430        // required group group2 {
1431        //   required int64 leaf4;
1432        // }
1433        // required int64 leaf5;
1434
1435        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1436        let mask = ProjectionMask::leaves(&parquet_schema, [3, 0, 4]);
1437        let converted_arrow_schema =
1438            parquet_to_arrow_schema_by_columns(&parquet_schema, mask, None).unwrap();
1439        let converted_fields = converted_arrow_schema.fields();
1440
1441        assert_eq!(arrow_fields.len(), converted_fields.len());
1442        for i in 0..arrow_fields.len() {
1443            assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
1444        }
1445
1446        let mask =
1447            ProjectionMask::columns(&parquet_schema, ["group2.leaf4", "group1.leaf1", "leaf5"]);
1448        let converted_arrow_schema =
1449            parquet_to_arrow_schema_by_columns(&parquet_schema, mask, None).unwrap();
1450        let converted_fields = converted_arrow_schema.fields();
1451
1452        assert_eq!(arrow_fields.len(), converted_fields.len());
1453        for i in 0..arrow_fields.len() {
1454            assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
1455        }
1456    }
1457
1458    #[test]
1459    fn test_repeated_nested_schema() {
1460        let mut arrow_fields = Vec::new();
1461        {
1462            arrow_fields.push(Field::new("leaf1", DataType::Int32, true));
1463
1464            let inner_group_list = Field::new_list(
1465                "innerGroup",
1466                Field::new_struct(
1467                    "innerGroup",
1468                    vec![Field::new("leaf3", DataType::Int32, true)],
1469                    false,
1470                ),
1471                false,
1472            );
1473
1474            let outer_group_list = Field::new_list(
1475                "outerGroup",
1476                Field::new_struct(
1477                    "outerGroup",
1478                    vec![Field::new("leaf2", DataType::Int32, true), inner_group_list],
1479                    false,
1480                ),
1481                false,
1482            );
1483            arrow_fields.push(outer_group_list);
1484        }
1485
1486        let message_type = "
1487        message test_schema {
1488          OPTIONAL INT32 leaf1;
1489          REPEATED GROUP outerGroup {
1490            OPTIONAL INT32 leaf2;
1491            REPEATED GROUP innerGroup {
1492              OPTIONAL INT32 leaf3;
1493            }
1494          }
1495        }
1496        ";
1497        let parquet_group_type = parse_message_type(message_type).unwrap();
1498
1499        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1500        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
1501        let converted_fields = converted_arrow_schema.fields();
1502
1503        assert_eq!(arrow_fields.len(), converted_fields.len());
1504        for i in 0..arrow_fields.len() {
1505            assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
1506        }
1507    }
1508
1509    #[test]
1510    fn test_column_desc_to_field() {
1511        let message_type = "
1512        message test_schema {
1513            REQUIRED BOOLEAN boolean;
1514            REQUIRED INT32   int8  (INT_8);
1515            REQUIRED INT32   uint8 (INTEGER(8,false));
1516            REQUIRED INT32   int16 (INT_16);
1517            REQUIRED INT32   uint16 (INTEGER(16,false));
1518            REQUIRED INT32   int32;
1519            REQUIRED INT64   int64;
1520            OPTIONAL DOUBLE  double;
1521            OPTIONAL FLOAT   float;
1522            OPTIONAL FIXED_LEN_BYTE_ARRAY (2) float16 (FLOAT16);
1523            OPTIONAL BINARY  string (UTF8);
1524            REPEATED BOOLEAN bools;
1525            OPTIONAL INT32   date       (DATE);
1526            OPTIONAL INT32   time_milli (TIME_MILLIS);
1527            OPTIONAL INT64   time_micro (TIME_MICROS);
1528            OPTIONAL INT64   time_nano (TIME(NANOS,false));
1529            OPTIONAL INT64   ts_milli (TIMESTAMP_MILLIS);
1530            REQUIRED INT64   ts_micro (TIMESTAMP_MICROS);
1531            REQUIRED INT64   ts_nano (TIMESTAMP(NANOS,true));
1532            REPEATED INT32   int_list;
1533            REPEATED BINARY  byte_list;
1534            REPEATED BINARY  string_list (UTF8);
1535            REQUIRED INT32 decimal_int32 (DECIMAL(8,2));
1536            REQUIRED INT64 decimal_int64 (DECIMAL(16,2));
1537            REQUIRED FIXED_LEN_BYTE_ARRAY (13) decimal_fix_length (DECIMAL(30,2));
1538        }
1539        ";
1540        let parquet_group_type = parse_message_type(message_type).unwrap();
1541
1542        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1543        let converted_arrow_fields = parquet_schema
1544            .columns()
1545            .iter()
1546            .map(|c| parquet_to_arrow_field(c).unwrap())
1547            .collect::<Vec<Field>>();
1548
1549        let arrow_fields = vec![
1550            Field::new("boolean", DataType::Boolean, false),
1551            Field::new("int8", DataType::Int8, false),
1552            Field::new("uint8", DataType::UInt8, false),
1553            Field::new("int16", DataType::Int16, false),
1554            Field::new("uint16", DataType::UInt16, false),
1555            Field::new("int32", DataType::Int32, false),
1556            Field::new("int64", DataType::Int64, false),
1557            Field::new("double", DataType::Float64, true),
1558            Field::new("float", DataType::Float32, true),
1559            Field::new("float16", DataType::Float16, true),
1560            Field::new("string", DataType::Utf8, true),
1561            Field::new_list(
1562                "bools",
1563                Field::new("bools", DataType::Boolean, false),
1564                false,
1565            ),
1566            Field::new("date", DataType::Date32, true),
1567            Field::new("time_milli", DataType::Time32(TimeUnit::Millisecond), true),
1568            Field::new("time_micro", DataType::Time64(TimeUnit::Microsecond), true),
1569            Field::new("time_nano", DataType::Time64(TimeUnit::Nanosecond), true),
1570            Field::new(
1571                "ts_milli",
1572                DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
1573                true,
1574            ),
1575            Field::new(
1576                "ts_micro",
1577                DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
1578                false,
1579            ),
1580            Field::new(
1581                "ts_nano",
1582                DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())),
1583                false,
1584            ),
1585            Field::new_list(
1586                "int_list",
1587                Field::new("int_list", DataType::Int32, false),
1588                false,
1589            ),
1590            Field::new_list(
1591                "byte_list",
1592                Field::new("byte_list", DataType::Binary, false),
1593                false,
1594            ),
1595            Field::new_list(
1596                "string_list",
1597                Field::new("string_list", DataType::Utf8, false),
1598                false,
1599            ),
1600            Field::new("decimal_int32", DataType::Decimal128(8, 2), false),
1601            Field::new("decimal_int64", DataType::Decimal128(16, 2), false),
1602            Field::new("decimal_fix_length", DataType::Decimal128(30, 2), false),
1603        ];
1604
1605        assert_eq!(arrow_fields, converted_arrow_fields);
1606    }
1607
1608    #[test]
1609    fn test_coerced_map_list() {
1610        // Create Arrow schema with non-Parquet naming
1611        let arrow_fields = vec![
1612            Field::new_list(
1613                "my_list",
1614                Field::new("item", DataType::Boolean, true),
1615                false,
1616            ),
1617            Field::new_map(
1618                "my_map",
1619                "entries",
1620                Field::new("keys", DataType::Utf8, false),
1621                Field::new("values", DataType::Int32, true),
1622                false,
1623                true,
1624            ),
1625        ];
1626        let arrow_schema = Schema::new(arrow_fields);
1627
1628        // Create Parquet schema with coerced names
1629        let message_type = "
1630        message parquet_schema {
1631            REQUIRED GROUP my_list (LIST) {
1632                REPEATED GROUP list {
1633                    OPTIONAL BOOLEAN element;
1634                }
1635            }
1636            OPTIONAL GROUP my_map (MAP) {
1637                REPEATED GROUP key_value {
1638                    REQUIRED BINARY key (STRING);
1639                    OPTIONAL INT32 value;
1640                }
1641            }
1642        }
1643        ";
1644        let parquet_group_type = parse_message_type(message_type).unwrap();
1645        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1646        let converted_arrow_schema = ArrowSchemaConverter::new()
1647            .with_coerce_types(true)
1648            .convert(&arrow_schema)
1649            .unwrap();
1650        assert_eq!(
1651            parquet_schema.columns().len(),
1652            converted_arrow_schema.columns().len()
1653        );
1654
1655        // Create Parquet schema without coerced names
1656        let message_type = "
1657        message parquet_schema {
1658            REQUIRED GROUP my_list (LIST) {
1659                REPEATED GROUP list {
1660                    OPTIONAL BOOLEAN item;
1661                }
1662            }
1663            OPTIONAL GROUP my_map (MAP) {
1664                REPEATED GROUP entries {
1665                    REQUIRED BINARY keys (STRING);
1666                    OPTIONAL INT32 values;
1667                }
1668            }
1669        }
1670        ";
1671        let parquet_group_type = parse_message_type(message_type).unwrap();
1672        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1673        let converted_arrow_schema = ArrowSchemaConverter::new()
1674            .with_coerce_types(false)
1675            .convert(&arrow_schema)
1676            .unwrap();
1677        assert_eq!(
1678            parquet_schema.columns().len(),
1679            converted_arrow_schema.columns().len()
1680        );
1681    }
1682
1683    #[test]
1684    fn test_field_to_column_desc() {
1685        let message_type = "
1686        message arrow_schema {
1687            REQUIRED BOOLEAN boolean;
1688            REQUIRED INT32   int8  (INT_8);
1689            REQUIRED INT32   int16 (INTEGER(16,true));
1690            REQUIRED INT32   int32;
1691            REQUIRED INT64   int64;
1692            OPTIONAL DOUBLE  double;
1693            OPTIONAL FLOAT   float;
1694            OPTIONAL FIXED_LEN_BYTE_ARRAY (2) float16 (FLOAT16);
1695            OPTIONAL BINARY  string (STRING);
1696            OPTIONAL GROUP   bools (LIST) {
1697                REPEATED GROUP list {
1698                    OPTIONAL BOOLEAN element;
1699                }
1700            }
1701            REQUIRED GROUP   bools_non_null (LIST) {
1702                REPEATED GROUP list {
1703                    REQUIRED BOOLEAN element;
1704                }
1705            }
1706            OPTIONAL INT32   date       (DATE);
1707            OPTIONAL INT32   time_milli (TIME(MILLIS,false));
1708            OPTIONAL INT32   time_milli_utc (TIME(MILLIS,true));
1709            OPTIONAL INT64   time_micro (TIME_MICROS);
1710            OPTIONAL INT64   time_micro_utc (TIME(MICROS, true));
1711            OPTIONAL INT64   ts_milli (TIMESTAMP_MILLIS);
1712            REQUIRED INT64   ts_micro (TIMESTAMP(MICROS,false));
1713            REQUIRED INT64   ts_seconds;
1714            REQUIRED INT64   ts_micro_utc (TIMESTAMP(MICROS, true));
1715            REQUIRED INT64   ts_millis_zero_offset (TIMESTAMP(MILLIS, true));
1716            REQUIRED INT64   ts_millis_zero_negative_offset (TIMESTAMP(MILLIS, true));
1717            REQUIRED INT64   ts_micro_non_utc (TIMESTAMP(MICROS, true));
1718            REQUIRED GROUP struct {
1719                REQUIRED BOOLEAN bools;
1720                REQUIRED INT32 uint32 (INTEGER(32,false));
1721                REQUIRED GROUP   int32 (LIST) {
1722                    REPEATED GROUP list {
1723                        OPTIONAL INT32 element;
1724                    }
1725                }
1726            }
1727            REQUIRED BINARY  dictionary_strings (STRING);
1728            REQUIRED INT32 decimal_int32 (DECIMAL(8,2));
1729            REQUIRED INT64 decimal_int64 (DECIMAL(16,2));
1730            REQUIRED FIXED_LEN_BYTE_ARRAY (13) decimal_fix_length (DECIMAL(30,2));
1731            REQUIRED FIXED_LEN_BYTE_ARRAY (16) decimal128 (DECIMAL(38,2));
1732            REQUIRED FIXED_LEN_BYTE_ARRAY (17) decimal256 (DECIMAL(39,2));
1733        }
1734        ";
1735        let parquet_group_type = parse_message_type(message_type).unwrap();
1736
1737        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1738
1739        let arrow_fields = vec![
1740            Field::new("boolean", DataType::Boolean, false),
1741            Field::new("int8", DataType::Int8, false),
1742            Field::new("int16", DataType::Int16, false),
1743            Field::new("int32", DataType::Int32, false),
1744            Field::new("int64", DataType::Int64, false),
1745            Field::new("double", DataType::Float64, true),
1746            Field::new("float", DataType::Float32, true),
1747            Field::new("float16", DataType::Float16, true),
1748            Field::new("string", DataType::Utf8, true),
1749            Field::new_list(
1750                "bools",
1751                Field::new("element", DataType::Boolean, true),
1752                true,
1753            ),
1754            Field::new_list(
1755                "bools_non_null",
1756                Field::new("element", DataType::Boolean, false),
1757                false,
1758            ),
1759            Field::new("date", DataType::Date32, true),
1760            Field::new("time_milli", DataType::Time32(TimeUnit::Millisecond), true),
1761            Field::new(
1762                "time_milli_utc",
1763                DataType::Time32(TimeUnit::Millisecond),
1764                true,
1765            )
1766            .with_metadata(HashMap::from_iter(vec![(
1767                "adjusted_to_utc".to_string(),
1768                "".to_string(),
1769            )])),
1770            Field::new("time_micro", DataType::Time64(TimeUnit::Microsecond), true),
1771            Field::new(
1772                "time_micro_utc",
1773                DataType::Time64(TimeUnit::Microsecond),
1774                true,
1775            )
1776            .with_metadata(HashMap::from_iter(vec![(
1777                "adjusted_to_utc".to_string(),
1778                "".to_string(),
1779            )])),
1780            Field::new(
1781                "ts_milli",
1782                DataType::Timestamp(TimeUnit::Millisecond, None),
1783                true,
1784            ),
1785            Field::new(
1786                "ts_micro",
1787                DataType::Timestamp(TimeUnit::Microsecond, None),
1788                false,
1789            ),
1790            Field::new(
1791                "ts_seconds",
1792                DataType::Timestamp(TimeUnit::Second, Some("UTC".into())),
1793                false,
1794            ),
1795            Field::new(
1796                "ts_micro_utc",
1797                DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
1798                false,
1799            ),
1800            Field::new(
1801                "ts_millis_zero_offset",
1802                DataType::Timestamp(TimeUnit::Millisecond, Some("+00:00".into())),
1803                false,
1804            ),
1805            Field::new(
1806                "ts_millis_zero_negative_offset",
1807                DataType::Timestamp(TimeUnit::Millisecond, Some("-00:00".into())),
1808                false,
1809            ),
1810            Field::new(
1811                "ts_micro_non_utc",
1812                DataType::Timestamp(TimeUnit::Microsecond, Some("+01:00".into())),
1813                false,
1814            ),
1815            Field::new_struct(
1816                "struct",
1817                vec![
1818                    Field::new("bools", DataType::Boolean, false),
1819                    Field::new("uint32", DataType::UInt32, false),
1820                    Field::new_list("int32", Field::new("element", DataType::Int32, true), false),
1821                ],
1822                false,
1823            ),
1824            Field::new_dictionary("dictionary_strings", DataType::Int32, DataType::Utf8, false),
1825            Field::new("decimal_int32", DataType::Decimal128(8, 2), false),
1826            Field::new("decimal_int64", DataType::Decimal128(16, 2), false),
1827            Field::new("decimal_fix_length", DataType::Decimal128(30, 2), false),
1828            Field::new("decimal128", DataType::Decimal128(38, 2), false),
1829            Field::new("decimal256", DataType::Decimal256(39, 2), false),
1830        ];
1831        let arrow_schema = Schema::new(arrow_fields);
1832        let converted_arrow_schema = ArrowSchemaConverter::new().convert(&arrow_schema).unwrap();
1833
1834        assert_eq!(
1835            parquet_schema.columns().len(),
1836            converted_arrow_schema.columns().len()
1837        );
1838        parquet_schema
1839            .columns()
1840            .iter()
1841            .zip(converted_arrow_schema.columns())
1842            .for_each(|(a, b)| {
1843                // Only check logical type if it's set on the Parquet side.
1844                // This is because the Arrow conversion always sets logical type,
1845                // even if there wasn't originally one.
1846                // This is not an issue, but is an inconvenience for this test.
1847                match a.logical_type() {
1848                    Some(_) => {
1849                        assert_eq!(a, b)
1850                    }
1851                    None => {
1852                        assert_eq!(a.name(), b.name());
1853                        assert_eq!(a.physical_type(), b.physical_type());
1854                        assert_eq!(a.converted_type(), b.converted_type());
1855                    }
1856                };
1857            });
1858    }
1859
1860    #[test]
1861    #[should_panic(expected = "Parquet does not support writing empty structs")]
1862    fn test_empty_struct_field() {
1863        let arrow_fields = vec![Field::new(
1864            "struct",
1865            DataType::Struct(Fields::empty()),
1866            false,
1867        )];
1868        let arrow_schema = Schema::new(arrow_fields);
1869        let converted_arrow_schema = ArrowSchemaConverter::new()
1870            .with_coerce_types(true)
1871            .convert(&arrow_schema);
1872
1873        converted_arrow_schema.unwrap();
1874    }
1875
1876    #[test]
1877    fn test_metadata() {
1878        let message_type = "
1879        message test_schema {
1880            OPTIONAL BINARY  string (STRING);
1881        }
1882        ";
1883        let parquet_group_type = parse_message_type(message_type).unwrap();
1884
1885        let key_value_metadata = vec![
1886            KeyValue::new("foo".to_owned(), Some("bar".to_owned())),
1887            KeyValue::new("baz".to_owned(), None),
1888        ];
1889
1890        let mut expected_metadata: HashMap<String, String> = HashMap::new();
1891        expected_metadata.insert("foo".to_owned(), "bar".to_owned());
1892
1893        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1894        let converted_arrow_schema =
1895            parquet_to_arrow_schema(&parquet_schema, Some(&key_value_metadata)).unwrap();
1896
1897        assert_eq!(converted_arrow_schema.metadata(), &expected_metadata);
1898    }
1899
1900    #[test]
1901    fn test_arrow_schema_roundtrip() -> Result<()> {
1902        let meta = |a: &[(&str, &str)]| -> HashMap<String, String> {
1903            a.iter()
1904                .map(|(a, b)| (a.to_string(), b.to_string()))
1905                .collect()
1906        };
1907
1908        let schema = Schema::new_with_metadata(
1909            vec![
1910                Field::new("c1", DataType::Utf8, false)
1911                    .with_metadata(meta(&[("Key", "Foo"), (PARQUET_FIELD_ID_META_KEY, "2")])),
1912                Field::new("c2", DataType::Binary, false),
1913                Field::new("c3", DataType::FixedSizeBinary(3), false),
1914                Field::new("c4", DataType::Boolean, false),
1915                Field::new("c5", DataType::Date32, false),
1916                Field::new("c6", DataType::Date64, false),
1917                Field::new("c7", DataType::Time32(TimeUnit::Second), false),
1918                Field::new("c8", DataType::Time32(TimeUnit::Millisecond), false),
1919                Field::new("c13", DataType::Time64(TimeUnit::Microsecond), false),
1920                Field::new("c14", DataType::Time64(TimeUnit::Nanosecond), false),
1921                Field::new("c15", DataType::Timestamp(TimeUnit::Second, None), false),
1922                Field::new(
1923                    "c16",
1924                    DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
1925                    false,
1926                ),
1927                Field::new(
1928                    "c17",
1929                    DataType::Timestamp(TimeUnit::Microsecond, Some("Africa/Johannesburg".into())),
1930                    false,
1931                ),
1932                Field::new(
1933                    "c18",
1934                    DataType::Timestamp(TimeUnit::Nanosecond, None),
1935                    false,
1936                ),
1937                Field::new("c19", DataType::Interval(IntervalUnit::DayTime), false),
1938                Field::new("c20", DataType::Interval(IntervalUnit::YearMonth), false),
1939                Field::new_list(
1940                    "c21",
1941                    Field::new_list_field(DataType::Boolean, true)
1942                        .with_metadata(meta(&[("Key", "Bar"), (PARQUET_FIELD_ID_META_KEY, "5")])),
1943                    false,
1944                )
1945                .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "4")])),
1946                Field::new(
1947                    "c22",
1948                    DataType::FixedSizeList(
1949                        Arc::new(Field::new_list_field(DataType::Boolean, true)),
1950                        5,
1951                    ),
1952                    false,
1953                ),
1954                Field::new_list(
1955                    "c23",
1956                    Field::new_large_list(
1957                        "inner",
1958                        Field::new_list_field(
1959                            DataType::Struct(
1960                                vec![
1961                                    Field::new("a", DataType::Int16, true),
1962                                    Field::new("b", DataType::Float64, false),
1963                                    Field::new("c", DataType::Float32, false),
1964                                    Field::new("d", DataType::Float16, false),
1965                                ]
1966                                .into(),
1967                            ),
1968                            false,
1969                        ),
1970                        true,
1971                    ),
1972                    false,
1973                ),
1974                Field::new(
1975                    "c24",
1976                    DataType::Struct(Fields::from(vec![
1977                        Field::new("a", DataType::Utf8, false),
1978                        Field::new("b", DataType::UInt16, false),
1979                    ])),
1980                    false,
1981                ),
1982                Field::new("c25", DataType::Interval(IntervalUnit::YearMonth), true),
1983                Field::new("c26", DataType::Interval(IntervalUnit::DayTime), true),
1984                // Duration types not supported
1985                // Field::new("c27", DataType::Duration(TimeUnit::Second), false),
1986                // Field::new("c28", DataType::Duration(TimeUnit::Millisecond), false),
1987                // Field::new("c29", DataType::Duration(TimeUnit::Microsecond), false),
1988                // Field::new("c30", DataType::Duration(TimeUnit::Nanosecond), false),
1989                #[allow(deprecated)]
1990                Field::new_dict(
1991                    "c31",
1992                    DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
1993                    true,
1994                    123,
1995                    true,
1996                )
1997                .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "6")])),
1998                Field::new("c32", DataType::LargeBinary, true),
1999                Field::new("c33", DataType::LargeUtf8, true),
2000                Field::new_large_list(
2001                    "c34",
2002                    Field::new_list(
2003                        "inner",
2004                        Field::new_list_field(
2005                            DataType::Struct(
2006                                vec![
2007                                    Field::new("a", DataType::Int16, true),
2008                                    Field::new("b", DataType::Float64, true),
2009                                ]
2010                                .into(),
2011                            ),
2012                            true,
2013                        ),
2014                        true,
2015                    ),
2016                    true,
2017                ),
2018                Field::new("c35", DataType::Null, true),
2019                Field::new("c36", DataType::Decimal128(2, 1), false),
2020                Field::new("c37", DataType::Decimal256(50, 20), false),
2021                Field::new("c38", DataType::Decimal128(18, 12), true),
2022                Field::new_map(
2023                    "c39",
2024                    "key_value",
2025                    Field::new("key", DataType::Utf8, false),
2026                    Field::new_list("value", Field::new("element", DataType::Utf8, true), true),
2027                    false, // fails to roundtrip keys_sorted
2028                    true,
2029                ),
2030                Field::new_map(
2031                    "c40",
2032                    "my_entries",
2033                    Field::new("my_key", DataType::Utf8, false)
2034                        .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "8")])),
2035                    Field::new_list(
2036                        "my_value",
2037                        Field::new_list_field(DataType::Utf8, true)
2038                            .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "10")])),
2039                        true,
2040                    )
2041                    .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "9")])),
2042                    false, // fails to roundtrip keys_sorted
2043                    true,
2044                )
2045                .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "7")])),
2046                Field::new_map(
2047                    "c41",
2048                    "my_entries",
2049                    Field::new("my_key", DataType::Utf8, false),
2050                    Field::new_list(
2051                        "my_value",
2052                        Field::new_list_field(DataType::Utf8, true)
2053                            .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "11")])),
2054                        true,
2055                    ),
2056                    false, // fails to roundtrip keys_sorted
2057                    false,
2058                ),
2059                Field::new("c42", DataType::Decimal32(5, 2), false),
2060                Field::new("c43", DataType::Decimal64(18, 12), true),
2061            ],
2062            meta(&[("Key", "Value")]),
2063        );
2064
2065        // write to an empty parquet file so that schema is serialized
2066        let file = tempfile::tempfile().unwrap();
2067        let writer =
2068            ArrowWriter::try_new(file.try_clone().unwrap(), Arc::new(schema.clone()), None)?;
2069        writer.close()?;
2070
2071        // read file back
2072        let arrow_reader = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
2073
2074        // Check arrow schema
2075        let read_schema = arrow_reader.schema();
2076        assert_eq!(&schema, read_schema.as_ref());
2077
2078        // Walk schema finding field IDs
2079        let mut stack = Vec::with_capacity(10);
2080        let mut out = Vec::with_capacity(10);
2081
2082        let root = arrow_reader.parquet_schema().root_schema_ptr();
2083        stack.push((root.name().to_string(), root));
2084
2085        while let Some((p, t)) = stack.pop() {
2086            if t.is_group() {
2087                for f in t.get_fields() {
2088                    stack.push((format!("{p}.{}", f.name()), f.clone()))
2089                }
2090            }
2091
2092            let info = t.get_basic_info();
2093            if info.has_id() {
2094                out.push(format!("{p} -> {}", info.id()))
2095            }
2096        }
2097        out.sort_unstable();
2098        let out: Vec<_> = out.iter().map(|x| x.as_str()).collect();
2099
2100        assert_eq!(
2101            &out,
2102            &[
2103                "arrow_schema.c1 -> 2",
2104                "arrow_schema.c21 -> 4",
2105                "arrow_schema.c21.list.item -> 5",
2106                "arrow_schema.c31 -> 6",
2107                "arrow_schema.c40 -> 7",
2108                "arrow_schema.c40.my_entries.my_key -> 8",
2109                "arrow_schema.c40.my_entries.my_value -> 9",
2110                "arrow_schema.c40.my_entries.my_value.list.item -> 10",
2111                "arrow_schema.c41.my_entries.my_value.list.item -> 11",
2112            ]
2113        );
2114
2115        Ok(())
2116    }
2117
2118    #[test]
2119    fn test_read_parquet_field_ids_raw() -> Result<()> {
2120        let meta = |a: &[(&str, &str)]| -> HashMap<String, String> {
2121            a.iter()
2122                .map(|(a, b)| (a.to_string(), b.to_string()))
2123                .collect()
2124        };
2125        let schema = Schema::new_with_metadata(
2126            vec![
2127                Field::new("c1", DataType::Utf8, true)
2128                    .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "1")])),
2129                Field::new("c2", DataType::Utf8, true)
2130                    .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "2")])),
2131            ],
2132            HashMap::new(),
2133        );
2134
2135        let writer = ArrowWriter::try_new(vec![], Arc::new(schema.clone()), None)?;
2136        let parquet_bytes = writer.into_inner()?;
2137
2138        let reader =
2139            crate::file::reader::SerializedFileReader::new(bytes::Bytes::from(parquet_bytes))?;
2140        let schema_descriptor = reader.metadata().file_metadata().schema_descr_ptr();
2141
2142        // don't pass metadata so field ids are read from Parquet and not from serialized Arrow schema
2143        let arrow_schema = crate::arrow::parquet_to_arrow_schema(&schema_descriptor, None)?;
2144
2145        let parq_schema_descr = ArrowSchemaConverter::new()
2146            .with_coerce_types(true)
2147            .convert(&arrow_schema)?;
2148        let parq_fields = parq_schema_descr.root_schema().get_fields();
2149        assert_eq!(parq_fields.len(), 2);
2150        assert_eq!(parq_fields[0].get_basic_info().id(), 1);
2151        assert_eq!(parq_fields[1].get_basic_info().id(), 2);
2152
2153        Ok(())
2154    }
2155
2156    #[test]
2157    fn test_arrow_schema_roundtrip_lists() -> Result<()> {
2158        let metadata: HashMap<String, String> = [("Key".to_string(), "Value".to_string())]
2159            .iter()
2160            .cloned()
2161            .collect();
2162
2163        let schema = Schema::new_with_metadata(
2164            vec![
2165                Field::new_list("c21", Field::new("array", DataType::Boolean, true), false),
2166                Field::new(
2167                    "c22",
2168                    DataType::FixedSizeList(
2169                        Arc::new(Field::new("items", DataType::Boolean, false)),
2170                        5,
2171                    ),
2172                    false,
2173                ),
2174                Field::new_list(
2175                    "c23",
2176                    Field::new_large_list(
2177                        "items",
2178                        Field::new_struct(
2179                            "items",
2180                            vec![
2181                                Field::new("a", DataType::Int16, true),
2182                                Field::new("b", DataType::Float64, false),
2183                            ],
2184                            true,
2185                        ),
2186                        true,
2187                    ),
2188                    true,
2189                ),
2190            ],
2191            metadata,
2192        );
2193
2194        // write to an empty parquet file so that schema is serialized
2195        let file = tempfile::tempfile().unwrap();
2196        let writer =
2197            ArrowWriter::try_new(file.try_clone().unwrap(), Arc::new(schema.clone()), None)?;
2198        writer.close()?;
2199
2200        // read file back
2201        let arrow_reader = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
2202        let read_schema = arrow_reader.schema();
2203        assert_eq!(&schema, read_schema.as_ref());
2204        Ok(())
2205    }
2206
2207    #[test]
2208    fn test_get_arrow_schema_from_metadata() {
2209        assert!(get_arrow_schema_from_metadata("").is_err());
2210    }
2211
2212    #[test]
2213    #[cfg(feature = "arrow_canonical_extension_types")]
2214    fn arrow_uuid_to_parquet_uuid() -> Result<()> {
2215        use arrow_schema::extension::Uuid;
2216        let arrow_schema = Schema::new(vec![Field::new(
2217            "uuid",
2218            DataType::FixedSizeBinary(16),
2219            false,
2220        )
2221        .with_extension_type(Uuid)]);
2222
2223        let parquet_schema = ArrowSchemaConverter::new().convert(&arrow_schema)?;
2224
2225        assert_eq!(
2226            parquet_schema.column(0).logical_type(),
2227            Some(LogicalType::Uuid)
2228        );
2229
2230        let arrow_schema = parquet_to_arrow_schema(&parquet_schema, None)?;
2231        assert_eq!(arrow_schema.field(0).try_extension_type::<Uuid>()?, Uuid);
2232
2233        Ok(())
2234    }
2235
2236    #[test]
2237    #[cfg(feature = "arrow_canonical_extension_types")]
2238    fn arrow_json_to_parquet_json() -> Result<()> {
2239        use arrow_schema::extension::Json;
2240        let arrow_schema = Schema::new(vec![
2241            Field::new("json", DataType::Utf8, false).with_extension_type(Json::default())
2242        ]);
2243
2244        let parquet_schema = ArrowSchemaConverter::new().convert(&arrow_schema)?;
2245
2246        assert_eq!(
2247            parquet_schema.column(0).logical_type(),
2248            Some(LogicalType::Json)
2249        );
2250
2251        let arrow_schema = parquet_to_arrow_schema(&parquet_schema, None)?;
2252        assert_eq!(
2253            arrow_schema.field(0).try_extension_type::<Json>()?,
2254            Json::default()
2255        );
2256
2257        Ok(())
2258    }
2259}