parquet/arrow/schema/
primitive.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::basic::{ConvertedType, LogicalType, TimeUnit as ParquetTimeUnit, Type as PhysicalType};
19use crate::errors::{ParquetError, Result};
20use crate::schema::types::{BasicTypeInfo, Type};
21use arrow_schema::{DataType, IntervalUnit, TimeUnit, DECIMAL128_MAX_PRECISION};
22
23/// Converts [`Type`] to [`DataType`] with an optional `arrow_type_hint`
24/// provided by the arrow schema
25///
26/// Note: the values embedded in the schema are advisory,
27pub fn convert_primitive(
28    parquet_type: &Type,
29    arrow_type_hint: Option<DataType>,
30) -> Result<DataType> {
31    let physical_type = from_parquet(parquet_type)?;
32    Ok(match arrow_type_hint {
33        Some(hint) => apply_hint(physical_type, hint),
34        None => physical_type,
35    })
36}
37
38/// Uses an type hint from the embedded arrow schema to aid in faithfully
39/// reproducing the data as it was written into parquet
40fn apply_hint(parquet: DataType, hint: DataType) -> DataType {
41    match (&parquet, &hint) {
42        // Not all time units can be represented as LogicalType / ConvertedType
43        (DataType::Int32 | DataType::Int64, DataType::Timestamp(_, _)) => hint,
44        (DataType::Int32, DataType::Time32(_)) => hint,
45        (DataType::Int64, DataType::Time64(_)) => hint,
46        (DataType::Int64, DataType::Duration(_)) => hint,
47
48        // Date64 doesn't have a corresponding LogicalType / ConvertedType
49        (DataType::Int64, DataType::Date64) => hint,
50
51        // Coerce Date32 back to Date64 (#1666)
52        (DataType::Date32, DataType::Date64) => hint,
53
54        // Timestamps of the same resolution can be converted to a a different timezone.
55        (DataType::Timestamp(p, _), DataType::Timestamp(h, Some(_))) if p == h => hint,
56
57        // INT96 default to Timestamp(TimeUnit::Nanosecond, None) (see from_parquet below).
58        // Allow different resolutions to support larger date ranges.
59        (
60            DataType::Timestamp(TimeUnit::Nanosecond, None),
61            DataType::Timestamp(TimeUnit::Second, _),
62        ) => hint,
63        (
64            DataType::Timestamp(TimeUnit::Nanosecond, None),
65            DataType::Timestamp(TimeUnit::Millisecond, _),
66        ) => hint,
67        (
68            DataType::Timestamp(TimeUnit::Nanosecond, None),
69            DataType::Timestamp(TimeUnit::Microsecond, _),
70        ) => hint,
71
72        // Determine offset size
73        (DataType::Utf8, DataType::LargeUtf8) => hint,
74        (DataType::Binary, DataType::LargeBinary) => hint,
75
76        // Read as Utf8
77        (DataType::Binary, DataType::Utf8) => hint,
78        (DataType::Binary, DataType::LargeUtf8) => hint,
79        (DataType::Binary, DataType::Utf8View) => hint,
80
81        // Determine view type
82        (DataType::Utf8, DataType::Utf8View) => hint,
83        (DataType::Binary, DataType::BinaryView) => hint,
84
85        // Determine interval time unit (#1666)
86        (DataType::Interval(_), DataType::Interval(_)) => hint,
87
88        // Promote to Decimal256 or narrow to Decimal32 or Decimal64
89        (DataType::Decimal128(_, _), DataType::Decimal32(_, _)) => hint,
90        (DataType::Decimal128(_, _), DataType::Decimal64(_, _)) => hint,
91        (DataType::Decimal128(_, _), DataType::Decimal256(_, _)) => hint,
92
93        // Potentially preserve dictionary encoding
94        (_, DataType::Dictionary(_, value)) => {
95            // Apply hint to inner type
96            let hinted = apply_hint(parquet, value.as_ref().clone());
97
98            // If matches dictionary value - preserve dictionary
99            // otherwise use hinted inner type
100            match &hinted == value.as_ref() {
101                true => hint,
102                false => hinted,
103            }
104        }
105        _ => parquet,
106    }
107}
108
109fn from_parquet(parquet_type: &Type) -> Result<DataType> {
110    match parquet_type {
111        Type::PrimitiveType {
112            physical_type,
113            basic_info,
114            type_length,
115            scale,
116            precision,
117            ..
118        } => match physical_type {
119            PhysicalType::BOOLEAN => Ok(DataType::Boolean),
120            PhysicalType::INT32 => from_int32(basic_info, *scale, *precision),
121            PhysicalType::INT64 => from_int64(basic_info, *scale, *precision),
122            PhysicalType::INT96 => Ok(DataType::Timestamp(TimeUnit::Nanosecond, None)),
123            PhysicalType::FLOAT => Ok(DataType::Float32),
124            PhysicalType::DOUBLE => Ok(DataType::Float64),
125            PhysicalType::BYTE_ARRAY => from_byte_array(basic_info, *precision, *scale),
126            PhysicalType::FIXED_LEN_BYTE_ARRAY => {
127                from_fixed_len_byte_array(basic_info, *scale, *precision, *type_length)
128            }
129        },
130        Type::GroupType { .. } => unreachable!(),
131    }
132}
133
134fn decimal_type(scale: i32, precision: i32) -> Result<DataType> {
135    if precision <= DECIMAL128_MAX_PRECISION as i32 {
136        decimal_128_type(scale, precision)
137    } else {
138        decimal_256_type(scale, precision)
139    }
140}
141
142fn decimal_128_type(scale: i32, precision: i32) -> Result<DataType> {
143    let scale = scale
144        .try_into()
145        .map_err(|_| arrow_err!("scale cannot be negative: {}", scale))?;
146
147    let precision = precision
148        .try_into()
149        .map_err(|_| arrow_err!("precision cannot be negative: {}", precision))?;
150
151    Ok(DataType::Decimal128(precision, scale))
152}
153
154fn decimal_256_type(scale: i32, precision: i32) -> Result<DataType> {
155    let scale = scale
156        .try_into()
157        .map_err(|_| arrow_err!("scale cannot be negative: {}", scale))?;
158
159    let precision = precision
160        .try_into()
161        .map_err(|_| arrow_err!("precision cannot be negative: {}", precision))?;
162
163    Ok(DataType::Decimal256(precision, scale))
164}
165
166fn from_int32(info: &BasicTypeInfo, scale: i32, precision: i32) -> Result<DataType> {
167    match (info.logical_type(), info.converted_type()) {
168        (None, ConvertedType::NONE) => Ok(DataType::Int32),
169        (
170            Some(
171                ref t @ LogicalType::Integer {
172                    bit_width,
173                    is_signed,
174                },
175            ),
176            _,
177        ) => match (bit_width, is_signed) {
178            (8, true) => Ok(DataType::Int8),
179            (16, true) => Ok(DataType::Int16),
180            (32, true) => Ok(DataType::Int32),
181            (8, false) => Ok(DataType::UInt8),
182            (16, false) => Ok(DataType::UInt16),
183            (32, false) => Ok(DataType::UInt32),
184            _ => Err(arrow_err!("Cannot create INT32 physical type from {:?}", t)),
185        },
186        (Some(LogicalType::Decimal { scale, precision }), _) => decimal_128_type(scale, precision),
187        (Some(LogicalType::Date), _) => Ok(DataType::Date32),
188        (Some(LogicalType::Time { unit, .. }), _) => match unit {
189            ParquetTimeUnit::MILLIS(_) => Ok(DataType::Time32(TimeUnit::Millisecond)),
190            _ => Err(arrow_err!(
191                "Cannot create INT32 physical type from {:?}",
192                unit
193            )),
194        },
195        // https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#unknown-always-null
196        (Some(LogicalType::Unknown), _) => Ok(DataType::Null),
197        (None, ConvertedType::UINT_8) => Ok(DataType::UInt8),
198        (None, ConvertedType::UINT_16) => Ok(DataType::UInt16),
199        (None, ConvertedType::UINT_32) => Ok(DataType::UInt32),
200        (None, ConvertedType::INT_8) => Ok(DataType::Int8),
201        (None, ConvertedType::INT_16) => Ok(DataType::Int16),
202        (None, ConvertedType::INT_32) => Ok(DataType::Int32),
203        (None, ConvertedType::DATE) => Ok(DataType::Date32),
204        (None, ConvertedType::TIME_MILLIS) => Ok(DataType::Time32(TimeUnit::Millisecond)),
205        (None, ConvertedType::DECIMAL) => decimal_128_type(scale, precision),
206        (logical, converted) => Err(arrow_err!(
207            "Unable to convert parquet INT32 logical type {:?} or converted type {}",
208            logical,
209            converted
210        )),
211    }
212}
213
214fn from_int64(info: &BasicTypeInfo, scale: i32, precision: i32) -> Result<DataType> {
215    match (info.logical_type(), info.converted_type()) {
216        (None, ConvertedType::NONE) => Ok(DataType::Int64),
217        (
218            Some(LogicalType::Integer {
219                bit_width: 64,
220                is_signed,
221            }),
222            _,
223        ) => match is_signed {
224            true => Ok(DataType::Int64),
225            false => Ok(DataType::UInt64),
226        },
227        (Some(LogicalType::Time { unit, .. }), _) => match unit {
228            ParquetTimeUnit::MILLIS(_) => {
229                Err(arrow_err!("Cannot create INT64 from MILLIS time unit",))
230            }
231            ParquetTimeUnit::MICROS(_) => Ok(DataType::Time64(TimeUnit::Microsecond)),
232            ParquetTimeUnit::NANOS(_) => Ok(DataType::Time64(TimeUnit::Nanosecond)),
233        },
234        (
235            Some(LogicalType::Timestamp {
236                is_adjusted_to_u_t_c,
237                unit,
238            }),
239            _,
240        ) => Ok(DataType::Timestamp(
241            match unit {
242                ParquetTimeUnit::MILLIS(_) => TimeUnit::Millisecond,
243                ParquetTimeUnit::MICROS(_) => TimeUnit::Microsecond,
244                ParquetTimeUnit::NANOS(_) => TimeUnit::Nanosecond,
245            },
246            if is_adjusted_to_u_t_c {
247                Some("UTC".into())
248            } else {
249                None
250            },
251        )),
252        (None, ConvertedType::INT_64) => Ok(DataType::Int64),
253        (None, ConvertedType::UINT_64) => Ok(DataType::UInt64),
254        (None, ConvertedType::TIME_MICROS) => Ok(DataType::Time64(TimeUnit::Microsecond)),
255        (None, ConvertedType::TIMESTAMP_MILLIS) => Ok(DataType::Timestamp(
256            TimeUnit::Millisecond,
257            Some("UTC".into()),
258        )),
259        (None, ConvertedType::TIMESTAMP_MICROS) => Ok(DataType::Timestamp(
260            TimeUnit::Microsecond,
261            Some("UTC".into()),
262        )),
263        (Some(LogicalType::Decimal { scale, precision }), _) => decimal_128_type(scale, precision),
264        (None, ConvertedType::DECIMAL) => decimal_128_type(scale, precision),
265        (logical, converted) => Err(arrow_err!(
266            "Unable to convert parquet INT64 logical type {:?} or converted type {}",
267            logical,
268            converted
269        )),
270    }
271}
272
273fn from_byte_array(info: &BasicTypeInfo, precision: i32, scale: i32) -> Result<DataType> {
274    match (info.logical_type(), info.converted_type()) {
275        (Some(LogicalType::String), _) => Ok(DataType::Utf8),
276        (Some(LogicalType::Json), _) => Ok(DataType::Utf8),
277        (Some(LogicalType::Bson), _) => Ok(DataType::Binary),
278        (Some(LogicalType::Enum), _) => Ok(DataType::Binary),
279        (None, ConvertedType::NONE) => Ok(DataType::Binary),
280        (None, ConvertedType::JSON) => Ok(DataType::Utf8),
281        (None, ConvertedType::BSON) => Ok(DataType::Binary),
282        (None, ConvertedType::ENUM) => Ok(DataType::Binary),
283        (None, ConvertedType::UTF8) => Ok(DataType::Utf8),
284        (
285            Some(LogicalType::Decimal {
286                scale: s,
287                precision: p,
288            }),
289            _,
290        ) => decimal_type(s, p),
291        (None, ConvertedType::DECIMAL) => decimal_type(scale, precision),
292        (logical, converted) => Err(arrow_err!(
293            "Unable to convert parquet BYTE_ARRAY logical type {:?} or converted type {}",
294            logical,
295            converted
296        )),
297    }
298}
299
300fn from_fixed_len_byte_array(
301    info: &BasicTypeInfo,
302    scale: i32,
303    precision: i32,
304    type_length: i32,
305) -> Result<DataType> {
306    // TODO: This should check the type length for the decimal and interval types
307    match (info.logical_type(), info.converted_type()) {
308        (Some(LogicalType::Decimal { scale, precision }), _) => {
309            if type_length <= 16 {
310                decimal_128_type(scale, precision)
311            } else {
312                decimal_256_type(scale, precision)
313            }
314        }
315        (None, ConvertedType::DECIMAL) => {
316            if type_length <= 16 {
317                decimal_128_type(scale, precision)
318            } else {
319                decimal_256_type(scale, precision)
320            }
321        }
322        (None, ConvertedType::INTERVAL) => {
323            // There is currently no reliable way of determining which IntervalUnit
324            // to return. Thus without the original Arrow schema, the results
325            // would be incorrect if all 12 bytes of the interval are populated
326            Ok(DataType::Interval(IntervalUnit::DayTime))
327        }
328        (Some(LogicalType::Float16), _) => {
329            if type_length == 2 {
330                Ok(DataType::Float16)
331            } else {
332                Err(ParquetError::General(
333                    "FLOAT16 logical type must be Fixed Length Byte Array with length 2"
334                        .to_string(),
335                ))
336            }
337        }
338        _ => Ok(DataType::FixedSizeBinary(type_length)),
339    }
340}