Skip to main content

parquet/arrow/schema/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Converting Parquet schema <--> Arrow schema: [`ArrowSchemaConverter`] and [parquet_to_arrow_schema]
19
20use base64::Engine;
21use base64::prelude::BASE64_STANDARD;
22use std::collections::HashMap;
23use std::sync::Arc;
24
25use arrow_ipc::writer;
26use arrow_schema::{DataType, Field, FieldRef, Fields, Schema, TimeUnit};
27
28use crate::basic::{
29    ConvertedType, LogicalType, Repetition, TimeUnit as ParquetTimeUnit, Type as PhysicalType,
30};
31use crate::errors::{ParquetError, Result};
32use crate::file::{metadata::KeyValue, properties::WriterProperties};
33use crate::schema::types::{ColumnDescriptor, SchemaDescriptor, Type};
34
35mod complex;
36mod extension;
37mod primitive;
38pub mod virtual_type;
39
40use super::PARQUET_FIELD_ID_META_KEY;
41use crate::arrow::ProjectionMask;
42use crate::arrow::schema::extension::{
43    has_extension_type, logical_type_for_binary, logical_type_for_binary_view,
44    logical_type_for_fixed_size_binary, logical_type_for_string, logical_type_for_struct,
45    try_add_extension_type,
46};
47pub(crate) use complex::{ParquetField, ParquetFieldType, VirtualColumnType};
48
49/// Convert Parquet schema to Arrow schema including optional metadata
50///
51/// Attempts to decode any existing Arrow schema metadata, falling back
52/// to converting the Parquet schema column-wise
53pub fn parquet_to_arrow_schema(
54    parquet_schema: &SchemaDescriptor,
55    key_value_metadata: Option<&Vec<KeyValue>>,
56) -> Result<Schema> {
57    parquet_to_arrow_schema_by_columns(parquet_schema, ProjectionMask::all(), key_value_metadata)
58}
59
60/// Convert parquet schema to arrow schema including optional metadata,
61/// only preserving some leaf columns.
62pub fn parquet_to_arrow_schema_by_columns(
63    parquet_schema: &SchemaDescriptor,
64    mask: ProjectionMask,
65    key_value_metadata: Option<&Vec<KeyValue>>,
66) -> Result<Schema> {
67    Ok(parquet_to_arrow_schema_and_fields(parquet_schema, mask, key_value_metadata, &[])?.0)
68}
69
70/// Determines the Arrow Schema from a Parquet schema
71///
72/// Looks for an Arrow schema metadata "hint" (see
73/// [`parquet_to_arrow_field_levels`]), and uses it if present to ensure
74/// lossless round trips.
75pub(crate) fn parquet_to_arrow_schema_and_fields(
76    parquet_schema: &SchemaDescriptor,
77    mask: ProjectionMask,
78    key_value_metadata: Option<&Vec<KeyValue>>,
79    virtual_columns: &[FieldRef],
80) -> Result<(Schema, Option<ParquetField>)> {
81    let mut metadata = parse_key_value_metadata(key_value_metadata).unwrap_or_default();
82    let maybe_schema = metadata
83        .remove(super::ARROW_SCHEMA_META_KEY)
84        .map(|value| get_arrow_schema_from_metadata(&value))
85        .transpose()?;
86
87    // Add the Arrow metadata to the Parquet metadata skipping keys that collide
88    if let Some(arrow_schema) = &maybe_schema {
89        arrow_schema.metadata().iter().for_each(|(k, v)| {
90            metadata.entry(k.clone()).or_insert_with(|| v.clone());
91        });
92    }
93
94    let hint = maybe_schema.as_ref().map(|s| s.fields());
95    let field_levels =
96        parquet_to_arrow_field_levels_with_virtual(parquet_schema, mask, hint, virtual_columns)?;
97    let schema = Schema::new_with_metadata(field_levels.fields, metadata);
98    Ok((schema, field_levels.levels))
99}
100
101/// Schema information necessary to decode a parquet file as arrow [`Fields`]
102///
103/// In particular this stores the dremel-level information necessary to correctly
104/// interpret the encoded definition and repetition levels
105///
106/// Note: this is an opaque container intended to be used with lower-level APIs
107/// within this crate
108#[derive(Debug, Clone)]
109pub struct FieldLevels {
110    pub(crate) fields: Fields,
111    pub(crate) levels: Option<ParquetField>,
112}
113
114/// Convert a parquet [`SchemaDescriptor`] to [`FieldLevels`]
115///
116/// Columns not included within [`ProjectionMask`] will be ignored.
117///
118/// The optional `hint` parameter is the desired Arrow schema. See the
119/// [`arrow`] module documentation for more information.
120///
121/// [`arrow`]: crate::arrow
122///
123/// # Notes:
124/// Where a field type in `hint` is compatible with the corresponding parquet type in `schema`, it
125/// will be used, otherwise the default arrow type for the given parquet column type will be used.
126///
127/// This is to accommodate arrow types that cannot be round-tripped through parquet natively.
128/// Depending on the parquet writer, this can lead to a mismatch between a file's parquet schema
129/// and its embedded arrow schema. The parquet `schema` must be treated as authoritative in such
130/// an event. See [#1663](https://github.com/apache/arrow-rs/issues/1663) for more information
131///
132/// Note: this is a low-level API, most users will want to make use of the higher-level
133/// [`parquet_to_arrow_schema`] for decoding metadata from a parquet file.
134pub fn parquet_to_arrow_field_levels(
135    schema: &SchemaDescriptor,
136    mask: ProjectionMask,
137    hint: Option<&Fields>,
138) -> Result<FieldLevels> {
139    parquet_to_arrow_field_levels_with_virtual(schema, mask, hint, &[])
140}
141
142/// Convert a parquet [`SchemaDescriptor`] to [`FieldLevels`] with support for virtual columns
143///
144/// Columns not included within [`ProjectionMask`] will be ignored.
145///
146/// The optional `hint` parameter is the desired Arrow schema. See the
147/// [`arrow`] module documentation for more information.
148///
149/// [`arrow`]: crate::arrow
150///
151/// # Arguments
152/// * `schema` - The Parquet schema descriptor
153/// * `mask` - Projection mask to select which columns to include
154/// * `hint` - Optional hint for Arrow field types to use instead of defaults
155/// * `virtual_columns` - Virtual columns to append to the schema (e.g., row numbers)
156///
157/// # Notes:
158/// Where a field type in `hint` is compatible with the corresponding parquet type in `schema`, it
159/// will be used, otherwise the default arrow type for the given parquet column type will be used.
160///
161/// Virtual columns are columns that don't exist in the Parquet file but are generated during reading.
162/// They must have extension type names starting with "arrow.virtual.".
163///
164/// This is to accommodate arrow types that cannot be round-tripped through parquet natively.
165/// Depending on the parquet writer, this can lead to a mismatch between a file's parquet schema
166/// and its embedded arrow schema. The parquet `schema` must be treated as authoritative in such
167/// an event. See [#1663](https://github.com/apache/arrow-rs/issues/1663) for more information
168///
169/// Note: this is a low-level API, most users will want to make use of the higher-level
170/// [`parquet_to_arrow_schema`] for decoding metadata from a parquet file.
171pub fn parquet_to_arrow_field_levels_with_virtual(
172    schema: &SchemaDescriptor,
173    mask: ProjectionMask,
174    hint: Option<&Fields>,
175    virtual_columns: &[FieldRef],
176) -> Result<FieldLevels> {
177    // Validate that all fields are virtual columns
178    for field in virtual_columns {
179        if !virtual_type::is_virtual_column(field) {
180            return Err(ParquetError::General(format!(
181                "Field '{}' is not a virtual column. Virtual columns must have extension type names starting with 'arrow.virtual.'",
182                field.name()
183            )));
184        }
185    }
186
187    // Convert the regular schema first
188    let mut parquet_field = match complex::convert_schema(schema, mask, hint)? {
189        Some(field) => field,
190        None if virtual_columns.is_empty() => {
191            return Ok(FieldLevels {
192                fields: Fields::empty(),
193                levels: None,
194            });
195        }
196        None => {
197            // No regular fields, but we have virtual columns - create empty root struct
198            ParquetField {
199                rep_level: 0,
200                def_level: 0,
201                nullable: false,
202                arrow_type: DataType::Struct(Fields::empty()),
203                field_type: ParquetFieldType::Group {
204                    children: Vec::new(),
205                },
206            }
207        }
208    };
209
210    // Append virtual columns if any
211    if !virtual_columns.is_empty() {
212        match &mut parquet_field.field_type {
213            ParquetFieldType::Group { children } => {
214                // Get the mutable fields from the struct type
215                let DataType::Struct(ref mut fields) = parquet_field.arrow_type else {
216                    unreachable!("Root field must be a struct");
217                };
218
219                // Convert to mutable Vec to append
220                let mut fields_vec: Vec<FieldRef> = fields.iter().cloned().collect();
221
222                // Append each virtual column
223                for virtual_column in virtual_columns {
224                    // Virtual columns can only be added at the root level
225                    assert_eq!(
226                        parquet_field.rep_level, 0,
227                        "Virtual columns can only be added at rep level 0"
228                    );
229                    assert_eq!(
230                        parquet_field.def_level, 0,
231                        "Virtual columns can only be added at def level 0"
232                    );
233
234                    fields_vec.push(virtual_column.clone());
235                    let virtual_parquet_field = complex::convert_virtual_field(
236                        virtual_column,
237                        parquet_field.rep_level,
238                        parquet_field.def_level,
239                    )?;
240                    children.push(virtual_parquet_field);
241                }
242
243                // Update the fields
244                parquet_field.arrow_type = DataType::Struct(Fields::from(fields_vec));
245            }
246            _ => unreachable!("Root field must be a group"),
247        }
248    }
249
250    match &parquet_field.arrow_type {
251        DataType::Struct(fields) => Ok(FieldLevels {
252            fields: fields.clone(),
253            levels: Some(parquet_field),
254        }),
255        _ => unreachable!(),
256    }
257}
258
259/// Try to convert Arrow schema metadata into a schema
260fn get_arrow_schema_from_metadata(encoded_meta: &str) -> Result<Schema> {
261    let decoded = BASE64_STANDARD.decode(encoded_meta);
262    match decoded {
263        Ok(bytes) => {
264            let slice = if bytes.len() > 8 && bytes[0..4] == [255u8; 4] {
265                &bytes[8..]
266            } else {
267                bytes.as_slice()
268            };
269            match arrow_ipc::root_as_message(slice) {
270                Ok(message) => message
271                    .header_as_schema()
272                    .map(arrow_ipc::convert::fb_to_schema)
273                    .ok_or_else(|| arrow_err!("the message is not Arrow Schema")),
274                Err(err) => {
275                    // The flatbuffers implementation returns an error on verification error.
276                    Err(arrow_err!(
277                        "Unable to get root as message stored in {}: {:?}",
278                        super::ARROW_SCHEMA_META_KEY,
279                        err
280                    ))
281                }
282            }
283        }
284        Err(err) => {
285            // The C++ implementation returns an error if the schema can't be parsed.
286            Err(arrow_err!(
287                "Unable to decode the encoded schema stored in {}, {:?}",
288                super::ARROW_SCHEMA_META_KEY,
289                err
290            ))
291        }
292    }
293}
294
295/// Encodes the Arrow schema into the IPC format, and base64 encodes it
296pub fn encode_arrow_schema(schema: &Schema) -> String {
297    let options = writer::IpcWriteOptions::default();
298    let mut dictionary_tracker = writer::DictionaryTracker::new(true);
299    let data_gen = writer::IpcDataGenerator::default();
300    let mut serialized_schema =
301        data_gen.schema_to_bytes_with_dictionary_tracker(schema, &mut dictionary_tracker, &options);
302
303    // manually prepending the length to the schema as arrow uses the legacy IPC format
304    // TODO: change after addressing ARROW-9777
305    let schema_len = serialized_schema.ipc_message.len();
306    let mut len_prefix_schema = Vec::with_capacity(schema_len + 8);
307    len_prefix_schema.append(&mut vec![255u8, 255, 255, 255]);
308    len_prefix_schema.append((schema_len as u32).to_le_bytes().to_vec().as_mut());
309    len_prefix_schema.append(&mut serialized_schema.ipc_message);
310
311    BASE64_STANDARD.encode(&len_prefix_schema)
312}
313
314/// Mutates writer metadata by storing the encoded Arrow schema hint in
315/// [`ARROW_SCHEMA_META_KEY`].
316///
317/// If there is an existing Arrow schema metadata, it is replaced.
318///
319/// [`ARROW_SCHEMA_META_KEY`]: crate::arrow::ARROW_SCHEMA_META_KEY
320pub fn add_encoded_arrow_schema_to_metadata(schema: &Schema, props: &mut WriterProperties) {
321    let encoded = encode_arrow_schema(schema);
322
323    let schema_kv = KeyValue {
324        key: super::ARROW_SCHEMA_META_KEY.to_string(),
325        value: Some(encoded),
326    };
327
328    let meta = props
329        .key_value_metadata
330        .get_or_insert_with(Default::default);
331
332    // check if ARROW:schema exists, and overwrite it
333    let schema_meta = meta
334        .iter()
335        .enumerate()
336        .find(|(_, kv)| kv.key.as_str() == super::ARROW_SCHEMA_META_KEY);
337    match schema_meta {
338        Some((i, _)) => {
339            meta.remove(i);
340            meta.push(schema_kv);
341        }
342        None => {
343            meta.push(schema_kv);
344        }
345    }
346}
347
348/// Converter for Arrow schema to Parquet schema
349///
350/// See the documentation on the [`arrow`] module for background
351/// information on how Arrow schema is represented in Parquet.
352///
353/// [`arrow`]: crate::arrow
354///
355/// # Example:
356/// ```
357/// # use std::sync::Arc;
358/// # use arrow_schema::{Field, Schema, DataType};
359/// # use parquet::arrow::ArrowSchemaConverter;
360/// use parquet::schema::types::{SchemaDescriptor, Type};
361/// use parquet::basic; // note there are two `Type`s in the following example
362/// // create an Arrow Schema
363/// let arrow_schema = Schema::new(vec![
364///   Field::new("a", DataType::Int64, true),
365///   Field::new("b", DataType::Date32, true),
366/// ]);
367/// // convert the Arrow schema to a Parquet schema
368/// let parquet_schema = ArrowSchemaConverter::new()
369///   .convert(&arrow_schema)
370///   .unwrap();
371///
372/// let expected_parquet_schema = SchemaDescriptor::new(
373///   Arc::new(
374///     Type::group_type_builder("arrow_schema")
375///       .with_fields(vec![
376///         Arc::new(
377///          Type::primitive_type_builder("a", basic::Type::INT64)
378///           .build().unwrap()
379///         ),
380///         Arc::new(
381///          Type::primitive_type_builder("b", basic::Type::INT32)
382///           .with_converted_type(basic::ConvertedType::DATE)
383///           .with_logical_type(Some(basic::LogicalType::Date))
384///           .build().unwrap()
385///         ),
386///      ])
387///      .build().unwrap()
388///   )
389/// );
390/// assert_eq!(parquet_schema, expected_parquet_schema);
391/// ```
392#[derive(Debug)]
393pub struct ArrowSchemaConverter<'a> {
394    /// Name of the root schema in Parquet
395    schema_root: &'a str,
396    /// Should we coerce Arrow types to compatible Parquet types?
397    ///
398    /// See docs on [Self::with_coerce_types]`
399    coerce_types: bool,
400}
401
402impl Default for ArrowSchemaConverter<'_> {
403    fn default() -> Self {
404        Self::new()
405    }
406}
407
408impl<'a> ArrowSchemaConverter<'a> {
409    /// Create a new converter
410    pub fn new() -> Self {
411        Self {
412            schema_root: "arrow_schema",
413            coerce_types: false,
414        }
415    }
416
417    /// Should Arrow types be coerced into Parquet native types (default `false`).
418    ///
419    /// Setting this option to `true` will result in Parquet files that can be
420    /// read by more readers, but may lose precision for Arrow types such as
421    /// [`DataType::Date64`] which have no direct [corresponding Parquet type].
422    ///
423    /// By default, this converter does not coerce to native Parquet types. Enabling type
424    /// coercion allows for meaningful representations that do not require
425    /// downstream readers to consider the embedded Arrow schema, and can allow
426    /// for greater compatibility with other Parquet implementations. However,
427    /// type coercion also prevents data from being losslessly round-tripped.
428    ///
429    /// # Discussion
430    ///
431    /// Some Arrow types such as `Date64`, `Timestamp` and `Interval` have no
432    /// corresponding Parquet logical type. Thus, they can not be losslessly
433    /// round-tripped when stored using the appropriate Parquet logical type.
434    /// For example, some Date64 values may be truncated when stored with
435    /// parquet's native 32 bit date type.
436    ///
437    /// For [`List`] and [`Map`] types, some Parquet readers expect certain
438    /// schema elements to have specific names (earlier versions of the spec
439    /// were somewhat ambiguous on this point). Type coercion will use the names
440    /// prescribed by the Parquet specification, potentially losing naming
441    /// metadata from the Arrow schema.
442    ///
443    /// [`List`]: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists
444    /// [`Map`]: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#maps
445    /// [corresponding Parquet type]: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#date
446    ///
447    pub fn with_coerce_types(mut self, coerce_types: bool) -> Self {
448        self.coerce_types = coerce_types;
449        self
450    }
451
452    /// Set the root schema element name (defaults to `"arrow_schema"`).
453    pub fn schema_root(mut self, schema_root: &'a str) -> Self {
454        self.schema_root = schema_root;
455        self
456    }
457
458    /// Convert the specified Arrow [`Schema`] to the desired Parquet [`SchemaDescriptor`]
459    ///
460    /// See example in [`ArrowSchemaConverter`]
461    pub fn convert(&self, schema: &Schema) -> Result<SchemaDescriptor> {
462        let fields = schema
463            .fields()
464            .iter()
465            .map(|field| arrow_to_parquet_type(field, self.coerce_types).map(Arc::new))
466            .collect::<Result<_>>()?;
467        let group = Type::group_type_builder(self.schema_root)
468            .with_fields(fields)
469            .build()?;
470        Ok(SchemaDescriptor::new(Arc::new(group)))
471    }
472}
473
474fn parse_key_value_metadata(
475    key_value_metadata: Option<&Vec<KeyValue>>,
476) -> Option<HashMap<String, String>> {
477    match key_value_metadata {
478        Some(key_values) => {
479            let map: HashMap<String, String> = key_values
480                .iter()
481                .filter_map(|kv| {
482                    kv.value
483                        .as_ref()
484                        .map(|value| (kv.key.clone(), value.clone()))
485                })
486                .collect();
487
488            if map.is_empty() { None } else { Some(map) }
489        }
490        None => None,
491    }
492}
493
494/// Convert parquet column schema to arrow field.
495pub fn parquet_to_arrow_field(parquet_column: &ColumnDescriptor) -> Result<Field> {
496    let field = complex::convert_type(&parquet_column.self_type_ptr())?;
497    let mut ret = Field::new(parquet_column.name(), field.arrow_type, field.nullable);
498
499    let parquet_type = parquet_column.self_type();
500    let basic_info = parquet_type.get_basic_info();
501
502    let mut hash_map_size = 0;
503    if basic_info.has_id() {
504        hash_map_size += 1;
505    }
506    if has_extension_type(parquet_type) {
507        hash_map_size += 1;
508    }
509    if hash_map_size == 0 {
510        return Ok(ret);
511    }
512    ret.set_metadata(HashMap::with_capacity(hash_map_size));
513    if basic_info.has_id() {
514        ret.metadata_mut().insert(
515            PARQUET_FIELD_ID_META_KEY.to_string(),
516            basic_info.id().to_string(),
517        );
518    }
519    try_add_extension_type(ret, parquet_column.self_type())
520}
521
522pub fn decimal_length_from_precision(precision: u8) -> usize {
523    // digits = floor(log_10(2^(8*n - 1) - 1))  // definition in parquet's logical types
524    // ceil(digits) = log10(2^(8*n - 1) - 1)
525    // 10^ceil(digits) = 2^(8*n - 1) - 1
526    // 10^ceil(digits) + 1 = 2^(8*n - 1)
527    // log2(10^ceil(digits) + 1) = (8*n - 1)
528    // log2(10^ceil(digits) + 1) + 1 = 8*n
529    // (log2(10^ceil(a) + 1) + 1) / 8 = n
530    (((10.0_f64.powi(precision as i32) + 1.0).log2() + 1.0) / 8.0).ceil() as usize
531}
532
533/// Convert an arrow field to a parquet `Type`
534fn arrow_to_parquet_type(field: &Field, coerce_types: bool) -> Result<Type> {
535    const PARQUET_LIST_ELEMENT_NAME: &str = "element";
536    const PARQUET_MAP_STRUCT_NAME: &str = "key_value";
537    const PARQUET_KEY_FIELD_NAME: &str = "key";
538    const PARQUET_VALUE_FIELD_NAME: &str = "value";
539
540    let name = field.name().as_str();
541    let repetition = if field.is_nullable() {
542        Repetition::OPTIONAL
543    } else {
544        Repetition::REQUIRED
545    };
546    let id = field_id(field);
547    // create type from field
548    match field.data_type() {
549        DataType::Null => Type::primitive_type_builder(name, PhysicalType::INT32)
550            .with_logical_type(Some(LogicalType::Unknown))
551            .with_repetition(repetition)
552            .with_id(id)
553            .build(),
554        DataType::Boolean => Type::primitive_type_builder(name, PhysicalType::BOOLEAN)
555            .with_repetition(repetition)
556            .with_id(id)
557            .build(),
558        DataType::Int8 => Type::primitive_type_builder(name, PhysicalType::INT32)
559            .with_logical_type(Some(LogicalType::integer(8, true)))
560            .with_repetition(repetition)
561            .with_id(id)
562            .build(),
563        DataType::Int16 => Type::primitive_type_builder(name, PhysicalType::INT32)
564            .with_logical_type(Some(LogicalType::integer(16, true)))
565            .with_repetition(repetition)
566            .with_id(id)
567            .build(),
568        DataType::Int32 => Type::primitive_type_builder(name, PhysicalType::INT32)
569            .with_repetition(repetition)
570            .with_id(id)
571            .build(),
572        DataType::Int64 => Type::primitive_type_builder(name, PhysicalType::INT64)
573            .with_repetition(repetition)
574            .with_id(id)
575            .build(),
576        DataType::UInt8 => Type::primitive_type_builder(name, PhysicalType::INT32)
577            .with_logical_type(Some(LogicalType::integer(8, false)))
578            .with_repetition(repetition)
579            .with_id(id)
580            .build(),
581        DataType::UInt16 => Type::primitive_type_builder(name, PhysicalType::INT32)
582            .with_logical_type(Some(LogicalType::integer(16, false)))
583            .with_repetition(repetition)
584            .with_id(id)
585            .build(),
586        DataType::UInt32 => Type::primitive_type_builder(name, PhysicalType::INT32)
587            .with_logical_type(Some(LogicalType::integer(32, false)))
588            .with_repetition(repetition)
589            .with_id(id)
590            .build(),
591        DataType::UInt64 => Type::primitive_type_builder(name, PhysicalType::INT64)
592            .with_logical_type(Some(LogicalType::integer(64, false)))
593            .with_repetition(repetition)
594            .with_id(id)
595            .build(),
596        DataType::Float16 => Type::primitive_type_builder(name, PhysicalType::FIXED_LEN_BYTE_ARRAY)
597            .with_repetition(repetition)
598            .with_id(id)
599            .with_logical_type(Some(LogicalType::Float16))
600            .with_length(2)
601            .build(),
602        DataType::Float32 => Type::primitive_type_builder(name, PhysicalType::FLOAT)
603            .with_repetition(repetition)
604            .with_id(id)
605            .build(),
606        DataType::Float64 => Type::primitive_type_builder(name, PhysicalType::DOUBLE)
607            .with_repetition(repetition)
608            .with_id(id)
609            .build(),
610        DataType::Timestamp(TimeUnit::Second, _) => {
611            // Cannot represent seconds in LogicalType
612            Type::primitive_type_builder(name, PhysicalType::INT64)
613                .with_repetition(repetition)
614                .with_id(id)
615                .build()
616        }
617        DataType::Timestamp(time_unit, tz) => {
618            Type::primitive_type_builder(name, PhysicalType::INT64)
619                .with_logical_type(Some(LogicalType::timestamp(
620                    // If timezone set, values are normalized to UTC timezone
621                    matches!(tz, Some(z) if !z.as_ref().is_empty()),
622                    match time_unit {
623                        TimeUnit::Second => unreachable!(),
624                        TimeUnit::Millisecond => ParquetTimeUnit::MILLIS,
625                        TimeUnit::Microsecond => ParquetTimeUnit::MICROS,
626                        TimeUnit::Nanosecond => ParquetTimeUnit::NANOS,
627                    },
628                )))
629                .with_repetition(repetition)
630                .with_id(id)
631                .build()
632        }
633        DataType::Date32 => Type::primitive_type_builder(name, PhysicalType::INT32)
634            .with_logical_type(Some(LogicalType::Date))
635            .with_repetition(repetition)
636            .with_id(id)
637            .build(),
638        DataType::Date64 => {
639            if coerce_types {
640                Type::primitive_type_builder(name, PhysicalType::INT32)
641                    .with_logical_type(Some(LogicalType::Date))
642                    .with_repetition(repetition)
643                    .with_id(id)
644                    .build()
645            } else {
646                Type::primitive_type_builder(name, PhysicalType::INT64)
647                    .with_repetition(repetition)
648                    .with_id(id)
649                    .build()
650            }
651        }
652        DataType::Time32(TimeUnit::Second) => {
653            // Cannot represent seconds in LogicalType
654            Type::primitive_type_builder(name, PhysicalType::INT32)
655                .with_repetition(repetition)
656                .with_id(id)
657                .build()
658        }
659        DataType::Time32(unit) => Type::primitive_type_builder(name, PhysicalType::INT32)
660            .with_logical_type(Some(LogicalType::time(
661                field.metadata().contains_key("adjusted_to_utc"),
662                match unit {
663                    TimeUnit::Millisecond => ParquetTimeUnit::MILLIS,
664                    u => unreachable!("Invalid unit for Time32: {:?}", u),
665                },
666            )))
667            .with_repetition(repetition)
668            .with_id(id)
669            .build(),
670        DataType::Time64(unit) => Type::primitive_type_builder(name, PhysicalType::INT64)
671            .with_logical_type(Some(LogicalType::time(
672                field.metadata().contains_key("adjusted_to_utc"),
673                match unit {
674                    TimeUnit::Microsecond => ParquetTimeUnit::MICROS,
675                    TimeUnit::Nanosecond => ParquetTimeUnit::NANOS,
676                    u => unreachable!("Invalid unit for Time64: {:?}", u),
677                },
678            )))
679            .with_repetition(repetition)
680            .with_id(id)
681            .build(),
682        DataType::Duration(_) => Type::primitive_type_builder(name, PhysicalType::INT64)
683            .with_repetition(repetition)
684            .with_id(id)
685            .build(),
686        DataType::Interval(_) => {
687            Type::primitive_type_builder(name, PhysicalType::FIXED_LEN_BYTE_ARRAY)
688                .with_converted_type(ConvertedType::INTERVAL)
689                .with_repetition(repetition)
690                .with_id(id)
691                .with_length(12)
692                .build()
693        }
694        DataType::Binary | DataType::LargeBinary => {
695            Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
696                .with_repetition(repetition)
697                .with_id(id)
698                .with_logical_type(logical_type_for_binary(field))
699                .build()
700        }
701        DataType::FixedSizeBinary(length) => {
702            Type::primitive_type_builder(name, PhysicalType::FIXED_LEN_BYTE_ARRAY)
703                .with_repetition(repetition)
704                .with_id(id)
705                .with_length(*length)
706                .with_logical_type(logical_type_for_fixed_size_binary(field))
707                .build()
708        }
709        DataType::BinaryView => Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
710            .with_repetition(repetition)
711            .with_id(id)
712            .with_logical_type(logical_type_for_binary_view(field))
713            .build(),
714        DataType::Decimal32(precision, scale)
715        | DataType::Decimal64(precision, scale)
716        | DataType::Decimal128(precision, scale)
717        | DataType::Decimal256(precision, scale) => {
718            // Decimal precision determines the Parquet physical type to use.
719            // Following the: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#decimal
720            let (physical_type, length) = if *precision > 1 && *precision <= 9 {
721                (PhysicalType::INT32, -1)
722            } else if *precision <= 18 {
723                (PhysicalType::INT64, -1)
724            } else {
725                (
726                    PhysicalType::FIXED_LEN_BYTE_ARRAY,
727                    decimal_length_from_precision(*precision) as i32,
728                )
729            };
730            Type::primitive_type_builder(name, physical_type)
731                .with_repetition(repetition)
732                .with_id(id)
733                .with_length(length)
734                .with_logical_type(Some(LogicalType::decimal(*scale as i32, *precision as i32)))
735                .with_precision(*precision as i32)
736                .with_scale(*scale as i32)
737                .build()
738        }
739        DataType::Utf8 | DataType::LargeUtf8 => {
740            Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
741                .with_logical_type(logical_type_for_string(field))
742                .with_repetition(repetition)
743                .with_id(id)
744                .build()
745        }
746        DataType::Utf8View => Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
747            .with_logical_type(logical_type_for_string(field))
748            .with_repetition(repetition)
749            .with_id(id)
750            .build(),
751        DataType::List(f)
752        | DataType::FixedSizeList(f, _)
753        | DataType::LargeList(f)
754        | DataType::ListView(f)
755        | DataType::LargeListView(f) => {
756            let field_ref = if coerce_types && f.name() != PARQUET_LIST_ELEMENT_NAME {
757                // Ensure proper naming per the Parquet specification
758                let ff = f.as_ref().clone().with_name(PARQUET_LIST_ELEMENT_NAME);
759                Arc::new(arrow_to_parquet_type(&ff, coerce_types)?)
760            } else {
761                Arc::new(arrow_to_parquet_type(f, coerce_types)?)
762            };
763
764            Type::group_type_builder(name)
765                .with_fields(vec![Arc::new(
766                    Type::group_type_builder("list")
767                        .with_fields(vec![field_ref])
768                        .with_repetition(Repetition::REPEATED)
769                        .build()?,
770                )])
771                .with_logical_type(Some(LogicalType::List))
772                .with_repetition(repetition)
773                .with_id(id)
774                .build()
775        }
776        DataType::Struct(fields) => {
777            if fields.is_empty() {
778                return Err(arrow_err!("Parquet does not support writing empty structs",));
779            }
780            // recursively convert children to types/nodes
781            let fields = fields
782                .iter()
783                .map(|f| arrow_to_parquet_type(f, coerce_types).map(Arc::new))
784                .collect::<Result<_>>()?;
785            Type::group_type_builder(name)
786                .with_fields(fields)
787                .with_repetition(repetition)
788                .with_id(id)
789                .with_logical_type(logical_type_for_struct(field))
790                .build()
791        }
792        DataType::Map(field, _) => {
793            if let DataType::Struct(struct_fields) = field.data_type() {
794                // If coercing then set inner struct name to "key_value"
795                let map_struct_name = if coerce_types {
796                    PARQUET_MAP_STRUCT_NAME
797                } else {
798                    field.name()
799                };
800
801                // If coercing then ensure struct fields are named "key" and "value"
802                let fix_map_field = |name: &str, fld: &Arc<Field>| -> Result<Arc<Type>> {
803                    if coerce_types && fld.name() != name {
804                        let f = fld.as_ref().clone().with_name(name);
805                        Ok(Arc::new(arrow_to_parquet_type(&f, coerce_types)?))
806                    } else {
807                        Ok(Arc::new(arrow_to_parquet_type(fld, coerce_types)?))
808                    }
809                };
810                let key_field = fix_map_field(PARQUET_KEY_FIELD_NAME, &struct_fields[0])?;
811                let val_field = fix_map_field(PARQUET_VALUE_FIELD_NAME, &struct_fields[1])?;
812
813                Type::group_type_builder(name)
814                    .with_fields(vec![Arc::new(
815                        Type::group_type_builder(map_struct_name)
816                            .with_fields(vec![key_field, val_field])
817                            .with_repetition(Repetition::REPEATED)
818                            .build()?,
819                    )])
820                    .with_logical_type(Some(LogicalType::Map))
821                    .with_repetition(repetition)
822                    .with_id(id)
823                    .build()
824            } else {
825                Err(arrow_err!(
826                    "DataType::Map should contain a struct field child",
827                ))
828            }
829        }
830        DataType::Union(_, _) => unimplemented!("See ARROW-8817."),
831        DataType::Dictionary(_, value) => {
832            // Dictionary encoding not handled at the schema level
833            let dict_field = field.clone().with_data_type(value.as_ref().clone());
834            arrow_to_parquet_type(&dict_field, coerce_types)
835        }
836        DataType::RunEndEncoded(_, _) => Err(arrow_err!(
837            "Converting RunEndEncodedType to parquet not supported",
838        )),
839    }
840}
841
842fn field_id(field: &Field) -> Option<i32> {
843    let value = field.metadata().get(super::PARQUET_FIELD_ID_META_KEY)?;
844    value.parse().ok() // Fail quietly if not a valid integer
845}
846
847#[cfg(test)]
848mod tests {
849    use super::*;
850
851    use std::{collections::HashMap, sync::Arc};
852
853    use crate::arrow::PARQUET_FIELD_ID_META_KEY;
854    use crate::file::metadata::KeyValue;
855    use crate::file::reader::FileReader;
856    use crate::{
857        arrow::{ArrowWriter, arrow_reader::ParquetRecordBatchReaderBuilder},
858        schema::{parser::parse_message_type, types::SchemaDescriptor},
859    };
860    use arrow::datatypes::{DataType, Field, IntervalUnit, TimeUnit};
861
862    #[test]
863    fn test_flat_primitives() {
864        let message_type = "
865        message test_schema {
866            REQUIRED BOOLEAN boolean;
867            REQUIRED INT32   int8  (INT_8);
868            REQUIRED INT32   int16 (INT_16);
869            REQUIRED INT32   uint8 (INTEGER(8,false));
870            REQUIRED INT32   uint16 (INTEGER(16,false));
871            REQUIRED INT32   int32;
872            REQUIRED INT64   int64;
873            OPTIONAL DOUBLE  double;
874            OPTIONAL FLOAT   float;
875            OPTIONAL FIXED_LEN_BYTE_ARRAY (2) float16 (FLOAT16);
876            OPTIONAL BINARY  string (UTF8);
877            OPTIONAL BINARY  string_2 (STRING);
878            OPTIONAL BINARY  json (JSON);
879        }
880        ";
881        let parquet_group_type = parse_message_type(message_type).unwrap();
882
883        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
884        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
885
886        let arrow_fields = Fields::from(vec![
887            Field::new("boolean", DataType::Boolean, false),
888            Field::new("int8", DataType::Int8, false),
889            Field::new("int16", DataType::Int16, false),
890            Field::new("uint8", DataType::UInt8, false),
891            Field::new("uint16", DataType::UInt16, false),
892            Field::new("int32", DataType::Int32, false),
893            Field::new("int64", DataType::Int64, false),
894            Field::new("double", DataType::Float64, true),
895            Field::new("float", DataType::Float32, true),
896            Field::new("float16", DataType::Float16, true),
897            Field::new("string", DataType::Utf8, true),
898            Field::new("string_2", DataType::Utf8, true),
899            json_field(),
900        ]);
901
902        assert_eq!(&arrow_fields, converted_arrow_schema.fields());
903    }
904
905    /// Return the expected Field for a Parquet column annotated with
906    /// the JSON logical type.
907    fn json_field() -> Field {
908        #[cfg(feature = "arrow_canonical_extension_types")]
909        {
910            Field::new("json", DataType::Utf8, true)
911                .with_extension_type(arrow_schema::extension::Json::default())
912        }
913        #[cfg(not(feature = "arrow_canonical_extension_types"))]
914        {
915            Field::new("json", DataType::Utf8, true)
916        }
917    }
918
919    #[test]
920    fn test_decimal_fields() {
921        let message_type = "
922        message test_schema {
923                    REQUIRED INT32 decimal1 (DECIMAL(4,2));
924                    REQUIRED INT64 decimal2 (DECIMAL(12,2));
925                    REQUIRED FIXED_LEN_BYTE_ARRAY (16) decimal3 (DECIMAL(30,2));
926                    REQUIRED BYTE_ARRAY decimal4 (DECIMAL(33,2));
927                    REQUIRED BYTE_ARRAY decimal5 (DECIMAL(38,2));
928                    REQUIRED FIXED_LEN_BYTE_ARRAY (17) decimal6 (DECIMAL(39,2));
929                    REQUIRED BYTE_ARRAY decimal7 (DECIMAL(39,2));
930        }
931        ";
932
933        let parquet_group_type = parse_message_type(message_type).unwrap();
934
935        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
936        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
937
938        let arrow_fields = Fields::from(vec![
939            Field::new("decimal1", DataType::Decimal128(4, 2), false),
940            Field::new("decimal2", DataType::Decimal128(12, 2), false),
941            Field::new("decimal3", DataType::Decimal128(30, 2), false),
942            Field::new("decimal4", DataType::Decimal128(33, 2), false),
943            Field::new("decimal5", DataType::Decimal128(38, 2), false),
944            Field::new("decimal6", DataType::Decimal256(39, 2), false),
945            Field::new("decimal7", DataType::Decimal256(39, 2), false),
946        ]);
947        assert_eq!(&arrow_fields, converted_arrow_schema.fields());
948    }
949
950    #[test]
951    fn test_byte_array_fields() {
952        let message_type = "
953        message test_schema {
954            REQUIRED BYTE_ARRAY binary;
955            REQUIRED FIXED_LEN_BYTE_ARRAY (20) fixed_binary;
956        }
957        ";
958
959        let parquet_group_type = parse_message_type(message_type).unwrap();
960
961        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
962        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
963
964        let arrow_fields = Fields::from(vec![
965            Field::new("binary", DataType::Binary, false),
966            Field::new("fixed_binary", DataType::FixedSizeBinary(20), false),
967        ]);
968        assert_eq!(&arrow_fields, converted_arrow_schema.fields());
969    }
970
971    #[test]
972    fn test_duplicate_fields() {
973        let message_type = "
974        message test_schema {
975            REQUIRED BOOLEAN boolean;
976            REQUIRED INT32 int8 (INT_8);
977        }
978        ";
979
980        let parquet_group_type = parse_message_type(message_type).unwrap();
981
982        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
983        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
984
985        let arrow_fields = Fields::from(vec![
986            Field::new("boolean", DataType::Boolean, false),
987            Field::new("int8", DataType::Int8, false),
988        ]);
989        assert_eq!(&arrow_fields, converted_arrow_schema.fields());
990
991        let converted_arrow_schema =
992            parquet_to_arrow_schema_by_columns(&parquet_schema, ProjectionMask::all(), None)
993                .unwrap();
994        assert_eq!(&arrow_fields, converted_arrow_schema.fields());
995    }
996
997    #[test]
998    fn test_parquet_lists() {
999        let mut arrow_fields = Vec::new();
1000
1001        // LIST encoding example taken from parquet-format/LogicalTypes.md
1002        let message_type = "
1003        message test_schema {
1004          REQUIRED GROUP my_list (LIST) {
1005            REPEATED GROUP list {
1006              OPTIONAL BINARY element (UTF8);
1007            }
1008          }
1009          OPTIONAL GROUP my_list (LIST) {
1010            REPEATED GROUP list {
1011              REQUIRED BINARY element (UTF8);
1012            }
1013          }
1014          OPTIONAL GROUP array_of_arrays (LIST) {
1015            REPEATED GROUP list {
1016              REQUIRED GROUP element (LIST) {
1017                REPEATED GROUP list {
1018                  REQUIRED INT32 element;
1019                }
1020              }
1021            }
1022          }
1023          OPTIONAL GROUP my_list (LIST) {
1024            REPEATED GROUP element {
1025              REQUIRED BINARY str (UTF8);
1026            }
1027          }
1028          OPTIONAL GROUP my_list (LIST) {
1029            REPEATED INT32 element;
1030          }
1031          OPTIONAL GROUP my_list (LIST) {
1032            REPEATED GROUP element {
1033              REQUIRED BINARY str (UTF8);
1034              REQUIRED INT32 num;
1035            }
1036          }
1037          OPTIONAL GROUP my_list (LIST) {
1038            REPEATED GROUP array {
1039              REQUIRED BINARY str (UTF8);
1040            }
1041
1042          }
1043          OPTIONAL GROUP my_list (LIST) {
1044            REPEATED GROUP my_list_tuple {
1045              REQUIRED BINARY str (UTF8);
1046            }
1047          }
1048          REPEATED INT32 name;
1049        }
1050        ";
1051
1052        // // List<String> (list non-null, elements nullable)
1053        // required group my_list (LIST) {
1054        //   repeated group list {
1055        //     optional binary element (UTF8);
1056        //   }
1057        // }
1058        {
1059            arrow_fields.push(Field::new_list(
1060                "my_list",
1061                Field::new("element", DataType::Utf8, true),
1062                false,
1063            ));
1064        }
1065
1066        // // List<String> (list nullable, elements non-null)
1067        // optional group my_list (LIST) {
1068        //   repeated group list {
1069        //     required binary element (UTF8);
1070        //   }
1071        // }
1072        {
1073            arrow_fields.push(Field::new_list(
1074                "my_list",
1075                Field::new("element", DataType::Utf8, false),
1076                true,
1077            ));
1078        }
1079
1080        // Element types can be nested structures. For example, a list of lists:
1081        //
1082        // // List<List<Integer>>
1083        // optional group array_of_arrays (LIST) {
1084        //   repeated group list {
1085        //     required group element (LIST) {
1086        //       repeated group list {
1087        //         required int32 element;
1088        //       }
1089        //     }
1090        //   }
1091        // }
1092        {
1093            let arrow_inner_list = Field::new("element", DataType::Int32, false);
1094            arrow_fields.push(Field::new_list(
1095                "array_of_arrays",
1096                Field::new_list("element", arrow_inner_list, false),
1097                true,
1098            ));
1099        }
1100
1101        // // List<String> (list nullable, elements non-null)
1102        // optional group my_list (LIST) {
1103        //   repeated group element {
1104        //     required binary str (UTF8);
1105        //   };
1106        // }
1107        {
1108            arrow_fields.push(Field::new_list(
1109                "my_list",
1110                Field::new("str", DataType::Utf8, false),
1111                true,
1112            ));
1113        }
1114
1115        // // List<Integer> (nullable list, non-null elements)
1116        // optional group my_list (LIST) {
1117        //   repeated int32 element;
1118        // }
1119        {
1120            arrow_fields.push(Field::new_list(
1121                "my_list",
1122                Field::new("element", DataType::Int32, false),
1123                true,
1124            ));
1125        }
1126
1127        // // List<Tuple<String, Integer>> (nullable list, non-null elements)
1128        // optional group my_list (LIST) {
1129        //   repeated group element {
1130        //     required binary str (UTF8);
1131        //     required int32 num;
1132        //   };
1133        // }
1134        {
1135            let fields = vec![
1136                Field::new("str", DataType::Utf8, false),
1137                Field::new("num", DataType::Int32, false),
1138            ];
1139            arrow_fields.push(Field::new_list(
1140                "my_list",
1141                Field::new_struct("element", fields, false),
1142                true,
1143            ));
1144        }
1145
1146        // // List<OneTuple<String>> (nullable list, non-null elements)
1147        // optional group my_list (LIST) {
1148        //   repeated group array {
1149        //     required binary str (UTF8);
1150        //   };
1151        // }
1152        // Special case: group is named array
1153        {
1154            let fields = vec![Field::new("str", DataType::Utf8, false)];
1155            arrow_fields.push(Field::new_list(
1156                "my_list",
1157                Field::new_struct("array", fields, false),
1158                true,
1159            ));
1160        }
1161
1162        // // List<OneTuple<String>> (nullable list, non-null elements)
1163        // optional group my_list (LIST) {
1164        //   repeated group my_list_tuple {
1165        //     required binary str (UTF8);
1166        //   };
1167        // }
1168        // Special case: group named ends in _tuple
1169        {
1170            let fields = vec![Field::new("str", DataType::Utf8, false)];
1171            arrow_fields.push(Field::new_list(
1172                "my_list",
1173                Field::new_struct("my_list_tuple", fields, false),
1174                true,
1175            ));
1176        }
1177
1178        // One-level encoding: Only allows required lists with required cells
1179        //   repeated value_type name
1180        {
1181            arrow_fields.push(Field::new_list(
1182                "name",
1183                Field::new("name", DataType::Int32, false),
1184                false,
1185            ));
1186        }
1187
1188        let parquet_group_type = parse_message_type(message_type).unwrap();
1189
1190        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1191        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
1192        let converted_fields = converted_arrow_schema.fields();
1193
1194        assert_eq!(arrow_fields.len(), converted_fields.len());
1195        for i in 0..arrow_fields.len() {
1196            assert_eq!(&arrow_fields[i], converted_fields[i].as_ref(), "{i}");
1197        }
1198    }
1199
1200    #[test]
1201    fn test_parquet_list_nullable() {
1202        let mut arrow_fields = Vec::new();
1203
1204        let message_type = "
1205        message test_schema {
1206          REQUIRED GROUP my_list1 (LIST) {
1207            REPEATED GROUP list {
1208              OPTIONAL BINARY element (UTF8);
1209            }
1210          }
1211          OPTIONAL GROUP my_list2 (LIST) {
1212            REPEATED GROUP list {
1213              REQUIRED BINARY element (UTF8);
1214            }
1215          }
1216          REQUIRED GROUP my_list3 (LIST) {
1217            REPEATED GROUP list {
1218              REQUIRED BINARY element (UTF8);
1219            }
1220          }
1221        }
1222        ";
1223
1224        // // List<String> (list non-null, elements nullable)
1225        // required group my_list1 (LIST) {
1226        //   repeated group list {
1227        //     optional binary element (UTF8);
1228        //   }
1229        // }
1230        {
1231            arrow_fields.push(Field::new_list(
1232                "my_list1",
1233                Field::new("element", DataType::Utf8, true),
1234                false,
1235            ));
1236        }
1237
1238        // // List<String> (list nullable, elements non-null)
1239        // optional group my_list2 (LIST) {
1240        //   repeated group list {
1241        //     required binary element (UTF8);
1242        //   }
1243        // }
1244        {
1245            arrow_fields.push(Field::new_list(
1246                "my_list2",
1247                Field::new("element", DataType::Utf8, false),
1248                true,
1249            ));
1250        }
1251
1252        // // List<String> (list non-null, elements non-null)
1253        // repeated group my_list3 (LIST) {
1254        //   repeated group list {
1255        //     required binary element (UTF8);
1256        //   }
1257        // }
1258        {
1259            arrow_fields.push(Field::new_list(
1260                "my_list3",
1261                Field::new("element", DataType::Utf8, false),
1262                false,
1263            ));
1264        }
1265
1266        let parquet_group_type = parse_message_type(message_type).unwrap();
1267
1268        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1269        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
1270        let converted_fields = converted_arrow_schema.fields();
1271
1272        assert_eq!(arrow_fields.len(), converted_fields.len());
1273        for i in 0..arrow_fields.len() {
1274            assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
1275        }
1276    }
1277
1278    #[test]
1279    fn test_parquet_maps() {
1280        let mut arrow_fields = Vec::new();
1281
1282        // LIST encoding example taken from parquet-format/LogicalTypes.md
1283        let message_type = "
1284        message test_schema {
1285          REQUIRED group my_map1 (MAP) {
1286            REPEATED group key_value {
1287              REQUIRED binary key (UTF8);
1288              OPTIONAL int32 value;
1289            }
1290          }
1291          OPTIONAL group my_map2 (MAP) {
1292            REPEATED group map {
1293              REQUIRED binary str (UTF8);
1294              REQUIRED int32 num;
1295            }
1296          }
1297          OPTIONAL group my_map3 (MAP_KEY_VALUE) {
1298            REPEATED group map {
1299              REQUIRED binary key (UTF8);
1300              OPTIONAL int32 value;
1301            }
1302          }
1303          REQUIRED group my_map4 (MAP) {
1304            REPEATED group map {
1305              OPTIONAL binary key (UTF8);
1306              REQUIRED int32 value;
1307            }
1308          }
1309        }
1310        ";
1311
1312        // // Map<String, Integer>
1313        // required group my_map (MAP) {
1314        //   repeated group key_value {
1315        //     required binary key (UTF8);
1316        //     optional int32 value;
1317        //   }
1318        // }
1319        {
1320            arrow_fields.push(Field::new_map(
1321                "my_map1",
1322                "key_value",
1323                Field::new("key", DataType::Utf8, false),
1324                Field::new("value", DataType::Int32, true),
1325                false,
1326                false,
1327            ));
1328        }
1329
1330        // // Map<String, Integer> (nullable map, non-null values)
1331        // optional group my_map (MAP) {
1332        //   repeated group map {
1333        //     required binary str (UTF8);
1334        //     required int32 num;
1335        //   }
1336        // }
1337        {
1338            arrow_fields.push(Field::new_map(
1339                "my_map2",
1340                "map",
1341                Field::new("str", DataType::Utf8, false),
1342                Field::new("num", DataType::Int32, false),
1343                false,
1344                true,
1345            ));
1346        }
1347
1348        // // Map<String, Integer> (nullable map, nullable values)
1349        // optional group my_map (MAP_KEY_VALUE) {
1350        //   repeated group map {
1351        //     required binary key (UTF8);
1352        //     optional int32 value;
1353        //   }
1354        // }
1355        {
1356            arrow_fields.push(Field::new_map(
1357                "my_map3",
1358                "map",
1359                Field::new("key", DataType::Utf8, false),
1360                Field::new("value", DataType::Int32, true),
1361                false,
1362                true,
1363            ));
1364        }
1365
1366        // // Map<String, Integer> (non-compliant nullable key)
1367        // group my_map (MAP_KEY_VALUE) {
1368        //   repeated group map {
1369        //     optional binary key (UTF8);
1370        //     required int32 value;
1371        //   }
1372        // }
1373        {
1374            arrow_fields.push(Field::new_map(
1375                "my_map4",
1376                "map",
1377                Field::new("key", DataType::Utf8, false), // The key is always non-nullable (#5630)
1378                Field::new("value", DataType::Int32, false),
1379                false,
1380                false,
1381            ));
1382        }
1383
1384        let parquet_group_type = parse_message_type(message_type).unwrap();
1385
1386        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1387        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
1388        let converted_fields = converted_arrow_schema.fields();
1389
1390        assert_eq!(arrow_fields.len(), converted_fields.len());
1391        for i in 0..arrow_fields.len() {
1392            assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
1393        }
1394    }
1395
1396    #[test]
1397    fn test_nested_schema() {
1398        let mut arrow_fields = Vec::new();
1399        {
1400            let group1_fields = Fields::from(vec![
1401                Field::new("leaf1", DataType::Boolean, false),
1402                Field::new("leaf2", DataType::Int32, false),
1403            ]);
1404            let group1_struct = Field::new("group1", DataType::Struct(group1_fields), false);
1405            arrow_fields.push(group1_struct);
1406
1407            let leaf3_field = Field::new("leaf3", DataType::Int64, false);
1408            arrow_fields.push(leaf3_field);
1409        }
1410
1411        let message_type = "
1412        message test_schema {
1413          REQUIRED GROUP group1 {
1414            REQUIRED BOOLEAN leaf1;
1415            REQUIRED INT32 leaf2;
1416          }
1417          REQUIRED INT64 leaf3;
1418        }
1419        ";
1420        let parquet_group_type = parse_message_type(message_type).unwrap();
1421
1422        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1423        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
1424        let converted_fields = converted_arrow_schema.fields();
1425
1426        assert_eq!(arrow_fields.len(), converted_fields.len());
1427        for i in 0..arrow_fields.len() {
1428            assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
1429        }
1430    }
1431
1432    #[test]
1433    fn test_nested_schema_partial() {
1434        let mut arrow_fields = Vec::new();
1435        {
1436            let group1_fields = vec![Field::new("leaf1", DataType::Int64, false)].into();
1437            let group1 = Field::new("group1", DataType::Struct(group1_fields), false);
1438            arrow_fields.push(group1);
1439
1440            let group2_fields = vec![Field::new("leaf4", DataType::Int64, false)].into();
1441            let group2 = Field::new("group2", DataType::Struct(group2_fields), false);
1442            arrow_fields.push(group2);
1443
1444            arrow_fields.push(Field::new("leaf5", DataType::Int64, false));
1445        }
1446
1447        let message_type = "
1448        message test_schema {
1449          REQUIRED GROUP group1 {
1450            REQUIRED INT64 leaf1;
1451            REQUIRED INT64 leaf2;
1452          }
1453          REQUIRED  GROUP group2 {
1454            REQUIRED INT64 leaf3;
1455            REQUIRED INT64 leaf4;
1456          }
1457          REQUIRED INT64 leaf5;
1458        }
1459        ";
1460        let parquet_group_type = parse_message_type(message_type).unwrap();
1461
1462        // Expected partial arrow schema (columns 0, 3, 4):
1463        // required group group1 {
1464        //   required int64 leaf1;
1465        // }
1466        // required group group2 {
1467        //   required int64 leaf4;
1468        // }
1469        // required int64 leaf5;
1470
1471        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1472        let mask = ProjectionMask::leaves(&parquet_schema, [3, 0, 4, 4]);
1473        let converted_arrow_schema =
1474            parquet_to_arrow_schema_by_columns(&parquet_schema, mask, None).unwrap();
1475        let converted_fields = converted_arrow_schema.fields();
1476
1477        assert_eq!(arrow_fields.len(), converted_fields.len());
1478        for i in 0..arrow_fields.len() {
1479            assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
1480        }
1481    }
1482
1483    #[test]
1484    fn test_nested_schema_partial_ordering() {
1485        let mut arrow_fields = Vec::new();
1486        {
1487            let group1_fields = vec![Field::new("leaf1", DataType::Int64, false)].into();
1488            let group1 = Field::new("group1", DataType::Struct(group1_fields), false);
1489            arrow_fields.push(group1);
1490
1491            let group2_fields = vec![Field::new("leaf4", DataType::Int64, false)].into();
1492            let group2 = Field::new("group2", DataType::Struct(group2_fields), false);
1493            arrow_fields.push(group2);
1494
1495            arrow_fields.push(Field::new("leaf5", DataType::Int64, false));
1496        }
1497
1498        let message_type = "
1499        message test_schema {
1500          REQUIRED GROUP group1 {
1501            REQUIRED INT64 leaf1;
1502            REQUIRED INT64 leaf2;
1503          }
1504          REQUIRED  GROUP group2 {
1505            REQUIRED INT64 leaf3;
1506            REQUIRED INT64 leaf4;
1507          }
1508          REQUIRED INT64 leaf5;
1509        }
1510        ";
1511        let parquet_group_type = parse_message_type(message_type).unwrap();
1512
1513        // Expected partial arrow schema (columns 3, 4, 0):
1514        // required group group1 {
1515        //   required int64 leaf1;
1516        // }
1517        // required group group2 {
1518        //   required int64 leaf4;
1519        // }
1520        // required int64 leaf5;
1521
1522        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1523        let mask = ProjectionMask::leaves(&parquet_schema, [3, 0, 4]);
1524        let converted_arrow_schema =
1525            parquet_to_arrow_schema_by_columns(&parquet_schema, mask, None).unwrap();
1526        let converted_fields = converted_arrow_schema.fields();
1527
1528        assert_eq!(arrow_fields.len(), converted_fields.len());
1529        for i in 0..arrow_fields.len() {
1530            assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
1531        }
1532
1533        let mask =
1534            ProjectionMask::columns(&parquet_schema, ["group2.leaf4", "group1.leaf1", "leaf5"]);
1535        let converted_arrow_schema =
1536            parquet_to_arrow_schema_by_columns(&parquet_schema, mask, None).unwrap();
1537        let converted_fields = converted_arrow_schema.fields();
1538
1539        assert_eq!(arrow_fields.len(), converted_fields.len());
1540        for i in 0..arrow_fields.len() {
1541            assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
1542        }
1543    }
1544
1545    #[test]
1546    fn test_repeated_nested_schema() {
1547        let mut arrow_fields = Vec::new();
1548        {
1549            arrow_fields.push(Field::new("leaf1", DataType::Int32, true));
1550
1551            let inner_group_list = Field::new_list(
1552                "innerGroup",
1553                Field::new_struct(
1554                    "innerGroup",
1555                    vec![Field::new("leaf3", DataType::Int32, true)],
1556                    false,
1557                ),
1558                false,
1559            );
1560
1561            let outer_group_list = Field::new_list(
1562                "outerGroup",
1563                Field::new_struct(
1564                    "outerGroup",
1565                    vec![Field::new("leaf2", DataType::Int32, true), inner_group_list],
1566                    false,
1567                ),
1568                false,
1569            );
1570            arrow_fields.push(outer_group_list);
1571        }
1572
1573        let message_type = "
1574        message test_schema {
1575          OPTIONAL INT32 leaf1;
1576          REPEATED GROUP outerGroup {
1577            OPTIONAL INT32 leaf2;
1578            REPEATED GROUP innerGroup {
1579              OPTIONAL INT32 leaf3;
1580            }
1581          }
1582        }
1583        ";
1584        let parquet_group_type = parse_message_type(message_type).unwrap();
1585
1586        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1587        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
1588        let converted_fields = converted_arrow_schema.fields();
1589
1590        assert_eq!(arrow_fields.len(), converted_fields.len());
1591        for i in 0..arrow_fields.len() {
1592            assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
1593        }
1594    }
1595
1596    #[test]
1597    fn test_column_desc_to_field() {
1598        let message_type = "
1599        message test_schema {
1600            REQUIRED BOOLEAN boolean;
1601            REQUIRED INT32   int8  (INT_8);
1602            REQUIRED INT32   uint8 (INTEGER(8,false));
1603            REQUIRED INT32   int16 (INT_16);
1604            REQUIRED INT32   uint16 (INTEGER(16,false));
1605            REQUIRED INT32   int32;
1606            REQUIRED INT64   int64;
1607            OPTIONAL DOUBLE  double;
1608            OPTIONAL FLOAT   float;
1609            OPTIONAL FIXED_LEN_BYTE_ARRAY (2) float16 (FLOAT16);
1610            OPTIONAL BINARY  string (UTF8);
1611            REPEATED BOOLEAN bools;
1612            OPTIONAL INT32   date       (DATE);
1613            OPTIONAL INT32   time_milli (TIME_MILLIS);
1614            OPTIONAL INT64   time_micro (TIME_MICROS);
1615            OPTIONAL INT64   time_nano (TIME(NANOS,false));
1616            OPTIONAL INT64   ts_milli (TIMESTAMP_MILLIS);
1617            REQUIRED INT64   ts_micro (TIMESTAMP_MICROS);
1618            REQUIRED INT64   ts_nano (TIMESTAMP(NANOS,true));
1619            REPEATED INT32   int_list;
1620            REPEATED BINARY  byte_list;
1621            REPEATED BINARY  string_list (UTF8);
1622            REQUIRED INT32 decimal_int32 (DECIMAL(8,2));
1623            REQUIRED INT64 decimal_int64 (DECIMAL(16,2));
1624            REQUIRED FIXED_LEN_BYTE_ARRAY (13) decimal_fix_length (DECIMAL(30,2));
1625        }
1626        ";
1627        let parquet_group_type = parse_message_type(message_type).unwrap();
1628
1629        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1630        let converted_arrow_fields = parquet_schema
1631            .columns()
1632            .iter()
1633            .map(|c| parquet_to_arrow_field(c).unwrap())
1634            .collect::<Vec<Field>>();
1635
1636        let arrow_fields = vec![
1637            Field::new("boolean", DataType::Boolean, false),
1638            Field::new("int8", DataType::Int8, false),
1639            Field::new("uint8", DataType::UInt8, false),
1640            Field::new("int16", DataType::Int16, false),
1641            Field::new("uint16", DataType::UInt16, false),
1642            Field::new("int32", DataType::Int32, false),
1643            Field::new("int64", DataType::Int64, false),
1644            Field::new("double", DataType::Float64, true),
1645            Field::new("float", DataType::Float32, true),
1646            Field::new("float16", DataType::Float16, true),
1647            Field::new("string", DataType::Utf8, true),
1648            Field::new_list(
1649                "bools",
1650                Field::new("bools", DataType::Boolean, false),
1651                false,
1652            ),
1653            Field::new("date", DataType::Date32, true),
1654            Field::new("time_milli", DataType::Time32(TimeUnit::Millisecond), true),
1655            Field::new("time_micro", DataType::Time64(TimeUnit::Microsecond), true),
1656            Field::new("time_nano", DataType::Time64(TimeUnit::Nanosecond), true),
1657            Field::new(
1658                "ts_milli",
1659                DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
1660                true,
1661            ),
1662            Field::new(
1663                "ts_micro",
1664                DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
1665                false,
1666            ),
1667            Field::new(
1668                "ts_nano",
1669                DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())),
1670                false,
1671            ),
1672            Field::new_list(
1673                "int_list",
1674                Field::new("int_list", DataType::Int32, false),
1675                false,
1676            ),
1677            Field::new_list(
1678                "byte_list",
1679                Field::new("byte_list", DataType::Binary, false),
1680                false,
1681            ),
1682            Field::new_list(
1683                "string_list",
1684                Field::new("string_list", DataType::Utf8, false),
1685                false,
1686            ),
1687            Field::new("decimal_int32", DataType::Decimal128(8, 2), false),
1688            Field::new("decimal_int64", DataType::Decimal128(16, 2), false),
1689            Field::new("decimal_fix_length", DataType::Decimal128(30, 2), false),
1690        ];
1691
1692        assert_eq!(arrow_fields, converted_arrow_fields);
1693    }
1694
1695    #[test]
1696    fn test_coerced_map_list() {
1697        // Create Arrow schema with non-Parquet naming
1698        let arrow_fields = vec![
1699            Field::new_list(
1700                "my_list",
1701                Field::new("item", DataType::Boolean, true),
1702                false,
1703            ),
1704            Field::new_map(
1705                "my_map",
1706                "entries",
1707                Field::new("keys", DataType::Utf8, false),
1708                Field::new("values", DataType::Int32, true),
1709                false,
1710                true,
1711            ),
1712        ];
1713        let arrow_schema = Schema::new(arrow_fields);
1714
1715        // Create Parquet schema with coerced names
1716        let message_type = "
1717        message parquet_schema {
1718            REQUIRED GROUP my_list (LIST) {
1719                REPEATED GROUP list {
1720                    OPTIONAL BOOLEAN element;
1721                }
1722            }
1723            OPTIONAL GROUP my_map (MAP) {
1724                REPEATED GROUP key_value {
1725                    REQUIRED BINARY key (STRING);
1726                    OPTIONAL INT32 value;
1727                }
1728            }
1729        }
1730        ";
1731        let parquet_group_type = parse_message_type(message_type).unwrap();
1732        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1733        let converted_arrow_schema = ArrowSchemaConverter::new()
1734            .with_coerce_types(true)
1735            .convert(&arrow_schema)
1736            .unwrap();
1737        assert_eq!(
1738            parquet_schema.columns().len(),
1739            converted_arrow_schema.columns().len()
1740        );
1741
1742        // Create Parquet schema without coerced names
1743        let message_type = "
1744        message parquet_schema {
1745            REQUIRED GROUP my_list (LIST) {
1746                REPEATED GROUP list {
1747                    OPTIONAL BOOLEAN item;
1748                }
1749            }
1750            OPTIONAL GROUP my_map (MAP) {
1751                REPEATED GROUP entries {
1752                    REQUIRED BINARY keys (STRING);
1753                    OPTIONAL INT32 values;
1754                }
1755            }
1756        }
1757        ";
1758        let parquet_group_type = parse_message_type(message_type).unwrap();
1759        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1760        let converted_arrow_schema = ArrowSchemaConverter::new()
1761            .with_coerce_types(false)
1762            .convert(&arrow_schema)
1763            .unwrap();
1764        assert_eq!(
1765            parquet_schema.columns().len(),
1766            converted_arrow_schema.columns().len()
1767        );
1768    }
1769
1770    #[test]
1771    fn test_field_to_column_desc() {
1772        let message_type = "
1773        message arrow_schema {
1774            REQUIRED BOOLEAN boolean;
1775            REQUIRED INT32   int8  (INT_8);
1776            REQUIRED INT32   int16 (INTEGER(16,true));
1777            REQUIRED INT32   int32;
1778            REQUIRED INT64   int64;
1779            OPTIONAL DOUBLE  double;
1780            OPTIONAL FLOAT   float;
1781            OPTIONAL FIXED_LEN_BYTE_ARRAY (2) float16 (FLOAT16);
1782            OPTIONAL BINARY  string (STRING);
1783            OPTIONAL GROUP   bools (LIST) {
1784                REPEATED GROUP list {
1785                    OPTIONAL BOOLEAN element;
1786                }
1787            }
1788            REQUIRED GROUP   bools_non_null (LIST) {
1789                REPEATED GROUP list {
1790                    REQUIRED BOOLEAN element;
1791                }
1792            }
1793            OPTIONAL INT32   date       (DATE);
1794            OPTIONAL INT32   time_milli (TIME(MILLIS,false));
1795            OPTIONAL INT32   time_milli_utc (TIME(MILLIS,true));
1796            OPTIONAL INT64   time_micro (TIME_MICROS);
1797            OPTIONAL INT64   time_micro_utc (TIME(MICROS, true));
1798            OPTIONAL INT64   ts_milli (TIMESTAMP_MILLIS);
1799            REQUIRED INT64   ts_micro (TIMESTAMP(MICROS,false));
1800            REQUIRED INT64   ts_seconds;
1801            REQUIRED INT64   ts_micro_utc (TIMESTAMP(MICROS, true));
1802            REQUIRED INT64   ts_millis_zero_offset (TIMESTAMP(MILLIS, true));
1803            REQUIRED INT64   ts_millis_zero_negative_offset (TIMESTAMP(MILLIS, true));
1804            REQUIRED INT64   ts_micro_non_utc (TIMESTAMP(MICROS, true));
1805            REQUIRED GROUP struct {
1806                REQUIRED BOOLEAN bools;
1807                REQUIRED INT32 uint32 (INTEGER(32,false));
1808                REQUIRED GROUP   int32 (LIST) {
1809                    REPEATED GROUP list {
1810                        OPTIONAL INT32 element;
1811                    }
1812                }
1813            }
1814            REQUIRED BINARY  dictionary_strings (STRING);
1815            REQUIRED INT32 decimal_int32 (DECIMAL(8,2));
1816            REQUIRED INT64 decimal_int64 (DECIMAL(16,2));
1817            REQUIRED FIXED_LEN_BYTE_ARRAY (13) decimal_fix_length (DECIMAL(30,2));
1818            REQUIRED FIXED_LEN_BYTE_ARRAY (16) decimal128 (DECIMAL(38,2));
1819            REQUIRED FIXED_LEN_BYTE_ARRAY (17) decimal256 (DECIMAL(39,2));
1820        }
1821        ";
1822        let parquet_group_type = parse_message_type(message_type).unwrap();
1823
1824        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1825
1826        let arrow_fields = vec![
1827            Field::new("boolean", DataType::Boolean, false),
1828            Field::new("int8", DataType::Int8, false),
1829            Field::new("int16", DataType::Int16, false),
1830            Field::new("int32", DataType::Int32, false),
1831            Field::new("int64", DataType::Int64, false),
1832            Field::new("double", DataType::Float64, true),
1833            Field::new("float", DataType::Float32, true),
1834            Field::new("float16", DataType::Float16, true),
1835            Field::new("string", DataType::Utf8, true),
1836            Field::new_list(
1837                "bools",
1838                Field::new("element", DataType::Boolean, true),
1839                true,
1840            ),
1841            Field::new_list(
1842                "bools_non_null",
1843                Field::new("element", DataType::Boolean, false),
1844                false,
1845            ),
1846            Field::new("date", DataType::Date32, true),
1847            Field::new("time_milli", DataType::Time32(TimeUnit::Millisecond), true),
1848            Field::new(
1849                "time_milli_utc",
1850                DataType::Time32(TimeUnit::Millisecond),
1851                true,
1852            )
1853            .with_metadata(HashMap::from_iter(vec![(
1854                "adjusted_to_utc".to_string(),
1855                "".to_string(),
1856            )])),
1857            Field::new("time_micro", DataType::Time64(TimeUnit::Microsecond), true),
1858            Field::new(
1859                "time_micro_utc",
1860                DataType::Time64(TimeUnit::Microsecond),
1861                true,
1862            )
1863            .with_metadata(HashMap::from_iter(vec![(
1864                "adjusted_to_utc".to_string(),
1865                "".to_string(),
1866            )])),
1867            Field::new(
1868                "ts_milli",
1869                DataType::Timestamp(TimeUnit::Millisecond, None),
1870                true,
1871            ),
1872            Field::new(
1873                "ts_micro",
1874                DataType::Timestamp(TimeUnit::Microsecond, None),
1875                false,
1876            ),
1877            Field::new(
1878                "ts_seconds",
1879                DataType::Timestamp(TimeUnit::Second, Some("UTC".into())),
1880                false,
1881            ),
1882            Field::new(
1883                "ts_micro_utc",
1884                DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
1885                false,
1886            ),
1887            Field::new(
1888                "ts_millis_zero_offset",
1889                DataType::Timestamp(TimeUnit::Millisecond, Some("+00:00".into())),
1890                false,
1891            ),
1892            Field::new(
1893                "ts_millis_zero_negative_offset",
1894                DataType::Timestamp(TimeUnit::Millisecond, Some("-00:00".into())),
1895                false,
1896            ),
1897            Field::new(
1898                "ts_micro_non_utc",
1899                DataType::Timestamp(TimeUnit::Microsecond, Some("+01:00".into())),
1900                false,
1901            ),
1902            Field::new_struct(
1903                "struct",
1904                vec![
1905                    Field::new("bools", DataType::Boolean, false),
1906                    Field::new("uint32", DataType::UInt32, false),
1907                    Field::new_list("int32", Field::new("element", DataType::Int32, true), false),
1908                ],
1909                false,
1910            ),
1911            Field::new_dictionary("dictionary_strings", DataType::Int32, DataType::Utf8, false),
1912            Field::new("decimal_int32", DataType::Decimal128(8, 2), false),
1913            Field::new("decimal_int64", DataType::Decimal128(16, 2), false),
1914            Field::new("decimal_fix_length", DataType::Decimal128(30, 2), false),
1915            Field::new("decimal128", DataType::Decimal128(38, 2), false),
1916            Field::new("decimal256", DataType::Decimal256(39, 2), false),
1917        ];
1918        let arrow_schema = Schema::new(arrow_fields);
1919        let converted_arrow_schema = ArrowSchemaConverter::new().convert(&arrow_schema).unwrap();
1920
1921        assert_eq!(
1922            parquet_schema.columns().len(),
1923            converted_arrow_schema.columns().len()
1924        );
1925        parquet_schema
1926            .columns()
1927            .iter()
1928            .zip(converted_arrow_schema.columns())
1929            .for_each(|(a, b)| {
1930                // Only check logical type if it's set on the Parquet side.
1931                // This is because the Arrow conversion always sets logical type,
1932                // even if there wasn't originally one.
1933                // This is not an issue, but is an inconvenience for this test.
1934                match a.logical_type_ref() {
1935                    Some(_) => {
1936                        assert_eq!(a, b)
1937                    }
1938                    None => {
1939                        assert_eq!(a.name(), b.name());
1940                        assert_eq!(a.physical_type(), b.physical_type());
1941                        assert_eq!(a.converted_type(), b.converted_type());
1942                    }
1943                };
1944            });
1945    }
1946
1947    #[test]
1948    #[should_panic(expected = "Parquet does not support writing empty structs")]
1949    fn test_empty_struct_field() {
1950        let arrow_fields = vec![Field::new(
1951            "struct",
1952            DataType::Struct(Fields::empty()),
1953            false,
1954        )];
1955        let arrow_schema = Schema::new(arrow_fields);
1956        let converted_arrow_schema = ArrowSchemaConverter::new()
1957            .with_coerce_types(true)
1958            .convert(&arrow_schema);
1959
1960        converted_arrow_schema.unwrap();
1961    }
1962
1963    #[test]
1964    fn test_metadata() {
1965        let message_type = "
1966        message test_schema {
1967            OPTIONAL BINARY  string (STRING);
1968        }
1969        ";
1970        let parquet_group_type = parse_message_type(message_type).unwrap();
1971
1972        let key_value_metadata = vec![
1973            KeyValue::new("foo".to_owned(), Some("bar".to_owned())),
1974            KeyValue::new("baz".to_owned(), None),
1975        ];
1976
1977        let mut expected_metadata: HashMap<String, String> = HashMap::new();
1978        expected_metadata.insert("foo".to_owned(), "bar".to_owned());
1979
1980        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1981        let converted_arrow_schema =
1982            parquet_to_arrow_schema(&parquet_schema, Some(&key_value_metadata)).unwrap();
1983
1984        assert_eq!(converted_arrow_schema.metadata(), &expected_metadata);
1985    }
1986
1987    #[test]
1988    fn test_arrow_schema_roundtrip() -> Result<()> {
1989        let meta = |a: &[(&str, &str)]| -> HashMap<String, String> {
1990            a.iter()
1991                .map(|(a, b)| (a.to_string(), b.to_string()))
1992                .collect()
1993        };
1994
1995        let schema = Schema::new_with_metadata(
1996            vec![
1997                Field::new("c1", DataType::Utf8, false)
1998                    .with_metadata(meta(&[("Key", "Foo"), (PARQUET_FIELD_ID_META_KEY, "2")])),
1999                Field::new("c2", DataType::Binary, false),
2000                Field::new("c3", DataType::FixedSizeBinary(3), false),
2001                Field::new("c4", DataType::Boolean, false),
2002                Field::new("c5", DataType::Date32, false),
2003                Field::new("c6", DataType::Date64, false),
2004                Field::new("c7", DataType::Time32(TimeUnit::Second), false),
2005                Field::new("c8", DataType::Time32(TimeUnit::Millisecond), false),
2006                Field::new("c13", DataType::Time64(TimeUnit::Microsecond), false),
2007                Field::new("c14", DataType::Time64(TimeUnit::Nanosecond), false),
2008                Field::new("c15", DataType::Timestamp(TimeUnit::Second, None), false),
2009                Field::new(
2010                    "c16",
2011                    DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
2012                    false,
2013                ),
2014                Field::new(
2015                    "c17",
2016                    DataType::Timestamp(TimeUnit::Microsecond, Some("Africa/Johannesburg".into())),
2017                    false,
2018                ),
2019                Field::new(
2020                    "c18",
2021                    DataType::Timestamp(TimeUnit::Nanosecond, None),
2022                    false,
2023                ),
2024                Field::new("c19", DataType::Interval(IntervalUnit::DayTime), false),
2025                Field::new("c20", DataType::Interval(IntervalUnit::YearMonth), false),
2026                Field::new_list(
2027                    "c21",
2028                    Field::new_list_field(DataType::Boolean, true)
2029                        .with_metadata(meta(&[("Key", "Bar"), (PARQUET_FIELD_ID_META_KEY, "5")])),
2030                    false,
2031                )
2032                .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "4")])),
2033                Field::new(
2034                    "c22",
2035                    DataType::FixedSizeList(
2036                        Arc::new(Field::new_list_field(DataType::Boolean, true)),
2037                        5,
2038                    ),
2039                    false,
2040                ),
2041                Field::new_list(
2042                    "c23",
2043                    Field::new_large_list(
2044                        "inner",
2045                        Field::new_list_field(
2046                            DataType::Struct(
2047                                vec![
2048                                    Field::new("a", DataType::Int16, true),
2049                                    Field::new("b", DataType::Float64, false),
2050                                    Field::new("c", DataType::Float32, false),
2051                                    Field::new("d", DataType::Float16, false),
2052                                ]
2053                                .into(),
2054                            ),
2055                            false,
2056                        ),
2057                        true,
2058                    ),
2059                    false,
2060                ),
2061                Field::new(
2062                    "c24",
2063                    DataType::Struct(Fields::from(vec![
2064                        Field::new("a", DataType::Utf8, false),
2065                        Field::new("b", DataType::UInt16, false),
2066                    ])),
2067                    false,
2068                ),
2069                Field::new("c25", DataType::Interval(IntervalUnit::YearMonth), true),
2070                Field::new("c26", DataType::Interval(IntervalUnit::DayTime), true),
2071                // Duration types not supported
2072                // Field::new("c27", DataType::Duration(TimeUnit::Second), false),
2073                // Field::new("c28", DataType::Duration(TimeUnit::Millisecond), false),
2074                // Field::new("c29", DataType::Duration(TimeUnit::Microsecond), false),
2075                // Field::new("c30", DataType::Duration(TimeUnit::Nanosecond), false),
2076                #[allow(deprecated)]
2077                Field::new_dict(
2078                    "c31",
2079                    DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
2080                    true,
2081                    123,
2082                    true,
2083                )
2084                .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "6")])),
2085                Field::new("c32", DataType::LargeBinary, true),
2086                Field::new("c33", DataType::LargeUtf8, true),
2087                Field::new_large_list(
2088                    "c34",
2089                    Field::new_list(
2090                        "inner",
2091                        Field::new_list_field(
2092                            DataType::Struct(
2093                                vec![
2094                                    Field::new("a", DataType::Int16, true),
2095                                    Field::new("b", DataType::Float64, true),
2096                                ]
2097                                .into(),
2098                            ),
2099                            true,
2100                        ),
2101                        true,
2102                    ),
2103                    true,
2104                ),
2105                Field::new("c35", DataType::Null, true),
2106                Field::new("c36", DataType::Decimal128(2, 1), false),
2107                Field::new("c37", DataType::Decimal256(50, 20), false),
2108                Field::new("c38", DataType::Decimal128(18, 12), true),
2109                Field::new_map(
2110                    "c39",
2111                    "key_value",
2112                    Field::new("key", DataType::Utf8, false),
2113                    Field::new_list("value", Field::new("element", DataType::Utf8, true), true),
2114                    false, // fails to roundtrip keys_sorted
2115                    true,
2116                ),
2117                Field::new_map(
2118                    "c40",
2119                    "my_entries",
2120                    Field::new("my_key", DataType::Utf8, false)
2121                        .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "8")])),
2122                    Field::new_list(
2123                        "my_value",
2124                        Field::new_list_field(DataType::Utf8, true)
2125                            .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "10")])),
2126                        true,
2127                    )
2128                    .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "9")])),
2129                    false, // fails to roundtrip keys_sorted
2130                    true,
2131                )
2132                .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "7")])),
2133                Field::new_map(
2134                    "c41",
2135                    "my_entries",
2136                    Field::new("my_key", DataType::Utf8, false),
2137                    Field::new_list(
2138                        "my_value",
2139                        Field::new_list_field(DataType::Utf8, true)
2140                            .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "11")])),
2141                        true,
2142                    ),
2143                    false, // fails to roundtrip keys_sorted
2144                    false,
2145                ),
2146                Field::new("c42", DataType::Decimal32(5, 2), false),
2147                Field::new("c43", DataType::Decimal64(18, 12), true),
2148            ],
2149            meta(&[("Key", "Value")]),
2150        );
2151
2152        // write to an empty parquet file so that schema is serialized
2153        let file = tempfile::tempfile().unwrap();
2154        let writer =
2155            ArrowWriter::try_new(file.try_clone().unwrap(), Arc::new(schema.clone()), None)?;
2156        writer.close()?;
2157
2158        // read file back
2159        let arrow_reader = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
2160
2161        // Check arrow schema
2162        let read_schema = arrow_reader.schema();
2163        assert_eq!(&schema, read_schema.as_ref());
2164
2165        // Walk schema finding field IDs
2166        let mut stack = Vec::with_capacity(10);
2167        let mut out = Vec::with_capacity(10);
2168
2169        let root = arrow_reader.parquet_schema().root_schema_ptr();
2170        stack.push((root.name().to_string(), root));
2171
2172        while let Some((p, t)) = stack.pop() {
2173            if t.is_group() {
2174                for f in t.get_fields() {
2175                    stack.push((format!("{p}.{}", f.name()), f.clone()))
2176                }
2177            }
2178
2179            let info = t.get_basic_info();
2180            if info.has_id() {
2181                out.push(format!("{p} -> {}", info.id()))
2182            }
2183        }
2184        out.sort_unstable();
2185        let out: Vec<_> = out.iter().map(|x| x.as_str()).collect();
2186
2187        assert_eq!(
2188            &out,
2189            &[
2190                "arrow_schema.c1 -> 2",
2191                "arrow_schema.c21 -> 4",
2192                "arrow_schema.c21.list.item -> 5",
2193                "arrow_schema.c31 -> 6",
2194                "arrow_schema.c40 -> 7",
2195                "arrow_schema.c40.my_entries.my_key -> 8",
2196                "arrow_schema.c40.my_entries.my_value -> 9",
2197                "arrow_schema.c40.my_entries.my_value.list.item -> 10",
2198                "arrow_schema.c41.my_entries.my_value.list.item -> 11",
2199            ]
2200        );
2201
2202        Ok(())
2203    }
2204
2205    #[test]
2206    fn test_read_parquet_field_ids_raw() -> Result<()> {
2207        let meta = |a: &[(&str, &str)]| -> HashMap<String, String> {
2208            a.iter()
2209                .map(|(a, b)| (a.to_string(), b.to_string()))
2210                .collect()
2211        };
2212        let schema = Schema::new_with_metadata(
2213            vec![
2214                Field::new("c1", DataType::Utf8, true)
2215                    .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "1")])),
2216                Field::new("c2", DataType::Utf8, true)
2217                    .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "2")])),
2218            ],
2219            HashMap::new(),
2220        );
2221
2222        let writer = ArrowWriter::try_new(vec![], Arc::new(schema.clone()), None)?;
2223        let parquet_bytes = writer.into_inner()?;
2224
2225        let reader =
2226            crate::file::reader::SerializedFileReader::new(bytes::Bytes::from(parquet_bytes))?;
2227        let schema_descriptor = reader.metadata().file_metadata().schema_descr_ptr();
2228
2229        // don't pass metadata so field ids are read from Parquet and not from serialized Arrow schema
2230        let arrow_schema = crate::arrow::parquet_to_arrow_schema(&schema_descriptor, None)?;
2231
2232        let parq_schema_descr = ArrowSchemaConverter::new()
2233            .with_coerce_types(true)
2234            .convert(&arrow_schema)?;
2235        let parq_fields = parq_schema_descr.root_schema().get_fields();
2236        assert_eq!(parq_fields.len(), 2);
2237        assert_eq!(parq_fields[0].get_basic_info().id(), 1);
2238        assert_eq!(parq_fields[1].get_basic_info().id(), 2);
2239
2240        Ok(())
2241    }
2242
2243    #[test]
2244    fn test_arrow_schema_roundtrip_lists() -> Result<()> {
2245        let metadata: HashMap<String, String> = [("Key".to_string(), "Value".to_string())]
2246            .iter()
2247            .cloned()
2248            .collect();
2249
2250        let schema = Schema::new_with_metadata(
2251            vec![
2252                Field::new_list("c21", Field::new("array", DataType::Boolean, true), false),
2253                Field::new(
2254                    "c22",
2255                    DataType::FixedSizeList(
2256                        Arc::new(Field::new("items", DataType::Boolean, false)),
2257                        5,
2258                    ),
2259                    false,
2260                ),
2261                Field::new_list(
2262                    "c23",
2263                    Field::new_large_list(
2264                        "items",
2265                        Field::new_struct(
2266                            "items",
2267                            vec![
2268                                Field::new("a", DataType::Int16, true),
2269                                Field::new("b", DataType::Float64, false),
2270                            ],
2271                            true,
2272                        ),
2273                        true,
2274                    ),
2275                    true,
2276                ),
2277            ],
2278            metadata,
2279        );
2280
2281        // write to an empty parquet file so that schema is serialized
2282        let file = tempfile::tempfile().unwrap();
2283        let writer =
2284            ArrowWriter::try_new(file.try_clone().unwrap(), Arc::new(schema.clone()), None)?;
2285        writer.close()?;
2286
2287        // read file back
2288        let arrow_reader = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
2289        let read_schema = arrow_reader.schema();
2290        assert_eq!(&schema, read_schema.as_ref());
2291        Ok(())
2292    }
2293
2294    #[test]
2295    fn test_get_arrow_schema_from_metadata() {
2296        assert!(get_arrow_schema_from_metadata("").is_err());
2297    }
2298
2299    #[test]
2300    #[cfg(feature = "arrow_canonical_extension_types")]
2301    fn arrow_uuid_to_parquet_uuid() -> Result<()> {
2302        use arrow_schema::extension::Uuid;
2303        let arrow_schema = Schema::new(vec![
2304            Field::new("uuid", DataType::FixedSizeBinary(16), false).with_extension_type(Uuid),
2305        ]);
2306
2307        let parquet_schema = ArrowSchemaConverter::new().convert(&arrow_schema)?;
2308
2309        assert_eq!(
2310            parquet_schema.column(0).logical_type_ref(),
2311            Some(&LogicalType::Uuid)
2312        );
2313
2314        let arrow_schema = parquet_to_arrow_schema(&parquet_schema, None)?;
2315        assert_eq!(arrow_schema.field(0).try_extension_type::<Uuid>()?, Uuid);
2316
2317        Ok(())
2318    }
2319
2320    #[test]
2321    #[cfg(feature = "arrow_canonical_extension_types")]
2322    fn arrow_json_to_parquet_json() -> Result<()> {
2323        use arrow_schema::extension::Json;
2324        let arrow_schema = Schema::new(vec![
2325            Field::new("json", DataType::Utf8, false).with_extension_type(Json::default()),
2326        ]);
2327
2328        let parquet_schema = ArrowSchemaConverter::new().convert(&arrow_schema)?;
2329
2330        assert_eq!(
2331            parquet_schema.column(0).logical_type_ref(),
2332            Some(&LogicalType::Json)
2333        );
2334
2335        let arrow_schema = parquet_to_arrow_schema(&parquet_schema, None)?;
2336        assert_eq!(
2337            arrow_schema.field(0).try_extension_type::<Json>()?,
2338            Json::default()
2339        );
2340
2341        Ok(())
2342    }
2343
2344    #[test]
2345    fn test_parquet_to_arrow_field_levels_with_virtual_rejects_non_virtual() {
2346        let message_type = "
2347        message test_schema {
2348            REQUIRED INT32 id;
2349        }
2350        ";
2351        let parquet_schema = Arc::new(parse_message_type(message_type).unwrap());
2352        let descriptor = SchemaDescriptor::new(parquet_schema);
2353
2354        // Try to pass a regular field (not a virtual column)
2355        let regular_field = Arc::new(Field::new("regular_column", DataType::Int64, false));
2356        let result = parquet_to_arrow_field_levels_with_virtual(
2357            &descriptor,
2358            ProjectionMask::all(),
2359            None,
2360            &[regular_field],
2361        );
2362
2363        assert!(result.is_err());
2364        assert!(
2365            result
2366                .unwrap_err()
2367                .to_string()
2368                .contains("is not a virtual column")
2369        );
2370    }
2371}