parquet/arrow/schema/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
//! Converting Parquet schema <--> Arrow schema: [`ArrowSchemaConverter`] and [`parquet_to_arrow_schema`]
19
20use base64::Engine;
21use base64::prelude::BASE64_STANDARD;
22use std::collections::HashMap;
23use std::sync::Arc;
24
25use arrow_ipc::writer;
26use arrow_schema::{DataType, Field, FieldRef, Fields, Schema, TimeUnit};
27
28use crate::basic::{
29    ConvertedType, LogicalType, Repetition, TimeUnit as ParquetTimeUnit, Type as PhysicalType,
30};
31use crate::errors::{ParquetError, Result};
32use crate::file::{metadata::KeyValue, properties::WriterProperties};
33use crate::schema::types::{ColumnDescriptor, SchemaDescriptor, Type};
34
35mod complex;
36mod extension;
37mod primitive;
38pub mod virtual_type;
39
40use super::PARQUET_FIELD_ID_META_KEY;
41use crate::arrow::ProjectionMask;
42use crate::arrow::schema::extension::{
43    has_extension_type, logical_type_for_fixed_size_binary, logical_type_for_string,
44    logical_type_for_struct, try_add_extension_type,
45};
46pub(crate) use complex::{ParquetField, ParquetFieldType, VirtualColumnType};
47
48/// Convert Parquet schema to Arrow schema including optional metadata
49///
50/// Attempts to decode any existing Arrow schema metadata, falling back
51/// to converting the Parquet schema column-wise
52pub fn parquet_to_arrow_schema(
53    parquet_schema: &SchemaDescriptor,
54    key_value_metadata: Option<&Vec<KeyValue>>,
55) -> Result<Schema> {
56    parquet_to_arrow_schema_by_columns(parquet_schema, ProjectionMask::all(), key_value_metadata)
57}
58
59/// Convert parquet schema to arrow schema including optional metadata,
60/// only preserving some leaf columns.
61pub fn parquet_to_arrow_schema_by_columns(
62    parquet_schema: &SchemaDescriptor,
63    mask: ProjectionMask,
64    key_value_metadata: Option<&Vec<KeyValue>>,
65) -> Result<Schema> {
66    Ok(parquet_to_arrow_schema_and_fields(parquet_schema, mask, key_value_metadata, &[])?.0)
67}
68
69/// Determines the Arrow Schema from a Parquet schema
70///
71/// Looks for an Arrow schema metadata "hint" (see
72/// [`parquet_to_arrow_field_levels`]), and uses it if present to ensure
73/// lossless round trips.
74pub(crate) fn parquet_to_arrow_schema_and_fields(
75    parquet_schema: &SchemaDescriptor,
76    mask: ProjectionMask,
77    key_value_metadata: Option<&Vec<KeyValue>>,
78    virtual_columns: &[FieldRef],
79) -> Result<(Schema, Option<ParquetField>)> {
80    let mut metadata = parse_key_value_metadata(key_value_metadata).unwrap_or_default();
81    let maybe_schema = metadata
82        .remove(super::ARROW_SCHEMA_META_KEY)
83        .map(|value| get_arrow_schema_from_metadata(&value))
84        .transpose()?;
85
86    // Add the Arrow metadata to the Parquet metadata skipping keys that collide
87    if let Some(arrow_schema) = &maybe_schema {
88        arrow_schema.metadata().iter().for_each(|(k, v)| {
89            metadata.entry(k.clone()).or_insert_with(|| v.clone());
90        });
91    }
92
93    let hint = maybe_schema.as_ref().map(|s| s.fields());
94    let field_levels =
95        parquet_to_arrow_field_levels_with_virtual(parquet_schema, mask, hint, virtual_columns)?;
96    let schema = Schema::new_with_metadata(field_levels.fields, metadata);
97    Ok((schema, field_levels.levels))
98}
99
100/// Schema information necessary to decode a parquet file as arrow [`Fields`]
101///
102/// In particular this stores the dremel-level information necessary to correctly
103/// interpret the encoded definition and repetition levels
104///
105/// Note: this is an opaque container intended to be used with lower-level APIs
106/// within this crate
107#[derive(Debug, Clone)]
108pub struct FieldLevels {
109    pub(crate) fields: Fields,
110    pub(crate) levels: Option<ParquetField>,
111}
112
113/// Convert a parquet [`SchemaDescriptor`] to [`FieldLevels`]
114///
115/// Columns not included within [`ProjectionMask`] will be ignored.
116///
117/// The optional `hint` parameter is the desired Arrow schema. See the
118/// [`arrow`] module documentation for more information.
119///
120/// [`arrow`]: crate::arrow
121///
122/// # Notes:
123/// Where a field type in `hint` is compatible with the corresponding parquet type in `schema`, it
124/// will be used, otherwise the default arrow type for the given parquet column type will be used.
125///
126/// This is to accommodate arrow types that cannot be round-tripped through parquet natively.
127/// Depending on the parquet writer, this can lead to a mismatch between a file's parquet schema
128/// and its embedded arrow schema. The parquet `schema` must be treated as authoritative in such
129/// an event. See [#1663](https://github.com/apache/arrow-rs/issues/1663) for more information
130///
131/// Note: this is a low-level API, most users will want to make use of the higher-level
132/// [`parquet_to_arrow_schema`] for decoding metadata from a parquet file.
133pub fn parquet_to_arrow_field_levels(
134    schema: &SchemaDescriptor,
135    mask: ProjectionMask,
136    hint: Option<&Fields>,
137) -> Result<FieldLevels> {
138    parquet_to_arrow_field_levels_with_virtual(schema, mask, hint, &[])
139}
140
141/// Convert a parquet [`SchemaDescriptor`] to [`FieldLevels`] with support for virtual columns
142///
143/// Columns not included within [`ProjectionMask`] will be ignored.
144///
145/// The optional `hint` parameter is the desired Arrow schema. See the
146/// [`arrow`] module documentation for more information.
147///
148/// [`arrow`]: crate::arrow
149///
150/// # Arguments
151/// * `schema` - The Parquet schema descriptor
152/// * `mask` - Projection mask to select which columns to include
153/// * `hint` - Optional hint for Arrow field types to use instead of defaults
154/// * `virtual_columns` - Virtual columns to append to the schema (e.g., row numbers)
155///
156/// # Notes:
157/// Where a field type in `hint` is compatible with the corresponding parquet type in `schema`, it
158/// will be used, otherwise the default arrow type for the given parquet column type will be used.
159///
160/// Virtual columns are columns that don't exist in the Parquet file but are generated during reading.
161/// They must have extension type names starting with "arrow.virtual.".
162///
163/// This is to accommodate arrow types that cannot be round-tripped through parquet natively.
164/// Depending on the parquet writer, this can lead to a mismatch between a file's parquet schema
165/// and its embedded arrow schema. The parquet `schema` must be treated as authoritative in such
166/// an event. See [#1663](https://github.com/apache/arrow-rs/issues/1663) for more information
167///
168/// Note: this is a low-level API, most users will want to make use of the higher-level
169/// [`parquet_to_arrow_schema`] for decoding metadata from a parquet file.
170pub fn parquet_to_arrow_field_levels_with_virtual(
171    schema: &SchemaDescriptor,
172    mask: ProjectionMask,
173    hint: Option<&Fields>,
174    virtual_columns: &[FieldRef],
175) -> Result<FieldLevels> {
176    // Validate that all fields are virtual columns
177    for field in virtual_columns {
178        if !virtual_type::is_virtual_column(field) {
179            return Err(ParquetError::General(format!(
180                "Field '{}' is not a virtual column. Virtual columns must have extension type names starting with 'arrow.virtual.'",
181                field.name()
182            )));
183        }
184    }
185
186    // Convert the regular schema first
187    let mut parquet_field = match complex::convert_schema(schema, mask, hint)? {
188        Some(field) => field,
189        None if virtual_columns.is_empty() => {
190            return Ok(FieldLevels {
191                fields: Fields::empty(),
192                levels: None,
193            });
194        }
195        None => {
196            // No regular fields, but we have virtual columns - create empty root struct
197            ParquetField {
198                rep_level: 0,
199                def_level: 0,
200                nullable: false,
201                arrow_type: DataType::Struct(Fields::empty()),
202                field_type: ParquetFieldType::Group {
203                    children: Vec::new(),
204                },
205            }
206        }
207    };
208
209    // Append virtual columns if any
210    if !virtual_columns.is_empty() {
211        match &mut parquet_field.field_type {
212            ParquetFieldType::Group { children } => {
213                // Get the mutable fields from the struct type
214                let DataType::Struct(ref mut fields) = parquet_field.arrow_type else {
215                    unreachable!("Root field must be a struct");
216                };
217
218                // Convert to mutable Vec to append
219                let mut fields_vec: Vec<FieldRef> = fields.iter().cloned().collect();
220
221                // Append each virtual column
222                for virtual_column in virtual_columns {
223                    // Virtual columns can only be added at the root level
224                    assert_eq!(
225                        parquet_field.rep_level, 0,
226                        "Virtual columns can only be added at rep level 0"
227                    );
228                    assert_eq!(
229                        parquet_field.def_level, 0,
230                        "Virtual columns can only be added at def level 0"
231                    );
232
233                    fields_vec.push(virtual_column.clone());
234                    let virtual_parquet_field = complex::convert_virtual_field(
235                        virtual_column,
236                        parquet_field.rep_level,
237                        parquet_field.def_level,
238                    )?;
239                    children.push(virtual_parquet_field);
240                }
241
242                // Update the fields
243                parquet_field.arrow_type = DataType::Struct(Fields::from(fields_vec));
244            }
245            _ => unreachable!("Root field must be a group"),
246        }
247    }
248
249    match &parquet_field.arrow_type {
250        DataType::Struct(fields) => Ok(FieldLevels {
251            fields: fields.clone(),
252            levels: Some(parquet_field),
253        }),
254        _ => unreachable!(),
255    }
256}
257
258/// Try to convert Arrow schema metadata into a schema
259fn get_arrow_schema_from_metadata(encoded_meta: &str) -> Result<Schema> {
260    let decoded = BASE64_STANDARD.decode(encoded_meta);
261    match decoded {
262        Ok(bytes) => {
263            let slice = if bytes.len() > 8 && bytes[0..4] == [255u8; 4] {
264                &bytes[8..]
265            } else {
266                bytes.as_slice()
267            };
268            match arrow_ipc::root_as_message(slice) {
269                Ok(message) => message
270                    .header_as_schema()
271                    .map(arrow_ipc::convert::fb_to_schema)
272                    .ok_or_else(|| arrow_err!("the message is not Arrow Schema")),
273                Err(err) => {
274                    // The flatbuffers implementation returns an error on verification error.
275                    Err(arrow_err!(
276                        "Unable to get root as message stored in {}: {:?}",
277                        super::ARROW_SCHEMA_META_KEY,
278                        err
279                    ))
280                }
281            }
282        }
283        Err(err) => {
284            // The C++ implementation returns an error if the schema can't be parsed.
285            Err(arrow_err!(
286                "Unable to decode the encoded schema stored in {}, {:?}",
287                super::ARROW_SCHEMA_META_KEY,
288                err
289            ))
290        }
291    }
292}
293
294/// Encodes the Arrow schema into the IPC format, and base64 encodes it
295pub fn encode_arrow_schema(schema: &Schema) -> String {
296    let options = writer::IpcWriteOptions::default();
297    let mut dictionary_tracker = writer::DictionaryTracker::new(true);
298    let data_gen = writer::IpcDataGenerator::default();
299    let mut serialized_schema =
300        data_gen.schema_to_bytes_with_dictionary_tracker(schema, &mut dictionary_tracker, &options);
301
302    // manually prepending the length to the schema as arrow uses the legacy IPC format
303    // TODO: change after addressing ARROW-9777
304    let schema_len = serialized_schema.ipc_message.len();
305    let mut len_prefix_schema = Vec::with_capacity(schema_len + 8);
306    len_prefix_schema.append(&mut vec![255u8, 255, 255, 255]);
307    len_prefix_schema.append((schema_len as u32).to_le_bytes().to_vec().as_mut());
308    len_prefix_schema.append(&mut serialized_schema.ipc_message);
309
310    BASE64_STANDARD.encode(&len_prefix_schema)
311}
312
313/// Mutates writer metadata by storing the encoded Arrow schema hint in
314/// [`ARROW_SCHEMA_META_KEY`].
315///
316/// If there is an existing Arrow schema metadata, it is replaced.
317///
318/// [`ARROW_SCHEMA_META_KEY`]: crate::arrow::ARROW_SCHEMA_META_KEY
319pub fn add_encoded_arrow_schema_to_metadata(schema: &Schema, props: &mut WriterProperties) {
320    let encoded = encode_arrow_schema(schema);
321
322    let schema_kv = KeyValue {
323        key: super::ARROW_SCHEMA_META_KEY.to_string(),
324        value: Some(encoded),
325    };
326
327    let meta = props
328        .key_value_metadata
329        .get_or_insert_with(Default::default);
330
331    // check if ARROW:schema exists, and overwrite it
332    let schema_meta = meta
333        .iter()
334        .enumerate()
335        .find(|(_, kv)| kv.key.as_str() == super::ARROW_SCHEMA_META_KEY);
336    match schema_meta {
337        Some((i, _)) => {
338            meta.remove(i);
339            meta.push(schema_kv);
340        }
341        None => {
342            meta.push(schema_kv);
343        }
344    }
345}
346
/// Converter for Arrow schema to Parquet schema
///
/// See the documentation on the [`arrow`] module for background
/// information on how Arrow schema is represented in Parquet.
///
/// [`arrow`]: crate::arrow
///
/// # Example:
/// ```
/// # use std::sync::Arc;
/// # use arrow_schema::{Field, Schema, DataType};
/// # use parquet::arrow::ArrowSchemaConverter;
/// use parquet::schema::types::{SchemaDescriptor, Type};
/// use parquet::basic; // note there are two `Type`s in the following example
/// // create an Arrow Schema
/// let arrow_schema = Schema::new(vec![
///   Field::new("a", DataType::Int64, true),
///   Field::new("b", DataType::Date32, true),
/// ]);
/// // convert the Arrow schema to a Parquet schema
/// let parquet_schema = ArrowSchemaConverter::new()
///   .convert(&arrow_schema)
///   .unwrap();
///
/// let expected_parquet_schema = SchemaDescriptor::new(
///   Arc::new(
///     Type::group_type_builder("arrow_schema")
///       .with_fields(vec![
///         Arc::new(
///          Type::primitive_type_builder("a", basic::Type::INT64)
///           .build().unwrap()
///         ),
///         Arc::new(
///          Type::primitive_type_builder("b", basic::Type::INT32)
///           .with_converted_type(basic::ConvertedType::DATE)
///           .with_logical_type(Some(basic::LogicalType::Date))
///           .build().unwrap()
///         ),
///      ])
///      .build().unwrap()
///   )
/// );
/// assert_eq!(parquet_schema, expected_parquet_schema);
/// ```
#[derive(Debug)]
pub struct ArrowSchemaConverter<'a> {
    /// Name of the root schema in Parquet
    schema_root: &'a str,
    /// Should we coerce Arrow types to compatible Parquet types?
    ///
    /// See docs on [`Self::with_coerce_types`]
    coerce_types: bool,
}
400
401impl Default for ArrowSchemaConverter<'_> {
402    fn default() -> Self {
403        Self::new()
404    }
405}
406
407impl<'a> ArrowSchemaConverter<'a> {
408    /// Create a new converter
409    pub fn new() -> Self {
410        Self {
411            schema_root: "arrow_schema",
412            coerce_types: false,
413        }
414    }
415
416    /// Should Arrow types be coerced into Parquet native types (default `false`).
417    ///
418    /// Setting this option to `true` will result in Parquet files that can be
419    /// read by more readers, but may lose precision for Arrow types such as
420    /// [`DataType::Date64`] which have no direct [corresponding Parquet type].
421    ///
422    /// By default, this converter does not coerce to native Parquet types. Enabling type
423    /// coercion allows for meaningful representations that do not require
424    /// downstream readers to consider the embedded Arrow schema, and can allow
425    /// for greater compatibility with other Parquet implementations. However,
426    /// type coercion also prevents data from being losslessly round-tripped.
427    ///
428    /// # Discussion
429    ///
430    /// Some Arrow types such as `Date64`, `Timestamp` and `Interval` have no
431    /// corresponding Parquet logical type. Thus, they can not be losslessly
432    /// round-tripped when stored using the appropriate Parquet logical type.
433    /// For example, some Date64 values may be truncated when stored with
434    /// parquet's native 32 bit date type.
435    ///
436    /// For [`List`] and [`Map`] types, some Parquet readers expect certain
437    /// schema elements to have specific names (earlier versions of the spec
438    /// were somewhat ambiguous on this point). Type coercion will use the names
439    /// prescribed by the Parquet specification, potentially losing naming
440    /// metadata from the Arrow schema.
441    ///
442    /// [`List`]: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists
443    /// [`Map`]: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#maps
444    /// [corresponding Parquet type]: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#date
445    ///
446    pub fn with_coerce_types(mut self, coerce_types: bool) -> Self {
447        self.coerce_types = coerce_types;
448        self
449    }
450
451    /// Set the root schema element name (defaults to `"arrow_schema"`).
452    pub fn schema_root(mut self, schema_root: &'a str) -> Self {
453        self.schema_root = schema_root;
454        self
455    }
456
457    /// Convert the specified Arrow [`Schema`] to the desired Parquet [`SchemaDescriptor`]
458    ///
459    /// See example in [`ArrowSchemaConverter`]
460    pub fn convert(&self, schema: &Schema) -> Result<SchemaDescriptor> {
461        let fields = schema
462            .fields()
463            .iter()
464            .map(|field| arrow_to_parquet_type(field, self.coerce_types).map(Arc::new))
465            .collect::<Result<_>>()?;
466        let group = Type::group_type_builder(self.schema_root)
467            .with_fields(fields)
468            .build()?;
469        Ok(SchemaDescriptor::new(Arc::new(group)))
470    }
471}
472
473fn parse_key_value_metadata(
474    key_value_metadata: Option<&Vec<KeyValue>>,
475) -> Option<HashMap<String, String>> {
476    match key_value_metadata {
477        Some(key_values) => {
478            let map: HashMap<String, String> = key_values
479                .iter()
480                .filter_map(|kv| {
481                    kv.value
482                        .as_ref()
483                        .map(|value| (kv.key.clone(), value.clone()))
484                })
485                .collect();
486
487            if map.is_empty() { None } else { Some(map) }
488        }
489        None => None,
490    }
491}
492
493/// Convert parquet column schema to arrow field.
494pub fn parquet_to_arrow_field(parquet_column: &ColumnDescriptor) -> Result<Field> {
495    let field = complex::convert_type(&parquet_column.self_type_ptr())?;
496    let mut ret = Field::new(parquet_column.name(), field.arrow_type, field.nullable);
497
498    let parquet_type = parquet_column.self_type();
499    let basic_info = parquet_type.get_basic_info();
500
501    let mut hash_map_size = 0;
502    if basic_info.has_id() {
503        hash_map_size += 1;
504    }
505    if has_extension_type(parquet_type) {
506        hash_map_size += 1;
507    }
508    if hash_map_size == 0 {
509        return Ok(ret);
510    }
511    ret.set_metadata(HashMap::with_capacity(hash_map_size));
512    if basic_info.has_id() {
513        ret.metadata_mut().insert(
514            PARQUET_FIELD_ID_META_KEY.to_string(),
515            basic_info.id().to_string(),
516        );
517    }
518    try_add_extension_type(ret, parquet_column.self_type())
519}
520
/// Returns the number of bytes used to store an unscaled decimal with `precision`
/// digits as a signed two's-complement integer (e.g. in a `FIXED_LEN_BYTE_ARRAY`).
///
/// The Parquet spec states that `n` bytes can hold at most
/// `floor(log10(2^(8n - 1) - 1))` base-10 digits. Requiring that to be at least
/// `precision` gives `2^(8n - 1) - 1 >= 10^precision`, i.e.
/// `n >= (log2(10^precision + 1) + 1) / 8`. The result is the ceiling of that
/// bound, evaluated in `f64`.
///
/// For example, precision 9 needs 4 bytes, 18 needs 8 and 38 needs 16.
pub fn decimal_length_from_precision(precision: u8) -> usize {
    let ten_pow_precision = 10.0_f64.powi(i32::from(precision));
    let min_bits = (ten_pow_precision + 1.0).log2() + 1.0;
    (min_bits / 8.0).ceil() as usize
}
531
532/// Convert an arrow field to a parquet `Type`
533fn arrow_to_parquet_type(field: &Field, coerce_types: bool) -> Result<Type> {
534    const PARQUET_LIST_ELEMENT_NAME: &str = "element";
535    const PARQUET_MAP_STRUCT_NAME: &str = "key_value";
536    const PARQUET_KEY_FIELD_NAME: &str = "key";
537    const PARQUET_VALUE_FIELD_NAME: &str = "value";
538
539    let name = field.name().as_str();
540    let repetition = if field.is_nullable() {
541        Repetition::OPTIONAL
542    } else {
543        Repetition::REQUIRED
544    };
545    let id = field_id(field);
546    // create type from field
547    match field.data_type() {
548        DataType::Null => Type::primitive_type_builder(name, PhysicalType::INT32)
549            .with_logical_type(Some(LogicalType::Unknown))
550            .with_repetition(repetition)
551            .with_id(id)
552            .build(),
553        DataType::Boolean => Type::primitive_type_builder(name, PhysicalType::BOOLEAN)
554            .with_repetition(repetition)
555            .with_id(id)
556            .build(),
557        DataType::Int8 => Type::primitive_type_builder(name, PhysicalType::INT32)
558            .with_logical_type(Some(LogicalType::Integer {
559                bit_width: 8,
560                is_signed: true,
561            }))
562            .with_repetition(repetition)
563            .with_id(id)
564            .build(),
565        DataType::Int16 => Type::primitive_type_builder(name, PhysicalType::INT32)
566            .with_logical_type(Some(LogicalType::Integer {
567                bit_width: 16,
568                is_signed: true,
569            }))
570            .with_repetition(repetition)
571            .with_id(id)
572            .build(),
573        DataType::Int32 => Type::primitive_type_builder(name, PhysicalType::INT32)
574            .with_repetition(repetition)
575            .with_id(id)
576            .build(),
577        DataType::Int64 => Type::primitive_type_builder(name, PhysicalType::INT64)
578            .with_repetition(repetition)
579            .with_id(id)
580            .build(),
581        DataType::UInt8 => Type::primitive_type_builder(name, PhysicalType::INT32)
582            .with_logical_type(Some(LogicalType::Integer {
583                bit_width: 8,
584                is_signed: false,
585            }))
586            .with_repetition(repetition)
587            .with_id(id)
588            .build(),
589        DataType::UInt16 => Type::primitive_type_builder(name, PhysicalType::INT32)
590            .with_logical_type(Some(LogicalType::Integer {
591                bit_width: 16,
592                is_signed: false,
593            }))
594            .with_repetition(repetition)
595            .with_id(id)
596            .build(),
597        DataType::UInt32 => Type::primitive_type_builder(name, PhysicalType::INT32)
598            .with_logical_type(Some(LogicalType::Integer {
599                bit_width: 32,
600                is_signed: false,
601            }))
602            .with_repetition(repetition)
603            .with_id(id)
604            .build(),
605        DataType::UInt64 => Type::primitive_type_builder(name, PhysicalType::INT64)
606            .with_logical_type(Some(LogicalType::Integer {
607                bit_width: 64,
608                is_signed: false,
609            }))
610            .with_repetition(repetition)
611            .with_id(id)
612            .build(),
613        DataType::Float16 => Type::primitive_type_builder(name, PhysicalType::FIXED_LEN_BYTE_ARRAY)
614            .with_repetition(repetition)
615            .with_id(id)
616            .with_logical_type(Some(LogicalType::Float16))
617            .with_length(2)
618            .build(),
619        DataType::Float32 => Type::primitive_type_builder(name, PhysicalType::FLOAT)
620            .with_repetition(repetition)
621            .with_id(id)
622            .build(),
623        DataType::Float64 => Type::primitive_type_builder(name, PhysicalType::DOUBLE)
624            .with_repetition(repetition)
625            .with_id(id)
626            .build(),
627        DataType::Timestamp(TimeUnit::Second, _) => {
628            // Cannot represent seconds in LogicalType
629            Type::primitive_type_builder(name, PhysicalType::INT64)
630                .with_repetition(repetition)
631                .with_id(id)
632                .build()
633        }
634        DataType::Timestamp(time_unit, tz) => {
635            Type::primitive_type_builder(name, PhysicalType::INT64)
636                .with_logical_type(Some(LogicalType::Timestamp {
637                    // If timezone set, values are normalized to UTC timezone
638                    is_adjusted_to_u_t_c: matches!(tz, Some(z) if !z.as_ref().is_empty()),
639                    unit: match time_unit {
640                        TimeUnit::Second => unreachable!(),
641                        TimeUnit::Millisecond => ParquetTimeUnit::MILLIS,
642                        TimeUnit::Microsecond => ParquetTimeUnit::MICROS,
643                        TimeUnit::Nanosecond => ParquetTimeUnit::NANOS,
644                    },
645                }))
646                .with_repetition(repetition)
647                .with_id(id)
648                .build()
649        }
650        DataType::Date32 => Type::primitive_type_builder(name, PhysicalType::INT32)
651            .with_logical_type(Some(LogicalType::Date))
652            .with_repetition(repetition)
653            .with_id(id)
654            .build(),
655        DataType::Date64 => {
656            if coerce_types {
657                Type::primitive_type_builder(name, PhysicalType::INT32)
658                    .with_logical_type(Some(LogicalType::Date))
659                    .with_repetition(repetition)
660                    .with_id(id)
661                    .build()
662            } else {
663                Type::primitive_type_builder(name, PhysicalType::INT64)
664                    .with_repetition(repetition)
665                    .with_id(id)
666                    .build()
667            }
668        }
669        DataType::Time32(TimeUnit::Second) => {
670            // Cannot represent seconds in LogicalType
671            Type::primitive_type_builder(name, PhysicalType::INT32)
672                .with_repetition(repetition)
673                .with_id(id)
674                .build()
675        }
676        DataType::Time32(unit) => Type::primitive_type_builder(name, PhysicalType::INT32)
677            .with_logical_type(Some(LogicalType::Time {
678                is_adjusted_to_u_t_c: field.metadata().contains_key("adjusted_to_utc"),
679                unit: match unit {
680                    TimeUnit::Millisecond => ParquetTimeUnit::MILLIS,
681                    u => unreachable!("Invalid unit for Time32: {:?}", u),
682                },
683            }))
684            .with_repetition(repetition)
685            .with_id(id)
686            .build(),
687        DataType::Time64(unit) => Type::primitive_type_builder(name, PhysicalType::INT64)
688            .with_logical_type(Some(LogicalType::Time {
689                is_adjusted_to_u_t_c: field.metadata().contains_key("adjusted_to_utc"),
690                unit: match unit {
691                    TimeUnit::Microsecond => ParquetTimeUnit::MICROS,
692                    TimeUnit::Nanosecond => ParquetTimeUnit::NANOS,
693                    u => unreachable!("Invalid unit for Time64: {:?}", u),
694                },
695            }))
696            .with_repetition(repetition)
697            .with_id(id)
698            .build(),
699        DataType::Duration(_) => Type::primitive_type_builder(name, PhysicalType::INT64)
700            .with_repetition(repetition)
701            .with_id(id)
702            .build(),
703        DataType::Interval(_) => {
704            Type::primitive_type_builder(name, PhysicalType::FIXED_LEN_BYTE_ARRAY)
705                .with_converted_type(ConvertedType::INTERVAL)
706                .with_repetition(repetition)
707                .with_id(id)
708                .with_length(12)
709                .build()
710        }
711        DataType::Binary | DataType::LargeBinary => {
712            Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
713                .with_repetition(repetition)
714                .with_id(id)
715                .build()
716        }
717        DataType::FixedSizeBinary(length) => {
718            Type::primitive_type_builder(name, PhysicalType::FIXED_LEN_BYTE_ARRAY)
719                .with_repetition(repetition)
720                .with_id(id)
721                .with_length(*length)
722                .with_logical_type(logical_type_for_fixed_size_binary(field))
723                .build()
724        }
725        DataType::BinaryView => Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
726            .with_repetition(repetition)
727            .with_id(id)
728            .build(),
729        DataType::Decimal32(precision, scale)
730        | DataType::Decimal64(precision, scale)
731        | DataType::Decimal128(precision, scale)
732        | DataType::Decimal256(precision, scale) => {
733            // Decimal precision determines the Parquet physical type to use.
734            // Following the: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#decimal
735            let (physical_type, length) = if *precision > 1 && *precision <= 9 {
736                (PhysicalType::INT32, -1)
737            } else if *precision <= 18 {
738                (PhysicalType::INT64, -1)
739            } else {
740                (
741                    PhysicalType::FIXED_LEN_BYTE_ARRAY,
742                    decimal_length_from_precision(*precision) as i32,
743                )
744            };
745            Type::primitive_type_builder(name, physical_type)
746                .with_repetition(repetition)
747                .with_id(id)
748                .with_length(length)
749                .with_logical_type(Some(LogicalType::Decimal {
750                    scale: *scale as i32,
751                    precision: *precision as i32,
752                }))
753                .with_precision(*precision as i32)
754                .with_scale(*scale as i32)
755                .build()
756        }
757        DataType::Utf8 | DataType::LargeUtf8 => {
758            Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
759                .with_logical_type(logical_type_for_string(field))
760                .with_repetition(repetition)
761                .with_id(id)
762                .build()
763        }
764        DataType::Utf8View => Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
765            .with_logical_type(logical_type_for_string(field))
766            .with_repetition(repetition)
767            .with_id(id)
768            .build(),
769        DataType::List(f) | DataType::FixedSizeList(f, _) | DataType::LargeList(f) => {
770            let field_ref = if coerce_types && f.name() != PARQUET_LIST_ELEMENT_NAME {
771                // Ensure proper naming per the Parquet specification
772                let ff = f.as_ref().clone().with_name(PARQUET_LIST_ELEMENT_NAME);
773                Arc::new(arrow_to_parquet_type(&ff, coerce_types)?)
774            } else {
775                Arc::new(arrow_to_parquet_type(f, coerce_types)?)
776            };
777
778            Type::group_type_builder(name)
779                .with_fields(vec![Arc::new(
780                    Type::group_type_builder("list")
781                        .with_fields(vec![field_ref])
782                        .with_repetition(Repetition::REPEATED)
783                        .build()?,
784                )])
785                .with_logical_type(Some(LogicalType::List))
786                .with_repetition(repetition)
787                .with_id(id)
788                .build()
789        }
790        DataType::ListView(_) | DataType::LargeListView(_) => {
791            unimplemented!("ListView/LargeListView not implemented")
792        }
793        DataType::Struct(fields) => {
794            if fields.is_empty() {
795                return Err(arrow_err!("Parquet does not support writing empty structs",));
796            }
797            // recursively convert children to types/nodes
798            let fields = fields
799                .iter()
800                .map(|f| arrow_to_parquet_type(f, coerce_types).map(Arc::new))
801                .collect::<Result<_>>()?;
802            Type::group_type_builder(name)
803                .with_fields(fields)
804                .with_repetition(repetition)
805                .with_id(id)
806                .with_logical_type(logical_type_for_struct(field))
807                .build()
808        }
809        DataType::Map(field, _) => {
810            if let DataType::Struct(struct_fields) = field.data_type() {
811                // If coercing then set inner struct name to "key_value"
812                let map_struct_name = if coerce_types {
813                    PARQUET_MAP_STRUCT_NAME
814                } else {
815                    field.name()
816                };
817
818                // If coercing then ensure struct fields are named "key" and "value"
819                let fix_map_field = |name: &str, fld: &Arc<Field>| -> Result<Arc<Type>> {
820                    if coerce_types && fld.name() != name {
821                        let f = fld.as_ref().clone().with_name(name);
822                        Ok(Arc::new(arrow_to_parquet_type(&f, coerce_types)?))
823                    } else {
824                        Ok(Arc::new(arrow_to_parquet_type(fld, coerce_types)?))
825                    }
826                };
827                let key_field = fix_map_field(PARQUET_KEY_FIELD_NAME, &struct_fields[0])?;
828                let val_field = fix_map_field(PARQUET_VALUE_FIELD_NAME, &struct_fields[1])?;
829
830                Type::group_type_builder(name)
831                    .with_fields(vec![Arc::new(
832                        Type::group_type_builder(map_struct_name)
833                            .with_fields(vec![key_field, val_field])
834                            .with_repetition(Repetition::REPEATED)
835                            .build()?,
836                    )])
837                    .with_logical_type(Some(LogicalType::Map))
838                    .with_repetition(repetition)
839                    .with_id(id)
840                    .build()
841            } else {
842                Err(arrow_err!(
843                    "DataType::Map should contain a struct field child",
844                ))
845            }
846        }
847        DataType::Union(_, _) => unimplemented!("See ARROW-8817."),
848        DataType::Dictionary(_, value) => {
849            // Dictionary encoding not handled at the schema level
850            let dict_field = field.clone().with_data_type(value.as_ref().clone());
851            arrow_to_parquet_type(&dict_field, coerce_types)
852        }
853        DataType::RunEndEncoded(_, _) => Err(arrow_err!(
854            "Converting RunEndEncodedType to parquet not supported",
855        )),
856    }
857}
858
859fn field_id(field: &Field) -> Option<i32> {
860    let value = field.metadata().get(super::PARQUET_FIELD_ID_META_KEY)?;
861    value.parse().ok() // Fail quietly if not a valid integer
862}
863
864#[cfg(test)]
865mod tests {
866    use super::*;
867
868    use std::{collections::HashMap, sync::Arc};
869
870    use crate::arrow::PARQUET_FIELD_ID_META_KEY;
871    use crate::file::metadata::KeyValue;
872    use crate::file::reader::FileReader;
873    use crate::{
874        arrow::{ArrowWriter, arrow_reader::ParquetRecordBatchReaderBuilder},
875        schema::{parser::parse_message_type, types::SchemaDescriptor},
876    };
877    use arrow::datatypes::{DataType, Field, IntervalUnit, TimeUnit};
878
879    #[test]
880    fn test_flat_primitives() {
881        let message_type = "
882        message test_schema {
883            REQUIRED BOOLEAN boolean;
884            REQUIRED INT32   int8  (INT_8);
885            REQUIRED INT32   int16 (INT_16);
886            REQUIRED INT32   uint8 (INTEGER(8,false));
887            REQUIRED INT32   uint16 (INTEGER(16,false));
888            REQUIRED INT32   int32;
889            REQUIRED INT64   int64;
890            OPTIONAL DOUBLE  double;
891            OPTIONAL FLOAT   float;
892            OPTIONAL FIXED_LEN_BYTE_ARRAY (2) float16 (FLOAT16);
893            OPTIONAL BINARY  string (UTF8);
894            OPTIONAL BINARY  string_2 (STRING);
895            OPTIONAL BINARY  json (JSON);
896        }
897        ";
898        let parquet_group_type = parse_message_type(message_type).unwrap();
899
900        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
901        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
902
903        let arrow_fields = Fields::from(vec![
904            Field::new("boolean", DataType::Boolean, false),
905            Field::new("int8", DataType::Int8, false),
906            Field::new("int16", DataType::Int16, false),
907            Field::new("uint8", DataType::UInt8, false),
908            Field::new("uint16", DataType::UInt16, false),
909            Field::new("int32", DataType::Int32, false),
910            Field::new("int64", DataType::Int64, false),
911            Field::new("double", DataType::Float64, true),
912            Field::new("float", DataType::Float32, true),
913            Field::new("float16", DataType::Float16, true),
914            Field::new("string", DataType::Utf8, true),
915            Field::new("string_2", DataType::Utf8, true),
916            json_field(),
917        ]);
918
919        assert_eq!(&arrow_fields, converted_arrow_schema.fields());
920    }
921
922    /// Return the expected Field for a Parquet column annotated with
923    /// the JSON logical type.
924    fn json_field() -> Field {
925        #[cfg(feature = "arrow_canonical_extension_types")]
926        {
927            Field::new("json", DataType::Utf8, true)
928                .with_extension_type(arrow_schema::extension::Json::default())
929        }
930        #[cfg(not(feature = "arrow_canonical_extension_types"))]
931        {
932            Field::new("json", DataType::Utf8, true)
933        }
934    }
935
936    #[test]
937    fn test_decimal_fields() {
938        let message_type = "
939        message test_schema {
940                    REQUIRED INT32 decimal1 (DECIMAL(4,2));
941                    REQUIRED INT64 decimal2 (DECIMAL(12,2));
942                    REQUIRED FIXED_LEN_BYTE_ARRAY (16) decimal3 (DECIMAL(30,2));
943                    REQUIRED BYTE_ARRAY decimal4 (DECIMAL(33,2));
944                    REQUIRED BYTE_ARRAY decimal5 (DECIMAL(38,2));
945                    REQUIRED FIXED_LEN_BYTE_ARRAY (17) decimal6 (DECIMAL(39,2));
946                    REQUIRED BYTE_ARRAY decimal7 (DECIMAL(39,2));
947        }
948        ";
949
950        let parquet_group_type = parse_message_type(message_type).unwrap();
951
952        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
953        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
954
955        let arrow_fields = Fields::from(vec![
956            Field::new("decimal1", DataType::Decimal128(4, 2), false),
957            Field::new("decimal2", DataType::Decimal128(12, 2), false),
958            Field::new("decimal3", DataType::Decimal128(30, 2), false),
959            Field::new("decimal4", DataType::Decimal128(33, 2), false),
960            Field::new("decimal5", DataType::Decimal128(38, 2), false),
961            Field::new("decimal6", DataType::Decimal256(39, 2), false),
962            Field::new("decimal7", DataType::Decimal256(39, 2), false),
963        ]);
964        assert_eq!(&arrow_fields, converted_arrow_schema.fields());
965    }
966
967    #[test]
968    fn test_byte_array_fields() {
969        let message_type = "
970        message test_schema {
971            REQUIRED BYTE_ARRAY binary;
972            REQUIRED FIXED_LEN_BYTE_ARRAY (20) fixed_binary;
973        }
974        ";
975
976        let parquet_group_type = parse_message_type(message_type).unwrap();
977
978        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
979        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
980
981        let arrow_fields = Fields::from(vec![
982            Field::new("binary", DataType::Binary, false),
983            Field::new("fixed_binary", DataType::FixedSizeBinary(20), false),
984        ]);
985        assert_eq!(&arrow_fields, converted_arrow_schema.fields());
986    }
987
988    #[test]
989    fn test_duplicate_fields() {
990        let message_type = "
991        message test_schema {
992            REQUIRED BOOLEAN boolean;
993            REQUIRED INT32 int8 (INT_8);
994        }
995        ";
996
997        let parquet_group_type = parse_message_type(message_type).unwrap();
998
999        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1000        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
1001
1002        let arrow_fields = Fields::from(vec![
1003            Field::new("boolean", DataType::Boolean, false),
1004            Field::new("int8", DataType::Int8, false),
1005        ]);
1006        assert_eq!(&arrow_fields, converted_arrow_schema.fields());
1007
1008        let converted_arrow_schema =
1009            parquet_to_arrow_schema_by_columns(&parquet_schema, ProjectionMask::all(), None)
1010                .unwrap();
1011        assert_eq!(&arrow_fields, converted_arrow_schema.fields());
1012    }
1013
1014    #[test]
1015    fn test_parquet_lists() {
1016        let mut arrow_fields = Vec::new();
1017
1018        // LIST encoding example taken from parquet-format/LogicalTypes.md
1019        let message_type = "
1020        message test_schema {
1021          REQUIRED GROUP my_list (LIST) {
1022            REPEATED GROUP list {
1023              OPTIONAL BINARY element (UTF8);
1024            }
1025          }
1026          OPTIONAL GROUP my_list (LIST) {
1027            REPEATED GROUP list {
1028              REQUIRED BINARY element (UTF8);
1029            }
1030          }
1031          OPTIONAL GROUP array_of_arrays (LIST) {
1032            REPEATED GROUP list {
1033              REQUIRED GROUP element (LIST) {
1034                REPEATED GROUP list {
1035                  REQUIRED INT32 element;
1036                }
1037              }
1038            }
1039          }
1040          OPTIONAL GROUP my_list (LIST) {
1041            REPEATED GROUP element {
1042              REQUIRED BINARY str (UTF8);
1043            }
1044          }
1045          OPTIONAL GROUP my_list (LIST) {
1046            REPEATED INT32 element;
1047          }
1048          OPTIONAL GROUP my_list (LIST) {
1049            REPEATED GROUP element {
1050              REQUIRED BINARY str (UTF8);
1051              REQUIRED INT32 num;
1052            }
1053          }
1054          OPTIONAL GROUP my_list (LIST) {
1055            REPEATED GROUP array {
1056              REQUIRED BINARY str (UTF8);
1057            }
1058
1059          }
1060          OPTIONAL GROUP my_list (LIST) {
1061            REPEATED GROUP my_list_tuple {
1062              REQUIRED BINARY str (UTF8);
1063            }
1064          }
1065          REPEATED INT32 name;
1066        }
1067        ";
1068
1069        // // List<String> (list non-null, elements nullable)
1070        // required group my_list (LIST) {
1071        //   repeated group list {
1072        //     optional binary element (UTF8);
1073        //   }
1074        // }
1075        {
1076            arrow_fields.push(Field::new_list(
1077                "my_list",
1078                Field::new("element", DataType::Utf8, true),
1079                false,
1080            ));
1081        }
1082
1083        // // List<String> (list nullable, elements non-null)
1084        // optional group my_list (LIST) {
1085        //   repeated group list {
1086        //     required binary element (UTF8);
1087        //   }
1088        // }
1089        {
1090            arrow_fields.push(Field::new_list(
1091                "my_list",
1092                Field::new("element", DataType::Utf8, false),
1093                true,
1094            ));
1095        }
1096
1097        // Element types can be nested structures. For example, a list of lists:
1098        //
1099        // // List<List<Integer>>
1100        // optional group array_of_arrays (LIST) {
1101        //   repeated group list {
1102        //     required group element (LIST) {
1103        //       repeated group list {
1104        //         required int32 element;
1105        //       }
1106        //     }
1107        //   }
1108        // }
1109        {
1110            let arrow_inner_list = Field::new("element", DataType::Int32, false);
1111            arrow_fields.push(Field::new_list(
1112                "array_of_arrays",
1113                Field::new_list("element", arrow_inner_list, false),
1114                true,
1115            ));
1116        }
1117
1118        // // List<String> (list nullable, elements non-null)
1119        // optional group my_list (LIST) {
1120        //   repeated group element {
1121        //     required binary str (UTF8);
1122        //   };
1123        // }
1124        {
1125            arrow_fields.push(Field::new_list(
1126                "my_list",
1127                Field::new("str", DataType::Utf8, false),
1128                true,
1129            ));
1130        }
1131
1132        // // List<Integer> (nullable list, non-null elements)
1133        // optional group my_list (LIST) {
1134        //   repeated int32 element;
1135        // }
1136        {
1137            arrow_fields.push(Field::new_list(
1138                "my_list",
1139                Field::new("element", DataType::Int32, false),
1140                true,
1141            ));
1142        }
1143
1144        // // List<Tuple<String, Integer>> (nullable list, non-null elements)
1145        // optional group my_list (LIST) {
1146        //   repeated group element {
1147        //     required binary str (UTF8);
1148        //     required int32 num;
1149        //   };
1150        // }
1151        {
1152            let fields = vec![
1153                Field::new("str", DataType::Utf8, false),
1154                Field::new("num", DataType::Int32, false),
1155            ];
1156            arrow_fields.push(Field::new_list(
1157                "my_list",
1158                Field::new_struct("element", fields, false),
1159                true,
1160            ));
1161        }
1162
1163        // // List<OneTuple<String>> (nullable list, non-null elements)
1164        // optional group my_list (LIST) {
1165        //   repeated group array {
1166        //     required binary str (UTF8);
1167        //   };
1168        // }
1169        // Special case: group is named array
1170        {
1171            let fields = vec![Field::new("str", DataType::Utf8, false)];
1172            arrow_fields.push(Field::new_list(
1173                "my_list",
1174                Field::new_struct("array", fields, false),
1175                true,
1176            ));
1177        }
1178
1179        // // List<OneTuple<String>> (nullable list, non-null elements)
1180        // optional group my_list (LIST) {
1181        //   repeated group my_list_tuple {
1182        //     required binary str (UTF8);
1183        //   };
1184        // }
1185        // Special case: group named ends in _tuple
1186        {
1187            let fields = vec![Field::new("str", DataType::Utf8, false)];
1188            arrow_fields.push(Field::new_list(
1189                "my_list",
1190                Field::new_struct("my_list_tuple", fields, false),
1191                true,
1192            ));
1193        }
1194
1195        // One-level encoding: Only allows required lists with required cells
1196        //   repeated value_type name
1197        {
1198            arrow_fields.push(Field::new_list(
1199                "name",
1200                Field::new("name", DataType::Int32, false),
1201                false,
1202            ));
1203        }
1204
1205        let parquet_group_type = parse_message_type(message_type).unwrap();
1206
1207        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1208        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
1209        let converted_fields = converted_arrow_schema.fields();
1210
1211        assert_eq!(arrow_fields.len(), converted_fields.len());
1212        for i in 0..arrow_fields.len() {
1213            assert_eq!(&arrow_fields[i], converted_fields[i].as_ref(), "{i}");
1214        }
1215    }
1216
1217    #[test]
1218    fn test_parquet_list_nullable() {
1219        let mut arrow_fields = Vec::new();
1220
1221        let message_type = "
1222        message test_schema {
1223          REQUIRED GROUP my_list1 (LIST) {
1224            REPEATED GROUP list {
1225              OPTIONAL BINARY element (UTF8);
1226            }
1227          }
1228          OPTIONAL GROUP my_list2 (LIST) {
1229            REPEATED GROUP list {
1230              REQUIRED BINARY element (UTF8);
1231            }
1232          }
1233          REQUIRED GROUP my_list3 (LIST) {
1234            REPEATED GROUP list {
1235              REQUIRED BINARY element (UTF8);
1236            }
1237          }
1238        }
1239        ";
1240
1241        // // List<String> (list non-null, elements nullable)
1242        // required group my_list1 (LIST) {
1243        //   repeated group list {
1244        //     optional binary element (UTF8);
1245        //   }
1246        // }
1247        {
1248            arrow_fields.push(Field::new_list(
1249                "my_list1",
1250                Field::new("element", DataType::Utf8, true),
1251                false,
1252            ));
1253        }
1254
1255        // // List<String> (list nullable, elements non-null)
1256        // optional group my_list2 (LIST) {
1257        //   repeated group list {
1258        //     required binary element (UTF8);
1259        //   }
1260        // }
1261        {
1262            arrow_fields.push(Field::new_list(
1263                "my_list2",
1264                Field::new("element", DataType::Utf8, false),
1265                true,
1266            ));
1267        }
1268
1269        // // List<String> (list non-null, elements non-null)
1270        // repeated group my_list3 (LIST) {
1271        //   repeated group list {
1272        //     required binary element (UTF8);
1273        //   }
1274        // }
1275        {
1276            arrow_fields.push(Field::new_list(
1277                "my_list3",
1278                Field::new("element", DataType::Utf8, false),
1279                false,
1280            ));
1281        }
1282
1283        let parquet_group_type = parse_message_type(message_type).unwrap();
1284
1285        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1286        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
1287        let converted_fields = converted_arrow_schema.fields();
1288
1289        assert_eq!(arrow_fields.len(), converted_fields.len());
1290        for i in 0..arrow_fields.len() {
1291            assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
1292        }
1293    }
1294
1295    #[test]
1296    fn test_parquet_maps() {
1297        let mut arrow_fields = Vec::new();
1298
1299        // LIST encoding example taken from parquet-format/LogicalTypes.md
1300        let message_type = "
1301        message test_schema {
1302          REQUIRED group my_map1 (MAP) {
1303            REPEATED group key_value {
1304              REQUIRED binary key (UTF8);
1305              OPTIONAL int32 value;
1306            }
1307          }
1308          OPTIONAL group my_map2 (MAP) {
1309            REPEATED group map {
1310              REQUIRED binary str (UTF8);
1311              REQUIRED int32 num;
1312            }
1313          }
1314          OPTIONAL group my_map3 (MAP_KEY_VALUE) {
1315            REPEATED group map {
1316              REQUIRED binary key (UTF8);
1317              OPTIONAL int32 value;
1318            }
1319          }
1320          REQUIRED group my_map4 (MAP) {
1321            REPEATED group map {
1322              OPTIONAL binary key (UTF8);
1323              REQUIRED int32 value;
1324            }
1325          }
1326        }
1327        ";
1328
1329        // // Map<String, Integer>
1330        // required group my_map (MAP) {
1331        //   repeated group key_value {
1332        //     required binary key (UTF8);
1333        //     optional int32 value;
1334        //   }
1335        // }
1336        {
1337            arrow_fields.push(Field::new_map(
1338                "my_map1",
1339                "key_value",
1340                Field::new("key", DataType::Utf8, false),
1341                Field::new("value", DataType::Int32, true),
1342                false,
1343                false,
1344            ));
1345        }
1346
1347        // // Map<String, Integer> (nullable map, non-null values)
1348        // optional group my_map (MAP) {
1349        //   repeated group map {
1350        //     required binary str (UTF8);
1351        //     required int32 num;
1352        //   }
1353        // }
1354        {
1355            arrow_fields.push(Field::new_map(
1356                "my_map2",
1357                "map",
1358                Field::new("str", DataType::Utf8, false),
1359                Field::new("num", DataType::Int32, false),
1360                false,
1361                true,
1362            ));
1363        }
1364
1365        // // Map<String, Integer> (nullable map, nullable values)
1366        // optional group my_map (MAP_KEY_VALUE) {
1367        //   repeated group map {
1368        //     required binary key (UTF8);
1369        //     optional int32 value;
1370        //   }
1371        // }
1372        {
1373            arrow_fields.push(Field::new_map(
1374                "my_map3",
1375                "map",
1376                Field::new("key", DataType::Utf8, false),
1377                Field::new("value", DataType::Int32, true),
1378                false,
1379                true,
1380            ));
1381        }
1382
1383        // // Map<String, Integer> (non-compliant nullable key)
1384        // group my_map (MAP_KEY_VALUE) {
1385        //   repeated group map {
1386        //     optional binary key (UTF8);
1387        //     required int32 value;
1388        //   }
1389        // }
1390        {
1391            arrow_fields.push(Field::new_map(
1392                "my_map4",
1393                "map",
1394                Field::new("key", DataType::Utf8, false), // The key is always non-nullable (#5630)
1395                Field::new("value", DataType::Int32, false),
1396                false,
1397                false,
1398            ));
1399        }
1400
1401        let parquet_group_type = parse_message_type(message_type).unwrap();
1402
1403        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1404        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
1405        let converted_fields = converted_arrow_schema.fields();
1406
1407        assert_eq!(arrow_fields.len(), converted_fields.len());
1408        for i in 0..arrow_fields.len() {
1409            assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
1410        }
1411    }
1412
1413    #[test]
1414    fn test_nested_schema() {
1415        let mut arrow_fields = Vec::new();
1416        {
1417            let group1_fields = Fields::from(vec![
1418                Field::new("leaf1", DataType::Boolean, false),
1419                Field::new("leaf2", DataType::Int32, false),
1420            ]);
1421            let group1_struct = Field::new("group1", DataType::Struct(group1_fields), false);
1422            arrow_fields.push(group1_struct);
1423
1424            let leaf3_field = Field::new("leaf3", DataType::Int64, false);
1425            arrow_fields.push(leaf3_field);
1426        }
1427
1428        let message_type = "
1429        message test_schema {
1430          REQUIRED GROUP group1 {
1431            REQUIRED BOOLEAN leaf1;
1432            REQUIRED INT32 leaf2;
1433          }
1434          REQUIRED INT64 leaf3;
1435        }
1436        ";
1437        let parquet_group_type = parse_message_type(message_type).unwrap();
1438
1439        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1440        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
1441        let converted_fields = converted_arrow_schema.fields();
1442
1443        assert_eq!(arrow_fields.len(), converted_fields.len());
1444        for i in 0..arrow_fields.len() {
1445            assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
1446        }
1447    }
1448
1449    #[test]
1450    fn test_nested_schema_partial() {
1451        let mut arrow_fields = Vec::new();
1452        {
1453            let group1_fields = vec![Field::new("leaf1", DataType::Int64, false)].into();
1454            let group1 = Field::new("group1", DataType::Struct(group1_fields), false);
1455            arrow_fields.push(group1);
1456
1457            let group2_fields = vec![Field::new("leaf4", DataType::Int64, false)].into();
1458            let group2 = Field::new("group2", DataType::Struct(group2_fields), false);
1459            arrow_fields.push(group2);
1460
1461            arrow_fields.push(Field::new("leaf5", DataType::Int64, false));
1462        }
1463
1464        let message_type = "
1465        message test_schema {
1466          REQUIRED GROUP group1 {
1467            REQUIRED INT64 leaf1;
1468            REQUIRED INT64 leaf2;
1469          }
1470          REQUIRED  GROUP group2 {
1471            REQUIRED INT64 leaf3;
1472            REQUIRED INT64 leaf4;
1473          }
1474          REQUIRED INT64 leaf5;
1475        }
1476        ";
1477        let parquet_group_type = parse_message_type(message_type).unwrap();
1478
1479        // Expected partial arrow schema (columns 0, 3, 4):
1480        // required group group1 {
1481        //   required int64 leaf1;
1482        // }
1483        // required group group2 {
1484        //   required int64 leaf4;
1485        // }
1486        // required int64 leaf5;
1487
1488        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1489        let mask = ProjectionMask::leaves(&parquet_schema, [3, 0, 4, 4]);
1490        let converted_arrow_schema =
1491            parquet_to_arrow_schema_by_columns(&parquet_schema, mask, None).unwrap();
1492        let converted_fields = converted_arrow_schema.fields();
1493
1494        assert_eq!(arrow_fields.len(), converted_fields.len());
1495        for i in 0..arrow_fields.len() {
1496            assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
1497        }
1498    }
1499
1500    #[test]
1501    fn test_nested_schema_partial_ordering() {
1502        let mut arrow_fields = Vec::new();
1503        {
1504            let group1_fields = vec![Field::new("leaf1", DataType::Int64, false)].into();
1505            let group1 = Field::new("group1", DataType::Struct(group1_fields), false);
1506            arrow_fields.push(group1);
1507
1508            let group2_fields = vec![Field::new("leaf4", DataType::Int64, false)].into();
1509            let group2 = Field::new("group2", DataType::Struct(group2_fields), false);
1510            arrow_fields.push(group2);
1511
1512            arrow_fields.push(Field::new("leaf5", DataType::Int64, false));
1513        }
1514
1515        let message_type = "
1516        message test_schema {
1517          REQUIRED GROUP group1 {
1518            REQUIRED INT64 leaf1;
1519            REQUIRED INT64 leaf2;
1520          }
1521          REQUIRED  GROUP group2 {
1522            REQUIRED INT64 leaf3;
1523            REQUIRED INT64 leaf4;
1524          }
1525          REQUIRED INT64 leaf5;
1526        }
1527        ";
1528        let parquet_group_type = parse_message_type(message_type).unwrap();
1529
1530        // Expected partial arrow schema (columns 3, 4, 0):
1531        // required group group1 {
1532        //   required int64 leaf1;
1533        // }
1534        // required group group2 {
1535        //   required int64 leaf4;
1536        // }
1537        // required int64 leaf5;
1538
1539        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1540        let mask = ProjectionMask::leaves(&parquet_schema, [3, 0, 4]);
1541        let converted_arrow_schema =
1542            parquet_to_arrow_schema_by_columns(&parquet_schema, mask, None).unwrap();
1543        let converted_fields = converted_arrow_schema.fields();
1544
1545        assert_eq!(arrow_fields.len(), converted_fields.len());
1546        for i in 0..arrow_fields.len() {
1547            assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
1548        }
1549
1550        let mask =
1551            ProjectionMask::columns(&parquet_schema, ["group2.leaf4", "group1.leaf1", "leaf5"]);
1552        let converted_arrow_schema =
1553            parquet_to_arrow_schema_by_columns(&parquet_schema, mask, None).unwrap();
1554        let converted_fields = converted_arrow_schema.fields();
1555
1556        assert_eq!(arrow_fields.len(), converted_fields.len());
1557        for i in 0..arrow_fields.len() {
1558            assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
1559        }
1560    }
1561
1562    #[test]
1563    fn test_repeated_nested_schema() {
1564        let mut arrow_fields = Vec::new();
1565        {
1566            arrow_fields.push(Field::new("leaf1", DataType::Int32, true));
1567
1568            let inner_group_list = Field::new_list(
1569                "innerGroup",
1570                Field::new_struct(
1571                    "innerGroup",
1572                    vec![Field::new("leaf3", DataType::Int32, true)],
1573                    false,
1574                ),
1575                false,
1576            );
1577
1578            let outer_group_list = Field::new_list(
1579                "outerGroup",
1580                Field::new_struct(
1581                    "outerGroup",
1582                    vec![Field::new("leaf2", DataType::Int32, true), inner_group_list],
1583                    false,
1584                ),
1585                false,
1586            );
1587            arrow_fields.push(outer_group_list);
1588        }
1589
1590        let message_type = "
1591        message test_schema {
1592          OPTIONAL INT32 leaf1;
1593          REPEATED GROUP outerGroup {
1594            OPTIONAL INT32 leaf2;
1595            REPEATED GROUP innerGroup {
1596              OPTIONAL INT32 leaf3;
1597            }
1598          }
1599        }
1600        ";
1601        let parquet_group_type = parse_message_type(message_type).unwrap();
1602
1603        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1604        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
1605        let converted_fields = converted_arrow_schema.fields();
1606
1607        assert_eq!(arrow_fields.len(), converted_fields.len());
1608        for i in 0..arrow_fields.len() {
1609            assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
1610        }
1611    }
1612
1613    #[test]
1614    fn test_column_desc_to_field() {
1615        let message_type = "
1616        message test_schema {
1617            REQUIRED BOOLEAN boolean;
1618            REQUIRED INT32   int8  (INT_8);
1619            REQUIRED INT32   uint8 (INTEGER(8,false));
1620            REQUIRED INT32   int16 (INT_16);
1621            REQUIRED INT32   uint16 (INTEGER(16,false));
1622            REQUIRED INT32   int32;
1623            REQUIRED INT64   int64;
1624            OPTIONAL DOUBLE  double;
1625            OPTIONAL FLOAT   float;
1626            OPTIONAL FIXED_LEN_BYTE_ARRAY (2) float16 (FLOAT16);
1627            OPTIONAL BINARY  string (UTF8);
1628            REPEATED BOOLEAN bools;
1629            OPTIONAL INT32   date       (DATE);
1630            OPTIONAL INT32   time_milli (TIME_MILLIS);
1631            OPTIONAL INT64   time_micro (TIME_MICROS);
1632            OPTIONAL INT64   time_nano (TIME(NANOS,false));
1633            OPTIONAL INT64   ts_milli (TIMESTAMP_MILLIS);
1634            REQUIRED INT64   ts_micro (TIMESTAMP_MICROS);
1635            REQUIRED INT64   ts_nano (TIMESTAMP(NANOS,true));
1636            REPEATED INT32   int_list;
1637            REPEATED BINARY  byte_list;
1638            REPEATED BINARY  string_list (UTF8);
1639            REQUIRED INT32 decimal_int32 (DECIMAL(8,2));
1640            REQUIRED INT64 decimal_int64 (DECIMAL(16,2));
1641            REQUIRED FIXED_LEN_BYTE_ARRAY (13) decimal_fix_length (DECIMAL(30,2));
1642        }
1643        ";
1644        let parquet_group_type = parse_message_type(message_type).unwrap();
1645
1646        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1647        let converted_arrow_fields = parquet_schema
1648            .columns()
1649            .iter()
1650            .map(|c| parquet_to_arrow_field(c).unwrap())
1651            .collect::<Vec<Field>>();
1652
1653        let arrow_fields = vec![
1654            Field::new("boolean", DataType::Boolean, false),
1655            Field::new("int8", DataType::Int8, false),
1656            Field::new("uint8", DataType::UInt8, false),
1657            Field::new("int16", DataType::Int16, false),
1658            Field::new("uint16", DataType::UInt16, false),
1659            Field::new("int32", DataType::Int32, false),
1660            Field::new("int64", DataType::Int64, false),
1661            Field::new("double", DataType::Float64, true),
1662            Field::new("float", DataType::Float32, true),
1663            Field::new("float16", DataType::Float16, true),
1664            Field::new("string", DataType::Utf8, true),
1665            Field::new_list(
1666                "bools",
1667                Field::new("bools", DataType::Boolean, false),
1668                false,
1669            ),
1670            Field::new("date", DataType::Date32, true),
1671            Field::new("time_milli", DataType::Time32(TimeUnit::Millisecond), true),
1672            Field::new("time_micro", DataType::Time64(TimeUnit::Microsecond), true),
1673            Field::new("time_nano", DataType::Time64(TimeUnit::Nanosecond), true),
1674            Field::new(
1675                "ts_milli",
1676                DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
1677                true,
1678            ),
1679            Field::new(
1680                "ts_micro",
1681                DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
1682                false,
1683            ),
1684            Field::new(
1685                "ts_nano",
1686                DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())),
1687                false,
1688            ),
1689            Field::new_list(
1690                "int_list",
1691                Field::new("int_list", DataType::Int32, false),
1692                false,
1693            ),
1694            Field::new_list(
1695                "byte_list",
1696                Field::new("byte_list", DataType::Binary, false),
1697                false,
1698            ),
1699            Field::new_list(
1700                "string_list",
1701                Field::new("string_list", DataType::Utf8, false),
1702                false,
1703            ),
1704            Field::new("decimal_int32", DataType::Decimal128(8, 2), false),
1705            Field::new("decimal_int64", DataType::Decimal128(16, 2), false),
1706            Field::new("decimal_fix_length", DataType::Decimal128(30, 2), false),
1707        ];
1708
1709        assert_eq!(arrow_fields, converted_arrow_fields);
1710    }
1711
1712    #[test]
1713    fn test_coerced_map_list() {
1714        // Create Arrow schema with non-Parquet naming
1715        let arrow_fields = vec![
1716            Field::new_list(
1717                "my_list",
1718                Field::new("item", DataType::Boolean, true),
1719                false,
1720            ),
1721            Field::new_map(
1722                "my_map",
1723                "entries",
1724                Field::new("keys", DataType::Utf8, false),
1725                Field::new("values", DataType::Int32, true),
1726                false,
1727                true,
1728            ),
1729        ];
1730        let arrow_schema = Schema::new(arrow_fields);
1731
1732        // Create Parquet schema with coerced names
1733        let message_type = "
1734        message parquet_schema {
1735            REQUIRED GROUP my_list (LIST) {
1736                REPEATED GROUP list {
1737                    OPTIONAL BOOLEAN element;
1738                }
1739            }
1740            OPTIONAL GROUP my_map (MAP) {
1741                REPEATED GROUP key_value {
1742                    REQUIRED BINARY key (STRING);
1743                    OPTIONAL INT32 value;
1744                }
1745            }
1746        }
1747        ";
1748        let parquet_group_type = parse_message_type(message_type).unwrap();
1749        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1750        let converted_arrow_schema = ArrowSchemaConverter::new()
1751            .with_coerce_types(true)
1752            .convert(&arrow_schema)
1753            .unwrap();
1754        assert_eq!(
1755            parquet_schema.columns().len(),
1756            converted_arrow_schema.columns().len()
1757        );
1758
1759        // Create Parquet schema without coerced names
1760        let message_type = "
1761        message parquet_schema {
1762            REQUIRED GROUP my_list (LIST) {
1763                REPEATED GROUP list {
1764                    OPTIONAL BOOLEAN item;
1765                }
1766            }
1767            OPTIONAL GROUP my_map (MAP) {
1768                REPEATED GROUP entries {
1769                    REQUIRED BINARY keys (STRING);
1770                    OPTIONAL INT32 values;
1771                }
1772            }
1773        }
1774        ";
1775        let parquet_group_type = parse_message_type(message_type).unwrap();
1776        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1777        let converted_arrow_schema = ArrowSchemaConverter::new()
1778            .with_coerce_types(false)
1779            .convert(&arrow_schema)
1780            .unwrap();
1781        assert_eq!(
1782            parquet_schema.columns().len(),
1783            converted_arrow_schema.columns().len()
1784        );
1785    }
1786
1787    #[test]
1788    fn test_field_to_column_desc() {
1789        let message_type = "
1790        message arrow_schema {
1791            REQUIRED BOOLEAN boolean;
1792            REQUIRED INT32   int8  (INT_8);
1793            REQUIRED INT32   int16 (INTEGER(16,true));
1794            REQUIRED INT32   int32;
1795            REQUIRED INT64   int64;
1796            OPTIONAL DOUBLE  double;
1797            OPTIONAL FLOAT   float;
1798            OPTIONAL FIXED_LEN_BYTE_ARRAY (2) float16 (FLOAT16);
1799            OPTIONAL BINARY  string (STRING);
1800            OPTIONAL GROUP   bools (LIST) {
1801                REPEATED GROUP list {
1802                    OPTIONAL BOOLEAN element;
1803                }
1804            }
1805            REQUIRED GROUP   bools_non_null (LIST) {
1806                REPEATED GROUP list {
1807                    REQUIRED BOOLEAN element;
1808                }
1809            }
1810            OPTIONAL INT32   date       (DATE);
1811            OPTIONAL INT32   time_milli (TIME(MILLIS,false));
1812            OPTIONAL INT32   time_milli_utc (TIME(MILLIS,true));
1813            OPTIONAL INT64   time_micro (TIME_MICROS);
1814            OPTIONAL INT64   time_micro_utc (TIME(MICROS, true));
1815            OPTIONAL INT64   ts_milli (TIMESTAMP_MILLIS);
1816            REQUIRED INT64   ts_micro (TIMESTAMP(MICROS,false));
1817            REQUIRED INT64   ts_seconds;
1818            REQUIRED INT64   ts_micro_utc (TIMESTAMP(MICROS, true));
1819            REQUIRED INT64   ts_millis_zero_offset (TIMESTAMP(MILLIS, true));
1820            REQUIRED INT64   ts_millis_zero_negative_offset (TIMESTAMP(MILLIS, true));
1821            REQUIRED INT64   ts_micro_non_utc (TIMESTAMP(MICROS, true));
1822            REQUIRED GROUP struct {
1823                REQUIRED BOOLEAN bools;
1824                REQUIRED INT32 uint32 (INTEGER(32,false));
1825                REQUIRED GROUP   int32 (LIST) {
1826                    REPEATED GROUP list {
1827                        OPTIONAL INT32 element;
1828                    }
1829                }
1830            }
1831            REQUIRED BINARY  dictionary_strings (STRING);
1832            REQUIRED INT32 decimal_int32 (DECIMAL(8,2));
1833            REQUIRED INT64 decimal_int64 (DECIMAL(16,2));
1834            REQUIRED FIXED_LEN_BYTE_ARRAY (13) decimal_fix_length (DECIMAL(30,2));
1835            REQUIRED FIXED_LEN_BYTE_ARRAY (16) decimal128 (DECIMAL(38,2));
1836            REQUIRED FIXED_LEN_BYTE_ARRAY (17) decimal256 (DECIMAL(39,2));
1837        }
1838        ";
1839        let parquet_group_type = parse_message_type(message_type).unwrap();
1840
1841        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1842
1843        let arrow_fields = vec![
1844            Field::new("boolean", DataType::Boolean, false),
1845            Field::new("int8", DataType::Int8, false),
1846            Field::new("int16", DataType::Int16, false),
1847            Field::new("int32", DataType::Int32, false),
1848            Field::new("int64", DataType::Int64, false),
1849            Field::new("double", DataType::Float64, true),
1850            Field::new("float", DataType::Float32, true),
1851            Field::new("float16", DataType::Float16, true),
1852            Field::new("string", DataType::Utf8, true),
1853            Field::new_list(
1854                "bools",
1855                Field::new("element", DataType::Boolean, true),
1856                true,
1857            ),
1858            Field::new_list(
1859                "bools_non_null",
1860                Field::new("element", DataType::Boolean, false),
1861                false,
1862            ),
1863            Field::new("date", DataType::Date32, true),
1864            Field::new("time_milli", DataType::Time32(TimeUnit::Millisecond), true),
1865            Field::new(
1866                "time_milli_utc",
1867                DataType::Time32(TimeUnit::Millisecond),
1868                true,
1869            )
1870            .with_metadata(HashMap::from_iter(vec![(
1871                "adjusted_to_utc".to_string(),
1872                "".to_string(),
1873            )])),
1874            Field::new("time_micro", DataType::Time64(TimeUnit::Microsecond), true),
1875            Field::new(
1876                "time_micro_utc",
1877                DataType::Time64(TimeUnit::Microsecond),
1878                true,
1879            )
1880            .with_metadata(HashMap::from_iter(vec![(
1881                "adjusted_to_utc".to_string(),
1882                "".to_string(),
1883            )])),
1884            Field::new(
1885                "ts_milli",
1886                DataType::Timestamp(TimeUnit::Millisecond, None),
1887                true,
1888            ),
1889            Field::new(
1890                "ts_micro",
1891                DataType::Timestamp(TimeUnit::Microsecond, None),
1892                false,
1893            ),
1894            Field::new(
1895                "ts_seconds",
1896                DataType::Timestamp(TimeUnit::Second, Some("UTC".into())),
1897                false,
1898            ),
1899            Field::new(
1900                "ts_micro_utc",
1901                DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
1902                false,
1903            ),
1904            Field::new(
1905                "ts_millis_zero_offset",
1906                DataType::Timestamp(TimeUnit::Millisecond, Some("+00:00".into())),
1907                false,
1908            ),
1909            Field::new(
1910                "ts_millis_zero_negative_offset",
1911                DataType::Timestamp(TimeUnit::Millisecond, Some("-00:00".into())),
1912                false,
1913            ),
1914            Field::new(
1915                "ts_micro_non_utc",
1916                DataType::Timestamp(TimeUnit::Microsecond, Some("+01:00".into())),
1917                false,
1918            ),
1919            Field::new_struct(
1920                "struct",
1921                vec![
1922                    Field::new("bools", DataType::Boolean, false),
1923                    Field::new("uint32", DataType::UInt32, false),
1924                    Field::new_list("int32", Field::new("element", DataType::Int32, true), false),
1925                ],
1926                false,
1927            ),
1928            Field::new_dictionary("dictionary_strings", DataType::Int32, DataType::Utf8, false),
1929            Field::new("decimal_int32", DataType::Decimal128(8, 2), false),
1930            Field::new("decimal_int64", DataType::Decimal128(16, 2), false),
1931            Field::new("decimal_fix_length", DataType::Decimal128(30, 2), false),
1932            Field::new("decimal128", DataType::Decimal128(38, 2), false),
1933            Field::new("decimal256", DataType::Decimal256(39, 2), false),
1934        ];
1935        let arrow_schema = Schema::new(arrow_fields);
1936        let converted_arrow_schema = ArrowSchemaConverter::new().convert(&arrow_schema).unwrap();
1937
1938        assert_eq!(
1939            parquet_schema.columns().len(),
1940            converted_arrow_schema.columns().len()
1941        );
1942        parquet_schema
1943            .columns()
1944            .iter()
1945            .zip(converted_arrow_schema.columns())
1946            .for_each(|(a, b)| {
1947                // Only check logical type if it's set on the Parquet side.
1948                // This is because the Arrow conversion always sets logical type,
1949                // even if there wasn't originally one.
1950                // This is not an issue, but is an inconvenience for this test.
1951                match a.logical_type_ref() {
1952                    Some(_) => {
1953                        assert_eq!(a, b)
1954                    }
1955                    None => {
1956                        assert_eq!(a.name(), b.name());
1957                        assert_eq!(a.physical_type(), b.physical_type());
1958                        assert_eq!(a.converted_type(), b.converted_type());
1959                    }
1960                };
1961            });
1962    }
1963
1964    #[test]
1965    #[should_panic(expected = "Parquet does not support writing empty structs")]
1966    fn test_empty_struct_field() {
1967        let arrow_fields = vec![Field::new(
1968            "struct",
1969            DataType::Struct(Fields::empty()),
1970            false,
1971        )];
1972        let arrow_schema = Schema::new(arrow_fields);
1973        let converted_arrow_schema = ArrowSchemaConverter::new()
1974            .with_coerce_types(true)
1975            .convert(&arrow_schema);
1976
1977        converted_arrow_schema.unwrap();
1978    }
1979
1980    #[test]
1981    fn test_metadata() {
1982        let message_type = "
1983        message test_schema {
1984            OPTIONAL BINARY  string (STRING);
1985        }
1986        ";
1987        let parquet_group_type = parse_message_type(message_type).unwrap();
1988
1989        let key_value_metadata = vec![
1990            KeyValue::new("foo".to_owned(), Some("bar".to_owned())),
1991            KeyValue::new("baz".to_owned(), None),
1992        ];
1993
1994        let mut expected_metadata: HashMap<String, String> = HashMap::new();
1995        expected_metadata.insert("foo".to_owned(), "bar".to_owned());
1996
1997        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1998        let converted_arrow_schema =
1999            parquet_to_arrow_schema(&parquet_schema, Some(&key_value_metadata)).unwrap();
2000
2001        assert_eq!(converted_arrow_schema.metadata(), &expected_metadata);
2002    }
2003
2004    #[test]
2005    fn test_arrow_schema_roundtrip() -> Result<()> {
2006        let meta = |a: &[(&str, &str)]| -> HashMap<String, String> {
2007            a.iter()
2008                .map(|(a, b)| (a.to_string(), b.to_string()))
2009                .collect()
2010        };
2011
2012        let schema = Schema::new_with_metadata(
2013            vec![
2014                Field::new("c1", DataType::Utf8, false)
2015                    .with_metadata(meta(&[("Key", "Foo"), (PARQUET_FIELD_ID_META_KEY, "2")])),
2016                Field::new("c2", DataType::Binary, false),
2017                Field::new("c3", DataType::FixedSizeBinary(3), false),
2018                Field::new("c4", DataType::Boolean, false),
2019                Field::new("c5", DataType::Date32, false),
2020                Field::new("c6", DataType::Date64, false),
2021                Field::new("c7", DataType::Time32(TimeUnit::Second), false),
2022                Field::new("c8", DataType::Time32(TimeUnit::Millisecond), false),
2023                Field::new("c13", DataType::Time64(TimeUnit::Microsecond), false),
2024                Field::new("c14", DataType::Time64(TimeUnit::Nanosecond), false),
2025                Field::new("c15", DataType::Timestamp(TimeUnit::Second, None), false),
2026                Field::new(
2027                    "c16",
2028                    DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
2029                    false,
2030                ),
2031                Field::new(
2032                    "c17",
2033                    DataType::Timestamp(TimeUnit::Microsecond, Some("Africa/Johannesburg".into())),
2034                    false,
2035                ),
2036                Field::new(
2037                    "c18",
2038                    DataType::Timestamp(TimeUnit::Nanosecond, None),
2039                    false,
2040                ),
2041                Field::new("c19", DataType::Interval(IntervalUnit::DayTime), false),
2042                Field::new("c20", DataType::Interval(IntervalUnit::YearMonth), false),
2043                Field::new_list(
2044                    "c21",
2045                    Field::new_list_field(DataType::Boolean, true)
2046                        .with_metadata(meta(&[("Key", "Bar"), (PARQUET_FIELD_ID_META_KEY, "5")])),
2047                    false,
2048                )
2049                .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "4")])),
2050                Field::new(
2051                    "c22",
2052                    DataType::FixedSizeList(
2053                        Arc::new(Field::new_list_field(DataType::Boolean, true)),
2054                        5,
2055                    ),
2056                    false,
2057                ),
2058                Field::new_list(
2059                    "c23",
2060                    Field::new_large_list(
2061                        "inner",
2062                        Field::new_list_field(
2063                            DataType::Struct(
2064                                vec![
2065                                    Field::new("a", DataType::Int16, true),
2066                                    Field::new("b", DataType::Float64, false),
2067                                    Field::new("c", DataType::Float32, false),
2068                                    Field::new("d", DataType::Float16, false),
2069                                ]
2070                                .into(),
2071                            ),
2072                            false,
2073                        ),
2074                        true,
2075                    ),
2076                    false,
2077                ),
2078                Field::new(
2079                    "c24",
2080                    DataType::Struct(Fields::from(vec![
2081                        Field::new("a", DataType::Utf8, false),
2082                        Field::new("b", DataType::UInt16, false),
2083                    ])),
2084                    false,
2085                ),
2086                Field::new("c25", DataType::Interval(IntervalUnit::YearMonth), true),
2087                Field::new("c26", DataType::Interval(IntervalUnit::DayTime), true),
2088                // Duration types not supported
2089                // Field::new("c27", DataType::Duration(TimeUnit::Second), false),
2090                // Field::new("c28", DataType::Duration(TimeUnit::Millisecond), false),
2091                // Field::new("c29", DataType::Duration(TimeUnit::Microsecond), false),
2092                // Field::new("c30", DataType::Duration(TimeUnit::Nanosecond), false),
2093                #[allow(deprecated)]
2094                Field::new_dict(
2095                    "c31",
2096                    DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
2097                    true,
2098                    123,
2099                    true,
2100                )
2101                .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "6")])),
2102                Field::new("c32", DataType::LargeBinary, true),
2103                Field::new("c33", DataType::LargeUtf8, true),
2104                Field::new_large_list(
2105                    "c34",
2106                    Field::new_list(
2107                        "inner",
2108                        Field::new_list_field(
2109                            DataType::Struct(
2110                                vec![
2111                                    Field::new("a", DataType::Int16, true),
2112                                    Field::new("b", DataType::Float64, true),
2113                                ]
2114                                .into(),
2115                            ),
2116                            true,
2117                        ),
2118                        true,
2119                    ),
2120                    true,
2121                ),
2122                Field::new("c35", DataType::Null, true),
2123                Field::new("c36", DataType::Decimal128(2, 1), false),
2124                Field::new("c37", DataType::Decimal256(50, 20), false),
2125                Field::new("c38", DataType::Decimal128(18, 12), true),
2126                Field::new_map(
2127                    "c39",
2128                    "key_value",
2129                    Field::new("key", DataType::Utf8, false),
2130                    Field::new_list("value", Field::new("element", DataType::Utf8, true), true),
2131                    false, // fails to roundtrip keys_sorted
2132                    true,
2133                ),
2134                Field::new_map(
2135                    "c40",
2136                    "my_entries",
2137                    Field::new("my_key", DataType::Utf8, false)
2138                        .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "8")])),
2139                    Field::new_list(
2140                        "my_value",
2141                        Field::new_list_field(DataType::Utf8, true)
2142                            .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "10")])),
2143                        true,
2144                    )
2145                    .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "9")])),
2146                    false, // fails to roundtrip keys_sorted
2147                    true,
2148                )
2149                .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "7")])),
2150                Field::new_map(
2151                    "c41",
2152                    "my_entries",
2153                    Field::new("my_key", DataType::Utf8, false),
2154                    Field::new_list(
2155                        "my_value",
2156                        Field::new_list_field(DataType::Utf8, true)
2157                            .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "11")])),
2158                        true,
2159                    ),
2160                    false, // fails to roundtrip keys_sorted
2161                    false,
2162                ),
2163                Field::new("c42", DataType::Decimal32(5, 2), false),
2164                Field::new("c43", DataType::Decimal64(18, 12), true),
2165            ],
2166            meta(&[("Key", "Value")]),
2167        );
2168
2169        // write to an empty parquet file so that schema is serialized
2170        let file = tempfile::tempfile().unwrap();
2171        let writer =
2172            ArrowWriter::try_new(file.try_clone().unwrap(), Arc::new(schema.clone()), None)?;
2173        writer.close()?;
2174
2175        // read file back
2176        let arrow_reader = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
2177
2178        // Check arrow schema
2179        let read_schema = arrow_reader.schema();
2180        assert_eq!(&schema, read_schema.as_ref());
2181
2182        // Walk schema finding field IDs
2183        let mut stack = Vec::with_capacity(10);
2184        let mut out = Vec::with_capacity(10);
2185
2186        let root = arrow_reader.parquet_schema().root_schema_ptr();
2187        stack.push((root.name().to_string(), root));
2188
2189        while let Some((p, t)) = stack.pop() {
2190            if t.is_group() {
2191                for f in t.get_fields() {
2192                    stack.push((format!("{p}.{}", f.name()), f.clone()))
2193                }
2194            }
2195
2196            let info = t.get_basic_info();
2197            if info.has_id() {
2198                out.push(format!("{p} -> {}", info.id()))
2199            }
2200        }
2201        out.sort_unstable();
2202        let out: Vec<_> = out.iter().map(|x| x.as_str()).collect();
2203
2204        assert_eq!(
2205            &out,
2206            &[
2207                "arrow_schema.c1 -> 2",
2208                "arrow_schema.c21 -> 4",
2209                "arrow_schema.c21.list.item -> 5",
2210                "arrow_schema.c31 -> 6",
2211                "arrow_schema.c40 -> 7",
2212                "arrow_schema.c40.my_entries.my_key -> 8",
2213                "arrow_schema.c40.my_entries.my_value -> 9",
2214                "arrow_schema.c40.my_entries.my_value.list.item -> 10",
2215                "arrow_schema.c41.my_entries.my_value.list.item -> 11",
2216            ]
2217        );
2218
2219        Ok(())
2220    }
2221
2222    #[test]
2223    fn test_read_parquet_field_ids_raw() -> Result<()> {
2224        let meta = |a: &[(&str, &str)]| -> HashMap<String, String> {
2225            a.iter()
2226                .map(|(a, b)| (a.to_string(), b.to_string()))
2227                .collect()
2228        };
2229        let schema = Schema::new_with_metadata(
2230            vec![
2231                Field::new("c1", DataType::Utf8, true)
2232                    .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "1")])),
2233                Field::new("c2", DataType::Utf8, true)
2234                    .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "2")])),
2235            ],
2236            HashMap::new(),
2237        );
2238
2239        let writer = ArrowWriter::try_new(vec![], Arc::new(schema.clone()), None)?;
2240        let parquet_bytes = writer.into_inner()?;
2241
2242        let reader =
2243            crate::file::reader::SerializedFileReader::new(bytes::Bytes::from(parquet_bytes))?;
2244        let schema_descriptor = reader.metadata().file_metadata().schema_descr_ptr();
2245
2246        // don't pass metadata so field ids are read from Parquet and not from serialized Arrow schema
2247        let arrow_schema = crate::arrow::parquet_to_arrow_schema(&schema_descriptor, None)?;
2248
2249        let parq_schema_descr = ArrowSchemaConverter::new()
2250            .with_coerce_types(true)
2251            .convert(&arrow_schema)?;
2252        let parq_fields = parq_schema_descr.root_schema().get_fields();
2253        assert_eq!(parq_fields.len(), 2);
2254        assert_eq!(parq_fields[0].get_basic_info().id(), 1);
2255        assert_eq!(parq_fields[1].get_basic_info().id(), 2);
2256
2257        Ok(())
2258    }
2259
2260    #[test]
2261    fn test_arrow_schema_roundtrip_lists() -> Result<()> {
2262        let metadata: HashMap<String, String> = [("Key".to_string(), "Value".to_string())]
2263            .iter()
2264            .cloned()
2265            .collect();
2266
2267        let schema = Schema::new_with_metadata(
2268            vec![
2269                Field::new_list("c21", Field::new("array", DataType::Boolean, true), false),
2270                Field::new(
2271                    "c22",
2272                    DataType::FixedSizeList(
2273                        Arc::new(Field::new("items", DataType::Boolean, false)),
2274                        5,
2275                    ),
2276                    false,
2277                ),
2278                Field::new_list(
2279                    "c23",
2280                    Field::new_large_list(
2281                        "items",
2282                        Field::new_struct(
2283                            "items",
2284                            vec![
2285                                Field::new("a", DataType::Int16, true),
2286                                Field::new("b", DataType::Float64, false),
2287                            ],
2288                            true,
2289                        ),
2290                        true,
2291                    ),
2292                    true,
2293                ),
2294            ],
2295            metadata,
2296        );
2297
2298        // write to an empty parquet file so that schema is serialized
2299        let file = tempfile::tempfile().unwrap();
2300        let writer =
2301            ArrowWriter::try_new(file.try_clone().unwrap(), Arc::new(schema.clone()), None)?;
2302        writer.close()?;
2303
2304        // read file back
2305        let arrow_reader = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
2306        let read_schema = arrow_reader.schema();
2307        assert_eq!(&schema, read_schema.as_ref());
2308        Ok(())
2309    }
2310
2311    #[test]
2312    fn test_get_arrow_schema_from_metadata() {
2313        assert!(get_arrow_schema_from_metadata("").is_err());
2314    }
2315
2316    #[test]
2317    #[cfg(feature = "arrow_canonical_extension_types")]
2318    fn arrow_uuid_to_parquet_uuid() -> Result<()> {
2319        use arrow_schema::extension::Uuid;
2320        let arrow_schema = Schema::new(vec![
2321            Field::new("uuid", DataType::FixedSizeBinary(16), false).with_extension_type(Uuid),
2322        ]);
2323
2324        let parquet_schema = ArrowSchemaConverter::new().convert(&arrow_schema)?;
2325
2326        assert_eq!(
2327            parquet_schema.column(0).logical_type_ref(),
2328            Some(&LogicalType::Uuid)
2329        );
2330
2331        let arrow_schema = parquet_to_arrow_schema(&parquet_schema, None)?;
2332        assert_eq!(arrow_schema.field(0).try_extension_type::<Uuid>()?, Uuid);
2333
2334        Ok(())
2335    }
2336
2337    #[test]
2338    #[cfg(feature = "arrow_canonical_extension_types")]
2339    fn arrow_json_to_parquet_json() -> Result<()> {
2340        use arrow_schema::extension::Json;
2341        let arrow_schema = Schema::new(vec![
2342            Field::new("json", DataType::Utf8, false).with_extension_type(Json::default()),
2343        ]);
2344
2345        let parquet_schema = ArrowSchemaConverter::new().convert(&arrow_schema)?;
2346
2347        assert_eq!(
2348            parquet_schema.column(0).logical_type_ref(),
2349            Some(&LogicalType::Json)
2350        );
2351
2352        let arrow_schema = parquet_to_arrow_schema(&parquet_schema, None)?;
2353        assert_eq!(
2354            arrow_schema.field(0).try_extension_type::<Json>()?,
2355            Json::default()
2356        );
2357
2358        Ok(())
2359    }
2360
2361    #[test]
2362    fn test_parquet_to_arrow_field_levels_with_virtual_rejects_non_virtual() {
2363        let message_type = "
2364        message test_schema {
2365            REQUIRED INT32 id;
2366        }
2367        ";
2368        let parquet_schema = Arc::new(parse_message_type(message_type).unwrap());
2369        let descriptor = SchemaDescriptor::new(parquet_schema);
2370
2371        // Try to pass a regular field (not a virtual column)
2372        let regular_field = Arc::new(Field::new("regular_column", DataType::Int64, false));
2373        let result = parquet_to_arrow_field_levels_with_virtual(
2374            &descriptor,
2375            ProjectionMask::all(),
2376            None,
2377            &[regular_field],
2378        );
2379
2380        assert!(result.is_err());
2381        assert!(
2382            result
2383                .unwrap_err()
2384                .to_string()
2385                .contains("is not a virtual column")
2386        );
2387    }
2388}