1use crate::basic::{ConvertedType, LogicalType, TimeUnit as ParquetTimeUnit, Type as PhysicalType};
19use crate::errors::{ParquetError, Result};
20use crate::schema::types::{BasicTypeInfo, Type};
21use arrow_schema::{DataType, IntervalUnit, TimeUnit, DECIMAL128_MAX_PRECISION};
22
23pub fn convert_primitive(
28 parquet_type: &Type,
29 arrow_type_hint: Option<DataType>,
30) -> Result<DataType> {
31 let physical_type = from_parquet(parquet_type)?;
32 Ok(match arrow_type_hint {
33 Some(hint) => apply_hint(physical_type, hint),
34 None => physical_type,
35 })
36}
37
38fn apply_hint(parquet: DataType, hint: DataType) -> DataType {
41 match (&parquet, &hint) {
42 (DataType::Int32 | DataType::Int64, DataType::Timestamp(_, _)) => hint,
44 (DataType::Int32, DataType::Time32(_)) => hint,
45 (DataType::Int64, DataType::Time64(_)) => hint,
46
47 (DataType::Int64, DataType::Date64) => hint,
49
50 (DataType::Date32, DataType::Date64) => hint,
52
53 (DataType::Timestamp(p, _), DataType::Timestamp(h, Some(_))) if p == h => hint,
55
56 (
59 DataType::Timestamp(TimeUnit::Nanosecond, None),
60 DataType::Timestamp(TimeUnit::Second, _),
61 ) => hint,
62 (
63 DataType::Timestamp(TimeUnit::Nanosecond, None),
64 DataType::Timestamp(TimeUnit::Millisecond, _),
65 ) => hint,
66 (
67 DataType::Timestamp(TimeUnit::Nanosecond, None),
68 DataType::Timestamp(TimeUnit::Microsecond, _),
69 ) => hint,
70
71 (DataType::Utf8, DataType::LargeUtf8) => hint,
73 (DataType::Binary, DataType::LargeBinary) => hint,
74
75 (DataType::Binary, DataType::Utf8) => hint,
77 (DataType::Binary, DataType::LargeUtf8) => hint,
78 (DataType::Binary, DataType::Utf8View) => hint,
79
80 (DataType::Utf8, DataType::Utf8View) => hint,
82 (DataType::Binary, DataType::BinaryView) => hint,
83
84 (DataType::Interval(_), DataType::Interval(_)) => hint,
86
87 (DataType::Decimal128(_, _), DataType::Decimal256(_, _)) => hint,
89
90 (_, DataType::Dictionary(_, value)) => {
92 let hinted = apply_hint(parquet, value.as_ref().clone());
94
95 match &hinted == value.as_ref() {
98 true => hint,
99 false => hinted,
100 }
101 }
102 _ => parquet,
103 }
104}
105
106fn from_parquet(parquet_type: &Type) -> Result<DataType> {
107 match parquet_type {
108 Type::PrimitiveType {
109 physical_type,
110 basic_info,
111 type_length,
112 scale,
113 precision,
114 ..
115 } => match physical_type {
116 PhysicalType::BOOLEAN => Ok(DataType::Boolean),
117 PhysicalType::INT32 => from_int32(basic_info, *scale, *precision),
118 PhysicalType::INT64 => from_int64(basic_info, *scale, *precision),
119 PhysicalType::INT96 => Ok(DataType::Timestamp(TimeUnit::Nanosecond, None)),
120 PhysicalType::FLOAT => Ok(DataType::Float32),
121 PhysicalType::DOUBLE => Ok(DataType::Float64),
122 PhysicalType::BYTE_ARRAY => from_byte_array(basic_info, *precision, *scale),
123 PhysicalType::FIXED_LEN_BYTE_ARRAY => {
124 from_fixed_len_byte_array(basic_info, *scale, *precision, *type_length)
125 }
126 },
127 Type::GroupType { .. } => unreachable!(),
128 }
129}
130
131fn decimal_type(scale: i32, precision: i32) -> Result<DataType> {
132 if precision <= DECIMAL128_MAX_PRECISION as i32 {
133 decimal_128_type(scale, precision)
134 } else {
135 decimal_256_type(scale, precision)
136 }
137}
138
139fn decimal_128_type(scale: i32, precision: i32) -> Result<DataType> {
140 let scale = scale
141 .try_into()
142 .map_err(|_| arrow_err!("scale cannot be negative: {}", scale))?;
143
144 let precision = precision
145 .try_into()
146 .map_err(|_| arrow_err!("precision cannot be negative: {}", precision))?;
147
148 Ok(DataType::Decimal128(precision, scale))
149}
150
151fn decimal_256_type(scale: i32, precision: i32) -> Result<DataType> {
152 let scale = scale
153 .try_into()
154 .map_err(|_| arrow_err!("scale cannot be negative: {}", scale))?;
155
156 let precision = precision
157 .try_into()
158 .map_err(|_| arrow_err!("precision cannot be negative: {}", precision))?;
159
160 Ok(DataType::Decimal256(precision, scale))
161}
162
163fn from_int32(info: &BasicTypeInfo, scale: i32, precision: i32) -> Result<DataType> {
164 match (info.logical_type(), info.converted_type()) {
165 (None, ConvertedType::NONE) => Ok(DataType::Int32),
166 (
167 Some(
168 ref t @ LogicalType::Integer {
169 bit_width,
170 is_signed,
171 },
172 ),
173 _,
174 ) => match (bit_width, is_signed) {
175 (8, true) => Ok(DataType::Int8),
176 (16, true) => Ok(DataType::Int16),
177 (32, true) => Ok(DataType::Int32),
178 (8, false) => Ok(DataType::UInt8),
179 (16, false) => Ok(DataType::UInt16),
180 (32, false) => Ok(DataType::UInt32),
181 _ => Err(arrow_err!("Cannot create INT32 physical type from {:?}", t)),
182 },
183 (Some(LogicalType::Decimal { scale, precision }), _) => decimal_128_type(scale, precision),
184 (Some(LogicalType::Date), _) => Ok(DataType::Date32),
185 (Some(LogicalType::Time { unit, .. }), _) => match unit {
186 ParquetTimeUnit::MILLIS(_) => Ok(DataType::Time32(TimeUnit::Millisecond)),
187 _ => Err(arrow_err!(
188 "Cannot create INT32 physical type from {:?}",
189 unit
190 )),
191 },
192 (Some(LogicalType::Unknown), _) => Ok(DataType::Null),
194 (None, ConvertedType::UINT_8) => Ok(DataType::UInt8),
195 (None, ConvertedType::UINT_16) => Ok(DataType::UInt16),
196 (None, ConvertedType::UINT_32) => Ok(DataType::UInt32),
197 (None, ConvertedType::INT_8) => Ok(DataType::Int8),
198 (None, ConvertedType::INT_16) => Ok(DataType::Int16),
199 (None, ConvertedType::INT_32) => Ok(DataType::Int32),
200 (None, ConvertedType::DATE) => Ok(DataType::Date32),
201 (None, ConvertedType::TIME_MILLIS) => Ok(DataType::Time32(TimeUnit::Millisecond)),
202 (None, ConvertedType::DECIMAL) => decimal_128_type(scale, precision),
203 (logical, converted) => Err(arrow_err!(
204 "Unable to convert parquet INT32 logical type {:?} or converted type {}",
205 logical,
206 converted
207 )),
208 }
209}
210
211fn from_int64(info: &BasicTypeInfo, scale: i32, precision: i32) -> Result<DataType> {
212 match (info.logical_type(), info.converted_type()) {
213 (None, ConvertedType::NONE) => Ok(DataType::Int64),
214 (
215 Some(LogicalType::Integer {
216 bit_width: 64,
217 is_signed,
218 }),
219 _,
220 ) => match is_signed {
221 true => Ok(DataType::Int64),
222 false => Ok(DataType::UInt64),
223 },
224 (Some(LogicalType::Time { unit, .. }), _) => match unit {
225 ParquetTimeUnit::MILLIS(_) => {
226 Err(arrow_err!("Cannot create INT64 from MILLIS time unit",))
227 }
228 ParquetTimeUnit::MICROS(_) => Ok(DataType::Time64(TimeUnit::Microsecond)),
229 ParquetTimeUnit::NANOS(_) => Ok(DataType::Time64(TimeUnit::Nanosecond)),
230 },
231 (
232 Some(LogicalType::Timestamp {
233 is_adjusted_to_u_t_c,
234 unit,
235 }),
236 _,
237 ) => Ok(DataType::Timestamp(
238 match unit {
239 ParquetTimeUnit::MILLIS(_) => TimeUnit::Millisecond,
240 ParquetTimeUnit::MICROS(_) => TimeUnit::Microsecond,
241 ParquetTimeUnit::NANOS(_) => TimeUnit::Nanosecond,
242 },
243 if is_adjusted_to_u_t_c {
244 Some("UTC".into())
245 } else {
246 None
247 },
248 )),
249 (None, ConvertedType::INT_64) => Ok(DataType::Int64),
250 (None, ConvertedType::UINT_64) => Ok(DataType::UInt64),
251 (None, ConvertedType::TIME_MICROS) => Ok(DataType::Time64(TimeUnit::Microsecond)),
252 (None, ConvertedType::TIMESTAMP_MILLIS) => Ok(DataType::Timestamp(
253 TimeUnit::Millisecond,
254 Some("UTC".into()),
255 )),
256 (None, ConvertedType::TIMESTAMP_MICROS) => Ok(DataType::Timestamp(
257 TimeUnit::Microsecond,
258 Some("UTC".into()),
259 )),
260 (Some(LogicalType::Decimal { scale, precision }), _) => decimal_128_type(scale, precision),
261 (None, ConvertedType::DECIMAL) => decimal_128_type(scale, precision),
262 (logical, converted) => Err(arrow_err!(
263 "Unable to convert parquet INT64 logical type {:?} or converted type {}",
264 logical,
265 converted
266 )),
267 }
268}
269
270fn from_byte_array(info: &BasicTypeInfo, precision: i32, scale: i32) -> Result<DataType> {
271 match (info.logical_type(), info.converted_type()) {
272 (Some(LogicalType::String), _) => Ok(DataType::Utf8),
273 (Some(LogicalType::Json), _) => Ok(DataType::Utf8),
274 (Some(LogicalType::Bson), _) => Ok(DataType::Binary),
275 (Some(LogicalType::Enum), _) => Ok(DataType::Binary),
276 (None, ConvertedType::NONE) => Ok(DataType::Binary),
277 (None, ConvertedType::JSON) => Ok(DataType::Utf8),
278 (None, ConvertedType::BSON) => Ok(DataType::Binary),
279 (None, ConvertedType::ENUM) => Ok(DataType::Binary),
280 (None, ConvertedType::UTF8) => Ok(DataType::Utf8),
281 (
282 Some(LogicalType::Decimal {
283 scale: s,
284 precision: p,
285 }),
286 _,
287 ) => decimal_type(s, p),
288 (None, ConvertedType::DECIMAL) => decimal_type(scale, precision),
289 (logical, converted) => Err(arrow_err!(
290 "Unable to convert parquet BYTE_ARRAY logical type {:?} or converted type {}",
291 logical,
292 converted
293 )),
294 }
295}
296
297fn from_fixed_len_byte_array(
298 info: &BasicTypeInfo,
299 scale: i32,
300 precision: i32,
301 type_length: i32,
302) -> Result<DataType> {
303 match (info.logical_type(), info.converted_type()) {
305 (Some(LogicalType::Decimal { scale, precision }), _) => {
306 if type_length <= 16 {
307 decimal_128_type(scale, precision)
308 } else {
309 decimal_256_type(scale, precision)
310 }
311 }
312 (None, ConvertedType::DECIMAL) => {
313 if type_length <= 16 {
314 decimal_128_type(scale, precision)
315 } else {
316 decimal_256_type(scale, precision)
317 }
318 }
319 (None, ConvertedType::INTERVAL) => {
320 Ok(DataType::Interval(IntervalUnit::DayTime))
324 }
325 (Some(LogicalType::Float16), _) => {
326 if type_length == 2 {
327 Ok(DataType::Float16)
328 } else {
329 Err(ParquetError::General(
330 "FLOAT16 logical type must be Fixed Length Byte Array with length 2"
331 .to_string(),
332 ))
333 }
334 }
335 _ => Ok(DataType::FixedSizeBinary(type_length)),
336 }
337}