// parquet/arrow/array_reader/primitive_array.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::arrow::array_reader::{read_records, skip_records, ArrayReader};
19use crate::arrow::record_reader::RecordReader;
20use crate::arrow::schema::parquet_to_arrow_field;
21use crate::basic::Type as PhysicalType;
22use crate::column::page::PageIterator;
23use crate::data_type::{DataType, Int96};
24use crate::errors::{ParquetError, Result};
25use crate::schema::types::ColumnDescPtr;
26use arrow_array::Decimal256Array;
27use arrow_array::{
28    builder::TimestampNanosecondBufferBuilder, ArrayRef, BooleanArray, Decimal128Array,
29    Float32Array, Float64Array, Int32Array, Int64Array, TimestampNanosecondArray, UInt32Array,
30    UInt64Array,
31};
32use arrow_buffer::{i256, BooleanBuffer, Buffer};
33use arrow_data::ArrayDataBuilder;
34use arrow_schema::{DataType as ArrowType, TimeUnit};
35use std::any::Any;
36use std::sync::Arc;
37
/// Conversion of an owned `Vec<T>` into an Arrow [`Buffer`].
///
/// This file implements it for vectors of the native numeric types, `bool` and `Int96`.
pub trait IntoBuffer {
    /// Consumes `self` and returns its contents as a [`Buffer`].
    fn into_buffer(self) -> Buffer;
}
42
43macro_rules! native_buffer {
44    ($($t:ty),*) => {
45        $(impl IntoBuffer for Vec<$t> {
46            fn into_buffer(self) -> Buffer {
47                Buffer::from_vec(self)
48            }
49        })*
50    };
51}
52native_buffer!(i8, i16, i32, i64, u8, u16, u32, u64, f32, f64);
53
54impl IntoBuffer for Vec<bool> {
55    fn into_buffer(self) -> Buffer {
56        BooleanBuffer::from_iter(self).into_inner()
57    }
58}
59
60impl IntoBuffer for Vec<Int96> {
61    fn into_buffer(self) -> Buffer {
62        let mut builder = TimestampNanosecondBufferBuilder::new(self.len());
63        for v in self {
64            builder.append(v.to_nanos())
65        }
66        builder.finish()
67    }
68}
69
/// Primitive array readers are leaves of the array reader tree. They accept a page
/// iterator and read its pages into primitive arrays.
///
/// `T` is the Parquet data type of the column being read; its native value type must
/// be convertible into an Arrow buffer through [`IntoBuffer`].
pub struct PrimitiveArrayReader<T>
where
    T: DataType,
    T::T: Copy + Default,
    Vec<T::T>: IntoBuffer,
{
    /// Arrow type of the arrays returned by `consume_batch`.
    data_type: ArrowType,
    /// Source of column pages, handed to the `read_records` / `skip_records` helpers.
    pages: Box<dyn PageIterator>,
    /// Definition levels saved by the most recent `consume_batch`, if the record
    /// reader produced any.
    def_levels_buffer: Option<Vec<i16>>,
    /// Repetition levels saved by the most recent `consume_batch`, if the record
    /// reader produced any.
    rep_levels_buffer: Option<Vec<i16>>,
    /// Holds the values, levels and validity bitmap that `consume_batch` drains.
    record_reader: RecordReader<T>,
}
84
85impl<T> PrimitiveArrayReader<T>
86where
87    T: DataType,
88    T::T: Copy + Default,
89    Vec<T::T>: IntoBuffer,
90{
91    /// Construct primitive array reader.
92    pub fn new(
93        pages: Box<dyn PageIterator>,
94        column_desc: ColumnDescPtr,
95        arrow_type: Option<ArrowType>,
96    ) -> Result<Self> {
97        // Check if Arrow type is specified, else create it from Parquet type
98        let data_type = match arrow_type {
99            Some(t) => t,
100            None => parquet_to_arrow_field(column_desc.as_ref())?
101                .data_type()
102                .clone(),
103        };
104
105        let record_reader = RecordReader::<T>::new(column_desc);
106
107        Ok(Self {
108            data_type,
109            pages,
110            def_levels_buffer: None,
111            rep_levels_buffer: None,
112            record_reader,
113        })
114    }
115}
116
117/// Implementation of primitive array reader.
118impl<T> ArrayReader for PrimitiveArrayReader<T>
119where
120    T: DataType,
121    T::T: Copy + Default,
122    Vec<T::T>: IntoBuffer,
123{
124    fn as_any(&self) -> &dyn Any {
125        self
126    }
127
128    /// Returns data type of primitive array.
129    fn get_data_type(&self) -> &ArrowType {
130        &self.data_type
131    }
132
133    fn read_records(&mut self, batch_size: usize) -> Result<usize> {
134        read_records(&mut self.record_reader, self.pages.as_mut(), batch_size)
135    }
136
137    fn consume_batch(&mut self) -> Result<ArrayRef> {
138        let target_type = &self.data_type;
139        let arrow_data_type = match T::get_physical_type() {
140            PhysicalType::BOOLEAN => ArrowType::Boolean,
141            PhysicalType::INT32 => {
142                match target_type {
143                    ArrowType::UInt32 => {
144                        // follow C++ implementation and use overflow/reinterpret cast from  i32 to u32 which will map
145                        // `i32::MIN..0` to `(i32::MAX as u32)..u32::MAX`
146                        ArrowType::UInt32
147                    }
148                    _ => ArrowType::Int32,
149                }
150            }
151            PhysicalType::INT64 => {
152                match target_type {
153                    ArrowType::UInt64 => {
154                        // follow C++ implementation and use overflow/reinterpret cast from  i64 to u64 which will map
155                        // `i64::MIN..0` to `(i64::MAX as u64)..u64::MAX`
156                        ArrowType::UInt64
157                    }
158                    _ => ArrowType::Int64,
159                }
160            }
161            PhysicalType::FLOAT => ArrowType::Float32,
162            PhysicalType::DOUBLE => ArrowType::Float64,
163            PhysicalType::INT96 => match target_type {
164                ArrowType::Timestamp(TimeUnit::Nanosecond, _) => target_type.clone(),
165                _ => unreachable!("INT96 must be timestamp nanosecond"),
166            },
167            PhysicalType::BYTE_ARRAY | PhysicalType::FIXED_LEN_BYTE_ARRAY => {
168                unreachable!("PrimitiveArrayReaders don't support complex physical types");
169            }
170        };
171
172        // Convert to arrays by using the Parquet physical type.
173        // The physical types are then cast to Arrow types if necessary
174
175        let record_data = self.record_reader.consume_record_data().into_buffer();
176
177        let array_data = ArrayDataBuilder::new(arrow_data_type)
178            .len(self.record_reader.num_values())
179            .add_buffer(record_data)
180            .null_bit_buffer(self.record_reader.consume_bitmap_buffer());
181
182        let array_data = unsafe { array_data.build_unchecked() };
183        let array: ArrayRef = match T::get_physical_type() {
184            PhysicalType::BOOLEAN => Arc::new(BooleanArray::from(array_data)),
185            PhysicalType::INT32 => match array_data.data_type() {
186                ArrowType::UInt32 => Arc::new(UInt32Array::from(array_data)),
187                ArrowType::Int32 => Arc::new(Int32Array::from(array_data)),
188                _ => unreachable!(),
189            },
190            PhysicalType::INT64 => match array_data.data_type() {
191                ArrowType::UInt64 => Arc::new(UInt64Array::from(array_data)),
192                ArrowType::Int64 => Arc::new(Int64Array::from(array_data)),
193                _ => unreachable!(),
194            },
195            PhysicalType::FLOAT => Arc::new(Float32Array::from(array_data)),
196            PhysicalType::DOUBLE => Arc::new(Float64Array::from(array_data)),
197            PhysicalType::INT96 => Arc::new(TimestampNanosecondArray::from(array_data)),
198            PhysicalType::BYTE_ARRAY | PhysicalType::FIXED_LEN_BYTE_ARRAY => {
199                unreachable!("PrimitiveArrayReaders don't support complex physical types");
200            }
201        };
202
203        // cast to Arrow type
204        // We make a strong assumption here that the casts should be infallible.
205        // If the cast fails because of incompatible datatypes, then there might
206        // be a bigger problem with how Arrow schemas are converted to Parquet.
207        //
208        // As there is not always a 1:1 mapping between Arrow and Parquet, there
209        // are datatypes which we must convert explicitly.
210        // These are:
211        // - date64: cast int32 to date32, then date32 to date64.
212        // - decimal: cast int32 to decimal, int64 to decimal
213        let array = match target_type {
214            ArrowType::Date64 if *(array.data_type()) == ArrowType::Int32 => {
215                // this is cheap as it internally reinterprets the data
216                let a = arrow_cast::cast(&array, &ArrowType::Date32)?;
217                arrow_cast::cast(&a, target_type)?
218            }
219            ArrowType::Decimal128(p, s) => {
220                // Apply conversion to all elements regardless of null slots as the conversion
221                // to `i128` is infallible. This improves performance by avoiding a branch in
222                // the inner loop (see docs for `PrimitiveArray::unary`).
223                let array = match array.data_type() {
224                    ArrowType::Int32 => array
225                        .as_any()
226                        .downcast_ref::<Int32Array>()
227                        .unwrap()
228                        .unary(|i| i as i128)
229                        as Decimal128Array,
230                    ArrowType::Int64 => array
231                        .as_any()
232                        .downcast_ref::<Int64Array>()
233                        .unwrap()
234                        .unary(|i| i as i128)
235                        as Decimal128Array,
236                    _ => {
237                        return Err(arrow_err!(
238                            "Cannot convert {:?} to decimal",
239                            array.data_type()
240                        ));
241                    }
242                }
243                .with_precision_and_scale(*p, *s)?;
244
245                Arc::new(array) as ArrayRef
246            }
247            ArrowType::Decimal256(p, s) => {
248                // See above comment. Conversion to `i256` is likewise infallible.
249                let array = match array.data_type() {
250                    ArrowType::Int32 => array
251                        .as_any()
252                        .downcast_ref::<Int32Array>()
253                        .unwrap()
254                        .unary(|i| i256::from_i128(i as i128))
255                        as Decimal256Array,
256                    ArrowType::Int64 => array
257                        .as_any()
258                        .downcast_ref::<Int64Array>()
259                        .unwrap()
260                        .unary(|i| i256::from_i128(i as i128))
261                        as Decimal256Array,
262                    _ => {
263                        return Err(arrow_err!(
264                            "Cannot convert {:?} to decimal",
265                            array.data_type()
266                        ));
267                    }
268                }
269                .with_precision_and_scale(*p, *s)?;
270
271                Arc::new(array) as ArrayRef
272            }
273            ArrowType::Dictionary(_, value_type) => match value_type.as_ref() {
274                ArrowType::Decimal128(p, s) => {
275                    let array = match array.data_type() {
276                        ArrowType::Int32 => array
277                            .as_any()
278                            .downcast_ref::<Int32Array>()
279                            .unwrap()
280                            .unary(|i| i as i128)
281                            as Decimal128Array,
282                        ArrowType::Int64 => array
283                            .as_any()
284                            .downcast_ref::<Int64Array>()
285                            .unwrap()
286                            .unary(|i| i as i128)
287                            as Decimal128Array,
288                        _ => {
289                            return Err(arrow_err!(
290                                "Cannot convert {:?} to decimal dictionary",
291                                array.data_type()
292                            ));
293                        }
294                    }
295                    .with_precision_and_scale(*p, *s)?;
296
297                    arrow_cast::cast(&array, target_type)?
298                }
299                ArrowType::Decimal256(p, s) => {
300                    let array = match array.data_type() {
301                        ArrowType::Int32 => array
302                            .as_any()
303                            .downcast_ref::<Int32Array>()
304                            .unwrap()
305                            .unary(i256::from)
306                            as Decimal256Array,
307                        ArrowType::Int64 => array
308                            .as_any()
309                            .downcast_ref::<Int64Array>()
310                            .unwrap()
311                            .unary(i256::from)
312                            as Decimal256Array,
313                        _ => {
314                            return Err(arrow_err!(
315                                "Cannot convert {:?} to decimal dictionary",
316                                array.data_type()
317                            ));
318                        }
319                    }
320                    .with_precision_and_scale(*p, *s)?;
321
322                    arrow_cast::cast(&array, target_type)?
323                }
324                _ => arrow_cast::cast(&array, target_type)?,
325            },
326            _ => arrow_cast::cast(&array, target_type)?,
327        };
328
329        // save definition and repetition buffers
330        self.def_levels_buffer = self.record_reader.consume_def_levels();
331        self.rep_levels_buffer = self.record_reader.consume_rep_levels();
332        self.record_reader.reset();
333        Ok(array)
334    }
335
336    fn skip_records(&mut self, num_records: usize) -> Result<usize> {
337        skip_records(&mut self.record_reader, self.pages.as_mut(), num_records)
338    }
339
340    fn get_def_levels(&self) -> Option<&[i16]> {
341        self.def_levels_buffer.as_deref()
342    }
343
344    fn get_rep_levels(&self) -> Option<&[i16]> {
345        self.rep_levels_buffer.as_deref()
346    }
347}
348
349#[cfg(test)]
350mod tests {
351    use super::*;
352    use crate::arrow::array_reader::test_util::EmptyPageIterator;
353    use crate::basic::Encoding;
354    use crate::column::page::Page;
355    use crate::data_type::{Int32Type, Int64Type};
356    use crate::schema::parser::parse_message_type;
357    use crate::schema::types::SchemaDescriptor;
358    use crate::util::test_common::rand_gen::make_pages;
359    use crate::util::InMemoryPageIterator;
360    use arrow::datatypes::ArrowPrimitiveType;
361    use arrow_array::{Array, Date32Array, PrimitiveArray};
362
363    use arrow::datatypes::DataType::{Date32, Decimal128};
364    use rand::distributions::uniform::SampleUniform;
365    use std::collections::VecDeque;
366
367    #[allow(clippy::too_many_arguments)]
368    fn make_column_chunks<T: DataType>(
369        column_desc: ColumnDescPtr,
370        encoding: Encoding,
371        num_levels: usize,
372        min_value: T::T,
373        max_value: T::T,
374        def_levels: &mut Vec<i16>,
375        rep_levels: &mut Vec<i16>,
376        values: &mut Vec<T::T>,
377        page_lists: &mut Vec<Vec<Page>>,
378        use_v2: bool,
379        num_chunks: usize,
380    ) where
381        T::T: PartialOrd + SampleUniform + Copy,
382    {
383        for _i in 0..num_chunks {
384            let mut pages = VecDeque::new();
385            let mut data = Vec::new();
386            let mut page_def_levels = Vec::new();
387            let mut page_rep_levels = Vec::new();
388
389            make_pages::<T>(
390                column_desc.clone(),
391                encoding,
392                1,
393                num_levels,
394                min_value,
395                max_value,
396                &mut page_def_levels,
397                &mut page_rep_levels,
398                &mut data,
399                &mut pages,
400                use_v2,
401            );
402
403            def_levels.append(&mut page_def_levels);
404            rep_levels.append(&mut page_rep_levels);
405            values.append(&mut data);
406            page_lists.push(Vec::from(pages));
407        }
408    }
409
410    #[test]
411    fn test_primitive_array_reader_empty_pages() {
412        // Construct column schema
413        let message_type = "
414        message test_schema {
415          REQUIRED INT32 leaf;
416        }
417        ";
418
419        let schema = parse_message_type(message_type)
420            .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
421            .unwrap();
422
423        let mut array_reader = PrimitiveArrayReader::<Int32Type>::new(
424            Box::<EmptyPageIterator>::default(),
425            schema.column(0),
426            None,
427        )
428        .unwrap();
429
430        // expect no values to be read
431        let array = array_reader.next_batch(50).unwrap();
432        assert!(array.is_empty());
433    }
434
435    #[test]
436    fn test_primitive_array_reader_data() {
437        // Construct column schema
438        let message_type = "
439        message test_schema {
440          REQUIRED INT32 leaf;
441        }
442        ";
443
444        let schema = parse_message_type(message_type)
445            .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
446            .unwrap();
447
448        let column_desc = schema.column(0);
449
450        // Construct page iterator
451        {
452            let mut data = Vec::new();
453            let mut page_lists = Vec::new();
454            make_column_chunks::<Int32Type>(
455                column_desc.clone(),
456                Encoding::PLAIN,
457                100,
458                1,
459                200,
460                &mut Vec::new(),
461                &mut Vec::new(),
462                &mut data,
463                &mut page_lists,
464                true,
465                2,
466            );
467            let page_iterator = InMemoryPageIterator::new(page_lists);
468
469            let mut array_reader =
470                PrimitiveArrayReader::<Int32Type>::new(Box::new(page_iterator), column_desc, None)
471                    .unwrap();
472
473            // Read first 50 values, which are all from the first column chunk
474            let array = array_reader.next_batch(50).unwrap();
475            let array = array.as_any().downcast_ref::<Int32Array>().unwrap();
476
477            assert_eq!(&Int32Array::from(data[0..50].to_vec()), array);
478
479            // Read next 100 values, the first 50 ones are from the first column chunk,
480            // and the last 50 ones are from the second column chunk
481            let array = array_reader.next_batch(100).unwrap();
482            let array = array.as_any().downcast_ref::<Int32Array>().unwrap();
483
484            assert_eq!(&Int32Array::from(data[50..150].to_vec()), array);
485
486            // Try to read 100 values, however there are only 50 values
487            let array = array_reader.next_batch(100).unwrap();
488            let array = array.as_any().downcast_ref::<Int32Array>().unwrap();
489
490            assert_eq!(&Int32Array::from(data[150..200].to_vec()), array);
491        }
492    }
493
494    macro_rules! test_primitive_array_reader_one_type {
495        (
496            $arrow_parquet_type:ty,
497            $physical_type:expr,
498            $converted_type_str:expr,
499            $result_arrow_type:ty,
500            $result_arrow_cast_type:ty,
501            $result_primitive_type:ty
502            $(, $timezone:expr)?
503        ) => {{
504            let message_type = format!(
505                "
506            message test_schema {{
507              REQUIRED {:?} leaf ({});
508          }}
509            ",
510                $physical_type, $converted_type_str
511            );
512            let schema = parse_message_type(&message_type)
513                .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
514                .unwrap();
515
516            let column_desc = schema.column(0);
517
518            // Construct page iterator
519            {
520                let mut data = Vec::new();
521                let mut page_lists = Vec::new();
522                make_column_chunks::<$arrow_parquet_type>(
523                    column_desc.clone(),
524                    Encoding::PLAIN,
525                    100,
526                    1,
527                    200,
528                    &mut Vec::new(),
529                    &mut Vec::new(),
530                    &mut data,
531                    &mut page_lists,
532                    true,
533                    2,
534                );
535                let page_iterator = InMemoryPageIterator::new(page_lists);
536                let mut array_reader = PrimitiveArrayReader::<$arrow_parquet_type>::new(
537                    Box::new(page_iterator),
538                    column_desc.clone(),
539                    None,
540                )
541                .expect("Unable to get array reader");
542
543                let array = array_reader
544                    .next_batch(50)
545                    .expect("Unable to get batch from reader");
546
547                let result_data_type = <$result_arrow_type>::DATA_TYPE;
548                let array = array
549                    .as_any()
550                    .downcast_ref::<PrimitiveArray<$result_arrow_type>>()
551                    .expect(
552                        format!(
553                            "Unable to downcast {:?} to {:?}",
554                            array.data_type(),
555                            result_data_type
556                        )
557                        .as_str(),
558                    )
559                    $(.clone().with_timezone($timezone))?
560                    ;
561
562                // create expected array as primitive, and cast to result type
563                let expected = PrimitiveArray::<$result_arrow_cast_type>::from(
564                    data[0..50]
565                        .iter()
566                        .map(|x| *x as $result_primitive_type)
567                        .collect::<Vec<$result_primitive_type>>(),
568                );
569                let expected = Arc::new(expected) as ArrayRef;
570                let expected = arrow::compute::cast(&expected, &result_data_type)
571                    .expect("Unable to cast expected array");
572                assert_eq!(expected.data_type(), &result_data_type);
573                let expected = expected
574                    .as_any()
575                    .downcast_ref::<PrimitiveArray<$result_arrow_type>>()
576                    .expect(
577                        format!(
578                            "Unable to downcast expected {:?} to {:?}",
579                            expected.data_type(),
580                            result_data_type
581                        )
582                        .as_str(),
583                    )
584                    $(.clone().with_timezone($timezone))?
585                    ;
586                assert_eq!(expected, array);
587            }
588        }};
589    }
590
    // Runs the single-type round-trip check for each temporal converted type:
    // DATE and TIME_MILLIS on INT32; TIME_MICROS, TIMESTAMP_MILLIS and
    // TIMESTAMP_MICROS on INT64.
    #[test]
    fn test_primitive_array_reader_temporal_types() {
        // INT32 (DATE) -> Date32
        test_primitive_array_reader_one_type!(
            crate::data_type::Int32Type,
            PhysicalType::INT32,
            "DATE",
            arrow::datatypes::Date32Type,
            arrow::datatypes::Int32Type,
            i32
        );
        // INT32 (TIME_MILLIS) -> Time32(Millisecond)
        test_primitive_array_reader_one_type!(
            crate::data_type::Int32Type,
            PhysicalType::INT32,
            "TIME_MILLIS",
            arrow::datatypes::Time32MillisecondType,
            arrow::datatypes::Int32Type,
            i32
        );
        // INT64 (TIME_MICROS) -> Time64(Microsecond)
        test_primitive_array_reader_one_type!(
            crate::data_type::Int64Type,
            PhysicalType::INT64,
            "TIME_MICROS",
            arrow::datatypes::Time64MicrosecondType,
            arrow::datatypes::Int64Type,
            i64
        );
        // INT64 (TIMESTAMP_MILLIS) -> Timestamp(Millisecond), compared with the "UTC" timezone applied
        test_primitive_array_reader_one_type!(
            crate::data_type::Int64Type,
            PhysicalType::INT64,
            "TIMESTAMP_MILLIS",
            arrow::datatypes::TimestampMillisecondType,
            arrow::datatypes::Int64Type,
            i64,
            "UTC"
        );
        // INT64 (TIMESTAMP_MICROS) -> Timestamp(Microsecond), compared with the "UTC" timezone applied
        test_primitive_array_reader_one_type!(
            crate::data_type::Int64Type,
            PhysicalType::INT64,
            "TIMESTAMP_MICROS",
            arrow::datatypes::TimestampMicrosecondType,
            arrow::datatypes::Int64Type,
            i64,
            "UTC"
        );
    }
636
637    #[test]
638    fn test_primitive_array_reader_def_and_rep_levels() {
639        // Construct column schema
640        let message_type = "
641        message test_schema {
642            REPEATED Group test_mid {
643                OPTIONAL INT32 leaf;
644            }
645        }
646        ";
647
648        let schema = parse_message_type(message_type)
649            .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
650            .unwrap();
651
652        let column_desc = schema.column(0);
653
654        // Construct page iterator
655        {
656            let mut def_levels = Vec::new();
657            let mut rep_levels = Vec::new();
658            let mut page_lists = Vec::new();
659            make_column_chunks::<Int32Type>(
660                column_desc.clone(),
661                Encoding::PLAIN,
662                100,
663                1,
664                200,
665                &mut def_levels,
666                &mut rep_levels,
667                &mut Vec::new(),
668                &mut page_lists,
669                true,
670                2,
671            );
672
673            let page_iterator = InMemoryPageIterator::new(page_lists);
674
675            let mut array_reader =
676                PrimitiveArrayReader::<Int32Type>::new(Box::new(page_iterator), column_desc, None)
677                    .unwrap();
678
679            let mut accu_len: usize = 0;
680
681            // Read first 50 values, which are all from the first column chunk
682            let array = array_reader.next_batch(50).unwrap();
683            assert_eq!(
684                Some(&def_levels[accu_len..(accu_len + array.len())]),
685                array_reader.get_def_levels()
686            );
687            assert_eq!(
688                Some(&rep_levels[accu_len..(accu_len + array.len())]),
689                array_reader.get_rep_levels()
690            );
691            accu_len += array.len();
692
693            // Read next 100 values, the first 50 ones are from the first column chunk,
694            // and the last 50 ones are from the second column chunk
695            let array = array_reader.next_batch(100).unwrap();
696            assert_eq!(
697                Some(&def_levels[accu_len..(accu_len + array.len())]),
698                array_reader.get_def_levels()
699            );
700            assert_eq!(
701                Some(&rep_levels[accu_len..(accu_len + array.len())]),
702                array_reader.get_rep_levels()
703            );
704            accu_len += array.len();
705
706            // Try to read 100 values, however there are only 50 values
707            let array = array_reader.next_batch(100).unwrap();
708            assert_eq!(
709                Some(&def_levels[accu_len..(accu_len + array.len())]),
710                array_reader.get_def_levels()
711            );
712            assert_eq!(
713                Some(&rep_levels[accu_len..(accu_len + array.len())]),
714                array_reader.get_rep_levels()
715            );
716        }
717    }
718
719    #[test]
720    fn test_primitive_array_reader_decimal_types() {
721        // parquet `INT32` to decimal
722        let message_type = "
723            message test_schema {
724                REQUIRED INT32 decimal1 (DECIMAL(8,2));
725        }
726        ";
727        let schema = parse_message_type(message_type)
728            .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
729            .unwrap();
730        let column_desc = schema.column(0);
731
732        // create the array reader
733        {
734            let mut data = Vec::new();
735            let mut page_lists = Vec::new();
736            make_column_chunks::<Int32Type>(
737                column_desc.clone(),
738                Encoding::PLAIN,
739                100,
740                -99999999,
741                99999999,
742                &mut Vec::new(),
743                &mut Vec::new(),
744                &mut data,
745                &mut page_lists,
746                true,
747                2,
748            );
749            let page_iterator = InMemoryPageIterator::new(page_lists);
750
751            let mut array_reader =
752                PrimitiveArrayReader::<Int32Type>::new(Box::new(page_iterator), column_desc, None)
753                    .unwrap();
754
755            // read data from the reader
756            // the data type is decimal(8,2)
757            let array = array_reader.next_batch(50).unwrap();
758            assert_eq!(array.data_type(), &Decimal128(8, 2));
759            let array = array.as_any().downcast_ref::<Decimal128Array>().unwrap();
760            let data_decimal_array = data[0..50]
761                .iter()
762                .copied()
763                .map(|v| Some(v as i128))
764                .collect::<Decimal128Array>()
765                .with_precision_and_scale(8, 2)
766                .unwrap();
767            assert_eq!(array, &data_decimal_array);
768
769            // not equal with different data type(precision and scale)
770            let data_decimal_array = data[0..50]
771                .iter()
772                .copied()
773                .map(|v| Some(v as i128))
774                .collect::<Decimal128Array>()
775                .with_precision_and_scale(9, 0)
776                .unwrap();
777            assert_ne!(array, &data_decimal_array)
778        }
779
780        // parquet `INT64` to decimal
781        let message_type = "
782            message test_schema {
783                REQUIRED INT64 decimal1 (DECIMAL(18,4));
784        }
785        ";
786        let schema = parse_message_type(message_type)
787            .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
788            .unwrap();
789        let column_desc = schema.column(0);
790
791        // create the array reader
792        {
793            let mut data = Vec::new();
794            let mut page_lists = Vec::new();
795            make_column_chunks::<Int64Type>(
796                column_desc.clone(),
797                Encoding::PLAIN,
798                100,
799                -999999999999999999,
800                999999999999999999,
801                &mut Vec::new(),
802                &mut Vec::new(),
803                &mut data,
804                &mut page_lists,
805                true,
806                2,
807            );
808            let page_iterator = InMemoryPageIterator::new(page_lists);
809
810            let mut array_reader =
811                PrimitiveArrayReader::<Int64Type>::new(Box::new(page_iterator), column_desc, None)
812                    .unwrap();
813
814            // read data from the reader
815            // the data type is decimal(18,4)
816            let array = array_reader.next_batch(50).unwrap();
817            assert_eq!(array.data_type(), &Decimal128(18, 4));
818            let array = array.as_any().downcast_ref::<Decimal128Array>().unwrap();
819            let data_decimal_array = data[0..50]
820                .iter()
821                .copied()
822                .map(|v| Some(v as i128))
823                .collect::<Decimal128Array>()
824                .with_precision_and_scale(18, 4)
825                .unwrap();
826            assert_eq!(array, &data_decimal_array);
827
828            // not equal with different data type(precision and scale)
829            let data_decimal_array = data[0..50]
830                .iter()
831                .copied()
832                .map(|v| Some(v as i128))
833                .collect::<Decimal128Array>()
834                .with_precision_and_scale(34, 0)
835                .unwrap();
836            assert_ne!(array, &data_decimal_array)
837        }
838    }
839
840    #[test]
841    fn test_primitive_array_reader_date32_type() {
842        // parquet `INT32` to date
843        let message_type = "
844            message test_schema {
845                REQUIRED INT32 date1 (DATE);
846        }
847        ";
848        let schema = parse_message_type(message_type)
849            .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
850            .unwrap();
851        let column_desc = schema.column(0);
852
853        // create the array reader
854        {
855            let mut data = Vec::new();
856            let mut page_lists = Vec::new();
857            make_column_chunks::<Int32Type>(
858                column_desc.clone(),
859                Encoding::PLAIN,
860                100,
861                -99999999,
862                99999999,
863                &mut Vec::new(),
864                &mut Vec::new(),
865                &mut data,
866                &mut page_lists,
867                true,
868                2,
869            );
870            let page_iterator = InMemoryPageIterator::new(page_lists);
871
872            let mut array_reader =
873                PrimitiveArrayReader::<Int32Type>::new(Box::new(page_iterator), column_desc, None)
874                    .unwrap();
875
876            // read data from the reader
877            // the data type is date
878            let array = array_reader.next_batch(50).unwrap();
879            assert_eq!(array.data_type(), &Date32);
880            let array = array.as_any().downcast_ref::<Date32Array>().unwrap();
881            let data_date_array = data[0..50]
882                .iter()
883                .copied()
884                .map(Some)
885                .collect::<Date32Array>();
886            assert_eq!(array, &data_date_array);
887        }
888    }
889}