// parquet/arrow/schema/mod.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Converting Parquet schema <--> Arrow schema: [`ArrowSchemaConverter`] and [parquet_to_arrow_schema]
19
20use base64::Engine;
21use base64::prelude::BASE64_STANDARD;
22use std::collections::HashMap;
23use std::sync::Arc;
24
25use arrow_ipc::writer;
26use arrow_schema::{DataType, Field, FieldRef, Fields, Schema, TimeUnit};
27
28use crate::basic::{
29    ConvertedType, LogicalType, Repetition, TimeUnit as ParquetTimeUnit, Type as PhysicalType,
30};
31use crate::errors::{ParquetError, Result};
32use crate::file::{metadata::KeyValue, properties::WriterProperties};
33use crate::schema::types::{ColumnDescriptor, SchemaDescriptor, Type};
34
35mod complex;
36mod extension;
37mod primitive;
38pub mod virtual_type;
39
40use super::PARQUET_FIELD_ID_META_KEY;
41use crate::arrow::ProjectionMask;
42use crate::arrow::schema::extension::{
43    has_extension_type, logical_type_for_binary, logical_type_for_binary_view,
44    logical_type_for_fixed_size_binary, logical_type_for_string, logical_type_for_struct,
45    try_add_extension_type,
46};
47pub(crate) use complex::{ParquetField, ParquetFieldType, VirtualColumnType};
48
49/// Convert Parquet schema to Arrow schema including optional metadata
50///
51/// Attempts to decode any existing Arrow schema metadata, falling back
52/// to converting the Parquet schema column-wise
53pub fn parquet_to_arrow_schema(
54    parquet_schema: &SchemaDescriptor,
55    key_value_metadata: Option<&Vec<KeyValue>>,
56) -> Result<Schema> {
57    parquet_to_arrow_schema_by_columns(parquet_schema, ProjectionMask::all(), key_value_metadata)
58}
59
60/// Convert parquet schema to arrow schema including optional metadata,
61/// only preserving some leaf columns.
62pub fn parquet_to_arrow_schema_by_columns(
63    parquet_schema: &SchemaDescriptor,
64    mask: ProjectionMask,
65    key_value_metadata: Option<&Vec<KeyValue>>,
66) -> Result<Schema> {
67    Ok(parquet_to_arrow_schema_and_fields(parquet_schema, mask, key_value_metadata, &[])?.0)
68}
69
70/// Determines the Arrow Schema from a Parquet schema
71///
72/// Looks for an Arrow schema metadata "hint" (see
73/// [`parquet_to_arrow_field_levels`]), and uses it if present to ensure
74/// lossless round trips.
75pub(crate) fn parquet_to_arrow_schema_and_fields(
76    parquet_schema: &SchemaDescriptor,
77    mask: ProjectionMask,
78    key_value_metadata: Option<&Vec<KeyValue>>,
79    virtual_columns: &[FieldRef],
80) -> Result<(Schema, Option<ParquetField>)> {
81    let mut metadata = parse_key_value_metadata(key_value_metadata).unwrap_or_default();
82    let maybe_schema = metadata
83        .remove(super::ARROW_SCHEMA_META_KEY)
84        .map(|value| get_arrow_schema_from_metadata(&value))
85        .transpose()?;
86
87    // Add the Arrow metadata to the Parquet metadata skipping keys that collide
88    if let Some(arrow_schema) = &maybe_schema {
89        arrow_schema.metadata().iter().for_each(|(k, v)| {
90            metadata.entry(k.clone()).or_insert_with(|| v.clone());
91        });
92    }
93
94    let hint = maybe_schema.as_ref().map(|s| s.fields());
95    let field_levels =
96        parquet_to_arrow_field_levels_with_virtual(parquet_schema, mask, hint, virtual_columns)?;
97    let schema = Schema::new_with_metadata(field_levels.fields, metadata);
98    Ok((schema, field_levels.levels))
99}
100
/// Schema information necessary to decode a parquet file as arrow [`Fields`]
///
/// In particular this stores the dremel-level information necessary to correctly
/// interpret the encoded definition and repetition levels
///
/// Note: this is an opaque container intended to be used with lower-level APIs
/// within this crate
#[derive(Debug, Clone)]
pub struct FieldLevels {
    // Arrow fields for the root of the converted schema
    pub(crate) fields: Fields,
    // Dremel level information for the root; `None` when the projected
    // schema contains no fields
    pub(crate) levels: Option<ParquetField>,
}
113
114/// Convert a parquet [`SchemaDescriptor`] to [`FieldLevels`]
115///
116/// Columns not included within [`ProjectionMask`] will be ignored.
117///
118/// The optional `hint` parameter is the desired Arrow schema. See the
119/// [`arrow`] module documentation for more information.
120///
121/// [`arrow`]: crate::arrow
122///
123/// # Notes:
124/// Where a field type in `hint` is compatible with the corresponding parquet type in `schema`, it
125/// will be used, otherwise the default arrow type for the given parquet column type will be used.
126///
127/// This is to accommodate arrow types that cannot be round-tripped through parquet natively.
128/// Depending on the parquet writer, this can lead to a mismatch between a file's parquet schema
129/// and its embedded arrow schema. The parquet `schema` must be treated as authoritative in such
130/// an event. See [#1663](https://github.com/apache/arrow-rs/issues/1663) for more information
131///
132/// Note: this is a low-level API, most users will want to make use of the higher-level
133/// [`parquet_to_arrow_schema`] for decoding metadata from a parquet file.
134pub fn parquet_to_arrow_field_levels(
135    schema: &SchemaDescriptor,
136    mask: ProjectionMask,
137    hint: Option<&Fields>,
138) -> Result<FieldLevels> {
139    parquet_to_arrow_field_levels_with_virtual(schema, mask, hint, &[])
140}
141
142/// Convert a parquet [`SchemaDescriptor`] to [`FieldLevels`] with support for virtual columns
143///
144/// Columns not included within [`ProjectionMask`] will be ignored.
145///
146/// The optional `hint` parameter is the desired Arrow schema. See the
147/// [`arrow`] module documentation for more information.
148///
149/// [`arrow`]: crate::arrow
150///
151/// # Arguments
152/// * `schema` - The Parquet schema descriptor
153/// * `mask` - Projection mask to select which columns to include
154/// * `hint` - Optional hint for Arrow field types to use instead of defaults
155/// * `virtual_columns` - Virtual columns to append to the schema (e.g., row numbers)
156///
157/// # Notes:
158/// Where a field type in `hint` is compatible with the corresponding parquet type in `schema`, it
159/// will be used, otherwise the default arrow type for the given parquet column type will be used.
160///
161/// Virtual columns are columns that don't exist in the Parquet file but are generated during reading.
162/// They must have extension type names starting with "arrow.virtual.".
163///
164/// This is to accommodate arrow types that cannot be round-tripped through parquet natively.
165/// Depending on the parquet writer, this can lead to a mismatch between a file's parquet schema
166/// and its embedded arrow schema. The parquet `schema` must be treated as authoritative in such
167/// an event. See [#1663](https://github.com/apache/arrow-rs/issues/1663) for more information
168///
169/// Note: this is a low-level API, most users will want to make use of the higher-level
170/// [`parquet_to_arrow_schema`] for decoding metadata from a parquet file.
171pub fn parquet_to_arrow_field_levels_with_virtual(
172    schema: &SchemaDescriptor,
173    mask: ProjectionMask,
174    hint: Option<&Fields>,
175    virtual_columns: &[FieldRef],
176) -> Result<FieldLevels> {
177    // Validate that all fields are virtual columns
178    for field in virtual_columns {
179        if !virtual_type::is_virtual_column(field) {
180            return Err(ParquetError::General(format!(
181                "Field '{}' is not a virtual column. Virtual columns must have extension type names starting with 'arrow.virtual.'",
182                field.name()
183            )));
184        }
185    }
186
187    // Convert the regular schema first
188    let mut parquet_field = match complex::convert_schema(schema, mask, hint)? {
189        Some(field) => field,
190        None if virtual_columns.is_empty() => {
191            return Ok(FieldLevels {
192                fields: Fields::empty(),
193                levels: None,
194            });
195        }
196        None => {
197            // No regular fields, but we have virtual columns - create empty root struct
198            ParquetField {
199                rep_level: 0,
200                def_level: 0,
201                nullable: false,
202                arrow_type: DataType::Struct(Fields::empty()),
203                field_type: ParquetFieldType::Group {
204                    children: Vec::new(),
205                },
206            }
207        }
208    };
209
210    // Append virtual columns if any
211    if !virtual_columns.is_empty() {
212        match &mut parquet_field.field_type {
213            ParquetFieldType::Group { children } => {
214                // Get the mutable fields from the struct type
215                let DataType::Struct(ref mut fields) = parquet_field.arrow_type else {
216                    unreachable!("Root field must be a struct");
217                };
218
219                // Convert to mutable Vec to append
220                let mut fields_vec: Vec<FieldRef> = fields.iter().cloned().collect();
221
222                // Append each virtual column
223                for virtual_column in virtual_columns {
224                    // Virtual columns can only be added at the root level
225                    assert_eq!(
226                        parquet_field.rep_level, 0,
227                        "Virtual columns can only be added at rep level 0"
228                    );
229                    assert_eq!(
230                        parquet_field.def_level, 0,
231                        "Virtual columns can only be added at def level 0"
232                    );
233
234                    fields_vec.push(virtual_column.clone());
235                    let virtual_parquet_field = complex::convert_virtual_field(
236                        virtual_column,
237                        parquet_field.rep_level,
238                        parquet_field.def_level,
239                    )?;
240                    children.push(virtual_parquet_field);
241                }
242
243                // Update the fields
244                parquet_field.arrow_type = DataType::Struct(Fields::from(fields_vec));
245            }
246            _ => unreachable!("Root field must be a group"),
247        }
248    }
249
250    match &parquet_field.arrow_type {
251        DataType::Struct(fields) => Ok(FieldLevels {
252            fields: fields.clone(),
253            levels: Some(parquet_field),
254        }),
255        _ => unreachable!(),
256    }
257}
258
259/// Try to convert Arrow schema metadata into a schema
260fn get_arrow_schema_from_metadata(encoded_meta: &str) -> Result<Schema> {
261    let decoded = BASE64_STANDARD.decode(encoded_meta);
262    match decoded {
263        Ok(bytes) => {
264            let slice = if bytes.len() > 8 && bytes[0..4] == [255u8; 4] {
265                &bytes[8..]
266            } else {
267                bytes.as_slice()
268            };
269            match arrow_ipc::root_as_message(slice) {
270                Ok(message) => message
271                    .header_as_schema()
272                    .map(arrow_ipc::convert::fb_to_schema)
273                    .ok_or_else(|| arrow_err!("the message is not Arrow Schema")),
274                Err(err) => {
275                    // The flatbuffers implementation returns an error on verification error.
276                    Err(arrow_err!(
277                        "Unable to get root as message stored in {}: {:?}",
278                        super::ARROW_SCHEMA_META_KEY,
279                        err
280                    ))
281                }
282            }
283        }
284        Err(err) => {
285            // The C++ implementation returns an error if the schema can't be parsed.
286            Err(arrow_err!(
287                "Unable to decode the encoded schema stored in {}, {:?}",
288                super::ARROW_SCHEMA_META_KEY,
289                err
290            ))
291        }
292    }
293}
294
295/// Encodes the Arrow schema into the IPC format, and base64 encodes it
296pub fn encode_arrow_schema(schema: &Schema) -> String {
297    let options = writer::IpcWriteOptions::default();
298    let mut dictionary_tracker = writer::DictionaryTracker::new(true);
299    let data_gen = writer::IpcDataGenerator::default();
300    let mut serialized_schema =
301        data_gen.schema_to_bytes_with_dictionary_tracker(schema, &mut dictionary_tracker, &options);
302
303    // manually prepending the length to the schema as arrow uses the legacy IPC format
304    // TODO: change after addressing ARROW-9777
305    let schema_len = serialized_schema.ipc_message.len();
306    let mut len_prefix_schema = Vec::with_capacity(schema_len + 8);
307    len_prefix_schema.append(&mut vec![255u8, 255, 255, 255]);
308    len_prefix_schema.append((schema_len as u32).to_le_bytes().to_vec().as_mut());
309    len_prefix_schema.append(&mut serialized_schema.ipc_message);
310
311    BASE64_STANDARD.encode(&len_prefix_schema)
312}
313
314/// Mutates writer metadata by storing the encoded Arrow schema hint in
315/// [`ARROW_SCHEMA_META_KEY`].
316///
317/// If there is an existing Arrow schema metadata, it is replaced.
318///
319/// [`ARROW_SCHEMA_META_KEY`]: crate::arrow::ARROW_SCHEMA_META_KEY
320pub fn add_encoded_arrow_schema_to_metadata(schema: &Schema, props: &mut WriterProperties) {
321    let encoded = encode_arrow_schema(schema);
322
323    let schema_kv = KeyValue {
324        key: super::ARROW_SCHEMA_META_KEY.to_string(),
325        value: Some(encoded),
326    };
327
328    let meta = props
329        .key_value_metadata
330        .get_or_insert_with(Default::default);
331
332    // check if ARROW:schema exists, and overwrite it
333    let schema_meta = meta
334        .iter()
335        .enumerate()
336        .find(|(_, kv)| kv.key.as_str() == super::ARROW_SCHEMA_META_KEY);
337    match schema_meta {
338        Some((i, _)) => {
339            meta.remove(i);
340            meta.push(schema_kv);
341        }
342        None => {
343            meta.push(schema_kv);
344        }
345    }
346}
347
348/// Converter for Arrow schema to Parquet schema
349///
350/// See the documentation on the [`arrow`] module for background
351/// information on how Arrow schema is represented in Parquet.
352///
353/// [`arrow`]: crate::arrow
354///
355/// # Example:
356/// ```
357/// # use std::sync::Arc;
358/// # use arrow_schema::{Field, Schema, DataType};
359/// # use parquet::arrow::ArrowSchemaConverter;
360/// use parquet::schema::types::{SchemaDescriptor, Type};
361/// use parquet::basic; // note there are two `Type`s in the following example
362/// // create an Arrow Schema
363/// let arrow_schema = Schema::new(vec![
364///   Field::new("a", DataType::Int64, true),
365///   Field::new("b", DataType::Date32, true),
366/// ]);
367/// // convert the Arrow schema to a Parquet schema
368/// let parquet_schema = ArrowSchemaConverter::new()
369///   .convert(&arrow_schema)
370///   .unwrap();
371///
372/// let expected_parquet_schema = SchemaDescriptor::new(
373///   Arc::new(
374///     Type::group_type_builder("arrow_schema")
375///       .with_fields(vec![
376///         Arc::new(
377///          Type::primitive_type_builder("a", basic::Type::INT64)
378///           .build().unwrap()
379///         ),
380///         Arc::new(
381///          Type::primitive_type_builder("b", basic::Type::INT32)
382///           .with_converted_type(basic::ConvertedType::DATE)
383///           .with_logical_type(Some(basic::LogicalType::Date))
384///           .build().unwrap()
385///         ),
386///      ])
387///      .build().unwrap()
388///   )
389/// );
390/// assert_eq!(parquet_schema, expected_parquet_schema);
391/// ```
#[derive(Debug)]
pub struct ArrowSchemaConverter<'a> {
    /// Name of the root schema in Parquet
    schema_root: &'a str,
    /// Should we coerce Arrow types to compatible Parquet types?
    ///
    /// See docs on [`Self::with_coerce_types`]
    coerce_types: bool,
}
401
impl Default for ArrowSchemaConverter<'_> {
    // Equivalent to `ArrowSchemaConverter::new()`
    fn default() -> Self {
        Self::new()
    }
}
407
408impl<'a> ArrowSchemaConverter<'a> {
409    /// Create a new converter
410    pub fn new() -> Self {
411        Self {
412            schema_root: "arrow_schema",
413            coerce_types: false,
414        }
415    }
416
417    /// Should Arrow types be coerced into Parquet native types (default `false`).
418    ///
419    /// Setting this option to `true` will result in Parquet files that can be
420    /// read by more readers, but may lose precision for Arrow types such as
421    /// [`DataType::Date64`] which have no direct [corresponding Parquet type].
422    ///
423    /// By default, this converter does not coerce to native Parquet types. Enabling type
424    /// coercion allows for meaningful representations that do not require
425    /// downstream readers to consider the embedded Arrow schema, and can allow
426    /// for greater compatibility with other Parquet implementations. However,
427    /// type coercion also prevents data from being losslessly round-tripped.
428    ///
429    /// # Discussion
430    ///
431    /// Some Arrow types such as `Date64`, `Timestamp` and `Interval` have no
432    /// corresponding Parquet logical type. Thus, they can not be losslessly
433    /// round-tripped when stored using the appropriate Parquet logical type.
434    /// For example, some Date64 values may be truncated when stored with
435    /// parquet's native 32 bit date type.
436    ///
437    /// For [`List`] and [`Map`] types, some Parquet readers expect certain
438    /// schema elements to have specific names (earlier versions of the spec
439    /// were somewhat ambiguous on this point). Type coercion will use the names
440    /// prescribed by the Parquet specification, potentially losing naming
441    /// metadata from the Arrow schema.
442    ///
443    /// [`List`]: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists
444    /// [`Map`]: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#maps
445    /// [corresponding Parquet type]: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#date
446    ///
447    pub fn with_coerce_types(mut self, coerce_types: bool) -> Self {
448        self.coerce_types = coerce_types;
449        self
450    }
451
452    /// Set the root schema element name (defaults to `"arrow_schema"`).
453    pub fn schema_root(mut self, schema_root: &'a str) -> Self {
454        self.schema_root = schema_root;
455        self
456    }
457
458    /// Convert the specified Arrow [`Schema`] to the desired Parquet [`SchemaDescriptor`]
459    ///
460    /// See example in [`ArrowSchemaConverter`]
461    pub fn convert(&self, schema: &Schema) -> Result<SchemaDescriptor> {
462        let fields = schema
463            .fields()
464            .iter()
465            .map(|field| arrow_to_parquet_type(field, self.coerce_types).map(Arc::new))
466            .collect::<Result<_>>()?;
467        let group = Type::group_type_builder(self.schema_root)
468            .with_fields(fields)
469            .build()?;
470        Ok(SchemaDescriptor::new(Arc::new(group)))
471    }
472}
473
474fn parse_key_value_metadata(
475    key_value_metadata: Option<&Vec<KeyValue>>,
476) -> Option<HashMap<String, String>> {
477    match key_value_metadata {
478        Some(key_values) => {
479            let map: HashMap<String, String> = key_values
480                .iter()
481                .filter_map(|kv| {
482                    kv.value
483                        .as_ref()
484                        .map(|value| (kv.key.clone(), value.clone()))
485                })
486                .collect();
487
488            if map.is_empty() { None } else { Some(map) }
489        }
490        None => None,
491    }
492}
493
494/// Convert parquet column schema to arrow field.
495pub fn parquet_to_arrow_field(parquet_column: &ColumnDescriptor) -> Result<Field> {
496    let field = complex::convert_type(&parquet_column.self_type_ptr())?;
497    let mut ret = Field::new(parquet_column.name(), field.arrow_type, field.nullable);
498
499    let parquet_type = parquet_column.self_type();
500    let basic_info = parquet_type.get_basic_info();
501
502    let mut hash_map_size = 0;
503    if basic_info.has_id() {
504        hash_map_size += 1;
505    }
506    if has_extension_type(parquet_type) {
507        hash_map_size += 1;
508    }
509    if hash_map_size == 0 {
510        return Ok(ret);
511    }
512    ret.set_metadata(HashMap::with_capacity(hash_map_size));
513    if basic_info.has_id() {
514        ret.metadata_mut().insert(
515            PARQUET_FIELD_ID_META_KEY.to_string(),
516            basic_info.id().to_string(),
517        );
518    }
519    try_add_extension_type(ret, parquet_column.self_type())
520}
521
/// Number of bytes for a `FIXED_LEN_BYTE_ARRAY` holding a signed decimal of
/// the given `precision`.
///
/// Derivation (from parquet's logical-types definition):
///   digits = floor(log_10(2^(8*n - 1) - 1))
///   ceil(digits) = log10(2^(8*n - 1) - 1)
///   10^ceil(digits) = 2^(8*n - 1) - 1
///   10^ceil(digits) + 1 = 2^(8*n - 1)
///   log2(10^ceil(digits) + 1) = (8*n - 1)
///   log2(10^ceil(digits) + 1) + 1 = 8*n
///   (log2(10^ceil(a) + 1) + 1) / 8 = n
pub fn decimal_length_from_precision(precision: u8) -> usize {
    // Largest unscaled magnitude representable at this precision, plus one
    let max_unscaled_plus_one = 10.0_f64.powi(precision as i32) + 1.0;
    // Bits needed including the sign bit
    let bits_needed = max_unscaled_plus_one.log2() + 1.0;
    // Round up to whole bytes
    (bits_needed / 8.0).ceil() as usize
}
532
533/// Convert an arrow field to a parquet `Type`
534fn arrow_to_parquet_type(field: &Field, coerce_types: bool) -> Result<Type> {
535    const PARQUET_LIST_ELEMENT_NAME: &str = "element";
536    const PARQUET_MAP_STRUCT_NAME: &str = "key_value";
537    const PARQUET_KEY_FIELD_NAME: &str = "key";
538    const PARQUET_VALUE_FIELD_NAME: &str = "value";
539
540    let name = field.name().as_str();
541    let repetition = if field.is_nullable() {
542        Repetition::OPTIONAL
543    } else {
544        Repetition::REQUIRED
545    };
546    let id = field_id(field);
547    // create type from field
548    match field.data_type() {
549        DataType::Null => Type::primitive_type_builder(name, PhysicalType::INT32)
550            .with_logical_type(Some(LogicalType::Unknown))
551            .with_repetition(repetition)
552            .with_id(id)
553            .build(),
554        DataType::Boolean => Type::primitive_type_builder(name, PhysicalType::BOOLEAN)
555            .with_repetition(repetition)
556            .with_id(id)
557            .build(),
558        DataType::Int8 => Type::primitive_type_builder(name, PhysicalType::INT32)
559            .with_logical_type(Some(LogicalType::Integer {
560                bit_width: 8,
561                is_signed: true,
562            }))
563            .with_repetition(repetition)
564            .with_id(id)
565            .build(),
566        DataType::Int16 => Type::primitive_type_builder(name, PhysicalType::INT32)
567            .with_logical_type(Some(LogicalType::Integer {
568                bit_width: 16,
569                is_signed: true,
570            }))
571            .with_repetition(repetition)
572            .with_id(id)
573            .build(),
574        DataType::Int32 => Type::primitive_type_builder(name, PhysicalType::INT32)
575            .with_repetition(repetition)
576            .with_id(id)
577            .build(),
578        DataType::Int64 => Type::primitive_type_builder(name, PhysicalType::INT64)
579            .with_repetition(repetition)
580            .with_id(id)
581            .build(),
582        DataType::UInt8 => Type::primitive_type_builder(name, PhysicalType::INT32)
583            .with_logical_type(Some(LogicalType::Integer {
584                bit_width: 8,
585                is_signed: false,
586            }))
587            .with_repetition(repetition)
588            .with_id(id)
589            .build(),
590        DataType::UInt16 => Type::primitive_type_builder(name, PhysicalType::INT32)
591            .with_logical_type(Some(LogicalType::Integer {
592                bit_width: 16,
593                is_signed: false,
594            }))
595            .with_repetition(repetition)
596            .with_id(id)
597            .build(),
598        DataType::UInt32 => Type::primitive_type_builder(name, PhysicalType::INT32)
599            .with_logical_type(Some(LogicalType::Integer {
600                bit_width: 32,
601                is_signed: false,
602            }))
603            .with_repetition(repetition)
604            .with_id(id)
605            .build(),
606        DataType::UInt64 => Type::primitive_type_builder(name, PhysicalType::INT64)
607            .with_logical_type(Some(LogicalType::Integer {
608                bit_width: 64,
609                is_signed: false,
610            }))
611            .with_repetition(repetition)
612            .with_id(id)
613            .build(),
614        DataType::Float16 => Type::primitive_type_builder(name, PhysicalType::FIXED_LEN_BYTE_ARRAY)
615            .with_repetition(repetition)
616            .with_id(id)
617            .with_logical_type(Some(LogicalType::Float16))
618            .with_length(2)
619            .build(),
620        DataType::Float32 => Type::primitive_type_builder(name, PhysicalType::FLOAT)
621            .with_repetition(repetition)
622            .with_id(id)
623            .build(),
624        DataType::Float64 => Type::primitive_type_builder(name, PhysicalType::DOUBLE)
625            .with_repetition(repetition)
626            .with_id(id)
627            .build(),
628        DataType::Timestamp(TimeUnit::Second, _) => {
629            // Cannot represent seconds in LogicalType
630            Type::primitive_type_builder(name, PhysicalType::INT64)
631                .with_repetition(repetition)
632                .with_id(id)
633                .build()
634        }
635        DataType::Timestamp(time_unit, tz) => {
636            Type::primitive_type_builder(name, PhysicalType::INT64)
637                .with_logical_type(Some(LogicalType::Timestamp {
638                    // If timezone set, values are normalized to UTC timezone
639                    is_adjusted_to_u_t_c: matches!(tz, Some(z) if !z.as_ref().is_empty()),
640                    unit: match time_unit {
641                        TimeUnit::Second => unreachable!(),
642                        TimeUnit::Millisecond => ParquetTimeUnit::MILLIS,
643                        TimeUnit::Microsecond => ParquetTimeUnit::MICROS,
644                        TimeUnit::Nanosecond => ParquetTimeUnit::NANOS,
645                    },
646                }))
647                .with_repetition(repetition)
648                .with_id(id)
649                .build()
650        }
651        DataType::Date32 => Type::primitive_type_builder(name, PhysicalType::INT32)
652            .with_logical_type(Some(LogicalType::Date))
653            .with_repetition(repetition)
654            .with_id(id)
655            .build(),
656        DataType::Date64 => {
657            if coerce_types {
658                Type::primitive_type_builder(name, PhysicalType::INT32)
659                    .with_logical_type(Some(LogicalType::Date))
660                    .with_repetition(repetition)
661                    .with_id(id)
662                    .build()
663            } else {
664                Type::primitive_type_builder(name, PhysicalType::INT64)
665                    .with_repetition(repetition)
666                    .with_id(id)
667                    .build()
668            }
669        }
670        DataType::Time32(TimeUnit::Second) => {
671            // Cannot represent seconds in LogicalType
672            Type::primitive_type_builder(name, PhysicalType::INT32)
673                .with_repetition(repetition)
674                .with_id(id)
675                .build()
676        }
677        DataType::Time32(unit) => Type::primitive_type_builder(name, PhysicalType::INT32)
678            .with_logical_type(Some(LogicalType::Time {
679                is_adjusted_to_u_t_c: field.metadata().contains_key("adjusted_to_utc"),
680                unit: match unit {
681                    TimeUnit::Millisecond => ParquetTimeUnit::MILLIS,
682                    u => unreachable!("Invalid unit for Time32: {:?}", u),
683                },
684            }))
685            .with_repetition(repetition)
686            .with_id(id)
687            .build(),
688        DataType::Time64(unit) => Type::primitive_type_builder(name, PhysicalType::INT64)
689            .with_logical_type(Some(LogicalType::Time {
690                is_adjusted_to_u_t_c: field.metadata().contains_key("adjusted_to_utc"),
691                unit: match unit {
692                    TimeUnit::Microsecond => ParquetTimeUnit::MICROS,
693                    TimeUnit::Nanosecond => ParquetTimeUnit::NANOS,
694                    u => unreachable!("Invalid unit for Time64: {:?}", u),
695                },
696            }))
697            .with_repetition(repetition)
698            .with_id(id)
699            .build(),
700        DataType::Duration(_) => Type::primitive_type_builder(name, PhysicalType::INT64)
701            .with_repetition(repetition)
702            .with_id(id)
703            .build(),
704        DataType::Interval(_) => {
705            Type::primitive_type_builder(name, PhysicalType::FIXED_LEN_BYTE_ARRAY)
706                .with_converted_type(ConvertedType::INTERVAL)
707                .with_repetition(repetition)
708                .with_id(id)
709                .with_length(12)
710                .build()
711        }
712        DataType::Binary | DataType::LargeBinary => {
713            Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
714                .with_repetition(repetition)
715                .with_id(id)
716                .with_logical_type(logical_type_for_binary(field))
717                .build()
718        }
719        DataType::FixedSizeBinary(length) => {
720            Type::primitive_type_builder(name, PhysicalType::FIXED_LEN_BYTE_ARRAY)
721                .with_repetition(repetition)
722                .with_id(id)
723                .with_length(*length)
724                .with_logical_type(logical_type_for_fixed_size_binary(field))
725                .build()
726        }
727        DataType::BinaryView => Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
728            .with_repetition(repetition)
729            .with_id(id)
730            .with_logical_type(logical_type_for_binary_view(field))
731            .build(),
732        DataType::Decimal32(precision, scale)
733        | DataType::Decimal64(precision, scale)
734        | DataType::Decimal128(precision, scale)
735        | DataType::Decimal256(precision, scale) => {
736            // Decimal precision determines the Parquet physical type to use.
737            // Following the: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#decimal
738            let (physical_type, length) = if *precision > 1 && *precision <= 9 {
739                (PhysicalType::INT32, -1)
740            } else if *precision <= 18 {
741                (PhysicalType::INT64, -1)
742            } else {
743                (
744                    PhysicalType::FIXED_LEN_BYTE_ARRAY,
745                    decimal_length_from_precision(*precision) as i32,
746                )
747            };
748            Type::primitive_type_builder(name, physical_type)
749                .with_repetition(repetition)
750                .with_id(id)
751                .with_length(length)
752                .with_logical_type(Some(LogicalType::Decimal {
753                    scale: *scale as i32,
754                    precision: *precision as i32,
755                }))
756                .with_precision(*precision as i32)
757                .with_scale(*scale as i32)
758                .build()
759        }
760        DataType::Utf8 | DataType::LargeUtf8 => {
761            Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
762                .with_logical_type(logical_type_for_string(field))
763                .with_repetition(repetition)
764                .with_id(id)
765                .build()
766        }
767        DataType::Utf8View => Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
768            .with_logical_type(logical_type_for_string(field))
769            .with_repetition(repetition)
770            .with_id(id)
771            .build(),
772        DataType::List(f)
773        | DataType::FixedSizeList(f, _)
774        | DataType::LargeList(f)
775        | DataType::ListView(f)
776        | DataType::LargeListView(f) => {
777            let field_ref = if coerce_types && f.name() != PARQUET_LIST_ELEMENT_NAME {
778                // Ensure proper naming per the Parquet specification
779                let ff = f.as_ref().clone().with_name(PARQUET_LIST_ELEMENT_NAME);
780                Arc::new(arrow_to_parquet_type(&ff, coerce_types)?)
781            } else {
782                Arc::new(arrow_to_parquet_type(f, coerce_types)?)
783            };
784
785            Type::group_type_builder(name)
786                .with_fields(vec![Arc::new(
787                    Type::group_type_builder("list")
788                        .with_fields(vec![field_ref])
789                        .with_repetition(Repetition::REPEATED)
790                        .build()?,
791                )])
792                .with_logical_type(Some(LogicalType::List))
793                .with_repetition(repetition)
794                .with_id(id)
795                .build()
796        }
797        DataType::Struct(fields) => {
798            if fields.is_empty() {
799                return Err(arrow_err!("Parquet does not support writing empty structs",));
800            }
801            // recursively convert children to types/nodes
802            let fields = fields
803                .iter()
804                .map(|f| arrow_to_parquet_type(f, coerce_types).map(Arc::new))
805                .collect::<Result<_>>()?;
806            Type::group_type_builder(name)
807                .with_fields(fields)
808                .with_repetition(repetition)
809                .with_id(id)
810                .with_logical_type(logical_type_for_struct(field))
811                .build()
812        }
813        DataType::Map(field, _) => {
814            if let DataType::Struct(struct_fields) = field.data_type() {
815                // If coercing then set inner struct name to "key_value"
816                let map_struct_name = if coerce_types {
817                    PARQUET_MAP_STRUCT_NAME
818                } else {
819                    field.name()
820                };
821
822                // If coercing then ensure struct fields are named "key" and "value"
823                let fix_map_field = |name: &str, fld: &Arc<Field>| -> Result<Arc<Type>> {
824                    if coerce_types && fld.name() != name {
825                        let f = fld.as_ref().clone().with_name(name);
826                        Ok(Arc::new(arrow_to_parquet_type(&f, coerce_types)?))
827                    } else {
828                        Ok(Arc::new(arrow_to_parquet_type(fld, coerce_types)?))
829                    }
830                };
831                let key_field = fix_map_field(PARQUET_KEY_FIELD_NAME, &struct_fields[0])?;
832                let val_field = fix_map_field(PARQUET_VALUE_FIELD_NAME, &struct_fields[1])?;
833
834                Type::group_type_builder(name)
835                    .with_fields(vec![Arc::new(
836                        Type::group_type_builder(map_struct_name)
837                            .with_fields(vec![key_field, val_field])
838                            .with_repetition(Repetition::REPEATED)
839                            .build()?,
840                    )])
841                    .with_logical_type(Some(LogicalType::Map))
842                    .with_repetition(repetition)
843                    .with_id(id)
844                    .build()
845            } else {
846                Err(arrow_err!(
847                    "DataType::Map should contain a struct field child",
848                ))
849            }
850        }
851        DataType::Union(_, _) => unimplemented!("See ARROW-8817."),
852        DataType::Dictionary(_, value) => {
853            // Dictionary encoding not handled at the schema level
854            let dict_field = field.clone().with_data_type(value.as_ref().clone());
855            arrow_to_parquet_type(&dict_field, coerce_types)
856        }
857        DataType::RunEndEncoded(_, _) => Err(arrow_err!(
858            "Converting RunEndEncodedType to parquet not supported",
859        )),
860    }
861}
862
863fn field_id(field: &Field) -> Option<i32> {
864    let value = field.metadata().get(super::PARQUET_FIELD_ID_META_KEY)?;
865    value.parse().ok() // Fail quietly if not a valid integer
866}
867
868#[cfg(test)]
869mod tests {
870    use super::*;
871
872    use std::{collections::HashMap, sync::Arc};
873
874    use crate::arrow::PARQUET_FIELD_ID_META_KEY;
875    use crate::file::metadata::KeyValue;
876    use crate::file::reader::FileReader;
877    use crate::{
878        arrow::{ArrowWriter, arrow_reader::ParquetRecordBatchReaderBuilder},
879        schema::{parser::parse_message_type, types::SchemaDescriptor},
880    };
881    use arrow::datatypes::{DataType, Field, IntervalUnit, TimeUnit};
882
883    #[test]
884    fn test_flat_primitives() {
885        let message_type = "
886        message test_schema {
887            REQUIRED BOOLEAN boolean;
888            REQUIRED INT32   int8  (INT_8);
889            REQUIRED INT32   int16 (INT_16);
890            REQUIRED INT32   uint8 (INTEGER(8,false));
891            REQUIRED INT32   uint16 (INTEGER(16,false));
892            REQUIRED INT32   int32;
893            REQUIRED INT64   int64;
894            OPTIONAL DOUBLE  double;
895            OPTIONAL FLOAT   float;
896            OPTIONAL FIXED_LEN_BYTE_ARRAY (2) float16 (FLOAT16);
897            OPTIONAL BINARY  string (UTF8);
898            OPTIONAL BINARY  string_2 (STRING);
899            OPTIONAL BINARY  json (JSON);
900        }
901        ";
902        let parquet_group_type = parse_message_type(message_type).unwrap();
903
904        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
905        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
906
907        let arrow_fields = Fields::from(vec![
908            Field::new("boolean", DataType::Boolean, false),
909            Field::new("int8", DataType::Int8, false),
910            Field::new("int16", DataType::Int16, false),
911            Field::new("uint8", DataType::UInt8, false),
912            Field::new("uint16", DataType::UInt16, false),
913            Field::new("int32", DataType::Int32, false),
914            Field::new("int64", DataType::Int64, false),
915            Field::new("double", DataType::Float64, true),
916            Field::new("float", DataType::Float32, true),
917            Field::new("float16", DataType::Float16, true),
918            Field::new("string", DataType::Utf8, true),
919            Field::new("string_2", DataType::Utf8, true),
920            json_field(),
921        ]);
922
923        assert_eq!(&arrow_fields, converted_arrow_schema.fields());
924    }
925
926    /// Return the expected Field for a Parquet column annotated with
927    /// the JSON logical type.
928    fn json_field() -> Field {
929        #[cfg(feature = "arrow_canonical_extension_types")]
930        {
931            Field::new("json", DataType::Utf8, true)
932                .with_extension_type(arrow_schema::extension::Json::default())
933        }
934        #[cfg(not(feature = "arrow_canonical_extension_types"))]
935        {
936            Field::new("json", DataType::Utf8, true)
937        }
938    }
939
940    #[test]
941    fn test_decimal_fields() {
942        let message_type = "
943        message test_schema {
944                    REQUIRED INT32 decimal1 (DECIMAL(4,2));
945                    REQUIRED INT64 decimal2 (DECIMAL(12,2));
946                    REQUIRED FIXED_LEN_BYTE_ARRAY (16) decimal3 (DECIMAL(30,2));
947                    REQUIRED BYTE_ARRAY decimal4 (DECIMAL(33,2));
948                    REQUIRED BYTE_ARRAY decimal5 (DECIMAL(38,2));
949                    REQUIRED FIXED_LEN_BYTE_ARRAY (17) decimal6 (DECIMAL(39,2));
950                    REQUIRED BYTE_ARRAY decimal7 (DECIMAL(39,2));
951        }
952        ";
953
954        let parquet_group_type = parse_message_type(message_type).unwrap();
955
956        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
957        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
958
959        let arrow_fields = Fields::from(vec![
960            Field::new("decimal1", DataType::Decimal128(4, 2), false),
961            Field::new("decimal2", DataType::Decimal128(12, 2), false),
962            Field::new("decimal3", DataType::Decimal128(30, 2), false),
963            Field::new("decimal4", DataType::Decimal128(33, 2), false),
964            Field::new("decimal5", DataType::Decimal128(38, 2), false),
965            Field::new("decimal6", DataType::Decimal256(39, 2), false),
966            Field::new("decimal7", DataType::Decimal256(39, 2), false),
967        ]);
968        assert_eq!(&arrow_fields, converted_arrow_schema.fields());
969    }
970
971    #[test]
972    fn test_byte_array_fields() {
973        let message_type = "
974        message test_schema {
975            REQUIRED BYTE_ARRAY binary;
976            REQUIRED FIXED_LEN_BYTE_ARRAY (20) fixed_binary;
977        }
978        ";
979
980        let parquet_group_type = parse_message_type(message_type).unwrap();
981
982        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
983        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
984
985        let arrow_fields = Fields::from(vec![
986            Field::new("binary", DataType::Binary, false),
987            Field::new("fixed_binary", DataType::FixedSizeBinary(20), false),
988        ]);
989        assert_eq!(&arrow_fields, converted_arrow_schema.fields());
990    }
991
992    #[test]
993    fn test_duplicate_fields() {
994        let message_type = "
995        message test_schema {
996            REQUIRED BOOLEAN boolean;
997            REQUIRED INT32 int8 (INT_8);
998        }
999        ";
1000
1001        let parquet_group_type = parse_message_type(message_type).unwrap();
1002
1003        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1004        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
1005
1006        let arrow_fields = Fields::from(vec![
1007            Field::new("boolean", DataType::Boolean, false),
1008            Field::new("int8", DataType::Int8, false),
1009        ]);
1010        assert_eq!(&arrow_fields, converted_arrow_schema.fields());
1011
1012        let converted_arrow_schema =
1013            parquet_to_arrow_schema_by_columns(&parquet_schema, ProjectionMask::all(), None)
1014                .unwrap();
1015        assert_eq!(&arrow_fields, converted_arrow_schema.fields());
1016    }
1017
1018    #[test]
1019    fn test_parquet_lists() {
1020        let mut arrow_fields = Vec::new();
1021
1022        // LIST encoding example taken from parquet-format/LogicalTypes.md
1023        let message_type = "
1024        message test_schema {
1025          REQUIRED GROUP my_list (LIST) {
1026            REPEATED GROUP list {
1027              OPTIONAL BINARY element (UTF8);
1028            }
1029          }
1030          OPTIONAL GROUP my_list (LIST) {
1031            REPEATED GROUP list {
1032              REQUIRED BINARY element (UTF8);
1033            }
1034          }
1035          OPTIONAL GROUP array_of_arrays (LIST) {
1036            REPEATED GROUP list {
1037              REQUIRED GROUP element (LIST) {
1038                REPEATED GROUP list {
1039                  REQUIRED INT32 element;
1040                }
1041              }
1042            }
1043          }
1044          OPTIONAL GROUP my_list (LIST) {
1045            REPEATED GROUP element {
1046              REQUIRED BINARY str (UTF8);
1047            }
1048          }
1049          OPTIONAL GROUP my_list (LIST) {
1050            REPEATED INT32 element;
1051          }
1052          OPTIONAL GROUP my_list (LIST) {
1053            REPEATED GROUP element {
1054              REQUIRED BINARY str (UTF8);
1055              REQUIRED INT32 num;
1056            }
1057          }
1058          OPTIONAL GROUP my_list (LIST) {
1059            REPEATED GROUP array {
1060              REQUIRED BINARY str (UTF8);
1061            }
1062
1063          }
1064          OPTIONAL GROUP my_list (LIST) {
1065            REPEATED GROUP my_list_tuple {
1066              REQUIRED BINARY str (UTF8);
1067            }
1068          }
1069          REPEATED INT32 name;
1070        }
1071        ";
1072
1073        // // List<String> (list non-null, elements nullable)
1074        // required group my_list (LIST) {
1075        //   repeated group list {
1076        //     optional binary element (UTF8);
1077        //   }
1078        // }
1079        {
1080            arrow_fields.push(Field::new_list(
1081                "my_list",
1082                Field::new("element", DataType::Utf8, true),
1083                false,
1084            ));
1085        }
1086
1087        // // List<String> (list nullable, elements non-null)
1088        // optional group my_list (LIST) {
1089        //   repeated group list {
1090        //     required binary element (UTF8);
1091        //   }
1092        // }
1093        {
1094            arrow_fields.push(Field::new_list(
1095                "my_list",
1096                Field::new("element", DataType::Utf8, false),
1097                true,
1098            ));
1099        }
1100
1101        // Element types can be nested structures. For example, a list of lists:
1102        //
1103        // // List<List<Integer>>
1104        // optional group array_of_arrays (LIST) {
1105        //   repeated group list {
1106        //     required group element (LIST) {
1107        //       repeated group list {
1108        //         required int32 element;
1109        //       }
1110        //     }
1111        //   }
1112        // }
1113        {
1114            let arrow_inner_list = Field::new("element", DataType::Int32, false);
1115            arrow_fields.push(Field::new_list(
1116                "array_of_arrays",
1117                Field::new_list("element", arrow_inner_list, false),
1118                true,
1119            ));
1120        }
1121
1122        // // List<String> (list nullable, elements non-null)
1123        // optional group my_list (LIST) {
1124        //   repeated group element {
1125        //     required binary str (UTF8);
1126        //   };
1127        // }
1128        {
1129            arrow_fields.push(Field::new_list(
1130                "my_list",
1131                Field::new("str", DataType::Utf8, false),
1132                true,
1133            ));
1134        }
1135
1136        // // List<Integer> (nullable list, non-null elements)
1137        // optional group my_list (LIST) {
1138        //   repeated int32 element;
1139        // }
1140        {
1141            arrow_fields.push(Field::new_list(
1142                "my_list",
1143                Field::new("element", DataType::Int32, false),
1144                true,
1145            ));
1146        }
1147
1148        // // List<Tuple<String, Integer>> (nullable list, non-null elements)
1149        // optional group my_list (LIST) {
1150        //   repeated group element {
1151        //     required binary str (UTF8);
1152        //     required int32 num;
1153        //   };
1154        // }
1155        {
1156            let fields = vec![
1157                Field::new("str", DataType::Utf8, false),
1158                Field::new("num", DataType::Int32, false),
1159            ];
1160            arrow_fields.push(Field::new_list(
1161                "my_list",
1162                Field::new_struct("element", fields, false),
1163                true,
1164            ));
1165        }
1166
1167        // // List<OneTuple<String>> (nullable list, non-null elements)
1168        // optional group my_list (LIST) {
1169        //   repeated group array {
1170        //     required binary str (UTF8);
1171        //   };
1172        // }
1173        // Special case: group is named array
1174        {
1175            let fields = vec![Field::new("str", DataType::Utf8, false)];
1176            arrow_fields.push(Field::new_list(
1177                "my_list",
1178                Field::new_struct("array", fields, false),
1179                true,
1180            ));
1181        }
1182
1183        // // List<OneTuple<String>> (nullable list, non-null elements)
1184        // optional group my_list (LIST) {
1185        //   repeated group my_list_tuple {
1186        //     required binary str (UTF8);
1187        //   };
1188        // }
1189        // Special case: group named ends in _tuple
1190        {
1191            let fields = vec![Field::new("str", DataType::Utf8, false)];
1192            arrow_fields.push(Field::new_list(
1193                "my_list",
1194                Field::new_struct("my_list_tuple", fields, false),
1195                true,
1196            ));
1197        }
1198
1199        // One-level encoding: Only allows required lists with required cells
1200        //   repeated value_type name
1201        {
1202            arrow_fields.push(Field::new_list(
1203                "name",
1204                Field::new("name", DataType::Int32, false),
1205                false,
1206            ));
1207        }
1208
1209        let parquet_group_type = parse_message_type(message_type).unwrap();
1210
1211        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1212        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
1213        let converted_fields = converted_arrow_schema.fields();
1214
1215        assert_eq!(arrow_fields.len(), converted_fields.len());
1216        for i in 0..arrow_fields.len() {
1217            assert_eq!(&arrow_fields[i], converted_fields[i].as_ref(), "{i}");
1218        }
1219    }
1220
1221    #[test]
1222    fn test_parquet_list_nullable() {
1223        let mut arrow_fields = Vec::new();
1224
1225        let message_type = "
1226        message test_schema {
1227          REQUIRED GROUP my_list1 (LIST) {
1228            REPEATED GROUP list {
1229              OPTIONAL BINARY element (UTF8);
1230            }
1231          }
1232          OPTIONAL GROUP my_list2 (LIST) {
1233            REPEATED GROUP list {
1234              REQUIRED BINARY element (UTF8);
1235            }
1236          }
1237          REQUIRED GROUP my_list3 (LIST) {
1238            REPEATED GROUP list {
1239              REQUIRED BINARY element (UTF8);
1240            }
1241          }
1242        }
1243        ";
1244
1245        // // List<String> (list non-null, elements nullable)
1246        // required group my_list1 (LIST) {
1247        //   repeated group list {
1248        //     optional binary element (UTF8);
1249        //   }
1250        // }
1251        {
1252            arrow_fields.push(Field::new_list(
1253                "my_list1",
1254                Field::new("element", DataType::Utf8, true),
1255                false,
1256            ));
1257        }
1258
1259        // // List<String> (list nullable, elements non-null)
1260        // optional group my_list2 (LIST) {
1261        //   repeated group list {
1262        //     required binary element (UTF8);
1263        //   }
1264        // }
1265        {
1266            arrow_fields.push(Field::new_list(
1267                "my_list2",
1268                Field::new("element", DataType::Utf8, false),
1269                true,
1270            ));
1271        }
1272
1273        // // List<String> (list non-null, elements non-null)
1274        // repeated group my_list3 (LIST) {
1275        //   repeated group list {
1276        //     required binary element (UTF8);
1277        //   }
1278        // }
1279        {
1280            arrow_fields.push(Field::new_list(
1281                "my_list3",
1282                Field::new("element", DataType::Utf8, false),
1283                false,
1284            ));
1285        }
1286
1287        let parquet_group_type = parse_message_type(message_type).unwrap();
1288
1289        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1290        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
1291        let converted_fields = converted_arrow_schema.fields();
1292
1293        assert_eq!(arrow_fields.len(), converted_fields.len());
1294        for i in 0..arrow_fields.len() {
1295            assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
1296        }
1297    }
1298
1299    #[test]
1300    fn test_parquet_maps() {
1301        let mut arrow_fields = Vec::new();
1302
1303        // LIST encoding example taken from parquet-format/LogicalTypes.md
1304        let message_type = "
1305        message test_schema {
1306          REQUIRED group my_map1 (MAP) {
1307            REPEATED group key_value {
1308              REQUIRED binary key (UTF8);
1309              OPTIONAL int32 value;
1310            }
1311          }
1312          OPTIONAL group my_map2 (MAP) {
1313            REPEATED group map {
1314              REQUIRED binary str (UTF8);
1315              REQUIRED int32 num;
1316            }
1317          }
1318          OPTIONAL group my_map3 (MAP_KEY_VALUE) {
1319            REPEATED group map {
1320              REQUIRED binary key (UTF8);
1321              OPTIONAL int32 value;
1322            }
1323          }
1324          REQUIRED group my_map4 (MAP) {
1325            REPEATED group map {
1326              OPTIONAL binary key (UTF8);
1327              REQUIRED int32 value;
1328            }
1329          }
1330        }
1331        ";
1332
1333        // // Map<String, Integer>
1334        // required group my_map (MAP) {
1335        //   repeated group key_value {
1336        //     required binary key (UTF8);
1337        //     optional int32 value;
1338        //   }
1339        // }
1340        {
1341            arrow_fields.push(Field::new_map(
1342                "my_map1",
1343                "key_value",
1344                Field::new("key", DataType::Utf8, false),
1345                Field::new("value", DataType::Int32, true),
1346                false,
1347                false,
1348            ));
1349        }
1350
1351        // // Map<String, Integer> (nullable map, non-null values)
1352        // optional group my_map (MAP) {
1353        //   repeated group map {
1354        //     required binary str (UTF8);
1355        //     required int32 num;
1356        //   }
1357        // }
1358        {
1359            arrow_fields.push(Field::new_map(
1360                "my_map2",
1361                "map",
1362                Field::new("str", DataType::Utf8, false),
1363                Field::new("num", DataType::Int32, false),
1364                false,
1365                true,
1366            ));
1367        }
1368
1369        // // Map<String, Integer> (nullable map, nullable values)
1370        // optional group my_map (MAP_KEY_VALUE) {
1371        //   repeated group map {
1372        //     required binary key (UTF8);
1373        //     optional int32 value;
1374        //   }
1375        // }
1376        {
1377            arrow_fields.push(Field::new_map(
1378                "my_map3",
1379                "map",
1380                Field::new("key", DataType::Utf8, false),
1381                Field::new("value", DataType::Int32, true),
1382                false,
1383                true,
1384            ));
1385        }
1386
1387        // // Map<String, Integer> (non-compliant nullable key)
1388        // group my_map (MAP_KEY_VALUE) {
1389        //   repeated group map {
1390        //     optional binary key (UTF8);
1391        //     required int32 value;
1392        //   }
1393        // }
1394        {
1395            arrow_fields.push(Field::new_map(
1396                "my_map4",
1397                "map",
1398                Field::new("key", DataType::Utf8, false), // The key is always non-nullable (#5630)
1399                Field::new("value", DataType::Int32, false),
1400                false,
1401                false,
1402            ));
1403        }
1404
1405        let parquet_group_type = parse_message_type(message_type).unwrap();
1406
1407        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1408        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
1409        let converted_fields = converted_arrow_schema.fields();
1410
1411        assert_eq!(arrow_fields.len(), converted_fields.len());
1412        for i in 0..arrow_fields.len() {
1413            assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
1414        }
1415    }
1416
1417    #[test]
1418    fn test_nested_schema() {
1419        let mut arrow_fields = Vec::new();
1420        {
1421            let group1_fields = Fields::from(vec![
1422                Field::new("leaf1", DataType::Boolean, false),
1423                Field::new("leaf2", DataType::Int32, false),
1424            ]);
1425            let group1_struct = Field::new("group1", DataType::Struct(group1_fields), false);
1426            arrow_fields.push(group1_struct);
1427
1428            let leaf3_field = Field::new("leaf3", DataType::Int64, false);
1429            arrow_fields.push(leaf3_field);
1430        }
1431
1432        let message_type = "
1433        message test_schema {
1434          REQUIRED GROUP group1 {
1435            REQUIRED BOOLEAN leaf1;
1436            REQUIRED INT32 leaf2;
1437          }
1438          REQUIRED INT64 leaf3;
1439        }
1440        ";
1441        let parquet_group_type = parse_message_type(message_type).unwrap();
1442
1443        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1444        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
1445        let converted_fields = converted_arrow_schema.fields();
1446
1447        assert_eq!(arrow_fields.len(), converted_fields.len());
1448        for i in 0..arrow_fields.len() {
1449            assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
1450        }
1451    }
1452
1453    #[test]
1454    fn test_nested_schema_partial() {
1455        let mut arrow_fields = Vec::new();
1456        {
1457            let group1_fields = vec![Field::new("leaf1", DataType::Int64, false)].into();
1458            let group1 = Field::new("group1", DataType::Struct(group1_fields), false);
1459            arrow_fields.push(group1);
1460
1461            let group2_fields = vec![Field::new("leaf4", DataType::Int64, false)].into();
1462            let group2 = Field::new("group2", DataType::Struct(group2_fields), false);
1463            arrow_fields.push(group2);
1464
1465            arrow_fields.push(Field::new("leaf5", DataType::Int64, false));
1466        }
1467
1468        let message_type = "
1469        message test_schema {
1470          REQUIRED GROUP group1 {
1471            REQUIRED INT64 leaf1;
1472            REQUIRED INT64 leaf2;
1473          }
1474          REQUIRED  GROUP group2 {
1475            REQUIRED INT64 leaf3;
1476            REQUIRED INT64 leaf4;
1477          }
1478          REQUIRED INT64 leaf5;
1479        }
1480        ";
1481        let parquet_group_type = parse_message_type(message_type).unwrap();
1482
1483        // Expected partial arrow schema (columns 0, 3, 4):
1484        // required group group1 {
1485        //   required int64 leaf1;
1486        // }
1487        // required group group2 {
1488        //   required int64 leaf4;
1489        // }
1490        // required int64 leaf5;
1491
1492        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1493        let mask = ProjectionMask::leaves(&parquet_schema, [3, 0, 4, 4]);
1494        let converted_arrow_schema =
1495            parquet_to_arrow_schema_by_columns(&parquet_schema, mask, None).unwrap();
1496        let converted_fields = converted_arrow_schema.fields();
1497
1498        assert_eq!(arrow_fields.len(), converted_fields.len());
1499        for i in 0..arrow_fields.len() {
1500            assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
1501        }
1502    }
1503
1504    #[test]
1505    fn test_nested_schema_partial_ordering() {
1506        let mut arrow_fields = Vec::new();
1507        {
1508            let group1_fields = vec![Field::new("leaf1", DataType::Int64, false)].into();
1509            let group1 = Field::new("group1", DataType::Struct(group1_fields), false);
1510            arrow_fields.push(group1);
1511
1512            let group2_fields = vec![Field::new("leaf4", DataType::Int64, false)].into();
1513            let group2 = Field::new("group2", DataType::Struct(group2_fields), false);
1514            arrow_fields.push(group2);
1515
1516            arrow_fields.push(Field::new("leaf5", DataType::Int64, false));
1517        }
1518
1519        let message_type = "
1520        message test_schema {
1521          REQUIRED GROUP group1 {
1522            REQUIRED INT64 leaf1;
1523            REQUIRED INT64 leaf2;
1524          }
1525          REQUIRED  GROUP group2 {
1526            REQUIRED INT64 leaf3;
1527            REQUIRED INT64 leaf4;
1528          }
1529          REQUIRED INT64 leaf5;
1530        }
1531        ";
1532        let parquet_group_type = parse_message_type(message_type).unwrap();
1533
1534        // Expected partial arrow schema (columns 3, 4, 0):
1535        // required group group1 {
1536        //   required int64 leaf1;
1537        // }
1538        // required group group2 {
1539        //   required int64 leaf4;
1540        // }
1541        // required int64 leaf5;
1542
1543        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1544        let mask = ProjectionMask::leaves(&parquet_schema, [3, 0, 4]);
1545        let converted_arrow_schema =
1546            parquet_to_arrow_schema_by_columns(&parquet_schema, mask, None).unwrap();
1547        let converted_fields = converted_arrow_schema.fields();
1548
1549        assert_eq!(arrow_fields.len(), converted_fields.len());
1550        for i in 0..arrow_fields.len() {
1551            assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
1552        }
1553
1554        let mask =
1555            ProjectionMask::columns(&parquet_schema, ["group2.leaf4", "group1.leaf1", "leaf5"]);
1556        let converted_arrow_schema =
1557            parquet_to_arrow_schema_by_columns(&parquet_schema, mask, None).unwrap();
1558        let converted_fields = converted_arrow_schema.fields();
1559
1560        assert_eq!(arrow_fields.len(), converted_fields.len());
1561        for i in 0..arrow_fields.len() {
1562            assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
1563        }
1564    }
1565
1566    #[test]
1567    fn test_repeated_nested_schema() {
1568        let mut arrow_fields = Vec::new();
1569        {
1570            arrow_fields.push(Field::new("leaf1", DataType::Int32, true));
1571
1572            let inner_group_list = Field::new_list(
1573                "innerGroup",
1574                Field::new_struct(
1575                    "innerGroup",
1576                    vec![Field::new("leaf3", DataType::Int32, true)],
1577                    false,
1578                ),
1579                false,
1580            );
1581
1582            let outer_group_list = Field::new_list(
1583                "outerGroup",
1584                Field::new_struct(
1585                    "outerGroup",
1586                    vec![Field::new("leaf2", DataType::Int32, true), inner_group_list],
1587                    false,
1588                ),
1589                false,
1590            );
1591            arrow_fields.push(outer_group_list);
1592        }
1593
1594        let message_type = "
1595        message test_schema {
1596          OPTIONAL INT32 leaf1;
1597          REPEATED GROUP outerGroup {
1598            OPTIONAL INT32 leaf2;
1599            REPEATED GROUP innerGroup {
1600              OPTIONAL INT32 leaf3;
1601            }
1602          }
1603        }
1604        ";
1605        let parquet_group_type = parse_message_type(message_type).unwrap();
1606
1607        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1608        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
1609        let converted_fields = converted_arrow_schema.fields();
1610
1611        assert_eq!(arrow_fields.len(), converted_fields.len());
1612        for i in 0..arrow_fields.len() {
1613            assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
1614        }
1615    }
1616
    #[test]
    fn test_column_desc_to_field() {
        // Converts every leaf column descriptor of a Parquet schema to an Arrow
        // `Field` via `parquet_to_arrow_field` and compares against hand-written
        // expectations. Covers both legacy converted types (e.g. INT_8, UTF8,
        // TIME_MILLIS) and modern logical types (e.g. INTEGER(8,false),
        // TIME(NANOS,false)), plus bare REPEATED leaves, which map to Arrow lists.
        let message_type = "
        message test_schema {
            REQUIRED BOOLEAN boolean;
            REQUIRED INT32   int8  (INT_8);
            REQUIRED INT32   uint8 (INTEGER(8,false));
            REQUIRED INT32   int16 (INT_16);
            REQUIRED INT32   uint16 (INTEGER(16,false));
            REQUIRED INT32   int32;
            REQUIRED INT64   int64;
            OPTIONAL DOUBLE  double;
            OPTIONAL FLOAT   float;
            OPTIONAL FIXED_LEN_BYTE_ARRAY (2) float16 (FLOAT16);
            OPTIONAL BINARY  string (UTF8);
            REPEATED BOOLEAN bools;
            OPTIONAL INT32   date       (DATE);
            OPTIONAL INT32   time_milli (TIME_MILLIS);
            OPTIONAL INT64   time_micro (TIME_MICROS);
            OPTIONAL INT64   time_nano (TIME(NANOS,false));
            OPTIONAL INT64   ts_milli (TIMESTAMP_MILLIS);
            REQUIRED INT64   ts_micro (TIMESTAMP_MICROS);
            REQUIRED INT64   ts_nano (TIMESTAMP(NANOS,true));
            REPEATED INT32   int_list;
            REPEATED BINARY  byte_list;
            REPEATED BINARY  string_list (UTF8);
            REQUIRED INT32 decimal_int32 (DECIMAL(8,2));
            REQUIRED INT64 decimal_int64 (DECIMAL(16,2));
            REQUIRED FIXED_LEN_BYTE_ARRAY (13) decimal_fix_length (DECIMAL(30,2));
        }
        ";
        let parquet_group_type = parse_message_type(message_type).unwrap();

        // Convert each leaf column independently, not the schema as a whole.
        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
        let converted_arrow_fields = parquet_schema
            .columns()
            .iter()
            .map(|c| parquet_to_arrow_field(c).unwrap())
            .collect::<Vec<Field>>();

        let arrow_fields = vec![
            Field::new("boolean", DataType::Boolean, false),
            Field::new("int8", DataType::Int8, false),
            Field::new("uint8", DataType::UInt8, false),
            Field::new("int16", DataType::Int16, false),
            Field::new("uint16", DataType::UInt16, false),
            Field::new("int32", DataType::Int32, false),
            Field::new("int64", DataType::Int64, false),
            Field::new("double", DataType::Float64, true),
            Field::new("float", DataType::Float32, true),
            Field::new("float16", DataType::Float16, true),
            Field::new("string", DataType::Utf8, true),
            // REPEATED leaf -> non-nullable list of non-nullable items
            Field::new_list(
                "bools",
                Field::new("bools", DataType::Boolean, false),
                false,
            ),
            Field::new("date", DataType::Date32, true),
            Field::new("time_milli", DataType::Time32(TimeUnit::Millisecond), true),
            Field::new("time_micro", DataType::Time64(TimeUnit::Microsecond), true),
            Field::new("time_nano", DataType::Time64(TimeUnit::Nanosecond), true),
            // Legacy TIMESTAMP_* converted types map to UTC-adjusted timestamps.
            Field::new(
                "ts_milli",
                DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
                true,
            ),
            Field::new(
                "ts_micro",
                DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
                false,
            ),
            Field::new(
                "ts_nano",
                DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())),
                false,
            ),
            Field::new_list(
                "int_list",
                Field::new("int_list", DataType::Int32, false),
                false,
            ),
            Field::new_list(
                "byte_list",
                Field::new("byte_list", DataType::Binary, false),
                false,
            ),
            Field::new_list(
                "string_list",
                Field::new("string_list", DataType::Utf8, false),
                false,
            ),
            // All decimal physical encodings with precision <= 38 map to Decimal128.
            Field::new("decimal_int32", DataType::Decimal128(8, 2), false),
            Field::new("decimal_int64", DataType::Decimal128(16, 2), false),
            Field::new("decimal_fix_length", DataType::Decimal128(30, 2), false),
        ];

        assert_eq!(arrow_fields, converted_arrow_fields);
    }
1715
1716    #[test]
1717    fn test_coerced_map_list() {
1718        // Create Arrow schema with non-Parquet naming
1719        let arrow_fields = vec![
1720            Field::new_list(
1721                "my_list",
1722                Field::new("item", DataType::Boolean, true),
1723                false,
1724            ),
1725            Field::new_map(
1726                "my_map",
1727                "entries",
1728                Field::new("keys", DataType::Utf8, false),
1729                Field::new("values", DataType::Int32, true),
1730                false,
1731                true,
1732            ),
1733        ];
1734        let arrow_schema = Schema::new(arrow_fields);
1735
1736        // Create Parquet schema with coerced names
1737        let message_type = "
1738        message parquet_schema {
1739            REQUIRED GROUP my_list (LIST) {
1740                REPEATED GROUP list {
1741                    OPTIONAL BOOLEAN element;
1742                }
1743            }
1744            OPTIONAL GROUP my_map (MAP) {
1745                REPEATED GROUP key_value {
1746                    REQUIRED BINARY key (STRING);
1747                    OPTIONAL INT32 value;
1748                }
1749            }
1750        }
1751        ";
1752        let parquet_group_type = parse_message_type(message_type).unwrap();
1753        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1754        let converted_arrow_schema = ArrowSchemaConverter::new()
1755            .with_coerce_types(true)
1756            .convert(&arrow_schema)
1757            .unwrap();
1758        assert_eq!(
1759            parquet_schema.columns().len(),
1760            converted_arrow_schema.columns().len()
1761        );
1762
1763        // Create Parquet schema without coerced names
1764        let message_type = "
1765        message parquet_schema {
1766            REQUIRED GROUP my_list (LIST) {
1767                REPEATED GROUP list {
1768                    OPTIONAL BOOLEAN item;
1769                }
1770            }
1771            OPTIONAL GROUP my_map (MAP) {
1772                REPEATED GROUP entries {
1773                    REQUIRED BINARY keys (STRING);
1774                    OPTIONAL INT32 values;
1775                }
1776            }
1777        }
1778        ";
1779        let parquet_group_type = parse_message_type(message_type).unwrap();
1780        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1781        let converted_arrow_schema = ArrowSchemaConverter::new()
1782            .with_coerce_types(false)
1783            .convert(&arrow_schema)
1784            .unwrap();
1785        assert_eq!(
1786            parquet_schema.columns().len(),
1787            converted_arrow_schema.columns().len()
1788        );
1789    }
1790
    #[test]
    fn test_field_to_column_desc() {
        // Converts an Arrow schema to Parquet column descriptors with
        // `ArrowSchemaConverter` and compares each against a hand-written
        // Parquet schema. Columns without a Parquet logical type are only
        // compared by name/physical/converted type (see loop below).
        let message_type = "
        message arrow_schema {
            REQUIRED BOOLEAN boolean;
            REQUIRED INT32   int8  (INT_8);
            REQUIRED INT32   int16 (INTEGER(16,true));
            REQUIRED INT32   int32;
            REQUIRED INT64   int64;
            OPTIONAL DOUBLE  double;
            OPTIONAL FLOAT   float;
            OPTIONAL FIXED_LEN_BYTE_ARRAY (2) float16 (FLOAT16);
            OPTIONAL BINARY  string (STRING);
            OPTIONAL GROUP   bools (LIST) {
                REPEATED GROUP list {
                    OPTIONAL BOOLEAN element;
                }
            }
            REQUIRED GROUP   bools_non_null (LIST) {
                REPEATED GROUP list {
                    REQUIRED BOOLEAN element;
                }
            }
            OPTIONAL INT32   date       (DATE);
            OPTIONAL INT32   time_milli (TIME(MILLIS,false));
            OPTIONAL INT32   time_milli_utc (TIME(MILLIS,true));
            OPTIONAL INT64   time_micro (TIME_MICROS);
            OPTIONAL INT64   time_micro_utc (TIME(MICROS, true));
            OPTIONAL INT64   ts_milli (TIMESTAMP_MILLIS);
            REQUIRED INT64   ts_micro (TIMESTAMP(MICROS,false));
            REQUIRED INT64   ts_seconds;
            REQUIRED INT64   ts_micro_utc (TIMESTAMP(MICROS, true));
            REQUIRED INT64   ts_millis_zero_offset (TIMESTAMP(MILLIS, true));
            REQUIRED INT64   ts_millis_zero_negative_offset (TIMESTAMP(MILLIS, true));
            REQUIRED INT64   ts_micro_non_utc (TIMESTAMP(MICROS, true));
            REQUIRED GROUP struct {
                REQUIRED BOOLEAN bools;
                REQUIRED INT32 uint32 (INTEGER(32,false));
                REQUIRED GROUP   int32 (LIST) {
                    REPEATED GROUP list {
                        OPTIONAL INT32 element;
                    }
                }
            }
            REQUIRED BINARY  dictionary_strings (STRING);
            REQUIRED INT32 decimal_int32 (DECIMAL(8,2));
            REQUIRED INT64 decimal_int64 (DECIMAL(16,2));
            REQUIRED FIXED_LEN_BYTE_ARRAY (13) decimal_fix_length (DECIMAL(30,2));
            REQUIRED FIXED_LEN_BYTE_ARRAY (16) decimal128 (DECIMAL(38,2));
            REQUIRED FIXED_LEN_BYTE_ARRAY (17) decimal256 (DECIMAL(39,2));
        }
        ";
        let parquet_group_type = parse_message_type(message_type).unwrap();

        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));

        // Arrow input fields, in the same column order as the Parquet schema above.
        let arrow_fields = vec![
            Field::new("boolean", DataType::Boolean, false),
            Field::new("int8", DataType::Int8, false),
            Field::new("int16", DataType::Int16, false),
            Field::new("int32", DataType::Int32, false),
            Field::new("int64", DataType::Int64, false),
            Field::new("double", DataType::Float64, true),
            Field::new("float", DataType::Float32, true),
            Field::new("float16", DataType::Float16, true),
            Field::new("string", DataType::Utf8, true),
            Field::new_list(
                "bools",
                Field::new("element", DataType::Boolean, true),
                true,
            ),
            Field::new_list(
                "bools_non_null",
                Field::new("element", DataType::Boolean, false),
                false,
            ),
            Field::new("date", DataType::Date32, true),
            Field::new("time_milli", DataType::Time32(TimeUnit::Millisecond), true),
            // "adjusted_to_utc" field metadata drives the TIME(..., true) annotation.
            Field::new(
                "time_milli_utc",
                DataType::Time32(TimeUnit::Millisecond),
                true,
            )
            .with_metadata(HashMap::from_iter(vec![(
                "adjusted_to_utc".to_string(),
                "".to_string(),
            )])),
            Field::new("time_micro", DataType::Time64(TimeUnit::Microsecond), true),
            Field::new(
                "time_micro_utc",
                DataType::Time64(TimeUnit::Microsecond),
                true,
            )
            .with_metadata(HashMap::from_iter(vec![(
                "adjusted_to_utc".to_string(),
                "".to_string(),
            )])),
            Field::new(
                "ts_milli",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                true,
            ),
            Field::new(
                "ts_micro",
                DataType::Timestamp(TimeUnit::Microsecond, None),
                false,
            ),
            Field::new(
                "ts_seconds",
                DataType::Timestamp(TimeUnit::Second, Some("UTC".into())),
                false,
            ),
            Field::new(
                "ts_micro_utc",
                DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
                false,
            ),
            // Fixed zero offsets (+00:00 / -00:00) also count as UTC-adjusted.
            Field::new(
                "ts_millis_zero_offset",
                DataType::Timestamp(TimeUnit::Millisecond, Some("+00:00".into())),
                false,
            ),
            Field::new(
                "ts_millis_zero_negative_offset",
                DataType::Timestamp(TimeUnit::Millisecond, Some("-00:00".into())),
                false,
            ),
            Field::new(
                "ts_micro_non_utc",
                DataType::Timestamp(TimeUnit::Microsecond, Some("+01:00".into())),
                false,
            ),
            Field::new_struct(
                "struct",
                vec![
                    Field::new("bools", DataType::Boolean, false),
                    Field::new("uint32", DataType::UInt32, false),
                    Field::new_list("int32", Field::new("element", DataType::Int32, true), false),
                ],
                false,
            ),
            // Dictionary encoding is an Arrow-side concern; Parquet sees the value type.
            Field::new_dictionary("dictionary_strings", DataType::Int32, DataType::Utf8, false),
            Field::new("decimal_int32", DataType::Decimal128(8, 2), false),
            Field::new("decimal_int64", DataType::Decimal128(16, 2), false),
            Field::new("decimal_fix_length", DataType::Decimal128(30, 2), false),
            Field::new("decimal128", DataType::Decimal128(38, 2), false),
            Field::new("decimal256", DataType::Decimal256(39, 2), false),
        ];
        let arrow_schema = Schema::new(arrow_fields);
        let converted_arrow_schema = ArrowSchemaConverter::new().convert(&arrow_schema).unwrap();

        assert_eq!(
            parquet_schema.columns().len(),
            converted_arrow_schema.columns().len()
        );
        parquet_schema
            .columns()
            .iter()
            .zip(converted_arrow_schema.columns())
            .for_each(|(a, b)| {
                // Only check logical type if it's set on the Parquet side.
                // This is because the Arrow conversion always sets logical type,
                // even if there wasn't originally one.
                // This is not an issue, but is an inconvenience for this test.
                match a.logical_type_ref() {
                    Some(_) => {
                        assert_eq!(a, b)
                    }
                    None => {
                        assert_eq!(a.name(), b.name());
                        assert_eq!(a.physical_type(), b.physical_type());
                        assert_eq!(a.converted_type(), b.converted_type());
                    }
                };
            });
    }
1967
1968    #[test]
1969    #[should_panic(expected = "Parquet does not support writing empty structs")]
1970    fn test_empty_struct_field() {
1971        let arrow_fields = vec![Field::new(
1972            "struct",
1973            DataType::Struct(Fields::empty()),
1974            false,
1975        )];
1976        let arrow_schema = Schema::new(arrow_fields);
1977        let converted_arrow_schema = ArrowSchemaConverter::new()
1978            .with_coerce_types(true)
1979            .convert(&arrow_schema);
1980
1981        converted_arrow_schema.unwrap();
1982    }
1983
1984    #[test]
1985    fn test_metadata() {
1986        let message_type = "
1987        message test_schema {
1988            OPTIONAL BINARY  string (STRING);
1989        }
1990        ";
1991        let parquet_group_type = parse_message_type(message_type).unwrap();
1992
1993        let key_value_metadata = vec![
1994            KeyValue::new("foo".to_owned(), Some("bar".to_owned())),
1995            KeyValue::new("baz".to_owned(), None),
1996        ];
1997
1998        let mut expected_metadata: HashMap<String, String> = HashMap::new();
1999        expected_metadata.insert("foo".to_owned(), "bar".to_owned());
2000
2001        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
2002        let converted_arrow_schema =
2003            parquet_to_arrow_schema(&parquet_schema, Some(&key_value_metadata)).unwrap();
2004
2005        assert_eq!(converted_arrow_schema.metadata(), &expected_metadata);
2006    }
2007
    #[test]
    fn test_arrow_schema_roundtrip() -> Result<()> {
        // Round-trips a wide Arrow schema through an (empty) Parquet file and
        // checks (a) the schema read back is identical and (b) Parquet field ids
        // embedded via PARQUET_FIELD_ID_META_KEY survive into the Parquet schema.
        // Helper: build field metadata from (key, value) string pairs.
        let meta = |a: &[(&str, &str)]| -> HashMap<String, String> {
            a.iter()
                .map(|(a, b)| (a.to_string(), b.to_string()))
                .collect()
        };

        let schema = Schema::new_with_metadata(
            vec![
                Field::new("c1", DataType::Utf8, false)
                    .with_metadata(meta(&[("Key", "Foo"), (PARQUET_FIELD_ID_META_KEY, "2")])),
                Field::new("c2", DataType::Binary, false),
                Field::new("c3", DataType::FixedSizeBinary(3), false),
                Field::new("c4", DataType::Boolean, false),
                Field::new("c5", DataType::Date32, false),
                Field::new("c6", DataType::Date64, false),
                Field::new("c7", DataType::Time32(TimeUnit::Second), false),
                Field::new("c8", DataType::Time32(TimeUnit::Millisecond), false),
                Field::new("c13", DataType::Time64(TimeUnit::Microsecond), false),
                Field::new("c14", DataType::Time64(TimeUnit::Nanosecond), false),
                Field::new("c15", DataType::Timestamp(TimeUnit::Second, None), false),
                Field::new(
                    "c16",
                    DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
                    false,
                ),
                Field::new(
                    "c17",
                    DataType::Timestamp(TimeUnit::Microsecond, Some("Africa/Johannesburg".into())),
                    false,
                ),
                Field::new(
                    "c18",
                    DataType::Timestamp(TimeUnit::Nanosecond, None),
                    false,
                ),
                Field::new("c19", DataType::Interval(IntervalUnit::DayTime), false),
                Field::new("c20", DataType::Interval(IntervalUnit::YearMonth), false),
                // Field ids on both the list (4) and its item (5).
                Field::new_list(
                    "c21",
                    Field::new_list_field(DataType::Boolean, true)
                        .with_metadata(meta(&[("Key", "Bar"), (PARQUET_FIELD_ID_META_KEY, "5")])),
                    false,
                )
                .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "4")])),
                Field::new(
                    "c22",
                    DataType::FixedSizeList(
                        Arc::new(Field::new_list_field(DataType::Boolean, true)),
                        5,
                    ),
                    false,
                ),
                Field::new_list(
                    "c23",
                    Field::new_large_list(
                        "inner",
                        Field::new_list_field(
                            DataType::Struct(
                                vec![
                                    Field::new("a", DataType::Int16, true),
                                    Field::new("b", DataType::Float64, false),
                                    Field::new("c", DataType::Float32, false),
                                    Field::new("d", DataType::Float16, false),
                                ]
                                .into(),
                            ),
                            false,
                        ),
                        true,
                    ),
                    false,
                ),
                Field::new(
                    "c24",
                    DataType::Struct(Fields::from(vec![
                        Field::new("a", DataType::Utf8, false),
                        Field::new("b", DataType::UInt16, false),
                    ])),
                    false,
                ),
                Field::new("c25", DataType::Interval(IntervalUnit::YearMonth), true),
                Field::new("c26", DataType::Interval(IntervalUnit::DayTime), true),
                // Duration types not supported
                // Field::new("c27", DataType::Duration(TimeUnit::Second), false),
                // Field::new("c28", DataType::Duration(TimeUnit::Millisecond), false),
                // Field::new("c29", DataType::Duration(TimeUnit::Microsecond), false),
                // Field::new("c30", DataType::Duration(TimeUnit::Nanosecond), false),
                #[allow(deprecated)]
                Field::new_dict(
                    "c31",
                    DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
                    true,
                    123,
                    true,
                )
                .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "6")])),
                Field::new("c32", DataType::LargeBinary, true),
                Field::new("c33", DataType::LargeUtf8, true),
                Field::new_large_list(
                    "c34",
                    Field::new_list(
                        "inner",
                        Field::new_list_field(
                            DataType::Struct(
                                vec![
                                    Field::new("a", DataType::Int16, true),
                                    Field::new("b", DataType::Float64, true),
                                ]
                                .into(),
                            ),
                            true,
                        ),
                        true,
                    ),
                    true,
                ),
                Field::new("c35", DataType::Null, true),
                Field::new("c36", DataType::Decimal128(2, 1), false),
                Field::new("c37", DataType::Decimal256(50, 20), false),
                Field::new("c38", DataType::Decimal128(18, 12), true),
                Field::new_map(
                    "c39",
                    "key_value",
                    Field::new("key", DataType::Utf8, false),
                    Field::new_list("value", Field::new("element", DataType::Utf8, true), true),
                    false, // fails to roundtrip keys_sorted
                    true,
                ),
                // Field ids spread across the map key, value, and nested item.
                Field::new_map(
                    "c40",
                    "my_entries",
                    Field::new("my_key", DataType::Utf8, false)
                        .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "8")])),
                    Field::new_list(
                        "my_value",
                        Field::new_list_field(DataType::Utf8, true)
                            .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "10")])),
                        true,
                    )
                    .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "9")])),
                    false, // fails to roundtrip keys_sorted
                    true,
                )
                .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "7")])),
                Field::new_map(
                    "c41",
                    "my_entries",
                    Field::new("my_key", DataType::Utf8, false),
                    Field::new_list(
                        "my_value",
                        Field::new_list_field(DataType::Utf8, true)
                            .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "11")])),
                        true,
                    ),
                    false, // fails to roundtrip keys_sorted
                    false,
                ),
                Field::new("c42", DataType::Decimal32(5, 2), false),
                Field::new("c43", DataType::Decimal64(18, 12), true),
            ],
            meta(&[("Key", "Value")]),
        );

        // write to an empty parquet file so that schema is serialized
        let file = tempfile::tempfile().unwrap();
        let writer =
            ArrowWriter::try_new(file.try_clone().unwrap(), Arc::new(schema.clone()), None)?;
        writer.close()?;

        // read file back
        let arrow_reader = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();

        // Check arrow schema
        let read_schema = arrow_reader.schema();
        assert_eq!(&schema, read_schema.as_ref());

        // Walk schema finding field IDs
        // Depth-first traversal of the Parquet schema tree collecting
        // "dotted.path -> id" strings for every node that carries a field id.
        let mut stack = Vec::with_capacity(10);
        let mut out = Vec::with_capacity(10);

        let root = arrow_reader.parquet_schema().root_schema_ptr();
        stack.push((root.name().to_string(), root));

        while let Some((p, t)) = stack.pop() {
            if t.is_group() {
                for f in t.get_fields() {
                    stack.push((format!("{p}.{}", f.name()), f.clone()))
                }
            }

            let info = t.get_basic_info();
            if info.has_id() {
                out.push(format!("{p} -> {}", info.id()))
            }
        }
        // Sort so the comparison is independent of traversal order.
        out.sort_unstable();
        let out: Vec<_> = out.iter().map(|x| x.as_str()).collect();

        assert_eq!(
            &out,
            &[
                "arrow_schema.c1 -> 2",
                "arrow_schema.c21 -> 4",
                "arrow_schema.c21.list.item -> 5",
                "arrow_schema.c31 -> 6",
                "arrow_schema.c40 -> 7",
                "arrow_schema.c40.my_entries.my_key -> 8",
                "arrow_schema.c40.my_entries.my_value -> 9",
                "arrow_schema.c40.my_entries.my_value.list.item -> 10",
                "arrow_schema.c41.my_entries.my_value.list.item -> 11",
            ]
        );

        Ok(())
    }
2225
2226    #[test]
2227    fn test_read_parquet_field_ids_raw() -> Result<()> {
2228        let meta = |a: &[(&str, &str)]| -> HashMap<String, String> {
2229            a.iter()
2230                .map(|(a, b)| (a.to_string(), b.to_string()))
2231                .collect()
2232        };
2233        let schema = Schema::new_with_metadata(
2234            vec![
2235                Field::new("c1", DataType::Utf8, true)
2236                    .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "1")])),
2237                Field::new("c2", DataType::Utf8, true)
2238                    .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "2")])),
2239            ],
2240            HashMap::new(),
2241        );
2242
2243        let writer = ArrowWriter::try_new(vec![], Arc::new(schema.clone()), None)?;
2244        let parquet_bytes = writer.into_inner()?;
2245
2246        let reader =
2247            crate::file::reader::SerializedFileReader::new(bytes::Bytes::from(parquet_bytes))?;
2248        let schema_descriptor = reader.metadata().file_metadata().schema_descr_ptr();
2249
2250        // don't pass metadata so field ids are read from Parquet and not from serialized Arrow schema
2251        let arrow_schema = crate::arrow::parquet_to_arrow_schema(&schema_descriptor, None)?;
2252
2253        let parq_schema_descr = ArrowSchemaConverter::new()
2254            .with_coerce_types(true)
2255            .convert(&arrow_schema)?;
2256        let parq_fields = parq_schema_descr.root_schema().get_fields();
2257        assert_eq!(parq_fields.len(), 2);
2258        assert_eq!(parq_fields[0].get_basic_info().id(), 1);
2259        assert_eq!(parq_fields[1].get_basic_info().id(), 2);
2260
2261        Ok(())
2262    }
2263
2264    #[test]
2265    fn test_arrow_schema_roundtrip_lists() -> Result<()> {
2266        let metadata: HashMap<String, String> = [("Key".to_string(), "Value".to_string())]
2267            .iter()
2268            .cloned()
2269            .collect();
2270
2271        let schema = Schema::new_with_metadata(
2272            vec![
2273                Field::new_list("c21", Field::new("array", DataType::Boolean, true), false),
2274                Field::new(
2275                    "c22",
2276                    DataType::FixedSizeList(
2277                        Arc::new(Field::new("items", DataType::Boolean, false)),
2278                        5,
2279                    ),
2280                    false,
2281                ),
2282                Field::new_list(
2283                    "c23",
2284                    Field::new_large_list(
2285                        "items",
2286                        Field::new_struct(
2287                            "items",
2288                            vec![
2289                                Field::new("a", DataType::Int16, true),
2290                                Field::new("b", DataType::Float64, false),
2291                            ],
2292                            true,
2293                        ),
2294                        true,
2295                    ),
2296                    true,
2297                ),
2298            ],
2299            metadata,
2300        );
2301
2302        // write to an empty parquet file so that schema is serialized
2303        let file = tempfile::tempfile().unwrap();
2304        let writer =
2305            ArrowWriter::try_new(file.try_clone().unwrap(), Arc::new(schema.clone()), None)?;
2306        writer.close()?;
2307
2308        // read file back
2309        let arrow_reader = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
2310        let read_schema = arrow_reader.schema();
2311        assert_eq!(&schema, read_schema.as_ref());
2312        Ok(())
2313    }
2314
2315    #[test]
2316    fn test_get_arrow_schema_from_metadata() {
2317        assert!(get_arrow_schema_from_metadata("").is_err());
2318    }
2319
2320    #[test]
2321    #[cfg(feature = "arrow_canonical_extension_types")]
2322    fn arrow_uuid_to_parquet_uuid() -> Result<()> {
2323        use arrow_schema::extension::Uuid;
2324        let arrow_schema = Schema::new(vec![
2325            Field::new("uuid", DataType::FixedSizeBinary(16), false).with_extension_type(Uuid),
2326        ]);
2327
2328        let parquet_schema = ArrowSchemaConverter::new().convert(&arrow_schema)?;
2329
2330        assert_eq!(
2331            parquet_schema.column(0).logical_type_ref(),
2332            Some(&LogicalType::Uuid)
2333        );
2334
2335        let arrow_schema = parquet_to_arrow_schema(&parquet_schema, None)?;
2336        assert_eq!(arrow_schema.field(0).try_extension_type::<Uuid>()?, Uuid);
2337
2338        Ok(())
2339    }
2340
2341    #[test]
2342    #[cfg(feature = "arrow_canonical_extension_types")]
2343    fn arrow_json_to_parquet_json() -> Result<()> {
2344        use arrow_schema::extension::Json;
2345        let arrow_schema = Schema::new(vec![
2346            Field::new("json", DataType::Utf8, false).with_extension_type(Json::default()),
2347        ]);
2348
2349        let parquet_schema = ArrowSchemaConverter::new().convert(&arrow_schema)?;
2350
2351        assert_eq!(
2352            parquet_schema.column(0).logical_type_ref(),
2353            Some(&LogicalType::Json)
2354        );
2355
2356        let arrow_schema = parquet_to_arrow_schema(&parquet_schema, None)?;
2357        assert_eq!(
2358            arrow_schema.field(0).try_extension_type::<Json>()?,
2359            Json::default()
2360        );
2361
2362        Ok(())
2363    }
2364
2365    #[test]
2366    fn test_parquet_to_arrow_field_levels_with_virtual_rejects_non_virtual() {
2367        let message_type = "
2368        message test_schema {
2369            REQUIRED INT32 id;
2370        }
2371        ";
2372        let parquet_schema = Arc::new(parse_message_type(message_type).unwrap());
2373        let descriptor = SchemaDescriptor::new(parquet_schema);
2374
2375        // Try to pass a regular field (not a virtual column)
2376        let regular_field = Arc::new(Field::new("regular_column", DataType::Int64, false));
2377        let result = parquet_to_arrow_field_levels_with_virtual(
2378            &descriptor,
2379            ProjectionMask::all(),
2380            None,
2381            &[regular_field],
2382        );
2383
2384        assert!(result.is_err());
2385        assert!(
2386            result
2387                .unwrap_err()
2388                .to_string()
2389                .contains("is not a virtual column")
2390        );
2391    }
2392}