parquet/arrow/schema/primitive.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::basic::{ConvertedType, LogicalType, TimeUnit as ParquetTimeUnit, Type as PhysicalType};
19use crate::errors::{ParquetError, Result};
20use crate::schema::types::{BasicTypeInfo, Type};
21use arrow_schema::{DataType, IntervalUnit, TimeUnit, DECIMAL128_MAX_PRECISION};
22
23/// Converts [`Type`] to [`DataType`] with an optional `arrow_type_hint`
24/// provided by the arrow schema
25///
26/// Note: the values embedded in the schema are advisory,
27pub fn convert_primitive(
28    parquet_type: &Type,
29    arrow_type_hint: Option<DataType>,
30) -> Result<DataType> {
31    let physical_type = from_parquet(parquet_type)?;
32    Ok(match arrow_type_hint {
33        Some(hint) => apply_hint(physical_type, hint),
34        None => physical_type,
35    })
36}
37
38/// Uses an type hint from the embedded arrow schema to aid in faithfully
39/// reproducing the data as it was written into parquet
40fn apply_hint(parquet: DataType, hint: DataType) -> DataType {
41    match (&parquet, &hint) {
42        // Not all time units can be represented as LogicalType / ConvertedType
43        (DataType::Int32 | DataType::Int64, DataType::Timestamp(_, _)) => hint,
44        (DataType::Int32, DataType::Time32(_)) => hint,
45        (DataType::Int64, DataType::Time64(_)) => hint,
46        (DataType::Int64, DataType::Duration(_)) => hint,
47
48        // Date64 doesn't have a corresponding LogicalType / ConvertedType
49        (DataType::Int64, DataType::Date64) => hint,
50
51        // Coerce Date32 back to Date64 (#1666)
52        (DataType::Date32, DataType::Date64) => hint,
53
54        // Timestamps of the same resolution can be converted to a a different timezone.
55        (DataType::Timestamp(p, _), DataType::Timestamp(h, Some(_))) if p == h => hint,
56
57        // INT96 default to Timestamp(TimeUnit::Nanosecond, None) (see from_parquet below).
58        // Allow different resolutions to support larger date ranges.
59        (
60            DataType::Timestamp(TimeUnit::Nanosecond, None),
61            DataType::Timestamp(TimeUnit::Second, _),
62        ) => hint,
63        (
64            DataType::Timestamp(TimeUnit::Nanosecond, None),
65            DataType::Timestamp(TimeUnit::Millisecond, _),
66        ) => hint,
67        (
68            DataType::Timestamp(TimeUnit::Nanosecond, None),
69            DataType::Timestamp(TimeUnit::Microsecond, _),
70        ) => hint,
71
72        // Determine offset size
73        (DataType::Utf8, DataType::LargeUtf8) => hint,
74        (DataType::Binary, DataType::LargeBinary) => hint,
75
76        // Read as Utf8
77        (DataType::Binary, DataType::Utf8) => hint,
78        (DataType::Binary, DataType::LargeUtf8) => hint,
79        (DataType::Binary, DataType::Utf8View) => hint,
80
81        // Determine view type
82        (DataType::Utf8, DataType::Utf8View) => hint,
83        (DataType::Binary, DataType::BinaryView) => hint,
84
85        // Determine interval time unit (#1666)
86        (DataType::Interval(_), DataType::Interval(_)) => hint,
87
88        // Promote to Decimal256
89        (DataType::Decimal128(_, _), DataType::Decimal256(_, _)) => hint,
90
91        // Potentially preserve dictionary encoding
92        (_, DataType::Dictionary(_, value)) => {
93            // Apply hint to inner type
94            let hinted = apply_hint(parquet, value.as_ref().clone());
95
96            // If matches dictionary value - preserve dictionary
97            // otherwise use hinted inner type
98            match &hinted == value.as_ref() {
99                true => hint,
100                false => hinted,
101            }
102        }
103        _ => parquet,
104    }
105}
106
107fn from_parquet(parquet_type: &Type) -> Result<DataType> {
108    match parquet_type {
109        Type::PrimitiveType {
110            physical_type,
111            basic_info,
112            type_length,
113            scale,
114            precision,
115            ..
116        } => match physical_type {
117            PhysicalType::BOOLEAN => Ok(DataType::Boolean),
118            PhysicalType::INT32 => from_int32(basic_info, *scale, *precision),
119            PhysicalType::INT64 => from_int64(basic_info, *scale, *precision),
120            PhysicalType::INT96 => Ok(DataType::Timestamp(TimeUnit::Nanosecond, None)),
121            PhysicalType::FLOAT => Ok(DataType::Float32),
122            PhysicalType::DOUBLE => Ok(DataType::Float64),
123            PhysicalType::BYTE_ARRAY => from_byte_array(basic_info, *precision, *scale),
124            PhysicalType::FIXED_LEN_BYTE_ARRAY => {
125                from_fixed_len_byte_array(basic_info, *scale, *precision, *type_length)
126            }
127        },
128        Type::GroupType { .. } => unreachable!(),
129    }
130}
131
132fn decimal_type(scale: i32, precision: i32) -> Result<DataType> {
133    if precision <= DECIMAL128_MAX_PRECISION as i32 {
134        decimal_128_type(scale, precision)
135    } else {
136        decimal_256_type(scale, precision)
137    }
138}
139
140fn decimal_128_type(scale: i32, precision: i32) -> Result<DataType> {
141    let scale = scale
142        .try_into()
143        .map_err(|_| arrow_err!("scale cannot be negative: {}", scale))?;
144
145    let precision = precision
146        .try_into()
147        .map_err(|_| arrow_err!("precision cannot be negative: {}", precision))?;
148
149    Ok(DataType::Decimal128(precision, scale))
150}
151
152fn decimal_256_type(scale: i32, precision: i32) -> Result<DataType> {
153    let scale = scale
154        .try_into()
155        .map_err(|_| arrow_err!("scale cannot be negative: {}", scale))?;
156
157    let precision = precision
158        .try_into()
159        .map_err(|_| arrow_err!("precision cannot be negative: {}", precision))?;
160
161    Ok(DataType::Decimal256(precision, scale))
162}
163
164fn from_int32(info: &BasicTypeInfo, scale: i32, precision: i32) -> Result<DataType> {
165    match (info.logical_type(), info.converted_type()) {
166        (None, ConvertedType::NONE) => Ok(DataType::Int32),
167        (
168            Some(
169                ref t @ LogicalType::Integer {
170                    bit_width,
171                    is_signed,
172                },
173            ),
174            _,
175        ) => match (bit_width, is_signed) {
176            (8, true) => Ok(DataType::Int8),
177            (16, true) => Ok(DataType::Int16),
178            (32, true) => Ok(DataType::Int32),
179            (8, false) => Ok(DataType::UInt8),
180            (16, false) => Ok(DataType::UInt16),
181            (32, false) => Ok(DataType::UInt32),
182            _ => Err(arrow_err!("Cannot create INT32 physical type from {:?}", t)),
183        },
184        (Some(LogicalType::Decimal { scale, precision }), _) => decimal_128_type(scale, precision),
185        (Some(LogicalType::Date), _) => Ok(DataType::Date32),
186        (Some(LogicalType::Time { unit, .. }), _) => match unit {
187            ParquetTimeUnit::MILLIS(_) => Ok(DataType::Time32(TimeUnit::Millisecond)),
188            _ => Err(arrow_err!(
189                "Cannot create INT32 physical type from {:?}",
190                unit
191            )),
192        },
193        // https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#unknown-always-null
194        (Some(LogicalType::Unknown), _) => Ok(DataType::Null),
195        (None, ConvertedType::UINT_8) => Ok(DataType::UInt8),
196        (None, ConvertedType::UINT_16) => Ok(DataType::UInt16),
197        (None, ConvertedType::UINT_32) => Ok(DataType::UInt32),
198        (None, ConvertedType::INT_8) => Ok(DataType::Int8),
199        (None, ConvertedType::INT_16) => Ok(DataType::Int16),
200        (None, ConvertedType::INT_32) => Ok(DataType::Int32),
201        (None, ConvertedType::DATE) => Ok(DataType::Date32),
202        (None, ConvertedType::TIME_MILLIS) => Ok(DataType::Time32(TimeUnit::Millisecond)),
203        (None, ConvertedType::DECIMAL) => decimal_128_type(scale, precision),
204        (logical, converted) => Err(arrow_err!(
205            "Unable to convert parquet INT32 logical type {:?} or converted type {}",
206            logical,
207            converted
208        )),
209    }
210}
211
212fn from_int64(info: &BasicTypeInfo, scale: i32, precision: i32) -> Result<DataType> {
213    match (info.logical_type(), info.converted_type()) {
214        (None, ConvertedType::NONE) => Ok(DataType::Int64),
215        (
216            Some(LogicalType::Integer {
217                bit_width: 64,
218                is_signed,
219            }),
220            _,
221        ) => match is_signed {
222            true => Ok(DataType::Int64),
223            false => Ok(DataType::UInt64),
224        },
225        (Some(LogicalType::Time { unit, .. }), _) => match unit {
226            ParquetTimeUnit::MILLIS(_) => {
227                Err(arrow_err!("Cannot create INT64 from MILLIS time unit",))
228            }
229            ParquetTimeUnit::MICROS(_) => Ok(DataType::Time64(TimeUnit::Microsecond)),
230            ParquetTimeUnit::NANOS(_) => Ok(DataType::Time64(TimeUnit::Nanosecond)),
231        },
232        (
233            Some(LogicalType::Timestamp {
234                is_adjusted_to_u_t_c,
235                unit,
236            }),
237            _,
238        ) => Ok(DataType::Timestamp(
239            match unit {
240                ParquetTimeUnit::MILLIS(_) => TimeUnit::Millisecond,
241                ParquetTimeUnit::MICROS(_) => TimeUnit::Microsecond,
242                ParquetTimeUnit::NANOS(_) => TimeUnit::Nanosecond,
243            },
244            if is_adjusted_to_u_t_c {
245                Some("UTC".into())
246            } else {
247                None
248            },
249        )),
250        (None, ConvertedType::INT_64) => Ok(DataType::Int64),
251        (None, ConvertedType::UINT_64) => Ok(DataType::UInt64),
252        (None, ConvertedType::TIME_MICROS) => Ok(DataType::Time64(TimeUnit::Microsecond)),
253        (None, ConvertedType::TIMESTAMP_MILLIS) => Ok(DataType::Timestamp(
254            TimeUnit::Millisecond,
255            Some("UTC".into()),
256        )),
257        (None, ConvertedType::TIMESTAMP_MICROS) => Ok(DataType::Timestamp(
258            TimeUnit::Microsecond,
259            Some("UTC".into()),
260        )),
261        (Some(LogicalType::Decimal { scale, precision }), _) => decimal_128_type(scale, precision),
262        (None, ConvertedType::DECIMAL) => decimal_128_type(scale, precision),
263        (logical, converted) => Err(arrow_err!(
264            "Unable to convert parquet INT64 logical type {:?} or converted type {}",
265            logical,
266            converted
267        )),
268    }
269}
270
271fn from_byte_array(info: &BasicTypeInfo, precision: i32, scale: i32) -> Result<DataType> {
272    match (info.logical_type(), info.converted_type()) {
273        (Some(LogicalType::String), _) => Ok(DataType::Utf8),
274        (Some(LogicalType::Json), _) => Ok(DataType::Utf8),
275        (Some(LogicalType::Bson), _) => Ok(DataType::Binary),
276        (Some(LogicalType::Enum), _) => Ok(DataType::Binary),
277        (None, ConvertedType::NONE) => Ok(DataType::Binary),
278        (None, ConvertedType::JSON) => Ok(DataType::Utf8),
279        (None, ConvertedType::BSON) => Ok(DataType::Binary),
280        (None, ConvertedType::ENUM) => Ok(DataType::Binary),
281        (None, ConvertedType::UTF8) => Ok(DataType::Utf8),
282        (
283            Some(LogicalType::Decimal {
284                scale: s,
285                precision: p,
286            }),
287            _,
288        ) => decimal_type(s, p),
289        (None, ConvertedType::DECIMAL) => decimal_type(scale, precision),
290        (logical, converted) => Err(arrow_err!(
291            "Unable to convert parquet BYTE_ARRAY logical type {:?} or converted type {}",
292            logical,
293            converted
294        )),
295    }
296}
297
298fn from_fixed_len_byte_array(
299    info: &BasicTypeInfo,
300    scale: i32,
301    precision: i32,
302    type_length: i32,
303) -> Result<DataType> {
304    // TODO: This should check the type length for the decimal and interval types
305    match (info.logical_type(), info.converted_type()) {
306        (Some(LogicalType::Decimal { scale, precision }), _) => {
307            if type_length <= 16 {
308                decimal_128_type(scale, precision)
309            } else {
310                decimal_256_type(scale, precision)
311            }
312        }
313        (None, ConvertedType::DECIMAL) => {
314            if type_length <= 16 {
315                decimal_128_type(scale, precision)
316            } else {
317                decimal_256_type(scale, precision)
318            }
319        }
320        (None, ConvertedType::INTERVAL) => {
321            // There is currently no reliable way of determining which IntervalUnit
322            // to return. Thus without the original Arrow schema, the results
323            // would be incorrect if all 12 bytes of the interval are populated
324            Ok(DataType::Interval(IntervalUnit::DayTime))
325        }
326        (Some(LogicalType::Float16), _) => {
327            if type_length == 2 {
328                Ok(DataType::Float16)
329            } else {
330                Err(ParquetError::General(
331                    "FLOAT16 logical type must be Fixed Length Byte Array with length 2"
332                        .to_string(),
333                ))
334            }
335        }
336        _ => Ok(DataType::FixedSizeBinary(type_length)),
337    }
338}