parquet/arrow/schema/mod.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Converting Parquet schema <--> Arrow schema: [`ArrowSchemaConverter`] and [`parquet_to_arrow_schema`]

use base64::prelude::BASE64_STANDARD;
use base64::Engine;
use std::collections::HashMap;
use std::sync::Arc;

use arrow_ipc::writer;
#[cfg(feature = "arrow_canonical_extension_types")]
use arrow_schema::extension::{Json, Uuid};
use arrow_schema::{DataType, Field, Fields, Schema, TimeUnit};

use crate::basic::{
    ConvertedType, LogicalType, Repetition, TimeUnit as ParquetTimeUnit, Type as PhysicalType,
};
use crate::errors::{ParquetError, Result};
use crate::file::{metadata::KeyValue, properties::WriterProperties};
use crate::schema::types::{ColumnDescriptor, SchemaDescriptor, Type};

mod complex;
mod primitive;

use crate::arrow::ProjectionMask;
pub(crate) use complex::{ParquetField, ParquetFieldType};

use super::PARQUET_FIELD_ID_META_KEY;

/// Convert Parquet schema to Arrow schema including optional metadata
///
/// Attempts to decode any existing Arrow schema metadata, falling back
/// to converting the Parquet schema column-wise
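///
/// # Example
///
/// A minimal sketch converting a hand-written Parquet schema with no embedded
/// Arrow metadata (the schema and field names here are illustrative):
///
/// ```
/// # use std::sync::Arc;
/// # use parquet::arrow::parquet_to_arrow_schema;
/// # use parquet::schema::parser::parse_message_type;
/// # use parquet::schema::types::SchemaDescriptor;
/// let message_type = "
/// message test_schema {
///     REQUIRED INT32 id;
///     OPTIONAL BINARY name (UTF8);
/// }
/// ";
/// let parquet_type = Arc::new(parse_message_type(message_type).unwrap());
/// let descriptor = SchemaDescriptor::new(parquet_type);
/// // No key-value metadata is supplied, so the schema is derived column-wise
/// let arrow_schema = parquet_to_arrow_schema(&descriptor, None).unwrap();
/// assert_eq!(arrow_schema.fields().len(), 2);
/// assert_eq!(arrow_schema.field(0).name(), "id");
/// ```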
pub fn parquet_to_arrow_schema(
    parquet_schema: &SchemaDescriptor,
    key_value_metadata: Option<&Vec<KeyValue>>,
) -> Result<Schema> {
    parquet_to_arrow_schema_by_columns(parquet_schema, ProjectionMask::all(), key_value_metadata)
}

/// Convert parquet schema to arrow schema including optional metadata,
/// only preserving some leaf columns.
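///
/// For example (a sketch; the two-column schema is illustrative), a mask
/// selecting only the first leaf yields a single-field Arrow schema:
///
/// ```
/// # use std::sync::Arc;
/// # use parquet::arrow::{parquet_to_arrow_schema_by_columns, ProjectionMask};
/// # use parquet::schema::parser::parse_message_type;
/// # use parquet::schema::types::SchemaDescriptor;
/// let parquet_type = Arc::new(
///     parse_message_type("message s { REQUIRED INT64 a; REQUIRED INT64 b; }").unwrap(),
/// );
/// let descriptor = SchemaDescriptor::new(parquet_type);
/// let mask = ProjectionMask::leaves(&descriptor, [0]);
/// let schema = parquet_to_arrow_schema_by_columns(&descriptor, mask, None).unwrap();
/// assert_eq!(schema.fields().len(), 1);
/// assert_eq!(schema.field(0).name(), "a");
/// ```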
pub fn parquet_to_arrow_schema_by_columns(
    parquet_schema: &SchemaDescriptor,
    mask: ProjectionMask,
    key_value_metadata: Option<&Vec<KeyValue>>,
) -> Result<Schema> {
    Ok(parquet_to_arrow_schema_and_fields(parquet_schema, mask, key_value_metadata)?.0)
}

/// Convert the Parquet schema to an Arrow schema and [`ParquetField`] levels, extracting any embedded Arrow metadata
pub(crate) fn parquet_to_arrow_schema_and_fields(
    parquet_schema: &SchemaDescriptor,
    mask: ProjectionMask,
    key_value_metadata: Option<&Vec<KeyValue>>,
) -> Result<(Schema, Option<ParquetField>)> {
    let mut metadata = parse_key_value_metadata(key_value_metadata).unwrap_or_default();
    let maybe_schema = metadata
        .remove(super::ARROW_SCHEMA_META_KEY)
        .map(|value| get_arrow_schema_from_metadata(&value))
        .transpose()?;

    // Add the Arrow metadata to the Parquet metadata, skipping keys that collide
    if let Some(arrow_schema) = &maybe_schema {
        arrow_schema.metadata().iter().for_each(|(k, v)| {
            metadata.entry(k.clone()).or_insert_with(|| v.clone());
        });
    }

    let hint = maybe_schema.as_ref().map(|s| s.fields());
    let field_levels = parquet_to_arrow_field_levels(parquet_schema, mask, hint)?;
    let schema = Schema::new_with_metadata(field_levels.fields, metadata);
    Ok((schema, field_levels.levels))
}

/// Schema information necessary to decode a parquet file as arrow [`Fields`]
///
/// In particular this stores the Dremel-level information necessary to correctly
/// interpret the encoded definition and repetition levels
///
/// Note: this is an opaque container intended to be used with lower-level APIs
/// within this crate
#[derive(Debug, Clone)]
pub struct FieldLevels {
    pub(crate) fields: Fields,
    pub(crate) levels: Option<ParquetField>,
}

/// Convert a parquet [`SchemaDescriptor`] to [`FieldLevels`]
///
/// Columns not included within [`ProjectionMask`] will be ignored.
///
/// The optional `hint` parameter is the desired Arrow schema. See the
/// [`arrow`] module documentation for more information.
///
/// [`arrow`]: crate::arrow
///
/// # Notes:
/// Where a field type in `hint` is compatible with the corresponding parquet type in `schema`, it
/// will be used, otherwise the default arrow type for the given parquet column type will be used.
///
/// This is to accommodate arrow types that cannot be round-tripped through parquet natively.
/// Depending on the parquet writer, this can lead to a mismatch between a file's parquet schema
/// and its embedded arrow schema. The parquet `schema` must be treated as authoritative in such
/// an event. See [#1663](https://github.com/apache/arrow-rs/issues/1663) for more information
///
/// Note: this is a low-level API, most users will want to make use of the higher-level
/// [`parquet_to_arrow_schema`] for decoding metadata from a parquet file.
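///
/// # Example
///
/// A minimal sketch without a schema hint (the single-column schema is
/// illustrative); the returned [`FieldLevels`] is opaque and intended to be
/// passed on to the lower-level readers in this crate:
///
/// ```
/// # use std::sync::Arc;
/// # use parquet::arrow::{parquet_to_arrow_field_levels, ProjectionMask};
/// # use parquet::schema::parser::parse_message_type;
/// # use parquet::schema::types::SchemaDescriptor;
/// let parquet_type = Arc::new(parse_message_type("message s { REQUIRED INT32 a; }").unwrap());
/// let descriptor = SchemaDescriptor::new(parquet_type);
/// let levels = parquet_to_arrow_field_levels(&descriptor, ProjectionMask::all(), None).unwrap();
/// # let _ = levels;
/// ```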
pub fn parquet_to_arrow_field_levels(
    schema: &SchemaDescriptor,
    mask: ProjectionMask,
    hint: Option<&Fields>,
) -> Result<FieldLevels> {
    match complex::convert_schema(schema, mask, hint)? {
        Some(field) => match &field.arrow_type {
            DataType::Struct(fields) => Ok(FieldLevels {
                fields: fields.clone(),
                levels: Some(field),
            }),
            _ => unreachable!(),
        },
        None => Ok(FieldLevels {
            fields: Fields::empty(),
            levels: None,
        }),
    }
}

/// Try to convert Arrow schema metadata into a schema
fn get_arrow_schema_from_metadata(encoded_meta: &str) -> Result<Schema> {
    let decoded = BASE64_STANDARD.decode(encoded_meta);
    match decoded {
        Ok(bytes) => {
            let slice = if bytes.len() > 8 && bytes[0..4] == [255u8; 4] {
                &bytes[8..]
            } else {
                bytes.as_slice()
            };
            match arrow_ipc::root_as_message(slice) {
                Ok(message) => message
                    .header_as_schema()
                    .map(arrow_ipc::convert::fb_to_schema)
                    .ok_or_else(|| arrow_err!("the message is not Arrow Schema")),
                Err(err) => {
                    // The flatbuffers implementation returns an error if verification fails.
                    Err(arrow_err!(
                        "Unable to get root as message stored in {}: {:?}",
                        super::ARROW_SCHEMA_META_KEY,
                        err
                    ))
                }
            }
        }
        Err(err) => {
            // The C++ implementation returns an error if the schema can't be parsed.
            Err(arrow_err!(
                "Unable to decode the encoded schema stored in {}, {:?}",
                super::ARROW_SCHEMA_META_KEY,
                err
            ))
        }
    }
}

/// Encodes the Arrow schema into the IPC format, and base64 encodes it
pub fn encode_arrow_schema(schema: &Schema) -> String {
    let options = writer::IpcWriteOptions::default();
    #[allow(deprecated)]
    let mut dictionary_tracker =
        writer::DictionaryTracker::new_with_preserve_dict_id(true, options.preserve_dict_id());
    let data_gen = writer::IpcDataGenerator::default();
    let mut serialized_schema =
        data_gen.schema_to_bytes_with_dictionary_tracker(schema, &mut dictionary_tracker, &options);

    // Manually prepend the length to the schema, as Arrow uses the legacy IPC format
    // TODO: change after addressing ARROW-9777
    let schema_len = serialized_schema.ipc_message.len();
    let mut len_prefix_schema = Vec::with_capacity(schema_len + 8);
    len_prefix_schema.append(&mut vec![255u8, 255, 255, 255]);
    len_prefix_schema.append((schema_len as u32).to_le_bytes().to_vec().as_mut());
    len_prefix_schema.append(&mut serialized_schema.ipc_message);

    BASE64_STANDARD.encode(&len_prefix_schema)
}

/// Mutates writer metadata by storing the encoded Arrow schema hint in
/// [`ARROW_SCHEMA_META_KEY`].
///
/// If there is an existing Arrow schema metadata, it is replaced.
///
/// [`ARROW_SCHEMA_META_KEY`]: crate::arrow::ARROW_SCHEMA_META_KEY
pub fn add_encoded_arrow_schema_to_metadata(schema: &Schema, props: &mut WriterProperties) {
    let encoded = encode_arrow_schema(schema);

    let schema_kv = KeyValue {
        key: super::ARROW_SCHEMA_META_KEY.to_string(),
        value: Some(encoded),
    };

    let meta = props
        .key_value_metadata
        .get_or_insert_with(Default::default);

    // check if ARROW:schema exists, and overwrite it
    let schema_meta = meta
        .iter()
        .enumerate()
        .find(|(_, kv)| kv.key.as_str() == super::ARROW_SCHEMA_META_KEY);
    match schema_meta {
        Some((i, _)) => {
            meta.remove(i);
            meta.push(schema_kv);
        }
        None => {
            meta.push(schema_kv);
        }
    }
}

/// Converter for Arrow schema to Parquet schema
///
/// See the documentation on the [`arrow`] module for background
/// information on how Arrow schema is represented in Parquet.
///
/// [`arrow`]: crate::arrow
///
/// # Example:
/// ```
/// # use std::sync::Arc;
/// # use arrow_schema::{Field, Schema, DataType};
/// # use parquet::arrow::ArrowSchemaConverter;
/// use parquet::schema::types::{SchemaDescriptor, Type};
/// use parquet::basic; // note there are two `Type`s in the following example
/// // create an Arrow Schema
/// let arrow_schema = Schema::new(vec![
///   Field::new("a", DataType::Int64, true),
///   Field::new("b", DataType::Date32, true),
/// ]);
/// // convert the Arrow schema to a Parquet schema
/// let parquet_schema = ArrowSchemaConverter::new()
///   .convert(&arrow_schema)
///   .unwrap();
///
/// let expected_parquet_schema = SchemaDescriptor::new(
///   Arc::new(
///     Type::group_type_builder("arrow_schema")
///       .with_fields(vec![
///         Arc::new(
///          Type::primitive_type_builder("a", basic::Type::INT64)
///           .build().unwrap()
///         ),
///         Arc::new(
///          Type::primitive_type_builder("b", basic::Type::INT32)
///           .with_converted_type(basic::ConvertedType::DATE)
///           .with_logical_type(Some(basic::LogicalType::Date))
///           .build().unwrap()
///         ),
///      ])
///      .build().unwrap()
///   )
/// );
/// assert_eq!(parquet_schema, expected_parquet_schema);
/// ```
#[derive(Debug)]
pub struct ArrowSchemaConverter<'a> {
    /// Name of the root schema in Parquet
    schema_root: &'a str,
    /// Should we coerce Arrow types to compatible Parquet types?
    ///
    /// See docs on [`Self::with_coerce_types`]
    coerce_types: bool,
}

impl Default for ArrowSchemaConverter<'_> {
    fn default() -> Self {
        Self::new()
    }
}

impl<'a> ArrowSchemaConverter<'a> {
    /// Create a new converter
    pub fn new() -> Self {
        Self {
            schema_root: "arrow_schema",
            coerce_types: false,
        }
    }

    /// Should Arrow types be coerced into Parquet native types (default `false`).
    ///
    /// Setting this option to `true` will result in Parquet files that can be
    /// read by more readers, but may lose precision for Arrow types such as
    /// [`DataType::Date64`] which have no direct [corresponding Parquet type].
    ///
    /// By default, this converter does not coerce to native Parquet types. Enabling type
    /// coercion allows for meaningful representations that do not require
    /// downstream readers to consider the embedded Arrow schema, and can allow
    /// for greater compatibility with other Parquet implementations. However,
    /// type coercion also prevents data from being losslessly round-tripped.
    ///
    /// # Discussion
    ///
    /// Some Arrow types such as `Date64`, `Timestamp` and `Interval` have no
    /// corresponding Parquet logical type. Thus, they cannot be losslessly
    /// round-tripped when stored using the closest Parquet logical type.
    /// For example, some `Date64` values may be truncated when stored with
    /// Parquet's native 32-bit date type.
    ///
    /// For [`List`] and [`Map`] types, some Parquet readers expect certain
    /// schema elements to have specific names (earlier versions of the spec
    /// were somewhat ambiguous on this point). Type coercion will use the names
    /// prescribed by the Parquet specification, potentially losing naming
    /// metadata from the Arrow schema.
    ///
    /// [`List`]: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists
    /// [`Map`]: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#maps
    /// [corresponding Parquet type]: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#date
    ///
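    /// # Example
    ///
    /// A sketch showing `Date64` coerced to Parquet's 32-bit `DATE` type (the
    /// field name `"d"` is arbitrary):
    ///
    /// ```
    /// # use arrow_schema::{DataType, Field, Schema};
    /// # use parquet::arrow::ArrowSchemaConverter;
    /// # use parquet::basic::Type as PhysicalType;
    /// let schema = Schema::new(vec![Field::new("d", DataType::Date64, false)]);
    /// let parquet = ArrowSchemaConverter::new()
    ///     .with_coerce_types(true)
    ///     .convert(&schema)
    ///     .unwrap();
    /// // With coercion, Date64 is stored as a 32-bit Parquet DATE (possibly truncating)
    /// assert_eq!(parquet.column(0).physical_type(), PhysicalType::INT32);
    /// ```
    ///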
    pub fn with_coerce_types(mut self, coerce_types: bool) -> Self {
        self.coerce_types = coerce_types;
        self
    }

    /// Set the root schema element name (defaults to `"arrow_schema"`).
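    ///
    /// # Example
    ///
    /// A sketch renaming the root group (the name `"my_root"` is arbitrary):
    ///
    /// ```
    /// # use arrow_schema::{DataType, Field, Schema};
    /// # use parquet::arrow::ArrowSchemaConverter;
    /// let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
    /// let parquet = ArrowSchemaConverter::new()
    ///     .schema_root("my_root")
    ///     .convert(&schema)
    ///     .unwrap();
    /// assert_eq!(parquet.root_schema().name(), "my_root");
    /// ```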
    pub fn schema_root(mut self, schema_root: &'a str) -> Self {
        self.schema_root = schema_root;
        self
    }

    /// Convert the specified Arrow [`Schema`] to the desired Parquet [`SchemaDescriptor`]
    ///
    /// See example in [`ArrowSchemaConverter`]
    pub fn convert(&self, schema: &Schema) -> Result<SchemaDescriptor> {
        let fields = schema
            .fields()
            .iter()
            .map(|field| arrow_to_parquet_type(field, self.coerce_types).map(Arc::new))
            .collect::<Result<_>>()?;
        let group = Type::group_type_builder(self.schema_root)
            .with_fields(fields)
            .build()?;
        Ok(SchemaDescriptor::new(Arc::new(group)))
    }
}

/// Convert arrow schema to parquet schema
///
/// The name of the root schema element defaults to `"arrow_schema"`, this can be
/// overridden with [`ArrowSchemaConverter`]
#[deprecated(since = "54.0.0", note = "Use `ArrowSchemaConverter` instead")]
pub fn arrow_to_parquet_schema(schema: &Schema) -> Result<SchemaDescriptor> {
    ArrowSchemaConverter::new().convert(schema)
}

fn parse_key_value_metadata(
    key_value_metadata: Option<&Vec<KeyValue>>,
) -> Option<HashMap<String, String>> {
    match key_value_metadata {
        Some(key_values) => {
            let map: HashMap<String, String> = key_values
                .iter()
                .filter_map(|kv| {
                    kv.value
                        .as_ref()
                        .map(|value| (kv.key.clone(), value.clone()))
                })
                .collect();

            if map.is_empty() {
                None
            } else {
                Some(map)
            }
        }
        None => None,
    }
}

/// Convert parquet column schema to arrow field.
pub fn parquet_to_arrow_field(parquet_column: &ColumnDescriptor) -> Result<Field> {
    let field = complex::convert_type(&parquet_column.self_type_ptr())?;
    let mut ret = Field::new(parquet_column.name(), field.arrow_type, field.nullable);

    let basic_info = parquet_column.self_type().get_basic_info();
    let mut meta = HashMap::with_capacity(if cfg!(feature = "arrow_canonical_extension_types") {
        2
    } else {
        1
    });
    if basic_info.has_id() {
        meta.insert(
            PARQUET_FIELD_ID_META_KEY.to_string(),
            basic_info.id().to_string(),
        );
    }
    #[cfg(feature = "arrow_canonical_extension_types")]
    if let Some(logical_type) = basic_info.logical_type() {
        match logical_type {
            LogicalType::Uuid => ret.try_with_extension_type(Uuid)?,
            LogicalType::Json => ret.try_with_extension_type(Json::default())?,
            _ => {}
        }
    }
    if !meta.is_empty() {
        ret.set_metadata(meta);
    }

    Ok(ret)
}

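/// Returns the minimum number of bytes a `FIXED_LEN_BYTE_ARRAY` must have to
/// hold a decimal of the given precision, per the Parquet logical-types
/// specification.
///
/// For example, precision 9 fits in 4 bytes, precision 18 in 8 bytes, and
/// precision 38 in 16 bytes.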
pub fn decimal_length_from_precision(precision: u8) -> usize {
    // Parquet's logical types define, for an n-byte FIXED_LEN_BYTE_ARRAY:
    //   digits = floor(log_10(2^(8*n - 1) - 1))
    // Dropping the floor and solving for n given the required digits:
    //   10^digits + 1 = 2^(8*n - 1)
    //   log2(10^digits + 1) + 1 = 8*n
    //   n = (log2(10^digits + 1) + 1) / 8
    // Rounding up gives the smallest byte length that can hold `precision` digits.
    (((10.0_f64.powi(precision as i32) + 1.0).log2() + 1.0) / 8.0).ceil() as usize
}

/// Convert an arrow field to a parquet `Type`
fn arrow_to_parquet_type(field: &Field, coerce_types: bool) -> Result<Type> {
    const PARQUET_LIST_ELEMENT_NAME: &str = "element";
    const PARQUET_MAP_STRUCT_NAME: &str = "key_value";
    const PARQUET_KEY_FIELD_NAME: &str = "key";
    const PARQUET_VALUE_FIELD_NAME: &str = "value";

    let name = field.name().as_str();
    let repetition = if field.is_nullable() {
        Repetition::OPTIONAL
    } else {
        Repetition::REQUIRED
    };
    let id = field_id(field);
    // create type from field
    match field.data_type() {
        DataType::Null => Type::primitive_type_builder(name, PhysicalType::INT32)
            .with_logical_type(Some(LogicalType::Unknown))
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::Boolean => Type::primitive_type_builder(name, PhysicalType::BOOLEAN)
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::Int8 => Type::primitive_type_builder(name, PhysicalType::INT32)
            .with_logical_type(Some(LogicalType::Integer {
                bit_width: 8,
                is_signed: true,
            }))
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::Int16 => Type::primitive_type_builder(name, PhysicalType::INT32)
            .with_logical_type(Some(LogicalType::Integer {
                bit_width: 16,
                is_signed: true,
            }))
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::Int32 => Type::primitive_type_builder(name, PhysicalType::INT32)
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::Int64 => Type::primitive_type_builder(name, PhysicalType::INT64)
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::UInt8 => Type::primitive_type_builder(name, PhysicalType::INT32)
            .with_logical_type(Some(LogicalType::Integer {
                bit_width: 8,
                is_signed: false,
            }))
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::UInt16 => Type::primitive_type_builder(name, PhysicalType::INT32)
            .with_logical_type(Some(LogicalType::Integer {
                bit_width: 16,
                is_signed: false,
            }))
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::UInt32 => Type::primitive_type_builder(name, PhysicalType::INT32)
            .with_logical_type(Some(LogicalType::Integer {
                bit_width: 32,
                is_signed: false,
            }))
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::UInt64 => Type::primitive_type_builder(name, PhysicalType::INT64)
            .with_logical_type(Some(LogicalType::Integer {
                bit_width: 64,
                is_signed: false,
            }))
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::Float16 => Type::primitive_type_builder(name, PhysicalType::FIXED_LEN_BYTE_ARRAY)
            .with_repetition(repetition)
            .with_id(id)
            .with_logical_type(Some(LogicalType::Float16))
            .with_length(2)
            .build(),
        DataType::Float32 => Type::primitive_type_builder(name, PhysicalType::FLOAT)
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::Float64 => Type::primitive_type_builder(name, PhysicalType::DOUBLE)
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::Timestamp(TimeUnit::Second, _) => {
            // Cannot represent seconds in LogicalType
            Type::primitive_type_builder(name, PhysicalType::INT64)
                .with_repetition(repetition)
                .with_id(id)
                .build()
        }
        DataType::Timestamp(time_unit, tz) => {
            Type::primitive_type_builder(name, PhysicalType::INT64)
                .with_logical_type(Some(LogicalType::Timestamp {
                    // If a timezone is set, the values are normalized to UTC
                    is_adjusted_to_u_t_c: matches!(tz, Some(z) if !z.as_ref().is_empty()),
                    unit: match time_unit {
                        TimeUnit::Second => unreachable!(),
                        TimeUnit::Millisecond => ParquetTimeUnit::MILLIS(Default::default()),
                        TimeUnit::Microsecond => ParquetTimeUnit::MICROS(Default::default()),
                        TimeUnit::Nanosecond => ParquetTimeUnit::NANOS(Default::default()),
                    },
                }))
                .with_repetition(repetition)
                .with_id(id)
                .build()
        }
        DataType::Date32 => Type::primitive_type_builder(name, PhysicalType::INT32)
            .with_logical_type(Some(LogicalType::Date))
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::Date64 => {
            if coerce_types {
                Type::primitive_type_builder(name, PhysicalType::INT32)
                    .with_logical_type(Some(LogicalType::Date))
                    .with_repetition(repetition)
                    .with_id(id)
                    .build()
            } else {
                Type::primitive_type_builder(name, PhysicalType::INT64)
                    .with_repetition(repetition)
                    .with_id(id)
                    .build()
            }
        }
        DataType::Time32(TimeUnit::Second) => {
            // Cannot represent seconds in LogicalType
            Type::primitive_type_builder(name, PhysicalType::INT32)
                .with_repetition(repetition)
                .with_id(id)
                .build()
        }
        DataType::Time32(unit) => Type::primitive_type_builder(name, PhysicalType::INT32)
            .with_logical_type(Some(LogicalType::Time {
                is_adjusted_to_u_t_c: field.metadata().contains_key("adjusted_to_utc"),
                unit: match unit {
                    TimeUnit::Millisecond => ParquetTimeUnit::MILLIS(Default::default()),
                    u => unreachable!("Invalid unit for Time32: {:?}", u),
                },
            }))
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::Time64(unit) => Type::primitive_type_builder(name, PhysicalType::INT64)
            .with_logical_type(Some(LogicalType::Time {
                is_adjusted_to_u_t_c: field.metadata().contains_key("adjusted_to_utc"),
                unit: match unit {
                    TimeUnit::Microsecond => ParquetTimeUnit::MICROS(Default::default()),
                    TimeUnit::Nanosecond => ParquetTimeUnit::NANOS(Default::default()),
                    u => unreachable!("Invalid unit for Time64: {:?}", u),
                },
            }))
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::Duration(_) => Type::primitive_type_builder(name, PhysicalType::INT64)
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::Interval(_) => {
            Type::primitive_type_builder(name, PhysicalType::FIXED_LEN_BYTE_ARRAY)
                .with_converted_type(ConvertedType::INTERVAL)
                .with_repetition(repetition)
                .with_id(id)
                .with_length(12)
                .build()
        }
        DataType::Binary | DataType::LargeBinary => {
            Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
                .with_repetition(repetition)
                .with_id(id)
                .build()
        }
        DataType::FixedSizeBinary(length) => {
            Type::primitive_type_builder(name, PhysicalType::FIXED_LEN_BYTE_ARRAY)
                .with_repetition(repetition)
                .with_id(id)
                .with_length(*length)
                .with_logical_type(
                    #[cfg(feature = "arrow_canonical_extension_types")]
                    // If set, map arrow uuid extension type to parquet uuid logical type.
                    field
                        .try_extension_type::<Uuid>()
                        .ok()
                        .map(|_| LogicalType::Uuid),
                    #[cfg(not(feature = "arrow_canonical_extension_types"))]
                    None,
                )
                .build()
        }
        DataType::BinaryView => Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::Decimal128(precision, scale) | DataType::Decimal256(precision, scale) => {
            // Decimal precision determines the Parquet physical type to use,
            // following https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#decimal
            let (physical_type, length) = if *precision > 1 && *precision <= 9 {
                (PhysicalType::INT32, -1)
            } else if *precision <= 18 {
                (PhysicalType::INT64, -1)
            } else {
                (
                    PhysicalType::FIXED_LEN_BYTE_ARRAY,
                    decimal_length_from_precision(*precision) as i32,
                )
            };
            Type::primitive_type_builder(name, physical_type)
                .with_repetition(repetition)
                .with_id(id)
                .with_length(length)
                .with_logical_type(Some(LogicalType::Decimal {
                    scale: *scale as i32,
                    precision: *precision as i32,
                }))
                .with_precision(*precision as i32)
                .with_scale(*scale as i32)
                .build()
        }
        DataType::Utf8 | DataType::LargeUtf8 => {
            Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
                .with_logical_type({
                    #[cfg(feature = "arrow_canonical_extension_types")]
                    {
                        // Use the Json logical type if the canonical Json
                        // extension type is set on this field.
                        field
                            .try_extension_type::<Json>()
                            .map_or(Some(LogicalType::String), |_| Some(LogicalType::Json))
                    }
                    #[cfg(not(feature = "arrow_canonical_extension_types"))]
                    Some(LogicalType::String)
                })
                .with_repetition(repetition)
                .with_id(id)
                .build()
        }
        DataType::Utf8View => Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
            .with_logical_type({
                #[cfg(feature = "arrow_canonical_extension_types")]
                {
                    // Use the Json logical type if the canonical Json
                    // extension type is set on this field.
                    field
                        .try_extension_type::<Json>()
                        .map_or(Some(LogicalType::String), |_| Some(LogicalType::Json))
                }
                #[cfg(not(feature = "arrow_canonical_extension_types"))]
                Some(LogicalType::String)
            })
            .with_repetition(repetition)
            .with_id(id)
            .build(),
        DataType::List(f) | DataType::FixedSizeList(f, _) | DataType::LargeList(f) => {
            let field_ref = if coerce_types && f.name() != PARQUET_LIST_ELEMENT_NAME {
                // Ensure proper naming per the Parquet specification
                let ff = f.as_ref().clone().with_name(PARQUET_LIST_ELEMENT_NAME);
                Arc::new(arrow_to_parquet_type(&ff, coerce_types)?)
            } else {
                Arc::new(arrow_to_parquet_type(f, coerce_types)?)
            };

            Type::group_type_builder(name)
                .with_fields(vec![Arc::new(
                    Type::group_type_builder("list")
                        .with_fields(vec![field_ref])
                        .with_repetition(Repetition::REPEATED)
                        .build()?,
                )])
                .with_logical_type(Some(LogicalType::List))
                .with_repetition(repetition)
                .with_id(id)
                .build()
        }
        DataType::ListView(_) | DataType::LargeListView(_) => {
            unimplemented!("ListView/LargeListView not implemented")
        }
        DataType::Struct(fields) => {
            if fields.is_empty() {
                return Err(arrow_err!("Parquet does not support writing empty structs",));
            }
            // recursively convert children to types/nodes
            let fields = fields
                .iter()
                .map(|f| arrow_to_parquet_type(f, coerce_types).map(Arc::new))
                .collect::<Result<_>>()?;
            Type::group_type_builder(name)
                .with_fields(fields)
                .with_repetition(repetition)
                .with_id(id)
                .build()
        }
        DataType::Map(field, _) => {
            if let DataType::Struct(struct_fields) = field.data_type() {
                // If coercing then set inner struct name to "key_value"
                let map_struct_name = if coerce_types {
                    PARQUET_MAP_STRUCT_NAME
                } else {
                    field.name()
                };

                // If coercing then ensure struct fields are named "key" and "value"
                let fix_map_field = |name: &str, fld: &Arc<Field>| -> Result<Arc<Type>> {
                    if coerce_types && fld.name() != name {
                        let f = fld.as_ref().clone().with_name(name);
                        Ok(Arc::new(arrow_to_parquet_type(&f, coerce_types)?))
                    } else {
                        Ok(Arc::new(arrow_to_parquet_type(fld, coerce_types)?))
                    }
                };
                let key_field = fix_map_field(PARQUET_KEY_FIELD_NAME, &struct_fields[0])?;
                let val_field = fix_map_field(PARQUET_VALUE_FIELD_NAME, &struct_fields[1])?;

                Type::group_type_builder(name)
                    .with_fields(vec![Arc::new(
                        Type::group_type_builder(map_struct_name)
                            .with_fields(vec![key_field, val_field])
                            .with_repetition(Repetition::REPEATED)
                            .build()?,
                    )])
                    .with_logical_type(Some(LogicalType::Map))
                    .with_repetition(repetition)
                    .with_id(id)
                    .build()
            } else {
                Err(arrow_err!(
                    "DataType::Map should contain a struct field child",
                ))
            }
        }
        DataType::Union(_, _) => unimplemented!("See ARROW-8817."),
        DataType::Dictionary(_, ref value) => {
            // Dictionary encoding not handled at the schema level
            let dict_field = field.clone().with_data_type(value.as_ref().clone());
            arrow_to_parquet_type(&dict_field, coerce_types)
        }
        DataType::RunEndEncoded(_, _) => Err(arrow_err!(
            "Converting RunEndEncodedType to parquet not supported",
        )),
    }
}

fn field_id(field: &Field) -> Option<i32> {
    let value = field.metadata().get(super::PARQUET_FIELD_ID_META_KEY)?;
    value.parse().ok() // Fail quietly if not a valid integer
}

#[cfg(test)]
mod tests {
    use super::*;

    use std::{collections::HashMap, sync::Arc};

    use arrow::datatypes::{DataType, Field, IntervalUnit, TimeUnit};

    use crate::arrow::PARQUET_FIELD_ID_META_KEY;
    use crate::file::metadata::KeyValue;
    use crate::file::reader::FileReader;
    use crate::{
        arrow::{arrow_reader::ParquetRecordBatchReaderBuilder, ArrowWriter},
        schema::{parser::parse_message_type, types::SchemaDescriptor},
    };

    #[test]
    fn test_flat_primitives() {
        let message_type = "
        message test_schema {
            REQUIRED BOOLEAN boolean;
            REQUIRED INT32   int8  (INT_8);
            REQUIRED INT32   int16 (INT_16);
            REQUIRED INT32   uint8 (INTEGER(8,false));
            REQUIRED INT32   uint16 (INTEGER(16,false));
            REQUIRED INT32   int32;
            REQUIRED INT64   int64;
            OPTIONAL DOUBLE  double;
            OPTIONAL FLOAT   float;
            OPTIONAL FIXED_LEN_BYTE_ARRAY (2) float16 (FLOAT16);
            OPTIONAL BINARY  string (UTF8);
            OPTIONAL BINARY  string_2 (STRING);
            OPTIONAL BINARY  json (JSON);
        }
        ";
        let parquet_group_type = parse_message_type(message_type).unwrap();

        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();

        let arrow_fields = Fields::from(vec![
            Field::new("boolean", DataType::Boolean, false),
            Field::new("int8", DataType::Int8, false),
            Field::new("int16", DataType::Int16, false),
            Field::new("uint8", DataType::UInt8, false),
            Field::new("uint16", DataType::UInt16, false),
            Field::new("int32", DataType::Int32, false),
            Field::new("int64", DataType::Int64, false),
            Field::new("double", DataType::Float64, true),
            Field::new("float", DataType::Float32, true),
            Field::new("float16", DataType::Float16, true),
            Field::new("string", DataType::Utf8, true),
            Field::new("string_2", DataType::Utf8, true),
            Field::new("json", DataType::Utf8, true),
        ]);

        assert_eq!(&arrow_fields, converted_arrow_schema.fields());
    }

    #[test]
    fn test_decimal_fields() {
        let message_type = "
        message test_schema {
                    REQUIRED INT32 decimal1 (DECIMAL(4,2));
                    REQUIRED INT64 decimal2 (DECIMAL(12,2));
                    REQUIRED FIXED_LEN_BYTE_ARRAY (16) decimal3 (DECIMAL(30,2));
                    REQUIRED BYTE_ARRAY decimal4 (DECIMAL(33,2));
                    REQUIRED BYTE_ARRAY decimal5 (DECIMAL(38,2));
                    REQUIRED FIXED_LEN_BYTE_ARRAY (17) decimal6 (DECIMAL(39,2));
                    REQUIRED BYTE_ARRAY decimal7 (DECIMAL(39,2));
        }
        ";

        let parquet_group_type = parse_message_type(message_type).unwrap();

        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();

        let arrow_fields = Fields::from(vec![
            Field::new("decimal1", DataType::Decimal128(4, 2), false),
            Field::new("decimal2", DataType::Decimal128(12, 2), false),
            Field::new("decimal3", DataType::Decimal128(30, 2), false),
            Field::new("decimal4", DataType::Decimal128(33, 2), false),
            Field::new("decimal5", DataType::Decimal128(38, 2), false),
            Field::new("decimal6", DataType::Decimal256(39, 2), false),
            Field::new("decimal7", DataType::Decimal256(39, 2), false),
        ]);
        assert_eq!(&arrow_fields, converted_arrow_schema.fields());
    }

    #[test]
    fn test_byte_array_fields() {
        let message_type = "
        message test_schema {
            REQUIRED BYTE_ARRAY binary;
            REQUIRED FIXED_LEN_BYTE_ARRAY (20) fixed_binary;
        }
        ";

        let parquet_group_type = parse_message_type(message_type).unwrap();

        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();

        let arrow_fields = Fields::from(vec![
            Field::new("binary", DataType::Binary, false),
            Field::new("fixed_binary", DataType::FixedSizeBinary(20), false),
        ]);
        assert_eq!(&arrow_fields, converted_arrow_schema.fields());
    }

    #[test]
    fn test_duplicate_fields() {
        let message_type = "
        message test_schema {
            REQUIRED BOOLEAN boolean;
            REQUIRED INT32 int8 (INT_8);
        }
        ";

        let parquet_group_type = parse_message_type(message_type).unwrap();

        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();

        let arrow_fields = Fields::from(vec![
            Field::new("boolean", DataType::Boolean, false),
            Field::new("int8", DataType::Int8, false),
        ]);
        assert_eq!(&arrow_fields, converted_arrow_schema.fields());

        let converted_arrow_schema =
            parquet_to_arrow_schema_by_columns(&parquet_schema, ProjectionMask::all(), None)
                .unwrap();
        assert_eq!(&arrow_fields, converted_arrow_schema.fields());
    }

    #[test]
    fn test_parquet_lists() {
        let mut arrow_fields = Vec::new();

        // LIST encoding example taken from parquet-format/LogicalTypes.md
        let message_type = "
        message test_schema {
          REQUIRED GROUP my_list (LIST) {
            REPEATED GROUP list {
              OPTIONAL BINARY element (UTF8);
            }
          }
          OPTIONAL GROUP my_list (LIST) {
            REPEATED GROUP list {
              REQUIRED BINARY element (UTF8);
            }
          }
          OPTIONAL GROUP array_of_arrays (LIST) {
            REPEATED GROUP list {
              REQUIRED GROUP element (LIST) {
                REPEATED GROUP list {
                  REQUIRED INT32 element;
                }
              }
            }
          }
          OPTIONAL GROUP my_list (LIST) {
            REPEATED GROUP element {
              REQUIRED BINARY str (UTF8);
            }
          }
          OPTIONAL GROUP my_list (LIST) {
            REPEATED INT32 element;
          }
          OPTIONAL GROUP my_list (LIST) {
            REPEATED GROUP element {
              REQUIRED BINARY str (UTF8);
              REQUIRED INT32 num;
            }
          }
          OPTIONAL GROUP my_list (LIST) {
            REPEATED GROUP array {
              REQUIRED BINARY str (UTF8);
            }

          }
          OPTIONAL GROUP my_list (LIST) {
            REPEATED GROUP my_list_tuple {
              REQUIRED BINARY str (UTF8);
            }
          }
          REPEATED INT32 name;
        }
        ";

        // // List<String> (list non-null, elements nullable)
        // required group my_list (LIST) {
        //   repeated group list {
        //     optional binary element (UTF8);
        //   }
        // }
        {
            arrow_fields.push(Field::new_list(
                "my_list",
                Field::new("element", DataType::Utf8, true),
                false,
            ));
        }

        // // List<String> (list nullable, elements non-null)
        // optional group my_list (LIST) {
        //   repeated group list {
        //     required binary element (UTF8);
        //   }
        // }
        {
            arrow_fields.push(Field::new_list(
                "my_list",
                Field::new("element", DataType::Utf8, false),
                true,
            ));
        }

        // Element types can be nested structures. For example, a list of lists:
        //
        // // List<List<Integer>>
        // optional group array_of_arrays (LIST) {
        //   repeated group list {
        //     required group element (LIST) {
        //       repeated group list {
        //         required int32 element;
        //       }
        //     }
        //   }
        // }
        {
            let arrow_inner_list = Field::new("element", DataType::Int32, false);
            arrow_fields.push(Field::new_list(
                "array_of_arrays",
                Field::new_list("element", arrow_inner_list, false),
                true,
            ));
        }

        // // List<String> (list nullable, elements non-null)
        // optional group my_list (LIST) {
        //   repeated group element {
        //     required binary str (UTF8);
        //   };
        // }
        {
            arrow_fields.push(Field::new_list(
                "my_list",
                Field::new("str", DataType::Utf8, false),
                true,
            ));
        }

        // // List<Integer> (nullable list, non-null elements)
        // optional group my_list (LIST) {
        //   repeated int32 element;
        // }
        {
            arrow_fields.push(Field::new_list(
                "my_list",
                Field::new("element", DataType::Int32, false),
                true,
            ));
        }

        // // List<Tuple<String, Integer>> (nullable list, non-null elements)
        // optional group my_list (LIST) {
        //   repeated group element {
        //     required binary str (UTF8);
        //     required int32 num;
        //   };
        // }
        {
            let fields = vec![
                Field::new("str", DataType::Utf8, false),
                Field::new("num", DataType::Int32, false),
            ];
            arrow_fields.push(Field::new_list(
                "my_list",
                Field::new_struct("element", fields, false),
                true,
            ));
        }

        // // List<OneTuple<String>> (nullable list, non-null elements)
        // optional group my_list (LIST) {
        //   repeated group array {
        //     required binary str (UTF8);
        //   };
        // }
        // Special case: group is named array
        {
            let fields = vec![Field::new("str", DataType::Utf8, false)];
            arrow_fields.push(Field::new_list(
                "my_list",
                Field::new_struct("array", fields, false),
                true,
            ));
        }

        // // List<OneTuple<String>> (nullable list, non-null elements)
        // optional group my_list (LIST) {
        //   repeated group my_list_tuple {
        //     required binary str (UTF8);
        //   };
        // }
        // Special case: the group name ends in _tuple
        {
            let fields = vec![Field::new("str", DataType::Utf8, false)];
            arrow_fields.push(Field::new_list(
                "my_list",
                Field::new_struct("my_list_tuple", fields, false),
                true,
            ));
        }

        // One-level encoding: Only allows required lists with required cells
        //   repeated value_type name
        {
            arrow_fields.push(Field::new_list(
                "name",
                Field::new("name", DataType::Int32, false),
                false,
            ));
        }

        let parquet_group_type = parse_message_type(message_type).unwrap();

        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
        let converted_fields = converted_arrow_schema.fields();

        assert_eq!(arrow_fields.len(), converted_fields.len());
        for i in 0..arrow_fields.len() {
            assert_eq!(&arrow_fields[i], converted_fields[i].as_ref(), "{i}");
        }
    }

    #[test]
    fn test_parquet_list_nullable() {
        let mut arrow_fields = Vec::new();

        let message_type = "
        message test_schema {
          REQUIRED GROUP my_list1 (LIST) {
            REPEATED GROUP list {
              OPTIONAL BINARY element (UTF8);
            }
          }
          OPTIONAL GROUP my_list2 (LIST) {
            REPEATED GROUP list {
              REQUIRED BINARY element (UTF8);
            }
          }
          REQUIRED GROUP my_list3 (LIST) {
            REPEATED GROUP list {
              REQUIRED BINARY element (UTF8);
            }
          }
        }
        ";

        // // List<String> (list non-null, elements nullable)
        // required group my_list1 (LIST) {
        //   repeated group list {
        //     optional binary element (UTF8);
        //   }
        // }
        {
            arrow_fields.push(Field::new_list(
                "my_list1",
                Field::new("element", DataType::Utf8, true),
                false,
            ));
        }

        // // List<String> (list nullable, elements non-null)
        // optional group my_list2 (LIST) {
        //   repeated group list {
        //     required binary element (UTF8);
        //   }
        // }
        {
            arrow_fields.push(Field::new_list(
                "my_list2",
                Field::new("element", DataType::Utf8, false),
                true,
            ));
        }

        // // List<String> (list non-null, elements non-null)
        // required group my_list3 (LIST) {
        //   repeated group list {
        //     required binary element (UTF8);
        //   }
        // }
        {
            arrow_fields.push(Field::new_list(
                "my_list3",
                Field::new("element", DataType::Utf8, false),
                false,
            ));
        }

        let parquet_group_type = parse_message_type(message_type).unwrap();

        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
        let converted_fields = converted_arrow_schema.fields();

        assert_eq!(arrow_fields.len(), converted_fields.len());
        for i in 0..arrow_fields.len() {
            assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
        }
    }

    #[test]
    fn test_parquet_maps() {
        let mut arrow_fields = Vec::new();

        // MAP encoding example taken from parquet-format/LogicalTypes.md
        let message_type = "
        message test_schema {
          REQUIRED group my_map1 (MAP) {
            REPEATED group key_value {
              REQUIRED binary key (UTF8);
              OPTIONAL int32 value;
            }
          }
          OPTIONAL group my_map2 (MAP) {
            REPEATED group map {
              REQUIRED binary str (UTF8);
              REQUIRED int32 num;
            }
          }
          OPTIONAL group my_map3 (MAP_KEY_VALUE) {
            REPEATED group map {
              REQUIRED binary key (UTF8);
              OPTIONAL int32 value;
            }
          }
          REQUIRED group my_map4 (MAP) {
            REPEATED group map {
              OPTIONAL binary key (UTF8);
              REQUIRED int32 value;
            }
          }
        }
        ";

        // // Map<String, Integer>
        // required group my_map (MAP) {
        //   repeated group key_value {
        //     required binary key (UTF8);
        //     optional int32 value;
        //   }
        // }
        {
            arrow_fields.push(Field::new_map(
                "my_map1",
                "key_value",
                Field::new("key", DataType::Utf8, false),
                Field::new("value", DataType::Int32, true),
                false,
                false,
            ));
        }

        // // Map<String, Integer> (nullable map, non-null values)
        // optional group my_map (MAP) {
        //   repeated group map {
        //     required binary str (UTF8);
        //     required int32 num;
        //   }
        // }
        {
            arrow_fields.push(Field::new_map(
                "my_map2",
                "map",
                Field::new("str", DataType::Utf8, false),
                Field::new("num", DataType::Int32, false),
                false,
                true,
            ));
        }

        // // Map<String, Integer> (nullable map, nullable values)
        // optional group my_map (MAP_KEY_VALUE) {
        //   repeated group map {
        //     required binary key (UTF8);
        //     optional int32 value;
        //   }
        // }
        {
            arrow_fields.push(Field::new_map(
                "my_map3",
                "map",
                Field::new("key", DataType::Utf8, false),
                Field::new("value", DataType::Int32, true),
                false,
                true,
            ));
        }

        // // Map<String, Integer> (non-compliant nullable key)
        // required group my_map (MAP) {
        //   repeated group map {
        //     optional binary key (UTF8);
        //     required int32 value;
        //   }
        // }
1309        {
1310            arrow_fields.push(Field::new_map(
1311                "my_map4",
1312                "map",
1313                Field::new("key", DataType::Utf8, false), // The key is always non-nullable (#5630)
1314                Field::new("value", DataType::Int32, false),
1315                false,
1316                false,
1317            ));
1318        }
1319
1320        let parquet_group_type = parse_message_type(message_type).unwrap();
1321
1322        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1323        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
1324        let converted_fields = converted_arrow_schema.fields();
1325
1326        assert_eq!(arrow_fields.len(), converted_fields.len());
1327        for i in 0..arrow_fields.len() {
1328            assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
1329        }
1330    }
1331
1332    #[test]
1333    fn test_nested_schema() {
1334        let mut arrow_fields = Vec::new();
1335        {
1336            let group1_fields = Fields::from(vec![
1337                Field::new("leaf1", DataType::Boolean, false),
1338                Field::new("leaf2", DataType::Int32, false),
1339            ]);
1340            let group1_struct = Field::new("group1", DataType::Struct(group1_fields), false);
1341            arrow_fields.push(group1_struct);
1342
1343            let leaf3_field = Field::new("leaf3", DataType::Int64, false);
1344            arrow_fields.push(leaf3_field);
1345        }
1346
1347        let message_type = "
1348        message test_schema {
1349          REQUIRED GROUP group1 {
1350            REQUIRED BOOLEAN leaf1;
1351            REQUIRED INT32 leaf2;
1352          }
1353          REQUIRED INT64 leaf3;
1354        }
1355        ";
1356        let parquet_group_type = parse_message_type(message_type).unwrap();
1357
1358        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1359        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
1360        let converted_fields = converted_arrow_schema.fields();
1361
1362        assert_eq!(arrow_fields.len(), converted_fields.len());
1363        for i in 0..arrow_fields.len() {
1364            assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
1365        }
1366    }
1367
1368    #[test]
1369    fn test_nested_schema_partial() {
1370        let mut arrow_fields = Vec::new();
1371        {
1372            let group1_fields = vec![Field::new("leaf1", DataType::Int64, false)].into();
1373            let group1 = Field::new("group1", DataType::Struct(group1_fields), false);
1374            arrow_fields.push(group1);
1375
1376            let group2_fields = vec![Field::new("leaf4", DataType::Int64, false)].into();
1377            let group2 = Field::new("group2", DataType::Struct(group2_fields), false);
1378            arrow_fields.push(group2);
1379
1380            arrow_fields.push(Field::new("leaf5", DataType::Int64, false));
1381        }
1382
1383        let message_type = "
1384        message test_schema {
1385          REQUIRED GROUP group1 {
1386            REQUIRED INT64 leaf1;
1387            REQUIRED INT64 leaf2;
1388          }
1389          REQUIRED  GROUP group2 {
1390            REQUIRED INT64 leaf3;
1391            REQUIRED INT64 leaf4;
1392          }
1393          REQUIRED INT64 leaf5;
1394        }
1395        ";
1396        let parquet_group_type = parse_message_type(message_type).unwrap();
1397
1398        // Expected partial arrow schema (columns 0, 3, 4):
1399        // required group group1 {
1400        //   required int64 leaf1;
1401        // }
1402        // required group group2 {
1403        //   required int64 leaf4;
1404        // }
1405        // required int64 leaf5;
1406
1407        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
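        // Leaf columns are numbered depth-first over the whole schema:
        //   0: group1.leaf1, 1: group1.leaf2, 2: group2.leaf3,
        //   3: group2.leaf4, 4: leaf5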
1408        let mask = ProjectionMask::leaves(&parquet_schema, [3, 0, 4, 4]); // the duplicate index is harmless
1409        let converted_arrow_schema =
1410            parquet_to_arrow_schema_by_columns(&parquet_schema, mask, None).unwrap();
1411        let converted_fields = converted_arrow_schema.fields();
1412
1413        assert_eq!(arrow_fields.len(), converted_fields.len());
1414        for i in 0..arrow_fields.len() {
1415            assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
1416        }
1417    }
1418
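    // The order in which leaves are selected must not affect the result:
    // converted fields always come back in schema order.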
1419    #[test]
1420    fn test_nested_schema_partial_ordering() {
1421        let mut arrow_fields = Vec::new();
1422        {
1423            let group1_fields = vec![Field::new("leaf1", DataType::Int64, false)].into();
1424            let group1 = Field::new("group1", DataType::Struct(group1_fields), false);
1425            arrow_fields.push(group1);
1426
1427            let group2_fields = vec![Field::new("leaf4", DataType::Int64, false)].into();
1428            let group2 = Field::new("group2", DataType::Struct(group2_fields), false);
1429            arrow_fields.push(group2);
1430
1431            arrow_fields.push(Field::new("leaf5", DataType::Int64, false));
1432        }
1433
1434        let message_type = "
1435        message test_schema {
1436          REQUIRED GROUP group1 {
1437            REQUIRED INT64 leaf1;
1438            REQUIRED INT64 leaf2;
1439          }
1440          REQUIRED GROUP group2 {
1441            REQUIRED INT64 leaf3;
1442            REQUIRED INT64 leaf4;
1443          }
1444          REQUIRED INT64 leaf5;
1445        }
1446        ";
1447        let parquet_group_type = parse_message_type(message_type).unwrap();
1448
1449        // Expected partial arrow schema (leaf columns 3, 0 and 4):
1450        // required group group1 {
1451        //   required int64 leaf1;
1452        // }
1453        // required group group2 {
1454        //   required int64 leaf4;
1455        // }
1456        // required int64 leaf5;
1457
1458        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1459        let mask = ProjectionMask::leaves(&parquet_schema, [3, 0, 4]);
1460        let converted_arrow_schema =
1461            parquet_to_arrow_schema_by_columns(&parquet_schema, mask, None).unwrap();
1462        let converted_fields = converted_arrow_schema.fields();
1463
1464        assert_eq!(arrow_fields.len(), converted_fields.len());
1465        for i in 0..arrow_fields.len() {
1466            assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
1467        }
1468
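        // The same projection expressed by column path instead of leaf index
        // should produce an identical schema.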
1469        let mask =
1470            ProjectionMask::columns(&parquet_schema, ["group2.leaf4", "group1.leaf1", "leaf5"]);
1471        let converted_arrow_schema =
1472            parquet_to_arrow_schema_by_columns(&parquet_schema, mask, None).unwrap();
1473        let converted_fields = converted_arrow_schema.fields();
1474
1475        assert_eq!(arrow_fields.len(), converted_fields.len());
1476        for i in 0..arrow_fields.len() {
1477            assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
1478        }
1479    }
1480
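    // A bare REPEATED group carries no LIST annotation, but should still
    // convert to a non-nullable Arrow list of non-nullable structs.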
1481    #[test]
1482    fn test_repeated_nested_schema() {
1483        let mut arrow_fields = Vec::new();
1484        {
1485            arrow_fields.push(Field::new("leaf1", DataType::Int32, true));
1486
1487            let inner_group_list = Field::new_list(
1488                "innerGroup",
1489                Field::new_struct(
1490                    "innerGroup",
1491                    vec![Field::new("leaf3", DataType::Int32, true)],
1492                    false,
1493                ),
1494                false,
1495            );
1496
1497            let outer_group_list = Field::new_list(
1498                "outerGroup",
1499                Field::new_struct(
1500                    "outerGroup",
1501                    vec![Field::new("leaf2", DataType::Int32, true), inner_group_list],
1502                    false,
1503                ),
1504                false,
1505            );
1506            arrow_fields.push(outer_group_list);
1507        }
1508
1509        let message_type = "
1510        message test_schema {
1511          OPTIONAL INT32 leaf1;
1512          REPEATED GROUP outerGroup {
1513            OPTIONAL INT32 leaf2;
1514            REPEATED GROUP innerGroup {
1515              OPTIONAL INT32 leaf3;
1516            }
1517          }
1518        }
1519        ";
1520        let parquet_group_type = parse_message_type(message_type).unwrap();
1521
1522        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1523        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
1524        let converted_fields = converted_arrow_schema.fields();
1525
1526        assert_eq!(arrow_fields.len(), converted_fields.len());
1527        for i in 0..arrow_fields.len() {
1528            assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
1529        }
1530    }
1531
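    // Each leaf ColumnDescriptor can be converted to an Arrow Field on its
    // own via parquet_to_arrow_field; repeated primitives surface as lists.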
1532    #[test]
1533    fn test_column_desc_to_field() {
1534        let message_type = "
1535        message test_schema {
1536            REQUIRED BOOLEAN boolean;
1537            REQUIRED INT32   int8  (INT_8);
1538            REQUIRED INT32   uint8 (INTEGER(8,false));
1539            REQUIRED INT32   int16 (INT_16);
1540            REQUIRED INT32   uint16 (INTEGER(16,false));
1541            REQUIRED INT32   int32;
1542            REQUIRED INT64   int64;
1543            OPTIONAL DOUBLE  double;
1544            OPTIONAL FLOAT   float;
1545            OPTIONAL FIXED_LEN_BYTE_ARRAY (2) float16 (FLOAT16);
1546            OPTIONAL BINARY  string (UTF8);
1547            REPEATED BOOLEAN bools;
1548            OPTIONAL INT32   date       (DATE);
1549            OPTIONAL INT32   time_milli (TIME_MILLIS);
1550            OPTIONAL INT64   time_micro (TIME_MICROS);
1551            OPTIONAL INT64   time_nano (TIME(NANOS,false));
1552            OPTIONAL INT64   ts_milli (TIMESTAMP_MILLIS);
1553            REQUIRED INT64   ts_micro (TIMESTAMP_MICROS);
1554            REQUIRED INT64   ts_nano (TIMESTAMP(NANOS,true));
1555            REPEATED INT32   int_list;
1556            REPEATED BINARY  byte_list;
1557            REPEATED BINARY  string_list (UTF8);
1558            REQUIRED INT32 decimal_int32 (DECIMAL(8,2));
1559            REQUIRED INT64 decimal_int64 (DECIMAL(16,2));
1560            REQUIRED FIXED_LEN_BYTE_ARRAY (13) decimal_fix_length (DECIMAL(30,2));
1561        }
1562        ";
1563        let parquet_group_type = parse_message_type(message_type).unwrap();
1564
1565        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1566        let converted_arrow_fields = parquet_schema
1567            .columns()
1568            .iter()
1569            .map(|c| parquet_to_arrow_field(c).unwrap())
1570            .collect::<Vec<Field>>();
1571
1572        let arrow_fields = vec![
1573            Field::new("boolean", DataType::Boolean, false),
1574            Field::new("int8", DataType::Int8, false),
1575            Field::new("uint8", DataType::UInt8, false),
1576            Field::new("int16", DataType::Int16, false),
1577            Field::new("uint16", DataType::UInt16, false),
1578            Field::new("int32", DataType::Int32, false),
1579            Field::new("int64", DataType::Int64, false),
1580            Field::new("double", DataType::Float64, true),
1581            Field::new("float", DataType::Float32, true),
1582            Field::new("float16", DataType::Float16, true),
1583            Field::new("string", DataType::Utf8, true),
1584            Field::new_list(
1585                "bools",
1586                Field::new("bools", DataType::Boolean, false),
1587                false,
1588            ),
1589            Field::new("date", DataType::Date32, true),
1590            Field::new("time_milli", DataType::Time32(TimeUnit::Millisecond), true),
1591            Field::new("time_micro", DataType::Time64(TimeUnit::Microsecond), true),
1592            Field::new("time_nano", DataType::Time64(TimeUnit::Nanosecond), true),
1593            Field::new(
1594                "ts_milli",
1595                DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
1596                true,
1597            ),
1598            Field::new(
1599                "ts_micro",
1600                DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
1601                false,
1602            ),
1603            Field::new(
1604                "ts_nano",
1605                DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())),
1606                false,
1607            ),
1608            Field::new_list(
1609                "int_list",
1610                Field::new("int_list", DataType::Int32, false),
1611                false,
1612            ),
1613            Field::new_list(
1614                "byte_list",
1615                Field::new("byte_list", DataType::Binary, false),
1616                false,
1617            ),
1618            Field::new_list(
1619                "string_list",
1620                Field::new("string_list", DataType::Utf8, false),
1621                false,
1622            ),
1623            Field::new("decimal_int32", DataType::Decimal128(8, 2), false),
1624            Field::new("decimal_int64", DataType::Decimal128(16, 2), false),
1625            Field::new("decimal_fix_length", DataType::Decimal128(30, 2), false),
1626        ];
1627
1628        assert_eq!(arrow_fields, converted_arrow_fields);
1629    }
1630
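    // with_coerce_types(true) should rewrite list and map child names to the
    // Parquet spec names (list/element, key_value/key/value), while
    // with_coerce_types(false) should keep the Arrow-provided names.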
1631    #[test]
1632    fn test_coerced_map_list() {
1633        // Create Arrow schema with non-Parquet naming
1634        let arrow_fields = vec![
1635            Field::new_list(
1636                "my_list",
1637                Field::new("item", DataType::Boolean, true),
1638                false,
1639            ),
1640            Field::new_map(
1641                "my_map",
1642                "entries",
1643                Field::new("keys", DataType::Utf8, false),
1644                Field::new("values", DataType::Int32, true),
1645                false, // keys are not sorted
1646                true,  // nullable
1647            ),
1648        ];
1649        let arrow_schema = Schema::new(arrow_fields);
1650
1651        // Create Parquet schema with coerced names
1652        let message_type = "
1653        message parquet_schema {
1654            REQUIRED GROUP my_list (LIST) {
1655                REPEATED GROUP list {
1656                    OPTIONAL BOOLEAN element;
1657                }
1658            }
1659            OPTIONAL GROUP my_map (MAP) {
1660                REPEATED GROUP key_value {
1661                    REQUIRED BINARY key (STRING);
1662                    OPTIONAL INT32 value;
1663                }
1664            }
1665        }
1666        ";
1667        let parquet_group_type = parse_message_type(message_type).unwrap();
1668        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1669        let converted_arrow_schema = ArrowSchemaConverter::new()
1670            .with_coerce_types(true)
1671            .convert(&arrow_schema)
1672            .unwrap();
1673        assert_eq!(
1674            parquet_schema.columns().len(),
1675            converted_arrow_schema.columns().len()
1676        );
1677
1678        // Create Parquet schema without coerced names
1679        let message_type = "
1680        message parquet_schema {
1681            REQUIRED GROUP my_list (LIST) {
1682                REPEATED GROUP list {
1683                    OPTIONAL BOOLEAN item;
1684                }
1685            }
1686            OPTIONAL GROUP my_map (MAP) {
1687                REPEATED GROUP entries {
1688                    REQUIRED BINARY keys (STRING);
1689                    OPTIONAL INT32 values;
1690                }
1691            }
1692        }
1693        ";
1694        let parquet_group_type = parse_message_type(message_type).unwrap();
1695        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1696        let converted_arrow_schema = ArrowSchemaConverter::new()
1697            .with_coerce_types(false)
1698            .convert(&arrow_schema)
1699            .unwrap();
1700        assert_eq!(
1701            parquet_schema.columns().len(),
1702            converted_arrow_schema.columns().len()
1703        );
1704    }
1705
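    // Convert an Arrow schema to Parquet and compare every resulting
    // ColumnDescriptor against a hand-written Parquet schema.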
1706    #[test]
1707    fn test_field_to_column_desc() {
1708        let message_type = "
1709        message arrow_schema {
1710            REQUIRED BOOLEAN boolean;
1711            REQUIRED INT32   int8  (INT_8);
1712            REQUIRED INT32   int16 (INTEGER(16,true));
1713            REQUIRED INT32   int32;
1714            REQUIRED INT64   int64;
1715            OPTIONAL DOUBLE  double;
1716            OPTIONAL FLOAT   float;
1717            OPTIONAL FIXED_LEN_BYTE_ARRAY (2) float16 (FLOAT16);
1718            OPTIONAL BINARY  string (STRING);
1719            OPTIONAL GROUP   bools (LIST) {
1720                REPEATED GROUP list {
1721                    OPTIONAL BOOLEAN element;
1722                }
1723            }
1724            REQUIRED GROUP   bools_non_null (LIST) {
1725                REPEATED GROUP list {
1726                    REQUIRED BOOLEAN element;
1727                }
1728            }
1729            OPTIONAL INT32   date       (DATE);
1730            OPTIONAL INT32   time_milli (TIME(MILLIS,false));
1731            OPTIONAL INT32   time_milli_utc (TIME(MILLIS,true));
1732            OPTIONAL INT64   time_micro (TIME_MICROS);
1733            OPTIONAL INT64   time_micro_utc (TIME(MICROS, true));
1734            OPTIONAL INT64   ts_milli (TIMESTAMP_MILLIS);
1735            REQUIRED INT64   ts_micro (TIMESTAMP(MICROS,false));
1736            REQUIRED INT64   ts_seconds;
1737            REQUIRED INT64   ts_micro_utc (TIMESTAMP(MICROS, true));
1738            REQUIRED INT64   ts_millis_zero_offset (TIMESTAMP(MILLIS, true));
1739            REQUIRED INT64   ts_millis_zero_negative_offset (TIMESTAMP(MILLIS, true));
1740            REQUIRED INT64   ts_micro_non_utc (TIMESTAMP(MICROS, true));
1741            REQUIRED GROUP struct {
1742                REQUIRED BOOLEAN bools;
1743                REQUIRED INT32 uint32 (INTEGER(32,false));
1744                REQUIRED GROUP   int32 (LIST) {
1745                    REPEATED GROUP list {
1746                        OPTIONAL INT32 element;
1747                    }
1748                }
1749            }
1750            REQUIRED BINARY  dictionary_strings (STRING);
1751            REQUIRED INT32 decimal_int32 (DECIMAL(8,2));
1752            REQUIRED INT64 decimal_int64 (DECIMAL(16,2));
1753            REQUIRED FIXED_LEN_BYTE_ARRAY (13) decimal_fix_length (DECIMAL(30,2));
1754            REQUIRED FIXED_LEN_BYTE_ARRAY (16) decimal128 (DECIMAL(38,2));
1755            REQUIRED FIXED_LEN_BYTE_ARRAY (17) decimal256 (DECIMAL(39,2));
1756        }
1757        ";
1758        let parquet_group_type = parse_message_type(message_type).unwrap();
1759
1760        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1761
1762        let arrow_fields = vec![
1763            Field::new("boolean", DataType::Boolean, false),
1764            Field::new("int8", DataType::Int8, false),
1765            Field::new("int16", DataType::Int16, false),
1766            Field::new("int32", DataType::Int32, false),
1767            Field::new("int64", DataType::Int64, false),
1768            Field::new("double", DataType::Float64, true),
1769            Field::new("float", DataType::Float32, true),
1770            Field::new("float16", DataType::Float16, true),
1771            Field::new("string", DataType::Utf8, true),
1772            Field::new_list(
1773                "bools",
1774                Field::new("element", DataType::Boolean, true),
1775                true,
1776            ),
1777            Field::new_list(
1778                "bools_non_null",
1779                Field::new("element", DataType::Boolean, false),
1780                false,
1781            ),
1782            Field::new("date", DataType::Date32, true),
1783            Field::new("time_milli", DataType::Time32(TimeUnit::Millisecond), true),
1784            Field::new(
1785                "time_milli_utc",
1786                DataType::Time32(TimeUnit::Millisecond),
1787                true,
1788            )
1789            .with_metadata(HashMap::from_iter(vec![(
1790                "adjusted_to_utc".to_string(),
1791                "".to_string(),
1792            )])),
1793            Field::new("time_micro", DataType::Time64(TimeUnit::Microsecond), true),
1794            Field::new(
1795                "time_micro_utc",
1796                DataType::Time64(TimeUnit::Microsecond),
1797                true,
1798            )
1799            .with_metadata(HashMap::from_iter(vec![(
1800                "adjusted_to_utc".to_string(),
1801                "".to_string(),
1802            )])),
1803            Field::new(
1804                "ts_milli",
1805                DataType::Timestamp(TimeUnit::Millisecond, None),
1806                true,
1807            ),
1808            Field::new(
1809                "ts_micro",
1810                DataType::Timestamp(TimeUnit::Microsecond, None),
1811                false,
1812            ),
1813            Field::new(
1814                "ts_seconds",
1815                DataType::Timestamp(TimeUnit::Second, Some("UTC".into())),
1816                false,
1817            ),
1818            Field::new(
1819                "ts_micro_utc",
1820                DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
1821                false,
1822            ),
1823            Field::new(
1824                "ts_millis_zero_offset",
1825                DataType::Timestamp(TimeUnit::Millisecond, Some("+00:00".into())),
1826                false,
1827            ),
1828            Field::new(
1829                "ts_millis_zero_negative_offset",
1830                DataType::Timestamp(TimeUnit::Millisecond, Some("-00:00".into())),
1831                false,
1832            ),
1833            Field::new(
1834                "ts_micro_non_utc",
1835                DataType::Timestamp(TimeUnit::Microsecond, Some("+01:00".into())),
1836                false,
1837            ),
1838            Field::new_struct(
1839                "struct",
1840                vec![
1841                    Field::new("bools", DataType::Boolean, false),
1842                    Field::new("uint32", DataType::UInt32, false),
1843                    Field::new_list("int32", Field::new("element", DataType::Int32, true), false),
1844                ],
1845                false,
1846            ),
1847            Field::new_dictionary("dictionary_strings", DataType::Int32, DataType::Utf8, false),
1848            Field::new("decimal_int32", DataType::Decimal128(8, 2), false),
1849            Field::new("decimal_int64", DataType::Decimal128(16, 2), false),
1850            Field::new("decimal_fix_length", DataType::Decimal128(30, 2), false),
1851            Field::new("decimal128", DataType::Decimal128(38, 2), false),
1852            Field::new("decimal256", DataType::Decimal256(39, 2), false),
1853        ];
1854        let arrow_schema = Schema::new(arrow_fields);
1855        let converted_arrow_schema = ArrowSchemaConverter::new().convert(&arrow_schema).unwrap();
1856
1857        assert_eq!(
1858            parquet_schema.columns().len(),
1859            converted_arrow_schema.columns().len()
1860        );
1861        parquet_schema
1862            .columns()
1863            .iter()
1864            .zip(converted_arrow_schema.columns())
1865            .for_each(|(a, b)| {
1866                // Only check the logical type if the Parquet side sets one:
1867                // the Arrow conversion always emits a logical type, even when
1868                // the original schema had none. That is harmless, but it would
1869                // make a blanket equality assertion fail here.
1870                match a.logical_type() {
1871                    Some(_) => {
1872                        assert_eq!(a, b)
1873                    }
1874                    None => {
1875                        assert_eq!(a.name(), b.name());
1876                        assert_eq!(a.physical_type(), b.physical_type());
1877                        assert_eq!(a.converted_type(), b.converted_type());
1878                    }
1879                };
1880            });
1881    }
1882
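    // A struct with no children cannot be represented in Parquet, so the
    // conversion should fail.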
1883    #[test]
1884    #[should_panic(expected = "Parquet does not support writing empty structs")]
1885    fn test_empty_struct_field() {
1886        let arrow_fields = vec![Field::new(
1887            "struct",
1888            DataType::Struct(Fields::empty()),
1889            false,
1890        )];
1891        let arrow_schema = Schema::new(arrow_fields);
1892        let converted_arrow_schema = ArrowSchemaConverter::new()
1893            .with_coerce_types(true)
1894            .convert(&arrow_schema);
1895
1896        converted_arrow_schema.unwrap();
1897    }
1898
1899    #[test]
1900    fn test_metadata() {
1901        let message_type = "
1902        message test_schema {
1903            OPTIONAL BINARY  string (STRING);
1904        }
1905        ";
1906        let parquet_group_type = parse_message_type(message_type).unwrap();
1907
1908        let key_value_metadata = vec![
1909            KeyValue::new("foo".to_owned(), Some("bar".to_owned())),
1910            KeyValue::new("baz".to_owned(), None),
1911        ];
1912
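        // Only key/value pairs that actually carry a value are converted;
        // "baz" (whose value is None) should be dropped.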
1913        let mut expected_metadata: HashMap<String, String> = HashMap::new();
1914        expected_metadata.insert("foo".to_owned(), "bar".to_owned());
1915
1916        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1917        let converted_arrow_schema =
1918            parquet_to_arrow_schema(&parquet_schema, Some(&key_value_metadata)).unwrap();
1919
1920        assert_eq!(converted_arrow_schema.metadata(), &expected_metadata);
1921    }
1922
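    // Write a file with a rich Arrow schema and read it back: the schema,
    // including metadata and embedded field ids, should be recovered exactly.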
1923    #[test]
1924    fn test_arrow_schema_roundtrip() -> Result<()> {
1925        let meta = |a: &[(&str, &str)]| -> HashMap<String, String> {
1926            a.iter()
1927                .map(|(a, b)| (a.to_string(), b.to_string()))
1928                .collect()
1929        };
1930
1931        let schema = Schema::new_with_metadata(
1932            vec![
1933                Field::new("c1", DataType::Utf8, false)
1934                    .with_metadata(meta(&[("Key", "Foo"), (PARQUET_FIELD_ID_META_KEY, "2")])),
1935                Field::new("c2", DataType::Binary, false),
1936                Field::new("c3", DataType::FixedSizeBinary(3), false),
1937                Field::new("c4", DataType::Boolean, false),
1938                Field::new("c5", DataType::Date32, false),
1939                Field::new("c6", DataType::Date64, false),
1940                Field::new("c7", DataType::Time32(TimeUnit::Second), false),
1941                Field::new("c8", DataType::Time32(TimeUnit::Millisecond), false),
1942                Field::new("c13", DataType::Time64(TimeUnit::Microsecond), false),
1943                Field::new("c14", DataType::Time64(TimeUnit::Nanosecond), false),
1944                Field::new("c15", DataType::Timestamp(TimeUnit::Second, None), false),
1945                Field::new(
1946                    "c16",
1947                    DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
1948                    false,
1949                ),
1950                Field::new(
1951                    "c17",
1952                    DataType::Timestamp(TimeUnit::Microsecond, Some("Africa/Johannesburg".into())),
1953                    false,
1954                ),
1955                Field::new(
1956                    "c18",
1957                    DataType::Timestamp(TimeUnit::Nanosecond, None),
1958                    false,
1959                ),
1960                Field::new("c19", DataType::Interval(IntervalUnit::DayTime), false),
1961                Field::new("c20", DataType::Interval(IntervalUnit::YearMonth), false),
1962                Field::new_list(
1963                    "c21",
1964                    Field::new_list_field(DataType::Boolean, true)
1965                        .with_metadata(meta(&[("Key", "Bar"), (PARQUET_FIELD_ID_META_KEY, "5")])),
1966                    false,
1967                )
1968                .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "4")])),
1969                Field::new(
1970                    "c22",
1971                    DataType::FixedSizeList(
1972                        Arc::new(Field::new_list_field(DataType::Boolean, true)),
1973                        5,
1974                    ),
1975                    false,
1976                ),
1977                Field::new_list(
1978                    "c23",
1979                    Field::new_large_list(
1980                        "inner",
1981                        Field::new_list_field(
1982                            DataType::Struct(
1983                                vec![
1984                                    Field::new("a", DataType::Int16, true),
1985                                    Field::new("b", DataType::Float64, false),
1986                                    Field::new("c", DataType::Float32, false),
1987                                    Field::new("d", DataType::Float16, false),
1988                                ]
1989                                .into(),
1990                            ),
1991                            false,
1992                        ),
1993                        true,
1994                    ),
1995                    false,
1996                ),
1997                Field::new(
1998                    "c24",
1999                    DataType::Struct(Fields::from(vec![
2000                        Field::new("a", DataType::Utf8, false),
2001                        Field::new("b", DataType::UInt16, false),
2002                    ])),
2003                    false,
2004                ),
2005                Field::new("c25", DataType::Interval(IntervalUnit::YearMonth), true),
2006                Field::new("c26", DataType::Interval(IntervalUnit::DayTime), true),
2007                // Duration types not supported
2008                // Field::new("c27", DataType::Duration(TimeUnit::Second), false),
2009                // Field::new("c28", DataType::Duration(TimeUnit::Millisecond), false),
2010                // Field::new("c29", DataType::Duration(TimeUnit::Microsecond), false),
2011                // Field::new("c30", DataType::Duration(TimeUnit::Nanosecond), false),
2012                #[allow(deprecated)]
2013                Field::new_dict(
2014                    "c31",
2015                    DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
2016                    true,
2017                    123,
2018                    true,
2019                )
2020                .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "6")])),
2021                Field::new("c32", DataType::LargeBinary, true),
2022                Field::new("c33", DataType::LargeUtf8, true),
2023                Field::new_large_list(
2024                    "c34",
2025                    Field::new_list(
2026                        "inner",
2027                        Field::new_list_field(
2028                            DataType::Struct(
2029                                vec![
2030                                    Field::new("a", DataType::Int16, true),
2031                                    Field::new("b", DataType::Float64, true),
2032                                ]
2033                                .into(),
2034                            ),
2035                            true,
2036                        ),
2037                        true,
2038                    ),
2039                    true,
2040                ),
2041                Field::new("c35", DataType::Null, true),
2042                Field::new("c36", DataType::Decimal128(2, 1), false),
2043                Field::new("c37", DataType::Decimal256(50, 20), false),
2044                Field::new("c38", DataType::Decimal128(18, 12), true),
2045                Field::new_map(
2046                    "c39",
2047                    "key_value",
2048                    Field::new("key", DataType::Utf8, false),
2049                    Field::new_list("value", Field::new("element", DataType::Utf8, true), true),
2050                    false, // keys_sorted = true fails to roundtrip, so use false
2051                    true,  // nullable
2052                ),
2053                Field::new_map(
2054                    "c40",
2055                    "my_entries",
2056                    Field::new("my_key", DataType::Utf8, false)
2057                        .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "8")])),
2058                    Field::new_list(
2059                        "my_value",
2060                        Field::new_list_field(DataType::Utf8, true)
2061                            .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "10")])),
2062                        true,
2063                    )
2064                    .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "9")])),
2065                    false, // keys_sorted = true fails to roundtrip, so use false
2066                    true,  // nullable
2067                )
2068                .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "7")])),
2069                Field::new_map(
2070                    "c41",
2071                    "my_entries",
2072                    Field::new("my_key", DataType::Utf8, false),
2073                    Field::new_list(
2074                        "my_value",
2075                        Field::new_list_field(DataType::Utf8, true)
2076                            .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "11")])),
2077                        true,
2078                    ),
2079                    false, // keys_sorted = true fails to roundtrip, so use false
2080                    false, // not nullable
2081                ),
2082            ],
2083            meta(&[("Key", "Value")]),
2084        );
2085
2086        // write an empty parquet file so that the schema is serialized into its metadata
2087        let file = tempfile::tempfile().unwrap();
2088        let writer =
2089            ArrowWriter::try_new(file.try_clone().unwrap(), Arc::new(schema.clone()), None)?;
2090        writer.close()?;
2091
2092        // read file back
2093        let arrow_reader = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
2094
2095        // Check arrow schema
2096        let read_schema = arrow_reader.schema();
2097        assert_eq!(&schema, read_schema.as_ref());
2098
2099        // Walk the Parquet schema depth-first, collecting "path -> field id" entries
2100        let mut stack = Vec::with_capacity(10);
2101        let mut out = Vec::with_capacity(10);
2102
2103        let root = arrow_reader.parquet_schema().root_schema_ptr();
2104        stack.push((root.name().to_string(), root));
2105
2106        while let Some((p, t)) = stack.pop() {
2107            if t.is_group() {
2108                for f in t.get_fields() {
2109                    stack.push((format!("{p}.{}", f.name()), f.clone()))
2110                }
2111            }
2112
2113            let info = t.get_basic_info();
2114            if info.has_id() {
2115                out.push(format!("{p} -> {}", info.id()))
2116            }
2117        }
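        // The stack pops fields in LIFO order, so sort the collected entries
        // for a stable comparison.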
2118        out.sort_unstable();
2119        let out: Vec<_> = out.iter().map(|x| x.as_str()).collect();
2120
2121        assert_eq!(
2122            &out,
2123            &[
2124                "arrow_schema.c1 -> 2",
2125                "arrow_schema.c21 -> 4",
2126                "arrow_schema.c21.list.item -> 5",
2127                "arrow_schema.c31 -> 6",
2128                "arrow_schema.c40 -> 7",
2129                "arrow_schema.c40.my_entries.my_key -> 8",
2130                "arrow_schema.c40.my_entries.my_value -> 9",
2131                "arrow_schema.c40.my_entries.my_value.list.item -> 10",
2132                "arrow_schema.c41.my_entries.my_value.list.item -> 11",
2133            ]
2134        );
2135
2136        Ok(())
2137    }
2138
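    // Field ids should be readable from the raw Parquet schema (without the
    // embedded Arrow schema) and survive a further Arrow -> Parquet pass.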
2139    #[test]
2140    fn test_read_parquet_field_ids_raw() -> Result<()> {
2141        let meta = |a: &[(&str, &str)]| -> HashMap<String, String> {
2142            a.iter()
2143                .map(|(a, b)| (a.to_string(), b.to_string()))
2144                .collect()
2145        };
2146        let schema = Schema::new_with_metadata(
2147            vec![
2148                Field::new("c1", DataType::Utf8, true)
2149                    .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "1")])),
2150                Field::new("c2", DataType::Utf8, true)
2151                    .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "2")])),
2152            ],
2153            HashMap::new(),
2154        );
2155
2156        let writer = ArrowWriter::try_new(vec![], Arc::new(schema.clone()), None)?;
2157        let parquet_bytes = writer.into_inner()?;
2158
2159        let reader =
2160            crate::file::reader::SerializedFileReader::new(bytes::Bytes::from(parquet_bytes))?;
2161        let schema_descriptor = reader.metadata().file_metadata().schema_descr_ptr();
2162
2163        // don't pass metadata so field ids are read from Parquet and not from the serialized Arrow schema
2164        let arrow_schema = crate::arrow::parquet_to_arrow_schema(&schema_descriptor, None)?;
2165
2166        let parq_schema_descr = ArrowSchemaConverter::new()
2167            .with_coerce_types(true)
2168            .convert(&arrow_schema)?;
2169        let parq_fields = parq_schema_descr.root_schema().get_fields();
2170        assert_eq!(parq_fields.len(), 2);
2171        assert_eq!(parq_fields[0].get_basic_info().id(), 1);
2172        assert_eq!(parq_fields[1].get_basic_info().id(), 2);
2173
2174        Ok(())
2175    }
2176
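    // Lists with non-standard child names ("array", "items") should roundtrip
    // via the Arrow schema embedded in the file metadata.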
2177    #[test]
2178    fn test_arrow_schema_roundtrip_lists() -> Result<()> {
2179        let metadata: HashMap<String, String> = [("Key".to_string(), "Value".to_string())]
2180            .iter()
2181            .cloned()
2182            .collect();
2183
2184        let schema = Schema::new_with_metadata(
2185            vec![
2186                Field::new_list("c21", Field::new("array", DataType::Boolean, true), false),
2187                Field::new(
2188                    "c22",
2189                    DataType::FixedSizeList(
2190                        Arc::new(Field::new("items", DataType::Boolean, false)),
2191                        5,
2192                    ),
2193                    false,
2194                ),
2195                Field::new_list(
2196                    "c23",
2197                    Field::new_large_list(
2198                        "items",
2199                        Field::new_struct(
2200                            "items",
2201                            vec![
2202                                Field::new("a", DataType::Int16, true),
2203                                Field::new("b", DataType::Float64, false),
2204                            ],
2205                            true,
2206                        ),
2207                        true,
2208                    ),
2209                    true,
2210                ),
2211            ],
2212            metadata,
2213        );
2214
2215        // write an empty parquet file so that the schema is serialized into its metadata
2216        let file = tempfile::tempfile().unwrap();
2217        let writer =
2218            ArrowWriter::try_new(file.try_clone().unwrap(), Arc::new(schema.clone()), None)?;
2219        writer.close()?;
2220
2221        // read file back
2222        let arrow_reader = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
2223        let read_schema = arrow_reader.schema();
2224        assert_eq!(&schema, read_schema.as_ref());
2225        Ok(())
2226    }
2227
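    // An ARROW:schema value that fails to decode should surface as an error,
    // not a panic.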
2228    #[test]
2229    fn test_get_arrow_schema_from_metadata() {
2230        assert!(get_arrow_schema_from_metadata("").is_err());
2231    }
2232
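    // The canonical Uuid extension type on FixedSizeBinary(16) should map to
    // the Parquet UUID logical type.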
2233    #[test]
2234    #[cfg(feature = "arrow_canonical_extension_types")]
2235    fn arrow_uuid_to_parquet_uuid() -> Result<()> {
2236        let arrow_schema = Schema::new(vec![Field::new(
2237            "uuid",
2238            DataType::FixedSizeBinary(16),
2239            false,
2240        )
2241        .with_extension_type(Uuid)]);
2242
2243        let parquet_schema = ArrowSchemaConverter::new().convert(&arrow_schema)?;
2244
2245        assert_eq!(
2246            parquet_schema.column(0).logical_type(),
2247            Some(LogicalType::Uuid)
2248        );
2249
2250        // TODO: roundtrip
2251        // let arrow_schema = parquet_to_arrow_schema(&parquet_schema, None)?;
2252        // assert_eq!(arrow_schema.field(0).try_extension_type::<Uuid>()?, Uuid);
2253
2254        Ok(())
2255    }
2256
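    // The canonical Json extension type on Utf8 should map to the Parquet
    // JSON logical type.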
2257    #[test]
2258    #[cfg(feature = "arrow_canonical_extension_types")]
2259    fn arrow_json_to_parquet_json() -> Result<()> {
2260        let arrow_schema = Schema::new(vec![
2261            Field::new("json", DataType::Utf8, false).with_extension_type(Json::default())
2262        ]);
2263
2264        let parquet_schema = ArrowSchemaConverter::new().convert(&arrow_schema)?;
2265
2266        assert_eq!(
2267            parquet_schema.column(0).logical_type(),
2268            Some(LogicalType::Json)
2269        );
2270
2271        // TODO: roundtrip
2272        // https://github.com/apache/arrow-rs/issues/7063
2273        // let arrow_schema = parquet_to_arrow_schema(&parquet_schema, None)?;
2274        // assert_eq!(
2275        //     arrow_schema.field(0).try_extension_type::<Json>()?,
2276        //     Json::default()
2277        // );
2278
2279        Ok(())
2280    }
2281}