parquet/arrow/array_reader/
primitive_array.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::arrow::array_reader::{ArrayReader, read_records, skip_records};
19use crate::arrow::record_reader::RecordReader;
20use crate::arrow::schema::parquet_to_arrow_field;
21use crate::basic::Type as PhysicalType;
22use crate::column::page::PageIterator;
23use crate::data_type::{DataType, Int96};
24use crate::errors::Result;
25use crate::schema::types::ColumnDescPtr;
26use arrow_array::{
27    Array, ArrayRef, BooleanArray, Date64Array, Decimal64Array, Decimal128Array, Decimal256Array,
28    Float32Array, Float64Array, Int8Array, Int16Array, Int32Array, Int64Array, PrimitiveArray,
29    UInt8Array, UInt16Array, builder::PrimitiveDictionaryBuilder, cast::AsArray, downcast_integer,
30    types::*,
31};
32use arrow_array::{
33    TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
34    TimestampSecondArray, UInt32Array, UInt64Array,
35};
36use arrow_buffer::{BooleanBuffer, Buffer, NullBuffer, ScalarBuffer, i256};
37use arrow_schema::{DataType as ArrowType, TimeUnit};
38use std::any::Any;
39use std::sync::Arc;
40
41/// Provides conversion from `Vec<T>` to `Buffer`
42pub trait IntoBuffer {
43    fn into_buffer(self, target_type: &ArrowType) -> Buffer;
44}
45
46macro_rules! native_buffer {
47    ($($t:ty),*) => {
48        $(impl IntoBuffer for Vec<$t> {
49            fn into_buffer(self, _target_type: &ArrowType) -> Buffer {
50                Buffer::from_vec(self)
51            }
52        })*
53    };
54}
55native_buffer!(i8, i16, i32, i64, u8, u16, u32, u64, f32, f64);
56
57impl IntoBuffer for Vec<bool> {
58    fn into_buffer(self, _target_type: &ArrowType) -> Buffer {
59        BooleanBuffer::from_iter(self).into_inner()
60    }
61}
62
63impl IntoBuffer for Vec<Int96> {
64    fn into_buffer(self, target_type: &ArrowType) -> Buffer {
65        let mut builder = Vec::with_capacity(self.len());
66        match target_type {
67            ArrowType::Timestamp(TimeUnit::Second, _) => {
68                builder.extend(self.iter().map(|x| x.to_seconds()));
69            }
70            ArrowType::Timestamp(TimeUnit::Millisecond, _) => {
71                builder.extend(self.iter().map(|x| x.to_millis()));
72            }
73            ArrowType::Timestamp(TimeUnit::Microsecond, _) => {
74                builder.extend(self.iter().map(|x| x.to_micros()));
75            }
76            ArrowType::Timestamp(TimeUnit::Nanosecond, _) => {
77                builder.extend(self.iter().map(|x| x.to_nanos()));
78            }
79            _ => unreachable!("Invalid target_type for Int96."),
80        }
81        Buffer::from_vec(builder)
82    }
83}
84
85/// Primitive array readers are leaves of array reader tree. They accept page iterator
86/// and read them into primitive arrays.
87pub struct PrimitiveArrayReader<T>
88where
89    T: DataType,
90    T::T: Copy + Default,
91    Vec<T::T>: IntoBuffer,
92{
93    data_type: ArrowType,
94    pages: Box<dyn PageIterator>,
95    def_levels_buffer: Option<Vec<i16>>,
96    rep_levels_buffer: Option<Vec<i16>>,
97    record_reader: RecordReader<T>,
98}
99
100impl<T> PrimitiveArrayReader<T>
101where
102    T: DataType,
103    T::T: Copy + Default,
104    Vec<T::T>: IntoBuffer,
105{
106    /// Construct primitive array reader.
107    pub fn new(
108        pages: Box<dyn PageIterator>,
109        column_desc: ColumnDescPtr,
110        arrow_type: Option<ArrowType>,
111    ) -> Result<Self> {
112        // Check if Arrow type is specified, else create it from Parquet type
113        let data_type = match arrow_type {
114            Some(t) => t,
115            None => parquet_to_arrow_field(column_desc.as_ref())?
116                .data_type()
117                .clone(),
118        };
119
120        let record_reader = RecordReader::<T>::new(column_desc);
121
122        Ok(Self {
123            data_type,
124            pages,
125            def_levels_buffer: None,
126            rep_levels_buffer: None,
127            record_reader,
128        })
129    }
130}
131
132/// Implementation of primitive array reader.
133impl<T> ArrayReader for PrimitiveArrayReader<T>
134where
135    T: DataType,
136    T::T: Copy + Default,
137    Vec<T::T>: IntoBuffer,
138{
139    fn as_any(&self) -> &dyn Any {
140        self
141    }
142
143    /// Returns data type of primitive array.
144    fn get_data_type(&self) -> &ArrowType {
145        &self.data_type
146    }
147
148    fn read_records(&mut self, batch_size: usize) -> Result<usize> {
149        read_records(&mut self.record_reader, self.pages.as_mut(), batch_size)
150    }
151
152    fn consume_batch(&mut self) -> Result<ArrayRef> {
153        let target_type = &self.data_type;
154
155        // Convert physical data to equivalent arrow type, and then perform
156        // coercion as needed
157        let record_data = self
158            .record_reader
159            .consume_record_data()
160            .into_buffer(target_type);
161
162        let len = self.record_reader.num_values();
163        let nulls = self
164            .record_reader
165            .consume_bitmap_buffer()
166            .map(|b| NullBuffer::new(BooleanBuffer::new(b, 0, len)));
167
168        let array: ArrayRef = match T::get_physical_type() {
169            PhysicalType::BOOLEAN => Arc::new(BooleanArray::new(
170                BooleanBuffer::new(record_data, 0, len),
171                nulls,
172            )),
173            PhysicalType::INT32 => Arc::new(Int32Array::new(
174                ScalarBuffer::new(record_data, 0, len),
175                nulls,
176            )),
177            PhysicalType::INT64 => Arc::new(Int64Array::new(
178                ScalarBuffer::new(record_data, 0, len),
179                nulls,
180            )),
181            PhysicalType::FLOAT => Arc::new(Float32Array::new(
182                ScalarBuffer::new(record_data, 0, len),
183                nulls,
184            )),
185            PhysicalType::DOUBLE => Arc::new(Float64Array::new(
186                ScalarBuffer::new(record_data, 0, len),
187                nulls,
188            )),
189            PhysicalType::INT96 => Arc::new(Int64Array::new(
190                ScalarBuffer::new(record_data, 0, len),
191                nulls,
192            )),
193            PhysicalType::BYTE_ARRAY | PhysicalType::FIXED_LEN_BYTE_ARRAY => {
194                unreachable!("PrimitiveArrayReaders don't support complex physical types");
195            }
196        };
197
198        // Coerce the arrow type to the desired array type
199        let array = coerce_array(array, target_type)?;
200
201        // save definition and repetition buffers
202        self.def_levels_buffer = self.record_reader.consume_def_levels();
203        self.rep_levels_buffer = self.record_reader.consume_rep_levels();
204        self.record_reader.reset();
205        Ok(array)
206    }
207
208    fn skip_records(&mut self, num_records: usize) -> Result<usize> {
209        skip_records(&mut self.record_reader, self.pages.as_mut(), num_records)
210    }
211
212    fn get_def_levels(&self) -> Option<&[i16]> {
213        self.def_levels_buffer.as_deref()
214    }
215
216    fn get_rep_levels(&self) -> Option<&[i16]> {
217        self.rep_levels_buffer.as_deref()
218    }
219}
220
221/// Coerce the parquet physical type array to the target type
222///
223/// This should match the logic in schema::primitive::apply_hint
224fn coerce_array(array: ArrayRef, target_type: &ArrowType) -> Result<ArrayRef> {
225    if let ArrowType::Dictionary(key_type, value_type) = target_type {
226        let dictionary = pack_dictionary(key_type, array.as_ref())?;
227        let any_dictionary = dictionary.as_any_dictionary();
228
229        let coerced_values =
230            coerce_array(Arc::clone(any_dictionary.values()), value_type.as_ref())?;
231
232        return Ok(any_dictionary.with_values(coerced_values));
233    }
234
235    match array.data_type() {
236        ArrowType::Int32 => coerce_i32(array.as_primitive(), target_type),
237        ArrowType::Int64 => coerce_i64(array.as_primitive(), target_type),
238        ArrowType::Boolean | ArrowType::Float32 | ArrowType::Float64 => Ok(array),
239        _ => unreachable!("Cannot coerce array of type {}", array.data_type()),
240    }
241}
242
243fn coerce_i32(array: &Int32Array, target_type: &ArrowType) -> Result<ArrayRef> {
244    Ok(match target_type {
245        ArrowType::UInt8 => {
246            let array = array.unary(|i| i as u8) as UInt8Array;
247            Arc::new(array) as ArrayRef
248        }
249        ArrowType::Int8 => {
250            let array = array.unary(|i| i as i8) as Int8Array;
251            Arc::new(array) as ArrayRef
252        }
253        ArrowType::UInt16 => {
254            let array = array.unary(|i| i as u16) as UInt16Array;
255            Arc::new(array) as ArrayRef
256        }
257        ArrowType::Int16 => {
258            let array = array.unary(|i| i as i16) as Int16Array;
259            Arc::new(array) as ArrayRef
260        }
261        ArrowType::Int32 => Arc::new(array.clone()),
262        // follow C++ implementation and use overflow/reinterpret cast from  i32 to u32 which will map
263        // `i32::MIN..0` to `(i32::MAX as u32)..u32::MAX`
264        ArrowType::UInt32 => Arc::new(UInt32Array::new(
265            array.values().inner().clone().into(),
266            array.nulls().cloned(),
267        )) as ArrayRef,
268        ArrowType::Date32 => Arc::new(array.reinterpret_cast::<Date32Type>()) as _,
269        ArrowType::Date64 => {
270            let array: Date64Array = array.unary(|x| x as i64 * 86_400_000);
271            Arc::new(array) as ArrayRef
272        }
273        ArrowType::Time32(TimeUnit::Second) => {
274            Arc::new(array.reinterpret_cast::<Time32SecondType>()) as ArrayRef
275        }
276        ArrowType::Time32(TimeUnit::Millisecond) => {
277            Arc::new(array.reinterpret_cast::<Time32MillisecondType>()) as ArrayRef
278        }
279        ArrowType::Timestamp(time_unit, timezone) => match time_unit {
280            TimeUnit::Second => {
281                let array: TimestampSecondArray = array
282                    .unary(|x| x as i64)
283                    .with_timezone_opt(timezone.clone());
284                Arc::new(array) as _
285            }
286            TimeUnit::Millisecond => {
287                let array: TimestampMillisecondArray = array
288                    .unary(|x| x as i64)
289                    .with_timezone_opt(timezone.clone());
290                Arc::new(array) as _
291            }
292            TimeUnit::Microsecond => {
293                let array: TimestampMicrosecondArray = array
294                    .unary(|x| x as i64)
295                    .with_timezone_opt(timezone.clone());
296                Arc::new(array) as _
297            }
298            TimeUnit::Nanosecond => {
299                let array: TimestampNanosecondArray = array
300                    .unary(|x| x as i64)
301                    .with_timezone_opt(timezone.clone());
302                Arc::new(array) as _
303            }
304        },
305        ArrowType::Decimal32(p, s) => {
306            let array = array
307                .reinterpret_cast::<Decimal32Type>()
308                .with_precision_and_scale(*p, *s)?;
309            Arc::new(array) as ArrayRef
310        }
311        ArrowType::Decimal64(p, s) => {
312            let array: Decimal64Array =
313                array.unary(|i| i as i64).with_precision_and_scale(*p, *s)?;
314            Arc::new(array) as ArrayRef
315        }
316        ArrowType::Decimal128(p, s) => {
317            let array: Decimal128Array = array
318                .unary(|i| i as i128)
319                .with_precision_and_scale(*p, *s)?;
320            Arc::new(array) as ArrayRef
321        }
322        ArrowType::Decimal256(p, s) => {
323            let array: Decimal256Array = array
324                .unary(|i| i256::from_i128(i as i128))
325                .with_precision_and_scale(*p, *s)?;
326            Arc::new(array) as ArrayRef
327        }
328        _ => unreachable!("Cannot coerce i32 to {target_type}"),
329    })
330}
331
332fn coerce_i64(array: &Int64Array, target_type: &ArrowType) -> Result<ArrayRef> {
333    Ok(match target_type {
334        ArrowType::Int64 => Arc::new(array.clone()) as _,
335        // follow C++ implementation and use overflow/reinterpret cast from i64 to u64 which will map
336        // `i64::MIN..0` to `(i64::MAX as u64)..u64::MAX`
337        ArrowType::UInt64 => Arc::new(UInt64Array::new(
338            array.values().inner().clone().into(),
339            array.nulls().cloned(),
340        )) as ArrayRef,
341        ArrowType::Date64 => Arc::new(array.reinterpret_cast::<Date64Type>()) as _,
342        ArrowType::Time64(TimeUnit::Microsecond) => {
343            Arc::new(array.reinterpret_cast::<Time64MicrosecondType>()) as _
344        }
345        ArrowType::Time64(TimeUnit::Nanosecond) => {
346            Arc::new(array.reinterpret_cast::<Time64NanosecondType>()) as _
347        }
348        ArrowType::Duration(unit) => match unit {
349            TimeUnit::Second => Arc::new(array.reinterpret_cast::<DurationSecondType>()) as _,
350            TimeUnit::Millisecond => {
351                Arc::new(array.reinterpret_cast::<DurationMillisecondType>()) as _
352            }
353            TimeUnit::Microsecond => {
354                Arc::new(array.reinterpret_cast::<DurationMicrosecondType>()) as _
355            }
356            TimeUnit::Nanosecond => {
357                Arc::new(array.reinterpret_cast::<DurationNanosecondType>()) as _
358            }
359        },
360        ArrowType::Timestamp(time_unit, timezone) => match time_unit {
361            TimeUnit::Second => {
362                let array = array
363                    .reinterpret_cast::<TimestampSecondType>()
364                    .with_timezone_opt(timezone.clone());
365                Arc::new(array) as _
366            }
367            TimeUnit::Millisecond => {
368                let array = array
369                    .reinterpret_cast::<TimestampMillisecondType>()
370                    .with_timezone_opt(timezone.clone());
371                Arc::new(array) as _
372            }
373            TimeUnit::Microsecond => {
374                let array = array
375                    .reinterpret_cast::<TimestampMicrosecondType>()
376                    .with_timezone_opt(timezone.clone());
377                Arc::new(array) as _
378            }
379            TimeUnit::Nanosecond => {
380                let array = array
381                    .reinterpret_cast::<TimestampNanosecondType>()
382                    .with_timezone_opt(timezone.clone());
383                Arc::new(array) as _
384            }
385        },
386        ArrowType::Decimal64(p, s) => {
387            let array = array
388                .reinterpret_cast::<Decimal64Type>()
389                .with_precision_and_scale(*p, *s)?;
390            Arc::new(array) as _
391        }
392        ArrowType::Decimal128(p, s) => {
393            let array: Decimal128Array = array
394                .unary(|i| i as i128)
395                .with_precision_and_scale(*p, *s)?;
396            Arc::new(array) as _
397        }
398        ArrowType::Decimal256(p, s) => {
399            let array: Decimal256Array = array
400                .unary(|i| i256::from_i128(i as i128))
401                .with_precision_and_scale(*p, *s)?;
402            Arc::new(array) as _
403        }
404        _ => unreachable!("Cannot coerce i64 to {target_type}"),
405    })
406}
407
/// Dispatches on the data type of `$array` and calls [`pack_dictionary_impl`]
/// with `$key` as the dictionary key type and the matching Arrow value type.
///
/// Only `Int32`, `Int64`, `Float32` and `Float64` arrays are accepted; any
/// other type hits `unreachable!`.
macro_rules! pack_dictionary_helper {
    ($key:ty, $array:ident) => {
        match $array.data_type() {
            ArrowType::Int32 => pack_dictionary_impl::<$key, Int32Type>($array.as_primitive()),
            ArrowType::Int64 => pack_dictionary_impl::<$key, Int64Type>($array.as_primitive()),
            ArrowType::Float32 => pack_dictionary_impl::<$key, Float32Type>($array.as_primitive()),
            ArrowType::Float64 => pack_dictionary_impl::<$key, Float64Type>($array.as_primitive()),
            _ => unreachable!("Invalid physical type"),
        }
    };
}
419
420fn pack_dictionary(key: &ArrowType, values: &dyn Array) -> Result<ArrayRef> {
421    downcast_integer! {
422        key => (pack_dictionary_helper, values),
423        _ => unreachable!("Invalid key type"),
424    }
425}
426
427fn pack_dictionary_impl<K: ArrowDictionaryKeyType, V: ArrowPrimitiveType>(
428    values: &PrimitiveArray<V>,
429) -> Result<ArrayRef> {
430    let mut builder = PrimitiveDictionaryBuilder::<K, V>::with_capacity(1024, values.len());
431    builder.extend(values);
432    Ok(Arc::new(builder.finish()))
433}
434
435#[cfg(test)]
436mod tests {
437    use super::*;
438    use crate::arrow::array_reader::test_util::EmptyPageIterator;
439    use crate::basic::Encoding;
440    use crate::column::page::Page;
441    use crate::data_type::{Int32Type, Int64Type};
442    use crate::schema::parser::parse_message_type;
443    use crate::schema::types::SchemaDescriptor;
444    use crate::util::InMemoryPageIterator;
445    use crate::util::test_common::rand_gen::make_pages;
446    use arrow::datatypes::ArrowPrimitiveType;
447    use arrow_array::{Array, Date32Array, PrimitiveArray};
448
449    use arrow::datatypes::DataType::{Date32, Decimal128};
450    use rand::distr::uniform::SampleUniform;
451    use std::collections::VecDeque;
452
453    #[allow(clippy::too_many_arguments)]
454    fn make_column_chunks<T: DataType>(
455        column_desc: ColumnDescPtr,
456        encoding: Encoding,
457        num_levels: usize,
458        min_value: T::T,
459        max_value: T::T,
460        def_levels: &mut Vec<i16>,
461        rep_levels: &mut Vec<i16>,
462        values: &mut Vec<T::T>,
463        page_lists: &mut Vec<Vec<Page>>,
464        use_v2: bool,
465        num_chunks: usize,
466    ) where
467        T::T: PartialOrd + SampleUniform + Copy,
468    {
469        for _i in 0..num_chunks {
470            let mut pages = VecDeque::new();
471            let mut data = Vec::new();
472            let mut page_def_levels = Vec::new();
473            let mut page_rep_levels = Vec::new();
474
475            make_pages::<T>(
476                column_desc.clone(),
477                encoding,
478                1,
479                num_levels,
480                min_value,
481                max_value,
482                &mut page_def_levels,
483                &mut page_rep_levels,
484                &mut data,
485                &mut pages,
486                use_v2,
487            );
488
489            def_levels.append(&mut page_def_levels);
490            rep_levels.append(&mut page_rep_levels);
491            values.append(&mut data);
492            page_lists.push(Vec::from(pages));
493        }
494    }
495
496    #[test]
497    fn test_primitive_array_reader_empty_pages() {
498        // Construct column schema
499        let message_type = "
500        message test_schema {
501          REQUIRED INT32 leaf;
502        }
503        ";
504
505        let schema = parse_message_type(message_type)
506            .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
507            .unwrap();
508
509        let mut array_reader = PrimitiveArrayReader::<Int32Type>::new(
510            Box::<EmptyPageIterator>::default(),
511            schema.column(0),
512            None,
513        )
514        .unwrap();
515
516        // expect no values to be read
517        let array = array_reader.next_batch(50).unwrap();
518        assert!(array.is_empty());
519    }
520
521    #[test]
522    fn test_primitive_array_reader_data() {
523        // Construct column schema
524        let message_type = "
525        message test_schema {
526          REQUIRED INT32 leaf;
527        }
528        ";
529
530        let schema = parse_message_type(message_type)
531            .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
532            .unwrap();
533
534        let column_desc = schema.column(0);
535
536        // Construct page iterator
537        {
538            let mut data = Vec::new();
539            let mut page_lists = Vec::new();
540            make_column_chunks::<Int32Type>(
541                column_desc.clone(),
542                Encoding::PLAIN,
543                100,
544                1,
545                200,
546                &mut Vec::new(),
547                &mut Vec::new(),
548                &mut data,
549                &mut page_lists,
550                true,
551                2,
552            );
553            let page_iterator = InMemoryPageIterator::new(page_lists);
554
555            let mut array_reader =
556                PrimitiveArrayReader::<Int32Type>::new(Box::new(page_iterator), column_desc, None)
557                    .unwrap();
558
559            // Read first 50 values, which are all from the first column chunk
560            let array = array_reader.next_batch(50).unwrap();
561            let array = array.as_any().downcast_ref::<Int32Array>().unwrap();
562
563            assert_eq!(&Int32Array::from(data[0..50].to_vec()), array);
564
565            // Read next 100 values, the first 50 ones are from the first column chunk,
566            // and the last 50 ones are from the second column chunk
567            let array = array_reader.next_batch(100).unwrap();
568            let array = array.as_any().downcast_ref::<Int32Array>().unwrap();
569
570            assert_eq!(&Int32Array::from(data[50..150].to_vec()), array);
571
572            // Try to read 100 values, however there are only 50 values
573            let array = array_reader.next_batch(100).unwrap();
574            let array = array.as_any().downcast_ref::<Int32Array>().unwrap();
575
576            assert_eq!(&Int32Array::from(data[150..200].to_vec()), array);
577        }
578    }
579
580    macro_rules! test_primitive_array_reader_one_type {
581        (
582            $arrow_parquet_type:ty,
583            $physical_type:expr,
584            $converted_type_str:expr,
585            $result_arrow_type:ty,
586            $result_arrow_cast_type:ty,
587            $result_primitive_type:ty
588            $(, $timezone:expr)?
589        ) => {{
590            let message_type = format!(
591                "
592            message test_schema {{
593              REQUIRED {:?} leaf ({});
594          }}
595            ",
596                $physical_type, $converted_type_str
597            );
598            let schema = parse_message_type(&message_type)
599                .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
600                .unwrap();
601
602            let column_desc = schema.column(0);
603
604            // Construct page iterator
605            {
606                let mut data = Vec::new();
607                let mut page_lists = Vec::new();
608                make_column_chunks::<$arrow_parquet_type>(
609                    column_desc.clone(),
610                    Encoding::PLAIN,
611                    100,
612                    1,
613                    200,
614                    &mut Vec::new(),
615                    &mut Vec::new(),
616                    &mut data,
617                    &mut page_lists,
618                    true,
619                    2,
620                );
621                let page_iterator = InMemoryPageIterator::new(page_lists);
622                let mut array_reader = PrimitiveArrayReader::<$arrow_parquet_type>::new(
623                    Box::new(page_iterator),
624                    column_desc.clone(),
625                    None,
626                )
627                .expect("Unable to get array reader");
628
629                let array = array_reader
630                    .next_batch(50)
631                    .expect("Unable to get batch from reader");
632
633                let result_data_type = <$result_arrow_type>::DATA_TYPE;
634                let array = array
635                    .as_any()
636                    .downcast_ref::<PrimitiveArray<$result_arrow_type>>()
637                    .expect(
638                        format!(
639                            "Unable to downcast {:?} to {:?}",
640                            array.data_type(),
641                            result_data_type
642                        )
643                        .as_str(),
644                    )
645                    $(.clone().with_timezone($timezone))?
646                    ;
647
648                // create expected array as primitive, and cast to result type
649                let expected = PrimitiveArray::<$result_arrow_cast_type>::from(
650                    data[0..50]
651                        .iter()
652                        .map(|x| *x as $result_primitive_type)
653                        .collect::<Vec<$result_primitive_type>>(),
654                );
655                let expected = Arc::new(expected) as ArrayRef;
656                let expected = arrow::compute::cast(&expected, &result_data_type)
657                    .expect("Unable to cast expected array");
658                assert_eq!(expected.data_type(), &result_data_type);
659                let expected = expected
660                    .as_any()
661                    .downcast_ref::<PrimitiveArray<$result_arrow_type>>()
662                    .expect(
663                        format!(
664                            "Unable to downcast expected {:?} to {:?}",
665                            expected.data_type(),
666                            result_data_type
667                        )
668                        .as_str(),
669                    )
670                    $(.clone().with_timezone($timezone))?
671                    ;
672                assert_eq!(expected, array);
673            }
674        }};
675    }
676
677    #[test]
678    fn test_primitive_array_reader_temporal_types() {
679        test_primitive_array_reader_one_type!(
680            crate::data_type::Int32Type,
681            PhysicalType::INT32,
682            "DATE",
683            arrow::datatypes::Date32Type,
684            arrow::datatypes::Int32Type,
685            i32
686        );
687        test_primitive_array_reader_one_type!(
688            crate::data_type::Int32Type,
689            PhysicalType::INT32,
690            "TIME_MILLIS",
691            arrow::datatypes::Time32MillisecondType,
692            arrow::datatypes::Int32Type,
693            i32
694        );
695        test_primitive_array_reader_one_type!(
696            crate::data_type::Int64Type,
697            PhysicalType::INT64,
698            "TIME_MICROS",
699            arrow::datatypes::Time64MicrosecondType,
700            arrow::datatypes::Int64Type,
701            i64
702        );
703        test_primitive_array_reader_one_type!(
704            crate::data_type::Int64Type,
705            PhysicalType::INT64,
706            "TIMESTAMP_MILLIS",
707            arrow::datatypes::TimestampMillisecondType,
708            arrow::datatypes::Int64Type,
709            i64,
710            "UTC"
711        );
712        test_primitive_array_reader_one_type!(
713            crate::data_type::Int64Type,
714            PhysicalType::INT64,
715            "TIMESTAMP_MICROS",
716            arrow::datatypes::TimestampMicrosecondType,
717            arrow::datatypes::Int64Type,
718            i64,
719            "UTC"
720        );
721    }
722
723    #[test]
724    fn test_primitive_array_reader_def_and_rep_levels() {
725        // Construct column schema
726        let message_type = "
727        message test_schema {
728            REPEATED Group test_mid {
729                OPTIONAL INT32 leaf;
730            }
731        }
732        ";
733
734        let schema = parse_message_type(message_type)
735            .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
736            .unwrap();
737
738        let column_desc = schema.column(0);
739
740        // Construct page iterator
741        {
742            let mut def_levels = Vec::new();
743            let mut rep_levels = Vec::new();
744            let mut page_lists = Vec::new();
745            make_column_chunks::<Int32Type>(
746                column_desc.clone(),
747                Encoding::PLAIN,
748                100,
749                1,
750                200,
751                &mut def_levels,
752                &mut rep_levels,
753                &mut Vec::new(),
754                &mut page_lists,
755                true,
756                2,
757            );
758
759            let page_iterator = InMemoryPageIterator::new(page_lists);
760
761            let mut array_reader =
762                PrimitiveArrayReader::<Int32Type>::new(Box::new(page_iterator), column_desc, None)
763                    .unwrap();
764
765            let mut accu_len: usize = 0;
766
767            // Read first 50 values, which are all from the first column chunk
768            let array = array_reader.next_batch(50).unwrap();
769            assert_eq!(
770                Some(&def_levels[accu_len..(accu_len + array.len())]),
771                array_reader.get_def_levels()
772            );
773            assert_eq!(
774                Some(&rep_levels[accu_len..(accu_len + array.len())]),
775                array_reader.get_rep_levels()
776            );
777            accu_len += array.len();
778
779            // Read next 100 values, the first 50 ones are from the first column chunk,
780            // and the last 50 ones are from the second column chunk
781            let array = array_reader.next_batch(100).unwrap();
782            assert_eq!(
783                Some(&def_levels[accu_len..(accu_len + array.len())]),
784                array_reader.get_def_levels()
785            );
786            assert_eq!(
787                Some(&rep_levels[accu_len..(accu_len + array.len())]),
788                array_reader.get_rep_levels()
789            );
790            accu_len += array.len();
791
792            // Try to read 100 values, however there are only 50 values
793            let array = array_reader.next_batch(100).unwrap();
794            assert_eq!(
795                Some(&def_levels[accu_len..(accu_len + array.len())]),
796                array_reader.get_def_levels()
797            );
798            assert_eq!(
799                Some(&rep_levels[accu_len..(accu_len + array.len())]),
800                array_reader.get_rep_levels()
801            );
802        }
803    }
804
805    #[test]
806    fn test_primitive_array_reader_decimal_types() {
807        // parquet `INT32` to decimal
808        let message_type = "
809            message test_schema {
810                REQUIRED INT32 decimal1 (DECIMAL(8,2));
811        }
812        ";
813        let schema = parse_message_type(message_type)
814            .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
815            .unwrap();
816        let column_desc = schema.column(0);
817
818        // create the array reader
819        {
820            let mut data = Vec::new();
821            let mut page_lists = Vec::new();
822            make_column_chunks::<Int32Type>(
823                column_desc.clone(),
824                Encoding::PLAIN,
825                100,
826                -99999999,
827                99999999,
828                &mut Vec::new(),
829                &mut Vec::new(),
830                &mut data,
831                &mut page_lists,
832                true,
833                2,
834            );
835            let page_iterator = InMemoryPageIterator::new(page_lists);
836
837            let mut array_reader =
838                PrimitiveArrayReader::<Int32Type>::new(Box::new(page_iterator), column_desc, None)
839                    .unwrap();
840
841            // read data from the reader
842            // the data type is decimal(8,2)
843            let array = array_reader.next_batch(50).unwrap();
844            assert_eq!(array.data_type(), &Decimal128(8, 2));
845            let array = array.as_any().downcast_ref::<Decimal128Array>().unwrap();
846            let data_decimal_array = data[0..50]
847                .iter()
848                .copied()
849                .map(|v| Some(v as i128))
850                .collect::<Decimal128Array>()
851                .with_precision_and_scale(8, 2)
852                .unwrap();
853            assert_eq!(array, &data_decimal_array);
854
855            // not equal with different data type(precision and scale)
856            let data_decimal_array = data[0..50]
857                .iter()
858                .copied()
859                .map(|v| Some(v as i128))
860                .collect::<Decimal128Array>()
861                .with_precision_and_scale(9, 0)
862                .unwrap();
863            assert_ne!(array, &data_decimal_array)
864        }
865
866        // parquet `INT64` to decimal
867        let message_type = "
868            message test_schema {
869                REQUIRED INT64 decimal1 (DECIMAL(18,4));
870        }
871        ";
872        let schema = parse_message_type(message_type)
873            .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
874            .unwrap();
875        let column_desc = schema.column(0);
876
877        // create the array reader
878        {
879            let mut data = Vec::new();
880            let mut page_lists = Vec::new();
881            make_column_chunks::<Int64Type>(
882                column_desc.clone(),
883                Encoding::PLAIN,
884                100,
885                -999999999999999999,
886                999999999999999999,
887                &mut Vec::new(),
888                &mut Vec::new(),
889                &mut data,
890                &mut page_lists,
891                true,
892                2,
893            );
894            let page_iterator = InMemoryPageIterator::new(page_lists);
895
896            let mut array_reader =
897                PrimitiveArrayReader::<Int64Type>::new(Box::new(page_iterator), column_desc, None)
898                    .unwrap();
899
900            // read data from the reader
901            // the data type is decimal(18,4)
902            let array = array_reader.next_batch(50).unwrap();
903            assert_eq!(array.data_type(), &Decimal128(18, 4));
904            let array = array.as_any().downcast_ref::<Decimal128Array>().unwrap();
905            let data_decimal_array = data[0..50]
906                .iter()
907                .copied()
908                .map(|v| Some(v as i128))
909                .collect::<Decimal128Array>()
910                .with_precision_and_scale(18, 4)
911                .unwrap();
912            assert_eq!(array, &data_decimal_array);
913
914            // not equal with different data type(precision and scale)
915            let data_decimal_array = data[0..50]
916                .iter()
917                .copied()
918                .map(|v| Some(v as i128))
919                .collect::<Decimal128Array>()
920                .with_precision_and_scale(34, 0)
921                .unwrap();
922            assert_ne!(array, &data_decimal_array)
923        }
924    }
925
926    #[test]
927    fn test_primitive_array_reader_date32_type() {
928        // parquet `INT32` to date
929        let message_type = "
930            message test_schema {
931                REQUIRED INT32 date1 (DATE);
932        }
933        ";
934        let schema = parse_message_type(message_type)
935            .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
936            .unwrap();
937        let column_desc = schema.column(0);
938
939        // create the array reader
940        {
941            let mut data = Vec::new();
942            let mut page_lists = Vec::new();
943            make_column_chunks::<Int32Type>(
944                column_desc.clone(),
945                Encoding::PLAIN,
946                100,
947                -99999999,
948                99999999,
949                &mut Vec::new(),
950                &mut Vec::new(),
951                &mut data,
952                &mut page_lists,
953                true,
954                2,
955            );
956            let page_iterator = InMemoryPageIterator::new(page_lists);
957
958            let mut array_reader =
959                PrimitiveArrayReader::<Int32Type>::new(Box::new(page_iterator), column_desc, None)
960                    .unwrap();
961
962            // read data from the reader
963            // the data type is date
964            let array = array_reader.next_batch(50).unwrap();
965            assert_eq!(array.data_type(), &Date32);
966            let array = array.as_any().downcast_ref::<Date32Array>().unwrap();
967            let data_date_array = data[0..50]
968                .iter()
969                .copied()
970                .map(Some)
971                .collect::<Date32Array>();
972            assert_eq!(array, &data_date_array);
973        }
974    }
975}