parquet/arrow/schema/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Converting Parquet schema <--> Arrow schema: [`ArrowSchemaConverter`] and [parquet_to_arrow_schema]
19
20use base64::prelude::BASE64_STANDARD;
21use base64::Engine;
22use std::collections::HashMap;
23use std::sync::Arc;
24
25use arrow_ipc::writer;
26#[cfg(feature = "arrow_canonical_extension_types")]
27use arrow_schema::extension::{Json, Uuid};
28use arrow_schema::{DataType, Field, Fields, Schema, TimeUnit};
29
30use crate::basic::{
31    ConvertedType, LogicalType, Repetition, TimeUnit as ParquetTimeUnit, Type as PhysicalType,
32};
33use crate::errors::{ParquetError, Result};
34use crate::file::{metadata::KeyValue, properties::WriterProperties};
35use crate::schema::types::{ColumnDescriptor, SchemaDescriptor, Type};
36
37mod complex;
38mod primitive;
39
40use crate::arrow::ProjectionMask;
41pub(crate) use complex::{ParquetField, ParquetFieldType};
42
43use super::PARQUET_FIELD_ID_META_KEY;
44
45/// Convert Parquet schema to Arrow schema including optional metadata
46///
47/// Attempts to decode any existing Arrow schema metadata, falling back
48/// to converting the Parquet schema column-wise
49pub fn parquet_to_arrow_schema(
50    parquet_schema: &SchemaDescriptor,
51    key_value_metadata: Option<&Vec<KeyValue>>,
52) -> Result<Schema> {
53    parquet_to_arrow_schema_by_columns(parquet_schema, ProjectionMask::all(), key_value_metadata)
54}
55
56/// Convert parquet schema to arrow schema including optional metadata,
57/// only preserving some leaf columns.
58pub fn parquet_to_arrow_schema_by_columns(
59    parquet_schema: &SchemaDescriptor,
60    mask: ProjectionMask,
61    key_value_metadata: Option<&Vec<KeyValue>>,
62) -> Result<Schema> {
63    Ok(parquet_to_arrow_schema_and_fields(parquet_schema, mask, key_value_metadata)?.0)
64}
65
66/// Extracts the arrow metadata
67pub(crate) fn parquet_to_arrow_schema_and_fields(
68    parquet_schema: &SchemaDescriptor,
69    mask: ProjectionMask,
70    key_value_metadata: Option<&Vec<KeyValue>>,
71) -> Result<(Schema, Option<ParquetField>)> {
72    let mut metadata = parse_key_value_metadata(key_value_metadata).unwrap_or_default();
73    let maybe_schema = metadata
74        .remove(super::ARROW_SCHEMA_META_KEY)
75        .map(|value| get_arrow_schema_from_metadata(&value))
76        .transpose()?;
77
78    // Add the Arrow metadata to the Parquet metadata skipping keys that collide
79    if let Some(arrow_schema) = &maybe_schema {
80        arrow_schema.metadata().iter().for_each(|(k, v)| {
81            metadata.entry(k.clone()).or_insert_with(|| v.clone());
82        });
83    }
84
85    let hint = maybe_schema.as_ref().map(|s| s.fields());
86    let field_levels = parquet_to_arrow_field_levels(parquet_schema, mask, hint)?;
87    let schema = Schema::new_with_metadata(field_levels.fields, metadata);
88    Ok((schema, field_levels.levels))
89}
90
91/// Schema information necessary to decode a parquet file as arrow [`Fields`]
92///
93/// In particular this stores the dremel-level information necessary to correctly
94/// interpret the encoded definition and repetition levels
95///
96/// Note: this is an opaque container intended to be used with lower-level APIs
97/// within this crate
98#[derive(Debug, Clone)]
99pub struct FieldLevels {
100    pub(crate) fields: Fields,
101    pub(crate) levels: Option<ParquetField>,
102}
103
104/// Convert a parquet [`SchemaDescriptor`] to [`FieldLevels`]
105///
106/// Columns not included within [`ProjectionMask`] will be ignored.
107///
108/// The optional `hint` parameter is the desired Arrow schema. See the
109/// [`arrow`] module documentation for more information.
110///
111/// [`arrow`]: crate::arrow
112///
113/// # Notes:
114/// Where a field type in `hint` is compatible with the corresponding parquet type in `schema`, it
115/// will be used, otherwise the default arrow type for the given parquet column type will be used.
116///
117/// This is to accommodate arrow types that cannot be round-tripped through parquet natively.
118/// Depending on the parquet writer, this can lead to a mismatch between a file's parquet schema
119/// and its embedded arrow schema. The parquet `schema` must be treated as authoritative in such
120/// an event. See [#1663](https://github.com/apache/arrow-rs/issues/1663) for more information
121///
122/// Note: this is a low-level API, most users will want to make use of the higher-level
123/// [`parquet_to_arrow_schema`] for decoding metadata from a parquet file.
124pub fn parquet_to_arrow_field_levels(
125    schema: &SchemaDescriptor,
126    mask: ProjectionMask,
127    hint: Option<&Fields>,
128) -> Result<FieldLevels> {
129    match complex::convert_schema(schema, mask, hint)? {
130        Some(field) => match &field.arrow_type {
131            DataType::Struct(fields) => Ok(FieldLevels {
132                fields: fields.clone(),
133                levels: Some(field),
134            }),
135            _ => unreachable!(),
136        },
137        None => Ok(FieldLevels {
138            fields: Fields::empty(),
139            levels: None,
140        }),
141    }
142}
143
144/// Try to convert Arrow schema metadata into a schema
145fn get_arrow_schema_from_metadata(encoded_meta: &str) -> Result<Schema> {
146    let decoded = BASE64_STANDARD.decode(encoded_meta);
147    match decoded {
148        Ok(bytes) => {
149            let slice = if bytes.len() > 8 && bytes[0..4] == [255u8; 4] {
150                &bytes[8..]
151            } else {
152                bytes.as_slice()
153            };
154            match arrow_ipc::root_as_message(slice) {
155                Ok(message) => message
156                    .header_as_schema()
157                    .map(arrow_ipc::convert::fb_to_schema)
158                    .ok_or_else(|| arrow_err!("the message is not Arrow Schema")),
159                Err(err) => {
160                    // The flatbuffers implementation returns an error on verification error.
161                    Err(arrow_err!(
162                        "Unable to get root as message stored in {}: {:?}",
163                        super::ARROW_SCHEMA_META_KEY,
164                        err
165                    ))
166                }
167            }
168        }
169        Err(err) => {
170            // The C++ implementation returns an error if the schema can't be parsed.
171            Err(arrow_err!(
172                "Unable to decode the encoded schema stored in {}, {:?}",
173                super::ARROW_SCHEMA_META_KEY,
174                err
175            ))
176        }
177    }
178}
179
180/// Encodes the Arrow schema into the IPC format, and base64 encodes it
181pub fn encode_arrow_schema(schema: &Schema) -> String {
182    let options = writer::IpcWriteOptions::default();
183    let mut dictionary_tracker = writer::DictionaryTracker::new(true);
184    let data_gen = writer::IpcDataGenerator::default();
185    let mut serialized_schema =
186        data_gen.schema_to_bytes_with_dictionary_tracker(schema, &mut dictionary_tracker, &options);
187
188    // manually prepending the length to the schema as arrow uses the legacy IPC format
189    // TODO: change after addressing ARROW-9777
190    let schema_len = serialized_schema.ipc_message.len();
191    let mut len_prefix_schema = Vec::with_capacity(schema_len + 8);
192    len_prefix_schema.append(&mut vec![255u8, 255, 255, 255]);
193    len_prefix_schema.append((schema_len as u32).to_le_bytes().to_vec().as_mut());
194    len_prefix_schema.append(&mut serialized_schema.ipc_message);
195
196    BASE64_STANDARD.encode(&len_prefix_schema)
197}
198
199/// Mutates writer metadata by storing the encoded Arrow schema hint in
200/// [`ARROW_SCHEMA_META_KEY`].
201///
202/// If there is an existing Arrow schema metadata, it is replaced.
203///
204/// [`ARROW_SCHEMA_META_KEY`]: crate::arrow::ARROW_SCHEMA_META_KEY
205pub fn add_encoded_arrow_schema_to_metadata(schema: &Schema, props: &mut WriterProperties) {
206    let encoded = encode_arrow_schema(schema);
207
208    let schema_kv = KeyValue {
209        key: super::ARROW_SCHEMA_META_KEY.to_string(),
210        value: Some(encoded),
211    };
212
213    let meta = props
214        .key_value_metadata
215        .get_or_insert_with(Default::default);
216
217    // check if ARROW:schema exists, and overwrite it
218    let schema_meta = meta
219        .iter()
220        .enumerate()
221        .find(|(_, kv)| kv.key.as_str() == super::ARROW_SCHEMA_META_KEY);
222    match schema_meta {
223        Some((i, _)) => {
224            meta.remove(i);
225            meta.push(schema_kv);
226        }
227        None => {
228            meta.push(schema_kv);
229        }
230    }
231}
232
/// Converter for Arrow schema to Parquet schema
///
/// See the documentation on the [`arrow`] module for background
/// information on how Arrow schema is represented in Parquet.
///
/// [`arrow`]: crate::arrow
///
/// # Example:
/// ```
/// # use std::sync::Arc;
/// # use arrow_schema::{Field, Schema, DataType};
/// # use parquet::arrow::ArrowSchemaConverter;
/// use parquet::schema::types::{SchemaDescriptor, Type};
/// use parquet::basic; // note there are two `Type`s in the following example
/// // create an Arrow Schema
/// let arrow_schema = Schema::new(vec![
///   Field::new("a", DataType::Int64, true),
///   Field::new("b", DataType::Date32, true),
/// ]);
/// // convert the Arrow schema to a Parquet schema
/// let parquet_schema = ArrowSchemaConverter::new()
///   .convert(&arrow_schema)
///   .unwrap();
///
/// let expected_parquet_schema = SchemaDescriptor::new(
///   Arc::new(
///     Type::group_type_builder("arrow_schema")
///       .with_fields(vec![
///         Arc::new(
///          Type::primitive_type_builder("a", basic::Type::INT64)
///           .build().unwrap()
///         ),
///         Arc::new(
///          Type::primitive_type_builder("b", basic::Type::INT32)
///           .with_converted_type(basic::ConvertedType::DATE)
///           .with_logical_type(Some(basic::LogicalType::Date))
///           .build().unwrap()
///         ),
///      ])
///      .build().unwrap()
///   )
/// );
/// assert_eq!(parquet_schema, expected_parquet_schema);
/// ```
#[derive(Debug)]
pub struct ArrowSchemaConverter<'a> {
    /// Name of the root schema element in Parquet (`"arrow_schema"` unless overridden).
    schema_root: &'a str,
    /// Should we coerce Arrow types to compatible Parquet types?
    ///
    /// See docs on [`Self::with_coerce_types`].
    coerce_types: bool,
}
286
287impl Default for ArrowSchemaConverter<'_> {
288    fn default() -> Self {
289        Self::new()
290    }
291}
292
293impl<'a> ArrowSchemaConverter<'a> {
294    /// Create a new converter
295    pub fn new() -> Self {
296        Self {
297            schema_root: "arrow_schema",
298            coerce_types: false,
299        }
300    }
301
302    /// Should Arrow types be coerced into Parquet native types (default `false`).
303    ///
304    /// Setting this option to `true` will result in Parquet files that can be
305    /// read by more readers, but may lose precision for Arrow types such as
306    /// [`DataType::Date64`] which have no direct [corresponding Parquet type].
307    ///
308    /// By default, this converter does not coerce to native Parquet types. Enabling type
309    /// coercion allows for meaningful representations that do not require
310    /// downstream readers to consider the embedded Arrow schema, and can allow
311    /// for greater compatibility with other Parquet implementations. However,
312    /// type coercion also prevents data from being losslessly round-tripped.
313    ///
314    /// # Discussion
315    ///
316    /// Some Arrow types such as `Date64`, `Timestamp` and `Interval` have no
317    /// corresponding Parquet logical type. Thus, they can not be losslessly
318    /// round-tripped when stored using the appropriate Parquet logical type.
319    /// For example, some Date64 values may be truncated when stored with
320    /// parquet's native 32 bit date type.
321    ///
322    /// For [`List`] and [`Map`] types, some Parquet readers expect certain
323    /// schema elements to have specific names (earlier versions of the spec
324    /// were somewhat ambiguous on this point). Type coercion will use the names
325    /// prescribed by the Parquet specification, potentially losing naming
326    /// metadata from the Arrow schema.
327    ///
328    /// [`List`]: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists
329    /// [`Map`]: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#maps
330    /// [corresponding Parquet type]: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#date
331    ///
332    pub fn with_coerce_types(mut self, coerce_types: bool) -> Self {
333        self.coerce_types = coerce_types;
334        self
335    }
336
337    /// Set the root schema element name (defaults to `"arrow_schema"`).
338    pub fn schema_root(mut self, schema_root: &'a str) -> Self {
339        self.schema_root = schema_root;
340        self
341    }
342
343    /// Convert the specified Arrow [`Schema`] to the desired Parquet [`SchemaDescriptor`]
344    ///
345    /// See example in [`ArrowSchemaConverter`]
346    pub fn convert(&self, schema: &Schema) -> Result<SchemaDescriptor> {
347        let fields = schema
348            .fields()
349            .iter()
350            .map(|field| arrow_to_parquet_type(field, self.coerce_types).map(Arc::new))
351            .collect::<Result<_>>()?;
352        let group = Type::group_type_builder(self.schema_root)
353            .with_fields(fields)
354            .build()?;
355        Ok(SchemaDescriptor::new(Arc::new(group)))
356    }
357}
358
359fn parse_key_value_metadata(
360    key_value_metadata: Option<&Vec<KeyValue>>,
361) -> Option<HashMap<String, String>> {
362    match key_value_metadata {
363        Some(key_values) => {
364            let map: HashMap<String, String> = key_values
365                .iter()
366                .filter_map(|kv| {
367                    kv.value
368                        .as_ref()
369                        .map(|value| (kv.key.clone(), value.clone()))
370                })
371                .collect();
372
373            if map.is_empty() {
374                None
375            } else {
376                Some(map)
377            }
378        }
379        None => None,
380    }
381}
382
383/// Convert parquet column schema to arrow field.
384pub fn parquet_to_arrow_field(parquet_column: &ColumnDescriptor) -> Result<Field> {
385    let field = complex::convert_type(&parquet_column.self_type_ptr())?;
386    let mut ret = Field::new(parquet_column.name(), field.arrow_type, field.nullable);
387
388    let basic_info = parquet_column.self_type().get_basic_info();
389    let mut meta = HashMap::with_capacity(if cfg!(feature = "arrow_canonical_extension_types") {
390        2
391    } else {
392        1
393    });
394    if basic_info.has_id() {
395        meta.insert(
396            PARQUET_FIELD_ID_META_KEY.to_string(),
397            basic_info.id().to_string(),
398        );
399    }
400    #[cfg(feature = "arrow_canonical_extension_types")]
401    if let Some(logical_type) = basic_info.logical_type() {
402        match logical_type {
403            LogicalType::Uuid => ret.try_with_extension_type(Uuid)?,
404            LogicalType::Json => ret.try_with_extension_type(Json::default())?,
405            _ => {}
406        }
407    }
408    if !meta.is_empty() {
409        ret.set_metadata(meta);
410    }
411
412    Ok(ret)
413}
414
/// Returns the minimum number of bytes a `FIXED_LEN_BYTE_ARRAY` needs to store an unscaled
/// decimal value with `precision` digits.
///
/// Parquet's logical-type definition gives the maximum precision storable in `n` bytes as
/// `floor(log10(2^(8n - 1) - 1))`. Storing every `precision`-digit value therefore needs
/// `2^(8n - 1) - 1 >= 10^precision`, i.e. `8n - 1 >= log2(10^precision + 1)`. The smallest
/// such `n` is `ceil((log2(10^precision + 1) + 1) / 8)`.
pub fn decimal_length_from_precision(precision: u8) -> usize {
    let largest_power = 10.0_f64.powi(i32::from(precision));
    let bits_needed = (largest_power + 1.0).log2() + 1.0;
    (bits_needed / 8.0).ceil() as usize
}
425
426/// Convert an arrow field to a parquet `Type`
427fn arrow_to_parquet_type(field: &Field, coerce_types: bool) -> Result<Type> {
428    const PARQUET_LIST_ELEMENT_NAME: &str = "element";
429    const PARQUET_MAP_STRUCT_NAME: &str = "key_value";
430    const PARQUET_KEY_FIELD_NAME: &str = "key";
431    const PARQUET_VALUE_FIELD_NAME: &str = "value";
432
433    let name = field.name().as_str();
434    let repetition = if field.is_nullable() {
435        Repetition::OPTIONAL
436    } else {
437        Repetition::REQUIRED
438    };
439    let id = field_id(field);
440    // create type from field
441    match field.data_type() {
442        DataType::Null => Type::primitive_type_builder(name, PhysicalType::INT32)
443            .with_logical_type(Some(LogicalType::Unknown))
444            .with_repetition(repetition)
445            .with_id(id)
446            .build(),
447        DataType::Boolean => Type::primitive_type_builder(name, PhysicalType::BOOLEAN)
448            .with_repetition(repetition)
449            .with_id(id)
450            .build(),
451        DataType::Int8 => Type::primitive_type_builder(name, PhysicalType::INT32)
452            .with_logical_type(Some(LogicalType::Integer {
453                bit_width: 8,
454                is_signed: true,
455            }))
456            .with_repetition(repetition)
457            .with_id(id)
458            .build(),
459        DataType::Int16 => Type::primitive_type_builder(name, PhysicalType::INT32)
460            .with_logical_type(Some(LogicalType::Integer {
461                bit_width: 16,
462                is_signed: true,
463            }))
464            .with_repetition(repetition)
465            .with_id(id)
466            .build(),
467        DataType::Int32 => Type::primitive_type_builder(name, PhysicalType::INT32)
468            .with_repetition(repetition)
469            .with_id(id)
470            .build(),
471        DataType::Int64 => Type::primitive_type_builder(name, PhysicalType::INT64)
472            .with_repetition(repetition)
473            .with_id(id)
474            .build(),
475        DataType::UInt8 => Type::primitive_type_builder(name, PhysicalType::INT32)
476            .with_logical_type(Some(LogicalType::Integer {
477                bit_width: 8,
478                is_signed: false,
479            }))
480            .with_repetition(repetition)
481            .with_id(id)
482            .build(),
483        DataType::UInt16 => Type::primitive_type_builder(name, PhysicalType::INT32)
484            .with_logical_type(Some(LogicalType::Integer {
485                bit_width: 16,
486                is_signed: false,
487            }))
488            .with_repetition(repetition)
489            .with_id(id)
490            .build(),
491        DataType::UInt32 => Type::primitive_type_builder(name, PhysicalType::INT32)
492            .with_logical_type(Some(LogicalType::Integer {
493                bit_width: 32,
494                is_signed: false,
495            }))
496            .with_repetition(repetition)
497            .with_id(id)
498            .build(),
499        DataType::UInt64 => Type::primitive_type_builder(name, PhysicalType::INT64)
500            .with_logical_type(Some(LogicalType::Integer {
501                bit_width: 64,
502                is_signed: false,
503            }))
504            .with_repetition(repetition)
505            .with_id(id)
506            .build(),
507        DataType::Float16 => Type::primitive_type_builder(name, PhysicalType::FIXED_LEN_BYTE_ARRAY)
508            .with_repetition(repetition)
509            .with_id(id)
510            .with_logical_type(Some(LogicalType::Float16))
511            .with_length(2)
512            .build(),
513        DataType::Float32 => Type::primitive_type_builder(name, PhysicalType::FLOAT)
514            .with_repetition(repetition)
515            .with_id(id)
516            .build(),
517        DataType::Float64 => Type::primitive_type_builder(name, PhysicalType::DOUBLE)
518            .with_repetition(repetition)
519            .with_id(id)
520            .build(),
521        DataType::Timestamp(TimeUnit::Second, _) => {
522            // Cannot represent seconds in LogicalType
523            Type::primitive_type_builder(name, PhysicalType::INT64)
524                .with_repetition(repetition)
525                .with_id(id)
526                .build()
527        }
528        DataType::Timestamp(time_unit, tz) => {
529            Type::primitive_type_builder(name, PhysicalType::INT64)
530                .with_logical_type(Some(LogicalType::Timestamp {
531                    // If timezone set, values are normalized to UTC timezone
532                    is_adjusted_to_u_t_c: matches!(tz, Some(z) if !z.as_ref().is_empty()),
533                    unit: match time_unit {
534                        TimeUnit::Second => unreachable!(),
535                        TimeUnit::Millisecond => ParquetTimeUnit::MILLIS(Default::default()),
536                        TimeUnit::Microsecond => ParquetTimeUnit::MICROS(Default::default()),
537                        TimeUnit::Nanosecond => ParquetTimeUnit::NANOS(Default::default()),
538                    },
539                }))
540                .with_repetition(repetition)
541                .with_id(id)
542                .build()
543        }
544        DataType::Date32 => Type::primitive_type_builder(name, PhysicalType::INT32)
545            .with_logical_type(Some(LogicalType::Date))
546            .with_repetition(repetition)
547            .with_id(id)
548            .build(),
549        DataType::Date64 => {
550            if coerce_types {
551                Type::primitive_type_builder(name, PhysicalType::INT32)
552                    .with_logical_type(Some(LogicalType::Date))
553                    .with_repetition(repetition)
554                    .with_id(id)
555                    .build()
556            } else {
557                Type::primitive_type_builder(name, PhysicalType::INT64)
558                    .with_repetition(repetition)
559                    .with_id(id)
560                    .build()
561            }
562        }
563        DataType::Time32(TimeUnit::Second) => {
564            // Cannot represent seconds in LogicalType
565            Type::primitive_type_builder(name, PhysicalType::INT32)
566                .with_repetition(repetition)
567                .with_id(id)
568                .build()
569        }
570        DataType::Time32(unit) => Type::primitive_type_builder(name, PhysicalType::INT32)
571            .with_logical_type(Some(LogicalType::Time {
572                is_adjusted_to_u_t_c: field.metadata().contains_key("adjusted_to_utc"),
573                unit: match unit {
574                    TimeUnit::Millisecond => ParquetTimeUnit::MILLIS(Default::default()),
575                    u => unreachable!("Invalid unit for Time32: {:?}", u),
576                },
577            }))
578            .with_repetition(repetition)
579            .with_id(id)
580            .build(),
581        DataType::Time64(unit) => Type::primitive_type_builder(name, PhysicalType::INT64)
582            .with_logical_type(Some(LogicalType::Time {
583                is_adjusted_to_u_t_c: field.metadata().contains_key("adjusted_to_utc"),
584                unit: match unit {
585                    TimeUnit::Microsecond => ParquetTimeUnit::MICROS(Default::default()),
586                    TimeUnit::Nanosecond => ParquetTimeUnit::NANOS(Default::default()),
587                    u => unreachable!("Invalid unit for Time64: {:?}", u),
588                },
589            }))
590            .with_repetition(repetition)
591            .with_id(id)
592            .build(),
593        DataType::Duration(_) => Type::primitive_type_builder(name, PhysicalType::INT64)
594            .with_repetition(repetition)
595            .with_id(id)
596            .build(),
597        DataType::Interval(_) => {
598            Type::primitive_type_builder(name, PhysicalType::FIXED_LEN_BYTE_ARRAY)
599                .with_converted_type(ConvertedType::INTERVAL)
600                .with_repetition(repetition)
601                .with_id(id)
602                .with_length(12)
603                .build()
604        }
605        DataType::Binary | DataType::LargeBinary => {
606            Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
607                .with_repetition(repetition)
608                .with_id(id)
609                .build()
610        }
611        DataType::FixedSizeBinary(length) => {
612            Type::primitive_type_builder(name, PhysicalType::FIXED_LEN_BYTE_ARRAY)
613                .with_repetition(repetition)
614                .with_id(id)
615                .with_length(*length)
616                .with_logical_type(
617                    #[cfg(feature = "arrow_canonical_extension_types")]
618                    // If set, map arrow uuid extension type to parquet uuid logical type.
619                    field
620                        .try_extension_type::<Uuid>()
621                        .ok()
622                        .map(|_| LogicalType::Uuid),
623                    #[cfg(not(feature = "arrow_canonical_extension_types"))]
624                    None,
625                )
626                .build()
627        }
628        DataType::BinaryView => Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
629            .with_repetition(repetition)
630            .with_id(id)
631            .build(),
632        DataType::Decimal32(precision, scale)
633        | DataType::Decimal64(precision, scale)
634        | DataType::Decimal128(precision, scale)
635        | DataType::Decimal256(precision, scale) => {
636            // Decimal precision determines the Parquet physical type to use.
637            // Following the: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#decimal
638            let (physical_type, length) = if *precision > 1 && *precision <= 9 {
639                (PhysicalType::INT32, -1)
640            } else if *precision <= 18 {
641                (PhysicalType::INT64, -1)
642            } else {
643                (
644                    PhysicalType::FIXED_LEN_BYTE_ARRAY,
645                    decimal_length_from_precision(*precision) as i32,
646                )
647            };
648            Type::primitive_type_builder(name, physical_type)
649                .with_repetition(repetition)
650                .with_id(id)
651                .with_length(length)
652                .with_logical_type(Some(LogicalType::Decimal {
653                    scale: *scale as i32,
654                    precision: *precision as i32,
655                }))
656                .with_precision(*precision as i32)
657                .with_scale(*scale as i32)
658                .build()
659        }
660        DataType::Utf8 | DataType::LargeUtf8 => {
661            Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
662                .with_logical_type({
663                    #[cfg(feature = "arrow_canonical_extension_types")]
664                    {
665                        // Use the Json logical type if the canonical Json
666                        // extension type is set on this field.
667                        field
668                            .try_extension_type::<Json>()
669                            .map_or(Some(LogicalType::String), |_| Some(LogicalType::Json))
670                    }
671                    #[cfg(not(feature = "arrow_canonical_extension_types"))]
672                    Some(LogicalType::String)
673                })
674                .with_repetition(repetition)
675                .with_id(id)
676                .build()
677        }
678        DataType::Utf8View => Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
679            .with_logical_type({
680                #[cfg(feature = "arrow_canonical_extension_types")]
681                {
682                    // Use the Json logical type if the canonical Json
683                    // extension type is set on this field.
684                    field
685                        .try_extension_type::<Json>()
686                        .map_or(Some(LogicalType::String), |_| Some(LogicalType::Json))
687                }
688                #[cfg(not(feature = "arrow_canonical_extension_types"))]
689                Some(LogicalType::String)
690            })
691            .with_repetition(repetition)
692            .with_id(id)
693            .build(),
694        DataType::List(f) | DataType::FixedSizeList(f, _) | DataType::LargeList(f) => {
695            let field_ref = if coerce_types && f.name() != PARQUET_LIST_ELEMENT_NAME {
696                // Ensure proper naming per the Parquet specification
697                let ff = f.as_ref().clone().with_name(PARQUET_LIST_ELEMENT_NAME);
698                Arc::new(arrow_to_parquet_type(&ff, coerce_types)?)
699            } else {
700                Arc::new(arrow_to_parquet_type(f, coerce_types)?)
701            };
702
703            Type::group_type_builder(name)
704                .with_fields(vec![Arc::new(
705                    Type::group_type_builder("list")
706                        .with_fields(vec![field_ref])
707                        .with_repetition(Repetition::REPEATED)
708                        .build()?,
709                )])
710                .with_logical_type(Some(LogicalType::List))
711                .with_repetition(repetition)
712                .with_id(id)
713                .build()
714        }
715        DataType::ListView(_) | DataType::LargeListView(_) => {
716            unimplemented!("ListView/LargeListView not implemented")
717        }
718        DataType::Struct(fields) => {
719            if fields.is_empty() {
720                return Err(arrow_err!("Parquet does not support writing empty structs",));
721            }
722            // recursively convert children to types/nodes
723            let fields = fields
724                .iter()
725                .map(|f| arrow_to_parquet_type(f, coerce_types).map(Arc::new))
726                .collect::<Result<_>>()?;
727            Type::group_type_builder(name)
728                .with_fields(fields)
729                .with_repetition(repetition)
730                .with_id(id)
731                .build()
732        }
733        DataType::Map(field, _) => {
734            if let DataType::Struct(struct_fields) = field.data_type() {
735                // If coercing then set inner struct name to "key_value"
736                let map_struct_name = if coerce_types {
737                    PARQUET_MAP_STRUCT_NAME
738                } else {
739                    field.name()
740                };
741
742                // If coercing then ensure struct fields are named "key" and "value"
743                let fix_map_field = |name: &str, fld: &Arc<Field>| -> Result<Arc<Type>> {
744                    if coerce_types && fld.name() != name {
745                        let f = fld.as_ref().clone().with_name(name);
746                        Ok(Arc::new(arrow_to_parquet_type(&f, coerce_types)?))
747                    } else {
748                        Ok(Arc::new(arrow_to_parquet_type(fld, coerce_types)?))
749                    }
750                };
751                let key_field = fix_map_field(PARQUET_KEY_FIELD_NAME, &struct_fields[0])?;
752                let val_field = fix_map_field(PARQUET_VALUE_FIELD_NAME, &struct_fields[1])?;
753
754                Type::group_type_builder(name)
755                    .with_fields(vec![Arc::new(
756                        Type::group_type_builder(map_struct_name)
757                            .with_fields(vec![key_field, val_field])
758                            .with_repetition(Repetition::REPEATED)
759                            .build()?,
760                    )])
761                    .with_logical_type(Some(LogicalType::Map))
762                    .with_repetition(repetition)
763                    .with_id(id)
764                    .build()
765            } else {
766                Err(arrow_err!(
767                    "DataType::Map should contain a struct field child",
768                ))
769            }
770        }
771        DataType::Union(_, _) => unimplemented!("See ARROW-8817."),
772        DataType::Dictionary(_, ref value) => {
773            // Dictionary encoding not handled at the schema level
774            let dict_field = field.clone().with_data_type(value.as_ref().clone());
775            arrow_to_parquet_type(&dict_field, coerce_types)
776        }
777        DataType::RunEndEncoded(_, _) => Err(arrow_err!(
778            "Converting RunEndEncodedType to parquet not supported",
779        )),
780    }
781}
782
783fn field_id(field: &Field) -> Option<i32> {
784    let value = field.metadata().get(super::PARQUET_FIELD_ID_META_KEY)?;
785    value.parse().ok() // Fail quietly if not a valid integer
786}
787
788#[cfg(test)]
789mod tests {
790    use super::*;
791
792    use std::{collections::HashMap, sync::Arc};
793
794    use arrow::datatypes::{DataType, Field, IntervalUnit, TimeUnit};
795
796    use crate::arrow::PARQUET_FIELD_ID_META_KEY;
797    use crate::file::metadata::KeyValue;
798    use crate::file::reader::FileReader;
799    use crate::{
800        arrow::{arrow_reader::ParquetRecordBatchReaderBuilder, ArrowWriter},
801        schema::{parser::parse_message_type, types::SchemaDescriptor},
802    };
803
804    #[test]
805    fn test_flat_primitives() {
806        let message_type = "
807        message test_schema {
808            REQUIRED BOOLEAN boolean;
809            REQUIRED INT32   int8  (INT_8);
810            REQUIRED INT32   int16 (INT_16);
811            REQUIRED INT32   uint8 (INTEGER(8,false));
812            REQUIRED INT32   uint16 (INTEGER(16,false));
813            REQUIRED INT32   int32;
814            REQUIRED INT64   int64;
815            OPTIONAL DOUBLE  double;
816            OPTIONAL FLOAT   float;
817            OPTIONAL FIXED_LEN_BYTE_ARRAY (2) float16 (FLOAT16);
818            OPTIONAL BINARY  string (UTF8);
819            OPTIONAL BINARY  string_2 (STRING);
820            OPTIONAL BINARY  json (JSON);
821        }
822        ";
823        let parquet_group_type = parse_message_type(message_type).unwrap();
824
825        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
826        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
827
828        let arrow_fields = Fields::from(vec![
829            Field::new("boolean", DataType::Boolean, false),
830            Field::new("int8", DataType::Int8, false),
831            Field::new("int16", DataType::Int16, false),
832            Field::new("uint8", DataType::UInt8, false),
833            Field::new("uint16", DataType::UInt16, false),
834            Field::new("int32", DataType::Int32, false),
835            Field::new("int64", DataType::Int64, false),
836            Field::new("double", DataType::Float64, true),
837            Field::new("float", DataType::Float32, true),
838            Field::new("float16", DataType::Float16, true),
839            Field::new("string", DataType::Utf8, true),
840            Field::new("string_2", DataType::Utf8, true),
841            Field::new("json", DataType::Utf8, true),
842        ]);
843
844        assert_eq!(&arrow_fields, converted_arrow_schema.fields());
845    }
846
847    #[test]
848    fn test_decimal_fields() {
849        let message_type = "
850        message test_schema {
851                    REQUIRED INT32 decimal1 (DECIMAL(4,2));
852                    REQUIRED INT64 decimal2 (DECIMAL(12,2));
853                    REQUIRED FIXED_LEN_BYTE_ARRAY (16) decimal3 (DECIMAL(30,2));
854                    REQUIRED BYTE_ARRAY decimal4 (DECIMAL(33,2));
855                    REQUIRED BYTE_ARRAY decimal5 (DECIMAL(38,2));
856                    REQUIRED FIXED_LEN_BYTE_ARRAY (17) decimal6 (DECIMAL(39,2));
857                    REQUIRED BYTE_ARRAY decimal7 (DECIMAL(39,2));
858        }
859        ";
860
861        let parquet_group_type = parse_message_type(message_type).unwrap();
862
863        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
864        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
865
866        let arrow_fields = Fields::from(vec![
867            Field::new("decimal1", DataType::Decimal128(4, 2), false),
868            Field::new("decimal2", DataType::Decimal128(12, 2), false),
869            Field::new("decimal3", DataType::Decimal128(30, 2), false),
870            Field::new("decimal4", DataType::Decimal128(33, 2), false),
871            Field::new("decimal5", DataType::Decimal128(38, 2), false),
872            Field::new("decimal6", DataType::Decimal256(39, 2), false),
873            Field::new("decimal7", DataType::Decimal256(39, 2), false),
874        ]);
875        assert_eq!(&arrow_fields, converted_arrow_schema.fields());
876    }
877
878    #[test]
879    fn test_byte_array_fields() {
880        let message_type = "
881        message test_schema {
882            REQUIRED BYTE_ARRAY binary;
883            REQUIRED FIXED_LEN_BYTE_ARRAY (20) fixed_binary;
884        }
885        ";
886
887        let parquet_group_type = parse_message_type(message_type).unwrap();
888
889        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
890        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
891
892        let arrow_fields = Fields::from(vec![
893            Field::new("binary", DataType::Binary, false),
894            Field::new("fixed_binary", DataType::FixedSizeBinary(20), false),
895        ]);
896        assert_eq!(&arrow_fields, converted_arrow_schema.fields());
897    }
898
899    #[test]
900    fn test_duplicate_fields() {
901        let message_type = "
902        message test_schema {
903            REQUIRED BOOLEAN boolean;
904            REQUIRED INT32 int8 (INT_8);
905        }
906        ";
907
908        let parquet_group_type = parse_message_type(message_type).unwrap();
909
910        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
911        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
912
913        let arrow_fields = Fields::from(vec![
914            Field::new("boolean", DataType::Boolean, false),
915            Field::new("int8", DataType::Int8, false),
916        ]);
917        assert_eq!(&arrow_fields, converted_arrow_schema.fields());
918
919        let converted_arrow_schema =
920            parquet_to_arrow_schema_by_columns(&parquet_schema, ProjectionMask::all(), None)
921                .unwrap();
922        assert_eq!(&arrow_fields, converted_arrow_schema.fields());
923    }
924
    /// Converts the LIST encodings shown in the comments below. The result is
    /// compared by position against hand-built Arrow fields.
    ///
    /// Covered cases:
    /// - the standard three-level layout, with two nullability combinations;
    /// - a list of lists;
    /// - legacy two-level layouts: a repeated primitive; a repeated group with
    ///   one child (that child becomes the element); a repeated group with
    ///   several children, or named `array` / `<list name>_tuple` (the group
    ///   itself becomes the element struct);
    /// - the one-level encoding of a bare repeated field.
    #[test]
    fn test_parquet_lists() {
        let mut arrow_fields = Vec::new();

        // LIST encoding example taken from parquet-format/LogicalTypes.md
        let message_type = "
        message test_schema {
          REQUIRED GROUP my_list (LIST) {
            REPEATED GROUP list {
              OPTIONAL BINARY element (UTF8);
            }
          }
          OPTIONAL GROUP my_list (LIST) {
            REPEATED GROUP list {
              REQUIRED BINARY element (UTF8);
            }
          }
          OPTIONAL GROUP array_of_arrays (LIST) {
            REPEATED GROUP list {
              REQUIRED GROUP element (LIST) {
                REPEATED GROUP list {
                  REQUIRED INT32 element;
                }
              }
            }
          }
          OPTIONAL GROUP my_list (LIST) {
            REPEATED GROUP element {
              REQUIRED BINARY str (UTF8);
            }
          }
          OPTIONAL GROUP my_list (LIST) {
            REPEATED INT32 element;
          }
          OPTIONAL GROUP my_list (LIST) {
            REPEATED GROUP element {
              REQUIRED BINARY str (UTF8);
              REQUIRED INT32 num;
            }
          }
          OPTIONAL GROUP my_list (LIST) {
            REPEATED GROUP array {
              REQUIRED BINARY str (UTF8);
            }

          }
          OPTIONAL GROUP my_list (LIST) {
            REPEATED GROUP my_list_tuple {
              REQUIRED BINARY str (UTF8);
            }
          }
          REPEATED INT32 name;
        }
        ";

        // // List<String> (list non-null, elements nullable)
        // required group my_list (LIST) {
        //   repeated group list {
        //     optional binary element (UTF8);
        //   }
        // }
        {
            arrow_fields.push(Field::new_list(
                "my_list",
                Field::new("element", DataType::Utf8, true),
                false,
            ));
        }

        // // List<String> (list nullable, elements non-null)
        // optional group my_list (LIST) {
        //   repeated group list {
        //     required binary element (UTF8);
        //   }
        // }
        {
            arrow_fields.push(Field::new_list(
                "my_list",
                Field::new("element", DataType::Utf8, false),
                true,
            ));
        }

        // Element types can be nested structures. For example, a list of lists:
        //
        // // List<List<Integer>>
        // optional group array_of_arrays (LIST) {
        //   repeated group list {
        //     required group element (LIST) {
        //       repeated group list {
        //         required int32 element;
        //       }
        //     }
        //   }
        // }
        {
            let arrow_inner_list = Field::new("element", DataType::Int32, false);
            arrow_fields.push(Field::new_list(
                "array_of_arrays",
                Field::new_list("element", arrow_inner_list, false),
                true,
            ));
        }

        // // List<String> (list nullable, elements non-null)
        // optional group my_list (LIST) {
        //   repeated group element {
        //     required binary str (UTF8);
        //   };
        // }
        // Legacy two-level form. The repeated group has a single child and is not
        // named `array` or `my_list_tuple`, so that child (`str`) is the element.
        {
            arrow_fields.push(Field::new_list(
                "my_list",
                Field::new("str", DataType::Utf8, false),
                true,
            ));
        }

        // // List<Integer> (nullable list, non-null elements)
        // optional group my_list (LIST) {
        //   repeated int32 element;
        // }
        // Legacy two-level form: a repeated primitive is itself the (required) element.
        {
            arrow_fields.push(Field::new_list(
                "my_list",
                Field::new("element", DataType::Int32, false),
                true,
            ));
        }

        // // List<Tuple<String, Integer>> (nullable list, non-null elements)
        // optional group my_list (LIST) {
        //   repeated group element {
        //     required binary str (UTF8);
        //     required int32 num;
        //   };
        // }
        // A repeated group with more than one child is itself the element struct.
        {
            let fields = vec![
                Field::new("str", DataType::Utf8, false),
                Field::new("num", DataType::Int32, false),
            ];
            arrow_fields.push(Field::new_list(
                "my_list",
                Field::new_struct("element", fields, false),
                true,
            ));
        }

        // // List<OneTuple<String>> (nullable list, non-null elements)
        // optional group my_list (LIST) {
        //   repeated group array {
        //     required binary str (UTF8);
        //   };
        // }
        // Special case: group is named array
        {
            let fields = vec![Field::new("str", DataType::Utf8, false)];
            arrow_fields.push(Field::new_list(
                "my_list",
                Field::new_struct("array", fields, false),
                true,
            ));
        }

        // // List<OneTuple<String>> (nullable list, non-null elements)
        // optional group my_list (LIST) {
        //   repeated group my_list_tuple {
        //     required binary str (UTF8);
        //   };
        // }
        // Special case: group named ends in _tuple
        {
            let fields = vec![Field::new("str", DataType::Utf8, false)];
            arrow_fields.push(Field::new_list(
                "my_list",
                Field::new_struct("my_list_tuple", fields, false),
                true,
            ));
        }

        // One-level encoding: Only allows required lists with required cells
        //   repeated value_type name
        {
            arrow_fields.push(Field::new_list(
                "name",
                Field::new("name", DataType::Int32, false),
                false,
            ));
        }

        let parquet_group_type = parse_message_type(message_type).unwrap();

        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
        let converted_fields = converted_arrow_schema.fields();

        // Several columns share the name `my_list`, so fields are compared by
        // position. The index goes into the failure message to identify the column.
        assert_eq!(arrow_fields.len(), converted_fields.len());
        for i in 0..arrow_fields.len() {
            assert_eq!(&arrow_fields[i], converted_fields[i].as_ref(), "{i}");
        }
    }
1127
    /// Checks how the repetition of a three-level LIST maps onto Arrow
    /// nullability. The outer group's REQUIRED/OPTIONAL sets the list field's
    /// nullability, and the element's repetition sets the child field's.
    #[test]
    fn test_parquet_list_nullable() {
        let mut arrow_fields = Vec::new();

        let message_type = "
        message test_schema {
          REQUIRED GROUP my_list1 (LIST) {
            REPEATED GROUP list {
              OPTIONAL BINARY element (UTF8);
            }
          }
          OPTIONAL GROUP my_list2 (LIST) {
            REPEATED GROUP list {
              REQUIRED BINARY element (UTF8);
            }
          }
          REQUIRED GROUP my_list3 (LIST) {
            REPEATED GROUP list {
              REQUIRED BINARY element (UTF8);
            }
          }
        }
        ";

        // // List<String> (list non-null, elements nullable)
        // required group my_list1 (LIST) {
        //   repeated group list {
        //     optional binary element (UTF8);
        //   }
        // }
        {
            arrow_fields.push(Field::new_list(
                "my_list1",
                Field::new("element", DataType::Utf8, true),
                false,
            ));
        }

        // // List<String> (list nullable, elements non-null)
        // optional group my_list2 (LIST) {
        //   repeated group list {
        //     required binary element (UTF8);
        //   }
        // }
        {
            arrow_fields.push(Field::new_list(
                "my_list2",
                Field::new("element", DataType::Utf8, false),
                true,
            ));
        }

        // // List<String> (list non-null, elements non-null)
        // required group my_list3 (LIST) {
        //   repeated group list {
        //     required binary element (UTF8);
        //   }
        // }
        {
            arrow_fields.push(Field::new_list(
                "my_list3",
                Field::new("element", DataType::Utf8, false),
                false,
            ));
        }

        let parquet_group_type = parse_message_type(message_type).unwrap();

        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
        let converted_fields = converted_arrow_schema.fields();

        // Check the field counts match, then compare field by field in schema order.
        assert_eq!(arrow_fields.len(), converted_fields.len());
        for i in 0..arrow_fields.len() {
            assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
        }
    }
1205
    /// Converts MAP / MAP_KEY_VALUE annotated groups and compares the result by
    /// position against hand-built Arrow map fields. The name of the repeated
    /// group is carried over to the Arrow entries struct, and the names of its
    /// two children to the key/value fields.
    #[test]
    fn test_parquet_maps() {
        let mut arrow_fields = Vec::new();

        // MAP encoding examples based on parquet-format/LogicalTypes.md
        let message_type = "
        message test_schema {
          REQUIRED group my_map1 (MAP) {
            REPEATED group key_value {
              REQUIRED binary key (UTF8);
              OPTIONAL int32 value;
            }
          }
          OPTIONAL group my_map2 (MAP) {
            REPEATED group map {
              REQUIRED binary str (UTF8);
              REQUIRED int32 num;
            }
          }
          OPTIONAL group my_map3 (MAP_KEY_VALUE) {
            REPEATED group map {
              REQUIRED binary key (UTF8);
              OPTIONAL int32 value;
            }
          }
          REQUIRED group my_map4 (MAP) {
            REPEATED group map {
              OPTIONAL binary key (UTF8);
              REQUIRED int32 value;
            }
          }
        }
        ";

        // // Map<String, Integer> (non-null map, nullable values)
        // required group my_map1 (MAP) {
        //   repeated group key_value {
        //     required binary key (UTF8);
        //     optional int32 value;
        //   }
        // }
        {
            arrow_fields.push(Field::new_map(
                "my_map1",
                "key_value",
                Field::new("key", DataType::Utf8, false),
                Field::new("value", DataType::Int32, true),
                false,
                false,
            ));
        }

        // // Map<String, Integer> (nullable map, non-null values)
        // optional group my_map2 (MAP) {
        //   repeated group map {
        //     required binary str (UTF8);
        //     required int32 num;
        //   }
        // }
        // Non-standard entry names (`map`, `str`, `num`) are kept as-is.
        {
            arrow_fields.push(Field::new_map(
                "my_map2",
                "map",
                Field::new("str", DataType::Utf8, false),
                Field::new("num", DataType::Int32, false),
                false,
                true,
            ));
        }

        // // Map<String, Integer> (nullable map, nullable values)
        // optional group my_map3 (MAP_KEY_VALUE) {
        //   repeated group map {
        //     required binary key (UTF8);
        //     optional int32 value;
        //   }
        // }
        // The MAP_KEY_VALUE annotation also yields an Arrow map.
        {
            arrow_fields.push(Field::new_map(
                "my_map3",
                "map",
                Field::new("key", DataType::Utf8, false),
                Field::new("value", DataType::Int32, true),
                false,
                true,
            ));
        }

        // // Map<String, Integer> (non-compliant nullable key)
        // required group my_map4 (MAP) {
        //   repeated group map {
        //     optional binary key (UTF8);
        //     required int32 value;
        //   }
        // }
        {
            arrow_fields.push(Field::new_map(
                "my_map4",
                "map",
                Field::new("key", DataType::Utf8, false), // The key is always non-nullable (#5630)
                Field::new("value", DataType::Int32, false),
                false,
                false,
            ));
        }

        let parquet_group_type = parse_message_type(message_type).unwrap();

        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
        let converted_fields = converted_arrow_schema.fields();

        // Check the field counts match, then compare field by field in schema order.
        assert_eq!(arrow_fields.len(), converted_fields.len());
        for i in 0..arrow_fields.len() {
            assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
        }
    }
1323
1324    #[test]
1325    fn test_nested_schema() {
1326        let mut arrow_fields = Vec::new();
1327        {
1328            let group1_fields = Fields::from(vec![
1329                Field::new("leaf1", DataType::Boolean, false),
1330                Field::new("leaf2", DataType::Int32, false),
1331            ]);
1332            let group1_struct = Field::new("group1", DataType::Struct(group1_fields), false);
1333            arrow_fields.push(group1_struct);
1334
1335            let leaf3_field = Field::new("leaf3", DataType::Int64, false);
1336            arrow_fields.push(leaf3_field);
1337        }
1338
1339        let message_type = "
1340        message test_schema {
1341          REQUIRED GROUP group1 {
1342            REQUIRED BOOLEAN leaf1;
1343            REQUIRED INT32 leaf2;
1344          }
1345          REQUIRED INT64 leaf3;
1346        }
1347        ";
1348        let parquet_group_type = parse_message_type(message_type).unwrap();
1349
1350        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1351        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
1352        let converted_fields = converted_arrow_schema.fields();
1353
1354        assert_eq!(arrow_fields.len(), converted_fields.len());
1355        for i in 0..arrow_fields.len() {
1356            assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
1357        }
1358    }
1359
    /// Projecting a subset of leaf columns keeps only the selected leaves. Each
    /// parent group is pruned down to its selected children.
    #[test]
    fn test_nested_schema_partial() {
        let mut arrow_fields = Vec::new();
        {
            let group1_fields = vec![Field::new("leaf1", DataType::Int64, false)].into();
            let group1 = Field::new("group1", DataType::Struct(group1_fields), false);
            arrow_fields.push(group1);

            let group2_fields = vec![Field::new("leaf4", DataType::Int64, false)].into();
            let group2 = Field::new("group2", DataType::Struct(group2_fields), false);
            arrow_fields.push(group2);

            arrow_fields.push(Field::new("leaf5", DataType::Int64, false));
        }

        let message_type = "
        message test_schema {
          REQUIRED GROUP group1 {
            REQUIRED INT64 leaf1;
            REQUIRED INT64 leaf2;
          }
          REQUIRED  GROUP group2 {
            REQUIRED INT64 leaf3;
            REQUIRED INT64 leaf4;
          }
          REQUIRED INT64 leaf5;
        }
        ";
        let parquet_group_type = parse_message_type(message_type).unwrap();

        // Expected partial arrow schema (columns 0, 3, 4):
        // required group group1 {
        //   required int64 leaf1;
        // }
        // required group group2 {
        //   required int64 leaf4;
        // }
        // required int64 leaf5;

        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
        // Leaf indices follow schema order: 0 = group1.leaf1, 1 = group1.leaf2,
        // 2 = group2.leaf3, 3 = group2.leaf4, 4 = leaf5. The list below is out of
        // order and repeats 4. The expected result is still leaves 0, 3 and 4,
        // each appearing once, in schema order.
        let mask = ProjectionMask::leaves(&parquet_schema, [3, 0, 4, 4]);
        let converted_arrow_schema =
            parquet_to_arrow_schema_by_columns(&parquet_schema, mask, None).unwrap();
        let converted_fields = converted_arrow_schema.fields();

        assert_eq!(arrow_fields.len(), converted_fields.len());
        for i in 0..arrow_fields.len() {
            assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
        }
    }
1410
    /// The order in which columns are requested does not affect the output.
    /// Whether the mask is built from leaf indices or from dotted column paths,
    /// the projected Arrow fields come back in schema order.
    #[test]
    fn test_nested_schema_partial_ordering() {
        let mut arrow_fields = Vec::new();
        {
            let group1_fields = vec![Field::new("leaf1", DataType::Int64, false)].into();
            let group1 = Field::new("group1", DataType::Struct(group1_fields), false);
            arrow_fields.push(group1);

            let group2_fields = vec![Field::new("leaf4", DataType::Int64, false)].into();
            let group2 = Field::new("group2", DataType::Struct(group2_fields), false);
            arrow_fields.push(group2);

            arrow_fields.push(Field::new("leaf5", DataType::Int64, false));
        }

        let message_type = "
        message test_schema {
          REQUIRED GROUP group1 {
            REQUIRED INT64 leaf1;
            REQUIRED INT64 leaf2;
          }
          REQUIRED  GROUP group2 {
            REQUIRED INT64 leaf3;
            REQUIRED INT64 leaf4;
          }
          REQUIRED INT64 leaf5;
        }
        ";
        let parquet_group_type = parse_message_type(message_type).unwrap();

        // Expected partial arrow schema when leaves are requested in the order
        // [3, 0, 4]. The fields still come back in schema order:
        // required group group1 {
        //   required int64 leaf1;
        // }
        // required group group2 {
        //   required int64 leaf4;
        // }
        // required int64 leaf5;

        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
        let mask = ProjectionMask::leaves(&parquet_schema, [3, 0, 4]);
        let converted_arrow_schema =
            parquet_to_arrow_schema_by_columns(&parquet_schema, mask, None).unwrap();
        let converted_fields = converted_arrow_schema.fields();

        assert_eq!(arrow_fields.len(), converted_fields.len());
        for i in 0..arrow_fields.len() {
            assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
        }

        // Select the same three leaves by dotted column path, in yet another
        // order. This must produce the same fields as above.
        let mask =
            ProjectionMask::columns(&parquet_schema, ["group2.leaf4", "group1.leaf1", "leaf5"]);
        let converted_arrow_schema =
            parquet_to_arrow_schema_by_columns(&parquet_schema, mask, None).unwrap();
        let converted_fields = converted_arrow_schema.fields();

        assert_eq!(arrow_fields.len(), converted_fields.len());
        for i in 0..arrow_fields.len() {
            assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
        }
    }
1472
1473    #[test]
1474    fn test_repeated_nested_schema() {
1475        let mut arrow_fields = Vec::new();
1476        {
1477            arrow_fields.push(Field::new("leaf1", DataType::Int32, true));
1478
1479            let inner_group_list = Field::new_list(
1480                "innerGroup",
1481                Field::new_struct(
1482                    "innerGroup",
1483                    vec![Field::new("leaf3", DataType::Int32, true)],
1484                    false,
1485                ),
1486                false,
1487            );
1488
1489            let outer_group_list = Field::new_list(
1490                "outerGroup",
1491                Field::new_struct(
1492                    "outerGroup",
1493                    vec![Field::new("leaf2", DataType::Int32, true), inner_group_list],
1494                    false,
1495                ),
1496                false,
1497            );
1498            arrow_fields.push(outer_group_list);
1499        }
1500
1501        let message_type = "
1502        message test_schema {
1503          OPTIONAL INT32 leaf1;
1504          REPEATED GROUP outerGroup {
1505            OPTIONAL INT32 leaf2;
1506            REPEATED GROUP innerGroup {
1507              OPTIONAL INT32 leaf3;
1508            }
1509          }
1510        }
1511        ";
1512        let parquet_group_type = parse_message_type(message_type).unwrap();
1513
1514        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1515        let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap();
1516        let converted_fields = converted_arrow_schema.fields();
1517
1518        assert_eq!(arrow_fields.len(), converted_fields.len());
1519        for i in 0..arrow_fields.len() {
1520            assert_eq!(&arrow_fields[i], converted_fields[i].as_ref());
1521        }
1522    }
1523
1524    #[test]
1525    fn test_column_desc_to_field() {
1526        let message_type = "
1527        message test_schema {
1528            REQUIRED BOOLEAN boolean;
1529            REQUIRED INT32   int8  (INT_8);
1530            REQUIRED INT32   uint8 (INTEGER(8,false));
1531            REQUIRED INT32   int16 (INT_16);
1532            REQUIRED INT32   uint16 (INTEGER(16,false));
1533            REQUIRED INT32   int32;
1534            REQUIRED INT64   int64;
1535            OPTIONAL DOUBLE  double;
1536            OPTIONAL FLOAT   float;
1537            OPTIONAL FIXED_LEN_BYTE_ARRAY (2) float16 (FLOAT16);
1538            OPTIONAL BINARY  string (UTF8);
1539            REPEATED BOOLEAN bools;
1540            OPTIONAL INT32   date       (DATE);
1541            OPTIONAL INT32   time_milli (TIME_MILLIS);
1542            OPTIONAL INT64   time_micro (TIME_MICROS);
1543            OPTIONAL INT64   time_nano (TIME(NANOS,false));
1544            OPTIONAL INT64   ts_milli (TIMESTAMP_MILLIS);
1545            REQUIRED INT64   ts_micro (TIMESTAMP_MICROS);
1546            REQUIRED INT64   ts_nano (TIMESTAMP(NANOS,true));
1547            REPEATED INT32   int_list;
1548            REPEATED BINARY  byte_list;
1549            REPEATED BINARY  string_list (UTF8);
1550            REQUIRED INT32 decimal_int32 (DECIMAL(8,2));
1551            REQUIRED INT64 decimal_int64 (DECIMAL(16,2));
1552            REQUIRED FIXED_LEN_BYTE_ARRAY (13) decimal_fix_length (DECIMAL(30,2));
1553        }
1554        ";
1555        let parquet_group_type = parse_message_type(message_type).unwrap();
1556
1557        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1558        let converted_arrow_fields = parquet_schema
1559            .columns()
1560            .iter()
1561            .map(|c| parquet_to_arrow_field(c).unwrap())
1562            .collect::<Vec<Field>>();
1563
1564        let arrow_fields = vec![
1565            Field::new("boolean", DataType::Boolean, false),
1566            Field::new("int8", DataType::Int8, false),
1567            Field::new("uint8", DataType::UInt8, false),
1568            Field::new("int16", DataType::Int16, false),
1569            Field::new("uint16", DataType::UInt16, false),
1570            Field::new("int32", DataType::Int32, false),
1571            Field::new("int64", DataType::Int64, false),
1572            Field::new("double", DataType::Float64, true),
1573            Field::new("float", DataType::Float32, true),
1574            Field::new("float16", DataType::Float16, true),
1575            Field::new("string", DataType::Utf8, true),
1576            Field::new_list(
1577                "bools",
1578                Field::new("bools", DataType::Boolean, false),
1579                false,
1580            ),
1581            Field::new("date", DataType::Date32, true),
1582            Field::new("time_milli", DataType::Time32(TimeUnit::Millisecond), true),
1583            Field::new("time_micro", DataType::Time64(TimeUnit::Microsecond), true),
1584            Field::new("time_nano", DataType::Time64(TimeUnit::Nanosecond), true),
1585            Field::new(
1586                "ts_milli",
1587                DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
1588                true,
1589            ),
1590            Field::new(
1591                "ts_micro",
1592                DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
1593                false,
1594            ),
1595            Field::new(
1596                "ts_nano",
1597                DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())),
1598                false,
1599            ),
1600            Field::new_list(
1601                "int_list",
1602                Field::new("int_list", DataType::Int32, false),
1603                false,
1604            ),
1605            Field::new_list(
1606                "byte_list",
1607                Field::new("byte_list", DataType::Binary, false),
1608                false,
1609            ),
1610            Field::new_list(
1611                "string_list",
1612                Field::new("string_list", DataType::Utf8, false),
1613                false,
1614            ),
1615            Field::new("decimal_int32", DataType::Decimal128(8, 2), false),
1616            Field::new("decimal_int64", DataType::Decimal128(16, 2), false),
1617            Field::new("decimal_fix_length", DataType::Decimal128(30, 2), false),
1618        ];
1619
1620        assert_eq!(arrow_fields, converted_arrow_fields);
1621    }
1622
1623    #[test]
1624    fn test_coerced_map_list() {
1625        // Create Arrow schema with non-Parquet naming
1626        let arrow_fields = vec![
1627            Field::new_list(
1628                "my_list",
1629                Field::new("item", DataType::Boolean, true),
1630                false,
1631            ),
1632            Field::new_map(
1633                "my_map",
1634                "entries",
1635                Field::new("keys", DataType::Utf8, false),
1636                Field::new("values", DataType::Int32, true),
1637                false,
1638                true,
1639            ),
1640        ];
1641        let arrow_schema = Schema::new(arrow_fields);
1642
1643        // Create Parquet schema with coerced names
1644        let message_type = "
1645        message parquet_schema {
1646            REQUIRED GROUP my_list (LIST) {
1647                REPEATED GROUP list {
1648                    OPTIONAL BOOLEAN element;
1649                }
1650            }
1651            OPTIONAL GROUP my_map (MAP) {
1652                REPEATED GROUP key_value {
1653                    REQUIRED BINARY key (STRING);
1654                    OPTIONAL INT32 value;
1655                }
1656            }
1657        }
1658        ";
1659        let parquet_group_type = parse_message_type(message_type).unwrap();
1660        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1661        let converted_arrow_schema = ArrowSchemaConverter::new()
1662            .with_coerce_types(true)
1663            .convert(&arrow_schema)
1664            .unwrap();
1665        assert_eq!(
1666            parquet_schema.columns().len(),
1667            converted_arrow_schema.columns().len()
1668        );
1669
1670        // Create Parquet schema without coerced names
1671        let message_type = "
1672        message parquet_schema {
1673            REQUIRED GROUP my_list (LIST) {
1674                REPEATED GROUP list {
1675                    OPTIONAL BOOLEAN item;
1676                }
1677            }
1678            OPTIONAL GROUP my_map (MAP) {
1679                REPEATED GROUP entries {
1680                    REQUIRED BINARY keys (STRING);
1681                    OPTIONAL INT32 values;
1682                }
1683            }
1684        }
1685        ";
1686        let parquet_group_type = parse_message_type(message_type).unwrap();
1687        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1688        let converted_arrow_schema = ArrowSchemaConverter::new()
1689            .with_coerce_types(false)
1690            .convert(&arrow_schema)
1691            .unwrap();
1692        assert_eq!(
1693            parquet_schema.columns().len(),
1694            converted_arrow_schema.columns().len()
1695        );
1696    }
1697
1698    #[test]
1699    fn test_field_to_column_desc() {
1700        let message_type = "
1701        message arrow_schema {
1702            REQUIRED BOOLEAN boolean;
1703            REQUIRED INT32   int8  (INT_8);
1704            REQUIRED INT32   int16 (INTEGER(16,true));
1705            REQUIRED INT32   int32;
1706            REQUIRED INT64   int64;
1707            OPTIONAL DOUBLE  double;
1708            OPTIONAL FLOAT   float;
1709            OPTIONAL FIXED_LEN_BYTE_ARRAY (2) float16 (FLOAT16);
1710            OPTIONAL BINARY  string (STRING);
1711            OPTIONAL GROUP   bools (LIST) {
1712                REPEATED GROUP list {
1713                    OPTIONAL BOOLEAN element;
1714                }
1715            }
1716            REQUIRED GROUP   bools_non_null (LIST) {
1717                REPEATED GROUP list {
1718                    REQUIRED BOOLEAN element;
1719                }
1720            }
1721            OPTIONAL INT32   date       (DATE);
1722            OPTIONAL INT32   time_milli (TIME(MILLIS,false));
1723            OPTIONAL INT32   time_milli_utc (TIME(MILLIS,true));
1724            OPTIONAL INT64   time_micro (TIME_MICROS);
1725            OPTIONAL INT64   time_micro_utc (TIME(MICROS, true));
1726            OPTIONAL INT64   ts_milli (TIMESTAMP_MILLIS);
1727            REQUIRED INT64   ts_micro (TIMESTAMP(MICROS,false));
1728            REQUIRED INT64   ts_seconds;
1729            REQUIRED INT64   ts_micro_utc (TIMESTAMP(MICROS, true));
1730            REQUIRED INT64   ts_millis_zero_offset (TIMESTAMP(MILLIS, true));
1731            REQUIRED INT64   ts_millis_zero_negative_offset (TIMESTAMP(MILLIS, true));
1732            REQUIRED INT64   ts_micro_non_utc (TIMESTAMP(MICROS, true));
1733            REQUIRED GROUP struct {
1734                REQUIRED BOOLEAN bools;
1735                REQUIRED INT32 uint32 (INTEGER(32,false));
1736                REQUIRED GROUP   int32 (LIST) {
1737                    REPEATED GROUP list {
1738                        OPTIONAL INT32 element;
1739                    }
1740                }
1741            }
1742            REQUIRED BINARY  dictionary_strings (STRING);
1743            REQUIRED INT32 decimal_int32 (DECIMAL(8,2));
1744            REQUIRED INT64 decimal_int64 (DECIMAL(16,2));
1745            REQUIRED FIXED_LEN_BYTE_ARRAY (13) decimal_fix_length (DECIMAL(30,2));
1746            REQUIRED FIXED_LEN_BYTE_ARRAY (16) decimal128 (DECIMAL(38,2));
1747            REQUIRED FIXED_LEN_BYTE_ARRAY (17) decimal256 (DECIMAL(39,2));
1748        }
1749        ";
1750        let parquet_group_type = parse_message_type(message_type).unwrap();
1751
1752        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1753
1754        let arrow_fields = vec![
1755            Field::new("boolean", DataType::Boolean, false),
1756            Field::new("int8", DataType::Int8, false),
1757            Field::new("int16", DataType::Int16, false),
1758            Field::new("int32", DataType::Int32, false),
1759            Field::new("int64", DataType::Int64, false),
1760            Field::new("double", DataType::Float64, true),
1761            Field::new("float", DataType::Float32, true),
1762            Field::new("float16", DataType::Float16, true),
1763            Field::new("string", DataType::Utf8, true),
1764            Field::new_list(
1765                "bools",
1766                Field::new("element", DataType::Boolean, true),
1767                true,
1768            ),
1769            Field::new_list(
1770                "bools_non_null",
1771                Field::new("element", DataType::Boolean, false),
1772                false,
1773            ),
1774            Field::new("date", DataType::Date32, true),
1775            Field::new("time_milli", DataType::Time32(TimeUnit::Millisecond), true),
1776            Field::new(
1777                "time_milli_utc",
1778                DataType::Time32(TimeUnit::Millisecond),
1779                true,
1780            )
1781            .with_metadata(HashMap::from_iter(vec![(
1782                "adjusted_to_utc".to_string(),
1783                "".to_string(),
1784            )])),
1785            Field::new("time_micro", DataType::Time64(TimeUnit::Microsecond), true),
1786            Field::new(
1787                "time_micro_utc",
1788                DataType::Time64(TimeUnit::Microsecond),
1789                true,
1790            )
1791            .with_metadata(HashMap::from_iter(vec![(
1792                "adjusted_to_utc".to_string(),
1793                "".to_string(),
1794            )])),
1795            Field::new(
1796                "ts_milli",
1797                DataType::Timestamp(TimeUnit::Millisecond, None),
1798                true,
1799            ),
1800            Field::new(
1801                "ts_micro",
1802                DataType::Timestamp(TimeUnit::Microsecond, None),
1803                false,
1804            ),
1805            Field::new(
1806                "ts_seconds",
1807                DataType::Timestamp(TimeUnit::Second, Some("UTC".into())),
1808                false,
1809            ),
1810            Field::new(
1811                "ts_micro_utc",
1812                DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
1813                false,
1814            ),
1815            Field::new(
1816                "ts_millis_zero_offset",
1817                DataType::Timestamp(TimeUnit::Millisecond, Some("+00:00".into())),
1818                false,
1819            ),
1820            Field::new(
1821                "ts_millis_zero_negative_offset",
1822                DataType::Timestamp(TimeUnit::Millisecond, Some("-00:00".into())),
1823                false,
1824            ),
1825            Field::new(
1826                "ts_micro_non_utc",
1827                DataType::Timestamp(TimeUnit::Microsecond, Some("+01:00".into())),
1828                false,
1829            ),
1830            Field::new_struct(
1831                "struct",
1832                vec![
1833                    Field::new("bools", DataType::Boolean, false),
1834                    Field::new("uint32", DataType::UInt32, false),
1835                    Field::new_list("int32", Field::new("element", DataType::Int32, true), false),
1836                ],
1837                false,
1838            ),
1839            Field::new_dictionary("dictionary_strings", DataType::Int32, DataType::Utf8, false),
1840            Field::new("decimal_int32", DataType::Decimal128(8, 2), false),
1841            Field::new("decimal_int64", DataType::Decimal128(16, 2), false),
1842            Field::new("decimal_fix_length", DataType::Decimal128(30, 2), false),
1843            Field::new("decimal128", DataType::Decimal128(38, 2), false),
1844            Field::new("decimal256", DataType::Decimal256(39, 2), false),
1845        ];
1846        let arrow_schema = Schema::new(arrow_fields);
1847        let converted_arrow_schema = ArrowSchemaConverter::new().convert(&arrow_schema).unwrap();
1848
1849        assert_eq!(
1850            parquet_schema.columns().len(),
1851            converted_arrow_schema.columns().len()
1852        );
1853        parquet_schema
1854            .columns()
1855            .iter()
1856            .zip(converted_arrow_schema.columns())
1857            .for_each(|(a, b)| {
1858                // Only check logical type if it's set on the Parquet side.
1859                // This is because the Arrow conversion always sets logical type,
1860                // even if there wasn't originally one.
1861                // This is not an issue, but is an inconvenience for this test.
1862                match a.logical_type() {
1863                    Some(_) => {
1864                        assert_eq!(a, b)
1865                    }
1866                    None => {
1867                        assert_eq!(a.name(), b.name());
1868                        assert_eq!(a.physical_type(), b.physical_type());
1869                        assert_eq!(a.converted_type(), b.converted_type());
1870                    }
1871                };
1872            });
1873    }
1874
1875    #[test]
1876    #[should_panic(expected = "Parquet does not support writing empty structs")]
1877    fn test_empty_struct_field() {
1878        let arrow_fields = vec![Field::new(
1879            "struct",
1880            DataType::Struct(Fields::empty()),
1881            false,
1882        )];
1883        let arrow_schema = Schema::new(arrow_fields);
1884        let converted_arrow_schema = ArrowSchemaConverter::new()
1885            .with_coerce_types(true)
1886            .convert(&arrow_schema);
1887
1888        converted_arrow_schema.unwrap();
1889    }
1890
1891    #[test]
1892    fn test_metadata() {
1893        let message_type = "
1894        message test_schema {
1895            OPTIONAL BINARY  string (STRING);
1896        }
1897        ";
1898        let parquet_group_type = parse_message_type(message_type).unwrap();
1899
1900        let key_value_metadata = vec![
1901            KeyValue::new("foo".to_owned(), Some("bar".to_owned())),
1902            KeyValue::new("baz".to_owned(), None),
1903        ];
1904
1905        let mut expected_metadata: HashMap<String, String> = HashMap::new();
1906        expected_metadata.insert("foo".to_owned(), "bar".to_owned());
1907
1908        let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
1909        let converted_arrow_schema =
1910            parquet_to_arrow_schema(&parquet_schema, Some(&key_value_metadata)).unwrap();
1911
1912        assert_eq!(converted_arrow_schema.metadata(), &expected_metadata);
1913    }
1914
1915    #[test]
1916    fn test_arrow_schema_roundtrip() -> Result<()> {
1917        let meta = |a: &[(&str, &str)]| -> HashMap<String, String> {
1918            a.iter()
1919                .map(|(a, b)| (a.to_string(), b.to_string()))
1920                .collect()
1921        };
1922
1923        let schema = Schema::new_with_metadata(
1924            vec![
1925                Field::new("c1", DataType::Utf8, false)
1926                    .with_metadata(meta(&[("Key", "Foo"), (PARQUET_FIELD_ID_META_KEY, "2")])),
1927                Field::new("c2", DataType::Binary, false),
1928                Field::new("c3", DataType::FixedSizeBinary(3), false),
1929                Field::new("c4", DataType::Boolean, false),
1930                Field::new("c5", DataType::Date32, false),
1931                Field::new("c6", DataType::Date64, false),
1932                Field::new("c7", DataType::Time32(TimeUnit::Second), false),
1933                Field::new("c8", DataType::Time32(TimeUnit::Millisecond), false),
1934                Field::new("c13", DataType::Time64(TimeUnit::Microsecond), false),
1935                Field::new("c14", DataType::Time64(TimeUnit::Nanosecond), false),
1936                Field::new("c15", DataType::Timestamp(TimeUnit::Second, None), false),
1937                Field::new(
1938                    "c16",
1939                    DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
1940                    false,
1941                ),
1942                Field::new(
1943                    "c17",
1944                    DataType::Timestamp(TimeUnit::Microsecond, Some("Africa/Johannesburg".into())),
1945                    false,
1946                ),
1947                Field::new(
1948                    "c18",
1949                    DataType::Timestamp(TimeUnit::Nanosecond, None),
1950                    false,
1951                ),
1952                Field::new("c19", DataType::Interval(IntervalUnit::DayTime), false),
1953                Field::new("c20", DataType::Interval(IntervalUnit::YearMonth), false),
1954                Field::new_list(
1955                    "c21",
1956                    Field::new_list_field(DataType::Boolean, true)
1957                        .with_metadata(meta(&[("Key", "Bar"), (PARQUET_FIELD_ID_META_KEY, "5")])),
1958                    false,
1959                )
1960                .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "4")])),
1961                Field::new(
1962                    "c22",
1963                    DataType::FixedSizeList(
1964                        Arc::new(Field::new_list_field(DataType::Boolean, true)),
1965                        5,
1966                    ),
1967                    false,
1968                ),
1969                Field::new_list(
1970                    "c23",
1971                    Field::new_large_list(
1972                        "inner",
1973                        Field::new_list_field(
1974                            DataType::Struct(
1975                                vec![
1976                                    Field::new("a", DataType::Int16, true),
1977                                    Field::new("b", DataType::Float64, false),
1978                                    Field::new("c", DataType::Float32, false),
1979                                    Field::new("d", DataType::Float16, false),
1980                                ]
1981                                .into(),
1982                            ),
1983                            false,
1984                        ),
1985                        true,
1986                    ),
1987                    false,
1988                ),
1989                Field::new(
1990                    "c24",
1991                    DataType::Struct(Fields::from(vec![
1992                        Field::new("a", DataType::Utf8, false),
1993                        Field::new("b", DataType::UInt16, false),
1994                    ])),
1995                    false,
1996                ),
1997                Field::new("c25", DataType::Interval(IntervalUnit::YearMonth), true),
1998                Field::new("c26", DataType::Interval(IntervalUnit::DayTime), true),
1999                // Duration types not supported
2000                // Field::new("c27", DataType::Duration(TimeUnit::Second), false),
2001                // Field::new("c28", DataType::Duration(TimeUnit::Millisecond), false),
2002                // Field::new("c29", DataType::Duration(TimeUnit::Microsecond), false),
2003                // Field::new("c30", DataType::Duration(TimeUnit::Nanosecond), false),
2004                #[allow(deprecated)]
2005                Field::new_dict(
2006                    "c31",
2007                    DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
2008                    true,
2009                    123,
2010                    true,
2011                )
2012                .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "6")])),
2013                Field::new("c32", DataType::LargeBinary, true),
2014                Field::new("c33", DataType::LargeUtf8, true),
2015                Field::new_large_list(
2016                    "c34",
2017                    Field::new_list(
2018                        "inner",
2019                        Field::new_list_field(
2020                            DataType::Struct(
2021                                vec![
2022                                    Field::new("a", DataType::Int16, true),
2023                                    Field::new("b", DataType::Float64, true),
2024                                ]
2025                                .into(),
2026                            ),
2027                            true,
2028                        ),
2029                        true,
2030                    ),
2031                    true,
2032                ),
2033                Field::new("c35", DataType::Null, true),
2034                Field::new("c36", DataType::Decimal128(2, 1), false),
2035                Field::new("c37", DataType::Decimal256(50, 20), false),
2036                Field::new("c38", DataType::Decimal128(18, 12), true),
2037                Field::new_map(
2038                    "c39",
2039                    "key_value",
2040                    Field::new("key", DataType::Utf8, false),
2041                    Field::new_list("value", Field::new("element", DataType::Utf8, true), true),
2042                    false, // fails to roundtrip keys_sorted
2043                    true,
2044                ),
2045                Field::new_map(
2046                    "c40",
2047                    "my_entries",
2048                    Field::new("my_key", DataType::Utf8, false)
2049                        .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "8")])),
2050                    Field::new_list(
2051                        "my_value",
2052                        Field::new_list_field(DataType::Utf8, true)
2053                            .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "10")])),
2054                        true,
2055                    )
2056                    .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "9")])),
2057                    false, // fails to roundtrip keys_sorted
2058                    true,
2059                )
2060                .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "7")])),
2061                Field::new_map(
2062                    "c41",
2063                    "my_entries",
2064                    Field::new("my_key", DataType::Utf8, false),
2065                    Field::new_list(
2066                        "my_value",
2067                        Field::new_list_field(DataType::Utf8, true)
2068                            .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "11")])),
2069                        true,
2070                    ),
2071                    false, // fails to roundtrip keys_sorted
2072                    false,
2073                ),
2074                Field::new("c42", DataType::Decimal32(5, 2), false),
2075                Field::new("c43", DataType::Decimal64(18, 12), true),
2076            ],
2077            meta(&[("Key", "Value")]),
2078        );
2079
2080        // write to an empty parquet file so that schema is serialized
2081        let file = tempfile::tempfile().unwrap();
2082        let writer =
2083            ArrowWriter::try_new(file.try_clone().unwrap(), Arc::new(schema.clone()), None)?;
2084        writer.close()?;
2085
2086        // read file back
2087        let arrow_reader = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
2088
2089        // Check arrow schema
2090        let read_schema = arrow_reader.schema();
2091        assert_eq!(&schema, read_schema.as_ref());
2092
2093        // Walk schema finding field IDs
2094        let mut stack = Vec::with_capacity(10);
2095        let mut out = Vec::with_capacity(10);
2096
2097        let root = arrow_reader.parquet_schema().root_schema_ptr();
2098        stack.push((root.name().to_string(), root));
2099
2100        while let Some((p, t)) = stack.pop() {
2101            if t.is_group() {
2102                for f in t.get_fields() {
2103                    stack.push((format!("{p}.{}", f.name()), f.clone()))
2104                }
2105            }
2106
2107            let info = t.get_basic_info();
2108            if info.has_id() {
2109                out.push(format!("{p} -> {}", info.id()))
2110            }
2111        }
2112        out.sort_unstable();
2113        let out: Vec<_> = out.iter().map(|x| x.as_str()).collect();
2114
2115        assert_eq!(
2116            &out,
2117            &[
2118                "arrow_schema.c1 -> 2",
2119                "arrow_schema.c21 -> 4",
2120                "arrow_schema.c21.list.item -> 5",
2121                "arrow_schema.c31 -> 6",
2122                "arrow_schema.c40 -> 7",
2123                "arrow_schema.c40.my_entries.my_key -> 8",
2124                "arrow_schema.c40.my_entries.my_value -> 9",
2125                "arrow_schema.c40.my_entries.my_value.list.item -> 10",
2126                "arrow_schema.c41.my_entries.my_value.list.item -> 11",
2127            ]
2128        );
2129
2130        Ok(())
2131    }
2132
2133    #[test]
2134    fn test_read_parquet_field_ids_raw() -> Result<()> {
2135        let meta = |a: &[(&str, &str)]| -> HashMap<String, String> {
2136            a.iter()
2137                .map(|(a, b)| (a.to_string(), b.to_string()))
2138                .collect()
2139        };
2140        let schema = Schema::new_with_metadata(
2141            vec![
2142                Field::new("c1", DataType::Utf8, true)
2143                    .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "1")])),
2144                Field::new("c2", DataType::Utf8, true)
2145                    .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "2")])),
2146            ],
2147            HashMap::new(),
2148        );
2149
2150        let writer = ArrowWriter::try_new(vec![], Arc::new(schema.clone()), None)?;
2151        let parquet_bytes = writer.into_inner()?;
2152
2153        let reader =
2154            crate::file::reader::SerializedFileReader::new(bytes::Bytes::from(parquet_bytes))?;
2155        let schema_descriptor = reader.metadata().file_metadata().schema_descr_ptr();
2156
2157        // don't pass metadata so field ids are read from Parquet and not from serialized Arrow schema
2158        let arrow_schema = crate::arrow::parquet_to_arrow_schema(&schema_descriptor, None)?;
2159
2160        let parq_schema_descr = ArrowSchemaConverter::new()
2161            .with_coerce_types(true)
2162            .convert(&arrow_schema)?;
2163        let parq_fields = parq_schema_descr.root_schema().get_fields();
2164        assert_eq!(parq_fields.len(), 2);
2165        assert_eq!(parq_fields[0].get_basic_info().id(), 1);
2166        assert_eq!(parq_fields[1].get_basic_info().id(), 2);
2167
2168        Ok(())
2169    }
2170
2171    #[test]
2172    fn test_arrow_schema_roundtrip_lists() -> Result<()> {
2173        let metadata: HashMap<String, String> = [("Key".to_string(), "Value".to_string())]
2174            .iter()
2175            .cloned()
2176            .collect();
2177
2178        let schema = Schema::new_with_metadata(
2179            vec![
2180                Field::new_list("c21", Field::new("array", DataType::Boolean, true), false),
2181                Field::new(
2182                    "c22",
2183                    DataType::FixedSizeList(
2184                        Arc::new(Field::new("items", DataType::Boolean, false)),
2185                        5,
2186                    ),
2187                    false,
2188                ),
2189                Field::new_list(
2190                    "c23",
2191                    Field::new_large_list(
2192                        "items",
2193                        Field::new_struct(
2194                            "items",
2195                            vec![
2196                                Field::new("a", DataType::Int16, true),
2197                                Field::new("b", DataType::Float64, false),
2198                            ],
2199                            true,
2200                        ),
2201                        true,
2202                    ),
2203                    true,
2204                ),
2205            ],
2206            metadata,
2207        );
2208
2209        // write to an empty parquet file so that schema is serialized
2210        let file = tempfile::tempfile().unwrap();
2211        let writer =
2212            ArrowWriter::try_new(file.try_clone().unwrap(), Arc::new(schema.clone()), None)?;
2213        writer.close()?;
2214
2215        // read file back
2216        let arrow_reader = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
2217        let read_schema = arrow_reader.schema();
2218        assert_eq!(&schema, read_schema.as_ref());
2219        Ok(())
2220    }
2221
2222    #[test]
2223    fn test_get_arrow_schema_from_metadata() {
2224        assert!(get_arrow_schema_from_metadata("").is_err());
2225    }
2226
2227    #[test]
2228    #[cfg(feature = "arrow_canonical_extension_types")]
2229    fn arrow_uuid_to_parquet_uuid() -> Result<()> {
2230        let arrow_schema = Schema::new(vec![Field::new(
2231            "uuid",
2232            DataType::FixedSizeBinary(16),
2233            false,
2234        )
2235        .with_extension_type(Uuid)]);
2236
2237        let parquet_schema = ArrowSchemaConverter::new().convert(&arrow_schema)?;
2238
2239        assert_eq!(
2240            parquet_schema.column(0).logical_type(),
2241            Some(LogicalType::Uuid)
2242        );
2243
2244        // TODO: roundtrip
2245        // let arrow_schema = parquet_to_arrow_schema(&parquet_schema, None)?;
2246        // assert_eq!(arrow_schema.field(0).try_extension_type::<Uuid>()?, Uuid);
2247
2248        Ok(())
2249    }
2250
2251    #[test]
2252    #[cfg(feature = "arrow_canonical_extension_types")]
2253    fn arrow_json_to_parquet_json() -> Result<()> {
2254        let arrow_schema = Schema::new(vec![
2255            Field::new("json", DataType::Utf8, false).with_extension_type(Json::default())
2256        ]);
2257
2258        let parquet_schema = ArrowSchemaConverter::new().convert(&arrow_schema)?;
2259
2260        assert_eq!(
2261            parquet_schema.column(0).logical_type(),
2262            Some(LogicalType::Json)
2263        );
2264
2265        // TODO: roundtrip
2266        // https://github.com/apache/arrow-rs/issues/7063
2267        // let arrow_schema = parquet_to_arrow_schema(&parquet_schema, None)?;
2268        // assert_eq!(
2269        //     arrow_schema.field(0).try_extension_type::<Json>()?,
2270        //     Json::default()
2271        // );
2272
2273        Ok(())
2274    }
2275}