Skip to main content

parquet/arrow/schema/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Converting Parquet schema <--> Arrow schema: [`ArrowSchemaConverter`] and [parquet_to_arrow_schema]
19
20use base64::Engine;
21use base64::prelude::BASE64_STANDARD;
22use std::collections::HashMap;
23use std::sync::Arc;
24
25use arrow_ipc::writer;
26use arrow_schema::{DataType, Field, FieldRef, Fields, Schema, TimeUnit};
27
28use crate::basic::{
29    ConvertedType, LogicalType, Repetition, TimeUnit as ParquetTimeUnit, Type as PhysicalType,
30};
31use crate::errors::{ParquetError, Result};
32use crate::file::{metadata::KeyValue, properties::WriterProperties};
33use crate::schema::types::{ColumnDescriptor, SchemaDescriptor, Type};
34
35mod complex;
36mod extension;
37mod primitive;
38pub mod virtual_type;
39
40use super::PARQUET_FIELD_ID_META_KEY;
41use crate::arrow::ProjectionMask;
42use crate::arrow::schema::extension::{
43    has_extension_type, logical_type_for_binary, logical_type_for_binary_view,
44    logical_type_for_fixed_size_binary, logical_type_for_string, logical_type_for_struct,
45    try_add_extension_type,
46};
47pub(crate) use complex::{ParquetField, ParquetFieldType, VirtualColumnType};
48
49/// Convert Parquet schema to Arrow schema including optional metadata
50///
51/// Attempts to decode any existing Arrow schema metadata, falling back
52/// to converting the Parquet schema column-wise
53pub fn parquet_to_arrow_schema(
54    parquet_schema: &SchemaDescriptor,
55    key_value_metadata: Option<&Vec<KeyValue>>,
56) -> Result<Schema> {
57    parquet_to_arrow_schema_by_columns(parquet_schema, ProjectionMask::all(), key_value_metadata)
58}
59
60/// Convert parquet schema to arrow schema including optional metadata,
61/// only preserving some leaf columns.
62pub fn parquet_to_arrow_schema_by_columns(
63    parquet_schema: &SchemaDescriptor,
64    mask: ProjectionMask,
65    key_value_metadata: Option<&Vec<KeyValue>>,
66) -> Result<Schema> {
67    Ok(parquet_to_arrow_schema_and_fields(parquet_schema, mask, key_value_metadata, &[])?.0)
68}
69
70/// Determines the Arrow Schema from a Parquet schema
71///
72/// Looks for an Arrow schema metadata "hint" (see
73/// [`parquet_to_arrow_field_levels`]), and uses it if present to ensure
74/// lossless round trips.
75pub(crate) fn parquet_to_arrow_schema_and_fields(
76    parquet_schema: &SchemaDescriptor,
77    mask: ProjectionMask,
78    key_value_metadata: Option<&Vec<KeyValue>>,
79    virtual_columns: &[FieldRef],
80) -> Result<(Schema, Option<ParquetField>)> {
81    let mut metadata = parse_key_value_metadata(key_value_metadata).unwrap_or_default();
82    let maybe_schema = metadata
83        .remove(super::ARROW_SCHEMA_META_KEY)
84        .map(|value| get_arrow_schema_from_metadata(&value))
85        .transpose()?;
86
87    // Add the Arrow metadata to the Parquet metadata skipping keys that collide
88    if let Some(arrow_schema) = &maybe_schema {
89        arrow_schema.metadata().iter().for_each(|(k, v)| {
90            metadata.entry(k.clone()).or_insert_with(|| v.clone());
91        });
92    }
93
94    let hint = maybe_schema.as_ref().map(|s| s.fields());
95    let field_levels =
96        parquet_to_arrow_field_levels_with_virtual(parquet_schema, mask, hint, virtual_columns)?;
97    let schema = Schema::new_with_metadata(field_levels.fields, metadata);
98    Ok((schema, field_levels.levels))
99}
100
101/// Schema information necessary to decode a parquet file as arrow [`Fields`]
102///
103/// In particular this stores the dremel-level information necessary to correctly
104/// interpret the encoded definition and repetition levels
105///
106/// Note: this is an opaque container intended to be used with lower-level APIs
107/// within this crate
108#[derive(Debug, Clone)]
109pub struct FieldLevels {
110    pub(crate) fields: Fields,
111    pub(crate) levels: Option<ParquetField>,
112}
113
114/// Convert a parquet [`SchemaDescriptor`] to [`FieldLevels`]
115///
116/// Columns not included within [`ProjectionMask`] will be ignored.
117///
118/// The optional `hint` parameter is the desired Arrow schema. See the
119/// [`arrow`] module documentation for more information.
120///
121/// [`arrow`]: crate::arrow
122///
123/// # Notes:
124/// Where a field type in `hint` is compatible with the corresponding parquet type in `schema`, it
125/// will be used, otherwise the default arrow type for the given parquet column type will be used.
126///
127/// This is to accommodate arrow types that cannot be round-tripped through parquet natively.
128/// Depending on the parquet writer, this can lead to a mismatch between a file's parquet schema
129/// and its embedded arrow schema. The parquet `schema` must be treated as authoritative in such
130/// an event. See [#1663](https://github.com/apache/arrow-rs/issues/1663) for more information
131///
132/// Note: this is a low-level API, most users will want to make use of the higher-level
133/// [`parquet_to_arrow_schema`] for decoding metadata from a parquet file.
134pub fn parquet_to_arrow_field_levels(
135    schema: &SchemaDescriptor,
136    mask: ProjectionMask,
137    hint: Option<&Fields>,
138) -> Result<FieldLevels> {
139    parquet_to_arrow_field_levels_with_virtual(schema, mask, hint, &[])
140}
141
142/// Convert a parquet [`SchemaDescriptor`] to [`FieldLevels`] with support for virtual columns
143///
144/// Columns not included within [`ProjectionMask`] will be ignored.
145///
146/// The optional `hint` parameter is the desired Arrow schema. See the
147/// [`arrow`] module documentation for more information.
148///
149/// [`arrow`]: crate::arrow
150///
151/// # Arguments
152/// * `schema` - The Parquet schema descriptor
153/// * `mask` - Projection mask to select which columns to include
154/// * `hint` - Optional hint for Arrow field types to use instead of defaults
155/// * `virtual_columns` - Virtual columns to append to the schema (e.g., row numbers)
156///
157/// # Notes:
158/// Where a field type in `hint` is compatible with the corresponding parquet type in `schema`, it
159/// will be used, otherwise the default arrow type for the given parquet column type will be used.
160///
161/// Virtual columns are columns that don't exist in the Parquet file but are generated during reading.
162/// They must have extension type names starting with "arrow.virtual.".
163///
164/// This is to accommodate arrow types that cannot be round-tripped through parquet natively.
165/// Depending on the parquet writer, this can lead to a mismatch between a file's parquet schema
166/// and its embedded arrow schema. The parquet `schema` must be treated as authoritative in such
167/// an event. See [#1663](https://github.com/apache/arrow-rs/issues/1663) for more information
168///
169/// Note: this is a low-level API, most users will want to make use of the higher-level
170/// [`parquet_to_arrow_schema`] for decoding metadata from a parquet file.
171pub fn parquet_to_arrow_field_levels_with_virtual(
172    schema: &SchemaDescriptor,
173    mask: ProjectionMask,
174    hint: Option<&Fields>,
175    virtual_columns: &[FieldRef],
176) -> Result<FieldLevels> {
177    // Validate that all fields are virtual columns
178    for field in virtual_columns {
179        if !virtual_type::is_virtual_column(field) {
180            return Err(ParquetError::General(format!(
181                "Field '{}' is not a virtual column. Virtual columns must have extension type names starting with 'arrow.virtual.'",
182                field.name()
183            )));
184        }
185    }
186
187    // Convert the regular schema first
188    let mut parquet_field = match complex::convert_schema(schema, mask, hint)? {
189        Some(field) => field,
190        None if virtual_columns.is_empty() => {
191            return Ok(FieldLevels {
192                fields: Fields::empty(),
193                levels: None,
194            });
195        }
196        None => {
197            // No regular fields, but we have virtual columns - create empty root struct
198            ParquetField {
199                rep_level: 0,
200                def_level: 0,
201                nullable: false,
202                arrow_type: DataType::Struct(Fields::empty()),
203                field_type: ParquetFieldType::Group {
204                    children: Vec::new(),
205                },
206            }
207        }
208    };
209
210    // Append virtual columns if any
211    if !virtual_columns.is_empty() {
212        match &mut parquet_field.field_type {
213            ParquetFieldType::Group { children } => {
214                // Get the mutable fields from the struct type
215                let DataType::Struct(ref mut fields) = parquet_field.arrow_type else {
216                    unreachable!("Root field must be a struct");
217                };
218
219                // Convert to mutable Vec to append
220                let mut fields_vec: Vec<FieldRef> = fields.iter().cloned().collect();
221
222                // Append each virtual column
223                for virtual_column in virtual_columns {
224                    // Virtual columns can only be added at the root level
225                    assert_eq!(
226                        parquet_field.rep_level, 0,
227                        "Virtual columns can only be added at rep level 0"
228                    );
229                    assert_eq!(
230                        parquet_field.def_level, 0,
231                        "Virtual columns can only be added at def level 0"
232                    );
233
234                    fields_vec.push(virtual_column.clone());
235                    let virtual_parquet_field = complex::convert_virtual_field(
236                        virtual_column,
237                        parquet_field.rep_level,
238                        parquet_field.def_level,
239                    )?;
240                    children.push(virtual_parquet_field);
241                }
242
243                // Update the fields
244                parquet_field.arrow_type = DataType::Struct(Fields::from(fields_vec));
245            }
246            _ => unreachable!("Root field must be a group"),
247        }
248    }
249
250    match &parquet_field.arrow_type {
251        DataType::Struct(fields) => Ok(FieldLevels {
252            fields: fields.clone(),
253            levels: Some(parquet_field),
254        }),
255        _ => unreachable!(),
256    }
257}
258
259/// Try to convert Arrow schema metadata into a schema
260fn get_arrow_schema_from_metadata(encoded_meta: &str) -> Result<Schema> {
261    let decoded = BASE64_STANDARD.decode(encoded_meta);
262    match decoded {
263        Ok(bytes) => {
264            let slice = if bytes.len() > 8 && bytes[0..4] == [255u8; 4] {
265                &bytes[8..]
266            } else {
267                bytes.as_slice()
268            };
269            match arrow_ipc::root_as_message(slice) {
270                Ok(message) => message
271                    .header_as_schema()
272                    .map(arrow_ipc::convert::fb_to_schema)
273                    .ok_or_else(|| arrow_err!("the message is not Arrow Schema")),
274                Err(err) => {
275                    // The flatbuffers implementation returns an error on verification error.
276                    Err(arrow_err!(
277                        "Unable to get root as message stored in {}: {:?}",
278                        super::ARROW_SCHEMA_META_KEY,
279                        err
280                    ))
281                }
282            }
283        }
284        Err(err) => {
285            // The C++ implementation returns an error if the schema can't be parsed.
286            Err(arrow_err!(
287                "Unable to decode the encoded schema stored in {}, {:?}",
288                super::ARROW_SCHEMA_META_KEY,
289                err
290            ))
291        }
292    }
293}
294
295/// Encodes the Arrow schema into the IPC format, and base64 encodes it
296pub fn encode_arrow_schema(schema: &Schema) -> String {
297    let options = writer::IpcWriteOptions::default();
298    let mut dictionary_tracker = writer::DictionaryTracker::new(true);
299    let data_gen = writer::IpcDataGenerator::default();
300    let mut serialized_schema =
301        data_gen.schema_to_bytes_with_dictionary_tracker(schema, &mut dictionary_tracker, &options);
302
303    // manually prepending the length to the schema as arrow uses the legacy IPC format
304    // TODO: change after addressing ARROW-9777
305    let schema_len = serialized_schema.ipc_message.len();
306    let mut len_prefix_schema = Vec::with_capacity(schema_len + 8);
307    len_prefix_schema.append(&mut vec![255u8, 255, 255, 255]);
308    len_prefix_schema.append((schema_len as u32).to_le_bytes().to_vec().as_mut());
309    len_prefix_schema.append(&mut serialized_schema.ipc_message);
310
311    BASE64_STANDARD.encode(&len_prefix_schema)
312}
313
314fn flatten_ree_field(field: &Field) -> Field {
315    match field.data_type() {
316        DataType::RunEndEncoded(_, value_field) => field
317            .clone()
318            .with_data_type(value_field.data_type().clone()),
319        _ => field.clone(),
320    }
321}
322
323/// Mutates writer metadata by storing the encoded Arrow schema hint in
324/// [`ARROW_SCHEMA_META_KEY`].
325///
326/// If there is an existing Arrow schema metadata, it is replaced.
327///
328/// [`ARROW_SCHEMA_META_KEY`]: crate::arrow::ARROW_SCHEMA_META_KEY
329pub fn add_encoded_arrow_schema_to_metadata(schema: &Schema, props: &mut WriterProperties) {
330    let has_ree = schema
331        .fields()
332        .iter()
333        .any(|f| matches!(f.data_type(), DataType::RunEndEncoded(_, _)));
334    let flat_schema;
335    let schema = if has_ree {
336        let flat_fields: Vec<Field> = schema
337            .fields()
338            .iter()
339            .map(|f| flatten_ree_field(f))
340            .collect();
341        flat_schema = Schema::new_with_metadata(flat_fields, schema.metadata().clone());
342        &flat_schema
343    } else {
344        schema
345    };
346    let encoded = encode_arrow_schema(schema);
347
348    let schema_kv = KeyValue {
349        key: super::ARROW_SCHEMA_META_KEY.to_string(),
350        value: Some(encoded),
351    };
352
353    let meta = props
354        .key_value_metadata
355        .get_or_insert_with(Default::default);
356
357    // check if ARROW:schema exists, and overwrite it
358    let schema_meta = meta
359        .iter()
360        .enumerate()
361        .find(|(_, kv)| kv.key.as_str() == super::ARROW_SCHEMA_META_KEY);
362    match schema_meta {
363        Some((i, _)) => {
364            meta.remove(i);
365            meta.push(schema_kv);
366        }
367        None => {
368            meta.push(schema_kv);
369        }
370    }
371}
372
/// Converts an Arrow schema into a Parquet schema
///
/// Background on how an Arrow schema is represented in Parquet can be found
/// in the documentation of the [`arrow`] module.
///
/// [`arrow`]: crate::arrow
///
/// # Example:
/// ```
/// # use std::sync::Arc;
/// # use arrow_schema::{Field, Schema, DataType};
/// # use parquet::arrow::ArrowSchemaConverter;
/// use parquet::schema::types::{SchemaDescriptor, Type};
/// use parquet::basic; // note there are two `Type`s in the following example
/// // create an Arrow Schema
/// let arrow_schema = Schema::new(vec![
///   Field::new("a", DataType::Int64, true),
///   Field::new("b", DataType::Date32, true),
/// ]);
/// // convert the Arrow schema to a Parquet schema
/// let parquet_schema = ArrowSchemaConverter::new()
///   .convert(&arrow_schema)
///   .unwrap();
///
/// let expected_parquet_schema = SchemaDescriptor::new(
///   Arc::new(
///     Type::group_type_builder("arrow_schema")
///       .with_fields(vec![
///         Arc::new(
///          Type::primitive_type_builder("a", basic::Type::INT64)
///           .build().unwrap()
///         ),
///         Arc::new(
///          Type::primitive_type_builder("b", basic::Type::INT32)
///           .with_converted_type(basic::ConvertedType::DATE)
///           .with_logical_type(Some(basic::LogicalType::Date))
///           .build().unwrap()
///         ),
///      ])
///      .build().unwrap()
///   )
/// );
/// assert_eq!(parquet_schema, expected_parquet_schema);
/// ```
#[derive(Debug)]
pub struct ArrowSchemaConverter<'a> {
    /// Name given to the root element of the Parquet schema
    schema_root: &'a str,
    /// Whether Arrow types are coerced to compatible Parquet types
    ///
    /// See docs on [`Self::with_coerce_types`]
    coerce_types: bool,
}
426
427impl Default for ArrowSchemaConverter<'_> {
428    fn default() -> Self {
429        Self::new()
430    }
431}
432
433impl<'a> ArrowSchemaConverter<'a> {
434    /// Create a new converter
435    pub fn new() -> Self {
436        Self {
437            schema_root: "arrow_schema",
438            coerce_types: false,
439        }
440    }
441
442    /// Should Arrow types be coerced into Parquet native types (default `false`).
443    ///
444    /// Setting this option to `true` will result in Parquet files that can be
445    /// read by more readers, but may lose precision for Arrow types such as
446    /// [`DataType::Date64`] which have no direct [corresponding Parquet type].
447    ///
448    /// By default, this converter does not coerce to native Parquet types. Enabling type
449    /// coercion allows for meaningful representations that do not require
450    /// downstream readers to consider the embedded Arrow schema, and can allow
451    /// for greater compatibility with other Parquet implementations. However,
452    /// type coercion also prevents data from being losslessly round-tripped.
453    ///
454    /// # Discussion
455    ///
456    /// Some Arrow types such as `Date64`, `Timestamp` and `Interval` have no
457    /// corresponding Parquet logical type. Thus, they can not be losslessly
458    /// round-tripped when stored using the appropriate Parquet logical type.
459    /// For example, some Date64 values may be truncated when stored with
460    /// parquet's native 32 bit date type.
461    ///
462    /// For [`List`] and [`Map`] types, some Parquet readers expect certain
463    /// schema elements to have specific names (earlier versions of the spec
464    /// were somewhat ambiguous on this point). Type coercion will use the names
465    /// prescribed by the Parquet specification, potentially losing naming
466    /// metadata from the Arrow schema.
467    ///
468    /// [`List`]: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists
469    /// [`Map`]: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#maps
470    /// [corresponding Parquet type]: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#date
471    ///
472    pub fn with_coerce_types(mut self, coerce_types: bool) -> Self {
473        self.coerce_types = coerce_types;
474        self
475    }
476
477    /// Set the root schema element name (defaults to `"arrow_schema"`).
478    pub fn schema_root(mut self, schema_root: &'a str) -> Self {
479        self.schema_root = schema_root;
480        self
481    }
482
483    /// Convert the specified Arrow [`Schema`] to the desired Parquet [`SchemaDescriptor`]
484    ///
485    /// See example in [`ArrowSchemaConverter`]
486    pub fn convert(&self, schema: &Schema) -> Result<SchemaDescriptor> {
487        let fields = schema
488            .fields()
489            .iter()
490            .map(|field| arrow_to_parquet_type(field, self.coerce_types).map(Arc::new))
491            .collect::<Result<_>>()?;
492        let group = Type::group_type_builder(self.schema_root)
493            .with_fields(fields)
494            .build()?;
495        Ok(SchemaDescriptor::new(Arc::new(group)))
496    }
497}
498
499fn parse_key_value_metadata(
500    key_value_metadata: Option<&Vec<KeyValue>>,
501) -> Option<HashMap<String, String>> {
502    match key_value_metadata {
503        Some(key_values) => {
504            let map: HashMap<String, String> = key_values
505                .iter()
506                .filter_map(|kv| {
507                    kv.value
508                        .as_ref()
509                        .map(|value| (kv.key.clone(), value.clone()))
510                })
511                .collect();
512
513            if map.is_empty() { None } else { Some(map) }
514        }
515        None => None,
516    }
517}
518
519/// Convert parquet column schema to arrow field.
520pub fn parquet_to_arrow_field(parquet_column: &ColumnDescriptor) -> Result<Field> {
521    let field = complex::convert_type(&parquet_column.self_type_ptr())?;
522    let mut ret = Field::new(parquet_column.name(), field.arrow_type, field.nullable);
523
524    let parquet_type = parquet_column.self_type();
525    let basic_info = parquet_type.get_basic_info();
526
527    let mut hash_map_size = 0;
528    if basic_info.has_id() {
529        hash_map_size += 1;
530    }
531    if has_extension_type(parquet_type) {
532        hash_map_size += 1;
533    }
534    if hash_map_size == 0 {
535        return Ok(ret);
536    }
537    ret.set_metadata(HashMap::with_capacity(hash_map_size));
538    if basic_info.has_id() {
539        ret.metadata_mut().insert(
540            PARQUET_FIELD_ID_META_KEY.to_string(),
541            basic_info.id().to_string(),
542        );
543    }
544    try_add_extension_type(ret, parquet_column.self_type())
545}
546
/// Returns the number of bytes `n` computed for a decimal with `precision`
/// digits, using the relation from parquet's logical type definition.
pub fn decimal_length_from_precision(precision: u8) -> usize {
    // Parquet's logical types define, for an n-byte value:
    //   digits = floor(log_10(2^(8*n - 1) - 1))
    // Solving for n:
    //   10^digits + 1             = 2^(8*n - 1)
    //   log2(10^digits + 1)       = 8*n - 1
    //   (log2(10^digits + 1) + 1) / 8 = n
    // and the result is rounded up to a whole number of bytes.
    let magnitude = 10.0_f64.powi(precision as i32) + 1.0;
    let required_bits = magnitude.log2() + 1.0;
    (required_bits / 8.0).ceil() as usize
}
557
558/// Convert an arrow field to a parquet `Type`
559fn arrow_to_parquet_type(field: &Field, coerce_types: bool) -> Result<Type> {
560    const PARQUET_LIST_ELEMENT_NAME: &str = "element";
561    const PARQUET_MAP_STRUCT_NAME: &str = "key_value";
562    const PARQUET_KEY_FIELD_NAME: &str = "key";
563    const PARQUET_VALUE_FIELD_NAME: &str = "value";
564
565    let name = field.name().as_str();
566    let repetition = if field.is_nullable() {
567        Repetition::OPTIONAL
568    } else {
569        Repetition::REQUIRED
570    };
571    let id = field_id(field);
572    // create type from field
573    match field.data_type() {
574        DataType::Null => Type::primitive_type_builder(name, PhysicalType::INT32)
575            .with_logical_type(Some(LogicalType::Unknown))
576            .with_repetition(repetition)
577            .with_id(id)
578            .build(),
579        DataType::Boolean => Type::primitive_type_builder(name, PhysicalType::BOOLEAN)
580            .with_repetition(repetition)
581            .with_id(id)
582            .build(),
583        DataType::Int8 => Type::primitive_type_builder(name, PhysicalType::INT32)
584            .with_logical_type(Some(LogicalType::integer(8, true)))
585            .with_repetition(repetition)
586            .with_id(id)
587            .build(),
588        DataType::Int16 => Type::primitive_type_builder(name, PhysicalType::INT32)
589            .with_logical_type(Some(LogicalType::integer(16, true)))
590            .with_repetition(repetition)
591            .with_id(id)
592            .build(),
593        DataType::Int32 => Type::primitive_type_builder(name, PhysicalType::INT32)
594            .with_repetition(repetition)
595            .with_id(id)
596            .build(),
597        DataType::Int64 => Type::primitive_type_builder(name, PhysicalType::INT64)
598            .with_repetition(repetition)
599            .with_id(id)
600            .build(),
601        DataType::UInt8 => Type::primitive_type_builder(name, PhysicalType::INT32)
602            .with_logical_type(Some(LogicalType::integer(8, false)))
603            .with_repetition(repetition)
604            .with_id(id)
605            .build(),
606        DataType::UInt16 => Type::primitive_type_builder(name, PhysicalType::INT32)
607            .with_logical_type(Some(LogicalType::integer(16, false)))
608            .with_repetition(repetition)
609            .with_id(id)
610            .build(),
611        DataType::UInt32 => Type::primitive_type_builder(name, PhysicalType::INT32)
612            .with_logical_type(Some(LogicalType::integer(32, false)))
613            .with_repetition(repetition)
614            .with_id(id)
615            .build(),
616        DataType::UInt64 => Type::primitive_type_builder(name, PhysicalType::INT64)
617            .with_logical_type(Some(LogicalType::integer(64, false)))
618            .with_repetition(repetition)
619            .with_id(id)
620            .build(),
621        DataType::Float16 => Type::primitive_type_builder(name, PhysicalType::FIXED_LEN_BYTE_ARRAY)
622            .with_repetition(repetition)
623            .with_id(id)
624            .with_logical_type(Some(LogicalType::Float16))
625            .with_length(2)
626            .build(),
627        DataType::Float32 => Type::primitive_type_builder(name, PhysicalType::FLOAT)
628            .with_repetition(repetition)
629            .with_id(id)
630            .build(),
631        DataType::Float64 => Type::primitive_type_builder(name, PhysicalType::DOUBLE)
632            .with_repetition(repetition)
633            .with_id(id)
634            .build(),
635        DataType::Timestamp(TimeUnit::Second, _) => {
636            // Cannot represent seconds in LogicalType
637            Type::primitive_type_builder(name, PhysicalType::INT64)
638                .with_repetition(repetition)
639                .with_id(id)
640                .build()
641        }
642        DataType::Timestamp(time_unit, tz) => {
643            Type::primitive_type_builder(name, PhysicalType::INT64)
644                .with_logical_type(Some(LogicalType::timestamp(
645                    // If timezone set, values are normalized to UTC timezone
646                    matches!(tz, Some(z) if !z.as_ref().is_empty()),
647                    match time_unit {
648                        TimeUnit::Second => unreachable!(),
649                        TimeUnit::Millisecond => ParquetTimeUnit::MILLIS,
650                        TimeUnit::Microsecond => ParquetTimeUnit::MICROS,
651                        TimeUnit::Nanosecond => ParquetTimeUnit::NANOS,
652                    },
653                )))
654                .with_repetition(repetition)
655                .with_id(id)
656                .build()
657        }
658        DataType::Date32 => Type::primitive_type_builder(name, PhysicalType::INT32)
659            .with_logical_type(Some(LogicalType::Date))
660            .with_repetition(repetition)
661            .with_id(id)
662            .build(),
663        DataType::Date64 => {
664            if coerce_types {
665                Type::primitive_type_builder(name, PhysicalType::INT32)
666                    .with_logical_type(Some(LogicalType::Date))
667                    .with_repetition(repetition)
668                    .with_id(id)
669                    .build()
670            } else {
671                Type::primitive_type_builder(name, PhysicalType::INT64)
672                    .with_repetition(repetition)
673                    .with_id(id)
674                    .build()
675            }
676        }
677        DataType::Time32(TimeUnit::Second) => {
678            // Cannot represent seconds in LogicalType
679            Type::primitive_type_builder(name, PhysicalType::INT32)
680                .with_repetition(repetition)
681                .with_id(id)
682                .build()
683        }
684        DataType::Time32(unit) => Type::primitive_type_builder(name, PhysicalType::INT32)
685            .with_logical_type(Some(LogicalType::time(
686                field.metadata().contains_key("adjusted_to_utc"),
687                match unit {
688                    TimeUnit::Millisecond => ParquetTimeUnit::MILLIS,
689                    u => unreachable!("Invalid unit for Time32: {:?}", u),
690                },
691            )))
692            .with_repetition(repetition)
693            .with_id(id)
694            .build(),
695        DataType::Time64(unit) => Type::primitive_type_builder(name, PhysicalType::INT64)
696            .with_logical_type(Some(LogicalType::time(
697                field.metadata().contains_key("adjusted_to_utc"),
698                match unit {
699                    TimeUnit::Microsecond => ParquetTimeUnit::MICROS,
700                    TimeUnit::Nanosecond => ParquetTimeUnit::NANOS,
701                    u => unreachable!("Invalid unit for Time64: {:?}", u),
702                },
703            )))
704            .with_repetition(repetition)
705            .with_id(id)
706            .build(),
707        DataType::Duration(_) => Type::primitive_type_builder(name, PhysicalType::INT64)
708            .with_repetition(repetition)
709            .with_id(id)
710            .build(),
711        DataType::Interval(_) => {
712            Type::primitive_type_builder(name, PhysicalType::FIXED_LEN_BYTE_ARRAY)
713                .with_converted_type(ConvertedType::INTERVAL)
714                .with_repetition(repetition)
715                .with_id(id)
716                .with_length(12)
717                .build()
718        }
719        DataType::Binary | DataType::LargeBinary => {
720            Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
721                .with_repetition(repetition)
722                .with_id(id)
723                .with_logical_type(logical_type_for_binary(field))
724                .build()
725        }
726        DataType::FixedSizeBinary(length) => {
727            Type::primitive_type_builder(name, PhysicalType::FIXED_LEN_BYTE_ARRAY)
728                .with_repetition(repetition)
729                .with_id(id)
730                .with_length(*length)
731                .with_logical_type(logical_type_for_fixed_size_binary(field))
732                .build()
733        }
734        DataType::BinaryView => Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
735            .with_repetition(repetition)
736            .with_id(id)
737            .with_logical_type(logical_type_for_binary_view(field))
738            .build(),
739        DataType::Decimal32(precision, scale)
740        | DataType::Decimal64(precision, scale)
741        | DataType::Decimal128(precision, scale)
742        | DataType::Decimal256(precision, scale) => {
743            // Decimal precision determines the Parquet physical type to use.
744            // Following the: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#decimal
745            let (physical_type, length) = if *precision > 1 && *precision <= 9 {
746                (PhysicalType::INT32, -1)
747            } else if *precision <= 18 {
748                (PhysicalType::INT64, -1)
749            } else {
750                (
751                    PhysicalType::FIXED_LEN_BYTE_ARRAY,
752                    decimal_length_from_precision(*precision) as i32,
753                )
754            };
755            Type::primitive_type_builder(name, physical_type)
756                .with_repetition(repetition)
757                .with_id(id)
758                .with_length(length)
759                .with_logical_type(Some(LogicalType::decimal(*scale as i32, *precision as i32)))
760                .with_precision(*precision as i32)
761                .with_scale(*scale as i32)
762                .build()
763        }
764        DataType::Utf8 | DataType::LargeUtf8 => {
765            Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
766                .with_logical_type(logical_type_for_string(field))
767                .with_repetition(repetition)
768                .with_id(id)
769                .build()
770        }
771        DataType::Utf8View => Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
772            .with_logical_type(logical_type_for_string(field))
773            .with_repetition(repetition)
774            .with_id(id)
775            .build(),
776        DataType::List(f)
777        | DataType::FixedSizeList(f, _)
778        | DataType::LargeList(f)
779        | DataType::ListView(f)
780        | DataType::LargeListView(f) => {
781            let field_ref = if coerce_types && f.name() != PARQUET_LIST_ELEMENT_NAME {
782                // Ensure proper naming per the Parquet specification
783                let ff = f.as_ref().clone().with_name(PARQUET_LIST_ELEMENT_NAME);
784                Arc::new(arrow_to_parquet_type(&ff, coerce_types)?)
785            } else {
786                Arc::new(arrow_to_parquet_type(f, coerce_types)?)
787            };
788
789            Type::group_type_builder(name)
790                .with_fields(vec![Arc::new(
791                    Type::group_type_builder("list")
792                        .with_fields(vec![field_ref])
793                        .with_repetition(Repetition::REPEATED)
794                        .build()?,
795                )])
796                .with_logical_type(Some(LogicalType::List))
797                .with_repetition(repetition)
798                .with_id(id)
799                .build()
800        }
801        DataType::Struct(fields) => {
802            if fields.is_empty() {
803                return Err(arrow_err!("Parquet does not support writing empty structs",));
804            }
805            // recursively convert children to types/nodes
806            let fields = fields
807                .iter()
808                .map(|f| arrow_to_parquet_type(f, coerce_types).map(Arc::new))
809                .collect::<Result<_>>()?;
810            Type::group_type_builder(name)
811                .with_fields(fields)
812                .with_repetition(repetition)
813                .with_id(id)
814                .with_logical_type(logical_type_for_struct(field))
815                .build()
816        }
817        DataType::Map(field, _) => {
818            if let DataType::Struct(struct_fields) = field.data_type() {
819                // If coercing then set inner struct name to "key_value"
820                let map_struct_name = if coerce_types {
821                    PARQUET_MAP_STRUCT_NAME
822                } else {
823                    field.name()
824                };
825
826                // If coercing then ensure struct fields are named "key" and "value"
827                let fix_map_field = |name: &str, fld: &Arc<Field>| -> Result<Arc<Type>> {
828                    if coerce_types && fld.name() != name {
829                        let f = fld.as_ref().clone().with_name(name);
830                        Ok(Arc::new(arrow_to_parquet_type(&f, coerce_types)?))
831                    } else {
832                        Ok(Arc::new(arrow_to_parquet_type(fld, coerce_types)?))
833                    }
834                };
835                let key_field = fix_map_field(PARQUET_KEY_FIELD_NAME, &struct_fields[0])?;
836                let val_field = fix_map_field(PARQUET_VALUE_FIELD_NAME, &struct_fields[1])?;
837
838                Type::group_type_builder(name)
839                    .with_fields(vec![Arc::new(
840                        Type::group_type_builder(map_struct_name)
841                            .with_fields(vec![key_field, val_field])
842                            .with_repetition(Repetition::REPEATED)
843                            .build()?,
844                    )])
845                    .with_logical_type(Some(LogicalType::Map))
846                    .with_repetition(repetition)
847                    .with_id(id)
848                    .build()
849            } else {
850                Err(arrow_err!(
851                    "DataType::Map should contain a struct field child",
852                ))
853            }
854        }
855        DataType::Union(_, _) => unimplemented!("See ARROW-8817."),
856        DataType::Dictionary(_, value) => {
857            // Dictionary encoding not handled at the schema level
858            let dict_field = field.clone().with_data_type(value.as_ref().clone());
859            arrow_to_parquet_type(&dict_field, coerce_types)
860        }
861        DataType::RunEndEncoded(_, value_field) => {
862            let ree_value_field = field
863                .clone()
864                .with_data_type(value_field.data_type().clone());
865            arrow_to_parquet_type(&ree_value_field, coerce_types)
866        }
867    }
868}
869
870fn field_id(field: &Field) -> Option<i32> {
871    let value = field.metadata().get(super::PARQUET_FIELD_ID_META_KEY)?;
872    value.parse().ok() // Fail quietly if not a valid integer
873}
874
875#[cfg(test)]
876mod tests {
877    use super::*;
878
879    use std::{collections::HashMap, sync::Arc};
880
881    use crate::arrow::PARQUET_FIELD_ID_META_KEY;
882    use crate::file::metadata::KeyValue;
883    use crate::file::reader::FileReader;
884    use crate::{
885        arrow::{ArrowWriter, arrow_reader::ParquetRecordBatchReaderBuilder},
886        schema::{parser::parse_message_type, types::SchemaDescriptor},
887    };
888    use arrow::datatypes::{DataType, Field, IntervalUnit, TimeUnit};
889
890    #[test]
891    fn test_flat_primitives() {
892        let message_type = "
893        message test_schema {
894            REQUIRED BOOLEAN boolean;
895            REQUIRED INT32   int8  (INT_8);
896            REQUIRED INT32   int16 (INT_16);
897            REQUIRED INT32   uint8 (INTEGER(8,false));
898            REQUIRED INT32   uint16 (INTEGER(16,false));
899            REQUIRED INT32   int32;
900            REQUIRED INT64   int64;
901            OPTIONAL DOUBLE  double;
902            OPTIONAL FLOAT   float;
903            OPTIONAL FIXED_LEN_BYTE_ARRAY (2) float16 (FLOAT16);
904            OPTIONAL BINARY  string (UTF8);
905            OPTIONAL BINARY  string_2 (STRING);
906            OPTIONAL BINARY  json (JSON);
907        }
908        ";
909        let parquet_group_type = parse_message_type(message_type).unwrap();
910
911        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
912        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
913
914        let arrow_fields = Fields::from(vec![
915            Field::new("boolean", DataType::Boolean, false),
916            Field::new("int8", DataType::Int8, false),
917            Field::new("int16", DataType::Int16, false),
918            Field::new("uint8", DataType::UInt8, false),
919            Field::new("uint16", DataType::UInt16, false),
920            Field::new("int32", DataType::Int32, false),
921            Field::new("int64", DataType::Int64, false),
922            Field::new("double", DataType::Float64, true),
923            Field::new("float", DataType::Float32, true),
924            Field::new("float16", DataType::Float16, true),
925            Field::new("string", DataType::Utf8, true),
926            Field::new("string_2", DataType::Utf8, true),
927            json_field(),
928        ]);
929
930        assert_eq!(&arrow_fields, converted_arrow_schema.fields());
931    }
932
933    /// Return the expected Field for a Parquet column annotated with
934    /// the JSON logical type.
935    fn json_field() -> Field {
936        #[cfg(feature = "arrow_canonical_extension_types")]
937        {
938            Field::new("json", DataType::Utf8, true)
939                .with_extension_type(arrow_schema::extension::Json::default())
940        }
941        #[cfg(not(feature = "arrow_canonical_extension_types"))]
942        {
943            Field::new("json", DataType::Utf8, true)
944        }
945    }
946
947    #[test]
948    fn test_decimal_fields() {
949        let message_type = "
950        message test_schema {
951                    REQUIRED INT32 decimal1 (DECIMAL(4,2));
952                    REQUIRED INT64 decimal2 (DECIMAL(12,2));
953                    REQUIRED FIXED_LEN_BYTE_ARRAY (16) decimal3 (DECIMAL(30,2));
954                    REQUIRED BYTE_ARRAY decimal4 (DECIMAL(33,2));
955                    REQUIRED BYTE_ARRAY decimal5 (DECIMAL(38,2));
956                    REQUIRED FIXED_LEN_BYTE_ARRAY (17) decimal6 (DECIMAL(39,2));
957                    REQUIRED BYTE_ARRAY decimal7 (DECIMAL(39,2));
958        }
959        ";
960
961        let parquet_group_type = parse_message_type(message_type).unwrap();
962
963        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
964        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
965
966        let arrow_fields = Fields::from(vec![
967            Field::new("decimal1", DataType::Decimal128(4, 2), false),
968            Field::new("decimal2", DataType::Decimal128(12, 2), false),
969            Field::new("decimal3", DataType::Decimal128(30, 2), false),
970            Field::new("decimal4", DataType::Decimal128(33, 2), false),
971            Field::new("decimal5", DataType::Decimal128(38, 2), false),
972            Field::new("decimal6", DataType::Decimal256(39, 2), false),
973            Field::new("decimal7", DataType::Decimal256(39, 2), false),
974        ]);
975        assert_eq!(&arrow_fields, converted_arrow_schema.fields());
976    }
977
978    #[test]
979    fn test_byte_array_fields() {
980        let message_type = "
981        message test_schema {
982            REQUIRED BYTE_ARRAY binary;
983            REQUIRED FIXED_LEN_BYTE_ARRAY (20) fixed_binary;
984        }
985        ";
986
987        let parquet_group_type = parse_message_type(message_type).unwrap();
988
989        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
990        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
991
992        let arrow_fields = Fields::from(vec![
993            Field::new("binary", DataType::Binary, false),
994            Field::new("fixed_binary", DataType::FixedSizeBinary(20), false),
995        ]);
996        assert_eq!(&arrow_fields, converted_arrow_schema.fields());
997    }
998
999    #[test]
1000    fn test_duplicate_fields() {
1001        let message_type = "
1002        message test_schema {
1003            REQUIRED BOOLEAN boolean;
1004            REQUIRED INT32 int8 (INT_8);
1005        }
1006        ";
1007
1008        let parquet_group_type = parse_message_type(message_type).unwrap();
1009
1010        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1011        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
1012
1013        let arrow_fields = Fields::from(vec![
1014            Field::new("boolean", DataType::Boolean, false),
1015            Field::new("int8", DataType::Int8, false),
1016        ]);
1017        assert_eq!(&arrow_fields, converted_arrow_schema.fields());
1018
1019        let converted_arrow_schema =
1020            parquet_to_arrow_schema_by_columns(&parquet_schema, ProjectionMask::all(), None)
1021                .unwrap();
1022        assert_eq!(&arrow_fields, converted_arrow_schema.fields());
1023    }
1024
1025    #[test]
1026    fn test_parquet_lists() {
1027        let mut arrow_fields = Vec::new();
1028
1029        // LIST encoding example taken from parquet-format/LogicalTypes.md
1030        let message_type = "
1031        message test_schema {
1032          REQUIRED GROUP my_list (LIST) {
1033            REPEATED GROUP list {
1034              OPTIONAL BINARY element (UTF8);
1035            }
1036          }
1037          OPTIONAL GROUP my_list (LIST) {
1038            REPEATED GROUP list {
1039              REQUIRED BINARY element (UTF8);
1040            }
1041          }
1042          OPTIONAL GROUP array_of_arrays (LIST) {
1043            REPEATED GROUP list {
1044              REQUIRED GROUP element (LIST) {
1045                REPEATED GROUP list {
1046                  REQUIRED INT32 element;
1047                }
1048              }
1049            }
1050          }
1051          OPTIONAL GROUP my_list (LIST) {
1052            REPEATED GROUP element {
1053              REQUIRED BINARY str (UTF8);
1054            }
1055          }
1056          OPTIONAL GROUP my_list (LIST) {
1057            REPEATED INT32 element;
1058          }
1059          OPTIONAL GROUP my_list (LIST) {
1060            REPEATED GROUP element {
1061              REQUIRED BINARY str (UTF8);
1062              REQUIRED INT32 num;
1063            }
1064          }
1065          OPTIONAL GROUP my_list (LIST) {
1066            REPEATED GROUP array {
1067              REQUIRED BINARY str (UTF8);
1068            }
1069
1070          }
1071          OPTIONAL GROUP my_list (LIST) {
1072            REPEATED GROUP my_list_tuple {
1073              REQUIRED BINARY str (UTF8);
1074            }
1075          }
1076          REPEATED INT32 name;
1077        }
1078        ";
1079
1080        // // List<String> (list non-null, elements nullable)
1081        // required group my_list (LIST) {
1082        //   repeated group list {
1083        //     optional binary element (UTF8);
1084        //   }
1085        // }
1086        {
1087            arrow_fields.push(Field::new_list(
1088                "my_list",
1089                Field::new("element", DataType::Utf8, true),
1090                false,
1091            ));
1092        }
1093
1094        // // List<String> (list nullable, elements non-null)
1095        // optional group my_list (LIST) {
1096        //   repeated group list {
1097        //     required binary element (UTF8);
1098        //   }
1099        // }
1100        {
1101            arrow_fields.push(Field::new_list(
1102                "my_list",
1103                Field::new("element", DataType::Utf8, false),
1104                true,
1105            ));
1106        }
1107
1108        // Element types can be nested structures. For example, a list of lists:
1109        //
1110        // // List<List<Integer>>
1111        // optional group array_of_arrays (LIST) {
1112        //   repeated group list {
1113        //     required group element (LIST) {
1114        //       repeated group list {
1115        //         required int32 element;
1116        //       }
1117        //     }
1118        //   }
1119        // }
1120        {
1121            let arrow_inner_list = Field::new("element", DataType::Int32, false);
1122            arrow_fields.push(Field::new_list(
1123                "array_of_arrays",
1124                Field::new_list("element", arrow_inner_list, false),
1125                true,
1126            ));
1127        }
1128
1129        // // List<String> (list nullable, elements non-null)
1130        // optional group my_list (LIST) {
1131        //   repeated group element {
1132        //     required binary str (UTF8);
1133        //   };
1134        // }
1135        {
1136            arrow_fields.push(Field::new_list(
1137                "my_list",
1138                Field::new("str", DataType::Utf8, false),
1139                true,
1140            ));
1141        }
1142
1143        // // List<Integer> (nullable list, non-null elements)
1144        // optional group my_list (LIST) {
1145        //   repeated int32 element;
1146        // }
1147        {
1148            arrow_fields.push(Field::new_list(
1149                "my_list",
1150                Field::new("element", DataType::Int32, false),
1151                true,
1152            ));
1153        }
1154
1155        // // List<Tuple<String, Integer>> (nullable list, non-null elements)
1156        // optional group my_list (LIST) {
1157        //   repeated group element {
1158        //     required binary str (UTF8);
1159        //     required int32 num;
1160        //   };
1161        // }
1162        {
1163            let fields = vec![
1164                Field::new("str", DataType::Utf8, false),
1165                Field::new("num", DataType::Int32, false),
1166            ];
1167            arrow_fields.push(Field::new_list(
1168                "my_list",
1169                Field::new_struct("element", fields, false),
1170                true,
1171            ));
1172        }
1173
1174        // // List<OneTuple<String>> (nullable list, non-null elements)
1175        // optional group my_list (LIST) {
1176        //   repeated group array {
1177        //     required binary str (UTF8);
1178        //   };
1179        // }
1180        // Special case: group is named array
1181        {
1182            let fields = vec![Field::new("str", DataType::Utf8, false)];
1183            arrow_fields.push(Field::new_list(
1184                "my_list",
1185                Field::new_struct("array", fields, false),
1186                true,
1187            ));
1188        }
1189
1190        // // List<OneTuple<String>> (nullable list, non-null elements)
1191        // optional group my_list (LIST) {
1192        //   repeated group my_list_tuple {
1193        //     required binary str (UTF8);
1194        //   };
1195        // }
1196        // Special case: group named ends in _tuple
1197        {
1198            let fields = vec![Field::new("str", DataType::Utf8, false)];
1199            arrow_fields.push(Field::new_list(
1200                "my_list",
1201                Field::new_struct("my_list_tuple", fields, false),
1202                true,
1203            ));
1204        }
1205
1206        // One-level encoding: Only allows required lists with required cells
1207        //   repeated value_type name
1208        {
1209            arrow_fields.push(Field::new_list(
1210                "name",
1211                Field::new("name", DataType::Int32, false),
1212                false,
1213            ));
1214        }
1215
1216        let parquet_group_type = parse_message_type(message_type).unwrap();
1217
1218        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1219        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
1220        let converted_fields = converted_arrow_schema.fields();
1221
1222        assert_eq!(arrow_fields.len(), converted_fields.len());
1223        for i in 0..arrow_fields.len() {
1224            assert_eq!(&arrow_fields[i], converted_fields[i].as_ref(), "{i}");
1225        }
1226    }
1227
1228    #[test]
1229    fn test_parquet_list_nullable() {
1230        let mut arrow_fields = Vec::new();
1231
1232        let message_type = "
1233        message test_schema {
1234          REQUIRED GROUP my_list1 (LIST) {
1235            REPEATED GROUP list {
1236              OPTIONAL BINARY element (UTF8);
1237            }
1238          }
1239          OPTIONAL GROUP my_list2 (LIST) {
1240            REPEATED GROUP list {
1241              REQUIRED BINARY element (UTF8);
1242            }
1243          }
1244          REQUIRED GROUP my_list3 (LIST) {
1245            REPEATED GROUP list {
1246              REQUIRED BINARY element (UTF8);
1247            }
1248          }
1249        }
1250        ";
1251
1252        // // List<String> (list non-null, elements nullable)
1253        // required group my_list1 (LIST) {
1254        //   repeated group list {
1255        //     optional binary element (UTF8);
1256        //   }
1257        // }
1258        {
1259            arrow_fields.push(Field::new_list(
1260                "my_list1",
1261                Field::new("element", DataType::Utf8, true),
1262                false,
1263            ));
1264        }
1265
1266        // // List<String> (list nullable, elements non-null)
1267        // optional group my_list2 (LIST) {
1268        //   repeated group list {
1269        //     required binary element (UTF8);
1270        //   }
1271        // }
1272        {
1273            arrow_fields.push(Field::new_list(
1274                "my_list2",
1275                Field::new("element", DataType::Utf8, false),
1276                true,
1277            ));
1278        }
1279
1280        // // List<String> (list non-null, elements non-null)
1281        // repeated group my_list3 (LIST) {
1282        //   repeated group list {
1283        //     required binary element (UTF8);
1284        //   }
1285        // }
1286        {
1287            arrow_fields.push(Field::new_list(
1288                "my_list3",
1289                Field::new("element", DataType::Utf8, false),
1290                false,
1291            ));
1292        }
1293
1294        let parquet_group_type = parse_message_type(message_type).unwrap();
1295
1296        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1297        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
1298        let converted_fields = converted_arrow_schema.fields();
1299
1300        assert_eq!(arrow_fields.len(), converted_fields.len());
1301        for i in 0..arrow_fields.len() {
1302            assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
1303        }
1304    }
1305
1306    #[test]
1307    fn test_parquet_maps() {
1308        let mut arrow_fields = Vec::new();
1309
1310        // LIST encoding example taken from parquet-format/LogicalTypes.md
1311        let message_type = "
1312        message test_schema {
1313          REQUIRED group my_map1 (MAP) {
1314            REPEATED group key_value {
1315              REQUIRED binary key (UTF8);
1316              OPTIONAL int32 value;
1317            }
1318          }
1319          OPTIONAL group my_map2 (MAP) {
1320            REPEATED group map {
1321              REQUIRED binary str (UTF8);
1322              REQUIRED int32 num;
1323            }
1324          }
1325          OPTIONAL group my_map3 (MAP_KEY_VALUE) {
1326            REPEATED group map {
1327              REQUIRED binary key (UTF8);
1328              OPTIONAL int32 value;
1329            }
1330          }
1331          REQUIRED group my_map4 (MAP) {
1332            REPEATED group map {
1333              OPTIONAL binary key (UTF8);
1334              REQUIRED int32 value;
1335            }
1336          }
1337        }
1338        ";
1339
1340        // // Map<String, Integer>
1341        // required group my_map (MAP) {
1342        //   repeated group key_value {
1343        //     required binary key (UTF8);
1344        //     optional int32 value;
1345        //   }
1346        // }
1347        {
1348            arrow_fields.push(Field::new_map(
1349                "my_map1",
1350                "key_value",
1351                Field::new("key", DataType::Utf8, false),
1352                Field::new("value", DataType::Int32, true),
1353                false,
1354                false,
1355            ));
1356        }
1357
1358        // // Map<String, Integer> (nullable map, non-null values)
1359        // optional group my_map (MAP) {
1360        //   repeated group map {
1361        //     required binary str (UTF8);
1362        //     required int32 num;
1363        //   }
1364        // }
1365        {
1366            arrow_fields.push(Field::new_map(
1367                "my_map2",
1368                "map",
1369                Field::new("str", DataType::Utf8, false),
1370                Field::new("num", DataType::Int32, false),
1371                false,
1372                true,
1373            ));
1374        }
1375
1376        // // Map<String, Integer> (nullable map, nullable values)
1377        // optional group my_map (MAP_KEY_VALUE) {
1378        //   repeated group map {
1379        //     required binary key (UTF8);
1380        //     optional int32 value;
1381        //   }
1382        // }
1383        {
1384            arrow_fields.push(Field::new_map(
1385                "my_map3",
1386                "map",
1387                Field::new("key", DataType::Utf8, false),
1388                Field::new("value", DataType::Int32, true),
1389                false,
1390                true,
1391            ));
1392        }
1393
1394        // // Map<String, Integer> (non-compliant nullable key)
1395        // group my_map (MAP_KEY_VALUE) {
1396        //   repeated group map {
1397        //     optional binary key (UTF8);
1398        //     required int32 value;
1399        //   }
1400        // }
1401        {
1402            arrow_fields.push(Field::new_map(
1403                "my_map4",
1404                "map",
1405                Field::new("key", DataType::Utf8, false), // The key is always non-nullable (#5630)
1406                Field::new("value", DataType::Int32, false),
1407                false,
1408                false,
1409            ));
1410        }
1411
1412        let parquet_group_type = parse_message_type(message_type).unwrap();
1413
1414        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1415        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
1416        let converted_fields = converted_arrow_schema.fields();
1417
1418        assert_eq!(arrow_fields.len(), converted_fields.len());
1419        for i in 0..arrow_fields.len() {
1420            assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
1421        }
1422    }
1423
1424    #[test]
1425    fn test_nested_schema() {
1426        let mut arrow_fields = Vec::new();
1427        {
1428            let group1_fields = Fields::from(vec![
1429                Field::new("leaf1", DataType::Boolean, false),
1430                Field::new("leaf2", DataType::Int32, false),
1431            ]);
1432            let group1_struct = Field::new("group1", DataType::Struct(group1_fields), false);
1433            arrow_fields.push(group1_struct);
1434
1435            let leaf3_field = Field::new("leaf3", DataType::Int64, false);
1436            arrow_fields.push(leaf3_field);
1437        }
1438
1439        let message_type = "
1440        message test_schema {
1441          REQUIRED GROUP group1 {
1442            REQUIRED BOOLEAN leaf1;
1443            REQUIRED INT32 leaf2;
1444          }
1445          REQUIRED INT64 leaf3;
1446        }
1447        ";
1448        let parquet_group_type = parse_message_type(message_type).unwrap();
1449
1450        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1451        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
1452        let converted_fields = converted_arrow_schema.fields();
1453
1454        assert_eq!(arrow_fields.len(), converted_fields.len());
1455        for i in 0..arrow_fields.len() {
1456            assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
1457        }
1458    }
1459
1460    #[test]
1461    fn test_nested_schema_partial() {
1462        let mut arrow_fields = Vec::new();
1463        {
1464            let group1_fields = vec![Field::new("leaf1", DataType::Int64, false)].into();
1465            let group1 = Field::new("group1", DataType::Struct(group1_fields), false);
1466            arrow_fields.push(group1);
1467
1468            let group2_fields = vec![Field::new("leaf4", DataType::Int64, false)].into();
1469            let group2 = Field::new("group2", DataType::Struct(group2_fields), false);
1470            arrow_fields.push(group2);
1471
1472            arrow_fields.push(Field::new("leaf5", DataType::Int64, false));
1473        }
1474
1475        let message_type = "
1476        message test_schema {
1477          REQUIRED GROUP group1 {
1478            REQUIRED INT64 leaf1;
1479            REQUIRED INT64 leaf2;
1480          }
1481          REQUIRED  GROUP group2 {
1482            REQUIRED INT64 leaf3;
1483            REQUIRED INT64 leaf4;
1484          }
1485          REQUIRED INT64 leaf5;
1486        }
1487        ";
1488        let parquet_group_type = parse_message_type(message_type).unwrap();
1489
1490        // Expected partial arrow schema (columns 0, 3, 4):
1491        // required group group1 {
1492        //   required int64 leaf1;
1493        // }
1494        // required group group2 {
1495        //   required int64 leaf4;
1496        // }
1497        // required int64 leaf5;
1498
1499        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1500        let mask = ProjectionMask::leaves(&parquet_schema, [3, 0, 4, 4]);
1501        let converted_arrow_schema =
1502            parquet_to_arrow_schema_by_columns(&parquet_schema, mask, None).unwrap();
1503        let converted_fields = converted_arrow_schema.fields();
1504
1505        assert_eq!(arrow_fields.len(), converted_fields.len());
1506        for i in 0..arrow_fields.len() {
1507            assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
1508        }
1509    }
1510
1511    #[test]
1512    fn test_nested_schema_partial_ordering() {
1513        let mut arrow_fields = Vec::new();
1514        {
1515            let group1_fields = vec![Field::new("leaf1", DataType::Int64, false)].into();
1516            let group1 = Field::new("group1", DataType::Struct(group1_fields), false);
1517            arrow_fields.push(group1);
1518
1519            let group2_fields = vec![Field::new("leaf4", DataType::Int64, false)].into();
1520            let group2 = Field::new("group2", DataType::Struct(group2_fields), false);
1521            arrow_fields.push(group2);
1522
1523            arrow_fields.push(Field::new("leaf5", DataType::Int64, false));
1524        }
1525
1526        let message_type = "
1527        message test_schema {
1528          REQUIRED GROUP group1 {
1529            REQUIRED INT64 leaf1;
1530            REQUIRED INT64 leaf2;
1531          }
1532          REQUIRED  GROUP group2 {
1533            REQUIRED INT64 leaf3;
1534            REQUIRED INT64 leaf4;
1535          }
1536          REQUIRED INT64 leaf5;
1537        }
1538        ";
1539        let parquet_group_type = parse_message_type(message_type).unwrap();
1540
1541        // Expected partial arrow schema (columns 3, 4, 0):
1542        // required group group1 {
1543        //   required int64 leaf1;
1544        // }
1545        // required group group2 {
1546        //   required int64 leaf4;
1547        // }
1548        // required int64 leaf5;
1549
1550        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1551        let mask = ProjectionMask::leaves(&parquet_schema, [3, 0, 4]);
1552        let converted_arrow_schema =
1553            parquet_to_arrow_schema_by_columns(&parquet_schema, mask, None).unwrap();
1554        let converted_fields = converted_arrow_schema.fields();
1555
1556        assert_eq!(arrow_fields.len(), converted_fields.len());
1557        for i in 0..arrow_fields.len() {
1558            assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
1559        }
1560
1561        let mask =
1562            ProjectionMask::columns(&parquet_schema, ["group2.leaf4", "group1.leaf1", "leaf5"]);
1563        let converted_arrow_schema =
1564            parquet_to_arrow_schema_by_columns(&parquet_schema, mask, None).unwrap();
1565        let converted_fields = converted_arrow_schema.fields();
1566
1567        assert_eq!(arrow_fields.len(), converted_fields.len());
1568        for i in 0..arrow_fields.len() {
1569            assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
1570        }
1571    }
1572
1573    #[test]
1574    fn test_repeated_nested_schema() {
1575        let mut arrow_fields = Vec::new();
1576        {
1577            arrow_fields.push(Field::new("leaf1", DataType::Int32, true));
1578
1579            let inner_group_list = Field::new_list(
1580                "innerGroup",
1581                Field::new_struct(
1582                    "innerGroup",
1583                    vec![Field::new("leaf3", DataType::Int32, true)],
1584                    false,
1585                ),
1586                false,
1587            );
1588
1589            let outer_group_list = Field::new_list(
1590                "outerGroup",
1591                Field::new_struct(
1592                    "outerGroup",
1593                    vec![Field::new("leaf2", DataType::Int32, true), inner_group_list],
1594                    false,
1595                ),
1596                false,
1597            );
1598            arrow_fields.push(outer_group_list);
1599        }
1600
1601        let message_type = "
1602        message test_schema {
1603          OPTIONAL INT32 leaf1;
1604          REPEATED GROUP outerGroup {
1605            OPTIONAL INT32 leaf2;
1606            REPEATED GROUP innerGroup {
1607              OPTIONAL INT32 leaf3;
1608            }
1609          }
1610        }
1611        ";
1612        let parquet_group_type = parse_message_type(message_type).unwrap();
1613
1614        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1615        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
1616        let converted_fields = converted_arrow_schema.fields();
1617
1618        assert_eq!(arrow_fields.len(), converted_fields.len());
1619        for i in 0..arrow_fields.len() {
1620            assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
1621        }
1622    }
1623
1624    #[test]
1625    fn test_column_desc_to_field() {
1626        let message_type = "
1627        message test_schema {
1628            REQUIRED BOOLEAN boolean;
1629            REQUIRED INT32   int8  (INT_8);
1630            REQUIRED INT32   uint8 (INTEGER(8,false));
1631            REQUIRED INT32   int16 (INT_16);
1632            REQUIRED INT32   uint16 (INTEGER(16,false));
1633            REQUIRED INT32   int32;
1634            REQUIRED INT64   int64;
1635            OPTIONAL DOUBLE  double;
1636            OPTIONAL FLOAT   float;
1637            OPTIONAL FIXED_LEN_BYTE_ARRAY (2) float16 (FLOAT16);
1638            OPTIONAL BINARY  string (UTF8);
1639            REPEATED BOOLEAN bools;
1640            OPTIONAL INT32   date       (DATE);
1641            OPTIONAL INT32   time_milli (TIME_MILLIS);
1642            OPTIONAL INT64   time_micro (TIME_MICROS);
1643            OPTIONAL INT64   time_nano (TIME(NANOS,false));
1644            OPTIONAL INT64   ts_milli (TIMESTAMP_MILLIS);
1645            REQUIRED INT64   ts_micro (TIMESTAMP_MICROS);
1646            REQUIRED INT64   ts_nano (TIMESTAMP(NANOS,true));
1647            REPEATED INT32   int_list;
1648            REPEATED BINARY  byte_list;
1649            REPEATED BINARY  string_list (UTF8);
1650            REQUIRED INT32 decimal_int32 (DECIMAL(8,2));
1651            REQUIRED INT64 decimal_int64 (DECIMAL(16,2));
1652            REQUIRED FIXED_LEN_BYTE_ARRAY (13) decimal_fix_length (DECIMAL(30,2));
1653        }
1654        ";
1655        let parquet_group_type = parse_message_type(message_type).unwrap();
1656
1657        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1658        let converted_arrow_fields = parquet_schema
1659            .columns()
1660            .iter()
1661            .map(|c| parquet_to_arrow_field(c).unwrap())
1662            .collect::<Vec<Field>>();
1663
1664        let arrow_fields = vec![
1665            Field::new("boolean", DataType::Boolean, false),
1666            Field::new("int8", DataType::Int8, false),
1667            Field::new("uint8", DataType::UInt8, false),
1668            Field::new("int16", DataType::Int16, false),
1669            Field::new("uint16", DataType::UInt16, false),
1670            Field::new("int32", DataType::Int32, false),
1671            Field::new("int64", DataType::Int64, false),
1672            Field::new("double", DataType::Float64, true),
1673            Field::new("float", DataType::Float32, true),
1674            Field::new("float16", DataType::Float16, true),
1675            Field::new("string", DataType::Utf8, true),
1676            Field::new_list(
1677                "bools",
1678                Field::new("bools", DataType::Boolean, false),
1679                false,
1680            ),
1681            Field::new("date", DataType::Date32, true),
1682            Field::new("time_milli", DataType::Time32(TimeUnit::Millisecond), true),
1683            Field::new("time_micro", DataType::Time64(TimeUnit::Microsecond), true),
1684            Field::new("time_nano", DataType::Time64(TimeUnit::Nanosecond), true),
1685            Field::new(
1686                "ts_milli",
1687                DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
1688                true,
1689            ),
1690            Field::new(
1691                "ts_micro",
1692                DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
1693                false,
1694            ),
1695            Field::new(
1696                "ts_nano",
1697                DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())),
1698                false,
1699            ),
1700            Field::new_list(
1701                "int_list",
1702                Field::new("int_list", DataType::Int32, false),
1703                false,
1704            ),
1705            Field::new_list(
1706                "byte_list",
1707                Field::new("byte_list", DataType::Binary, false),
1708                false,
1709            ),
1710            Field::new_list(
1711                "string_list",
1712                Field::new("string_list", DataType::Utf8, false),
1713                false,
1714            ),
1715            Field::new("decimal_int32", DataType::Decimal128(8, 2), false),
1716            Field::new("decimal_int64", DataType::Decimal128(16, 2), false),
1717            Field::new("decimal_fix_length", DataType::Decimal128(30, 2), false),
1718        ];
1719
1720        assert_eq!(arrow_fields, converted_arrow_fields);
1721    }
1722
1723    #[test]
1724    fn test_coerced_map_list() {
1725        // Create Arrow schema with non-Parquet naming
1726        let arrow_fields = vec![
1727            Field::new_list(
1728                "my_list",
1729                Field::new("item", DataType::Boolean, true),
1730                false,
1731            ),
1732            Field::new_map(
1733                "my_map",
1734                "entries",
1735                Field::new("keys", DataType::Utf8, false),
1736                Field::new("values", DataType::Int32, true),
1737                false,
1738                true,
1739            ),
1740        ];
1741        let arrow_schema = Schema::new(arrow_fields);
1742
1743        // Create Parquet schema with coerced names
1744        let message_type = "
1745        message parquet_schema {
1746            REQUIRED GROUP my_list (LIST) {
1747                REPEATED GROUP list {
1748                    OPTIONAL BOOLEAN element;
1749                }
1750            }
1751            OPTIONAL GROUP my_map (MAP) {
1752                REPEATED GROUP key_value {
1753                    REQUIRED BINARY key (STRING);
1754                    OPTIONAL INT32 value;
1755                }
1756            }
1757        }
1758        ";
1759        let parquet_group_type = parse_message_type(message_type).unwrap();
1760        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1761        let converted_arrow_schema = ArrowSchemaConverter::new()
1762            .with_coerce_types(true)
1763            .convert(&arrow_schema)
1764            .unwrap();
1765        assert_eq!(
1766            parquet_schema.columns().len(),
1767            converted_arrow_schema.columns().len()
1768        );
1769
1770        // Create Parquet schema without coerced names
1771        let message_type = "
1772        message parquet_schema {
1773            REQUIRED GROUP my_list (LIST) {
1774                REPEATED GROUP list {
1775                    OPTIONAL BOOLEAN item;
1776                }
1777            }
1778            OPTIONAL GROUP my_map (MAP) {
1779                REPEATED GROUP entries {
1780                    REQUIRED BINARY keys (STRING);
1781                    OPTIONAL INT32 values;
1782                }
1783            }
1784        }
1785        ";
1786        let parquet_group_type = parse_message_type(message_type).unwrap();
1787        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1788        let converted_arrow_schema = ArrowSchemaConverter::new()
1789            .with_coerce_types(false)
1790            .convert(&arrow_schema)
1791            .unwrap();
1792        assert_eq!(
1793            parquet_schema.columns().len(),
1794            converted_arrow_schema.columns().len()
1795        );
1796    }
1797
1798    #[test]
1799    fn test_field_to_column_desc() {
1800        let message_type = "
1801        message arrow_schema {
1802            REQUIRED BOOLEAN boolean;
1803            REQUIRED INT32   int8  (INT_8);
1804            REQUIRED INT32   int16 (INTEGER(16,true));
1805            REQUIRED INT32   int32;
1806            REQUIRED INT64   int64;
1807            OPTIONAL DOUBLE  double;
1808            OPTIONAL FLOAT   float;
1809            OPTIONAL FIXED_LEN_BYTE_ARRAY (2) float16 (FLOAT16);
1810            OPTIONAL BINARY  string (STRING);
1811            OPTIONAL GROUP   bools (LIST) {
1812                REPEATED GROUP list {
1813                    OPTIONAL BOOLEAN element;
1814                }
1815            }
1816            REQUIRED GROUP   bools_non_null (LIST) {
1817                REPEATED GROUP list {
1818                    REQUIRED BOOLEAN element;
1819                }
1820            }
1821            OPTIONAL INT32   date       (DATE);
1822            OPTIONAL INT32   time_milli (TIME(MILLIS,false));
1823            OPTIONAL INT32   time_milli_utc (TIME(MILLIS,true));
1824            OPTIONAL INT64   time_micro (TIME_MICROS);
1825            OPTIONAL INT64   time_micro_utc (TIME(MICROS, true));
1826            OPTIONAL INT64   ts_milli (TIMESTAMP_MILLIS);
1827            REQUIRED INT64   ts_micro (TIMESTAMP(MICROS,false));
1828            REQUIRED INT64   ts_seconds;
1829            REQUIRED INT64   ts_micro_utc (TIMESTAMP(MICROS, true));
1830            REQUIRED INT64   ts_millis_zero_offset (TIMESTAMP(MILLIS, true));
1831            REQUIRED INT64   ts_millis_zero_negative_offset (TIMESTAMP(MILLIS, true));
1832            REQUIRED INT64   ts_micro_non_utc (TIMESTAMP(MICROS, true));
1833            REQUIRED GROUP struct {
1834                REQUIRED BOOLEAN bools;
1835                REQUIRED INT32 uint32 (INTEGER(32,false));
1836                REQUIRED GROUP   int32 (LIST) {
1837                    REPEATED GROUP list {
1838                        OPTIONAL INT32 element;
1839                    }
1840                }
1841            }
1842            REQUIRED BINARY  dictionary_strings (STRING);
1843            REQUIRED INT32 decimal_int32 (DECIMAL(8,2));
1844            REQUIRED INT64 decimal_int64 (DECIMAL(16,2));
1845            REQUIRED FIXED_LEN_BYTE_ARRAY (13) decimal_fix_length (DECIMAL(30,2));
1846            REQUIRED FIXED_LEN_BYTE_ARRAY (16) decimal128 (DECIMAL(38,2));
1847            REQUIRED FIXED_LEN_BYTE_ARRAY (17) decimal256 (DECIMAL(39,2));
1848        }
1849        ";
1850        let parquet_group_type = parse_message_type(message_type).unwrap();
1851
1852        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1853
1854        let arrow_fields = vec![
1855            Field::new("boolean", DataType::Boolean, false),
1856            Field::new("int8", DataType::Int8, false),
1857            Field::new("int16", DataType::Int16, false),
1858            Field::new("int32", DataType::Int32, false),
1859            Field::new("int64", DataType::Int64, false),
1860            Field::new("double", DataType::Float64, true),
1861            Field::new("float", DataType::Float32, true),
1862            Field::new("float16", DataType::Float16, true),
1863            Field::new("string", DataType::Utf8, true),
1864            Field::new_list(
1865                "bools",
1866                Field::new("element", DataType::Boolean, true),
1867                true,
1868            ),
1869            Field::new_list(
1870                "bools_non_null",
1871                Field::new("element", DataType::Boolean, false),
1872                false,
1873            ),
1874            Field::new("date", DataType::Date32, true),
1875            Field::new("time_milli", DataType::Time32(TimeUnit::Millisecond), true),
1876            Field::new(
1877                "time_milli_utc",
1878                DataType::Time32(TimeUnit::Millisecond),
1879                true,
1880            )
1881            .with_metadata(HashMap::from_iter(vec![(
1882                "adjusted_to_utc".to_string(),
1883                "".to_string(),
1884            )])),
1885            Field::new("time_micro", DataType::Time64(TimeUnit::Microsecond), true),
1886            Field::new(
1887                "time_micro_utc",
1888                DataType::Time64(TimeUnit::Microsecond),
1889                true,
1890            )
1891            .with_metadata(HashMap::from_iter(vec![(
1892                "adjusted_to_utc".to_string(),
1893                "".to_string(),
1894            )])),
1895            Field::new(
1896                "ts_milli",
1897                DataType::Timestamp(TimeUnit::Millisecond, None),
1898                true,
1899            ),
1900            Field::new(
1901                "ts_micro",
1902                DataType::Timestamp(TimeUnit::Microsecond, None),
1903                false,
1904            ),
1905            Field::new(
1906                "ts_seconds",
1907                DataType::Timestamp(TimeUnit::Second, Some("UTC".into())),
1908                false,
1909            ),
1910            Field::new(
1911                "ts_micro_utc",
1912                DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
1913                false,
1914            ),
1915            Field::new(
1916                "ts_millis_zero_offset",
1917                DataType::Timestamp(TimeUnit::Millisecond, Some("+00:00".into())),
1918                false,
1919            ),
1920            Field::new(
1921                "ts_millis_zero_negative_offset",
1922                DataType::Timestamp(TimeUnit::Millisecond, Some("-00:00".into())),
1923                false,
1924            ),
1925            Field::new(
1926                "ts_micro_non_utc",
1927                DataType::Timestamp(TimeUnit::Microsecond, Some("+01:00".into())),
1928                false,
1929            ),
1930            Field::new_struct(
1931                "struct",
1932                vec![
1933                    Field::new("bools", DataType::Boolean, false),
1934                    Field::new("uint32", DataType::UInt32, false),
1935                    Field::new_list("int32", Field::new("element", DataType::Int32, true), false),
1936                ],
1937                false,
1938            ),
1939            Field::new_dictionary("dictionary_strings", DataType::Int32, DataType::Utf8, false),
1940            Field::new("decimal_int32", DataType::Decimal128(8, 2), false),
1941            Field::new("decimal_int64", DataType::Decimal128(16, 2), false),
1942            Field::new("decimal_fix_length", DataType::Decimal128(30, 2), false),
1943            Field::new("decimal128", DataType::Decimal128(38, 2), false),
1944            Field::new("decimal256", DataType::Decimal256(39, 2), false),
1945        ];
1946        let arrow_schema = Schema::new(arrow_fields);
1947        let converted_arrow_schema = ArrowSchemaConverter::new().convert(&arrow_schema).unwrap();
1948
1949        assert_eq!(
1950            parquet_schema.columns().len(),
1951            converted_arrow_schema.columns().len()
1952        );
1953        parquet_schema
1954            .columns()
1955            .iter()
1956            .zip(converted_arrow_schema.columns())
1957            .for_each(|(a, b)| {
1958                // Only check logical type if it's set on the Parquet side.
1959                // This is because the Arrow conversion always sets logical type,
1960                // even if there wasn't originally one.
1961                // This is not an issue, but is an inconvenience for this test.
1962                match a.logical_type_ref() {
1963                    Some(_) => {
1964                        assert_eq!(a, b)
1965                    }
1966                    None => {
1967                        assert_eq!(a.name(), b.name());
1968                        assert_eq!(a.physical_type(), b.physical_type());
1969                        assert_eq!(a.converted_type(), b.converted_type());
1970                    }
1971                };
1972            });
1973    }
1974
1975    #[test]
1976    #[should_panic(expected = "Parquet does not support writing empty structs")]
1977    fn test_empty_struct_field() {
1978        let arrow_fields = vec![Field::new(
1979            "struct",
1980            DataType::Struct(Fields::empty()),
1981            false,
1982        )];
1983        let arrow_schema = Schema::new(arrow_fields);
1984        let converted_arrow_schema = ArrowSchemaConverter::new()
1985            .with_coerce_types(true)
1986            .convert(&arrow_schema);
1987
1988        converted_arrow_schema.unwrap();
1989    }
1990
1991    #[test]
1992    fn test_metadata() {
1993        let message_type = "
1994        message test_schema {
1995            OPTIONAL BINARY  string (STRING);
1996        }
1997        ";
1998        let parquet_group_type = parse_message_type(message_type).unwrap();
1999
2000        let key_value_metadata = vec![
2001            KeyValue::new("foo".to_owned(), Some("bar".to_owned())),
2002            KeyValue::new("baz".to_owned(), None),
2003        ];
2004
2005        let mut expected_metadata: HashMap<String, String> = HashMap::new();
2006        expected_metadata.insert("foo".to_owned(), "bar".to_owned());
2007
2008        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
2009        let converted_arrow_schema =
2010            parquet_to_arrow_schema(&parquet_schema, Some(&key_value_metadata)).unwrap();
2011
2012        assert_eq!(converted_arrow_schema.metadata(), &expected_metadata);
2013    }
2014
2015    #[test]
2016    fn test_arrow_schema_roundtrip() -> Result<()> {
2017        let meta = |a: &[(&str, &str)]| -> HashMap<String, String> {
2018            a.iter()
2019                .map(|(a, b)| (a.to_string(), b.to_string()))
2020                .collect()
2021        };
2022
2023        let schema = Schema::new_with_metadata(
2024            vec![
2025                Field::new("c1", DataType::Utf8, false)
2026                    .with_metadata(meta(&[("Key", "Foo"), (PARQUET_FIELD_ID_META_KEY, "2")])),
2027                Field::new("c2", DataType::Binary, false),
2028                Field::new("c3", DataType::FixedSizeBinary(3), false),
2029                Field::new("c4", DataType::Boolean, false),
2030                Field::new("c5", DataType::Date32, false),
2031                Field::new("c6", DataType::Date64, false),
2032                Field::new("c7", DataType::Time32(TimeUnit::Second), false),
2033                Field::new("c8", DataType::Time32(TimeUnit::Millisecond), false),
2034                Field::new("c13", DataType::Time64(TimeUnit::Microsecond), false),
2035                Field::new("c14", DataType::Time64(TimeUnit::Nanosecond), false),
2036                Field::new("c15", DataType::Timestamp(TimeUnit::Second, None), false),
2037                Field::new(
2038                    "c16",
2039                    DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
2040                    false,
2041                ),
2042                Field::new(
2043                    "c17",
2044                    DataType::Timestamp(TimeUnit::Microsecond, Some("Africa/Johannesburg".into())),
2045                    false,
2046                ),
2047                Field::new(
2048                    "c18",
2049                    DataType::Timestamp(TimeUnit::Nanosecond, None),
2050                    false,
2051                ),
2052                Field::new("c19", DataType::Interval(IntervalUnit::DayTime), false),
2053                Field::new("c20", DataType::Interval(IntervalUnit::YearMonth), false),
2054                Field::new_list(
2055                    "c21",
2056                    Field::new_list_field(DataType::Boolean, true)
2057                        .with_metadata(meta(&[("Key", "Bar"), (PARQUET_FIELD_ID_META_KEY, "5")])),
2058                    false,
2059                )
2060                .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "4")])),
2061                Field::new(
2062                    "c22",
2063                    DataType::FixedSizeList(
2064                        Arc::new(Field::new_list_field(DataType::Boolean, true)),
2065                        5,
2066                    ),
2067                    false,
2068                ),
2069                Field::new_list(
2070                    "c23",
2071                    Field::new_large_list(
2072                        "inner",
2073                        Field::new_list_field(
2074                            DataType::Struct(
2075                                vec![
2076                                    Field::new("a", DataType::Int16, true),
2077                                    Field::new("b", DataType::Float64, false),
2078                                    Field::new("c", DataType::Float32, false),
2079                                    Field::new("d", DataType::Float16, false),
2080                                ]
2081                                .into(),
2082                            ),
2083                            false,
2084                        ),
2085                        true,
2086                    ),
2087                    false,
2088                ),
2089                Field::new(
2090                    "c24",
2091                    DataType::Struct(Fields::from(vec![
2092                        Field::new("a", DataType::Utf8, false),
2093                        Field::new("b", DataType::UInt16, false),
2094                    ])),
2095                    false,
2096                ),
2097                Field::new("c25", DataType::Interval(IntervalUnit::YearMonth), true),
2098                Field::new("c26", DataType::Interval(IntervalUnit::DayTime), true),
2099                // Duration types not supported
2100                // Field::new("c27", DataType::Duration(TimeUnit::Second), false),
2101                // Field::new("c28", DataType::Duration(TimeUnit::Millisecond), false),
2102                // Field::new("c29", DataType::Duration(TimeUnit::Microsecond), false),
2103                // Field::new("c30", DataType::Duration(TimeUnit::Nanosecond), false),
2104                #[allow(deprecated)]
2105                Field::new_dict(
2106                    "c31",
2107                    DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
2108                    true,
2109                    123,
2110                    true,
2111                )
2112                .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "6")])),
2113                Field::new("c32", DataType::LargeBinary, true),
2114                Field::new("c33", DataType::LargeUtf8, true),
2115                Field::new_large_list(
2116                    "c34",
2117                    Field::new_list(
2118                        "inner",
2119                        Field::new_list_field(
2120                            DataType::Struct(
2121                                vec![
2122                                    Field::new("a", DataType::Int16, true),
2123                                    Field::new("b", DataType::Float64, true),
2124                                ]
2125                                .into(),
2126                            ),
2127                            true,
2128                        ),
2129                        true,
2130                    ),
2131                    true,
2132                ),
2133                Field::new("c35", DataType::Null, true),
2134                Field::new("c36", DataType::Decimal128(2, 1), false),
2135                Field::new("c37", DataType::Decimal256(50, 20), false),
2136                Field::new("c38", DataType::Decimal128(18, 12), true),
2137                Field::new_map(
2138                    "c39",
2139                    "key_value",
2140                    Field::new("key", DataType::Utf8, false),
2141                    Field::new_list("value", Field::new("element", DataType::Utf8, true), true),
2142                    false, // fails to roundtrip keys_sorted
2143                    true,
2144                ),
2145                Field::new_map(
2146                    "c40",
2147                    "my_entries",
2148                    Field::new("my_key", DataType::Utf8, false)
2149                        .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "8")])),
2150                    Field::new_list(
2151                        "my_value",
2152                        Field::new_list_field(DataType::Utf8, true)
2153                            .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "10")])),
2154                        true,
2155                    )
2156                    .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "9")])),
2157                    false, // fails to roundtrip keys_sorted
2158                    true,
2159                )
2160                .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "7")])),
2161                Field::new_map(
2162                    "c41",
2163                    "my_entries",
2164                    Field::new("my_key", DataType::Utf8, false),
2165                    Field::new_list(
2166                        "my_value",
2167                        Field::new_list_field(DataType::Utf8, true)
2168                            .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "11")])),
2169                        true,
2170                    ),
2171                    false, // fails to roundtrip keys_sorted
2172                    false,
2173                ),
2174                Field::new("c42", DataType::Decimal32(5, 2), false),
2175                Field::new("c43", DataType::Decimal64(18, 12), true),
2176            ],
2177            meta(&[("Key", "Value")]),
2178        );
2179
2180        // write to an empty parquet file so that schema is serialized
2181        let file = tempfile::tempfile().unwrap();
2182        let writer =
2183            ArrowWriter::try_new(file.try_clone().unwrap(), Arc::new(schema.clone()), None)?;
2184        writer.close()?;
2185
2186        // read file back
2187        let arrow_reader = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
2188
2189        // Check arrow schema
2190        let read_schema = arrow_reader.schema();
2191        assert_eq!(&schema, read_schema.as_ref());
2192
2193        // Walk schema finding field IDs
2194        let mut stack = Vec::with_capacity(10);
2195        let mut out = Vec::with_capacity(10);
2196
2197        let root = arrow_reader.parquet_schema().root_schema_ptr();
2198        stack.push((root.name().to_string(), root));
2199
2200        while let Some((p, t)) = stack.pop() {
2201            if t.is_group() {
2202                for f in t.get_fields() {
2203                    stack.push((format!("{p}.{}", f.name()), f.clone()))
2204                }
2205            }
2206
2207            let info = t.get_basic_info();
2208            if info.has_id() {
2209                out.push(format!("{p} -> {}", info.id()))
2210            }
2211        }
2212        out.sort_unstable();
2213        let out: Vec<_> = out.iter().map(|x| x.as_str()).collect();
2214
2215        assert_eq!(
2216            &out,
2217            &[
2218                "arrow_schema.c1 -> 2",
2219                "arrow_schema.c21 -> 4",
2220                "arrow_schema.c21.list.item -> 5",
2221                "arrow_schema.c31 -> 6",
2222                "arrow_schema.c40 -> 7",
2223                "arrow_schema.c40.my_entries.my_key -> 8",
2224                "arrow_schema.c40.my_entries.my_value -> 9",
2225                "arrow_schema.c40.my_entries.my_value.list.item -> 10",
2226                "arrow_schema.c41.my_entries.my_value.list.item -> 11",
2227            ]
2228        );
2229
2230        Ok(())
2231    }
2232
2233    #[test]
2234    fn test_read_parquet_field_ids_raw() -> Result<()> {
2235        let meta = |a: &[(&str, &str)]| -> HashMap<String, String> {
2236            a.iter()
2237                .map(|(a, b)| (a.to_string(), b.to_string()))
2238                .collect()
2239        };
2240        let schema = Schema::new_with_metadata(
2241            vec![
2242                Field::new("c1", DataType::Utf8, true)
2243                    .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "1")])),
2244                Field::new("c2", DataType::Utf8, true)
2245                    .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "2")])),
2246            ],
2247            HashMap::new(),
2248        );
2249
2250        let writer = ArrowWriter::try_new(vec![], Arc::new(schema.clone()), None)?;
2251        let parquet_bytes = writer.into_inner()?;
2252
2253        let reader =
2254            crate::file::reader::SerializedFileReader::new(bytes::Bytes::from(parquet_bytes))?;
2255        let schema_descriptor = reader.metadata().file_metadata().schema_descr_ptr();
2256
2257        // don't pass metadata so field ids are read from Parquet and not from serialized Arrow schema
2258        let arrow_schema = crate::arrow::parquet_to_arrow_schema(&schema_descriptor, None)?;
2259
2260        let parq_schema_descr = ArrowSchemaConverter::new()
2261            .with_coerce_types(true)
2262            .convert(&arrow_schema)?;
2263        let parq_fields = parq_schema_descr.root_schema().get_fields();
2264        assert_eq!(parq_fields.len(), 2);
2265        assert_eq!(parq_fields[0].get_basic_info().id(), 1);
2266        assert_eq!(parq_fields[1].get_basic_info().id(), 2);
2267
2268        Ok(())
2269    }
2270
/// Round-trips a schema made of a list, a fixed-size list and a nested
/// list -> large list -> struct field through an empty Parquet file, and
/// checks that the schema read back equals the one that was written.
#[test]
fn test_arrow_schema_roundtrip_lists() -> Result<()> {
    let metadata: HashMap<String, String> =
        HashMap::from([("Key".to_string(), "Value".to_string())]);

    // c21: non-nullable list whose boolean items are nullable.
    let bool_list = Field::new_list("c21", Field::new("array", DataType::Boolean, true), false);

    // c22: non-nullable fixed-size list (length 5) of non-nullable booleans.
    let fixed_item = Arc::new(Field::new("items", DataType::Boolean, false));
    let fixed_list = Field::new("c22", DataType::FixedSizeList(fixed_item, 5), false);

    // c23: nullable list -> nullable large list -> nullable struct { a, b }.
    let struct_children = vec![
        Field::new("a", DataType::Int16, true),
        Field::new("b", DataType::Float64, false),
    ];
    let struct_item = Field::new_struct("items", struct_children, true);
    let large_list_item = Field::new_large_list("items", struct_item, true);
    let nested_list = Field::new_list("c23", large_list_item, true);

    let schema = Schema::new_with_metadata(vec![bool_list, fixed_list, nested_list], metadata);

    // write to an empty parquet file so that schema is serialized
    let file = tempfile::tempfile().unwrap();
    let sink = file.try_clone().unwrap();
    let writer = ArrowWriter::try_new(sink, Arc::new(schema.clone()), None)?;
    writer.close()?;

    // read the file back through the original handle and compare schemas
    let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
    let read_schema = builder.schema();
    assert_eq!(&schema, read_schema.as_ref());
    Ok(())
}
2321
/// An empty encoded-schema string must be rejected with an error.
#[test]
fn test_get_arrow_schema_from_metadata() {
    let outcome = get_arrow_schema_from_metadata("");
    assert!(outcome.is_err());
}
2326
/// Converts an Arrow `FixedSizeBinary(16)` field carrying the `Uuid`
/// extension type to a Parquet schema, checks the column's logical type is
/// `LogicalType::Uuid`, then converts back and checks the extension type is
/// recovered.
#[test]
#[cfg(feature = "arrow_canonical_extension_types")]
fn arrow_uuid_to_parquet_uuid() -> Result<()> {
    use arrow_schema::extension::Uuid;

    let uuid_field =
        Field::new("uuid", DataType::FixedSizeBinary(16), false).with_extension_type(Uuid);
    let source = Schema::new(vec![uuid_field]);

    // Arrow -> Parquet
    let parquet_schema = ArrowSchemaConverter::new().convert(&source)?;
    let first_column = parquet_schema.column(0);
    assert_eq!(first_column.logical_type_ref(), Some(&LogicalType::Uuid));

    // Parquet -> Arrow
    let restored = parquet_to_arrow_schema(&parquet_schema, None)?;
    let extension = restored.field(0).try_extension_type::<Uuid>()?;
    assert_eq!(extension, Uuid);

    Ok(())
}
2347
/// Converts an Arrow `Utf8` field carrying the `Json` extension type to a
/// Parquet schema, checks the column's logical type is `LogicalType::Json`,
/// then converts back and checks the extension type is recovered.
#[test]
#[cfg(feature = "arrow_canonical_extension_types")]
fn arrow_json_to_parquet_json() -> Result<()> {
    use arrow_schema::extension::Json;

    let json_field = Field::new("json", DataType::Utf8, false).with_extension_type(Json::default());
    let source = Schema::new(vec![json_field]);

    // Arrow -> Parquet
    let parquet_schema = ArrowSchemaConverter::new().convert(&source)?;
    let first_column = parquet_schema.column(0);
    assert_eq!(first_column.logical_type_ref(), Some(&LogicalType::Json));

    // Parquet -> Arrow
    let restored = parquet_to_arrow_schema(&parquet_schema, None)?;
    let extension = restored.field(0).try_extension_type::<Json>()?;
    assert_eq!(extension, Json::default());

    Ok(())
}
2371
/// Passing an ordinary Arrow field where virtual columns are expected must
/// fail with an error whose message contains "is not a virtual column".
#[test]
fn test_parquet_to_arrow_field_levels_with_virtual_rejects_non_virtual() {
    let message_type = "
        message test_schema {
            REQUIRED INT32 id;
        }
        ";
    let root = parse_message_type(message_type).unwrap();
    let descriptor = SchemaDescriptor::new(Arc::new(root));

    // Try to pass a regular field (not a virtual column)
    let regular_field = Arc::new(Field::new("regular_column", DataType::Int64, false));
    let mask = ProjectionMask::all();
    let result =
        parquet_to_arrow_field_levels_with_virtual(&descriptor, mask, None, &[regular_field]);

    assert!(result.is_err());
    let message = result.unwrap_err().to_string();
    assert!(message.contains("is not a virtual column"));
}
2399}