1use crate::basic::{ConvertedType, LogicalType, TimeUnit as ParquetTimeUnit, Type as PhysicalType};
19use crate::errors::{ParquetError, Result};
20use crate::schema::types::{BasicTypeInfo, Type};
21use arrow_schema::{DECIMAL128_MAX_PRECISION, DataType, IntervalUnit, TimeUnit};
22
23pub fn convert_primitive(
28 parquet_type: &Type,
29 arrow_type_hint: Option<DataType>,
30) -> Result<DataType> {
31 let physical_type = from_parquet(parquet_type)?;
32 Ok(match arrow_type_hint {
33 Some(hint) => apply_hint(physical_type, hint),
34 None => physical_type,
35 })
36}
37
38fn apply_hint(parquet: DataType, hint: DataType) -> DataType {
41 match (&parquet, &hint) {
42 (DataType::Int32 | DataType::Int64, DataType::Timestamp(_, _)) => hint,
44 (DataType::Int32, DataType::Time32(_)) => hint,
45 (DataType::Int64, DataType::Time64(_)) => hint,
46 (DataType::Int64, DataType::Duration(_)) => hint,
47
48 (DataType::Int64, DataType::Date64) => hint,
50
51 (DataType::Date32, DataType::Date64) => hint,
53
54 (DataType::Timestamp(p, _), DataType::Timestamp(h, Some(_))) if p == h => hint,
56
57 (
60 DataType::Timestamp(TimeUnit::Nanosecond, None),
61 DataType::Timestamp(TimeUnit::Second, _),
62 ) => hint,
63 (
64 DataType::Timestamp(TimeUnit::Nanosecond, None),
65 DataType::Timestamp(TimeUnit::Millisecond, _),
66 ) => hint,
67 (
68 DataType::Timestamp(TimeUnit::Nanosecond, None),
69 DataType::Timestamp(TimeUnit::Microsecond, _),
70 ) => hint,
71
72 (DataType::Utf8, DataType::LargeUtf8) => hint,
74 (DataType::Binary, DataType::LargeBinary) => hint,
75
76 (DataType::Binary, DataType::Utf8) => hint,
78 (DataType::Binary, DataType::LargeUtf8) => hint,
79 (DataType::Binary, DataType::Utf8View) => hint,
80
81 (DataType::Utf8, DataType::Utf8View) => hint,
83 (DataType::Binary, DataType::BinaryView) => hint,
84
85 (DataType::Interval(_), DataType::Interval(_)) => hint,
87
88 (DataType::Decimal128(_, _), DataType::Decimal32(_, _)) => hint,
90 (DataType::Decimal128(_, _), DataType::Decimal64(_, _)) => hint,
91 (DataType::Decimal128(_, _), DataType::Decimal256(_, _)) => hint,
92
93 (_, DataType::Dictionary(_, value)) => {
95 let hinted = apply_hint(parquet, value.as_ref().clone());
97
98 match &hinted == value.as_ref() {
101 true => hint,
102 false => hinted,
103 }
104 }
105 _ => parquet,
106 }
107}
108
109fn from_parquet(parquet_type: &Type) -> Result<DataType> {
110 match parquet_type {
111 Type::PrimitiveType {
112 physical_type,
113 basic_info,
114 type_length,
115 scale,
116 precision,
117 ..
118 } => match physical_type {
119 PhysicalType::BOOLEAN => Ok(DataType::Boolean),
120 PhysicalType::INT32 => from_int32(basic_info, *scale, *precision),
121 PhysicalType::INT64 => from_int64(basic_info, *scale, *precision),
122 PhysicalType::INT96 => Ok(DataType::Timestamp(TimeUnit::Nanosecond, None)),
123 PhysicalType::FLOAT => Ok(DataType::Float32),
124 PhysicalType::DOUBLE => Ok(DataType::Float64),
125 PhysicalType::BYTE_ARRAY => from_byte_array(basic_info, *precision, *scale),
126 PhysicalType::FIXED_LEN_BYTE_ARRAY => {
127 from_fixed_len_byte_array(basic_info, *scale, *precision, *type_length)
128 }
129 },
130 Type::GroupType { .. } => unreachable!(),
131 }
132}
133
134fn decimal_type(scale: i32, precision: i32) -> Result<DataType> {
135 if precision <= DECIMAL128_MAX_PRECISION as i32 {
136 decimal_128_type(scale, precision)
137 } else {
138 decimal_256_type(scale, precision)
139 }
140}
141
142fn decimal_128_type(scale: i32, precision: i32) -> Result<DataType> {
143 let scale = scale
144 .try_into()
145 .map_err(|_| arrow_err!("scale cannot be negative: {}", scale))?;
146
147 let precision = precision
148 .try_into()
149 .map_err(|_| arrow_err!("precision cannot be negative: {}", precision))?;
150
151 Ok(DataType::Decimal128(precision, scale))
152}
153
154fn decimal_256_type(scale: i32, precision: i32) -> Result<DataType> {
155 let scale = scale
156 .try_into()
157 .map_err(|_| arrow_err!("scale cannot be negative: {}", scale))?;
158
159 let precision = precision
160 .try_into()
161 .map_err(|_| arrow_err!("precision cannot be negative: {}", precision))?;
162
163 Ok(DataType::Decimal256(precision, scale))
164}
165
166fn from_int32(info: &BasicTypeInfo, scale: i32, precision: i32) -> Result<DataType> {
167 match (info.logical_type_ref(), info.converted_type()) {
168 (None, ConvertedType::NONE) => Ok(DataType::Int32),
169 (
170 Some(
171 ref t @ LogicalType::Integer {
172 bit_width,
173 is_signed,
174 },
175 ),
176 _,
177 ) => match (bit_width, is_signed) {
178 (8, true) => Ok(DataType::Int8),
179 (16, true) => Ok(DataType::Int16),
180 (32, true) => Ok(DataType::Int32),
181 (8, false) => Ok(DataType::UInt8),
182 (16, false) => Ok(DataType::UInt16),
183 (32, false) => Ok(DataType::UInt32),
184 _ => Err(arrow_err!("Cannot create INT32 physical type from {:?}", t)),
185 },
186 (Some(LogicalType::Decimal { scale, precision }), _) => {
187 decimal_128_type(*scale, *precision)
188 }
189 (Some(LogicalType::Date), _) => Ok(DataType::Date32),
190 (Some(LogicalType::Time { unit, .. }), _) => match unit {
191 ParquetTimeUnit::MILLIS => Ok(DataType::Time32(TimeUnit::Millisecond)),
192 _ => Err(arrow_err!(
193 "Cannot create INT32 physical type from {:?}",
194 unit
195 )),
196 },
197 (Some(LogicalType::Unknown), _) => Ok(DataType::Null),
199 (None, ConvertedType::UINT_8) => Ok(DataType::UInt8),
200 (None, ConvertedType::UINT_16) => Ok(DataType::UInt16),
201 (None, ConvertedType::UINT_32) => Ok(DataType::UInt32),
202 (None, ConvertedType::INT_8) => Ok(DataType::Int8),
203 (None, ConvertedType::INT_16) => Ok(DataType::Int16),
204 (None, ConvertedType::INT_32) => Ok(DataType::Int32),
205 (None, ConvertedType::DATE) => Ok(DataType::Date32),
206 (None, ConvertedType::TIME_MILLIS) => Ok(DataType::Time32(TimeUnit::Millisecond)),
207 (None, ConvertedType::DECIMAL) => decimal_128_type(scale, precision),
208 (logical, converted) => Err(arrow_err!(
209 "Unable to convert parquet INT32 logical type {:?} or converted type {}",
210 logical,
211 converted
212 )),
213 }
214}
215
216fn from_int64(info: &BasicTypeInfo, scale: i32, precision: i32) -> Result<DataType> {
217 match (info.logical_type_ref(), info.converted_type()) {
218 (None, ConvertedType::NONE) => Ok(DataType::Int64),
219 (
220 Some(LogicalType::Integer {
221 bit_width: 64,
222 is_signed,
223 }),
224 _,
225 ) => match is_signed {
226 true => Ok(DataType::Int64),
227 false => Ok(DataType::UInt64),
228 },
229 (Some(LogicalType::Time { unit, .. }), _) => match unit {
230 ParquetTimeUnit::MILLIS => {
231 Err(arrow_err!("Cannot create INT64 from MILLIS time unit",))
232 }
233 ParquetTimeUnit::MICROS => Ok(DataType::Time64(TimeUnit::Microsecond)),
234 ParquetTimeUnit::NANOS => Ok(DataType::Time64(TimeUnit::Nanosecond)),
235 },
236 (
237 Some(LogicalType::Timestamp {
238 is_adjusted_to_u_t_c,
239 unit,
240 }),
241 _,
242 ) => Ok(DataType::Timestamp(
243 match unit {
244 ParquetTimeUnit::MILLIS => TimeUnit::Millisecond,
245 ParquetTimeUnit::MICROS => TimeUnit::Microsecond,
246 ParquetTimeUnit::NANOS => TimeUnit::Nanosecond,
247 },
248 if *is_adjusted_to_u_t_c {
249 Some("UTC".into())
250 } else {
251 None
252 },
253 )),
254 (None, ConvertedType::INT_64) => Ok(DataType::Int64),
255 (None, ConvertedType::UINT_64) => Ok(DataType::UInt64),
256 (None, ConvertedType::TIME_MICROS) => Ok(DataType::Time64(TimeUnit::Microsecond)),
257 (None, ConvertedType::TIMESTAMP_MILLIS) => Ok(DataType::Timestamp(
258 TimeUnit::Millisecond,
259 Some("UTC".into()),
260 )),
261 (None, ConvertedType::TIMESTAMP_MICROS) => Ok(DataType::Timestamp(
262 TimeUnit::Microsecond,
263 Some("UTC".into()),
264 )),
265 (Some(LogicalType::Decimal { scale, precision }), _) => {
266 decimal_128_type(*scale, *precision)
267 }
268 (None, ConvertedType::DECIMAL) => decimal_128_type(scale, precision),
269 (logical, converted) => Err(arrow_err!(
270 "Unable to convert parquet INT64 logical type {:?} or converted type {}",
271 logical,
272 converted
273 )),
274 }
275}
276
277fn from_byte_array(info: &BasicTypeInfo, precision: i32, scale: i32) -> Result<DataType> {
278 match (info.logical_type_ref(), info.converted_type()) {
279 (Some(LogicalType::String), _) => Ok(DataType::Utf8),
280 (Some(LogicalType::Json), _) => Ok(DataType::Utf8),
281 (Some(LogicalType::Bson), _) => Ok(DataType::Binary),
282 (Some(LogicalType::Enum), _) => Ok(DataType::Binary),
283 (Some(LogicalType::Geometry { .. }), _) => Ok(DataType::Binary),
284 (Some(LogicalType::Geography { .. }), _) => Ok(DataType::Binary),
285 (Some(LogicalType::_Unknown { .. }), _) => Ok(DataType::Binary),
286 (None, ConvertedType::NONE) => Ok(DataType::Binary),
287 (None, ConvertedType::JSON) => Ok(DataType::Utf8),
288 (None, ConvertedType::BSON) => Ok(DataType::Binary),
289 (None, ConvertedType::ENUM) => Ok(DataType::Binary),
290 (None, ConvertedType::UTF8) => Ok(DataType::Utf8),
291 (
292 Some(LogicalType::Decimal {
293 scale: s,
294 precision: p,
295 }),
296 _,
297 ) => decimal_type(*s, *p),
298 (None, ConvertedType::DECIMAL) => decimal_type(scale, precision),
299 (logical, converted) => Err(arrow_err!(
300 "Unable to convert parquet BYTE_ARRAY logical type {:?} or converted type {}",
301 logical,
302 converted
303 )),
304 }
305}
306
307fn from_fixed_len_byte_array(
308 info: &BasicTypeInfo,
309 scale: i32,
310 precision: i32,
311 type_length: i32,
312) -> Result<DataType> {
313 match (info.logical_type_ref(), info.converted_type()) {
315 (Some(LogicalType::Decimal { scale, precision }), _) => {
316 if type_length <= 16 {
317 decimal_128_type(*scale, *precision)
318 } else {
319 decimal_256_type(*scale, *precision)
320 }
321 }
322 (None, ConvertedType::DECIMAL) => {
323 if type_length <= 16 {
324 decimal_128_type(scale, precision)
325 } else {
326 decimal_256_type(scale, precision)
327 }
328 }
329 (None, ConvertedType::INTERVAL) => {
330 Ok(DataType::Interval(IntervalUnit::DayTime))
334 }
335 (Some(LogicalType::Float16), _) => {
336 if type_length == 2 {
337 Ok(DataType::Float16)
338 } else {
339 Err(ParquetError::General(
340 "FLOAT16 logical type must be Fixed Length Byte Array with length 2"
341 .to_string(),
342 ))
343 }
344 }
345 _ => Ok(DataType::FixedSizeBinary(type_length)),
346 }
347}