parquet/arrow/schema/mod.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Converting Parquet schema <--> Arrow schema: [`ArrowSchemaConverter`] and [`parquet_to_arrow_schema`]

use base64::Engine;
use base64::prelude::BASE64_STANDARD;
use std::collections::HashMap;
use std::sync::Arc;

use arrow_ipc::writer;
use arrow_schema::{DataType, Field, FieldRef, Fields, Schema, TimeUnit};

use crate::basic::{
    ConvertedType, LogicalType, Repetition, TimeUnit as ParquetTimeUnit, Type as PhysicalType,
};
use crate::errors::{ParquetError, Result};
use crate::file::{metadata::KeyValue, properties::WriterProperties};
use crate::schema::types::{ColumnDescriptor, SchemaDescriptor, Type};

mod complex;
mod extension;
mod primitive;
pub mod virtual_type;

use super::PARQUET_FIELD_ID_META_KEY;
use crate::arrow::ProjectionMask;
use crate::arrow::schema::extension::{
    has_extension_type, logical_type_for_binary, logical_type_for_binary_view,
    logical_type_for_fixed_size_binary, logical_type_for_string, logical_type_for_struct,
    try_add_extension_type,
};
pub(crate) use complex::{ParquetField, ParquetFieldType, VirtualColumnType};

/// Convert Parquet schema to Arrow schema including optional metadata
///
/// Attempts to decode any existing Arrow schema metadata, falling back
/// to converting the Parquet schema column-wise
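///
/// # Example
///
/// A minimal sketch, mirroring the tests below: parse a hand-written Parquet
/// message type and convert it (the `message test` schema here is
/// illustrative only).
///
/// ```
/// # use std::sync::Arc;
/// # use parquet::arrow::parquet_to_arrow_schema;
/// # use parquet::schema::{parser::parse_message_type, types::SchemaDescriptor};
/// let message_type = "
/// message test {
///     REQUIRED INT32 id;
///     OPTIONAL BINARY name (UTF8);
/// }
/// ";
/// let parquet_group_type = parse_message_type(message_type).unwrap();
/// let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
/// // no key-value metadata, so the schema is derived column-wise
/// let arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
/// assert_eq!(arrow_schema.fields().len(), 2);
/// ```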
pub fn parquet_to_arrow_schema(
    parquet_schema: &SchemaDescriptor,
    key_value_metadata: Option<&Vec<KeyValue>>,
) -> Result<Schema> {
    parquet_to_arrow_schema_by_columns(parquet_schema, ProjectionMask::all(), key_value_metadata)
}

/// Convert parquet schema to arrow schema including optional metadata,
/// only preserving some leaf columns.
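///
/// # Example
///
/// A minimal sketch, assuming a two-column Parquet schema and selecting only
/// the first leaf column via [`ProjectionMask::leaves`]:
///
/// ```
/// # use std::sync::Arc;
/// # use parquet::arrow::{ProjectionMask, parquet_to_arrow_schema_by_columns};
/// # use parquet::schema::{parser::parse_message_type, types::SchemaDescriptor};
/// let parquet_group_type =
///     parse_message_type("message test { REQUIRED INT32 a; REQUIRED INT64 b; }").unwrap();
/// let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
/// // keep only leaf column 0 ("a")
/// let mask = ProjectionMask::leaves(&parquet_schema, [0]);
/// let arrow_schema = parquet_to_arrow_schema_by_columns(&parquet_schema, mask, None).unwrap();
/// assert_eq!(arrow_schema.fields().len(), 1);
/// assert_eq!(arrow_schema.field(0).name(), "a");
/// ```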
pub fn parquet_to_arrow_schema_by_columns(
    parquet_schema: &SchemaDescriptor,
    mask: ProjectionMask,
    key_value_metadata: Option<&Vec<KeyValue>>,
) -> Result<Schema> {
    Ok(parquet_to_arrow_schema_and_fields(parquet_schema, mask, key_value_metadata, &[])?.0)
}

/// Determines the Arrow Schema from a Parquet schema
///
/// Looks for an Arrow schema metadata "hint" (see
/// [`parquet_to_arrow_field_levels`]), and uses it if present to ensure
/// lossless round trips.
pub(crate) fn parquet_to_arrow_schema_and_fields(
    parquet_schema: &SchemaDescriptor,
    mask: ProjectionMask,
    key_value_metadata: Option<&Vec<KeyValue>>,
    virtual_columns: &[FieldRef],
) -> Result<(Schema, Option<ParquetField>)> {
    let mut metadata = parse_key_value_metadata(key_value_metadata).unwrap_or_default();
    let maybe_schema = metadata
        .remove(super::ARROW_SCHEMA_META_KEY)
        .map(|value| get_arrow_schema_from_metadata(&value))
        .transpose()?;

    // Add the Arrow metadata to the Parquet metadata, skipping keys that collide
    if let Some(arrow_schema) = &maybe_schema {
        arrow_schema.metadata().iter().for_each(|(k, v)| {
            metadata.entry(k.clone()).or_insert_with(|| v.clone());
        });
    }

    let hint = maybe_schema.as_ref().map(|s| s.fields());
    let field_levels =
        parquet_to_arrow_field_levels_with_virtual(parquet_schema, mask, hint, virtual_columns)?;
    let schema = Schema::new_with_metadata(field_levels.fields, metadata);
    Ok((schema, field_levels.levels))
}

/// Schema information necessary to decode a parquet file as arrow [`Fields`]
///
/// In particular this stores the dremel-level information necessary to correctly
/// interpret the encoded definition and repetition levels
///
/// Note: this is an opaque container intended to be used with lower-level APIs
/// within this crate
#[derive(Debug, Clone)]
pub struct FieldLevels {
    pub(crate) fields: Fields,
    pub(crate) levels: Option<ParquetField>,
}

/// Convert a parquet [`SchemaDescriptor`] to [`FieldLevels`]
///
/// Columns not included within [`ProjectionMask`] will be ignored.
///
/// The optional `hint` parameter is the desired Arrow schema. See the
/// [`arrow`] module documentation for more information.
///
/// [`arrow`]: crate::arrow
///
/// # Notes:
/// Where a field type in `hint` is compatible with the corresponding parquet type in `schema`, it
/// will be used, otherwise the default arrow type for the given parquet column type will be used.
///
/// This is to accommodate arrow types that cannot be round-tripped through parquet natively.
/// Depending on the parquet writer, this can lead to a mismatch between a file's parquet schema
/// and its embedded arrow schema. The parquet `schema` must be treated as authoritative in such
/// an event. See [#1663](https://github.com/apache/arrow-rs/issues/1663) for more information
///
/// Note: this is a low-level API, most users will want to make use of the higher-level
/// [`parquet_to_arrow_schema`] for decoding metadata from a parquet file.
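///
/// # Example
///
/// A minimal sketch: with no `hint`, the Arrow types are derived from the
/// Parquet types alone (the message type string is illustrative).
///
/// ```
/// # use std::sync::Arc;
/// # use parquet::arrow::{ProjectionMask, parquet_to_arrow_field_levels};
/// # use parquet::schema::{parser::parse_message_type, types::SchemaDescriptor};
/// let parquet_group_type =
///     parse_message_type("message test { REQUIRED INT32 a; REQUIRED INT64 b; }").unwrap();
/// let schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
/// let _field_levels =
///     parquet_to_arrow_field_levels(&schema, ProjectionMask::all(), None).unwrap();
/// ```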
pub fn parquet_to_arrow_field_levels(
    schema: &SchemaDescriptor,
    mask: ProjectionMask,
    hint: Option<&Fields>,
) -> Result<FieldLevels> {
    parquet_to_arrow_field_levels_with_virtual(schema, mask, hint, &[])
}

/// Convert a parquet [`SchemaDescriptor`] to [`FieldLevels`] with support for virtual columns
///
/// Columns not included within [`ProjectionMask`] will be ignored.
///
/// The optional `hint` parameter is the desired Arrow schema. See the
/// [`arrow`] module documentation for more information.
///
/// [`arrow`]: crate::arrow
///
/// # Arguments
/// * `schema` - The Parquet schema descriptor
/// * `mask` - Projection mask to select which columns to include
/// * `hint` - Optional hint for Arrow field types to use instead of defaults
/// * `virtual_columns` - Virtual columns to append to the schema (e.g., row numbers)
///
/// # Notes:
/// Where a field type in `hint` is compatible with the corresponding parquet type in `schema`, it
/// will be used, otherwise the default arrow type for the given parquet column type will be used.
///
/// Virtual columns are columns that don't exist in the Parquet file but are generated during reading.
/// They must have extension type names starting with "arrow.virtual.".
///
/// This is to accommodate arrow types that cannot be round-tripped through parquet natively.
/// Depending on the parquet writer, this can lead to a mismatch between a file's parquet schema
/// and its embedded arrow schema. The parquet `schema` must be treated as authoritative in such
/// an event. See [#1663](https://github.com/apache/arrow-rs/issues/1663) for more information
///
/// Note: this is a low-level API, most users will want to make use of the higher-level
/// [`parquet_to_arrow_schema`] for decoding metadata from a parquet file.
pub fn parquet_to_arrow_field_levels_with_virtual(
    schema: &SchemaDescriptor,
    mask: ProjectionMask,
    hint: Option<&Fields>,
    virtual_columns: &[FieldRef],
) -> Result<FieldLevels> {
    // Validate that all fields are virtual columns
    for field in virtual_columns {
        if !virtual_type::is_virtual_column(field) {
            return Err(ParquetError::General(format!(
                "Field '{}' is not a virtual column. Virtual columns must have extension type names starting with 'arrow.virtual.'",
                field.name()
            )));
        }
    }

    // Convert the regular schema first
    let mut parquet_field = match complex::convert_schema(schema, mask, hint)? {
        Some(field) => field,
        None if virtual_columns.is_empty() => {
            return Ok(FieldLevels {
                fields: Fields::empty(),
                levels: None,
            });
        }
        None => {
            // No regular fields, but we have virtual columns - create empty root struct
            ParquetField {
                rep_level: 0,
                def_level: 0,
                nullable: false,
                arrow_type: DataType::Struct(Fields::empty()),
                field_type: ParquetFieldType::Group {
                    children: Vec::new(),
                },
            }
        }
    };

    // Append virtual columns if any
    if !virtual_columns.is_empty() {
        match &mut parquet_field.field_type {
            ParquetFieldType::Group { children } => {
                // Get the mutable fields from the struct type
                let DataType::Struct(ref mut fields) = parquet_field.arrow_type else {
                    unreachable!("Root field must be a struct");
                };

                // Convert to mutable Vec to append
                let mut fields_vec: Vec<FieldRef> = fields.iter().cloned().collect();

                // Append each virtual column
                for virtual_column in virtual_columns {
                    // Virtual columns can only be added at the root level
                    assert_eq!(
                        parquet_field.rep_level, 0,
                        "Virtual columns can only be added at rep level 0"
                    );
                    assert_eq!(
                        parquet_field.def_level, 0,
                        "Virtual columns can only be added at def level 0"
                    );

                    fields_vec.push(virtual_column.clone());
                    let virtual_parquet_field = complex::convert_virtual_field(
                        virtual_column,
                        parquet_field.rep_level,
                        parquet_field.def_level,
                    )?;
                    children.push(virtual_parquet_field);
                }

                // Update the fields
                parquet_field.arrow_type = DataType::Struct(Fields::from(fields_vec));
            }
            _ => unreachable!("Root field must be a group"),
        }
    }

    match &parquet_field.arrow_type {
        DataType::Struct(fields) => Ok(FieldLevels {
            fields: fields.clone(),
            levels: Some(parquet_field),
        }),
        _ => unreachable!(),
    }
}

/// Try to convert Arrow schema metadata into a schema
fn get_arrow_schema_from_metadata(encoded_meta: &str) -> Result<Schema> {
    let decoded = BASE64_STANDARD.decode(encoded_meta);
    match decoded {
        Ok(bytes) => {
            let slice = if bytes.len() > 8 && bytes[0..4] == [255u8; 4] {
                &bytes[8..]
            } else {
                bytes.as_slice()
            };
            match arrow_ipc::root_as_message(slice) {
                Ok(message) => message
                    .header_as_schema()
                    .map(arrow_ipc::convert::fb_to_schema)
                    .ok_or_else(|| arrow_err!("the message is not Arrow Schema")),
                Err(err) => {
                    // The flatbuffers implementation returns an error on verification error.
                    Err(arrow_err!(
                        "Unable to get root as message stored in {}: {:?}",
                        super::ARROW_SCHEMA_META_KEY,
                        err
                    ))
                }
            }
        }
        Err(err) => {
            // The C++ implementation returns an error if the schema can't be parsed.
            Err(arrow_err!(
                "Unable to decode the encoded schema stored in {}, {:?}",
                super::ARROW_SCHEMA_META_KEY,
                err
            ))
        }
    }
}

/// Encodes the Arrow schema into the IPC format, and base64 encodes it
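///
/// # Example
///
/// A minimal sketch of the output layout; the assertions only demonstrate the
/// base64 + length-prefix framing described below, and visibility of this
/// function from the caller's crate is an assumption:
///
/// ```ignore
/// use arrow_schema::{DataType, Field, Schema};
/// use base64::Engine;
/// use base64::prelude::BASE64_STANDARD;
///
/// let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
/// let encoded = encode_arrow_schema(&schema);
/// // the payload is base64; the decoded bytes start with the 0xFFFFFFFF
/// // continuation marker followed by a little-endian message length
/// let bytes = BASE64_STANDARD.decode(&encoded).unwrap();
/// assert_eq!(&bytes[0..4], &[255, 255, 255, 255]);
/// ```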
pub fn encode_arrow_schema(schema: &Schema) -> String {
    let options = writer::IpcWriteOptions::default();
    let mut dictionary_tracker = writer::DictionaryTracker::new(true);
    let data_gen = writer::IpcDataGenerator::default();
    let mut serialized_schema =
        data_gen.schema_to_bytes_with_dictionary_tracker(schema, &mut dictionary_tracker, &options);

    // manually prepending the length to the schema as arrow uses the legacy IPC format
    // TODO: change after addressing ARROW-9777
    let schema_len = serialized_schema.ipc_message.len();
    let mut len_prefix_schema = Vec::with_capacity(schema_len + 8);
    len_prefix_schema.append(&mut vec![255u8, 255, 255, 255]);
    len_prefix_schema.append((schema_len as u32).to_le_bytes().to_vec().as_mut());
    len_prefix_schema.append(&mut serialized_schema.ipc_message);

    BASE64_STANDARD.encode(&len_prefix_schema)
}

/// Mutates writer metadata by storing the encoded Arrow schema hint in
/// [`ARROW_SCHEMA_META_KEY`].
///
/// Any existing Arrow schema metadata is replaced.
///
/// [`ARROW_SCHEMA_META_KEY`]: crate::arrow::ARROW_SCHEMA_META_KEY
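///
/// # Example
///
/// A minimal sketch (assuming this function is visible from the caller's
/// crate; it is a helper used when writing Arrow data to Parquet):
///
/// ```ignore
/// use arrow_schema::{DataType, Field, Schema};
/// use parquet::file::properties::WriterProperties;
///
/// let schema = Schema::new(vec![Field::new("a", DataType::Int64, false)]);
/// let mut props = WriterProperties::builder().build();
/// // stores the base64-encoded IPC schema under the "ARROW:schema" key
/// add_encoded_arrow_schema_to_metadata(&schema, &mut props);
/// ```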
pub fn add_encoded_arrow_schema_to_metadata(schema: &Schema, props: &mut WriterProperties) {
    let encoded = encode_arrow_schema(schema);

    let schema_kv = KeyValue {
        key: super::ARROW_SCHEMA_META_KEY.to_string(),
        value: Some(encoded),
    };

    let meta = props
        .key_value_metadata
        .get_or_insert_with(Default::default);

    // check if ARROW:schema exists, and overwrite it
    let schema_meta = meta
        .iter()
        .enumerate()
        .find(|(_, kv)| kv.key.as_str() == super::ARROW_SCHEMA_META_KEY);
    match schema_meta {
        Some((i, _)) => {
            meta.remove(i);
            meta.push(schema_kv);
        }
        None => {
            meta.push(schema_kv);
        }
    }
}

/// Converter for Arrow schema to Parquet schema
///
/// See the documentation on the [`arrow`] module for background
/// information on how Arrow schema is represented in Parquet.
///
/// [`arrow`]: crate::arrow
///
/// # Example:
/// ```
/// # use std::sync::Arc;
/// # use arrow_schema::{Field, Schema, DataType};
/// # use parquet::arrow::ArrowSchemaConverter;
/// use parquet::schema::types::{SchemaDescriptor, Type};
/// use parquet::basic; // note there are two `Type`s in the following example
/// // create an Arrow Schema
/// let arrow_schema = Schema::new(vec![
///   Field::new("a", DataType::Int64, true),
///   Field::new("b", DataType::Date32, true),
/// ]);
/// // convert the Arrow schema to a Parquet schema
/// let parquet_schema = ArrowSchemaConverter::new()
///   .convert(&arrow_schema)
///   .unwrap();
///
/// let expected_parquet_schema = SchemaDescriptor::new(
///   Arc::new(
///     Type::group_type_builder("arrow_schema")
///       .with_fields(vec![
///         Arc::new(
///          Type::primitive_type_builder("a", basic::Type::INT64)
///           .build().unwrap()
///         ),
///         Arc::new(
///          Type::primitive_type_builder("b", basic::Type::INT32)
///           .with_converted_type(basic::ConvertedType::DATE)
///           .with_logical_type(Some(basic::LogicalType::Date))
///           .build().unwrap()
///         ),
///      ])
///      .build().unwrap()
///   )
/// );
/// assert_eq!(parquet_schema, expected_parquet_schema);
/// ```
#[derive(Debug)]
pub struct ArrowSchemaConverter<'a> {
    /// Name of the root schema in Parquet
    schema_root: &'a str,
    /// Should we coerce Arrow types to compatible Parquet types?
    ///
    /// See docs on [`Self::with_coerce_types`]
    coerce_types: bool,
}

impl Default for ArrowSchemaConverter<'_> {
    fn default() -> Self {
        Self::new()
    }
}

impl<'a> ArrowSchemaConverter<'a> {
    /// Create a new converter
    pub fn new() -> Self {
        Self {
            schema_root: "arrow_schema",
            coerce_types: false,
        }
    }

    /// Should Arrow types be coerced into Parquet native types (default `false`).
    ///
    /// Setting this option to `true` will result in Parquet files that can be
    /// read by more readers, but may lose precision for Arrow types such as
    /// [`DataType::Date64`] which have no direct [corresponding Parquet type].
    ///
    /// By default, this converter does not coerce to native Parquet types. Enabling type
    /// coercion allows for meaningful representations that do not require
    /// downstream readers to consider the embedded Arrow schema, and can allow
    /// for greater compatibility with other Parquet implementations. However,
    /// type coercion also prevents data from being losslessly round-tripped.
    ///
    /// # Discussion
    ///
    /// Some Arrow types, such as `Date64`, `Timestamp` and `Interval`, have no
    /// exactly corresponding Parquet logical type. Thus, they cannot be losslessly
    /// round-tripped when coerced to the closest Parquet logical type.
    /// For example, some `Date64` values may be truncated when stored with
    /// Parquet's native 32-bit date type.
    ///
    /// For [`List`] and [`Map`] types, some Parquet readers expect certain
    /// schema elements to have specific names (earlier versions of the spec
    /// were somewhat ambiguous on this point). Type coercion will use the names
    /// prescribed by the Parquet specification, potentially losing naming
    /// metadata from the Arrow schema.
    ///
    /// [`List`]: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists
    /// [`Map`]: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#maps
    /// [corresponding Parquet type]: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#date
    ///
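    /// # Example
    ///
    /// A minimal sketch of the `Date64` behaviour described above: with
    /// coercion the column is written as a 32-bit Parquet date, without it
    /// the values are kept as 64-bit integers.
    ///
    /// ```
    /// # use arrow_schema::{DataType, Field, Schema};
    /// # use parquet::arrow::ArrowSchemaConverter;
    /// # use parquet::basic::Type as PhysicalType;
    /// let schema = Schema::new(vec![Field::new("d", DataType::Date64, false)]);
    /// let coerced = ArrowSchemaConverter::new()
    ///     .with_coerce_types(true)
    ///     .convert(&schema)
    ///     .unwrap();
    /// assert_eq!(coerced.column(0).physical_type(), PhysicalType::INT32);
    /// let lossless = ArrowSchemaConverter::new().convert(&schema).unwrap();
    /// assert_eq!(lossless.column(0).physical_type(), PhysicalType::INT64);
    /// ```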
    pub fn with_coerce_types(mut self, coerce_types: bool) -> Self {
        self.coerce_types = coerce_types;
        self
    }

    /// Set the root schema element name (defaults to `"arrow_schema"`).
    pub fn schema_root(mut self, schema_root: &'a str) -> Self {
        self.schema_root = schema_root;
        self
    }

    /// Convert the specified Arrow [`Schema`] to the desired Parquet [`SchemaDescriptor`]
    ///
    /// See example in [`ArrowSchemaConverter`]
    pub fn convert(&self, schema: &Schema) -> Result<SchemaDescriptor> {
        let fields = schema
            .fields()
            .iter()
            .map(|field| arrow_to_parquet_type(field, self.coerce_types).map(Arc::new))
            .collect::<Result<_>>()?;
        let group = Type::group_type_builder(self.schema_root)
            .with_fields(fields)
            .build()?;
        Ok(SchemaDescriptor::new(Arc::new(group)))
    }
}

fn parse_key_value_metadata(
    key_value_metadata: Option<&Vec<KeyValue>>,
) -> Option<HashMap<String, String>> {
    match key_value_metadata {
        Some(key_values) => {
            let map: HashMap<String, String> = key_values
                .iter()
                .filter_map(|kv| {
                    kv.value
                        .as_ref()
                        .map(|value| (kv.key.clone(), value.clone()))
                })
                .collect();

            if map.is_empty() { None } else { Some(map) }
        }
        None => None,
    }
}

/// Convert parquet column schema to arrow field.
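///
/// # Example
///
/// A minimal sketch converting the first leaf column of a hand-written
/// schema (the message type string is illustrative):
///
/// ```
/// # use std::sync::Arc;
/// # use parquet::arrow::parquet_to_arrow_field;
/// # use parquet::schema::{parser::parse_message_type, types::SchemaDescriptor};
/// let parquet_group_type = parse_message_type("message s { REQUIRED INT64 x; }").unwrap();
/// let descr = SchemaDescriptor::new(Arc::new(parquet_group_type));
/// let field = parquet_to_arrow_field(descr.column(0).as_ref()).unwrap();
/// assert_eq!(field.name(), "x");
/// assert!(!field.is_nullable());
/// ```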
pub fn parquet_to_arrow_field(parquet_column: &ColumnDescriptor) -> Result<Field> {
    let field = complex::convert_type(&parquet_column.self_type_ptr())?;
    let mut ret = Field::new(parquet_column.name(), field.arrow_type, field.nullable);

    let parquet_type = parquet_column.self_type();
    let basic_info = parquet_type.get_basic_info();

    let mut hash_map_size = 0;
    if basic_info.has_id() {
        hash_map_size += 1;
    }
    if has_extension_type(parquet_type) {
        hash_map_size += 1;
    }
    if hash_map_size == 0 {
        return Ok(ret);
    }
    ret.set_metadata(HashMap::with_capacity(hash_map_size));
    if basic_info.has_id() {
        ret.metadata_mut().insert(
            PARQUET_FIELD_ID_META_KEY.to_string(),
            basic_info.id().to_string(),
        );
    }
    try_add_extension_type(ret, parquet_column.self_type())
}

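/// Returns the minimum number of bytes needed for a `FIXED_LEN_BYTE_ARRAY`
/// to hold a decimal of the given precision.
///
/// A small worked example (the `parquet::arrow` re-export path is assumed):
///
/// ```
/// # use parquet::arrow::decimal_length_from_precision;
/// // a 38-digit decimal (Arrow's Decimal128 maximum) needs 16 bytes
/// assert_eq!(decimal_length_from_precision(38), 16);
/// // 9 digits fit in 4 bytes
/// assert_eq!(decimal_length_from_precision(9), 4);
/// ```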
pub fn decimal_length_from_precision(precision: u8) -> usize {
    // From parquet's logical types, an n-byte fixed-len byte array stores at most
    //   digits = floor(log_10(2^(8*n - 1) - 1))
    // decimal digits. Inverting to find the smallest n that holds `precision` digits:
    //   2^(8*n - 1) - 1 >= 10^precision
    //   2^(8*n - 1) >= 10^precision + 1
    //   8*n - 1 >= log2(10^precision + 1)
    //   n >= (log2(10^precision + 1) + 1) / 8
    (((10.0_f64.powi(precision as i32) + 1.0).log2() + 1.0) / 8.0).ceil() as usize
}

/// Convert an arrow field to a parquet `Type`
fn arrow_to_parquet_type(field: &Field, coerce_types: bool) -> Result<Type> {
    const PARQUET_LIST_ELEMENT_NAME: &str = "element";
    const PARQUET_MAP_STRUCT_NAME: &str = "key_value";
    const PARQUET_KEY_FIELD_NAME: &str = "key";
    const PARQUET_VALUE_FIELD_NAME: &str = "value";

    let name = field.name().as_str();
    let repetition = if field.is_nullable() {
        Repetition::OPTIONAL
    } else {
        Repetition::REQUIRED
    };
    let id = field_id(field);
    // create type from field
    match field.data_type() {
        DataType::Null => Type::primitive_type_builder(name, PhysicalType::INT32)
            .with_logical_type(Some(LogicalType::Unknown))
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::Boolean => Type::primitive_type_builder(name, PhysicalType::BOOLEAN)
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::Int8 => Type::primitive_type_builder(name, PhysicalType::INT32)
            .with_logical_type(Some(LogicalType::Integer {
                bit_width: 8,
                is_signed: true,
            }))
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::Int16 => Type::primitive_type_builder(name, PhysicalType::INT32)
            .with_logical_type(Some(LogicalType::Integer {
                bit_width: 16,
                is_signed: true,
            }))
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::Int32 => Type::primitive_type_builder(name, PhysicalType::INT32)
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::Int64 => Type::primitive_type_builder(name, PhysicalType::INT64)
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::UInt8 => Type::primitive_type_builder(name, PhysicalType::INT32)
            .with_logical_type(Some(LogicalType::Integer {
                bit_width: 8,
                is_signed: false,
            }))
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::UInt16 => Type::primitive_type_builder(name, PhysicalType::INT32)
            .with_logical_type(Some(LogicalType::Integer {
                bit_width: 16,
                is_signed: false,
            }))
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::UInt32 => Type::primitive_type_builder(name, PhysicalType::INT32)
            .with_logical_type(Some(LogicalType::Integer {
                bit_width: 32,
                is_signed: false,
            }))
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::UInt64 => Type::primitive_type_builder(name, PhysicalType::INT64)
            .with_logical_type(Some(LogicalType::Integer {
                bit_width: 64,
                is_signed: false,
            }))
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::Float16 => Type::primitive_type_builder(name, PhysicalType::FIXED_LEN_BYTE_ARRAY)
            .with_repetition(repetition)
            .with_id(id)
            .with_logical_type(Some(LogicalType::Float16))
            .with_length(2)
            .build(),
        DataType::Float32 => Type::primitive_type_builder(name, PhysicalType::FLOAT)
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::Float64 => Type::primitive_type_builder(name, PhysicalType::DOUBLE)
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::Timestamp(TimeUnit::Second, _) => {
            // Cannot represent seconds in LogicalType
            Type::primitive_type_builder(name, PhysicalType::INT64)
                .with_repetition(repetition)
                .with_id(id)
                .build()
        }
        DataType::Timestamp(time_unit, tz) => {
            Type::primitive_type_builder(name, PhysicalType::INT64)
                .with_logical_type(Some(LogicalType::Timestamp {
                    // If timezone set, values are normalized to UTC timezone
                    is_adjusted_to_u_t_c: matches!(tz, Some(z) if !z.as_ref().is_empty()),
                    unit: match time_unit {
                        TimeUnit::Second => unreachable!(),
                        TimeUnit::Millisecond => ParquetTimeUnit::MILLIS,
                        TimeUnit::Microsecond => ParquetTimeUnit::MICROS,
                        TimeUnit::Nanosecond => ParquetTimeUnit::NANOS,
                    },
                }))
                .with_repetition(repetition)
                .with_id(id)
                .build()
        }
        DataType::Date32 => Type::primitive_type_builder(name, PhysicalType::INT32)
            .with_logical_type(Some(LogicalType::Date))
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::Date64 => {
            if coerce_types {
                Type::primitive_type_builder(name, PhysicalType::INT32)
                    .with_logical_type(Some(LogicalType::Date))
                    .with_repetition(repetition)
                    .with_id(id)
                    .build()
            } else {
                Type::primitive_type_builder(name, PhysicalType::INT64)
                    .with_repetition(repetition)
                    .with_id(id)
                    .build()
            }
        }
        DataType::Time32(TimeUnit::Second) => {
            // Cannot represent seconds in LogicalType
            Type::primitive_type_builder(name, PhysicalType::INT32)
                .with_repetition(repetition)
                .with_id(id)
                .build()
        }
        DataType::Time32(unit) => Type::primitive_type_builder(name, PhysicalType::INT32)
            .with_logical_type(Some(LogicalType::Time {
                is_adjusted_to_u_t_c: field.metadata().contains_key("adjusted_to_utc"),
                unit: match unit {
                    TimeUnit::Millisecond => ParquetTimeUnit::MILLIS,
                    u => unreachable!("Invalid unit for Time32: {:?}", u),
                },
            }))
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::Time64(unit) => Type::primitive_type_builder(name, PhysicalType::INT64)
            .with_logical_type(Some(LogicalType::Time {
                is_adjusted_to_u_t_c: field.metadata().contains_key("adjusted_to_utc"),
                unit: match unit {
                    TimeUnit::Microsecond => ParquetTimeUnit::MICROS,
                    TimeUnit::Nanosecond => ParquetTimeUnit::NANOS,
                    u => unreachable!("Invalid unit for Time64: {:?}", u),
                },
            }))
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::Duration(_) => Type::primitive_type_builder(name, PhysicalType::INT64)
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::Interval(_) => {
            Type::primitive_type_builder(name, PhysicalType::FIXED_LEN_BYTE_ARRAY)
                .with_converted_type(ConvertedType::INTERVAL)
                .with_repetition(repetition)
                .with_id(id)
                .with_length(12)
                .build()
        }
        DataType::Binary | DataType::LargeBinary => {
            Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
                .with_repetition(repetition)
                .with_id(id)
                .with_logical_type(logical_type_for_binary(field))
                .build()
        }
        DataType::FixedSizeBinary(length) => {
            Type::primitive_type_builder(name, PhysicalType::FIXED_LEN_BYTE_ARRAY)
                .with_repetition(repetition)
                .with_id(id)
                .with_length(*length)
                .with_logical_type(logical_type_for_fixed_size_binary(field))
                .build()
        }
        DataType::BinaryView => Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
            .with_repetition(repetition)
            .with_id(id)
            .with_logical_type(logical_type_for_binary_view(field))
            .build(),
        DataType::Decimal32(precision, scale)
        | DataType::Decimal64(precision, scale)
        | DataType::Decimal128(precision, scale)
        | DataType::Decimal256(precision, scale) => {
            // Decimal precision determines the Parquet physical type to use, following
            // https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#decimal
            let (physical_type, length) = if *precision > 1 && *precision <= 9 {
                (PhysicalType::INT32, -1)
            } else if *precision <= 18 {
                (PhysicalType::INT64, -1)
            } else {
                (
                    PhysicalType::FIXED_LEN_BYTE_ARRAY,
                    decimal_length_from_precision(*precision) as i32,
                )
            };
            Type::primitive_type_builder(name, physical_type)
                .with_repetition(repetition)
                .with_id(id)
                .with_length(length)
                .with_logical_type(Some(LogicalType::Decimal {
                    scale: *scale as i32,
                    precision: *precision as i32,
                }))
                .with_precision(*precision as i32)
                .with_scale(*scale as i32)
                .build()
        }
        DataType::Utf8 | DataType::LargeUtf8 => {
            Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
                .with_logical_type(logical_type_for_string(field))
                .with_repetition(repetition)
                .with_id(id)
                .build()
        }
        DataType::Utf8View => Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
            .with_logical_type(logical_type_for_string(field))
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::List(f) | DataType::FixedSizeList(f, _) | DataType::LargeList(f) => {
            let field_ref = if coerce_types && f.name() != PARQUET_LIST_ELEMENT_NAME {
                // Ensure proper naming per the Parquet specification
                let ff = f.as_ref().clone().with_name(PARQUET_LIST_ELEMENT_NAME);
                Arc::new(arrow_to_parquet_type(&ff, coerce_types)?)
            } else {
                Arc::new(arrow_to_parquet_type(f, coerce_types)?)
            };

            Type::group_type_builder(name)
                .with_fields(vec![Arc::new(
                    Type::group_type_builder("list")
                        .with_fields(vec![field_ref])
                        .with_repetition(Repetition::REPEATED)
                        .build()?,
                )])
                .with_logical_type(Some(LogicalType::List))
                .with_repetition(repetition)
                .with_id(id)
                .build()
        }
        DataType::ListView(_) | DataType::LargeListView(_) => {
            unimplemented!("ListView/LargeListView not implemented")
        }
        DataType::Struct(fields) => {
            if fields.is_empty() {
                return Err(arrow_err!("Parquet does not support writing empty structs",));
            }
            // recursively convert children to types/nodes
            let fields = fields
                .iter()
                .map(|f| arrow_to_parquet_type(f, coerce_types).map(Arc::new))
                .collect::<Result<_>>()?;
            Type::group_type_builder(name)
                .with_fields(fields)
                .with_repetition(repetition)
                .with_id(id)
                .with_logical_type(logical_type_for_struct(field))
                .build()
        }
        DataType::Map(field, _) => {
            if let DataType::Struct(struct_fields) = field.data_type() {
                // If coercing then set inner struct name to "key_value"
                let map_struct_name = if coerce_types {
                    PARQUET_MAP_STRUCT_NAME
                } else {
                    field.name()
                };

                // If coercing then ensure struct fields are named "key" and "value"
                let fix_map_field = |name: &str, fld: &Arc<Field>| -> Result<Arc<Type>> {
                    if coerce_types && fld.name() != name {
                        let f = fld.as_ref().clone().with_name(name);
                        Ok(Arc::new(arrow_to_parquet_type(&f, coerce_types)?))
                    } else {
                        Ok(Arc::new(arrow_to_parquet_type(fld, coerce_types)?))
                    }
                };
                let key_field = fix_map_field(PARQUET_KEY_FIELD_NAME, &struct_fields[0])?;
                let val_field = fix_map_field(PARQUET_VALUE_FIELD_NAME, &struct_fields[1])?;

                Type::group_type_builder(name)
                    .with_fields(vec![Arc::new(
                        Type::group_type_builder(map_struct_name)
                            .with_fields(vec![key_field, val_field])
                            .with_repetition(Repetition::REPEATED)
                            .build()?,
                    )])
                    .with_logical_type(Some(LogicalType::Map))
                    .with_repetition(repetition)
                    .with_id(id)
                    .build()
            } else {
                Err(arrow_err!(
                    "DataType::Map should contain a struct field child",
                ))
            }
        }
        DataType::Union(_, _) => unimplemented!("See ARROW-8817."),
        DataType::Dictionary(_, value) => {
            // Dictionary encoding not handled at the schema level
            let dict_field = field.clone().with_data_type(value.as_ref().clone());
            arrow_to_parquet_type(&dict_field, coerce_types)
        }
        DataType::RunEndEncoded(_, _) => Err(arrow_err!(
            "Converting RunEndEncodedType to parquet not supported",
        )),
    }
}

fn field_id(field: &Field) -> Option<i32> {
    let value = field.metadata().get(super::PARQUET_FIELD_ID_META_KEY)?;
    value.parse().ok() // Fail quietly if not a valid integer
}

#[cfg(test)]
mod tests {
    use super::*;

    use std::{collections::HashMap, sync::Arc};

    use crate::arrow::PARQUET_FIELD_ID_META_KEY;
    use crate::file::metadata::KeyValue;
    use crate::file::reader::FileReader;
    use crate::{
        arrow::{ArrowWriter, arrow_reader::ParquetRecordBatchReaderBuilder},
        schema::{parser::parse_message_type, types::SchemaDescriptor},
    };
    use arrow::datatypes::{DataType, Field, IntervalUnit, TimeUnit};

    #[test]
    fn test_flat_primitives() {
        let message_type = "
        message test_schema {
            REQUIRED BOOLEAN boolean;
            REQUIRED INT32   int8  (INT_8);
            REQUIRED INT32   int16 (INT_16);
            REQUIRED INT32   uint8 (INTEGER(8,false));
            REQUIRED INT32   uint16 (INTEGER(16,false));
            REQUIRED INT32   int32;
            REQUIRED INT64   int64;
            OPTIONAL DOUBLE  double;
            OPTIONAL FLOAT   float;
            OPTIONAL FIXED_LEN_BYTE_ARRAY (2) float16 (FLOAT16);
            OPTIONAL BINARY  string (UTF8);
            OPTIONAL BINARY  string_2 (STRING);
            OPTIONAL BINARY  json (JSON);
        }
        ";
        let parquet_group_type = parse_message_type(message_type).unwrap();

        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();

        let arrow_fields = Fields::from(vec![
            Field::new("boolean", DataType::Boolean, false),
            Field::new("int8", DataType::Int8, false),
            Field::new("int16", DataType::Int16, false),
            Field::new("uint8", DataType::UInt8, false),
            Field::new("uint16", DataType::UInt16, false),
            Field::new("int32", DataType::Int32, false),
            Field::new("int64", DataType::Int64, false),
            Field::new("double", DataType::Float64, true),
            Field::new("float", DataType::Float32, true),
            Field::new("float16", DataType::Float16, true),
            Field::new("string", DataType::Utf8, true),
            Field::new("string_2", DataType::Utf8, true),
            json_field(),
        ]);

        assert_eq!(&arrow_fields, converted_arrow_schema.fields());
    }

    /// Return the expected Field for a Parquet column annotated with
    /// the JSON logical type.
    fn json_field() -> Field {
        #[cfg(feature = "arrow_canonical_extension_types")]
        {
            Field::new("json", DataType::Utf8, true)
                .with_extension_type(arrow_schema::extension::Json::default())
        }
        #[cfg(not(feature = "arrow_canonical_extension_types"))]
        {
            Field::new("json", DataType::Utf8, true)
        }
    }

    #[test]
    fn test_decimal_fields() {
        let message_type = "
        message test_schema {
                    REQUIRED INT32 decimal1 (DECIMAL(4,2));
                    REQUIRED INT64 decimal2 (DECIMAL(12,2));
                    REQUIRED FIXED_LEN_BYTE_ARRAY (16) decimal3 (DECIMAL(30,2));
                    REQUIRED BYTE_ARRAY decimal4 (DECIMAL(33,2));
                    REQUIRED BYTE_ARRAY decimal5 (DECIMAL(38,2));
                    REQUIRED FIXED_LEN_BYTE_ARRAY (17) decimal6 (DECIMAL(39,2));
                    REQUIRED BYTE_ARRAY decimal7 (DECIMAL(39,2));
        }
        ";

        let parquet_group_type = parse_message_type(message_type).unwrap();

        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();

        let arrow_fields = Fields::from(vec![
            Field::new("decimal1", DataType::Decimal128(4, 2), false),
            Field::new("decimal2", DataType::Decimal128(12, 2), false),
            Field::new("decimal3", DataType::Decimal128(30, 2), false),
            Field::new("decimal4", DataType::Decimal128(33, 2), false),
            Field::new("decimal5", DataType::Decimal128(38, 2), false),
            Field::new("decimal6", DataType::Decimal256(39, 2), false),
            Field::new("decimal7", DataType::Decimal256(39, 2), false),
        ]);
        assert_eq!(&arrow_fields, converted_arrow_schema.fields());
    }

    #[test]
    fn test_byte_array_fields() {
        let message_type = "
        message test_schema {
            REQUIRED BYTE_ARRAY binary;
            REQUIRED FIXED_LEN_BYTE_ARRAY (20) fixed_binary;
        }
        ";

        let parquet_group_type = parse_message_type(message_type).unwrap();

        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();

        let arrow_fields = Fields::from(vec![
            Field::new("binary", DataType::Binary, false),
            Field::new("fixed_binary", DataType::FixedSizeBinary(20), false),
        ]);
        assert_eq!(&arrow_fields, converted_arrow_schema.fields());
    }

    #[test]
    fn test_duplicate_fields() {
        let message_type = "
        message test_schema {
            REQUIRED BOOLEAN boolean;
            REQUIRED INT32 int8 (INT_8);
        }
        ";

        let parquet_group_type = parse_message_type(message_type).unwrap();

        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();

        let arrow_fields = Fields::from(vec![
            Field::new("boolean", DataType::Boolean, false),
            Field::new("int8", DataType::Int8, false),
        ]);
        assert_eq!(&arrow_fields, converted_arrow_schema.fields());

        let converted_arrow_schema =
            parquet_to_arrow_schema_by_columns(&parquet_schema, ProjectionMask::all(), None)
                .unwrap();
        assert_eq!(&arrow_fields, converted_arrow_schema.fields());
    }

    #[test]
    fn test_parquet_lists() {
        let mut arrow_fields = Vec::new();

        // LIST encoding example taken from parquet-format/LogicalTypes.md
        let message_type = "
        message test_schema {
          REQUIRED GROUP my_list (LIST) {
            REPEATED GROUP list {
              OPTIONAL BINARY element (UTF8);
            }
          }
          OPTIONAL GROUP my_list (LIST) {
            REPEATED GROUP list {
              REQUIRED BINARY element (UTF8);
            }
          }
          OPTIONAL GROUP array_of_arrays (LIST) {
            REPEATED GROUP list {
              REQUIRED GROUP element (LIST) {
                REPEATED GROUP list {
                  REQUIRED INT32 element;
                }
              }
            }
          }
          OPTIONAL GROUP my_list (LIST) {
            REPEATED GROUP element {
              REQUIRED BINARY str (UTF8);
            }
          }
          OPTIONAL GROUP my_list (LIST) {
            REPEATED INT32 element;
          }
          OPTIONAL GROUP my_list (LIST) {
            REPEATED GROUP element {
              REQUIRED BINARY str (UTF8);
              REQUIRED INT32 num;
            }
          }
          OPTIONAL GROUP my_list (LIST) {
            REPEATED GROUP array {
              REQUIRED BINARY str (UTF8);
            }

          }
          OPTIONAL GROUP my_list (LIST) {
            REPEATED GROUP my_list_tuple {
              REQUIRED BINARY str (UTF8);
            }
          }
          REPEATED INT32 name;
        }
        ";

        // // List<String> (list non-null, elements nullable)
        // required group my_list (LIST) {
        //   repeated group list {
        //     optional binary element (UTF8);
        //   }
        // }
        {
            arrow_fields.push(Field::new_list(
                "my_list",
                Field::new("element", DataType::Utf8, true),
                false,
            ));
        }

        // // List<String> (list nullable, elements non-null)
        // optional group my_list (LIST) {
        //   repeated group list {
        //     required binary element (UTF8);
        //   }
        // }
        {
            arrow_fields.push(Field::new_list(
                "my_list",
                Field::new("element", DataType::Utf8, false),
                true,
            ));
        }

        // Element types can be nested structures. For example, a list of lists:
        //
        // // List<List<Integer>>
        // optional group array_of_arrays (LIST) {
        //   repeated group list {
        //     required group element (LIST) {
        //       repeated group list {
        //         required int32 element;
        //       }
        //     }
        //   }
        // }
        {
            let arrow_inner_list = Field::new("element", DataType::Int32, false);
            arrow_fields.push(Field::new_list(
                "array_of_arrays",
                Field::new_list("element", arrow_inner_list, false),
                true,
            ));
        }

        // // List<String> (list nullable, elements non-null)
        // optional group my_list (LIST) {
        //   repeated group element {
        //     required binary str (UTF8);
        //   };
        // }
        {
            arrow_fields.push(Field::new_list(
                "my_list",
                Field::new("str", DataType::Utf8, false),
                true,
            ));
        }

        // // List<Integer> (nullable list, non-null elements)
        // optional group my_list (LIST) {
        //   repeated int32 element;
        // }
        {
            arrow_fields.push(Field::new_list(
                "my_list",
                Field::new("element", DataType::Int32, false),
                true,
            ));
        }

        // // List<Tuple<String, Integer>> (nullable list, non-null elements)
        // optional group my_list (LIST) {
        //   repeated group element {
        //     required binary str (UTF8);
        //     required int32 num;
        //   };
        // }
        {
            let fields = vec![
                Field::new("str", DataType::Utf8, false),
                Field::new("num", DataType::Int32, false),
            ];
            arrow_fields.push(Field::new_list(
                "my_list",
                Field::new_struct("element", fields, false),
                true,
            ));
        }

        // // List<OneTuple<String>> (nullable list, non-null elements)
        // optional group my_list (LIST) {
        //   repeated group array {
        //     required binary str (UTF8);
        //   };
        // }
        // Special case: group is named array
        {
            let fields = vec![Field::new("str", DataType::Utf8, false)];
            arrow_fields.push(Field::new_list(
                "my_list",
                Field::new_struct("array", fields, false),
                true,
            ));
        }

        // // List<OneTuple<String>> (nullable list, non-null elements)
        // optional group my_list (LIST) {
        //   repeated group my_list_tuple {
        //     required binary str (UTF8);
        //   };
        // }
        // Special case: group name ends in _tuple
        {
            let fields = vec![Field::new("str", DataType::Utf8, false)];
            arrow_fields.push(Field::new_list(
                "my_list",
                Field::new_struct("my_list_tuple", fields, false),
                true,
            ));
        }

        // One-level encoding: Only allows required lists with required cells
        //   repeated value_type name
        {
            arrow_fields.push(Field::new_list(
                "name",
                Field::new("name", DataType::Int32, false),
                false,
            ));
        }

        let parquet_group_type = parse_message_type(message_type).unwrap();

        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
        let converted_fields = converted_arrow_schema.fields();

        assert_eq!(arrow_fields.len(), converted_fields.len());
        for i in 0..arrow_fields.len() {
            assert_eq!(&arrow_fields[i], converted_fields[i].as_ref(), "{i}");
        }
    }

    #[test]
    fn test_parquet_list_nullable() {
        let mut arrow_fields = Vec::new();

        let message_type = "
        message test_schema {
          REQUIRED GROUP my_list1 (LIST) {
            REPEATED GROUP list {
              OPTIONAL BINARY element (UTF8);
            }
          }
          OPTIONAL GROUP my_list2 (LIST) {
            REPEATED GROUP list {
              REQUIRED BINARY element (UTF8);
            }
          }
          REQUIRED GROUP my_list3 (LIST) {
            REPEATED GROUP list {
              REQUIRED BINARY element (UTF8);
            }
          }
        }
        ";

        // // List<String> (list non-null, elements nullable)
        // required group my_list1 (LIST) {
        //   repeated group list {
        //     optional binary element (UTF8);
        //   }
        // }
        {
            arrow_fields.push(Field::new_list(
                "my_list1",
                Field::new("element", DataType::Utf8, true),
                false,
            ));
        }

        // // List<String> (list nullable, elements non-null)
        // optional group my_list2 (LIST) {
        //   repeated group list {
        //     required binary element (UTF8);
        //   }
        // }
        {
            arrow_fields.push(Field::new_list(
                "my_list2",
                Field::new("element", DataType::Utf8, false),
                true,
            ));
        }

        // // List<String> (list non-null, elements non-null)
        // repeated group my_list3 (LIST) {
        //   repeated group list {
        //     required binary element (UTF8);
        //   }
        // }
        {
            arrow_fields.push(Field::new_list(
                "my_list3",
                Field::new("element", DataType::Utf8, false),
                false,
            ));
        }

        let parquet_group_type = parse_message_type(message_type).unwrap();

        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
        let converted_fields = converted_arrow_schema.fields();

        assert_eq!(arrow_fields.len(), converted_fields.len());
        for i in 0..arrow_fields.len() {
            assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
        }
    }

    #[test]
    fn test_parquet_maps() {
        let mut arrow_fields = Vec::new();

        // MAP encoding example taken from parquet-format/LogicalTypes.md
        let message_type = "
        message test_schema {
          REQUIRED group my_map1 (MAP) {
            REPEATED group key_value {
              REQUIRED binary key (UTF8);
              OPTIONAL int32 value;
            }
          }
          OPTIONAL group my_map2 (MAP) {
            REPEATED group map {
              REQUIRED binary str (UTF8);
              REQUIRED int32 num;
            }
          }
          OPTIONAL group my_map3 (MAP_KEY_VALUE) {
            REPEATED group map {
              REQUIRED binary key (UTF8);
              OPTIONAL int32 value;
            }
          }
          REQUIRED group my_map4 (MAP) {
            REPEATED group map {
              OPTIONAL binary key (UTF8);
              REQUIRED int32 value;
            }
          }
        }
        ";

        // // Map<String, Integer>
        // required group my_map (MAP) {
        //   repeated group key_value {
        //     required binary key (UTF8);
        //     optional int32 value;
        //   }
        // }
        {
            arrow_fields.push(Field::new_map(
                "my_map1",
                "key_value",
                Field::new("key", DataType::Utf8, false),
                Field::new("value", DataType::Int32, true),
                false,
                false,
            ));
        }

        // // Map<String, Integer> (nullable map, non-null values)
        // optional group my_map (MAP) {
        //   repeated group map {
        //     required binary str (UTF8);
        //     required int32 num;
        //   }
        // }
        {
            arrow_fields.push(Field::new_map(
                "my_map2",
                "map",
                Field::new("str", DataType::Utf8, false),
                Field::new("num", DataType::Int32, false),
                false,
                true,
            ));
        }

        // // Map<String, Integer> (nullable map, nullable values)
        // optional group my_map (MAP_KEY_VALUE) {
        //   repeated group map {
        //     required binary key (UTF8);
        //     optional int32 value;
        //   }
        // }
        {
            arrow_fields.push(Field::new_map(
                "my_map3",
                "map",
                Field::new("key", DataType::Utf8, false),
                Field::new("value", DataType::Int32, true),
                false,
                true,
            ));
        }

        // // Map<String, Integer> (non-compliant nullable key)
        // required group my_map (MAP) {
        //   repeated group map {
        //     optional binary key (UTF8);
        //     required int32 value;
        //   }
        // }
        {
            arrow_fields.push(Field::new_map(
                "my_map4",
                "map",
                Field::new("key", DataType::Utf8, false), // The key is always non-nullable (#5630)
                Field::new("value", DataType::Int32, false),
                false,
                false,
            ));
        }

        let parquet_group_type = parse_message_type(message_type).unwrap();

        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
        let converted_fields = converted_arrow_schema.fields();

        assert_eq!(arrow_fields.len(), converted_fields.len());
        for i in 0..arrow_fields.len() {
            assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
        }
    }
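
    // For reference, a minimal sketch (not a definitive statement of the
    // conversion rules) of the Arrow type that `Field::new_map` builds for
    // `my_map1` above, assuming the usual non-nullable entries struct:
    //
    //   DataType::Map(
    //       Arc::new(Field::new(
    //           "key_value",
    //           DataType::Struct(Fields::from(vec![
    //               Field::new("key", DataType::Utf8, false),
    //               Field::new("value", DataType::Int32, true),
    //           ])),
    //           false, // the entries struct itself is never null
    //       )),
    //       false, // keys_sorted
    //   )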

    #[test]
    fn test_nested_schema() {
        let mut arrow_fields = Vec::new();
        {
            let group1_fields = Fields::from(vec![
                Field::new("leaf1", DataType::Boolean, false),
                Field::new("leaf2", DataType::Int32, false),
            ]);
            let group1_struct = Field::new("group1", DataType::Struct(group1_fields), false);
            arrow_fields.push(group1_struct);

            let leaf3_field = Field::new("leaf3", DataType::Int64, false);
            arrow_fields.push(leaf3_field);
        }

        let message_type = "
        message test_schema {
          REQUIRED GROUP group1 {
            REQUIRED BOOLEAN leaf1;
            REQUIRED INT32 leaf2;
          }
          REQUIRED INT64 leaf3;
        }
        ";
        let parquet_group_type = parse_message_type(message_type).unwrap();

        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
        let converted_fields = converted_arrow_schema.fields();

        assert_eq!(arrow_fields.len(), converted_fields.len());
        for i in 0..arrow_fields.len() {
            assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
        }
    }

    #[test]
    fn test_nested_schema_partial() {
        let mut arrow_fields = Vec::new();
        {
            let group1_fields = vec![Field::new("leaf1", DataType::Int64, false)].into();
            let group1 = Field::new("group1", DataType::Struct(group1_fields), false);
            arrow_fields.push(group1);

            let group2_fields = vec![Field::new("leaf4", DataType::Int64, false)].into();
            let group2 = Field::new("group2", DataType::Struct(group2_fields), false);
            arrow_fields.push(group2);

            arrow_fields.push(Field::new("leaf5", DataType::Int64, false));
        }

        let message_type = "
        message test_schema {
          REQUIRED GROUP group1 {
            REQUIRED INT64 leaf1;
            REQUIRED INT64 leaf2;
          }
          REQUIRED GROUP group2 {
            REQUIRED INT64 leaf3;
            REQUIRED INT64 leaf4;
          }
          REQUIRED INT64 leaf5;
        }
        ";
        let parquet_group_type = parse_message_type(message_type).unwrap();

        // Expected partial arrow schema (columns 0, 3, 4):
        // required group group1 {
        //   required int64 leaf1;
        // }
        // required group group2 {
        //   required int64 leaf4;
        // }
        // required int64 leaf5;

        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
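        // Note: leaf index 4 appears twice in the mask below; a projection
        // mask is effectively a set of leaves, so the duplicate entry should
        // not change the projected schema.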
        let mask = ProjectionMask::leaves(&parquet_schema, [3, 0, 4, 4]);
        let converted_arrow_schema =
            parquet_to_arrow_schema_by_columns(&parquet_schema, mask, None).unwrap();
        let converted_fields = converted_arrow_schema.fields();

        assert_eq!(arrow_fields.len(), converted_fields.len());
        for i in 0..arrow_fields.len() {
            assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
        }
    }

    #[test]
    fn test_nested_schema_partial_ordering() {
        let mut arrow_fields = Vec::new();
        {
            let group1_fields = vec![Field::new("leaf1", DataType::Int64, false)].into();
            let group1 = Field::new("group1", DataType::Struct(group1_fields), false);
            arrow_fields.push(group1);

            let group2_fields = vec![Field::new("leaf4", DataType::Int64, false)].into();
            let group2 = Field::new("group2", DataType::Struct(group2_fields), false);
            arrow_fields.push(group2);

            arrow_fields.push(Field::new("leaf5", DataType::Int64, false));
        }

        let message_type = "
        message test_schema {
          REQUIRED GROUP group1 {
            REQUIRED INT64 leaf1;
            REQUIRED INT64 leaf2;
          }
          REQUIRED GROUP group2 {
            REQUIRED INT64 leaf3;
            REQUIRED INT64 leaf4;
          }
          REQUIRED INT64 leaf5;
        }
        ";
        let parquet_group_type = parse_message_type(message_type).unwrap();

        // Expected partial arrow schema (columns 3, 0, 4):
        // required group group1 {
        //   required int64 leaf1;
        // }
        // required group group2 {
        //   required int64 leaf4;
        // }
        // required int64 leaf5;

        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
        let mask = ProjectionMask::leaves(&parquet_schema, [3, 0, 4]);
        let converted_arrow_schema =
            parquet_to_arrow_schema_by_columns(&parquet_schema, mask, None).unwrap();
        let converted_fields = converted_arrow_schema.fields();

        assert_eq!(arrow_fields.len(), converted_fields.len());
        for i in 0..arrow_fields.len() {
            assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
        }

        let mask =
            ProjectionMask::columns(&parquet_schema, ["group2.leaf4", "group1.leaf1", "leaf5"]);
        let converted_arrow_schema =
            parquet_to_arrow_schema_by_columns(&parquet_schema, mask, None).unwrap();
        let converted_fields = converted_arrow_schema.fields();

        assert_eq!(arrow_fields.len(), converted_fields.len());
        for i in 0..arrow_fields.len() {
            assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
        }
    }
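
    // A note on ordering, as the assertions above demonstrate: whatever the
    // order of indices passed to `ProjectionMask::leaves` or of dotted paths
    // passed to `ProjectionMask::columns`, the projected Arrow fields come
    // back in Parquet schema order.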

    #[test]
    fn test_repeated_nested_schema() {
        let mut arrow_fields = Vec::new();
        {
            arrow_fields.push(Field::new("leaf1", DataType::Int32, true));

            let inner_group_list = Field::new_list(
                "innerGroup",
                Field::new_struct(
                    "innerGroup",
                    vec![Field::new("leaf3", DataType::Int32, true)],
                    false,
                ),
                false,
            );

            let outer_group_list = Field::new_list(
                "outerGroup",
                Field::new_struct(
                    "outerGroup",
                    vec![Field::new("leaf2", DataType::Int32, true), inner_group_list],
                    false,
                ),
                false,
            );
            arrow_fields.push(outer_group_list);
        }

        let message_type = "
        message test_schema {
          OPTIONAL INT32 leaf1;
          REPEATED GROUP outerGroup {
            OPTIONAL INT32 leaf2;
            REPEATED GROUP innerGroup {
              OPTIONAL INT32 leaf3;
            }
          }
        }
        ";
        let parquet_group_type = parse_message_type(message_type).unwrap();

        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
        let converted_fields = converted_arrow_schema.fields();

        assert_eq!(arrow_fields.len(), converted_fields.len());
        for i in 0..arrow_fields.len() {
            assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
        }
    }
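
    // As exercised above, a bare REPEATED group (no LIST annotation) converts
    // under the backward-compatibility rules to a non-nullable list whose
    // element is a struct named after the repeated group itself.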

    #[test]
    fn test_column_desc_to_field() {
        let message_type = "
        message test_schema {
            REQUIRED BOOLEAN boolean;
            REQUIRED INT32   int8  (INT_8);
            REQUIRED INT32   uint8 (INTEGER(8,false));
            REQUIRED INT32   int16 (INT_16);
            REQUIRED INT32   uint16 (INTEGER(16,false));
            REQUIRED INT32   int32;
            REQUIRED INT64   int64;
            OPTIONAL DOUBLE  double;
            OPTIONAL FLOAT   float;
            OPTIONAL FIXED_LEN_BYTE_ARRAY (2) float16 (FLOAT16);
            OPTIONAL BINARY  string (UTF8);
            REPEATED BOOLEAN bools;
            OPTIONAL INT32   date       (DATE);
            OPTIONAL INT32   time_milli (TIME_MILLIS);
            OPTIONAL INT64   time_micro (TIME_MICROS);
            OPTIONAL INT64   time_nano (TIME(NANOS,false));
            OPTIONAL INT64   ts_milli (TIMESTAMP_MILLIS);
            REQUIRED INT64   ts_micro (TIMESTAMP_MICROS);
            REQUIRED INT64   ts_nano (TIMESTAMP(NANOS,true));
            REPEATED INT32   int_list;
            REPEATED BINARY  byte_list;
            REPEATED BINARY  string_list (UTF8);
            REQUIRED INT32 decimal_int32 (DECIMAL(8,2));
            REQUIRED INT64 decimal_int64 (DECIMAL(16,2));
            REQUIRED FIXED_LEN_BYTE_ARRAY (13) decimal_fix_length (DECIMAL(30,2));
        }
        ";
        let parquet_group_type = parse_message_type(message_type).unwrap();

        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
        let converted_arrow_fields = parquet_schema
            .columns()
            .iter()
            .map(|c| parquet_to_arrow_field(c).unwrap())
            .collect::<Vec<Field>>();

        let arrow_fields = vec![
            Field::new("boolean", DataType::Boolean, false),
            Field::new("int8", DataType::Int8, false),
            Field::new("uint8", DataType::UInt8, false),
            Field::new("int16", DataType::Int16, false),
            Field::new("uint16", DataType::UInt16, false),
            Field::new("int32", DataType::Int32, false),
            Field::new("int64", DataType::Int64, false),
            Field::new("double", DataType::Float64, true),
            Field::new("float", DataType::Float32, true),
            Field::new("float16", DataType::Float16, true),
            Field::new("string", DataType::Utf8, true),
            Field::new_list(
                "bools",
                Field::new("bools", DataType::Boolean, false),
                false,
            ),
            Field::new("date", DataType::Date32, true),
            Field::new("time_milli", DataType::Time32(TimeUnit::Millisecond), true),
            Field::new("time_micro", DataType::Time64(TimeUnit::Microsecond), true),
            Field::new("time_nano", DataType::Time64(TimeUnit::Nanosecond), true),
            Field::new(
                "ts_milli",
                DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
                true,
            ),
            Field::new(
                "ts_micro",
                DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
                false,
            ),
            Field::new(
                "ts_nano",
                DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())),
                false,
            ),
            Field::new_list(
                "int_list",
                Field::new("int_list", DataType::Int32, false),
                false,
            ),
            Field::new_list(
                "byte_list",
                Field::new("byte_list", DataType::Binary, false),
                false,
            ),
            Field::new_list(
                "string_list",
                Field::new("string_list", DataType::Utf8, false),
                false,
            ),
            Field::new("decimal_int32", DataType::Decimal128(8, 2), false),
            Field::new("decimal_int64", DataType::Decimal128(16, 2), false),
            Field::new("decimal_fix_length", DataType::Decimal128(30, 2), false),
        ];

        assert_eq!(arrow_fields, converted_arrow_fields);
    }
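
    // A minimal sketch (hypothetical schema, not one of the fixtures above) of
    // converting a single leaf column with `parquet_to_arrow_field`:
    //
    //   let t = parse_message_type("message s { REQUIRED INT32 id; }").unwrap();
    //   let descr = SchemaDescriptor::new(Arc::new(t));
    //   let field = parquet_to_arrow_field(&descr.column(0)).unwrap();
    //   assert_eq!(field, Field::new("id", DataType::Int32, false));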

    #[test]
    fn test_coerced_map_list() {
        // Create an Arrow schema whose list/map child names differ from the Parquet spec names
        let arrow_fields = vec![
            Field::new_list(
                "my_list",
                Field::new("item", DataType::Boolean, true),
                false,
            ),
            Field::new_map(
                "my_map",
                "entries",
                Field::new("keys", DataType::Utf8, false),
                Field::new("values", DataType::Int32, true),
                false,
                true,
            ),
        ];
        let arrow_schema = Schema::new(arrow_fields);

        // Create Parquet schema with coerced names
        let message_type = "
        message parquet_schema {
            REQUIRED GROUP my_list (LIST) {
                REPEATED GROUP list {
                    OPTIONAL BOOLEAN element;
                }
            }
            OPTIONAL GROUP my_map (MAP) {
                REPEATED GROUP key_value {
                    REQUIRED BINARY key (STRING);
                    OPTIONAL INT32 value;
                }
            }
        }
        ";
        let parquet_group_type = parse_message_type(message_type).unwrap();
        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
        let converted_arrow_schema = ArrowSchemaConverter::new()
            .with_coerce_types(true)
            .convert(&arrow_schema)
            .unwrap();
        assert_eq!(
            parquet_schema.columns().len(),
            converted_arrow_schema.columns().len()
        );

        // Create Parquet schema without coerced names
        let message_type = "
        message parquet_schema {
            REQUIRED GROUP my_list (LIST) {
                REPEATED GROUP list {
                    OPTIONAL BOOLEAN item;
                }
            }
            OPTIONAL GROUP my_map (MAP) {
                REPEATED GROUP entries {
                    REQUIRED BINARY keys (STRING);
                    OPTIONAL INT32 values;
                }
            }
        }
        ";
        let parquet_group_type = parse_message_type(message_type).unwrap();
        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
        let converted_arrow_schema = ArrowSchemaConverter::new()
            .with_coerce_types(false)
            .convert(&arrow_schema)
            .unwrap();
        assert_eq!(
            parquet_schema.columns().len(),
            converted_arrow_schema.columns().len()
        );
    }
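
    // In short: with `with_coerce_types(true)` list and map children are
    // written with the Parquet spec names ("list"/"element" and
    // "key_value"/"key"/"value"); with `false` the Arrow field names are kept.
    // Note that the assertions above only compare column counts, not names.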

    #[test]
    fn test_field_to_column_desc() {
        let message_type = "
        message arrow_schema {
            REQUIRED BOOLEAN boolean;
            REQUIRED INT32   int8  (INT_8);
            REQUIRED INT32   int16 (INTEGER(16,true));
            REQUIRED INT32   int32;
            REQUIRED INT64   int64;
            OPTIONAL DOUBLE  double;
            OPTIONAL FLOAT   float;
            OPTIONAL FIXED_LEN_BYTE_ARRAY (2) float16 (FLOAT16);
            OPTIONAL BINARY  string (STRING);
            OPTIONAL GROUP   bools (LIST) {
                REPEATED GROUP list {
                    OPTIONAL BOOLEAN element;
                }
            }
            REQUIRED GROUP   bools_non_null (LIST) {
                REPEATED GROUP list {
                    REQUIRED BOOLEAN element;
                }
            }
            OPTIONAL INT32   date       (DATE);
            OPTIONAL INT32   time_milli (TIME(MILLIS,false));
            OPTIONAL INT32   time_milli_utc (TIME(MILLIS,true));
            OPTIONAL INT64   time_micro (TIME_MICROS);
            OPTIONAL INT64   time_micro_utc (TIME(MICROS, true));
            OPTIONAL INT64   ts_milli (TIMESTAMP_MILLIS);
            REQUIRED INT64   ts_micro (TIMESTAMP(MICROS,false));
            REQUIRED INT64   ts_seconds;
            REQUIRED INT64   ts_micro_utc (TIMESTAMP(MICROS, true));
            REQUIRED INT64   ts_millis_zero_offset (TIMESTAMP(MILLIS, true));
            REQUIRED INT64   ts_millis_zero_negative_offset (TIMESTAMP(MILLIS, true));
            REQUIRED INT64   ts_micro_non_utc (TIMESTAMP(MICROS, true));
            REQUIRED GROUP struct {
                REQUIRED BOOLEAN bools;
                REQUIRED INT32 uint32 (INTEGER(32,false));
                REQUIRED GROUP   int32 (LIST) {
                    REPEATED GROUP list {
                        OPTIONAL INT32 element;
                    }
                }
            }
            REQUIRED BINARY  dictionary_strings (STRING);
            REQUIRED INT32 decimal_int32 (DECIMAL(8,2));
            REQUIRED INT64 decimal_int64 (DECIMAL(16,2));
            REQUIRED FIXED_LEN_BYTE_ARRAY (13) decimal_fix_length (DECIMAL(30,2));
            REQUIRED FIXED_LEN_BYTE_ARRAY (16) decimal128 (DECIMAL(38,2));
            REQUIRED FIXED_LEN_BYTE_ARRAY (17) decimal256 (DECIMAL(39,2));
        }
        ";
        let parquet_group_type = parse_message_type(message_type).unwrap();

        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));

        let arrow_fields = vec![
            Field::new("boolean", DataType::Boolean, false),
            Field::new("int8", DataType::Int8, false),
            Field::new("int16", DataType::Int16, false),
            Field::new("int32", DataType::Int32, false),
            Field::new("int64", DataType::Int64, false),
            Field::new("double", DataType::Float64, true),
            Field::new("float", DataType::Float32, true),
            Field::new("float16", DataType::Float16, true),
            Field::new("string", DataType::Utf8, true),
            Field::new_list(
                "bools",
                Field::new("element", DataType::Boolean, true),
                true,
            ),
            Field::new_list(
                "bools_non_null",
                Field::new("element", DataType::Boolean, false),
                false,
            ),
            Field::new("date", DataType::Date32, true),
            Field::new("time_milli", DataType::Time32(TimeUnit::Millisecond), true),
            Field::new(
                "time_milli_utc",
                DataType::Time32(TimeUnit::Millisecond),
                true,
            )
            .with_metadata(HashMap::from_iter(vec![(
                "adjusted_to_utc".to_string(),
                "".to_string(),
            )])),
            Field::new("time_micro", DataType::Time64(TimeUnit::Microsecond), true),
            Field::new(
                "time_micro_utc",
                DataType::Time64(TimeUnit::Microsecond),
                true,
            )
            .with_metadata(HashMap::from_iter(vec![(
                "adjusted_to_utc".to_string(),
                "".to_string(),
            )])),
            Field::new(
                "ts_milli",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                true,
            ),
            Field::new(
                "ts_micro",
                DataType::Timestamp(TimeUnit::Microsecond, None),
                false,
            ),
            Field::new(
                "ts_seconds",
                DataType::Timestamp(TimeUnit::Second, Some("UTC".into())),
                false,
            ),
            Field::new(
                "ts_micro_utc",
                DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
                false,
            ),
            Field::new(
                "ts_millis_zero_offset",
                DataType::Timestamp(TimeUnit::Millisecond, Some("+00:00".into())),
                false,
            ),
            Field::new(
                "ts_millis_zero_negative_offset",
                DataType::Timestamp(TimeUnit::Millisecond, Some("-00:00".into())),
                false,
            ),
            Field::new(
                "ts_micro_non_utc",
                DataType::Timestamp(TimeUnit::Microsecond, Some("+01:00".into())),
                false,
            ),
            Field::new_struct(
                "struct",
                vec![
                    Field::new("bools", DataType::Boolean, false),
                    Field::new("uint32", DataType::UInt32, false),
                    Field::new_list("int32", Field::new("element", DataType::Int32, true), false),
                ],
                false,
            ),
            Field::new_dictionary("dictionary_strings", DataType::Int32, DataType::Utf8, false),
            Field::new("decimal_int32", DataType::Decimal128(8, 2), false),
            Field::new("decimal_int64", DataType::Decimal128(16, 2), false),
            Field::new("decimal_fix_length", DataType::Decimal128(30, 2), false),
            Field::new("decimal128", DataType::Decimal128(38, 2), false),
            Field::new("decimal256", DataType::Decimal256(39, 2), false),
        ];
        let arrow_schema = Schema::new(arrow_fields);
        let converted_arrow_schema = ArrowSchemaConverter::new().convert(&arrow_schema).unwrap();

        assert_eq!(
            parquet_schema.columns().len(),
            converted_arrow_schema.columns().len()
        );
        parquet_schema
            .columns()
            .iter()
            .zip(converted_arrow_schema.columns())
            .for_each(|(a, b)| {
                // Only compare full column descriptors when the Parquet side
                // has a logical type set: the Arrow conversion always produces
                // a logical type, even where the original schema had none.
                // That is harmless in practice, but it would make a blanket
                // equality check fail here.
                match a.logical_type_ref() {
                    Some(_) => {
                        assert_eq!(a, b)
                    }
                    None => {
                        assert_eq!(a.name(), b.name());
                        assert_eq!(a.physical_type(), b.physical_type());
                        assert_eq!(a.converted_type(), b.converted_type());
                    }
                };
            });
    }

    #[test]
    #[should_panic(expected = "Parquet does not support writing empty structs")]
    fn test_empty_struct_field() {
        let arrow_fields = vec![Field::new(
            "struct",
            DataType::Struct(Fields::empty()),
            false,
        )];
        let arrow_schema = Schema::new(arrow_fields);
        let converted_arrow_schema = ArrowSchemaConverter::new()
            .with_coerce_types(true)
            .convert(&arrow_schema);

        converted_arrow_schema.unwrap();
    }

    #[test]
    fn test_metadata() {
        let message_type = "
        message test_schema {
            OPTIONAL BINARY  string (STRING);
        }
        ";
        let parquet_group_type = parse_message_type(message_type).unwrap();

        let key_value_metadata = vec![
            KeyValue::new("foo".to_owned(), Some("bar".to_owned())),
            KeyValue::new("baz".to_owned(), None),
        ];

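        // Only "foo" is expected to survive: entries whose value is `None`
        // ("baz" above) are dropped when building the Arrow schema metadata.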
        let mut expected_metadata: HashMap<String, String> = HashMap::new();
        expected_metadata.insert("foo".to_owned(), "bar".to_owned());

        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
        let converted_arrow_schema =
            parquet_to_arrow_schema(&parquet_schema, Some(&key_value_metadata)).unwrap();

        assert_eq!(converted_arrow_schema.metadata(), &expected_metadata);
    }

    #[test]
    fn test_arrow_schema_roundtrip() -> Result<()> {
        let meta = |a: &[(&str, &str)]| -> HashMap<String, String> {
            a.iter()
                .map(|(a, b)| (a.to_string(), b.to_string()))
                .collect()
        };

        let schema = Schema::new_with_metadata(
            vec![
                Field::new("c1", DataType::Utf8, false)
                    .with_metadata(meta(&[("Key", "Foo"), (PARQUET_FIELD_ID_META_KEY, "2")])),
                Field::new("c2", DataType::Binary, false),
                Field::new("c3", DataType::FixedSizeBinary(3), false),
                Field::new("c4", DataType::Boolean, false),
                Field::new("c5", DataType::Date32, false),
                Field::new("c6", DataType::Date64, false),
                Field::new("c7", DataType::Time32(TimeUnit::Second), false),
                Field::new("c8", DataType::Time32(TimeUnit::Millisecond), false),
                Field::new("c13", DataType::Time64(TimeUnit::Microsecond), false),
                Field::new("c14", DataType::Time64(TimeUnit::Nanosecond), false),
                Field::new("c15", DataType::Timestamp(TimeUnit::Second, None), false),
                Field::new(
                    "c16",
                    DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
                    false,
                ),
                Field::new(
                    "c17",
                    DataType::Timestamp(TimeUnit::Microsecond, Some("Africa/Johannesburg".into())),
                    false,
                ),
                Field::new(
                    "c18",
                    DataType::Timestamp(TimeUnit::Nanosecond, None),
                    false,
                ),
                Field::new("c19", DataType::Interval(IntervalUnit::DayTime), false),
                Field::new("c20", DataType::Interval(IntervalUnit::YearMonth), false),
                Field::new_list(
                    "c21",
                    Field::new_list_field(DataType::Boolean, true)
                        .with_metadata(meta(&[("Key", "Bar"), (PARQUET_FIELD_ID_META_KEY, "5")])),
                    false,
                )
                .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "4")])),
                Field::new(
                    "c22",
                    DataType::FixedSizeList(
                        Arc::new(Field::new_list_field(DataType::Boolean, true)),
                        5,
                    ),
                    false,
                ),
                Field::new_list(
                    "c23",
                    Field::new_large_list(
                        "inner",
                        Field::new_list_field(
                            DataType::Struct(
                                vec![
                                    Field::new("a", DataType::Int16, true),
                                    Field::new("b", DataType::Float64, false),
                                    Field::new("c", DataType::Float32, false),
                                    Field::new("d", DataType::Float16, false),
                                ]
                                .into(),
                            ),
                            false,
                        ),
                        true,
                    ),
                    false,
                ),
                Field::new(
                    "c24",
                    DataType::Struct(Fields::from(vec![
                        Field::new("a", DataType::Utf8, false),
                        Field::new("b", DataType::UInt16, false),
                    ])),
                    false,
                ),
                Field::new("c25", DataType::Interval(IntervalUnit::YearMonth), true),
                Field::new("c26", DataType::Interval(IntervalUnit::DayTime), true),
                // Duration types not supported
                // Field::new("c27", DataType::Duration(TimeUnit::Second), false),
                // Field::new("c28", DataType::Duration(TimeUnit::Millisecond), false),
                // Field::new("c29", DataType::Duration(TimeUnit::Microsecond), false),
                // Field::new("c30", DataType::Duration(TimeUnit::Nanosecond), false),
                #[allow(deprecated)]
                Field::new_dict(
                    "c31",
                    DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
                    true,
                    123,
                    true,
                )
                .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "6")])),
                Field::new("c32", DataType::LargeBinary, true),
                Field::new("c33", DataType::LargeUtf8, true),
                Field::new_large_list(
                    "c34",
                    Field::new_list(
                        "inner",
                        Field::new_list_field(
                            DataType::Struct(
                                vec![
                                    Field::new("a", DataType::Int16, true),
                                    Field::new("b", DataType::Float64, true),
                                ]
                                .into(),
                            ),
                            true,
                        ),
                        true,
                    ),
                    true,
                ),
                Field::new("c35", DataType::Null, true),
                Field::new("c36", DataType::Decimal128(2, 1), false),
                Field::new("c37", DataType::Decimal256(50, 20), false),
                Field::new("c38", DataType::Decimal128(18, 12), true),
                Field::new_map(
                    "c39",
                    "key_value",
                    Field::new("key", DataType::Utf8, false),
                    Field::new_list("value", Field::new("element", DataType::Utf8, true), true),
                    false, // fails to roundtrip keys_sorted
                    true,
                ),
                Field::new_map(
                    "c40",
                    "my_entries",
                    Field::new("my_key", DataType::Utf8, false)
                        .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "8")])),
                    Field::new_list(
                        "my_value",
                        Field::new_list_field(DataType::Utf8, true)
                            .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "10")])),
                        true,
                    )
                    .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "9")])),
                    false, // fails to roundtrip keys_sorted
                    true,
                )
                .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "7")])),
                Field::new_map(
                    "c41",
                    "my_entries",
                    Field::new("my_key", DataType::Utf8, false),
                    Field::new_list(
                        "my_value",
                        Field::new_list_field(DataType::Utf8, true)
                            .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "11")])),
                        true,
                    ),
                    false, // fails to roundtrip keys_sorted
                    false,
                ),
                Field::new("c42", DataType::Decimal32(5, 2), false),
                Field::new("c43", DataType::Decimal64(18, 12), true),
            ],
            meta(&[("Key", "Value")]),
        );

        // write to an empty parquet file so that schema is serialized
        let file = tempfile::tempfile().unwrap();
        let writer =
            ArrowWriter::try_new(file.try_clone().unwrap(), Arc::new(schema.clone()), None)?;
        writer.close()?;

        // read file back
        let arrow_reader = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();

        // Check arrow schema
        let read_schema = arrow_reader.schema();
        assert_eq!(&schema, read_schema.as_ref());

        // Walk schema finding field IDs
        let mut stack = Vec::with_capacity(10);
        let mut out = Vec::with_capacity(10);

        let root = arrow_reader.parquet_schema().root_schema_ptr();
        stack.push((root.name().to_string(), root));

        while let Some((p, t)) = stack.pop() {
            if t.is_group() {
                for f in t.get_fields() {
                    stack.push((format!("{p}.{}", f.name()), f.clone()))
                }
            }

            let info = t.get_basic_info();
            if info.has_id() {
                out.push(format!("{p} -> {}", info.id()))
            }
        }
        out.sort_unstable();
        let out: Vec<_> = out.iter().map(|x| x.as_str()).collect();

        assert_eq!(
            &out,
            &[
                "arrow_schema.c1 -> 2",
                "arrow_schema.c21 -> 4",
                "arrow_schema.c21.list.item -> 5",
                "arrow_schema.c31 -> 6",
                "arrow_schema.c40 -> 7",
                "arrow_schema.c40.my_entries.my_key -> 8",
                "arrow_schema.c40.my_entries.my_value -> 9",
                "arrow_schema.c40.my_entries.my_value.list.item -> 10",
                "arrow_schema.c41.my_entries.my_value.list.item -> 11",
            ]
        );

        Ok(())
    }

    #[test]
    fn test_read_parquet_field_ids_raw() -> Result<()> {
        let meta = |a: &[(&str, &str)]| -> HashMap<String, String> {
            a.iter()
                .map(|(a, b)| (a.to_string(), b.to_string()))
                .collect()
        };
        let schema = Schema::new_with_metadata(
            vec![
                Field::new("c1", DataType::Utf8, true)
                    .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "1")])),
                Field::new("c2", DataType::Utf8, true)
                    .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "2")])),
            ],
            HashMap::new(),
        );

        let writer = ArrowWriter::try_new(vec![], Arc::new(schema.clone()), None)?;
        let parquet_bytes = writer.into_inner()?;

        let reader =
            crate::file::reader::SerializedFileReader::new(bytes::Bytes::from(parquet_bytes))?;
        let schema_descriptor = reader.metadata().file_metadata().schema_descr_ptr();

        // don't pass metadata so field ids are read from Parquet and not from serialized Arrow schema
        let arrow_schema = crate::arrow::parquet_to_arrow_schema(&schema_descriptor, None)?;

        let parq_schema_descr = ArrowSchemaConverter::new()
            .with_coerce_types(true)
            .convert(&arrow_schema)?;
        let parq_fields = parq_schema_descr.root_schema().get_fields();
        assert_eq!(parq_fields.len(), 2);
        assert_eq!(parq_fields[0].get_basic_info().id(), 1);
        assert_eq!(parq_fields[1].get_basic_info().id(), 2);

        Ok(())
    }

    #[test]
    fn test_arrow_schema_roundtrip_lists() -> Result<()> {
        let metadata: HashMap<String, String> = [("Key".to_string(), "Value".to_string())]
            .iter()
            .cloned()
            .collect();

        let schema = Schema::new_with_metadata(
            vec![
                Field::new_list("c21", Field::new("array", DataType::Boolean, true), false),
                Field::new(
                    "c22",
                    DataType::FixedSizeList(
                        Arc::new(Field::new("items", DataType::Boolean, false)),
                        5,
                    ),
                    false,
                ),
                Field::new_list(
                    "c23",
                    Field::new_large_list(
                        "items",
                        Field::new_struct(
                            "items",
                            vec![
                                Field::new("a", DataType::Int16, true),
                                Field::new("b", DataType::Float64, false),
                            ],
                            true,
                        ),
                        true,
                    ),
                    true,
                ),
            ],
            metadata,
        );

        // write to an empty parquet file so that schema is serialized
        let file = tempfile::tempfile().unwrap();
        let writer =
            ArrowWriter::try_new(file.try_clone().unwrap(), Arc::new(schema.clone()), None)?;
        writer.close()?;

        // read file back
        let arrow_reader = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
        let read_schema = arrow_reader.schema();
        assert_eq!(&schema, read_schema.as_ref());
        Ok(())
    }

    #[test]
    fn test_get_arrow_schema_from_metadata() {
        assert!(get_arrow_schema_from_metadata("").is_err());
    }
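
    // The `ARROW_SCHEMA_META_KEY` value is expected to be a base64-encoded
    // Arrow IPC schema message, so an empty string fails to decode and yields
    // an error rather than a panic.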

    #[test]
    #[cfg(feature = "arrow_canonical_extension_types")]
    fn arrow_uuid_to_parquet_uuid() -> Result<()> {
        use arrow_schema::extension::Uuid;
        let arrow_schema = Schema::new(vec![
            Field::new("uuid", DataType::FixedSizeBinary(16), false).with_extension_type(Uuid),
        ]);

        let parquet_schema = ArrowSchemaConverter::new().convert(&arrow_schema)?;

        assert_eq!(
            parquet_schema.column(0).logical_type_ref(),
            Some(&LogicalType::Uuid)
        );

        let arrow_schema = parquet_to_arrow_schema(&parquet_schema, None)?;
        assert_eq!(arrow_schema.field(0).try_extension_type::<Uuid>()?, Uuid);

        Ok(())
    }

    #[test]
    #[cfg(feature = "arrow_canonical_extension_types")]
    fn arrow_json_to_parquet_json() -> Result<()> {
        use arrow_schema::extension::Json;
        let arrow_schema = Schema::new(vec![
            Field::new("json", DataType::Utf8, false).with_extension_type(Json::default()),
        ]);

        let parquet_schema = ArrowSchemaConverter::new().convert(&arrow_schema)?;

        assert_eq!(
            parquet_schema.column(0).logical_type_ref(),
            Some(&LogicalType::Json)
        );

        let arrow_schema = parquet_to_arrow_schema(&parquet_schema, None)?;
        assert_eq!(
            arrow_schema.field(0).try_extension_type::<Json>()?,
            Json::default()
        );

        Ok(())
    }

    #[test]
    fn test_parquet_to_arrow_field_levels_with_virtual_rejects_non_virtual() {
        let message_type = "
        message test_schema {
            REQUIRED INT32 id;
        }
        ";
        let parquet_schema = Arc::new(parse_message_type(message_type).unwrap());
        let descriptor = SchemaDescriptor::new(parquet_schema);

        // Try to pass a regular field (not a virtual column)
        let regular_field = Arc::new(Field::new("regular_column", DataType::Int64, false));
        let result = parquet_to_arrow_field_levels_with_virtual(
            &descriptor,
            ProjectionMask::all(),
            None,
            &[regular_field],
        );

        assert!(result.is_err());
        assert!(
            result
                .unwrap_err()
                .to_string()
                .contains("is not a virtual column")
        );
    }
}