parquet/arrow/schema/
primitive.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::basic::{ConvertedType, LogicalType, TimeUnit as ParquetTimeUnit, Type as PhysicalType};
19use crate::errors::{ParquetError, Result};
20use crate::schema::types::{BasicTypeInfo, Type};
21use arrow_schema::{DataType, IntervalUnit, TimeUnit, DECIMAL128_MAX_PRECISION};
22
23/// Converts [`Type`] to [`DataType`] with an optional `arrow_type_hint`
24/// provided by the arrow schema
25///
26/// Note: the values embedded in the schema are advisory,
27pub fn convert_primitive(
28    parquet_type: &Type,
29    arrow_type_hint: Option<DataType>,
30) -> Result<DataType> {
31    let physical_type = from_parquet(parquet_type)?;
32    Ok(match arrow_type_hint {
33        Some(hint) => apply_hint(physical_type, hint),
34        None => physical_type,
35    })
36}
37
38/// Uses an type hint from the embedded arrow schema to aid in faithfully
39/// reproducing the data as it was written into parquet
40fn apply_hint(parquet: DataType, hint: DataType) -> DataType {
41    match (&parquet, &hint) {
42        // Not all time units can be represented as LogicalType / ConvertedType
43        (DataType::Int32 | DataType::Int64, DataType::Timestamp(_, _)) => hint,
44        (DataType::Int32, DataType::Time32(_)) => hint,
45        (DataType::Int64, DataType::Time64(_)) => hint,
46
47        // Date64 doesn't have a corresponding LogicalType / ConvertedType
48        (DataType::Int64, DataType::Date64) => hint,
49
50        // Coerce Date32 back to Date64 (#1666)
51        (DataType::Date32, DataType::Date64) => hint,
52
53        // Timestamps of the same resolution can be converted to a a different timezone.
54        (DataType::Timestamp(p, _), DataType::Timestamp(h, Some(_))) if p == h => hint,
55
56        // INT96 default to Timestamp(TimeUnit::Nanosecond, None) (see from_parquet below).
57        // Allow different resolutions to support larger date ranges.
58        (
59            DataType::Timestamp(TimeUnit::Nanosecond, None),
60            DataType::Timestamp(TimeUnit::Second, _),
61        ) => hint,
62        (
63            DataType::Timestamp(TimeUnit::Nanosecond, None),
64            DataType::Timestamp(TimeUnit::Millisecond, _),
65        ) => hint,
66        (
67            DataType::Timestamp(TimeUnit::Nanosecond, None),
68            DataType::Timestamp(TimeUnit::Microsecond, _),
69        ) => hint,
70
71        // Determine offset size
72        (DataType::Utf8, DataType::LargeUtf8) => hint,
73        (DataType::Binary, DataType::LargeBinary) => hint,
74
75        // Read as Utf8
76        (DataType::Binary, DataType::Utf8) => hint,
77        (DataType::Binary, DataType::LargeUtf8) => hint,
78        (DataType::Binary, DataType::Utf8View) => hint,
79
80        // Determine view type
81        (DataType::Utf8, DataType::Utf8View) => hint,
82        (DataType::Binary, DataType::BinaryView) => hint,
83
84        // Determine interval time unit (#1666)
85        (DataType::Interval(_), DataType::Interval(_)) => hint,
86
87        // Promote to Decimal256
88        (DataType::Decimal128(_, _), DataType::Decimal256(_, _)) => hint,
89
90        // Potentially preserve dictionary encoding
91        (_, DataType::Dictionary(_, value)) => {
92            // Apply hint to inner type
93            let hinted = apply_hint(parquet, value.as_ref().clone());
94
95            // If matches dictionary value - preserve dictionary
96            // otherwise use hinted inner type
97            match &hinted == value.as_ref() {
98                true => hint,
99                false => hinted,
100            }
101        }
102        _ => parquet,
103    }
104}
105
106fn from_parquet(parquet_type: &Type) -> Result<DataType> {
107    match parquet_type {
108        Type::PrimitiveType {
109            physical_type,
110            basic_info,
111            type_length,
112            scale,
113            precision,
114            ..
115        } => match physical_type {
116            PhysicalType::BOOLEAN => Ok(DataType::Boolean),
117            PhysicalType::INT32 => from_int32(basic_info, *scale, *precision),
118            PhysicalType::INT64 => from_int64(basic_info, *scale, *precision),
119            PhysicalType::INT96 => Ok(DataType::Timestamp(TimeUnit::Nanosecond, None)),
120            PhysicalType::FLOAT => Ok(DataType::Float32),
121            PhysicalType::DOUBLE => Ok(DataType::Float64),
122            PhysicalType::BYTE_ARRAY => from_byte_array(basic_info, *precision, *scale),
123            PhysicalType::FIXED_LEN_BYTE_ARRAY => {
124                from_fixed_len_byte_array(basic_info, *scale, *precision, *type_length)
125            }
126        },
127        Type::GroupType { .. } => unreachable!(),
128    }
129}
130
131fn decimal_type(scale: i32, precision: i32) -> Result<DataType> {
132    if precision <= DECIMAL128_MAX_PRECISION as i32 {
133        decimal_128_type(scale, precision)
134    } else {
135        decimal_256_type(scale, precision)
136    }
137}
138
139fn decimal_128_type(scale: i32, precision: i32) -> Result<DataType> {
140    let scale = scale
141        .try_into()
142        .map_err(|_| arrow_err!("scale cannot be negative: {}", scale))?;
143
144    let precision = precision
145        .try_into()
146        .map_err(|_| arrow_err!("precision cannot be negative: {}", precision))?;
147
148    Ok(DataType::Decimal128(precision, scale))
149}
150
151fn decimal_256_type(scale: i32, precision: i32) -> Result<DataType> {
152    let scale = scale
153        .try_into()
154        .map_err(|_| arrow_err!("scale cannot be negative: {}", scale))?;
155
156    let precision = precision
157        .try_into()
158        .map_err(|_| arrow_err!("precision cannot be negative: {}", precision))?;
159
160    Ok(DataType::Decimal256(precision, scale))
161}
162
163fn from_int32(info: &BasicTypeInfo, scale: i32, precision: i32) -> Result<DataType> {
164    match (info.logical_type(), info.converted_type()) {
165        (None, ConvertedType::NONE) => Ok(DataType::Int32),
166        (
167            Some(
168                ref t @ LogicalType::Integer {
169                    bit_width,
170                    is_signed,
171                },
172            ),
173            _,
174        ) => match (bit_width, is_signed) {
175            (8, true) => Ok(DataType::Int8),
176            (16, true) => Ok(DataType::Int16),
177            (32, true) => Ok(DataType::Int32),
178            (8, false) => Ok(DataType::UInt8),
179            (16, false) => Ok(DataType::UInt16),
180            (32, false) => Ok(DataType::UInt32),
181            _ => Err(arrow_err!("Cannot create INT32 physical type from {:?}", t)),
182        },
183        (Some(LogicalType::Decimal { scale, precision }), _) => decimal_128_type(scale, precision),
184        (Some(LogicalType::Date), _) => Ok(DataType::Date32),
185        (Some(LogicalType::Time { unit, .. }), _) => match unit {
186            ParquetTimeUnit::MILLIS(_) => Ok(DataType::Time32(TimeUnit::Millisecond)),
187            _ => Err(arrow_err!(
188                "Cannot create INT32 physical type from {:?}",
189                unit
190            )),
191        },
192        // https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#unknown-always-null
193        (Some(LogicalType::Unknown), _) => Ok(DataType::Null),
194        (None, ConvertedType::UINT_8) => Ok(DataType::UInt8),
195        (None, ConvertedType::UINT_16) => Ok(DataType::UInt16),
196        (None, ConvertedType::UINT_32) => Ok(DataType::UInt32),
197        (None, ConvertedType::INT_8) => Ok(DataType::Int8),
198        (None, ConvertedType::INT_16) => Ok(DataType::Int16),
199        (None, ConvertedType::INT_32) => Ok(DataType::Int32),
200        (None, ConvertedType::DATE) => Ok(DataType::Date32),
201        (None, ConvertedType::TIME_MILLIS) => Ok(DataType::Time32(TimeUnit::Millisecond)),
202        (None, ConvertedType::DECIMAL) => decimal_128_type(scale, precision),
203        (logical, converted) => Err(arrow_err!(
204            "Unable to convert parquet INT32 logical type {:?} or converted type {}",
205            logical,
206            converted
207        )),
208    }
209}
210
211fn from_int64(info: &BasicTypeInfo, scale: i32, precision: i32) -> Result<DataType> {
212    match (info.logical_type(), info.converted_type()) {
213        (None, ConvertedType::NONE) => Ok(DataType::Int64),
214        (
215            Some(LogicalType::Integer {
216                bit_width: 64,
217                is_signed,
218            }),
219            _,
220        ) => match is_signed {
221            true => Ok(DataType::Int64),
222            false => Ok(DataType::UInt64),
223        },
224        (Some(LogicalType::Time { unit, .. }), _) => match unit {
225            ParquetTimeUnit::MILLIS(_) => {
226                Err(arrow_err!("Cannot create INT64 from MILLIS time unit",))
227            }
228            ParquetTimeUnit::MICROS(_) => Ok(DataType::Time64(TimeUnit::Microsecond)),
229            ParquetTimeUnit::NANOS(_) => Ok(DataType::Time64(TimeUnit::Nanosecond)),
230        },
231        (
232            Some(LogicalType::Timestamp {
233                is_adjusted_to_u_t_c,
234                unit,
235            }),
236            _,
237        ) => Ok(DataType::Timestamp(
238            match unit {
239                ParquetTimeUnit::MILLIS(_) => TimeUnit::Millisecond,
240                ParquetTimeUnit::MICROS(_) => TimeUnit::Microsecond,
241                ParquetTimeUnit::NANOS(_) => TimeUnit::Nanosecond,
242            },
243            if is_adjusted_to_u_t_c {
244                Some("UTC".into())
245            } else {
246                None
247            },
248        )),
249        (None, ConvertedType::INT_64) => Ok(DataType::Int64),
250        (None, ConvertedType::UINT_64) => Ok(DataType::UInt64),
251        (None, ConvertedType::TIME_MICROS) => Ok(DataType::Time64(TimeUnit::Microsecond)),
252        (None, ConvertedType::TIMESTAMP_MILLIS) => Ok(DataType::Timestamp(
253            TimeUnit::Millisecond,
254            Some("UTC".into()),
255        )),
256        (None, ConvertedType::TIMESTAMP_MICROS) => Ok(DataType::Timestamp(
257            TimeUnit::Microsecond,
258            Some("UTC".into()),
259        )),
260        (Some(LogicalType::Decimal { scale, precision }), _) => decimal_128_type(scale, precision),
261        (None, ConvertedType::DECIMAL) => decimal_128_type(scale, precision),
262        (logical, converted) => Err(arrow_err!(
263            "Unable to convert parquet INT64 logical type {:?} or converted type {}",
264            logical,
265            converted
266        )),
267    }
268}
269
270fn from_byte_array(info: &BasicTypeInfo, precision: i32, scale: i32) -> Result<DataType> {
271    match (info.logical_type(), info.converted_type()) {
272        (Some(LogicalType::String), _) => Ok(DataType::Utf8),
273        (Some(LogicalType::Json), _) => Ok(DataType::Utf8),
274        (Some(LogicalType::Bson), _) => Ok(DataType::Binary),
275        (Some(LogicalType::Enum), _) => Ok(DataType::Binary),
276        (None, ConvertedType::NONE) => Ok(DataType::Binary),
277        (None, ConvertedType::JSON) => Ok(DataType::Utf8),
278        (None, ConvertedType::BSON) => Ok(DataType::Binary),
279        (None, ConvertedType::ENUM) => Ok(DataType::Binary),
280        (None, ConvertedType::UTF8) => Ok(DataType::Utf8),
281        (
282            Some(LogicalType::Decimal {
283                scale: s,
284                precision: p,
285            }),
286            _,
287        ) => decimal_type(s, p),
288        (None, ConvertedType::DECIMAL) => decimal_type(scale, precision),
289        (logical, converted) => Err(arrow_err!(
290            "Unable to convert parquet BYTE_ARRAY logical type {:?} or converted type {}",
291            logical,
292            converted
293        )),
294    }
295}
296
297fn from_fixed_len_byte_array(
298    info: &BasicTypeInfo,
299    scale: i32,
300    precision: i32,
301    type_length: i32,
302) -> Result<DataType> {
303    // TODO: This should check the type length for the decimal and interval types
304    match (info.logical_type(), info.converted_type()) {
305        (Some(LogicalType::Decimal { scale, precision }), _) => {
306            if type_length <= 16 {
307                decimal_128_type(scale, precision)
308            } else {
309                decimal_256_type(scale, precision)
310            }
311        }
312        (None, ConvertedType::DECIMAL) => {
313            if type_length <= 16 {
314                decimal_128_type(scale, precision)
315            } else {
316                decimal_256_type(scale, precision)
317            }
318        }
319        (None, ConvertedType::INTERVAL) => {
320            // There is currently no reliable way of determining which IntervalUnit
321            // to return. Thus without the original Arrow schema, the results
322            // would be incorrect if all 12 bytes of the interval are populated
323            Ok(DataType::Interval(IntervalUnit::DayTime))
324        }
325        (Some(LogicalType::Float16), _) => {
326            if type_length == 2 {
327                Ok(DataType::Float16)
328            } else {
329                Err(ParquetError::General(
330                    "FLOAT16 logical type must be Fixed Length Byte Array with length 2"
331                        .to_string(),
332                ))
333            }
334        }
335        _ => Ok(DataType::FixedSizeBinary(type_length)),
336    }
337}