// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Converting Parquet schema <--> Arrow schema: [`ArrowSchemaConverter`] and [`parquet_to_arrow_schema`]

use base64::prelude::BASE64_STANDARD;
use base64::Engine;
use std::collections::HashMap;
use std::sync::Arc;

use arrow_ipc::writer;
#[cfg(feature = "arrow_canonical_extension_types")]
use arrow_schema::extension::{Json, Uuid};
use arrow_schema::{DataType, Field, Fields, Schema, TimeUnit};

use crate::basic::{
    ConvertedType, LogicalType, Repetition, TimeUnit as ParquetTimeUnit, Type as PhysicalType,
};
use crate::errors::{ParquetError, Result};
use crate::file::{metadata::KeyValue, properties::WriterProperties};
use crate::schema::types::{ColumnDescriptor, SchemaDescriptor, Type};

mod complex;
mod primitive;

use crate::arrow::ProjectionMask;
pub(crate) use complex::{ParquetField, ParquetFieldType};

use super::PARQUET_FIELD_ID_META_KEY;

/// Convert Parquet schema to Arrow schema including optional metadata
///
/// Attempts to decode any existing Arrow schema metadata, falling back
/// to converting the Parquet schema column-wise
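///
/// # Example
///
/// A brief, self-contained sketch of the column-wise fallback (no embedded
/// Arrow schema in the metadata); the message type here is illustrative:
///
/// ```
/// # use std::sync::Arc;
/// # use parquet::arrow::parquet_to_arrow_schema;
/// # use parquet::schema::{parser::parse_message_type, types::SchemaDescriptor};
/// let message_type = "
/// message test_schema {
///     REQUIRED INT32 id;
///     OPTIONAL BINARY name (UTF8);
/// }
/// ";
/// let parquet_schema = SchemaDescriptor::new(Arc::new(
///     parse_message_type(message_type).unwrap(),
/// ));
/// // no key-value metadata is passed, so the schema is derived from the columns
/// let arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
/// assert_eq!(arrow_schema.fields().len(), 2);
/// ```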
pub fn parquet_to_arrow_schema(
    parquet_schema: &SchemaDescriptor,
    key_value_metadata: Option<&Vec<KeyValue>>,
) -> Result<Schema> {
    parquet_to_arrow_schema_by_columns(parquet_schema, ProjectionMask::all(), key_value_metadata)
}

/// Convert parquet schema to arrow schema including optional metadata,
/// only preserving some leaf columns.
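///
/// # Example
///
/// A brief sketch selecting a single leaf column by index; note that
/// [`ProjectionMask`] indices refer to leaf columns, not top-level fields:
///
/// ```
/// # use std::sync::Arc;
/// # use parquet::arrow::{parquet_to_arrow_schema_by_columns, ProjectionMask};
/// # use parquet::schema::{parser::parse_message_type, types::SchemaDescriptor};
/// let parquet_schema = SchemaDescriptor::new(Arc::new(
///     parse_message_type("message s { REQUIRED INT32 a; REQUIRED INT32 b; }").unwrap(),
/// ));
/// // keep only the second leaf column, "b"
/// let mask = ProjectionMask::leaves(&parquet_schema, [1]);
/// let schema = parquet_to_arrow_schema_by_columns(&parquet_schema, mask, None).unwrap();
/// assert_eq!(schema.field(0).name(), "b");
/// ```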
pub fn parquet_to_arrow_schema_by_columns(
    parquet_schema: &SchemaDescriptor,
    mask: ProjectionMask,
    key_value_metadata: Option<&Vec<KeyValue>>,
) -> Result<Schema> {
    Ok(parquet_to_arrow_schema_and_fields(parquet_schema, mask, key_value_metadata)?.0)
}

/// Extracts any embedded Arrow schema from the metadata and converts the
/// Parquet schema, returning both the Arrow schema and the field levels
pub(crate) fn parquet_to_arrow_schema_and_fields(
    parquet_schema: &SchemaDescriptor,
    mask: ProjectionMask,
    key_value_metadata: Option<&Vec<KeyValue>>,
) -> Result<(Schema, Option<ParquetField>)> {
    let mut metadata = parse_key_value_metadata(key_value_metadata).unwrap_or_default();
    let maybe_schema = metadata
        .remove(super::ARROW_SCHEMA_META_KEY)
        .map(|value| get_arrow_schema_from_metadata(&value))
        .transpose()?;

    // Add the Arrow metadata to the Parquet metadata, skipping keys that collide
    if let Some(arrow_schema) = &maybe_schema {
        arrow_schema.metadata().iter().for_each(|(k, v)| {
            metadata.entry(k.clone()).or_insert_with(|| v.clone());
        });
    }

    let hint = maybe_schema.as_ref().map(|s| s.fields());
    let field_levels = parquet_to_arrow_field_levels(parquet_schema, mask, hint)?;
    let schema = Schema::new_with_metadata(field_levels.fields, metadata);
    Ok((schema, field_levels.levels))
}

/// Schema information necessary to decode a parquet file as arrow [`Fields`]
///
/// In particular this stores the Dremel-level information necessary to correctly
/// interpret the encoded definition and repetition levels
///
/// Note: this is an opaque container intended to be used with lower-level APIs
/// within this crate
#[derive(Debug, Clone)]
pub struct FieldLevels {
    pub(crate) fields: Fields,
    pub(crate) levels: Option<ParquetField>,
}

/// Convert a parquet [`SchemaDescriptor`] to [`FieldLevels`]
///
/// Columns not included within [`ProjectionMask`] will be ignored.
///
/// Where a field type in `hint` is compatible with the corresponding parquet type in `schema`, it
/// will be used, otherwise the default arrow type for the given parquet column type will be used.
///
/// This is to accommodate arrow types that cannot be round-tripped through parquet natively.
/// Depending on the parquet writer, this can lead to a mismatch between a file's parquet schema
/// and its embedded arrow schema. The parquet `schema` must be treated as authoritative in such
/// an event. See [#1663](https://github.com/apache/arrow-rs/issues/1663) for more information
///
/// Note: this is a low-level API, most users will want to make use of the higher-level
/// [`parquet_to_arrow_schema`] for decoding metadata from a parquet file.
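///
/// # Example
///
/// A minimal sketch with no field hint, converting every column:
///
/// ```
/// # use std::sync::Arc;
/// # use parquet::arrow::{parquet_to_arrow_field_levels, ProjectionMask};
/// # use parquet::schema::{parser::parse_message_type, types::SchemaDescriptor};
/// let parquet_schema = SchemaDescriptor::new(Arc::new(
///     parse_message_type("message s { REQUIRED INT64 v; }").unwrap(),
/// ));
/// let levels = parquet_to_arrow_field_levels(&parquet_schema, ProjectionMask::all(), None);
/// assert!(levels.is_ok());
/// ```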
pub fn parquet_to_arrow_field_levels(
    schema: &SchemaDescriptor,
    mask: ProjectionMask,
    hint: Option<&Fields>,
) -> Result<FieldLevels> {
    match complex::convert_schema(schema, mask, hint)? {
        Some(field) => match &field.arrow_type {
            DataType::Struct(fields) => Ok(FieldLevels {
                fields: fields.clone(),
                levels: Some(field),
            }),
            _ => unreachable!(),
        },
        None => Ok(FieldLevels {
            fields: Fields::empty(),
            levels: None,
        }),
    }
}

/// Try to convert Arrow schema metadata into a schema
fn get_arrow_schema_from_metadata(encoded_meta: &str) -> Result<Schema> {
    let decoded = BASE64_STANDARD.decode(encoded_meta);
    match decoded {
        Ok(bytes) => {
            // Skip the 8-byte prefix (continuation marker + message length)
            // written by `encode_arrow_schema`, if present
            let slice = if bytes.len() > 8 && bytes[0..4] == [255u8; 4] {
                &bytes[8..]
            } else {
                bytes.as_slice()
            };
            match arrow_ipc::root_as_message(slice) {
                Ok(message) => message
                    .header_as_schema()
                    .map(arrow_ipc::convert::fb_to_schema)
                    .ok_or_else(|| arrow_err!("the message is not an Arrow schema")),
                Err(err) => {
                    // The flatbuffers implementation returns an error on verification error.
                    Err(arrow_err!(
                        "Unable to get root as message stored in {}: {:?}",
                        super::ARROW_SCHEMA_META_KEY,
                        err
                    ))
                }
            }
        }
        Err(err) => {
            // The C++ implementation returns an error if the schema can't be parsed.
            Err(arrow_err!(
                "Unable to decode the encoded schema stored in {}, {:?}",
                super::ARROW_SCHEMA_META_KEY,
                err
            ))
        }
    }
}

/// Encodes the Arrow schema into the IPC format, and base64 encodes it
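///
/// A minimal sketch, assuming this function is used via the `parquet::arrow`
/// re-export; the returned string is what is stored under the `ARROW:schema`
/// metadata key:
///
/// ```
/// # use arrow_schema::{DataType, Field, Schema};
/// # use parquet::arrow::encode_arrow_schema;
/// let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
/// // base64-encoded, length-prefixed Arrow IPC schema message
/// let encoded = encode_arrow_schema(&schema);
/// assert!(!encoded.is_empty());
/// ```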
pub fn encode_arrow_schema(schema: &Schema) -> String {
    let options = writer::IpcWriteOptions::default();
    #[allow(deprecated)]
    let mut dictionary_tracker =
        writer::DictionaryTracker::new_with_preserve_dict_id(true, options.preserve_dict_id());
    let data_gen = writer::IpcDataGenerator::default();
    let mut serialized_schema =
        data_gen.schema_to_bytes_with_dictionary_tracker(schema, &mut dictionary_tracker, &options);

    // manually prepending the length to the schema as arrow uses the legacy IPC format
    // TODO: change after addressing ARROW-9777
    let schema_len = serialized_schema.ipc_message.len();
    let mut len_prefix_schema = Vec::with_capacity(schema_len + 8);
    len_prefix_schema.append(&mut vec![255u8, 255, 255, 255]);
    len_prefix_schema.append((schema_len as u32).to_le_bytes().to_vec().as_mut());
    len_prefix_schema.append(&mut serialized_schema.ipc_message);

    BASE64_STANDARD.encode(&len_prefix_schema)
}

/// Mutates writer metadata by storing the encoded Arrow schema.
/// If Arrow schema metadata already exists, it is replaced.
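///
/// A usage sketch, assuming the `parquet::arrow` re-export; the encoded schema
/// ends up under the `ARROW:schema` key of the writer's key-value metadata:
///
/// ```
/// # use arrow_schema::{DataType, Field, Schema};
/// # use parquet::arrow::add_encoded_arrow_schema_to_metadata;
/// # use parquet::file::properties::WriterProperties;
/// let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
/// let mut props = WriterProperties::builder().build();
/// add_encoded_arrow_schema_to_metadata(&schema, &mut props);
/// let kv = props.key_value_metadata().unwrap();
/// assert_eq!(kv[0].key, "ARROW:schema");
/// ```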
pub fn add_encoded_arrow_schema_to_metadata(schema: &Schema, props: &mut WriterProperties) {
    let encoded = encode_arrow_schema(schema);

    let schema_kv = KeyValue {
        key: super::ARROW_SCHEMA_META_KEY.to_string(),
        value: Some(encoded),
    };

    let meta = props
        .key_value_metadata
        .get_or_insert_with(Default::default);

    // check if ARROW:schema exists, and overwrite it
    let schema_meta = meta
        .iter()
        .enumerate()
        .find(|(_, kv)| kv.key.as_str() == super::ARROW_SCHEMA_META_KEY);
    match schema_meta {
        Some((i, _)) => {
            meta.remove(i);
            meta.push(schema_kv);
        }
        None => {
            meta.push(schema_kv);
        }
    }
}

/// Converter for Arrow schema to Parquet schema
///
/// Example:
/// ```
/// # use std::sync::Arc;
/// # use arrow_schema::{Field, Schema, DataType};
/// # use parquet::arrow::ArrowSchemaConverter;
/// use parquet::schema::types::{SchemaDescriptor, Type};
/// use parquet::basic; // note there are two `Type`s in the following example
/// // create an Arrow Schema
/// let arrow_schema = Schema::new(vec![
///   Field::new("a", DataType::Int64, true),
///   Field::new("b", DataType::Date32, true),
/// ]);
/// // convert the Arrow schema to a Parquet schema
/// let parquet_schema = ArrowSchemaConverter::new()
///   .convert(&arrow_schema)
///   .unwrap();
///
/// let expected_parquet_schema = SchemaDescriptor::new(
///   Arc::new(
///     Type::group_type_builder("arrow_schema")
///       .with_fields(vec![
///         Arc::new(
///          Type::primitive_type_builder("a", basic::Type::INT64)
///           .build().unwrap()
///         ),
///         Arc::new(
///          Type::primitive_type_builder("b", basic::Type::INT32)
///           .with_converted_type(basic::ConvertedType::DATE)
///           .with_logical_type(Some(basic::LogicalType::Date))
///           .build().unwrap()
///         ),
///      ])
///      .build().unwrap()
///   )
/// );
/// assert_eq!(parquet_schema, expected_parquet_schema);
/// ```
#[derive(Debug)]
pub struct ArrowSchemaConverter<'a> {
    /// Name of the root schema in Parquet
    schema_root: &'a str,
    /// Should we coerce Arrow types to compatible Parquet types?
    ///
    /// See docs on [`Self::with_coerce_types`]
    coerce_types: bool,
}

impl Default for ArrowSchemaConverter<'_> {
    fn default() -> Self {
        Self::new()
    }
}

impl<'a> ArrowSchemaConverter<'a> {
    /// Create a new converter
    pub fn new() -> Self {
        Self {
            schema_root: "arrow_schema",
            coerce_types: false,
        }
    }

    /// Should Arrow types be coerced into Parquet native types (default `false`).
    ///
    /// Setting this option to `true` will result in Parquet files that can be
    /// read by more readers, but may lose precision for Arrow types such as
    /// [`DataType::Date64`] which have no direct [corresponding Parquet type].
    ///
    /// By default, this converter does not coerce to native Parquet types. Enabling type
    /// coercion allows for meaningful representations that do not require
    /// downstream readers to consider the embedded Arrow schema, and can allow
    /// for greater compatibility with other Parquet implementations. However,
    /// type coercion also prevents data from being losslessly round-tripped.
    ///
    /// # Discussion
    ///
    /// Some Arrow types such as `Date64`, `Timestamp` and `Interval` have no
    /// corresponding Parquet logical type. Thus, they cannot be losslessly
    /// round-tripped when stored using the appropriate Parquet logical type.
    /// For example, some Date64 values may be truncated when stored with
    /// Parquet's native 32-bit date type.
    ///
    /// For [`List`] and [`Map`] types, some Parquet readers expect certain
    /// schema elements to have specific names (earlier versions of the spec
    /// were somewhat ambiguous on this point). Type coercion will use the names
    /// prescribed by the Parquet specification, potentially losing naming
    /// metadata from the Arrow schema.
    ///
    /// [`List`]: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists
    /// [`Map`]: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#maps
    /// [corresponding Parquet type]: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#date
    ///
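    /// # Example
    ///
    /// A sketch of the effect on [`DataType::Date64`]: with coercion it is
    /// stored as a 32-bit Parquet `DATE`, without it as a plain `INT64`:
    ///
    /// ```
    /// # use arrow_schema::{DataType, Field, Schema};
    /// # use parquet::arrow::ArrowSchemaConverter;
    /// # use parquet::basic;
    /// let schema = Schema::new(vec![Field::new("d", DataType::Date64, false)]);
    /// let coerced = ArrowSchemaConverter::new()
    ///     .with_coerce_types(true)
    ///     .convert(&schema)
    ///     .unwrap();
    /// assert_eq!(coerced.column(0).physical_type(), basic::Type::INT32);
    ///
    /// let lossless = ArrowSchemaConverter::new().convert(&schema).unwrap();
    /// assert_eq!(lossless.column(0).physical_type(), basic::Type::INT64);
    /// ```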
    pub fn with_coerce_types(mut self, coerce_types: bool) -> Self {
        self.coerce_types = coerce_types;
        self
    }

    /// Set the root schema element name (defaults to `"arrow_schema"`).
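    ///
    /// A brief sketch:
    ///
    /// ```
    /// # use arrow_schema::{DataType, Field, Schema};
    /// # use parquet::arrow::ArrowSchemaConverter;
    /// let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
    /// let parquet_schema = ArrowSchemaConverter::new()
    ///     .schema_root("my_root")
    ///     .convert(&schema)
    ///     .unwrap();
    /// assert_eq!(parquet_schema.root_schema().name(), "my_root");
    /// ```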
    pub fn schema_root(mut self, schema_root: &'a str) -> Self {
        self.schema_root = schema_root;
        self
    }

    /// Convert the specified Arrow [`Schema`] to the desired Parquet [`SchemaDescriptor`]
    ///
    /// See example in [`ArrowSchemaConverter`]
    pub fn convert(&self, schema: &Schema) -> Result<SchemaDescriptor> {
        let fields = schema
            .fields()
            .iter()
            .map(|field| arrow_to_parquet_type(field, self.coerce_types).map(Arc::new))
            .collect::<Result<_>>()?;
        let group = Type::group_type_builder(self.schema_root)
            .with_fields(fields)
            .build()?;
        Ok(SchemaDescriptor::new(Arc::new(group)))
    }
}

/// Convert arrow schema to parquet schema
///
/// The name of the root schema element defaults to `"arrow_schema"`; this can be
/// overridden with [`ArrowSchemaConverter`]
#[deprecated(since = "54.0.0", note = "Use `ArrowSchemaConverter` instead")]
pub fn arrow_to_parquet_schema(schema: &Schema) -> Result<SchemaDescriptor> {
    ArrowSchemaConverter::new().convert(schema)
}

fn parse_key_value_metadata(
    key_value_metadata: Option<&Vec<KeyValue>>,
) -> Option<HashMap<String, String>> {
    match key_value_metadata {
        Some(key_values) => {
            let map: HashMap<String, String> = key_values
                .iter()
                .filter_map(|kv| {
                    kv.value
                        .as_ref()
                        .map(|value| (kv.key.clone(), value.clone()))
                })
                .collect();

            if map.is_empty() {
                None
            } else {
                Some(map)
            }
        }
        None => None,
    }
}

/// Convert parquet column schema to arrow field.
pub fn parquet_to_arrow_field(parquet_column: &ColumnDescriptor) -> Result<Field> {
    let field = complex::convert_type(&parquet_column.self_type_ptr())?;
    let mut ret = Field::new(parquet_column.name(), field.arrow_type, field.nullable);

    let basic_info = parquet_column.self_type().get_basic_info();
    let mut meta = HashMap::with_capacity(if cfg!(feature = "arrow_canonical_extension_types") {
        2
    } else {
        1
    });
    if basic_info.has_id() {
        meta.insert(
            PARQUET_FIELD_ID_META_KEY.to_string(),
            basic_info.id().to_string(),
        );
    }
    #[cfg(feature = "arrow_canonical_extension_types")]
    if let Some(logical_type) = basic_info.logical_type() {
        match logical_type {
            LogicalType::Uuid => ret.try_with_extension_type(Uuid)?,
            LogicalType::Json => ret.try_with_extension_type(Json::default())?,
            _ => {}
        }
    }
    if !meta.is_empty() {
        ret.set_metadata(meta);
    }

    Ok(ret)
}

/// Compute the number of bytes a Parquet `FIXED_LEN_BYTE_ARRAY` needs to
/// represent a decimal of the given precision
pub fn decimal_length_from_precision(precision: u8) -> usize {
    // digits = floor(log_10(2^(8*n - 1) - 1))  // definition in parquet's logical types
    // ceil(digits) = log10(2^(8*n - 1) - 1)
    // 10^ceil(digits) = 2^(8*n - 1) - 1
    // 10^ceil(digits) + 1 = 2^(8*n - 1)
    // log2(10^ceil(digits) + 1) = (8*n - 1)
    // log2(10^ceil(digits) + 1) + 1 = 8*n
    // (log2(10^ceil(digits) + 1) + 1) / 8 = n
    (((10.0_f64.powi(precision as i32) + 1.0).log2() + 1.0) / 8.0).ceil() as usize
}

/// Convert an arrow field to a parquet `Type`
fn arrow_to_parquet_type(field: &Field, coerce_types: bool) -> Result<Type> {
    const PARQUET_LIST_ELEMENT_NAME: &str = "element";
    const PARQUET_MAP_STRUCT_NAME: &str = "key_value";
    const PARQUET_KEY_FIELD_NAME: &str = "key";
    const PARQUET_VALUE_FIELD_NAME: &str = "value";

    let name = field.name().as_str();
    let repetition = if field.is_nullable() {
        Repetition::OPTIONAL
    } else {
        Repetition::REQUIRED
    };
    let id = field_id(field);
    // create type from field
    match field.data_type() {
        DataType::Null => Type::primitive_type_builder(name, PhysicalType::INT32)
            .with_logical_type(Some(LogicalType::Unknown))
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::Boolean => Type::primitive_type_builder(name, PhysicalType::BOOLEAN)
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::Int8 => Type::primitive_type_builder(name, PhysicalType::INT32)
            .with_logical_type(Some(LogicalType::Integer {
                bit_width: 8,
                is_signed: true,
            }))
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::Int16 => Type::primitive_type_builder(name, PhysicalType::INT32)
            .with_logical_type(Some(LogicalType::Integer {
                bit_width: 16,
                is_signed: true,
            }))
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::Int32 => Type::primitive_type_builder(name, PhysicalType::INT32)
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::Int64 => Type::primitive_type_builder(name, PhysicalType::INT64)
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::UInt8 => Type::primitive_type_builder(name, PhysicalType::INT32)
            .with_logical_type(Some(LogicalType::Integer {
                bit_width: 8,
                is_signed: false,
            }))
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::UInt16 => Type::primitive_type_builder(name, PhysicalType::INT32)
            .with_logical_type(Some(LogicalType::Integer {
                bit_width: 16,
                is_signed: false,
            }))
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::UInt32 => Type::primitive_type_builder(name, PhysicalType::INT32)
            .with_logical_type(Some(LogicalType::Integer {
                bit_width: 32,
                is_signed: false,
            }))
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::UInt64 => Type::primitive_type_builder(name, PhysicalType::INT64)
            .with_logical_type(Some(LogicalType::Integer {
                bit_width: 64,
                is_signed: false,
            }))
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::Float16 => Type::primitive_type_builder(name, PhysicalType::FIXED_LEN_BYTE_ARRAY)
            .with_repetition(repetition)
            .with_id(id)
            .with_logical_type(Some(LogicalType::Float16))
            .with_length(2)
            .build(),
        DataType::Float32 => Type::primitive_type_builder(name, PhysicalType::FLOAT)
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::Float64 => Type::primitive_type_builder(name, PhysicalType::DOUBLE)
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::Timestamp(TimeUnit::Second, _) => {
            // Cannot represent seconds in LogicalType
            Type::primitive_type_builder(name, PhysicalType::INT64)
                .with_repetition(repetition)
                .with_id(id)
                .build()
        }
        DataType::Timestamp(time_unit, tz) => {
            Type::primitive_type_builder(name, PhysicalType::INT64)
                .with_logical_type(Some(LogicalType::Timestamp {
                    // If timezone set, values are normalized to UTC timezone
                    is_adjusted_to_u_t_c: matches!(tz, Some(z) if !z.as_ref().is_empty()),
                    unit: match time_unit {
                        TimeUnit::Second => unreachable!(),
                        TimeUnit::Millisecond => ParquetTimeUnit::MILLIS(Default::default()),
                        TimeUnit::Microsecond => ParquetTimeUnit::MICROS(Default::default()),
                        TimeUnit::Nanosecond => ParquetTimeUnit::NANOS(Default::default()),
                    },
                }))
                .with_repetition(repetition)
                .with_id(id)
                .build()
        }
        DataType::Date32 => Type::primitive_type_builder(name, PhysicalType::INT32)
            .with_logical_type(Some(LogicalType::Date))
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::Date64 => {
            if coerce_types {
                Type::primitive_type_builder(name, PhysicalType::INT32)
                    .with_logical_type(Some(LogicalType::Date))
                    .with_repetition(repetition)
                    .with_id(id)
                    .build()
            } else {
                Type::primitive_type_builder(name, PhysicalType::INT64)
                    .with_repetition(repetition)
                    .with_id(id)
                    .build()
            }
        }
        DataType::Time32(TimeUnit::Second) => {
            // Cannot represent seconds in LogicalType
            Type::primitive_type_builder(name, PhysicalType::INT32)
                .with_repetition(repetition)
                .with_id(id)
                .build()
        }
        DataType::Time32(unit) => Type::primitive_type_builder(name, PhysicalType::INT32)
            .with_logical_type(Some(LogicalType::Time {
                is_adjusted_to_u_t_c: field.metadata().contains_key("adjusted_to_utc"),
                unit: match unit {
                    TimeUnit::Millisecond => ParquetTimeUnit::MILLIS(Default::default()),
                    u => unreachable!("Invalid unit for Time32: {:?}", u),
                },
            }))
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::Time64(unit) => Type::primitive_type_builder(name, PhysicalType::INT64)
            .with_logical_type(Some(LogicalType::Time {
                is_adjusted_to_u_t_c: field.metadata().contains_key("adjusted_to_utc"),
                unit: match unit {
                    TimeUnit::Microsecond => ParquetTimeUnit::MICROS(Default::default()),
                    TimeUnit::Nanosecond => ParquetTimeUnit::NANOS(Default::default()),
                    u => unreachable!("Invalid unit for Time64: {:?}", u),
                },
            }))
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::Duration(_) => Err(arrow_err!("Converting Duration to parquet not supported",)),
        DataType::Interval(_) => {
            Type::primitive_type_builder(name, PhysicalType::FIXED_LEN_BYTE_ARRAY)
                .with_converted_type(ConvertedType::INTERVAL)
                .with_repetition(repetition)
                .with_id(id)
                .with_length(12)
                .build()
        }
        DataType::Binary | DataType::LargeBinary => {
            Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
                .with_repetition(repetition)
                .with_id(id)
                .build()
        }
        DataType::FixedSizeBinary(length) => {
            Type::primitive_type_builder(name, PhysicalType::FIXED_LEN_BYTE_ARRAY)
                .with_repetition(repetition)
                .with_id(id)
                .with_length(*length)
                .with_logical_type(
                    #[cfg(feature = "arrow_canonical_extension_types")]
                    // If set, map arrow uuid extension type to parquet uuid logical type.
                    field
                        .try_extension_type::<Uuid>()
                        .ok()
                        .map(|_| LogicalType::Uuid),
                    #[cfg(not(feature = "arrow_canonical_extension_types"))]
                    None,
                )
                .build()
        }
        DataType::BinaryView => Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::Decimal128(precision, scale) | DataType::Decimal256(precision, scale) => {
            // Decimal precision determines the Parquet physical type to use.
            // Following https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#decimal
            let (physical_type, length) = if *precision > 1 && *precision <= 9 {
                (PhysicalType::INT32, -1)
            } else if *precision <= 18 {
                (PhysicalType::INT64, -1)
            } else {
                (
                    PhysicalType::FIXED_LEN_BYTE_ARRAY,
                    decimal_length_from_precision(*precision) as i32,
                )
            };
            Type::primitive_type_builder(name, physical_type)
                .with_repetition(repetition)
                .with_id(id)
                .with_length(length)
                .with_logical_type(Some(LogicalType::Decimal {
                    scale: *scale as i32,
                    precision: *precision as i32,
                }))
                .with_precision(*precision as i32)
                .with_scale(*scale as i32)
                .build()
        }
        DataType::Utf8 | DataType::LargeUtf8 => {
            Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
                .with_logical_type({
                    #[cfg(feature = "arrow_canonical_extension_types")]
                    {
                        // Use the Json logical type if the canonical Json
                        // extension type is set on this field.
                        field
                            .try_extension_type::<Json>()
                            .map_or(Some(LogicalType::String), |_| Some(LogicalType::Json))
                    }
                    #[cfg(not(feature = "arrow_canonical_extension_types"))]
                    Some(LogicalType::String)
                })
                .with_repetition(repetition)
                .with_id(id)
                .build()
        }
        DataType::Utf8View => Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
            .with_logical_type({
                #[cfg(feature = "arrow_canonical_extension_types")]
                {
                    // Use the Json logical type if the canonical Json
                    // extension type is set on this field.
                    field
                        .try_extension_type::<Json>()
                        .map_or(Some(LogicalType::String), |_| Some(LogicalType::Json))
                }
                #[cfg(not(feature = "arrow_canonical_extension_types"))]
                Some(LogicalType::String)
            })
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::List(f) | DataType::FixedSizeList(f, _) | DataType::LargeList(f) => {
            let field_ref = if coerce_types && f.name() != PARQUET_LIST_ELEMENT_NAME {
                // Ensure proper naming per the Parquet specification
                let ff = f.as_ref().clone().with_name(PARQUET_LIST_ELEMENT_NAME);
                Arc::new(arrow_to_parquet_type(&ff, coerce_types)?)
            } else {
                Arc::new(arrow_to_parquet_type(f, coerce_types)?)
            };

            Type::group_type_builder(name)
                .with_fields(vec![Arc::new(
                    Type::group_type_builder("list")
                        .with_fields(vec![field_ref])
                        .with_repetition(Repetition::REPEATED)
                        .build()?,
                )])
                .with_logical_type(Some(LogicalType::List))
                .with_repetition(repetition)
                .with_id(id)
                .build()
        }
        DataType::ListView(_) | DataType::LargeListView(_) => {
            unimplemented!("ListView/LargeListView not implemented")
        }
        DataType::Struct(fields) => {
            if fields.is_empty() {
                return Err(arrow_err!("Parquet does not support writing empty structs",));
            }
            // recursively convert children to types/nodes
            let fields = fields
                .iter()
                .map(|f| arrow_to_parquet_type(f, coerce_types).map(Arc::new))
                .collect::<Result<_>>()?;
            Type::group_type_builder(name)
                .with_fields(fields)
                .with_repetition(repetition)
                .with_id(id)
                .build()
        }
        DataType::Map(field, _) => {
            if let DataType::Struct(struct_fields) = field.data_type() {
                // If coercing then set inner struct name to "key_value"
                let map_struct_name = if coerce_types {
                    PARQUET_MAP_STRUCT_NAME
                } else {
                    field.name()
                };

                // If coercing then ensure struct fields are named "key" and "value"
                let fix_map_field = |name: &str, fld: &Arc<Field>| -> Result<Arc<Type>> {
                    if coerce_types && fld.name() != name {
                        let f = fld.as_ref().clone().with_name(name);
                        Ok(Arc::new(arrow_to_parquet_type(&f, coerce_types)?))
                    } else {
                        Ok(Arc::new(arrow_to_parquet_type(fld, coerce_types)?))
                    }
                };
                let key_field = fix_map_field(PARQUET_KEY_FIELD_NAME, &struct_fields[0])?;
                let val_field = fix_map_field(PARQUET_VALUE_FIELD_NAME, &struct_fields[1])?;

                Type::group_type_builder(name)
                    .with_fields(vec![Arc::new(
                        Type::group_type_builder(map_struct_name)
                            .with_fields(vec![key_field, val_field])
                            .with_repetition(Repetition::REPEATED)
                            .build()?,
                    )])
                    .with_logical_type(Some(LogicalType::Map))
                    .with_repetition(repetition)
                    .with_id(id)
                    .build()
            } else {
                Err(arrow_err!(
                    "DataType::Map should contain a struct field child",
                ))
            }
        }
        DataType::Union(_, _) => unimplemented!("See ARROW-8817."),
        DataType::Dictionary(_, ref value) => {
            // Dictionary encoding not handled at the schema level
            let dict_field = field.clone().with_data_type(value.as_ref().clone());
            arrow_to_parquet_type(&dict_field, coerce_types)
        }
        DataType::RunEndEncoded(_, _) => Err(arrow_err!(
            "Converting RunEndEncodedType to parquet not supported",
        )),
    }
}

fn field_id(field: &Field) -> Option<i32> {
    let value = field.metadata().get(super::PARQUET_FIELD_ID_META_KEY)?;
    value.parse().ok() // Fail quietly if not a valid integer
}

#[cfg(test)]
mod tests {
    use super::*;

    use std::{collections::HashMap, sync::Arc};

    use arrow::datatypes::{DataType, Field, IntervalUnit, TimeUnit};

    use crate::arrow::PARQUET_FIELD_ID_META_KEY;
    use crate::file::metadata::KeyValue;
    use crate::file::reader::FileReader;
    use crate::{
        arrow::{arrow_reader::ParquetRecordBatchReaderBuilder, ArrowWriter},
        schema::{parser::parse_message_type, types::SchemaDescriptor},
    };

    #[test]
    fn test_flat_primitives() {
        let message_type = "
        message test_schema {
            REQUIRED BOOLEAN boolean;
            REQUIRED INT32   int8  (INT_8);
            REQUIRED INT32   int16 (INT_16);
            REQUIRED INT32   uint8 (INTEGER(8,false));
            REQUIRED INT32   uint16 (INTEGER(16,false));
            REQUIRED INT32   int32;
            REQUIRED INT64   int64;
            OPTIONAL DOUBLE  double;
            OPTIONAL FLOAT   float;
            OPTIONAL FIXED_LEN_BYTE_ARRAY (2) float16 (FLOAT16);
            OPTIONAL BINARY  string (UTF8);
            OPTIONAL BINARY  string_2 (STRING);
            OPTIONAL BINARY  json (JSON);
        }
        ";
        let parquet_group_type = parse_message_type(message_type).unwrap();

        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();

        let arrow_fields = Fields::from(vec![
            Field::new("boolean", DataType::Boolean, false),
            Field::new("int8", DataType::Int8, false),
            Field::new("int16", DataType::Int16, false),
            Field::new("uint8", DataType::UInt8, false),
            Field::new("uint16", DataType::UInt16, false),
            Field::new("int32", DataType::Int32, false),
            Field::new("int64", DataType::Int64, false),
            Field::new("double", DataType::Float64, true),
            Field::new("float", DataType::Float32, true),
            Field::new("float16", DataType::Float16, true),
            Field::new("string", DataType::Utf8, true),
            Field::new("string_2", DataType::Utf8, true),
            Field::new("json", DataType::Utf8, true),
        ]);

        assert_eq!(&arrow_fields, converted_arrow_schema.fields());
    }

    #[test]
    fn test_decimal_fields() {
        let message_type = "
        message test_schema {
                    REQUIRED INT32 decimal1 (DECIMAL(4,2));
                    REQUIRED INT64 decimal2 (DECIMAL(12,2));
                    REQUIRED FIXED_LEN_BYTE_ARRAY (16) decimal3 (DECIMAL(30,2));
                    REQUIRED BYTE_ARRAY decimal4 (DECIMAL(33,2));
                    REQUIRED BYTE_ARRAY decimal5 (DECIMAL(38,2));
                    REQUIRED FIXED_LEN_BYTE_ARRAY (17) decimal6 (DECIMAL(39,2));
                    REQUIRED BYTE_ARRAY decimal7 (DECIMAL(39,2));
        }
        ";

        let parquet_group_type = parse_message_type(message_type).unwrap();

        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();

        let arrow_fields = Fields::from(vec![
            Field::new("decimal1", DataType::Decimal128(4, 2), false),
            Field::new("decimal2", DataType::Decimal128(12, 2), false),
            Field::new("decimal3", DataType::Decimal128(30, 2), false),
            Field::new("decimal4", DataType::Decimal128(33, 2), false),
            Field::new("decimal5", DataType::Decimal128(38, 2), false),
            Field::new("decimal6", DataType::Decimal256(39, 2), false),
            Field::new("decimal7", DataType::Decimal256(39, 2), false),
        ]);
        assert_eq!(&arrow_fields, converted_arrow_schema.fields());
    }

    #[test]
    fn test_byte_array_fields() {
        let message_type = "
        message test_schema {
            REQUIRED BYTE_ARRAY binary;
            REQUIRED FIXED_LEN_BYTE_ARRAY (20) fixed_binary;
        }
        ";

        let parquet_group_type = parse_message_type(message_type).unwrap();

        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();

        let arrow_fields = Fields::from(vec![
            Field::new("binary", DataType::Binary, false),
            Field::new("fixed_binary", DataType::FixedSizeBinary(20), false),
        ]);
        assert_eq!(&arrow_fields, converted_arrow_schema.fields());
    }

    #[test]
    fn test_duplicate_fields() {
        let message_type = "
        message test_schema {
            REQUIRED BOOLEAN boolean;
            REQUIRED INT32 int8 (INT_8);
        }
        ";

        let parquet_group_type = parse_message_type(message_type).unwrap();

        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();

        let arrow_fields = Fields::from(vec![
            Field::new("boolean", DataType::Boolean, false),
            Field::new("int8", DataType::Int8, false),
        ]);
        assert_eq!(&arrow_fields, converted_arrow_schema.fields());

        let converted_arrow_schema =
            parquet_to_arrow_schema_by_columns(&parquet_schema, ProjectionMask::all(), None)
                .unwrap();
        assert_eq!(&arrow_fields, converted_arrow_schema.fields());
    }

    #[test]
    fn test_parquet_lists() {
        let mut arrow_fields = Vec::new();

        // LIST encoding example taken from parquet-format/LogicalTypes.md
        let message_type = "
        message test_schema {
          REQUIRED GROUP my_list (LIST) {
            REPEATED GROUP list {
              OPTIONAL BINARY element (UTF8);
            }
          }
          OPTIONAL GROUP my_list (LIST) {
            REPEATED GROUP list {
              REQUIRED BINARY element (UTF8);
            }
          }
          OPTIONAL GROUP array_of_arrays (LIST) {
            REPEATED GROUP list {
              REQUIRED GROUP element (LIST) {
                REPEATED GROUP list {
                  REQUIRED INT32 element;
                }
              }
            }
          }
          OPTIONAL GROUP my_list (LIST) {
            REPEATED GROUP element {
              REQUIRED BINARY str (UTF8);
            }
          }
          OPTIONAL GROUP my_list (LIST) {
            REPEATED INT32 element;
          }
          OPTIONAL GROUP my_list (LIST) {
            REPEATED GROUP element {
              REQUIRED BINARY str (UTF8);
              REQUIRED INT32 num;
            }
          }
          OPTIONAL GROUP my_list (LIST) {
            REPEATED GROUP array {
              REQUIRED BINARY str (UTF8);
            }
          }
          OPTIONAL GROUP my_list (LIST) {
            REPEATED GROUP my_list_tuple {
              REQUIRED BINARY str (UTF8);
            }
          }
          REPEATED INT32 name;
        }
        ";

        // // List<String> (list non-null, elements nullable)
        // required group my_list (LIST) {
        //   repeated group list {
        //     optional binary element (UTF8);
        //   }
        // }
        {
            arrow_fields.push(Field::new_list(
                "my_list",
                Field::new("element", DataType::Utf8, true),
                false,
            ));
        }

        // // List<String> (list nullable, elements non-null)
        // optional group my_list (LIST) {
        //   repeated group list {
        //     required binary element (UTF8);
        //   }
        // }
        {
            arrow_fields.push(Field::new_list(
                "my_list",
                Field::new("element", DataType::Utf8, false),
                true,
            ));
        }

        // Element types can be nested structures. For example, a list of lists:
        //
        // // List<List<Integer>>
        // optional group array_of_arrays (LIST) {
        //   repeated group list {
        //     required group element (LIST) {
        //       repeated group list {
        //         required int32 element;
        //       }
        //     }
        //   }
        // }
        {
            let arrow_inner_list = Field::new("element", DataType::Int32, false);
            arrow_fields.push(Field::new_list(
                "array_of_arrays",
                Field::new_list("element", arrow_inner_list, false),
                true,
            ));
        }

        // // List<String> (list nullable, elements non-null)
        // optional group my_list (LIST) {
        //   repeated group element {
        //     required binary str (UTF8);
        //   };
        // }
        {
            arrow_fields.push(Field::new_list(
                "my_list",
                Field::new("str", DataType::Utf8, false),
                true,
            ));
        }

        // // List<Integer> (nullable list, non-null elements)
        // optional group my_list (LIST) {
        //   repeated int32 element;
        // }
        {
            arrow_fields.push(Field::new_list(
                "my_list",
                Field::new("element", DataType::Int32, false),
                true,
            ));
        }

        // // List<Tuple<String, Integer>> (nullable list, non-null elements)
        // optional group my_list (LIST) {
        //   repeated group element {
        //     required binary str (UTF8);
        //     required int32 num;
        //   };
        // }
        {
            let fields = vec![
                Field::new("str", DataType::Utf8, false),
                Field::new("num", DataType::Int32, false),
            ];
            arrow_fields.push(Field::new_list(
                "my_list",
                Field::new_struct("element", fields, false),
                true,
            ));
        }

        // // List<OneTuple<String>> (nullable list, non-null elements)
        // optional group my_list (LIST) {
        //   repeated group array {
        //     required binary str (UTF8);
        //   };
        // }
        // Special case: group is named array
        {
            let fields = vec![Field::new("str", DataType::Utf8, false)];
            arrow_fields.push(Field::new_list(
                "my_list",
                Field::new_struct("array", fields, false),
                true,
            ));
        }

        // // List<OneTuple<String>> (nullable list, non-null elements)
        // optional group my_list (LIST) {
        //   repeated group my_list_tuple {
        //     required binary str (UTF8);
        //   };
        // }
        // Special case: group name ends in _tuple
        {
            let fields = vec![Field::new("str", DataType::Utf8, false)];
            arrow_fields.push(Field::new_list(
                "my_list",
                Field::new_struct("my_list_tuple", fields, false),
                true,
            ));
        }

        // One-level encoding: Only allows required lists with required cells
        //   repeated value_type name
        {
            arrow_fields.push(Field::new_list(
                "name",
                Field::new("name", DataType::Int32, false),
                false,
            ));
        }

        let parquet_group_type = parse_message_type(message_type).unwrap();

        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
        let converted_fields = converted_arrow_schema.fields();

        assert_eq!(arrow_fields.len(), converted_fields.len());
        for i in 0..arrow_fields.len() {
            assert_eq!(&arrow_fields[i], converted_fields[i].as_ref(), "{i}");
        }
    }

    #[test]
    fn test_parquet_list_nullable() {
        let mut arrow_fields = Vec::new();

        let message_type = "
        message test_schema {
          REQUIRED GROUP my_list1 (LIST) {
            REPEATED GROUP list {
              OPTIONAL BINARY element (UTF8);
            }
          }
          OPTIONAL GROUP my_list2 (LIST) {
            REPEATED GROUP list {
              REQUIRED BINARY element (UTF8);
            }
          }
          REQUIRED GROUP my_list3 (LIST) {
            REPEATED GROUP list {
              REQUIRED BINARY element (UTF8);
            }
          }
        }
        ";

        // // List<String> (list non-null, elements nullable)
        // required group my_list1 (LIST) {
        //   repeated group list {
        //     optional binary element (UTF8);
        //   }
        // }
        {
            arrow_fields.push(Field::new_list(
                "my_list1",
                Field::new("element", DataType::Utf8, true),
                false,
            ));
        }

        // // List<String> (list nullable, elements non-null)
        // optional group my_list2 (LIST) {
        //   repeated group list {
        //     required binary element (UTF8);
        //   }
        // }
        {
            arrow_fields.push(Field::new_list(
                "my_list2",
                Field::new("element", DataType::Utf8, false),
                true,
            ));
        }

        // // List<String> (list non-null, elements non-null)
        // required group my_list3 (LIST) {
        //   repeated group list {
        //     required binary element (UTF8);
        //   }
        // }
        {
            arrow_fields.push(Field::new_list(
                "my_list3",
                Field::new("element", DataType::Utf8, false),
                false,
            ));
        }

        let parquet_group_type = parse_message_type(message_type).unwrap();

        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
        let converted_fields = converted_arrow_schema.fields();

        assert_eq!(arrow_fields.len(), converted_fields.len());
        for i in 0..arrow_fields.len() {
            assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
        }
    }

    #[test]
    fn test_parquet_maps() {
        let mut arrow_fields = Vec::new();

        // MAP encoding examples taken from parquet-format/LogicalTypes.md
        let message_type = "
        message test_schema {
          REQUIRED group my_map1 (MAP) {
            REPEATED group key_value {
              REQUIRED binary key (UTF8);
              OPTIONAL int32 value;
            }
          }
          OPTIONAL group my_map2 (MAP) {
            REPEATED group map {
              REQUIRED binary str (UTF8);
              REQUIRED int32 num;
            }
          }
          OPTIONAL group my_map3 (MAP_KEY_VALUE) {
            REPEATED group map {
              REQUIRED binary key (UTF8);
              OPTIONAL int32 value;
            }
          }
          REQUIRED group my_map4 (MAP) {
            REPEATED group map {
              OPTIONAL binary key (UTF8);
              REQUIRED int32 value;
            }
          }
        }
        ";

        // // Map<String, Integer>
        // required group my_map (MAP) {
        //   repeated group key_value {
        //     required binary key (UTF8);
        //     optional int32 value;
        //   }
        // }
        {
            arrow_fields.push(Field::new_map(
                "my_map1",
                "key_value",
                Field::new("key", DataType::Utf8, false),
                Field::new("value", DataType::Int32, true),
                false,
                false,
            ));
        }

        // // Map<String, Integer> (nullable map, non-null values)
        // optional group my_map (MAP) {
        //   repeated group map {
        //     required binary str (UTF8);
        //     required int32 num;
        //   }
        // }
        {
            arrow_fields.push(Field::new_map(
                "my_map2",
                "map",
                Field::new("str", DataType::Utf8, false),
                Field::new("num", DataType::Int32, false),
                false,
                true,
            ));
        }

        // // Map<String, Integer> (nullable map, nullable values)
        // optional group my_map (MAP_KEY_VALUE) {
        //   repeated group map {
        //     required binary key (UTF8);
        //     optional int32 value;
        //   }
        // }
        {
            arrow_fields.push(Field::new_map(
                "my_map3",
                "map",
                Field::new("key", DataType::Utf8, false),
                Field::new("value", DataType::Int32, true),
                false,
                true,
            ));
        }

        // // Map<String, Integer> (non-compliant nullable key)
        // required group my_map (MAP) {
        //   repeated group map {
        //     optional binary key (UTF8);
        //     required int32 value;
        //   }
        // }
        {
            arrow_fields.push(Field::new_map(
                "my_map4",
                "map",
                Field::new("key", DataType::Utf8, false), // The key is always non-nullable (#5630)
                Field::new("value", DataType::Int32, false),
                false,
                false,
            ));
        }

        let parquet_group_type = parse_message_type(message_type).unwrap();

        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
        let converted_fields = converted_arrow_schema.fields();

        assert_eq!(arrow_fields.len(), converted_fields.len());
        for i in 0..arrow_fields.len() {
            assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
        }
    }
1313
1314    #[test]
1315    fn test_nested_schema() {
1316        let mut arrow_fields = Vec::new();
1317        {
1318            let group1_fields = Fields::from(vec![
1319                Field::new("leaf1", DataType::Boolean, false),
1320                Field::new("leaf2", DataType::Int32, false),
1321            ]);
1322            let group1_struct = Field::new("group1", DataType::Struct(group1_fields), false);
1323            arrow_fields.push(group1_struct);
1324
1325            let leaf3_field = Field::new("leaf3", DataType::Int64, false);
1326            arrow_fields.push(leaf3_field);
1327        }
1328
1329        let message_type = "
1330        message test_schema {
1331          REQUIRED GROUP group1 {
1332            REQUIRED BOOLEAN leaf1;
1333            REQUIRED INT32 leaf2;
1334          }
1335          REQUIRED INT64 leaf3;
1336        }
1337        ";
1338        let parquet_group_type = parse_message_type(message_type).unwrap();
1339
1340        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1341        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
1342        let converted_fields = converted_arrow_schema.fields();
1343
1344        assert_eq!(arrow_fields.len(), converted_fields.len());
1345        for i in 0..arrow_fields.len() {
1346            assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
1347        }
1348    }
1349
    #[test]
    fn test_nested_schema_partial() {
        let mut arrow_fields = Vec::new();
        {
            let group1_fields = vec![Field::new("leaf1", DataType::Int64, false)].into();
            let group1 = Field::new("group1", DataType::Struct(group1_fields), false);
            arrow_fields.push(group1);

            let group2_fields = vec![Field::new("leaf4", DataType::Int64, false)].into();
            let group2 = Field::new("group2", DataType::Struct(group2_fields), false);
            arrow_fields.push(group2);

            arrow_fields.push(Field::new("leaf5", DataType::Int64, false));
        }

        let message_type = "
        message test_schema {
          REQUIRED GROUP group1 {
            REQUIRED INT64 leaf1;
            REQUIRED INT64 leaf2;
          }
          REQUIRED GROUP group2 {
            REQUIRED INT64 leaf3;
            REQUIRED INT64 leaf4;
          }
          REQUIRED INT64 leaf5;
        }
        ";
        let parquet_group_type = parse_message_type(message_type).unwrap();

        // Expected partial arrow schema (leaf columns 0, 3, 4; the duplicate 4 in
        // the mask below selects leaf 4 only once):
        // required group group1 {
        //   required int64 leaf1;
        // }
        // required group group2 {
        //   required int64 leaf4;
        // }
        // required int64 leaf5;

        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
        let mask = ProjectionMask::leaves(&parquet_schema, [3, 0, 4, 4]);
        let converted_arrow_schema =
            parquet_to_arrow_schema_by_columns(&parquet_schema, mask, None).unwrap();
        let converted_fields = converted_arrow_schema.fields();

        assert_eq!(arrow_fields.len(), converted_fields.len());
        for i in 0..arrow_fields.len() {
            assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
        }
    }

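    // The order of indices given to ProjectionMask::leaves, or of paths given to
    // ProjectionMask::columns, does not affect the result: converted fields always
    // come back in Parquet schema order.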
    #[test]
    fn test_nested_schema_partial_ordering() {
        let mut arrow_fields = Vec::new();
        {
            let group1_fields = vec![Field::new("leaf1", DataType::Int64, false)].into();
            let group1 = Field::new("group1", DataType::Struct(group1_fields), false);
            arrow_fields.push(group1);

            let group2_fields = vec![Field::new("leaf4", DataType::Int64, false)].into();
            let group2 = Field::new("group2", DataType::Struct(group2_fields), false);
            arrow_fields.push(group2);

            arrow_fields.push(Field::new("leaf5", DataType::Int64, false));
        }

        let message_type = "
        message test_schema {
          REQUIRED GROUP group1 {
            REQUIRED INT64 leaf1;
            REQUIRED INT64 leaf2;
          }
          REQUIRED GROUP group2 {
            REQUIRED INT64 leaf3;
            REQUIRED INT64 leaf4;
          }
          REQUIRED INT64 leaf5;
        }
        ";
        let parquet_group_type = parse_message_type(message_type).unwrap();

        // Expected partial arrow schema (leaf columns 0, 3, 4, in schema order
        // regardless of the order they appear in the mask):
        // required group group1 {
        //   required int64 leaf1;
        // }
        // required group group2 {
        //   required int64 leaf4;
        // }
        // required int64 leaf5;

        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
        let mask = ProjectionMask::leaves(&parquet_schema, [3, 0, 4]);
        let converted_arrow_schema =
            parquet_to_arrow_schema_by_columns(&parquet_schema, mask, None).unwrap();
        let converted_fields = converted_arrow_schema.fields();

        assert_eq!(arrow_fields.len(), converted_fields.len());
        for i in 0..arrow_fields.len() {
            assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
        }

        let mask =
            ProjectionMask::columns(&parquet_schema, ["group2.leaf4", "group1.leaf1", "leaf5"]);
        let converted_arrow_schema =
            parquet_to_arrow_schema_by_columns(&parquet_schema, mask, None).unwrap();
        let converted_fields = converted_arrow_schema.fields();

        assert_eq!(arrow_fields.len(), converted_fields.len());
        for i in 0..arrow_fields.len() {
            assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
        }
    }

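    // Bare REPEATED groups carry no LIST annotation, so under the list
    // backward-compatibility rules each one converts to a non-null Arrow list
    // whose struct element reuses the group's name.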
    #[test]
    fn test_repeated_nested_schema() {
        let mut arrow_fields = Vec::new();
        {
            arrow_fields.push(Field::new("leaf1", DataType::Int32, true));

            let inner_group_list = Field::new_list(
                "innerGroup",
                Field::new_struct(
                    "innerGroup",
                    vec![Field::new("leaf3", DataType::Int32, true)],
                    false,
                ),
                false,
            );

            let outer_group_list = Field::new_list(
                "outerGroup",
                Field::new_struct(
                    "outerGroup",
                    vec![Field::new("leaf2", DataType::Int32, true), inner_group_list],
                    false,
                ),
                false,
            );
            arrow_fields.push(outer_group_list);
        }

        let message_type = "
        message test_schema {
          OPTIONAL INT32 leaf1;
          REPEATED GROUP outerGroup {
            OPTIONAL INT32 leaf2;
            REPEATED GROUP innerGroup {
              OPTIONAL INT32 leaf3;
            }
          }
        }
        ";
        let parquet_group_type = parse_message_type(message_type).unwrap();

        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
        let converted_fields = converted_arrow_schema.fields();

        assert_eq!(arrow_fields.len(), converted_fields.len());
        for i in 0..arrow_fields.len() {
            assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
        }
    }

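    // Per-column conversion via parquet_to_arrow_field, covering the primitive
    // physical types plus converted/logical annotations; note that DECIMAL maps to
    // Decimal128 for all three physical representations used here.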
    #[test]
    fn test_column_desc_to_field() {
        let message_type = "
        message test_schema {
            REQUIRED BOOLEAN boolean;
            REQUIRED INT32   int8  (INT_8);
            REQUIRED INT32   uint8 (INTEGER(8,false));
            REQUIRED INT32   int16 (INT_16);
            REQUIRED INT32   uint16 (INTEGER(16,false));
            REQUIRED INT32   int32;
            REQUIRED INT64   int64;
            OPTIONAL DOUBLE  double;
            OPTIONAL FLOAT   float;
            OPTIONAL FIXED_LEN_BYTE_ARRAY (2) float16 (FLOAT16);
            OPTIONAL BINARY  string (UTF8);
            REPEATED BOOLEAN bools;
            OPTIONAL INT32   date       (DATE);
            OPTIONAL INT32   time_milli (TIME_MILLIS);
            OPTIONAL INT64   time_micro (TIME_MICROS);
            OPTIONAL INT64   time_nano (TIME(NANOS,false));
            OPTIONAL INT64   ts_milli (TIMESTAMP_MILLIS);
            REQUIRED INT64   ts_micro (TIMESTAMP_MICROS);
            REQUIRED INT64   ts_nano (TIMESTAMP(NANOS,true));
            REPEATED INT32   int_list;
            REPEATED BINARY  byte_list;
            REPEATED BINARY  string_list (UTF8);
            REQUIRED INT32 decimal_int32 (DECIMAL(8,2));
            REQUIRED INT64 decimal_int64 (DECIMAL(16,2));
            REQUIRED FIXED_LEN_BYTE_ARRAY (13) decimal_fix_length (DECIMAL(30,2));
        }
        ";
        let parquet_group_type = parse_message_type(message_type).unwrap();

        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
        let converted_arrow_fields = parquet_schema
            .columns()
            .iter()
            .map(|c| parquet_to_arrow_field(c).unwrap())
            .collect::<Vec<Field>>();

        let arrow_fields = vec![
            Field::new("boolean", DataType::Boolean, false),
            Field::new("int8", DataType::Int8, false),
            Field::new("uint8", DataType::UInt8, false),
            Field::new("int16", DataType::Int16, false),
            Field::new("uint16", DataType::UInt16, false),
            Field::new("int32", DataType::Int32, false),
            Field::new("int64", DataType::Int64, false),
            Field::new("double", DataType::Float64, true),
            Field::new("float", DataType::Float32, true),
            Field::new("float16", DataType::Float16, true),
            Field::new("string", DataType::Utf8, true),
            Field::new_list(
                "bools",
                Field::new("bools", DataType::Boolean, false),
                false,
            ),
            Field::new("date", DataType::Date32, true),
            Field::new("time_milli", DataType::Time32(TimeUnit::Millisecond), true),
            Field::new("time_micro", DataType::Time64(TimeUnit::Microsecond), true),
            Field::new("time_nano", DataType::Time64(TimeUnit::Nanosecond), true),
            Field::new(
                "ts_milli",
                DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
                true,
            ),
            Field::new(
                "ts_micro",
                DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
                false,
            ),
            Field::new(
                "ts_nano",
                DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())),
                false,
            ),
            Field::new_list(
                "int_list",
                Field::new("int_list", DataType::Int32, false),
                false,
            ),
            Field::new_list(
                "byte_list",
                Field::new("byte_list", DataType::Binary, false),
                false,
            ),
            Field::new_list(
                "string_list",
                Field::new("string_list", DataType::Utf8, false),
                false,
            ),
            Field::new("decimal_int32", DataType::Decimal128(8, 2), false),
            Field::new("decimal_int64", DataType::Decimal128(16, 2), false),
            Field::new("decimal_fix_length", DataType::Decimal128(30, 2), false),
        ];

        assert_eq!(arrow_fields, converted_arrow_fields);
    }

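    // with_coerce_types(true) rewrites list/map child names to the spec-mandated
    // "list"/"element" and "key_value"/"key"/"value"; with false, the Arrow names
    // ("item", "entries", "keys", "values") are preserved. Only the leaf-column
    // counts are compared against the hand-written schemas here.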
    #[test]
    fn test_coerced_map_list() {
        // Create Arrow schema with non-Parquet naming
        let arrow_fields = vec![
            Field::new_list(
                "my_list",
                Field::new("item", DataType::Boolean, true),
                false,
            ),
            Field::new_map(
                "my_map",
                "entries",
                Field::new("keys", DataType::Utf8, false),
                Field::new("values", DataType::Int32, true),
                false,
                true,
            ),
        ];
        let arrow_schema = Schema::new(arrow_fields);

        // Create Parquet schema with coerced names
        let message_type = "
        message parquet_schema {
            REQUIRED GROUP my_list (LIST) {
                REPEATED GROUP list {
                    OPTIONAL BOOLEAN element;
                }
            }
            OPTIONAL GROUP my_map (MAP) {
                REPEATED GROUP key_value {
                    REQUIRED BINARY key (STRING);
                    OPTIONAL INT32 value;
                }
            }
        }
        ";
        let parquet_group_type = parse_message_type(message_type).unwrap();
        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
        let converted_arrow_schema = ArrowSchemaConverter::new()
            .with_coerce_types(true)
            .convert(&arrow_schema)
            .unwrap();
        assert_eq!(
            parquet_schema.columns().len(),
            converted_arrow_schema.columns().len()
        );

        // Create Parquet schema without coerced names
        let message_type = "
        message parquet_schema {
            REQUIRED GROUP my_list (LIST) {
                REPEATED GROUP list {
                    OPTIONAL BOOLEAN item;
                }
            }
            OPTIONAL GROUP my_map (MAP) {
                REPEATED GROUP entries {
                    REQUIRED BINARY keys (STRING);
                    OPTIONAL INT32 values;
                }
            }
        }
        ";
        let parquet_group_type = parse_message_type(message_type).unwrap();
        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
        let converted_arrow_schema = ArrowSchemaConverter::new()
            .with_coerce_types(false)
            .convert(&arrow_schema)
            .unwrap();
        assert_eq!(
            parquet_schema.columns().len(),
            converted_arrow_schema.columns().len()
        );
    }

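    // The reverse direction: ArrowSchemaConverter turns each Arrow field into a
    // Parquet column descriptor matching the hand-written message type in the
    // test body.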
    #[test]
    fn test_field_to_column_desc() {
        let message_type = "
        message arrow_schema {
            REQUIRED BOOLEAN boolean;
            REQUIRED INT32   int8  (INT_8);
            REQUIRED INT32   int16 (INTEGER(16,true));
            REQUIRED INT32   int32;
            REQUIRED INT64   int64;
            OPTIONAL DOUBLE  double;
            OPTIONAL FLOAT   float;
            OPTIONAL FIXED_LEN_BYTE_ARRAY (2) float16 (FLOAT16);
            OPTIONAL BINARY  string (STRING);
            OPTIONAL GROUP   bools (LIST) {
                REPEATED GROUP list {
                    OPTIONAL BOOLEAN element;
                }
            }
            REQUIRED GROUP   bools_non_null (LIST) {
                REPEATED GROUP list {
                    REQUIRED BOOLEAN element;
                }
            }
            OPTIONAL INT32   date       (DATE);
            OPTIONAL INT32   time_milli (TIME(MILLIS,false));
            OPTIONAL INT32   time_milli_utc (TIME(MILLIS,true));
            OPTIONAL INT64   time_micro (TIME_MICROS);
            OPTIONAL INT64   time_micro_utc (TIME(MICROS, true));
            OPTIONAL INT64   ts_milli (TIMESTAMP_MILLIS);
            REQUIRED INT64   ts_micro (TIMESTAMP(MICROS,false));
            REQUIRED INT64   ts_seconds;
            REQUIRED INT64   ts_micro_utc (TIMESTAMP(MICROS, true));
            REQUIRED INT64   ts_millis_zero_offset (TIMESTAMP(MILLIS, true));
            REQUIRED INT64   ts_millis_zero_negative_offset (TIMESTAMP(MILLIS, true));
            REQUIRED INT64   ts_micro_non_utc (TIMESTAMP(MICROS, true));
            REQUIRED GROUP struct {
                REQUIRED BOOLEAN bools;
                REQUIRED INT32 uint32 (INTEGER(32,false));
                REQUIRED GROUP   int32 (LIST) {
                    REPEATED GROUP list {
                        OPTIONAL INT32 element;
                    }
                }
            }
            REQUIRED BINARY  dictionary_strings (STRING);
            REQUIRED INT32 decimal_int32 (DECIMAL(8,2));
            REQUIRED INT64 decimal_int64 (DECIMAL(16,2));
            REQUIRED FIXED_LEN_BYTE_ARRAY (13) decimal_fix_length (DECIMAL(30,2));
            REQUIRED FIXED_LEN_BYTE_ARRAY (16) decimal128 (DECIMAL(38,2));
            REQUIRED FIXED_LEN_BYTE_ARRAY (17) decimal256 (DECIMAL(39,2));
        }
        ";
        let parquet_group_type = parse_message_type(message_type).unwrap();

        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));

        let arrow_fields = vec![
            Field::new("boolean", DataType::Boolean, false),
            Field::new("int8", DataType::Int8, false),
            Field::new("int16", DataType::Int16, false),
            Field::new("int32", DataType::Int32, false),
            Field::new("int64", DataType::Int64, false),
            Field::new("double", DataType::Float64, true),
            Field::new("float", DataType::Float32, true),
            Field::new("float16", DataType::Float16, true),
            Field::new("string", DataType::Utf8, true),
            Field::new_list(
                "bools",
                Field::new("element", DataType::Boolean, true),
                true,
            ),
            Field::new_list(
                "bools_non_null",
                Field::new("element", DataType::Boolean, false),
                false,
            ),
            Field::new("date", DataType::Date32, true),
            Field::new("time_milli", DataType::Time32(TimeUnit::Millisecond), true),
            Field::new(
                "time_milli_utc",
                DataType::Time32(TimeUnit::Millisecond),
                true,
            )
            .with_metadata(HashMap::from_iter(vec![(
                "adjusted_to_utc".to_string(),
                "".to_string(),
            )])),
            Field::new("time_micro", DataType::Time64(TimeUnit::Microsecond), true),
            Field::new(
                "time_micro_utc",
                DataType::Time64(TimeUnit::Microsecond),
                true,
            )
            .with_metadata(HashMap::from_iter(vec![(
                "adjusted_to_utc".to_string(),
                "".to_string(),
            )])),
            Field::new(
                "ts_milli",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                true,
            ),
            Field::new(
                "ts_micro",
                DataType::Timestamp(TimeUnit::Microsecond, None),
                false,
            ),
            Field::new(
                "ts_seconds",
                DataType::Timestamp(TimeUnit::Second, Some("UTC".into())),
                false,
            ),
            Field::new(
                "ts_micro_utc",
                DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
                false,
            ),
            Field::new(
                "ts_millis_zero_offset",
                DataType::Timestamp(TimeUnit::Millisecond, Some("+00:00".into())),
                false,
            ),
            Field::new(
                "ts_millis_zero_negative_offset",
                DataType::Timestamp(TimeUnit::Millisecond, Some("-00:00".into())),
                false,
            ),
            Field::new(
                "ts_micro_non_utc",
                DataType::Timestamp(TimeUnit::Microsecond, Some("+01:00".into())),
                false,
            ),
            Field::new_struct(
                "struct",
                vec![
                    Field::new("bools", DataType::Boolean, false),
                    Field::new("uint32", DataType::UInt32, false),
                    Field::new_list("int32", Field::new("element", DataType::Int32, true), false),
                ],
                false,
            ),
            Field::new_dictionary("dictionary_strings", DataType::Int32, DataType::Utf8, false),
            Field::new("decimal_int32", DataType::Decimal128(8, 2), false),
            Field::new("decimal_int64", DataType::Decimal128(16, 2), false),
            Field::new("decimal_fix_length", DataType::Decimal128(30, 2), false),
            Field::new("decimal128", DataType::Decimal128(38, 2), false),
            Field::new("decimal256", DataType::Decimal256(39, 2), false),
        ];
        let arrow_schema = Schema::new(arrow_fields);
        let converted_arrow_schema = ArrowSchemaConverter::new().convert(&arrow_schema).unwrap();

        assert_eq!(
            parquet_schema.columns().len(),
            converted_arrow_schema.columns().len()
        );
        parquet_schema
            .columns()
            .iter()
            .zip(converted_arrow_schema.columns())
            .for_each(|(a, b)| {
                // Only compare full column descriptors when the Parquet side has a
                // logical type: the Arrow conversion always emits one, even where
                // the hand-written schema above has none, so those columns are
                // compared by name, physical type, and converted type instead.
                match a.logical_type() {
                    Some(_) => {
                        assert_eq!(a, b)
                    }
                    None => {
                        assert_eq!(a.name(), b.name());
                        assert_eq!(a.physical_type(), b.physical_type());
                        assert_eq!(a.converted_type(), b.converted_type());
                    }
                };
            });
    }

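    // Parquet groups must have at least one child, so converting a struct with no
    // fields is expected to fail.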
    #[test]
    #[should_panic(expected = "Parquet does not support writing empty structs")]
    fn test_empty_struct_field() {
        let arrow_fields = vec![Field::new(
            "struct",
            DataType::Struct(Fields::empty()),
            false,
        )];
        let arrow_schema = Schema::new(arrow_fields);
        let converted_arrow_schema = ArrowSchemaConverter::new()
            .with_coerce_types(true)
            .convert(&arrow_schema);

        converted_arrow_schema.unwrap();
    }

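    // Key-value metadata entries whose value is None are dropped; the remaining
    // pairs surface as Arrow schema metadata.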
    #[test]
    fn test_metadata() {
        let message_type = "
        message test_schema {
            OPTIONAL BINARY  string (STRING);
        }
        ";
        let parquet_group_type = parse_message_type(message_type).unwrap();

        let key_value_metadata = vec![
            KeyValue::new("foo".to_owned(), Some("bar".to_owned())),
            KeyValue::new("baz".to_owned(), None),
        ];

        let mut expected_metadata: HashMap<String, String> = HashMap::new();
        expected_metadata.insert("foo".to_owned(), "bar".to_owned());

        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
        let converted_arrow_schema =
            parquet_to_arrow_schema(&parquet_schema, Some(&key_value_metadata)).unwrap();

        assert_eq!(converted_arrow_schema.metadata(), &expected_metadata);
    }

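    // Round-trips a schema covering most supported types through an empty Parquet
    // file, then walks the Parquet schema tree to confirm that every
    // PARQUET_FIELD_ID_META_KEY annotation was written as a field id.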
    #[test]
    fn test_arrow_schema_roundtrip() -> Result<()> {
        let meta = |a: &[(&str, &str)]| -> HashMap<String, String> {
            a.iter()
                .map(|(a, b)| (a.to_string(), b.to_string()))
                .collect()
        };

        let schema = Schema::new_with_metadata(
            vec![
                Field::new("c1", DataType::Utf8, false)
                    .with_metadata(meta(&[("Key", "Foo"), (PARQUET_FIELD_ID_META_KEY, "2")])),
                Field::new("c2", DataType::Binary, false),
                Field::new("c3", DataType::FixedSizeBinary(3), false),
                Field::new("c4", DataType::Boolean, false),
                Field::new("c5", DataType::Date32, false),
                Field::new("c6", DataType::Date64, false),
                Field::new("c7", DataType::Time32(TimeUnit::Second), false),
                Field::new("c8", DataType::Time32(TimeUnit::Millisecond), false),
                Field::new("c13", DataType::Time64(TimeUnit::Microsecond), false),
                Field::new("c14", DataType::Time64(TimeUnit::Nanosecond), false),
                Field::new("c15", DataType::Timestamp(TimeUnit::Second, None), false),
                Field::new(
                    "c16",
                    DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
                    false,
                ),
                Field::new(
                    "c17",
                    DataType::Timestamp(TimeUnit::Microsecond, Some("Africa/Johannesburg".into())),
                    false,
                ),
                Field::new(
                    "c18",
                    DataType::Timestamp(TimeUnit::Nanosecond, None),
                    false,
                ),
                Field::new("c19", DataType::Interval(IntervalUnit::DayTime), false),
                Field::new("c20", DataType::Interval(IntervalUnit::YearMonth), false),
                Field::new_list(
                    "c21",
                    Field::new_list_field(DataType::Boolean, true)
                        .with_metadata(meta(&[("Key", "Bar"), (PARQUET_FIELD_ID_META_KEY, "5")])),
                    false,
                )
                .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "4")])),
                Field::new(
                    "c22",
                    DataType::FixedSizeList(
                        Arc::new(Field::new_list_field(DataType::Boolean, true)),
                        5,
                    ),
                    false,
                ),
                Field::new_list(
                    "c23",
                    Field::new_large_list(
                        "inner",
                        Field::new_list_field(
                            DataType::Struct(
                                vec![
                                    Field::new("a", DataType::Int16, true),
                                    Field::new("b", DataType::Float64, false),
                                    Field::new("c", DataType::Float32, false),
                                    Field::new("d", DataType::Float16, false),
                                ]
                                .into(),
                            ),
                            false,
                        ),
                        true,
                    ),
                    false,
                ),
                Field::new(
                    "c24",
                    DataType::Struct(Fields::from(vec![
                        Field::new("a", DataType::Utf8, false),
                        Field::new("b", DataType::UInt16, false),
                    ])),
                    false,
                ),
                Field::new("c25", DataType::Interval(IntervalUnit::YearMonth), true),
                Field::new("c26", DataType::Interval(IntervalUnit::DayTime), true),
                // Duration types not supported
                // Field::new("c27", DataType::Duration(TimeUnit::Second), false),
                // Field::new("c28", DataType::Duration(TimeUnit::Millisecond), false),
                // Field::new("c29", DataType::Duration(TimeUnit::Microsecond), false),
                // Field::new("c30", DataType::Duration(TimeUnit::Nanosecond), false),
                #[allow(deprecated)]
                Field::new_dict(
                    "c31",
                    DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
                    true,
                    123,
                    true,
                )
                .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "6")])),
                Field::new("c32", DataType::LargeBinary, true),
                Field::new("c33", DataType::LargeUtf8, true),
                Field::new_large_list(
                    "c34",
                    Field::new_list(
                        "inner",
                        Field::new_list_field(
                            DataType::Struct(
                                vec![
                                    Field::new("a", DataType::Int16, true),
                                    Field::new("b", DataType::Float64, true),
                                ]
                                .into(),
                            ),
                            true,
                        ),
                        true,
                    ),
                    true,
                ),
                Field::new("c35", DataType::Null, true),
                Field::new("c36", DataType::Decimal128(2, 1), false),
                Field::new("c37", DataType::Decimal256(50, 20), false),
                Field::new("c38", DataType::Decimal128(18, 12), true),
                Field::new_map(
                    "c39",
                    "key_value",
                    Field::new("key", DataType::Utf8, false),
                    Field::new_list("value", Field::new("element", DataType::Utf8, true), true),
                    false, // fails to roundtrip keys_sorted
                    true,
                ),
                Field::new_map(
                    "c40",
                    "my_entries",
                    Field::new("my_key", DataType::Utf8, false)
                        .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "8")])),
                    Field::new_list(
                        "my_value",
                        Field::new_list_field(DataType::Utf8, true)
                            .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "10")])),
                        true,
                    )
                    .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "9")])),
                    false, // fails to roundtrip keys_sorted
                    true,
                )
                .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "7")])),
                Field::new_map(
                    "c41",
                    "my_entries",
                    Field::new("my_key", DataType::Utf8, false),
                    Field::new_list(
                        "my_value",
                        Field::new_list_field(DataType::Utf8, true)
                            .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "11")])),
                        true,
                    ),
                    false, // fails to roundtrip keys_sorted
                    false,
                ),
            ],
            meta(&[("Key", "Value")]),
        );

        // write to an empty parquet file so that schema is serialized
        let file = tempfile::tempfile().unwrap();
        let writer =
            ArrowWriter::try_new(file.try_clone().unwrap(), Arc::new(schema.clone()), None)?;
        writer.close()?;

        // read file back
        let arrow_reader = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();

        // Check arrow schema
        let read_schema = arrow_reader.schema();
        assert_eq!(&schema, read_schema.as_ref());

        // Walk schema finding field IDs
        let mut stack = Vec::with_capacity(10);
        let mut out = Vec::with_capacity(10);

        let root = arrow_reader.parquet_schema().root_schema_ptr();
        stack.push((root.name().to_string(), root));

        while let Some((p, t)) = stack.pop() {
            if t.is_group() {
                for f in t.get_fields() {
                    stack.push((format!("{p}.{}", f.name()), f.clone()))
                }
            }

            let info = t.get_basic_info();
            if info.has_id() {
                out.push(format!("{p} -> {}", info.id()))
            }
        }
        out.sort_unstable();
        let out: Vec<_> = out.iter().map(|x| x.as_str()).collect();

        assert_eq!(
            &out,
            &[
                "arrow_schema.c1 -> 2",
                "arrow_schema.c21 -> 4",
                "arrow_schema.c21.list.item -> 5",
                "arrow_schema.c31 -> 6",
                "arrow_schema.c40 -> 7",
                "arrow_schema.c40.my_entries.my_key -> 8",
                "arrow_schema.c40.my_entries.my_value -> 9",
                "arrow_schema.c40.my_entries.my_value.list.item -> 10",
                "arrow_schema.c41.my_entries.my_value.list.item -> 11",
            ]
        );

        Ok(())
    }

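    // Field ids must survive even when the serialized Arrow schema is ignored,
    // i.e. when they are read back from the Parquet schema itself.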
    #[test]
    fn test_read_parquet_field_ids_raw() -> Result<()> {
        let meta = |a: &[(&str, &str)]| -> HashMap<String, String> {
            a.iter()
                .map(|(a, b)| (a.to_string(), b.to_string()))
                .collect()
        };
        let schema = Schema::new_with_metadata(
            vec![
                Field::new("c1", DataType::Utf8, true)
                    .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "1")])),
                Field::new("c2", DataType::Utf8, true)
                    .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "2")])),
            ],
            HashMap::new(),
        );

        let writer = ArrowWriter::try_new(vec![], Arc::new(schema.clone()), None)?;
        let parquet_bytes = writer.into_inner()?;

        let reader =
            crate::file::reader::SerializedFileReader::new(bytes::Bytes::from(parquet_bytes))?;
        let schema_descriptor = reader.metadata().file_metadata().schema_descr_ptr();

        // Don't pass key-value metadata, so the field ids are read from the
        // Parquet schema itself rather than from the serialized Arrow schema
        let arrow_schema = crate::arrow::parquet_to_arrow_schema(&schema_descriptor, None)?;

        let parq_schema_descr = ArrowSchemaConverter::new()
            .with_coerce_types(true)
            .convert(&arrow_schema)?;
        let parq_fields = parq_schema_descr.root_schema().get_fields();
        assert_eq!(parq_fields.len(), 2);
        assert_eq!(parq_fields[0].get_basic_info().id(), 1);
        assert_eq!(parq_fields[1].get_basic_info().id(), 2);

        Ok(())
    }

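    // List fields with non-standard child names ("array", "items") still
    // round-trip: the reader decodes the Arrow schema embedded in the file
    // metadata, so the original names come back intact.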
    #[test]
    fn test_arrow_schema_roundtrip_lists() -> Result<()> {
        let metadata: HashMap<String, String> = [("Key".to_string(), "Value".to_string())]
            .iter()
            .cloned()
            .collect();

        let schema = Schema::new_with_metadata(
            vec![
                Field::new_list("c21", Field::new("array", DataType::Boolean, true), false),
                Field::new(
                    "c22",
                    DataType::FixedSizeList(
                        Arc::new(Field::new("items", DataType::Boolean, false)),
                        5,
                    ),
                    false,
                ),
                Field::new_list(
                    "c23",
                    Field::new_large_list(
                        "items",
                        Field::new_struct(
                            "items",
                            vec![
                                Field::new("a", DataType::Int16, true),
                                Field::new("b", DataType::Float64, false),
                            ],
                            true,
                        ),
                        true,
                    ),
                    true,
                ),
            ],
            metadata,
        );

        // write to an empty parquet file so that schema is serialized
        let file = tempfile::tempfile().unwrap();
        let writer =
            ArrowWriter::try_new(file.try_clone().unwrap(), Arc::new(schema.clone()), None)?;
        writer.close()?;

        // read file back
        let arrow_reader = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
        let read_schema = arrow_reader.schema();
        assert_eq!(&schema, read_schema.as_ref());
        Ok(())
    }

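    // An empty string is not a valid encoded Arrow schema, so decoding the
    // metadata value must return an error rather than panic.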
    #[test]
    fn test_get_arrow_schema_from_metadata() {
        assert!(get_arrow_schema_from_metadata("").is_err());
    }

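    // The Uuid canonical extension type on FixedSizeBinary(16) maps to the
    // Parquet UUID logical type.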
    #[test]
    #[cfg(feature = "arrow_canonical_extension_types")]
    fn arrow_uuid_to_parquet_uuid() -> Result<()> {
        let arrow_schema = Schema::new(vec![Field::new(
            "uuid",
            DataType::FixedSizeBinary(16),
            false,
        )
        .with_extension_type(Uuid)]);

        let parquet_schema = ArrowSchemaConverter::new().convert(&arrow_schema)?;

        assert_eq!(
            parquet_schema.column(0).logical_type(),
            Some(LogicalType::Uuid)
        );

        // TODO: roundtrip
        // let arrow_schema = parquet_to_arrow_schema(&parquet_schema, None)?;
        // assert_eq!(arrow_schema.field(0).try_extension_type::<Uuid>()?, Uuid);

        Ok(())
    }

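    // The Json canonical extension type on Utf8 maps to the Parquet JSON logical
    // type.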
    #[test]
    #[cfg(feature = "arrow_canonical_extension_types")]
    fn arrow_json_to_parquet_json() -> Result<()> {
        let arrow_schema = Schema::new(vec![
            Field::new("json", DataType::Utf8, false).with_extension_type(Json::default())
        ]);

        let parquet_schema = ArrowSchemaConverter::new().convert(&arrow_schema)?;

        assert_eq!(
            parquet_schema.column(0).logical_type(),
            Some(LogicalType::Json)
        );

        // TODO: roundtrip
        // https://github.com/apache/arrow-rs/issues/7063
        // let arrow_schema = parquet_to_arrow_schema(&parquet_schema, None)?;
        // assert_eq!(
        //     arrow_schema.field(0).try_extension_type::<Json>()?,
        //     Json::default()
        // );

        Ok(())
    }
}