1use crate::basic::{ConvertedType, LogicalType, TimeUnit as ParquetTimeUnit, Type as PhysicalType};
19use crate::errors::{ParquetError, Result};
20use crate::schema::types::{BasicTypeInfo, Type};
21use arrow_schema::{DataType, IntervalUnit, TimeUnit, DECIMAL128_MAX_PRECISION};
22
23pub fn convert_primitive(
28 parquet_type: &Type,
29 arrow_type_hint: Option<DataType>,
30) -> Result<DataType> {
31 let physical_type = from_parquet(parquet_type)?;
32 Ok(match arrow_type_hint {
33 Some(hint) => apply_hint(physical_type, hint),
34 None => physical_type,
35 })
36}
37
38fn apply_hint(parquet: DataType, hint: DataType) -> DataType {
41 match (&parquet, &hint) {
42 (DataType::Int32 | DataType::Int64, DataType::Timestamp(_, _)) => hint,
44 (DataType::Int32, DataType::Time32(_)) => hint,
45 (DataType::Int64, DataType::Time64(_)) => hint,
46 (DataType::Int64, DataType::Duration(_)) => hint,
47
48 (DataType::Int64, DataType::Date64) => hint,
50
51 (DataType::Date32, DataType::Date64) => hint,
53
54 (DataType::Timestamp(p, _), DataType::Timestamp(h, Some(_))) if p == h => hint,
56
57 (
60 DataType::Timestamp(TimeUnit::Nanosecond, None),
61 DataType::Timestamp(TimeUnit::Second, _),
62 ) => hint,
63 (
64 DataType::Timestamp(TimeUnit::Nanosecond, None),
65 DataType::Timestamp(TimeUnit::Millisecond, _),
66 ) => hint,
67 (
68 DataType::Timestamp(TimeUnit::Nanosecond, None),
69 DataType::Timestamp(TimeUnit::Microsecond, _),
70 ) => hint,
71
72 (DataType::Utf8, DataType::LargeUtf8) => hint,
74 (DataType::Binary, DataType::LargeBinary) => hint,
75
76 (DataType::Binary, DataType::Utf8) => hint,
78 (DataType::Binary, DataType::LargeUtf8) => hint,
79 (DataType::Binary, DataType::Utf8View) => hint,
80
81 (DataType::Utf8, DataType::Utf8View) => hint,
83 (DataType::Binary, DataType::BinaryView) => hint,
84
85 (DataType::Interval(_), DataType::Interval(_)) => hint,
87
88 (DataType::Decimal128(_, _), DataType::Decimal256(_, _)) => hint,
90
91 (_, DataType::Dictionary(_, value)) => {
93 let hinted = apply_hint(parquet, value.as_ref().clone());
95
96 match &hinted == value.as_ref() {
99 true => hint,
100 false => hinted,
101 }
102 }
103 _ => parquet,
104 }
105}
106
107fn from_parquet(parquet_type: &Type) -> Result<DataType> {
108 match parquet_type {
109 Type::PrimitiveType {
110 physical_type,
111 basic_info,
112 type_length,
113 scale,
114 precision,
115 ..
116 } => match physical_type {
117 PhysicalType::BOOLEAN => Ok(DataType::Boolean),
118 PhysicalType::INT32 => from_int32(basic_info, *scale, *precision),
119 PhysicalType::INT64 => from_int64(basic_info, *scale, *precision),
120 PhysicalType::INT96 => Ok(DataType::Timestamp(TimeUnit::Nanosecond, None)),
121 PhysicalType::FLOAT => Ok(DataType::Float32),
122 PhysicalType::DOUBLE => Ok(DataType::Float64),
123 PhysicalType::BYTE_ARRAY => from_byte_array(basic_info, *precision, *scale),
124 PhysicalType::FIXED_LEN_BYTE_ARRAY => {
125 from_fixed_len_byte_array(basic_info, *scale, *precision, *type_length)
126 }
127 },
128 Type::GroupType { .. } => unreachable!(),
129 }
130}
131
132fn decimal_type(scale: i32, precision: i32) -> Result<DataType> {
133 if precision <= DECIMAL128_MAX_PRECISION as i32 {
134 decimal_128_type(scale, precision)
135 } else {
136 decimal_256_type(scale, precision)
137 }
138}
139
140fn decimal_128_type(scale: i32, precision: i32) -> Result<DataType> {
141 let scale = scale
142 .try_into()
143 .map_err(|_| arrow_err!("scale cannot be negative: {}", scale))?;
144
145 let precision = precision
146 .try_into()
147 .map_err(|_| arrow_err!("precision cannot be negative: {}", precision))?;
148
149 Ok(DataType::Decimal128(precision, scale))
150}
151
152fn decimal_256_type(scale: i32, precision: i32) -> Result<DataType> {
153 let scale = scale
154 .try_into()
155 .map_err(|_| arrow_err!("scale cannot be negative: {}", scale))?;
156
157 let precision = precision
158 .try_into()
159 .map_err(|_| arrow_err!("precision cannot be negative: {}", precision))?;
160
161 Ok(DataType::Decimal256(precision, scale))
162}
163
164fn from_int32(info: &BasicTypeInfo, scale: i32, precision: i32) -> Result<DataType> {
165 match (info.logical_type(), info.converted_type()) {
166 (None, ConvertedType::NONE) => Ok(DataType::Int32),
167 (
168 Some(
169 ref t @ LogicalType::Integer {
170 bit_width,
171 is_signed,
172 },
173 ),
174 _,
175 ) => match (bit_width, is_signed) {
176 (8, true) => Ok(DataType::Int8),
177 (16, true) => Ok(DataType::Int16),
178 (32, true) => Ok(DataType::Int32),
179 (8, false) => Ok(DataType::UInt8),
180 (16, false) => Ok(DataType::UInt16),
181 (32, false) => Ok(DataType::UInt32),
182 _ => Err(arrow_err!("Cannot create INT32 physical type from {:?}", t)),
183 },
184 (Some(LogicalType::Decimal { scale, precision }), _) => decimal_128_type(scale, precision),
185 (Some(LogicalType::Date), _) => Ok(DataType::Date32),
186 (Some(LogicalType::Time { unit, .. }), _) => match unit {
187 ParquetTimeUnit::MILLIS(_) => Ok(DataType::Time32(TimeUnit::Millisecond)),
188 _ => Err(arrow_err!(
189 "Cannot create INT32 physical type from {:?}",
190 unit
191 )),
192 },
193 (Some(LogicalType::Unknown), _) => Ok(DataType::Null),
195 (None, ConvertedType::UINT_8) => Ok(DataType::UInt8),
196 (None, ConvertedType::UINT_16) => Ok(DataType::UInt16),
197 (None, ConvertedType::UINT_32) => Ok(DataType::UInt32),
198 (None, ConvertedType::INT_8) => Ok(DataType::Int8),
199 (None, ConvertedType::INT_16) => Ok(DataType::Int16),
200 (None, ConvertedType::INT_32) => Ok(DataType::Int32),
201 (None, ConvertedType::DATE) => Ok(DataType::Date32),
202 (None, ConvertedType::TIME_MILLIS) => Ok(DataType::Time32(TimeUnit::Millisecond)),
203 (None, ConvertedType::DECIMAL) => decimal_128_type(scale, precision),
204 (logical, converted) => Err(arrow_err!(
205 "Unable to convert parquet INT32 logical type {:?} or converted type {}",
206 logical,
207 converted
208 )),
209 }
210}
211
212fn from_int64(info: &BasicTypeInfo, scale: i32, precision: i32) -> Result<DataType> {
213 match (info.logical_type(), info.converted_type()) {
214 (None, ConvertedType::NONE) => Ok(DataType::Int64),
215 (
216 Some(LogicalType::Integer {
217 bit_width: 64,
218 is_signed,
219 }),
220 _,
221 ) => match is_signed {
222 true => Ok(DataType::Int64),
223 false => Ok(DataType::UInt64),
224 },
225 (Some(LogicalType::Time { unit, .. }), _) => match unit {
226 ParquetTimeUnit::MILLIS(_) => {
227 Err(arrow_err!("Cannot create INT64 from MILLIS time unit",))
228 }
229 ParquetTimeUnit::MICROS(_) => Ok(DataType::Time64(TimeUnit::Microsecond)),
230 ParquetTimeUnit::NANOS(_) => Ok(DataType::Time64(TimeUnit::Nanosecond)),
231 },
232 (
233 Some(LogicalType::Timestamp {
234 is_adjusted_to_u_t_c,
235 unit,
236 }),
237 _,
238 ) => Ok(DataType::Timestamp(
239 match unit {
240 ParquetTimeUnit::MILLIS(_) => TimeUnit::Millisecond,
241 ParquetTimeUnit::MICROS(_) => TimeUnit::Microsecond,
242 ParquetTimeUnit::NANOS(_) => TimeUnit::Nanosecond,
243 },
244 if is_adjusted_to_u_t_c {
245 Some("UTC".into())
246 } else {
247 None
248 },
249 )),
250 (None, ConvertedType::INT_64) => Ok(DataType::Int64),
251 (None, ConvertedType::UINT_64) => Ok(DataType::UInt64),
252 (None, ConvertedType::TIME_MICROS) => Ok(DataType::Time64(TimeUnit::Microsecond)),
253 (None, ConvertedType::TIMESTAMP_MILLIS) => Ok(DataType::Timestamp(
254 TimeUnit::Millisecond,
255 Some("UTC".into()),
256 )),
257 (None, ConvertedType::TIMESTAMP_MICROS) => Ok(DataType::Timestamp(
258 TimeUnit::Microsecond,
259 Some("UTC".into()),
260 )),
261 (Some(LogicalType::Decimal { scale, precision }), _) => decimal_128_type(scale, precision),
262 (None, ConvertedType::DECIMAL) => decimal_128_type(scale, precision),
263 (logical, converted) => Err(arrow_err!(
264 "Unable to convert parquet INT64 logical type {:?} or converted type {}",
265 logical,
266 converted
267 )),
268 }
269}
270
271fn from_byte_array(info: &BasicTypeInfo, precision: i32, scale: i32) -> Result<DataType> {
272 match (info.logical_type(), info.converted_type()) {
273 (Some(LogicalType::String), _) => Ok(DataType::Utf8),
274 (Some(LogicalType::Json), _) => Ok(DataType::Utf8),
275 (Some(LogicalType::Bson), _) => Ok(DataType::Binary),
276 (Some(LogicalType::Enum), _) => Ok(DataType::Binary),
277 (None, ConvertedType::NONE) => Ok(DataType::Binary),
278 (None, ConvertedType::JSON) => Ok(DataType::Utf8),
279 (None, ConvertedType::BSON) => Ok(DataType::Binary),
280 (None, ConvertedType::ENUM) => Ok(DataType::Binary),
281 (None, ConvertedType::UTF8) => Ok(DataType::Utf8),
282 (
283 Some(LogicalType::Decimal {
284 scale: s,
285 precision: p,
286 }),
287 _,
288 ) => decimal_type(s, p),
289 (None, ConvertedType::DECIMAL) => decimal_type(scale, precision),
290 (logical, converted) => Err(arrow_err!(
291 "Unable to convert parquet BYTE_ARRAY logical type {:?} or converted type {}",
292 logical,
293 converted
294 )),
295 }
296}
297
298fn from_fixed_len_byte_array(
299 info: &BasicTypeInfo,
300 scale: i32,
301 precision: i32,
302 type_length: i32,
303) -> Result<DataType> {
304 match (info.logical_type(), info.converted_type()) {
306 (Some(LogicalType::Decimal { scale, precision }), _) => {
307 if type_length <= 16 {
308 decimal_128_type(scale, precision)
309 } else {
310 decimal_256_type(scale, precision)
311 }
312 }
313 (None, ConvertedType::DECIMAL) => {
314 if type_length <= 16 {
315 decimal_128_type(scale, precision)
316 } else {
317 decimal_256_type(scale, precision)
318 }
319 }
320 (None, ConvertedType::INTERVAL) => {
321 Ok(DataType::Interval(IntervalUnit::DayTime))
325 }
326 (Some(LogicalType::Float16), _) => {
327 if type_length == 2 {
328 Ok(DataType::Float16)
329 } else {
330 Err(ParquetError::General(
331 "FLOAT16 logical type must be Fixed Length Byte Array with length 2"
332 .to_string(),
333 ))
334 }
335 }
336 _ => Ok(DataType::FixedSizeBinary(type_length)),
337 }
338}