parquet/arrow/schema/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Converting Parquet schema <--> Arrow schema: [`ArrowSchemaConverter`] and [parquet_to_arrow_schema]
19
20use base64::Engine;
21use base64::prelude::BASE64_STANDARD;
22use std::collections::HashMap;
23use std::sync::Arc;
24
25use arrow_ipc::writer;
26use arrow_schema::{DataType, Field, Fields, Schema, TimeUnit};
27
28use crate::basic::{
29    ConvertedType, LogicalType, Repetition, TimeUnit as ParquetTimeUnit, Type as PhysicalType,
30};
31use crate::errors::{ParquetError, Result};
32use crate::file::{metadata::KeyValue, properties::WriterProperties};
33use crate::schema::types::{ColumnDescriptor, SchemaDescriptor, Type};
34
35mod complex;
36mod extension;
37mod primitive;
38
39use super::PARQUET_FIELD_ID_META_KEY;
40use crate::arrow::ProjectionMask;
41use crate::arrow::schema::extension::{
42    has_extension_type, logical_type_for_fixed_size_binary, logical_type_for_string,
43    logical_type_for_struct, try_add_extension_type,
44};
45pub(crate) use complex::{ParquetField, ParquetFieldType};
46
47/// Convert Parquet schema to Arrow schema including optional metadata
48///
49/// Attempts to decode any existing Arrow schema metadata, falling back
50/// to converting the Parquet schema column-wise
51pub fn parquet_to_arrow_schema(
52    parquet_schema: &SchemaDescriptor,
53    key_value_metadata: Option<&Vec<KeyValue>>,
54) -> Result<Schema> {
55    parquet_to_arrow_schema_by_columns(parquet_schema, ProjectionMask::all(), key_value_metadata)
56}
57
58/// Convert parquet schema to arrow schema including optional metadata,
59/// only preserving some leaf columns.
60pub fn parquet_to_arrow_schema_by_columns(
61    parquet_schema: &SchemaDescriptor,
62    mask: ProjectionMask,
63    key_value_metadata: Option<&Vec<KeyValue>>,
64) -> Result<Schema> {
65    Ok(parquet_to_arrow_schema_and_fields(parquet_schema, mask, key_value_metadata)?.0)
66}
67
68/// Determines the Arrow Schema from a Parquet schema
69///
70/// Looks for an Arrow schema metadata "hint" (see
71/// [`parquet_to_arrow_field_levels`]), and uses it if present to ensure
72/// lossless round trips.
73pub(crate) fn parquet_to_arrow_schema_and_fields(
74    parquet_schema: &SchemaDescriptor,
75    mask: ProjectionMask,
76    key_value_metadata: Option<&Vec<KeyValue>>,
77) -> Result<(Schema, Option<ParquetField>)> {
78    let mut metadata = parse_key_value_metadata(key_value_metadata).unwrap_or_default();
79    let maybe_schema = metadata
80        .remove(super::ARROW_SCHEMA_META_KEY)
81        .map(|value| get_arrow_schema_from_metadata(&value))
82        .transpose()?;
83
84    // Add the Arrow metadata to the Parquet metadata skipping keys that collide
85    if let Some(arrow_schema) = &maybe_schema {
86        arrow_schema.metadata().iter().for_each(|(k, v)| {
87            metadata.entry(k.clone()).or_insert_with(|| v.clone());
88        });
89    }
90
91    let hint = maybe_schema.as_ref().map(|s| s.fields());
92    let field_levels = parquet_to_arrow_field_levels(parquet_schema, mask, hint)?;
93    let schema = Schema::new_with_metadata(field_levels.fields, metadata);
94    Ok((schema, field_levels.levels))
95}
96
97/// Schema information necessary to decode a parquet file as arrow [`Fields`]
98///
99/// In particular this stores the dremel-level information necessary to correctly
100/// interpret the encoded definition and repetition levels
101///
102/// Note: this is an opaque container intended to be used with lower-level APIs
103/// within this crate
104#[derive(Debug, Clone)]
105pub struct FieldLevels {
106    pub(crate) fields: Fields,
107    pub(crate) levels: Option<ParquetField>,
108}
109
110/// Convert a parquet [`SchemaDescriptor`] to [`FieldLevels`]
111///
112/// Columns not included within [`ProjectionMask`] will be ignored.
113///
114/// The optional `hint` parameter is the desired Arrow schema. See the
115/// [`arrow`] module documentation for more information.
116///
117/// [`arrow`]: crate::arrow
118///
119/// # Notes:
120/// Where a field type in `hint` is compatible with the corresponding parquet type in `schema`, it
121/// will be used, otherwise the default arrow type for the given parquet column type will be used.
122///
123/// This is to accommodate arrow types that cannot be round-tripped through parquet natively.
124/// Depending on the parquet writer, this can lead to a mismatch between a file's parquet schema
125/// and its embedded arrow schema. The parquet `schema` must be treated as authoritative in such
126/// an event. See [#1663](https://github.com/apache/arrow-rs/issues/1663) for more information
127///
128/// Note: this is a low-level API, most users will want to make use of the higher-level
129/// [`parquet_to_arrow_schema`] for decoding metadata from a parquet file.
130pub fn parquet_to_arrow_field_levels(
131    schema: &SchemaDescriptor,
132    mask: ProjectionMask,
133    hint: Option<&Fields>,
134) -> Result<FieldLevels> {
135    match complex::convert_schema(schema, mask, hint)? {
136        Some(field) => match &field.arrow_type {
137            DataType::Struct(fields) => Ok(FieldLevels {
138                fields: fields.clone(),
139                levels: Some(field),
140            }),
141            _ => unreachable!(),
142        },
143        None => Ok(FieldLevels {
144            fields: Fields::empty(),
145            levels: None,
146        }),
147    }
148}
149
150/// Try to convert Arrow schema metadata into a schema
151fn get_arrow_schema_from_metadata(encoded_meta: &str) -> Result<Schema> {
152    let decoded = BASE64_STANDARD.decode(encoded_meta);
153    match decoded {
154        Ok(bytes) => {
155            let slice = if bytes.len() > 8 && bytes[0..4] == [255u8; 4] {
156                &bytes[8..]
157            } else {
158                bytes.as_slice()
159            };
160            match arrow_ipc::root_as_message(slice) {
161                Ok(message) => message
162                    .header_as_schema()
163                    .map(arrow_ipc::convert::fb_to_schema)
164                    .ok_or_else(|| arrow_err!("the message is not Arrow Schema")),
165                Err(err) => {
166                    // The flatbuffers implementation returns an error on verification error.
167                    Err(arrow_err!(
168                        "Unable to get root as message stored in {}: {:?}",
169                        super::ARROW_SCHEMA_META_KEY,
170                        err
171                    ))
172                }
173            }
174        }
175        Err(err) => {
176            // The C++ implementation returns an error if the schema can't be parsed.
177            Err(arrow_err!(
178                "Unable to decode the encoded schema stored in {}, {:?}",
179                super::ARROW_SCHEMA_META_KEY,
180                err
181            ))
182        }
183    }
184}
185
186/// Encodes the Arrow schema into the IPC format, and base64 encodes it
187pub fn encode_arrow_schema(schema: &Schema) -> String {
188    let options = writer::IpcWriteOptions::default();
189    let mut dictionary_tracker = writer::DictionaryTracker::new(true);
190    let data_gen = writer::IpcDataGenerator::default();
191    let mut serialized_schema =
192        data_gen.schema_to_bytes_with_dictionary_tracker(schema, &mut dictionary_tracker, &options);
193
194    // manually prepending the length to the schema as arrow uses the legacy IPC format
195    // TODO: change after addressing ARROW-9777
196    let schema_len = serialized_schema.ipc_message.len();
197    let mut len_prefix_schema = Vec::with_capacity(schema_len + 8);
198    len_prefix_schema.append(&mut vec![255u8, 255, 255, 255]);
199    len_prefix_schema.append((schema_len as u32).to_le_bytes().to_vec().as_mut());
200    len_prefix_schema.append(&mut serialized_schema.ipc_message);
201
202    BASE64_STANDARD.encode(&len_prefix_schema)
203}
204
205/// Mutates writer metadata by storing the encoded Arrow schema hint in
206/// [`ARROW_SCHEMA_META_KEY`].
207///
208/// If there is an existing Arrow schema metadata, it is replaced.
209///
210/// [`ARROW_SCHEMA_META_KEY`]: crate::arrow::ARROW_SCHEMA_META_KEY
211pub fn add_encoded_arrow_schema_to_metadata(schema: &Schema, props: &mut WriterProperties) {
212    let encoded = encode_arrow_schema(schema);
213
214    let schema_kv = KeyValue {
215        key: super::ARROW_SCHEMA_META_KEY.to_string(),
216        value: Some(encoded),
217    };
218
219    let meta = props
220        .key_value_metadata
221        .get_or_insert_with(Default::default);
222
223    // check if ARROW:schema exists, and overwrite it
224    let schema_meta = meta
225        .iter()
226        .enumerate()
227        .find(|(_, kv)| kv.key.as_str() == super::ARROW_SCHEMA_META_KEY);
228    match schema_meta {
229        Some((i, _)) => {
230            meta.remove(i);
231            meta.push(schema_kv);
232        }
233        None => {
234            meta.push(schema_kv);
235        }
236    }
237}
238
239/// Converter for Arrow schema to Parquet schema
240///
241/// See the documentation on the [`arrow`] module for background
242/// information on how Arrow schema is represented in Parquet.
243///
244/// [`arrow`]: crate::arrow
245///
246/// # Example:
247/// ```
248/// # use std::sync::Arc;
249/// # use arrow_schema::{Field, Schema, DataType};
250/// # use parquet::arrow::ArrowSchemaConverter;
251/// use parquet::schema::types::{SchemaDescriptor, Type};
252/// use parquet::basic; // note there are two `Type`s in the following example
253/// // create an Arrow Schema
254/// let arrow_schema = Schema::new(vec![
255///   Field::new("a", DataType::Int64, true),
256///   Field::new("b", DataType::Date32, true),
257/// ]);
258/// // convert the Arrow schema to a Parquet schema
259/// let parquet_schema = ArrowSchemaConverter::new()
260///   .convert(&arrow_schema)
261///   .unwrap();
262///
263/// let expected_parquet_schema = SchemaDescriptor::new(
264///   Arc::new(
265///     Type::group_type_builder("arrow_schema")
266///       .with_fields(vec![
267///         Arc::new(
268///          Type::primitive_type_builder("a", basic::Type::INT64)
269///           .build().unwrap()
270///         ),
271///         Arc::new(
272///          Type::primitive_type_builder("b", basic::Type::INT32)
273///           .with_converted_type(basic::ConvertedType::DATE)
274///           .with_logical_type(Some(basic::LogicalType::Date))
275///           .build().unwrap()
276///         ),
277///      ])
278///      .build().unwrap()
279///   )
280/// );
281/// assert_eq!(parquet_schema, expected_parquet_schema);
282/// ```
283#[derive(Debug)]
284pub struct ArrowSchemaConverter<'a> {
285    /// Name of the root schema in Parquet
286    schema_root: &'a str,
287    /// Should we coerce Arrow types to compatible Parquet types?
288    ///
289    /// See docs on [Self::with_coerce_types]`
290    coerce_types: bool,
291}
292
293impl Default for ArrowSchemaConverter<'_> {
294    fn default() -> Self {
295        Self::new()
296    }
297}
298
299impl<'a> ArrowSchemaConverter<'a> {
300    /// Create a new converter
301    pub fn new() -> Self {
302        Self {
303            schema_root: "arrow_schema",
304            coerce_types: false,
305        }
306    }
307
308    /// Should Arrow types be coerced into Parquet native types (default `false`).
309    ///
310    /// Setting this option to `true` will result in Parquet files that can be
311    /// read by more readers, but may lose precision for Arrow types such as
312    /// [`DataType::Date64`] which have no direct [corresponding Parquet type].
313    ///
314    /// By default, this converter does not coerce to native Parquet types. Enabling type
315    /// coercion allows for meaningful representations that do not require
316    /// downstream readers to consider the embedded Arrow schema, and can allow
317    /// for greater compatibility with other Parquet implementations. However,
318    /// type coercion also prevents data from being losslessly round-tripped.
319    ///
320    /// # Discussion
321    ///
322    /// Some Arrow types such as `Date64`, `Timestamp` and `Interval` have no
323    /// corresponding Parquet logical type. Thus, they can not be losslessly
324    /// round-tripped when stored using the appropriate Parquet logical type.
325    /// For example, some Date64 values may be truncated when stored with
326    /// parquet's native 32 bit date type.
327    ///
328    /// For [`List`] and [`Map`] types, some Parquet readers expect certain
329    /// schema elements to have specific names (earlier versions of the spec
330    /// were somewhat ambiguous on this point). Type coercion will use the names
331    /// prescribed by the Parquet specification, potentially losing naming
332    /// metadata from the Arrow schema.
333    ///
334    /// [`List`]: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists
335    /// [`Map`]: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#maps
336    /// [corresponding Parquet type]: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#date
337    ///
338    pub fn with_coerce_types(mut self, coerce_types: bool) -> Self {
339        self.coerce_types = coerce_types;
340        self
341    }
342
343    /// Set the root schema element name (defaults to `"arrow_schema"`).
344    pub fn schema_root(mut self, schema_root: &'a str) -> Self {
345        self.schema_root = schema_root;
346        self
347    }
348
349    /// Convert the specified Arrow [`Schema`] to the desired Parquet [`SchemaDescriptor`]
350    ///
351    /// See example in [`ArrowSchemaConverter`]
352    pub fn convert(&self, schema: &Schema) -> Result<SchemaDescriptor> {
353        let fields = schema
354            .fields()
355            .iter()
356            .map(|field| arrow_to_parquet_type(field, self.coerce_types).map(Arc::new))
357            .collect::<Result<_>>()?;
358        let group = Type::group_type_builder(self.schema_root)
359            .with_fields(fields)
360            .build()?;
361        Ok(SchemaDescriptor::new(Arc::new(group)))
362    }
363}
364
365fn parse_key_value_metadata(
366    key_value_metadata: Option<&Vec<KeyValue>>,
367) -> Option<HashMap<String, String>> {
368    match key_value_metadata {
369        Some(key_values) => {
370            let map: HashMap<String, String> = key_values
371                .iter()
372                .filter_map(|kv| {
373                    kv.value
374                        .as_ref()
375                        .map(|value| (kv.key.clone(), value.clone()))
376                })
377                .collect();
378
379            if map.is_empty() { None } else { Some(map) }
380        }
381        None => None,
382    }
383}
384
385/// Convert parquet column schema to arrow field.
386pub fn parquet_to_arrow_field(parquet_column: &ColumnDescriptor) -> Result<Field> {
387    let field = complex::convert_type(&parquet_column.self_type_ptr())?;
388    let mut ret = Field::new(parquet_column.name(), field.arrow_type, field.nullable);
389
390    let parquet_type = parquet_column.self_type();
391    let basic_info = parquet_type.get_basic_info();
392
393    let mut hash_map_size = 0;
394    if basic_info.has_id() {
395        hash_map_size += 1;
396    }
397    if has_extension_type(parquet_type) {
398        hash_map_size += 1;
399    }
400    if hash_map_size == 0 {
401        return Ok(ret);
402    }
403    ret.set_metadata(HashMap::with_capacity(hash_map_size));
404    if basic_info.has_id() {
405        ret.metadata_mut().insert(
406            PARQUET_FIELD_ID_META_KEY.to_string(),
407            basic_info.id().to_string(),
408        );
409    }
410    try_add_extension_type(ret, parquet_column.self_type())
411}
412
413pub fn decimal_length_from_precision(precision: u8) -> usize {
414    // digits = floor(log_10(2^(8*n - 1) - 1))  // definition in parquet's logical types
415    // ceil(digits) = log10(2^(8*n - 1) - 1)
416    // 10^ceil(digits) = 2^(8*n - 1) - 1
417    // 10^ceil(digits) + 1 = 2^(8*n - 1)
418    // log2(10^ceil(digits) + 1) = (8*n - 1)
419    // log2(10^ceil(digits) + 1) + 1 = 8*n
420    // (log2(10^ceil(a) + 1) + 1) / 8 = n
421    (((10.0_f64.powi(precision as i32) + 1.0).log2() + 1.0) / 8.0).ceil() as usize
422}
423
424/// Convert an arrow field to a parquet `Type`
425fn arrow_to_parquet_type(field: &Field, coerce_types: bool) -> Result<Type> {
426    const PARQUET_LIST_ELEMENT_NAME: &str = "element";
427    const PARQUET_MAP_STRUCT_NAME: &str = "key_value";
428    const PARQUET_KEY_FIELD_NAME: &str = "key";
429    const PARQUET_VALUE_FIELD_NAME: &str = "value";
430
431    let name = field.name().as_str();
432    let repetition = if field.is_nullable() {
433        Repetition::OPTIONAL
434    } else {
435        Repetition::REQUIRED
436    };
437    let id = field_id(field);
438    // create type from field
439    match field.data_type() {
440        DataType::Null => Type::primitive_type_builder(name, PhysicalType::INT32)
441            .with_logical_type(Some(LogicalType::Unknown))
442            .with_repetition(repetition)
443            .with_id(id)
444            .build(),
445        DataType::Boolean => Type::primitive_type_builder(name, PhysicalType::BOOLEAN)
446            .with_repetition(repetition)
447            .with_id(id)
448            .build(),
449        DataType::Int8 => Type::primitive_type_builder(name, PhysicalType::INT32)
450            .with_logical_type(Some(LogicalType::Integer {
451                bit_width: 8,
452                is_signed: true,
453            }))
454            .with_repetition(repetition)
455            .with_id(id)
456            .build(),
457        DataType::Int16 => Type::primitive_type_builder(name, PhysicalType::INT32)
458            .with_logical_type(Some(LogicalType::Integer {
459                bit_width: 16,
460                is_signed: true,
461            }))
462            .with_repetition(repetition)
463            .with_id(id)
464            .build(),
465        DataType::Int32 => Type::primitive_type_builder(name, PhysicalType::INT32)
466            .with_repetition(repetition)
467            .with_id(id)
468            .build(),
469        DataType::Int64 => Type::primitive_type_builder(name, PhysicalType::INT64)
470            .with_repetition(repetition)
471            .with_id(id)
472            .build(),
473        DataType::UInt8 => Type::primitive_type_builder(name, PhysicalType::INT32)
474            .with_logical_type(Some(LogicalType::Integer {
475                bit_width: 8,
476                is_signed: false,
477            }))
478            .with_repetition(repetition)
479            .with_id(id)
480            .build(),
481        DataType::UInt16 => Type::primitive_type_builder(name, PhysicalType::INT32)
482            .with_logical_type(Some(LogicalType::Integer {
483                bit_width: 16,
484                is_signed: false,
485            }))
486            .with_repetition(repetition)
487            .with_id(id)
488            .build(),
489        DataType::UInt32 => Type::primitive_type_builder(name, PhysicalType::INT32)
490            .with_logical_type(Some(LogicalType::Integer {
491                bit_width: 32,
492                is_signed: false,
493            }))
494            .with_repetition(repetition)
495            .with_id(id)
496            .build(),
497        DataType::UInt64 => Type::primitive_type_builder(name, PhysicalType::INT64)
498            .with_logical_type(Some(LogicalType::Integer {
499                bit_width: 64,
500                is_signed: false,
501            }))
502            .with_repetition(repetition)
503            .with_id(id)
504            .build(),
505        DataType::Float16 => Type::primitive_type_builder(name, PhysicalType::FIXED_LEN_BYTE_ARRAY)
506            .with_repetition(repetition)
507            .with_id(id)
508            .with_logical_type(Some(LogicalType::Float16))
509            .with_length(2)
510            .build(),
511        DataType::Float32 => Type::primitive_type_builder(name, PhysicalType::FLOAT)
512            .with_repetition(repetition)
513            .with_id(id)
514            .build(),
515        DataType::Float64 => Type::primitive_type_builder(name, PhysicalType::DOUBLE)
516            .with_repetition(repetition)
517            .with_id(id)
518            .build(),
519        DataType::Timestamp(TimeUnit::Second, _) => {
520            // Cannot represent seconds in LogicalType
521            Type::primitive_type_builder(name, PhysicalType::INT64)
522                .with_repetition(repetition)
523                .with_id(id)
524                .build()
525        }
526        DataType::Timestamp(time_unit, tz) => {
527            Type::primitive_type_builder(name, PhysicalType::INT64)
528                .with_logical_type(Some(LogicalType::Timestamp {
529                    // If timezone set, values are normalized to UTC timezone
530                    is_adjusted_to_u_t_c: matches!(tz, Some(z) if !z.as_ref().is_empty()),
531                    unit: match time_unit {
532                        TimeUnit::Second => unreachable!(),
533                        TimeUnit::Millisecond => ParquetTimeUnit::MILLIS,
534                        TimeUnit::Microsecond => ParquetTimeUnit::MICROS,
535                        TimeUnit::Nanosecond => ParquetTimeUnit::NANOS,
536                    },
537                }))
538                .with_repetition(repetition)
539                .with_id(id)
540                .build()
541        }
542        DataType::Date32 => Type::primitive_type_builder(name, PhysicalType::INT32)
543            .with_logical_type(Some(LogicalType::Date))
544            .with_repetition(repetition)
545            .with_id(id)
546            .build(),
547        DataType::Date64 => {
548            if coerce_types {
549                Type::primitive_type_builder(name, PhysicalType::INT32)
550                    .with_logical_type(Some(LogicalType::Date))
551                    .with_repetition(repetition)
552                    .with_id(id)
553                    .build()
554            } else {
555                Type::primitive_type_builder(name, PhysicalType::INT64)
556                    .with_repetition(repetition)
557                    .with_id(id)
558                    .build()
559            }
560        }
561        DataType::Time32(TimeUnit::Second) => {
562            // Cannot represent seconds in LogicalType
563            Type::primitive_type_builder(name, PhysicalType::INT32)
564                .with_repetition(repetition)
565                .with_id(id)
566                .build()
567        }
568        DataType::Time32(unit) => Type::primitive_type_builder(name, PhysicalType::INT32)
569            .with_logical_type(Some(LogicalType::Time {
570                is_adjusted_to_u_t_c: field.metadata().contains_key("adjusted_to_utc"),
571                unit: match unit {
572                    TimeUnit::Millisecond => ParquetTimeUnit::MILLIS,
573                    u => unreachable!("Invalid unit for Time32: {:?}", u),
574                },
575            }))
576            .with_repetition(repetition)
577            .with_id(id)
578            .build(),
579        DataType::Time64(unit) => Type::primitive_type_builder(name, PhysicalType::INT64)
580            .with_logical_type(Some(LogicalType::Time {
581                is_adjusted_to_u_t_c: field.metadata().contains_key("adjusted_to_utc"),
582                unit: match unit {
583                    TimeUnit::Microsecond => ParquetTimeUnit::MICROS,
584                    TimeUnit::Nanosecond => ParquetTimeUnit::NANOS,
585                    u => unreachable!("Invalid unit for Time64: {:?}", u),
586                },
587            }))
588            .with_repetition(repetition)
589            .with_id(id)
590            .build(),
591        DataType::Duration(_) => Type::primitive_type_builder(name, PhysicalType::INT64)
592            .with_repetition(repetition)
593            .with_id(id)
594            .build(),
595        DataType::Interval(_) => {
596            Type::primitive_type_builder(name, PhysicalType::FIXED_LEN_BYTE_ARRAY)
597                .with_converted_type(ConvertedType::INTERVAL)
598                .with_repetition(repetition)
599                .with_id(id)
600                .with_length(12)
601                .build()
602        }
603        DataType::Binary | DataType::LargeBinary => {
604            Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
605                .with_repetition(repetition)
606                .with_id(id)
607                .build()
608        }
609        DataType::FixedSizeBinary(length) => {
610            Type::primitive_type_builder(name, PhysicalType::FIXED_LEN_BYTE_ARRAY)
611                .with_repetition(repetition)
612                .with_id(id)
613                .with_length(*length)
614                .with_logical_type(logical_type_for_fixed_size_binary(field))
615                .build()
616        }
617        DataType::BinaryView => Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
618            .with_repetition(repetition)
619            .with_id(id)
620            .build(),
621        DataType::Decimal32(precision, scale)
622        | DataType::Decimal64(precision, scale)
623        | DataType::Decimal128(precision, scale)
624        | DataType::Decimal256(precision, scale) => {
625            // Decimal precision determines the Parquet physical type to use.
626            // Following the: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#decimal
627            let (physical_type, length) = if *precision > 1 && *precision <= 9 {
628                (PhysicalType::INT32, -1)
629            } else if *precision <= 18 {
630                (PhysicalType::INT64, -1)
631            } else {
632                (
633                    PhysicalType::FIXED_LEN_BYTE_ARRAY,
634                    decimal_length_from_precision(*precision) as i32,
635                )
636            };
637            Type::primitive_type_builder(name, physical_type)
638                .with_repetition(repetition)
639                .with_id(id)
640                .with_length(length)
641                .with_logical_type(Some(LogicalType::Decimal {
642                    scale: *scale as i32,
643                    precision: *precision as i32,
644                }))
645                .with_precision(*precision as i32)
646                .with_scale(*scale as i32)
647                .build()
648        }
649        DataType::Utf8 | DataType::LargeUtf8 => {
650            Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
651                .with_logical_type(logical_type_for_string(field))
652                .with_repetition(repetition)
653                .with_id(id)
654                .build()
655        }
656        DataType::Utf8View => Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
657            .with_logical_type(logical_type_for_string(field))
658            .with_repetition(repetition)
659            .with_id(id)
660            .build(),
661        DataType::List(f) | DataType::FixedSizeList(f, _) | DataType::LargeList(f) => {
662            let field_ref = if coerce_types && f.name() != PARQUET_LIST_ELEMENT_NAME {
663                // Ensure proper naming per the Parquet specification
664                let ff = f.as_ref().clone().with_name(PARQUET_LIST_ELEMENT_NAME);
665                Arc::new(arrow_to_parquet_type(&ff, coerce_types)?)
666            } else {
667                Arc::new(arrow_to_parquet_type(f, coerce_types)?)
668            };
669
670            Type::group_type_builder(name)
671                .with_fields(vec![Arc::new(
672                    Type::group_type_builder("list")
673                        .with_fields(vec![field_ref])
674                        .with_repetition(Repetition::REPEATED)
675                        .build()?,
676                )])
677                .with_logical_type(Some(LogicalType::List))
678                .with_repetition(repetition)
679                .with_id(id)
680                .build()
681        }
682        DataType::ListView(_) | DataType::LargeListView(_) => {
683            unimplemented!("ListView/LargeListView not implemented")
684        }
685        DataType::Struct(fields) => {
686            if fields.is_empty() {
687                return Err(arrow_err!("Parquet does not support writing empty structs",));
688            }
689            // recursively convert children to types/nodes
690            let fields = fields
691                .iter()
692                .map(|f| arrow_to_parquet_type(f, coerce_types).map(Arc::new))
693                .collect::<Result<_>>()?;
694            Type::group_type_builder(name)
695                .with_fields(fields)
696                .with_repetition(repetition)
697                .with_id(id)
698                .with_logical_type(logical_type_for_struct(field))
699                .build()
700        }
701        DataType::Map(field, _) => {
702            if let DataType::Struct(struct_fields) = field.data_type() {
703                // If coercing then set inner struct name to "key_value"
704                let map_struct_name = if coerce_types {
705                    PARQUET_MAP_STRUCT_NAME
706                } else {
707                    field.name()
708                };
709
710                // If coercing then ensure struct fields are named "key" and "value"
711                let fix_map_field = |name: &str, fld: &Arc<Field>| -> Result<Arc<Type>> {
712                    if coerce_types && fld.name() != name {
713                        let f = fld.as_ref().clone().with_name(name);
714                        Ok(Arc::new(arrow_to_parquet_type(&f, coerce_types)?))
715                    } else {
716                        Ok(Arc::new(arrow_to_parquet_type(fld, coerce_types)?))
717                    }
718                };
719                let key_field = fix_map_field(PARQUET_KEY_FIELD_NAME, &struct_fields[0])?;
720                let val_field = fix_map_field(PARQUET_VALUE_FIELD_NAME, &struct_fields[1])?;
721
722                Type::group_type_builder(name)
723                    .with_fields(vec![Arc::new(
724                        Type::group_type_builder(map_struct_name)
725                            .with_fields(vec![key_field, val_field])
726                            .with_repetition(Repetition::REPEATED)
727                            .build()?,
728                    )])
729                    .with_logical_type(Some(LogicalType::Map))
730                    .with_repetition(repetition)
731                    .with_id(id)
732                    .build()
733            } else {
734                Err(arrow_err!(
735                    "DataType::Map should contain a struct field child",
736                ))
737            }
738        }
739        DataType::Union(_, _) => unimplemented!("See ARROW-8817."),
740        DataType::Dictionary(_, value) => {
741            // Dictionary encoding not handled at the schema level
742            let dict_field = field.clone().with_data_type(value.as_ref().clone());
743            arrow_to_parquet_type(&dict_field, coerce_types)
744        }
745        DataType::RunEndEncoded(_, _) => Err(arrow_err!(
746            "Converting RunEndEncodedType to parquet not supported",
747        )),
748    }
749}
750
751fn field_id(field: &Field) -> Option<i32> {
752    let value = field.metadata().get(super::PARQUET_FIELD_ID_META_KEY)?;
753    value.parse().ok() // Fail quietly if not a valid integer
754}
755
756#[cfg(test)]
757mod tests {
758    use super::*;
759
760    use std::{collections::HashMap, sync::Arc};
761
762    use crate::arrow::PARQUET_FIELD_ID_META_KEY;
763    use crate::file::metadata::KeyValue;
764    use crate::file::reader::FileReader;
765    use crate::{
766        arrow::{ArrowWriter, arrow_reader::ParquetRecordBatchReaderBuilder},
767        schema::{parser::parse_message_type, types::SchemaDescriptor},
768    };
769    use arrow::datatypes::{DataType, Field, IntervalUnit, TimeUnit};
770
771    #[test]
772    fn test_flat_primitives() {
773        let message_type = "
774        message test_schema {
775            REQUIRED BOOLEAN boolean;
776            REQUIRED INT32   int8  (INT_8);
777            REQUIRED INT32   int16 (INT_16);
778            REQUIRED INT32   uint8 (INTEGER(8,false));
779            REQUIRED INT32   uint16 (INTEGER(16,false));
780            REQUIRED INT32   int32;
781            REQUIRED INT64   int64;
782            OPTIONAL DOUBLE  double;
783            OPTIONAL FLOAT   float;
784            OPTIONAL FIXED_LEN_BYTE_ARRAY (2) float16 (FLOAT16);
785            OPTIONAL BINARY  string (UTF8);
786            OPTIONAL BINARY  string_2 (STRING);
787            OPTIONAL BINARY  json (JSON);
788        }
789        ";
790        let parquet_group_type = parse_message_type(message_type).unwrap();
791
792        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
793        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
794
795        let arrow_fields = Fields::from(vec![
796            Field::new("boolean", DataType::Boolean, false),
797            Field::new("int8", DataType::Int8, false),
798            Field::new("int16", DataType::Int16, false),
799            Field::new("uint8", DataType::UInt8, false),
800            Field::new("uint16", DataType::UInt16, false),
801            Field::new("int32", DataType::Int32, false),
802            Field::new("int64", DataType::Int64, false),
803            Field::new("double", DataType::Float64, true),
804            Field::new("float", DataType::Float32, true),
805            Field::new("float16", DataType::Float16, true),
806            Field::new("string", DataType::Utf8, true),
807            Field::new("string_2", DataType::Utf8, true),
808            json_field(),
809        ]);
810
811        assert_eq!(&arrow_fields, converted_arrow_schema.fields());
812    }
813
814    /// Return the expected Field for a Parquet column annotated with
815    /// the JSON logical type.
816    fn json_field() -> Field {
817        #[cfg(feature = "arrow_canonical_extension_types")]
818        {
819            Field::new("json", DataType::Utf8, true)
820                .with_extension_type(arrow_schema::extension::Json::default())
821        }
822        #[cfg(not(feature = "arrow_canonical_extension_types"))]
823        {
824            Field::new("json", DataType::Utf8, true)
825        }
826    }
827
828    #[test]
829    fn test_decimal_fields() {
830        let message_type = "
831        message test_schema {
832                    REQUIRED INT32 decimal1 (DECIMAL(4,2));
833                    REQUIRED INT64 decimal2 (DECIMAL(12,2));
834                    REQUIRED FIXED_LEN_BYTE_ARRAY (16) decimal3 (DECIMAL(30,2));
835                    REQUIRED BYTE_ARRAY decimal4 (DECIMAL(33,2));
836                    REQUIRED BYTE_ARRAY decimal5 (DECIMAL(38,2));
837                    REQUIRED FIXED_LEN_BYTE_ARRAY (17) decimal6 (DECIMAL(39,2));
838                    REQUIRED BYTE_ARRAY decimal7 (DECIMAL(39,2));
839        }
840        ";
841
842        let parquet_group_type = parse_message_type(message_type).unwrap();
843
844        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
845        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
846
847        let arrow_fields = Fields::from(vec![
848            Field::new("decimal1", DataType::Decimal128(4, 2), false),
849            Field::new("decimal2", DataType::Decimal128(12, 2), false),
850            Field::new("decimal3", DataType::Decimal128(30, 2), false),
851            Field::new("decimal4", DataType::Decimal128(33, 2), false),
852            Field::new("decimal5", DataType::Decimal128(38, 2), false),
853            Field::new("decimal6", DataType::Decimal256(39, 2), false),
854            Field::new("decimal7", DataType::Decimal256(39, 2), false),
855        ]);
856        assert_eq!(&arrow_fields, converted_arrow_schema.fields());
857    }
858
859    #[test]
860    fn test_byte_array_fields() {
861        let message_type = "
862        message test_schema {
863            REQUIRED BYTE_ARRAY binary;
864            REQUIRED FIXED_LEN_BYTE_ARRAY (20) fixed_binary;
865        }
866        ";
867
868        let parquet_group_type = parse_message_type(message_type).unwrap();
869
870        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
871        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
872
873        let arrow_fields = Fields::from(vec![
874            Field::new("binary", DataType::Binary, false),
875            Field::new("fixed_binary", DataType::FixedSizeBinary(20), false),
876        ]);
877        assert_eq!(&arrow_fields, converted_arrow_schema.fields());
878    }
879
880    #[test]
881    fn test_duplicate_fields() {
882        let message_type = "
883        message test_schema {
884            REQUIRED BOOLEAN boolean;
885            REQUIRED INT32 int8 (INT_8);
886        }
887        ";
888
889        let parquet_group_type = parse_message_type(message_type).unwrap();
890
891        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
892        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
893
894        let arrow_fields = Fields::from(vec![
895            Field::new("boolean", DataType::Boolean, false),
896            Field::new("int8", DataType::Int8, false),
897        ]);
898        assert_eq!(&arrow_fields, converted_arrow_schema.fields());
899
900        let converted_arrow_schema =
901            parquet_to_arrow_schema_by_columns(&parquet_schema, ProjectionMask::all(), None)
902                .unwrap();
903        assert_eq!(&arrow_fields, converted_arrow_schema.fields());
904    }
905
906    #[test]
907    fn test_parquet_lists() {
908        let mut arrow_fields = Vec::new();
909
910        // LIST encoding example taken from parquet-format/LogicalTypes.md
911        let message_type = "
912        message test_schema {
913          REQUIRED GROUP my_list (LIST) {
914            REPEATED GROUP list {
915              OPTIONAL BINARY element (UTF8);
916            }
917          }
918          OPTIONAL GROUP my_list (LIST) {
919            REPEATED GROUP list {
920              REQUIRED BINARY element (UTF8);
921            }
922          }
923          OPTIONAL GROUP array_of_arrays (LIST) {
924            REPEATED GROUP list {
925              REQUIRED GROUP element (LIST) {
926                REPEATED GROUP list {
927                  REQUIRED INT32 element;
928                }
929              }
930            }
931          }
932          OPTIONAL GROUP my_list (LIST) {
933            REPEATED GROUP element {
934              REQUIRED BINARY str (UTF8);
935            }
936          }
937          OPTIONAL GROUP my_list (LIST) {
938            REPEATED INT32 element;
939          }
940          OPTIONAL GROUP my_list (LIST) {
941            REPEATED GROUP element {
942              REQUIRED BINARY str (UTF8);
943              REQUIRED INT32 num;
944            }
945          }
946          OPTIONAL GROUP my_list (LIST) {
947            REPEATED GROUP array {
948              REQUIRED BINARY str (UTF8);
949            }
950
951          }
952          OPTIONAL GROUP my_list (LIST) {
953            REPEATED GROUP my_list_tuple {
954              REQUIRED BINARY str (UTF8);
955            }
956          }
957          REPEATED INT32 name;
958        }
959        ";
960
961        // // List<String> (list non-null, elements nullable)
962        // required group my_list (LIST) {
963        //   repeated group list {
964        //     optional binary element (UTF8);
965        //   }
966        // }
967        {
968            arrow_fields.push(Field::new_list(
969                "my_list",
970                Field::new("element", DataType::Utf8, true),
971                false,
972            ));
973        }
974
975        // // List<String> (list nullable, elements non-null)
976        // optional group my_list (LIST) {
977        //   repeated group list {
978        //     required binary element (UTF8);
979        //   }
980        // }
981        {
982            arrow_fields.push(Field::new_list(
983                "my_list",
984                Field::new("element", DataType::Utf8, false),
985                true,
986            ));
987        }
988
989        // Element types can be nested structures. For example, a list of lists:
990        //
991        // // List<List<Integer>>
992        // optional group array_of_arrays (LIST) {
993        //   repeated group list {
994        //     required group element (LIST) {
995        //       repeated group list {
996        //         required int32 element;
997        //       }
998        //     }
999        //   }
1000        // }
1001        {
1002            let arrow_inner_list = Field::new("element", DataType::Int32, false);
1003            arrow_fields.push(Field::new_list(
1004                "array_of_arrays",
1005                Field::new_list("element", arrow_inner_list, false),
1006                true,
1007            ));
1008        }
1009
1010        // // List<String> (list nullable, elements non-null)
1011        // optional group my_list (LIST) {
1012        //   repeated group element {
1013        //     required binary str (UTF8);
1014        //   };
1015        // }
1016        {
1017            arrow_fields.push(Field::new_list(
1018                "my_list",
1019                Field::new("str", DataType::Utf8, false),
1020                true,
1021            ));
1022        }
1023
1024        // // List<Integer> (nullable list, non-null elements)
1025        // optional group my_list (LIST) {
1026        //   repeated int32 element;
1027        // }
1028        {
1029            arrow_fields.push(Field::new_list(
1030                "my_list",
1031                Field::new("element", DataType::Int32, false),
1032                true,
1033            ));
1034        }
1035
1036        // // List<Tuple<String, Integer>> (nullable list, non-null elements)
1037        // optional group my_list (LIST) {
1038        //   repeated group element {
1039        //     required binary str (UTF8);
1040        //     required int32 num;
1041        //   };
1042        // }
1043        {
1044            let fields = vec![
1045                Field::new("str", DataType::Utf8, false),
1046                Field::new("num", DataType::Int32, false),
1047            ];
1048            arrow_fields.push(Field::new_list(
1049                "my_list",
1050                Field::new_struct("element", fields, false),
1051                true,
1052            ));
1053        }
1054
1055        // // List<OneTuple<String>> (nullable list, non-null elements)
1056        // optional group my_list (LIST) {
1057        //   repeated group array {
1058        //     required binary str (UTF8);
1059        //   };
1060        // }
1061        // Special case: group is named array
1062        {
1063            let fields = vec![Field::new("str", DataType::Utf8, false)];
1064            arrow_fields.push(Field::new_list(
1065                "my_list",
1066                Field::new_struct("array", fields, false),
1067                true,
1068            ));
1069        }
1070
1071        // // List<OneTuple<String>> (nullable list, non-null elements)
1072        // optional group my_list (LIST) {
1073        //   repeated group my_list_tuple {
1074        //     required binary str (UTF8);
1075        //   };
1076        // }
1077        // Special case: group named ends in _tuple
1078        {
1079            let fields = vec![Field::new("str", DataType::Utf8, false)];
1080            arrow_fields.push(Field::new_list(
1081                "my_list",
1082                Field::new_struct("my_list_tuple", fields, false),
1083                true,
1084            ));
1085        }
1086
1087        // One-level encoding: Only allows required lists with required cells
1088        //   repeated value_type name
1089        {
1090            arrow_fields.push(Field::new_list(
1091                "name",
1092                Field::new("name", DataType::Int32, false),
1093                false,
1094            ));
1095        }
1096
1097        let parquet_group_type = parse_message_type(message_type).unwrap();
1098
1099        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1100        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
1101        let converted_fields = converted_arrow_schema.fields();
1102
1103        assert_eq!(arrow_fields.len(), converted_fields.len());
1104        for i in 0..arrow_fields.len() {
1105            assert_eq!(&arrow_fields[i], converted_fields[i].as_ref(), "{i}");
1106        }
1107    }
1108
1109    #[test]
1110    fn test_parquet_list_nullable() {
1111        let mut arrow_fields = Vec::new();
1112
1113        let message_type = "
1114        message test_schema {
1115          REQUIRED GROUP my_list1 (LIST) {
1116            REPEATED GROUP list {
1117              OPTIONAL BINARY element (UTF8);
1118            }
1119          }
1120          OPTIONAL GROUP my_list2 (LIST) {
1121            REPEATED GROUP list {
1122              REQUIRED BINARY element (UTF8);
1123            }
1124          }
1125          REQUIRED GROUP my_list3 (LIST) {
1126            REPEATED GROUP list {
1127              REQUIRED BINARY element (UTF8);
1128            }
1129          }
1130        }
1131        ";
1132
1133        // // List<String> (list non-null, elements nullable)
1134        // required group my_list1 (LIST) {
1135        //   repeated group list {
1136        //     optional binary element (UTF8);
1137        //   }
1138        // }
1139        {
1140            arrow_fields.push(Field::new_list(
1141                "my_list1",
1142                Field::new("element", DataType::Utf8, true),
1143                false,
1144            ));
1145        }
1146
1147        // // List<String> (list nullable, elements non-null)
1148        // optional group my_list2 (LIST) {
1149        //   repeated group list {
1150        //     required binary element (UTF8);
1151        //   }
1152        // }
1153        {
1154            arrow_fields.push(Field::new_list(
1155                "my_list2",
1156                Field::new("element", DataType::Utf8, false),
1157                true,
1158            ));
1159        }
1160
1161        // // List<String> (list non-null, elements non-null)
1162        // repeated group my_list3 (LIST) {
1163        //   repeated group list {
1164        //     required binary element (UTF8);
1165        //   }
1166        // }
1167        {
1168            arrow_fields.push(Field::new_list(
1169                "my_list3",
1170                Field::new("element", DataType::Utf8, false),
1171                false,
1172            ));
1173        }
1174
1175        let parquet_group_type = parse_message_type(message_type).unwrap();
1176
1177        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1178        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
1179        let converted_fields = converted_arrow_schema.fields();
1180
1181        assert_eq!(arrow_fields.len(), converted_fields.len());
1182        for i in 0..arrow_fields.len() {
1183            assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
1184        }
1185    }
1186
1187    #[test]
1188    fn test_parquet_maps() {
1189        let mut arrow_fields = Vec::new();
1190
1191        // LIST encoding example taken from parquet-format/LogicalTypes.md
1192        let message_type = "
1193        message test_schema {
1194          REQUIRED group my_map1 (MAP) {
1195            REPEATED group key_value {
1196              REQUIRED binary key (UTF8);
1197              OPTIONAL int32 value;
1198            }
1199          }
1200          OPTIONAL group my_map2 (MAP) {
1201            REPEATED group map {
1202              REQUIRED binary str (UTF8);
1203              REQUIRED int32 num;
1204            }
1205          }
1206          OPTIONAL group my_map3 (MAP_KEY_VALUE) {
1207            REPEATED group map {
1208              REQUIRED binary key (UTF8);
1209              OPTIONAL int32 value;
1210            }
1211          }
1212          REQUIRED group my_map4 (MAP) {
1213            REPEATED group map {
1214              OPTIONAL binary key (UTF8);
1215              REQUIRED int32 value;
1216            }
1217          }
1218        }
1219        ";
1220
1221        // // Map<String, Integer>
1222        // required group my_map (MAP) {
1223        //   repeated group key_value {
1224        //     required binary key (UTF8);
1225        //     optional int32 value;
1226        //   }
1227        // }
1228        {
1229            arrow_fields.push(Field::new_map(
1230                "my_map1",
1231                "key_value",
1232                Field::new("key", DataType::Utf8, false),
1233                Field::new("value", DataType::Int32, true),
1234                false,
1235                false,
1236            ));
1237        }
1238
1239        // // Map<String, Integer> (nullable map, non-null values)
1240        // optional group my_map (MAP) {
1241        //   repeated group map {
1242        //     required binary str (UTF8);
1243        //     required int32 num;
1244        //   }
1245        // }
1246        {
1247            arrow_fields.push(Field::new_map(
1248                "my_map2",
1249                "map",
1250                Field::new("str", DataType::Utf8, false),
1251                Field::new("num", DataType::Int32, false),
1252                false,
1253                true,
1254            ));
1255        }
1256
1257        // // Map<String, Integer> (nullable map, nullable values)
1258        // optional group my_map (MAP_KEY_VALUE) {
1259        //   repeated group map {
1260        //     required binary key (UTF8);
1261        //     optional int32 value;
1262        //   }
1263        // }
1264        {
1265            arrow_fields.push(Field::new_map(
1266                "my_map3",
1267                "map",
1268                Field::new("key", DataType::Utf8, false),
1269                Field::new("value", DataType::Int32, true),
1270                false,
1271                true,
1272            ));
1273        }
1274
1275        // // Map<String, Integer> (non-compliant nullable key)
1276        // group my_map (MAP_KEY_VALUE) {
1277        //   repeated group map {
1278        //     optional binary key (UTF8);
1279        //     required int32 value;
1280        //   }
1281        // }
1282        {
1283            arrow_fields.push(Field::new_map(
1284                "my_map4",
1285                "map",
1286                Field::new("key", DataType::Utf8, false), // The key is always non-nullable (#5630)
1287                Field::new("value", DataType::Int32, false),
1288                false,
1289                false,
1290            ));
1291        }
1292
1293        let parquet_group_type = parse_message_type(message_type).unwrap();
1294
1295        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1296        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
1297        let converted_fields = converted_arrow_schema.fields();
1298
1299        assert_eq!(arrow_fields.len(), converted_fields.len());
1300        for i in 0..arrow_fields.len() {
1301            assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
1302        }
1303    }
1304
1305    #[test]
1306    fn test_nested_schema() {
1307        let mut arrow_fields = Vec::new();
1308        {
1309            let group1_fields = Fields::from(vec![
1310                Field::new("leaf1", DataType::Boolean, false),
1311                Field::new("leaf2", DataType::Int32, false),
1312            ]);
1313            let group1_struct = Field::new("group1", DataType::Struct(group1_fields), false);
1314            arrow_fields.push(group1_struct);
1315
1316            let leaf3_field = Field::new("leaf3", DataType::Int64, false);
1317            arrow_fields.push(leaf3_field);
1318        }
1319
1320        let message_type = "
1321        message test_schema {
1322          REQUIRED GROUP group1 {
1323            REQUIRED BOOLEAN leaf1;
1324            REQUIRED INT32 leaf2;
1325          }
1326          REQUIRED INT64 leaf3;
1327        }
1328        ";
1329        let parquet_group_type = parse_message_type(message_type).unwrap();
1330
1331        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1332        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
1333        let converted_fields = converted_arrow_schema.fields();
1334
1335        assert_eq!(arrow_fields.len(), converted_fields.len());
1336        for i in 0..arrow_fields.len() {
1337            assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
1338        }
1339    }
1340
1341    #[test]
1342    fn test_nested_schema_partial() {
1343        let mut arrow_fields = Vec::new();
1344        {
1345            let group1_fields = vec![Field::new("leaf1", DataType::Int64, false)].into();
1346            let group1 = Field::new("group1", DataType::Struct(group1_fields), false);
1347            arrow_fields.push(group1);
1348
1349            let group2_fields = vec![Field::new("leaf4", DataType::Int64, false)].into();
1350            let group2 = Field::new("group2", DataType::Struct(group2_fields), false);
1351            arrow_fields.push(group2);
1352
1353            arrow_fields.push(Field::new("leaf5", DataType::Int64, false));
1354        }
1355
1356        let message_type = "
1357        message test_schema {
1358          REQUIRED GROUP group1 {
1359            REQUIRED INT64 leaf1;
1360            REQUIRED INT64 leaf2;
1361          }
1362          REQUIRED  GROUP group2 {
1363            REQUIRED INT64 leaf3;
1364            REQUIRED INT64 leaf4;
1365          }
1366          REQUIRED INT64 leaf5;
1367        }
1368        ";
1369        let parquet_group_type = parse_message_type(message_type).unwrap();
1370
1371        // Expected partial arrow schema (columns 0, 3, 4):
1372        // required group group1 {
1373        //   required int64 leaf1;
1374        // }
1375        // required group group2 {
1376        //   required int64 leaf4;
1377        // }
1378        // required int64 leaf5;
1379
1380        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1381        let mask = ProjectionMask::leaves(&parquet_schema, [3, 0, 4, 4]);
1382        let converted_arrow_schema =
1383            parquet_to_arrow_schema_by_columns(&parquet_schema, mask, None).unwrap();
1384        let converted_fields = converted_arrow_schema.fields();
1385
1386        assert_eq!(arrow_fields.len(), converted_fields.len());
1387        for i in 0..arrow_fields.len() {
1388            assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
1389        }
1390    }
1391
1392    #[test]
1393    fn test_nested_schema_partial_ordering() {
1394        let mut arrow_fields = Vec::new();
1395        {
1396            let group1_fields = vec![Field::new("leaf1", DataType::Int64, false)].into();
1397            let group1 = Field::new("group1", DataType::Struct(group1_fields), false);
1398            arrow_fields.push(group1);
1399
1400            let group2_fields = vec![Field::new("leaf4", DataType::Int64, false)].into();
1401            let group2 = Field::new("group2", DataType::Struct(group2_fields), false);
1402            arrow_fields.push(group2);
1403
1404            arrow_fields.push(Field::new("leaf5", DataType::Int64, false));
1405        }
1406
1407        let message_type = "
1408        message test_schema {
1409          REQUIRED GROUP group1 {
1410            REQUIRED INT64 leaf1;
1411            REQUIRED INT64 leaf2;
1412          }
1413          REQUIRED  GROUP group2 {
1414            REQUIRED INT64 leaf3;
1415            REQUIRED INT64 leaf4;
1416          }
1417          REQUIRED INT64 leaf5;
1418        }
1419        ";
1420        let parquet_group_type = parse_message_type(message_type).unwrap();
1421
1422        // Expected partial arrow schema (columns 3, 4, 0):
1423        // required group group1 {
1424        //   required int64 leaf1;
1425        // }
1426        // required group group2 {
1427        //   required int64 leaf4;
1428        // }
1429        // required int64 leaf5;
1430
1431        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1432        let mask = ProjectionMask::leaves(&parquet_schema, [3, 0, 4]);
1433        let converted_arrow_schema =
1434            parquet_to_arrow_schema_by_columns(&parquet_schema, mask, None).unwrap();
1435        let converted_fields = converted_arrow_schema.fields();
1436
1437        assert_eq!(arrow_fields.len(), converted_fields.len());
1438        for i in 0..arrow_fields.len() {
1439            assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
1440        }
1441
1442        let mask =
1443            ProjectionMask::columns(&parquet_schema, ["group2.leaf4", "group1.leaf1", "leaf5"]);
1444        let converted_arrow_schema =
1445            parquet_to_arrow_schema_by_columns(&parquet_schema, mask, None).unwrap();
1446        let converted_fields = converted_arrow_schema.fields();
1447
1448        assert_eq!(arrow_fields.len(), converted_fields.len());
1449        for i in 0..arrow_fields.len() {
1450            assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
1451        }
1452    }
1453
1454    #[test]
1455    fn test_repeated_nested_schema() {
1456        let mut arrow_fields = Vec::new();
1457        {
1458            arrow_fields.push(Field::new("leaf1", DataType::Int32, true));
1459
1460            let inner_group_list = Field::new_list(
1461                "innerGroup",
1462                Field::new_struct(
1463                    "innerGroup",
1464                    vec![Field::new("leaf3", DataType::Int32, true)],
1465                    false,
1466                ),
1467                false,
1468            );
1469
1470            let outer_group_list = Field::new_list(
1471                "outerGroup",
1472                Field::new_struct(
1473                    "outerGroup",
1474                    vec![Field::new("leaf2", DataType::Int32, true), inner_group_list],
1475                    false,
1476                ),
1477                false,
1478            );
1479            arrow_fields.push(outer_group_list);
1480        }
1481
1482        let message_type = "
1483        message test_schema {
1484          OPTIONAL INT32 leaf1;
1485          REPEATED GROUP outerGroup {
1486            OPTIONAL INT32 leaf2;
1487            REPEATED GROUP innerGroup {
1488              OPTIONAL INT32 leaf3;
1489            }
1490          }
1491        }
1492        ";
1493        let parquet_group_type = parse_message_type(message_type).unwrap();
1494
1495        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1496        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
1497        let converted_fields = converted_arrow_schema.fields();
1498
1499        assert_eq!(arrow_fields.len(), converted_fields.len());
1500        for i in 0..arrow_fields.len() {
1501            assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
1502        }
1503    }
1504
1505    #[test]
1506    fn test_column_desc_to_field() {
1507        let message_type = "
1508        message test_schema {
1509            REQUIRED BOOLEAN boolean;
1510            REQUIRED INT32   int8  (INT_8);
1511            REQUIRED INT32   uint8 (INTEGER(8,false));
1512            REQUIRED INT32   int16 (INT_16);
1513            REQUIRED INT32   uint16 (INTEGER(16,false));
1514            REQUIRED INT32   int32;
1515            REQUIRED INT64   int64;
1516            OPTIONAL DOUBLE  double;
1517            OPTIONAL FLOAT   float;
1518            OPTIONAL FIXED_LEN_BYTE_ARRAY (2) float16 (FLOAT16);
1519            OPTIONAL BINARY  string (UTF8);
1520            REPEATED BOOLEAN bools;
1521            OPTIONAL INT32   date       (DATE);
1522            OPTIONAL INT32   time_milli (TIME_MILLIS);
1523            OPTIONAL INT64   time_micro (TIME_MICROS);
1524            OPTIONAL INT64   time_nano (TIME(NANOS,false));
1525            OPTIONAL INT64   ts_milli (TIMESTAMP_MILLIS);
1526            REQUIRED INT64   ts_micro (TIMESTAMP_MICROS);
1527            REQUIRED INT64   ts_nano (TIMESTAMP(NANOS,true));
1528            REPEATED INT32   int_list;
1529            REPEATED BINARY  byte_list;
1530            REPEATED BINARY  string_list (UTF8);
1531            REQUIRED INT32 decimal_int32 (DECIMAL(8,2));
1532            REQUIRED INT64 decimal_int64 (DECIMAL(16,2));
1533            REQUIRED FIXED_LEN_BYTE_ARRAY (13) decimal_fix_length (DECIMAL(30,2));
1534        }
1535        ";
1536        let parquet_group_type = parse_message_type(message_type).unwrap();
1537
1538        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1539        let converted_arrow_fields = parquet_schema
1540            .columns()
1541            .iter()
1542            .map(|c| parquet_to_arrow_field(c).unwrap())
1543            .collect::<Vec<Field>>();
1544
1545        let arrow_fields = vec![
1546            Field::new("boolean", DataType::Boolean, false),
1547            Field::new("int8", DataType::Int8, false),
1548            Field::new("uint8", DataType::UInt8, false),
1549            Field::new("int16", DataType::Int16, false),
1550            Field::new("uint16", DataType::UInt16, false),
1551            Field::new("int32", DataType::Int32, false),
1552            Field::new("int64", DataType::Int64, false),
1553            Field::new("double", DataType::Float64, true),
1554            Field::new("float", DataType::Float32, true),
1555            Field::new("float16", DataType::Float16, true),
1556            Field::new("string", DataType::Utf8, true),
1557            Field::new_list(
1558                "bools",
1559                Field::new("bools", DataType::Boolean, false),
1560                false,
1561            ),
1562            Field::new("date", DataType::Date32, true),
1563            Field::new("time_milli", DataType::Time32(TimeUnit::Millisecond), true),
1564            Field::new("time_micro", DataType::Time64(TimeUnit::Microsecond), true),
1565            Field::new("time_nano", DataType::Time64(TimeUnit::Nanosecond), true),
1566            Field::new(
1567                "ts_milli",
1568                DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
1569                true,
1570            ),
1571            Field::new(
1572                "ts_micro",
1573                DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
1574                false,
1575            ),
1576            Field::new(
1577                "ts_nano",
1578                DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())),
1579                false,
1580            ),
1581            Field::new_list(
1582                "int_list",
1583                Field::new("int_list", DataType::Int32, false),
1584                false,
1585            ),
1586            Field::new_list(
1587                "byte_list",
1588                Field::new("byte_list", DataType::Binary, false),
1589                false,
1590            ),
1591            Field::new_list(
1592                "string_list",
1593                Field::new("string_list", DataType::Utf8, false),
1594                false,
1595            ),
1596            Field::new("decimal_int32", DataType::Decimal128(8, 2), false),
1597            Field::new("decimal_int64", DataType::Decimal128(16, 2), false),
1598            Field::new("decimal_fix_length", DataType::Decimal128(30, 2), false),
1599        ];
1600
1601        assert_eq!(arrow_fields, converted_arrow_fields);
1602    }
1603
1604    #[test]
1605    fn test_coerced_map_list() {
1606        // Create Arrow schema with non-Parquet naming
1607        let arrow_fields = vec![
1608            Field::new_list(
1609                "my_list",
1610                Field::new("item", DataType::Boolean, true),
1611                false,
1612            ),
1613            Field::new_map(
1614                "my_map",
1615                "entries",
1616                Field::new("keys", DataType::Utf8, false),
1617                Field::new("values", DataType::Int32, true),
1618                false,
1619                true,
1620            ),
1621        ];
1622        let arrow_schema = Schema::new(arrow_fields);
1623
1624        // Create Parquet schema with coerced names
1625        let message_type = "
1626        message parquet_schema {
1627            REQUIRED GROUP my_list (LIST) {
1628                REPEATED GROUP list {
1629                    OPTIONAL BOOLEAN element;
1630                }
1631            }
1632            OPTIONAL GROUP my_map (MAP) {
1633                REPEATED GROUP key_value {
1634                    REQUIRED BINARY key (STRING);
1635                    OPTIONAL INT32 value;
1636                }
1637            }
1638        }
1639        ";
1640        let parquet_group_type = parse_message_type(message_type).unwrap();
1641        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1642        let converted_arrow_schema = ArrowSchemaConverter::new()
1643            .with_coerce_types(true)
1644            .convert(&arrow_schema)
1645            .unwrap();
1646        assert_eq!(
1647            parquet_schema.columns().len(),
1648            converted_arrow_schema.columns().len()
1649        );
1650
1651        // Create Parquet schema without coerced names
1652        let message_type = "
1653        message parquet_schema {
1654            REQUIRED GROUP my_list (LIST) {
1655                REPEATED GROUP list {
1656                    OPTIONAL BOOLEAN item;
1657                }
1658            }
1659            OPTIONAL GROUP my_map (MAP) {
1660                REPEATED GROUP entries {
1661                    REQUIRED BINARY keys (STRING);
1662                    OPTIONAL INT32 values;
1663                }
1664            }
1665        }
1666        ";
1667        let parquet_group_type = parse_message_type(message_type).unwrap();
1668        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1669        let converted_arrow_schema = ArrowSchemaConverter::new()
1670            .with_coerce_types(false)
1671            .convert(&arrow_schema)
1672            .unwrap();
1673        assert_eq!(
1674            parquet_schema.columns().len(),
1675            converted_arrow_schema.columns().len()
1676        );
1677    }
1678
1679    #[test]
1680    fn test_field_to_column_desc() {
1681        let message_type = "
1682        message arrow_schema {
1683            REQUIRED BOOLEAN boolean;
1684            REQUIRED INT32   int8  (INT_8);
1685            REQUIRED INT32   int16 (INTEGER(16,true));
1686            REQUIRED INT32   int32;
1687            REQUIRED INT64   int64;
1688            OPTIONAL DOUBLE  double;
1689            OPTIONAL FLOAT   float;
1690            OPTIONAL FIXED_LEN_BYTE_ARRAY (2) float16 (FLOAT16);
1691            OPTIONAL BINARY  string (STRING);
1692            OPTIONAL GROUP   bools (LIST) {
1693                REPEATED GROUP list {
1694                    OPTIONAL BOOLEAN element;
1695                }
1696            }
1697            REQUIRED GROUP   bools_non_null (LIST) {
1698                REPEATED GROUP list {
1699                    REQUIRED BOOLEAN element;
1700                }
1701            }
1702            OPTIONAL INT32   date       (DATE);
1703            OPTIONAL INT32   time_milli (TIME(MILLIS,false));
1704            OPTIONAL INT32   time_milli_utc (TIME(MILLIS,true));
1705            OPTIONAL INT64   time_micro (TIME_MICROS);
1706            OPTIONAL INT64   time_micro_utc (TIME(MICROS, true));
1707            OPTIONAL INT64   ts_milli (TIMESTAMP_MILLIS);
1708            REQUIRED INT64   ts_micro (TIMESTAMP(MICROS,false));
1709            REQUIRED INT64   ts_seconds;
1710            REQUIRED INT64   ts_micro_utc (TIMESTAMP(MICROS, true));
1711            REQUIRED INT64   ts_millis_zero_offset (TIMESTAMP(MILLIS, true));
1712            REQUIRED INT64   ts_millis_zero_negative_offset (TIMESTAMP(MILLIS, true));
1713            REQUIRED INT64   ts_micro_non_utc (TIMESTAMP(MICROS, true));
1714            REQUIRED GROUP struct {
1715                REQUIRED BOOLEAN bools;
1716                REQUIRED INT32 uint32 (INTEGER(32,false));
1717                REQUIRED GROUP   int32 (LIST) {
1718                    REPEATED GROUP list {
1719                        OPTIONAL INT32 element;
1720                    }
1721                }
1722            }
1723            REQUIRED BINARY  dictionary_strings (STRING);
1724            REQUIRED INT32 decimal_int32 (DECIMAL(8,2));
1725            REQUIRED INT64 decimal_int64 (DECIMAL(16,2));
1726            REQUIRED FIXED_LEN_BYTE_ARRAY (13) decimal_fix_length (DECIMAL(30,2));
1727            REQUIRED FIXED_LEN_BYTE_ARRAY (16) decimal128 (DECIMAL(38,2));
1728            REQUIRED FIXED_LEN_BYTE_ARRAY (17) decimal256 (DECIMAL(39,2));
1729        }
1730        ";
1731        let parquet_group_type = parse_message_type(message_type).unwrap();
1732
1733        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1734
1735        let arrow_fields = vec![
1736            Field::new("boolean", DataType::Boolean, false),
1737            Field::new("int8", DataType::Int8, false),
1738            Field::new("int16", DataType::Int16, false),
1739            Field::new("int32", DataType::Int32, false),
1740            Field::new("int64", DataType::Int64, false),
1741            Field::new("double", DataType::Float64, true),
1742            Field::new("float", DataType::Float32, true),
1743            Field::new("float16", DataType::Float16, true),
1744            Field::new("string", DataType::Utf8, true),
1745            Field::new_list(
1746                "bools",
1747                Field::new("element", DataType::Boolean, true),
1748                true,
1749            ),
1750            Field::new_list(
1751                "bools_non_null",
1752                Field::new("element", DataType::Boolean, false),
1753                false,
1754            ),
1755            Field::new("date", DataType::Date32, true),
1756            Field::new("time_milli", DataType::Time32(TimeUnit::Millisecond), true),
1757            Field::new(
1758                "time_milli_utc",
1759                DataType::Time32(TimeUnit::Millisecond),
1760                true,
1761            )
1762            .with_metadata(HashMap::from_iter(vec![(
1763                "adjusted_to_utc".to_string(),
1764                "".to_string(),
1765            )])),
1766            Field::new("time_micro", DataType::Time64(TimeUnit::Microsecond), true),
1767            Field::new(
1768                "time_micro_utc",
1769                DataType::Time64(TimeUnit::Microsecond),
1770                true,
1771            )
1772            .with_metadata(HashMap::from_iter(vec![(
1773                "adjusted_to_utc".to_string(),
1774                "".to_string(),
1775            )])),
1776            Field::new(
1777                "ts_milli",
1778                DataType::Timestamp(TimeUnit::Millisecond, None),
1779                true,
1780            ),
1781            Field::new(
1782                "ts_micro",
1783                DataType::Timestamp(TimeUnit::Microsecond, None),
1784                false,
1785            ),
1786            Field::new(
1787                "ts_seconds",
1788                DataType::Timestamp(TimeUnit::Second, Some("UTC".into())),
1789                false,
1790            ),
1791            Field::new(
1792                "ts_micro_utc",
1793                DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
1794                false,
1795            ),
1796            Field::new(
1797                "ts_millis_zero_offset",
1798                DataType::Timestamp(TimeUnit::Millisecond, Some("+00:00".into())),
1799                false,
1800            ),
1801            Field::new(
1802                "ts_millis_zero_negative_offset",
1803                DataType::Timestamp(TimeUnit::Millisecond, Some("-00:00".into())),
1804                false,
1805            ),
1806            Field::new(
1807                "ts_micro_non_utc",
1808                DataType::Timestamp(TimeUnit::Microsecond, Some("+01:00".into())),
1809                false,
1810            ),
1811            Field::new_struct(
1812                "struct",
1813                vec![
1814                    Field::new("bools", DataType::Boolean, false),
1815                    Field::new("uint32", DataType::UInt32, false),
1816                    Field::new_list("int32", Field::new("element", DataType::Int32, true), false),
1817                ],
1818                false,
1819            ),
1820            Field::new_dictionary("dictionary_strings", DataType::Int32, DataType::Utf8, false),
1821            Field::new("decimal_int32", DataType::Decimal128(8, 2), false),
1822            Field::new("decimal_int64", DataType::Decimal128(16, 2), false),
1823            Field::new("decimal_fix_length", DataType::Decimal128(30, 2), false),
1824            Field::new("decimal128", DataType::Decimal128(38, 2), false),
1825            Field::new("decimal256", DataType::Decimal256(39, 2), false),
1826        ];
1827        let arrow_schema = Schema::new(arrow_fields);
1828        let converted_arrow_schema = ArrowSchemaConverter::new().convert(&arrow_schema).unwrap();
1829
1830        assert_eq!(
1831            parquet_schema.columns().len(),
1832            converted_arrow_schema.columns().len()
1833        );
1834        parquet_schema
1835            .columns()
1836            .iter()
1837            .zip(converted_arrow_schema.columns())
1838            .for_each(|(a, b)| {
1839                // Only check logical type if it's set on the Parquet side.
1840                // This is because the Arrow conversion always sets logical type,
1841                // even if there wasn't originally one.
1842                // This is not an issue, but is an inconvenience for this test.
1843                match a.logical_type() {
1844                    Some(_) => {
1845                        assert_eq!(a, b)
1846                    }
1847                    None => {
1848                        assert_eq!(a.name(), b.name());
1849                        assert_eq!(a.physical_type(), b.physical_type());
1850                        assert_eq!(a.converted_type(), b.converted_type());
1851                    }
1852                };
1853            });
1854    }
1855
1856    #[test]
1857    #[should_panic(expected = "Parquet does not support writing empty structs")]
1858    fn test_empty_struct_field() {
1859        let arrow_fields = vec![Field::new(
1860            "struct",
1861            DataType::Struct(Fields::empty()),
1862            false,
1863        )];
1864        let arrow_schema = Schema::new(arrow_fields);
1865        let converted_arrow_schema = ArrowSchemaConverter::new()
1866            .with_coerce_types(true)
1867            .convert(&arrow_schema);
1868
1869        converted_arrow_schema.unwrap();
1870    }
1871
1872    #[test]
1873    fn test_metadata() {
1874        let message_type = "
1875        message test_schema {
1876            OPTIONAL BINARY  string (STRING);
1877        }
1878        ";
1879        let parquet_group_type = parse_message_type(message_type).unwrap();
1880
1881        let key_value_metadata = vec![
1882            KeyValue::new("foo".to_owned(), Some("bar".to_owned())),
1883            KeyValue::new("baz".to_owned(), None),
1884        ];
1885
1886        let mut expected_metadata: HashMap<String, String> = HashMap::new();
1887        expected_metadata.insert("foo".to_owned(), "bar".to_owned());
1888
1889        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1890        let converted_arrow_schema =
1891            parquet_to_arrow_schema(&parquet_schema, Some(&key_value_metadata)).unwrap();
1892
1893        assert_eq!(converted_arrow_schema.metadata(), &expected_metadata);
1894    }
1895
1896    #[test]
1897    fn test_arrow_schema_roundtrip() -> Result<()> {
1898        let meta = |a: &[(&str, &str)]| -> HashMap<String, String> {
1899            a.iter()
1900                .map(|(a, b)| (a.to_string(), b.to_string()))
1901                .collect()
1902        };
1903
1904        let schema = Schema::new_with_metadata(
1905            vec![
1906                Field::new("c1", DataType::Utf8, false)
1907                    .with_metadata(meta(&[("Key", "Foo"), (PARQUET_FIELD_ID_META_KEY, "2")])),
1908                Field::new("c2", DataType::Binary, false),
1909                Field::new("c3", DataType::FixedSizeBinary(3), false),
1910                Field::new("c4", DataType::Boolean, false),
1911                Field::new("c5", DataType::Date32, false),
1912                Field::new("c6", DataType::Date64, false),
1913                Field::new("c7", DataType::Time32(TimeUnit::Second), false),
1914                Field::new("c8", DataType::Time32(TimeUnit::Millisecond), false),
1915                Field::new("c13", DataType::Time64(TimeUnit::Microsecond), false),
1916                Field::new("c14", DataType::Time64(TimeUnit::Nanosecond), false),
1917                Field::new("c15", DataType::Timestamp(TimeUnit::Second, None), false),
1918                Field::new(
1919                    "c16",
1920                    DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
1921                    false,
1922                ),
1923                Field::new(
1924                    "c17",
1925                    DataType::Timestamp(TimeUnit::Microsecond, Some("Africa/Johannesburg".into())),
1926                    false,
1927                ),
1928                Field::new(
1929                    "c18",
1930                    DataType::Timestamp(TimeUnit::Nanosecond, None),
1931                    false,
1932                ),
1933                Field::new("c19", DataType::Interval(IntervalUnit::DayTime), false),
1934                Field::new("c20", DataType::Interval(IntervalUnit::YearMonth), false),
1935                Field::new_list(
1936                    "c21",
1937                    Field::new_list_field(DataType::Boolean, true)
1938                        .with_metadata(meta(&[("Key", "Bar"), (PARQUET_FIELD_ID_META_KEY, "5")])),
1939                    false,
1940                )
1941                .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "4")])),
1942                Field::new(
1943                    "c22",
1944                    DataType::FixedSizeList(
1945                        Arc::new(Field::new_list_field(DataType::Boolean, true)),
1946                        5,
1947                    ),
1948                    false,
1949                ),
1950                Field::new_list(
1951                    "c23",
1952                    Field::new_large_list(
1953                        "inner",
1954                        Field::new_list_field(
1955                            DataType::Struct(
1956                                vec![
1957                                    Field::new("a", DataType::Int16, true),
1958                                    Field::new("b", DataType::Float64, false),
1959                                    Field::new("c", DataType::Float32, false),
1960                                    Field::new("d", DataType::Float16, false),
1961                                ]
1962                                .into(),
1963                            ),
1964                            false,
1965                        ),
1966                        true,
1967                    ),
1968                    false,
1969                ),
1970                Field::new(
1971                    "c24",
1972                    DataType::Struct(Fields::from(vec![
1973                        Field::new("a", DataType::Utf8, false),
1974                        Field::new("b", DataType::UInt16, false),
1975                    ])),
1976                    false,
1977                ),
1978                Field::new("c25", DataType::Interval(IntervalUnit::YearMonth), true),
1979                Field::new("c26", DataType::Interval(IntervalUnit::DayTime), true),
1980                // Duration types not supported
1981                // Field::new("c27", DataType::Duration(TimeUnit::Second), false),
1982                // Field::new("c28", DataType::Duration(TimeUnit::Millisecond), false),
1983                // Field::new("c29", DataType::Duration(TimeUnit::Microsecond), false),
1984                // Field::new("c30", DataType::Duration(TimeUnit::Nanosecond), false),
1985                #[allow(deprecated)]
1986                Field::new_dict(
1987                    "c31",
1988                    DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
1989                    true,
1990                    123,
1991                    true,
1992                )
1993                .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "6")])),
1994                Field::new("c32", DataType::LargeBinary, true),
1995                Field::new("c33", DataType::LargeUtf8, true),
1996                Field::new_large_list(
1997                    "c34",
1998                    Field::new_list(
1999                        "inner",
2000                        Field::new_list_field(
2001                            DataType::Struct(
2002                                vec![
2003                                    Field::new("a", DataType::Int16, true),
2004                                    Field::new("b", DataType::Float64, true),
2005                                ]
2006                                .into(),
2007                            ),
2008                            true,
2009                        ),
2010                        true,
2011                    ),
2012                    true,
2013                ),
2014                Field::new("c35", DataType::Null, true),
2015                Field::new("c36", DataType::Decimal128(2, 1), false),
2016                Field::new("c37", DataType::Decimal256(50, 20), false),
2017                Field::new("c38", DataType::Decimal128(18, 12), true),
2018                Field::new_map(
2019                    "c39",
2020                    "key_value",
2021                    Field::new("key", DataType::Utf8, false),
2022                    Field::new_list("value", Field::new("element", DataType::Utf8, true), true),
2023                    false, // fails to roundtrip keys_sorted
2024                    true,
2025                ),
2026                Field::new_map(
2027                    "c40",
2028                    "my_entries",
2029                    Field::new("my_key", DataType::Utf8, false)
2030                        .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "8")])),
2031                    Field::new_list(
2032                        "my_value",
2033                        Field::new_list_field(DataType::Utf8, true)
2034                            .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "10")])),
2035                        true,
2036                    )
2037                    .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "9")])),
2038                    false, // fails to roundtrip keys_sorted
2039                    true,
2040                )
2041                .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "7")])),
2042                Field::new_map(
2043                    "c41",
2044                    "my_entries",
2045                    Field::new("my_key", DataType::Utf8, false),
2046                    Field::new_list(
2047                        "my_value",
2048                        Field::new_list_field(DataType::Utf8, true)
2049                            .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "11")])),
2050                        true,
2051                    ),
2052                    false, // fails to roundtrip keys_sorted
2053                    false,
2054                ),
2055                Field::new("c42", DataType::Decimal32(5, 2), false),
2056                Field::new("c43", DataType::Decimal64(18, 12), true),
2057            ],
2058            meta(&[("Key", "Value")]),
2059        );
2060
2061        // write to an empty parquet file so that schema is serialized
2062        let file = tempfile::tempfile().unwrap();
2063        let writer =
2064            ArrowWriter::try_new(file.try_clone().unwrap(), Arc::new(schema.clone()), None)?;
2065        writer.close()?;
2066
2067        // read file back
2068        let arrow_reader = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
2069
2070        // Check arrow schema
2071        let read_schema = arrow_reader.schema();
2072        assert_eq!(&schema, read_schema.as_ref());
2073
2074        // Walk schema finding field IDs
2075        let mut stack = Vec::with_capacity(10);
2076        let mut out = Vec::with_capacity(10);
2077
2078        let root = arrow_reader.parquet_schema().root_schema_ptr();
2079        stack.push((root.name().to_string(), root));
2080
2081        while let Some((p, t)) = stack.pop() {
2082            if t.is_group() {
2083                for f in t.get_fields() {
2084                    stack.push((format!("{p}.{}", f.name()), f.clone()))
2085                }
2086            }
2087
2088            let info = t.get_basic_info();
2089            if info.has_id() {
2090                out.push(format!("{p} -> {}", info.id()))
2091            }
2092        }
2093        out.sort_unstable();
2094        let out: Vec<_> = out.iter().map(|x| x.as_str()).collect();
2095
2096        assert_eq!(
2097            &out,
2098            &[
2099                "arrow_schema.c1 -> 2",
2100                "arrow_schema.c21 -> 4",
2101                "arrow_schema.c21.list.item -> 5",
2102                "arrow_schema.c31 -> 6",
2103                "arrow_schema.c40 -> 7",
2104                "arrow_schema.c40.my_entries.my_key -> 8",
2105                "arrow_schema.c40.my_entries.my_value -> 9",
2106                "arrow_schema.c40.my_entries.my_value.list.item -> 10",
2107                "arrow_schema.c41.my_entries.my_value.list.item -> 11",
2108            ]
2109        );
2110
2111        Ok(())
2112    }
2113
2114    #[test]
2115    fn test_read_parquet_field_ids_raw() -> Result<()> {
2116        let meta = |a: &[(&str, &str)]| -> HashMap<String, String> {
2117            a.iter()
2118                .map(|(a, b)| (a.to_string(), b.to_string()))
2119                .collect()
2120        };
2121        let schema = Schema::new_with_metadata(
2122            vec![
2123                Field::new("c1", DataType::Utf8, true)
2124                    .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "1")])),
2125                Field::new("c2", DataType::Utf8, true)
2126                    .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "2")])),
2127            ],
2128            HashMap::new(),
2129        );
2130
2131        let writer = ArrowWriter::try_new(vec![], Arc::new(schema.clone()), None)?;
2132        let parquet_bytes = writer.into_inner()?;
2133
2134        let reader =
2135            crate::file::reader::SerializedFileReader::new(bytes::Bytes::from(parquet_bytes))?;
2136        let schema_descriptor = reader.metadata().file_metadata().schema_descr_ptr();
2137
2138        // don't pass metadata so field ids are read from Parquet and not from serialized Arrow schema
2139        let arrow_schema = crate::arrow::parquet_to_arrow_schema(&schema_descriptor, None)?;
2140
2141        let parq_schema_descr = ArrowSchemaConverter::new()
2142            .with_coerce_types(true)
2143            .convert(&arrow_schema)?;
2144        let parq_fields = parq_schema_descr.root_schema().get_fields();
2145        assert_eq!(parq_fields.len(), 2);
2146        assert_eq!(parq_fields[0].get_basic_info().id(), 1);
2147        assert_eq!(parq_fields[1].get_basic_info().id(), 2);
2148
2149        Ok(())
2150    }
2151
2152    #[test]
2153    fn test_arrow_schema_roundtrip_lists() -> Result<()> {
2154        let metadata: HashMap<String, String> = [("Key".to_string(), "Value".to_string())]
2155            .iter()
2156            .cloned()
2157            .collect();
2158
2159        let schema = Schema::new_with_metadata(
2160            vec![
2161                Field::new_list("c21", Field::new("array", DataType::Boolean, true), false),
2162                Field::new(
2163                    "c22",
2164                    DataType::FixedSizeList(
2165                        Arc::new(Field::new("items", DataType::Boolean, false)),
2166                        5,
2167                    ),
2168                    false,
2169                ),
2170                Field::new_list(
2171                    "c23",
2172                    Field::new_large_list(
2173                        "items",
2174                        Field::new_struct(
2175                            "items",
2176                            vec![
2177                                Field::new("a", DataType::Int16, true),
2178                                Field::new("b", DataType::Float64, false),
2179                            ],
2180                            true,
2181                        ),
2182                        true,
2183                    ),
2184                    true,
2185                ),
2186            ],
2187            metadata,
2188        );
2189
2190        // write to an empty parquet file so that schema is serialized
2191        let file = tempfile::tempfile().unwrap();
2192        let writer =
2193            ArrowWriter::try_new(file.try_clone().unwrap(), Arc::new(schema.clone()), None)?;
2194        writer.close()?;
2195
2196        // read file back
2197        let arrow_reader = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
2198        let read_schema = arrow_reader.schema();
2199        assert_eq!(&schema, read_schema.as_ref());
2200        Ok(())
2201    }
2202
2203    #[test]
2204    fn test_get_arrow_schema_from_metadata() {
2205        assert!(get_arrow_schema_from_metadata("").is_err());
2206    }
2207
2208    #[test]
2209    #[cfg(feature = "arrow_canonical_extension_types")]
2210    fn arrow_uuid_to_parquet_uuid() -> Result<()> {
2211        use arrow_schema::extension::Uuid;
2212        let arrow_schema = Schema::new(vec![
2213            Field::new("uuid", DataType::FixedSizeBinary(16), false).with_extension_type(Uuid),
2214        ]);
2215
2216        let parquet_schema = ArrowSchemaConverter::new().convert(&arrow_schema)?;
2217
2218        assert_eq!(
2219            parquet_schema.column(0).logical_type(),
2220            Some(LogicalType::Uuid)
2221        );
2222
2223        let arrow_schema = parquet_to_arrow_schema(&parquet_schema, None)?;
2224        assert_eq!(arrow_schema.field(0).try_extension_type::<Uuid>()?, Uuid);
2225
2226        Ok(())
2227    }
2228
2229    #[test]
2230    #[cfg(feature = "arrow_canonical_extension_types")]
2231    fn arrow_json_to_parquet_json() -> Result<()> {
2232        use arrow_schema::extension::Json;
2233        let arrow_schema = Schema::new(vec![
2234            Field::new("json", DataType::Utf8, false).with_extension_type(Json::default()),
2235        ]);
2236
2237        let parquet_schema = ArrowSchemaConverter::new().convert(&arrow_schema)?;
2238
2239        assert_eq!(
2240            parquet_schema.column(0).logical_type(),
2241            Some(LogicalType::Json)
2242        );
2243
2244        let arrow_schema = parquet_to_arrow_schema(&parquet_schema, None)?;
2245        assert_eq!(
2246            arrow_schema.field(0).try_extension_type::<Json>()?,
2247            Json::default()
2248        );
2249
2250        Ok(())
2251    }
2252}