Skip to main content

parquet/arrow/array_reader/
primitive_array.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::arrow::array_reader::{ArrayReader, read_records, skip_records};
19use crate::arrow::record_reader::RecordReader;
20use crate::arrow::schema::parquet_to_arrow_field;
21use crate::basic::Type as PhysicalType;
22use crate::column::page::PageIterator;
23use crate::data_type::{DataType, Int96};
24use crate::errors::Result;
25use crate::schema::types::ColumnDescPtr;
26use arrow_array::{
27    Array, ArrayRef, BooleanArray, Date64Array, Decimal64Array, Decimal128Array, Decimal256Array,
28    Float32Array, Float64Array, Int8Array, Int16Array, Int32Array, Int64Array, PrimitiveArray,
29    UInt8Array, UInt16Array, builder::PrimitiveDictionaryBuilder, cast::AsArray, downcast_integer,
30    types::*,
31};
32use arrow_array::{
33    TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
34    TimestampSecondArray, UInt32Array, UInt64Array,
35};
36use arrow_buffer::{BooleanBuffer, Buffer, NullBuffer, ScalarBuffer, i256};
37use arrow_schema::{DataType as ArrowType, TimeUnit};
38use std::any::Any;
39use std::sync::Arc;
40
41/// Provides conversion from `Vec<T>` to `Buffer`
42pub trait IntoBuffer {
43    fn into_buffer(self, target_type: &ArrowType) -> Buffer;
44}
45
46macro_rules! native_buffer {
47    ($($t:ty),*) => {
48        $(impl IntoBuffer for Vec<$t> {
49            fn into_buffer(self, _target_type: &ArrowType) -> Buffer {
50                Buffer::from_vec(self)
51            }
52        })*
53    };
54}
55native_buffer!(i8, i16, i32, i64, u8, u16, u32, u64, f32, f64);
56
57impl IntoBuffer for Vec<bool> {
58    fn into_buffer(self, _target_type: &ArrowType) -> Buffer {
59        BooleanBuffer::from_iter(self).into_inner()
60    }
61}
62
63impl IntoBuffer for Vec<Int96> {
64    fn into_buffer(self, target_type: &ArrowType) -> Buffer {
65        let mut builder = Vec::with_capacity(self.len());
66        match target_type {
67            ArrowType::Timestamp(TimeUnit::Second, _) => {
68                builder.extend(self.iter().map(|x| x.to_seconds()));
69            }
70            ArrowType::Timestamp(TimeUnit::Millisecond, _) => {
71                builder.extend(self.iter().map(|x| x.to_millis()));
72            }
73            ArrowType::Timestamp(TimeUnit::Microsecond, _) => {
74                builder.extend(self.iter().map(|x| x.to_micros()));
75            }
76            ArrowType::Timestamp(TimeUnit::Nanosecond, _) => {
77                builder.extend(self.iter().map(|x| x.to_nanos()));
78            }
79            _ => unreachable!("Invalid target_type for Int96."),
80        }
81        Buffer::from_vec(builder)
82    }
83}
84
85/// Primitive array readers are leaves of array reader tree. They accept page iterator
86/// and read them into primitive arrays.
87pub struct PrimitiveArrayReader<T>
88where
89    T: DataType,
90    T::T: Copy + Default,
91    Vec<T::T>: IntoBuffer,
92{
93    data_type: ArrowType,
94    pages: Box<dyn PageIterator>,
95    def_levels_buffer: Option<Vec<i16>>,
96    rep_levels_buffer: Option<Vec<i16>>,
97    record_reader: RecordReader<T>,
98}
99
100impl<T> PrimitiveArrayReader<T>
101where
102    T: DataType,
103    T::T: Copy + Default,
104    Vec<T::T>: IntoBuffer,
105{
106    /// Construct primitive array reader.
107    ///
108    /// `batch_size` is used to pre-allocate internal buffers.
109    pub fn new(
110        pages: Box<dyn PageIterator>,
111        column_desc: ColumnDescPtr,
112        arrow_type: Option<ArrowType>,
113        batch_size: usize,
114    ) -> Result<Self> {
115        // Check if Arrow type is specified, else create it from Parquet type
116        let data_type = match arrow_type {
117            Some(t) => t,
118            None => parquet_to_arrow_field(column_desc.as_ref())?
119                .data_type()
120                .clone(),
121        };
122
123        let record_reader = RecordReader::<T>::new(column_desc, batch_size);
124
125        Ok(Self {
126            data_type,
127            pages,
128            def_levels_buffer: None,
129            rep_levels_buffer: None,
130            record_reader,
131        })
132    }
133}
134
135/// Implementation of primitive array reader.
136impl<T> ArrayReader for PrimitiveArrayReader<T>
137where
138    T: DataType,
139    T::T: Copy + Default,
140    Vec<T::T>: IntoBuffer,
141{
142    fn as_any(&self) -> &dyn Any {
143        self
144    }
145
146    /// Returns data type of primitive array.
147    fn get_data_type(&self) -> &ArrowType {
148        &self.data_type
149    }
150
151    fn read_records(&mut self, batch_size: usize) -> Result<usize> {
152        read_records(&mut self.record_reader, self.pages.as_mut(), batch_size)
153    }
154
155    fn consume_batch(&mut self) -> Result<ArrayRef> {
156        let target_type = &self.data_type;
157
158        // Convert physical data to equivalent arrow type, and then perform
159        // coercion as needed
160        let record_data = self
161            .record_reader
162            .consume_record_data()
163            .into_buffer(target_type);
164
165        let len = self.record_reader.num_values();
166        let nulls = self
167            .record_reader
168            .consume_bitmap_buffer()
169            .and_then(|b| NullBuffer::from_unsliced_buffer(b, len));
170
171        let array: ArrayRef = match T::get_physical_type() {
172            PhysicalType::BOOLEAN => Arc::new(BooleanArray::new(
173                BooleanBuffer::new(record_data, 0, len),
174                nulls,
175            )),
176            PhysicalType::INT32 => Arc::new(Int32Array::new(
177                ScalarBuffer::new(record_data, 0, len),
178                nulls,
179            )),
180            PhysicalType::INT64 => Arc::new(Int64Array::new(
181                ScalarBuffer::new(record_data, 0, len),
182                nulls,
183            )),
184            PhysicalType::FLOAT => Arc::new(Float32Array::new(
185                ScalarBuffer::new(record_data, 0, len),
186                nulls,
187            )),
188            PhysicalType::DOUBLE => Arc::new(Float64Array::new(
189                ScalarBuffer::new(record_data, 0, len),
190                nulls,
191            )),
192            PhysicalType::INT96 => Arc::new(Int64Array::new(
193                ScalarBuffer::new(record_data, 0, len),
194                nulls,
195            )),
196            PhysicalType::BYTE_ARRAY | PhysicalType::FIXED_LEN_BYTE_ARRAY => {
197                unreachable!("PrimitiveArrayReaders don't support complex physical types");
198            }
199        };
200
201        // Coerce the arrow type to the desired array type
202        let array = coerce_array(array, target_type)?;
203
204        // save definition and repetition buffers
205        self.def_levels_buffer = self.record_reader.consume_def_levels();
206        self.rep_levels_buffer = self.record_reader.consume_rep_levels();
207        self.record_reader.reset();
208        Ok(array)
209    }
210
211    fn skip_records(&mut self, num_records: usize) -> Result<usize> {
212        skip_records(&mut self.record_reader, self.pages.as_mut(), num_records)
213    }
214
215    fn get_def_levels(&self) -> Option<&[i16]> {
216        self.def_levels_buffer.as_deref()
217    }
218
219    fn get_rep_levels(&self) -> Option<&[i16]> {
220        self.rep_levels_buffer.as_deref()
221    }
222}
223
224/// Coerce the parquet physical type array to the target type
225///
226/// This should match the logic in schema::primitive::apply_hint
227fn coerce_array(array: ArrayRef, target_type: &ArrowType) -> Result<ArrayRef> {
228    if let ArrowType::Dictionary(key_type, value_type) = target_type {
229        let dictionary = pack_dictionary(key_type, array.as_ref())?;
230        let any_dictionary = dictionary.as_any_dictionary();
231
232        let coerced_values =
233            coerce_array(Arc::clone(any_dictionary.values()), value_type.as_ref())?;
234
235        return Ok(any_dictionary.with_values(coerced_values));
236    }
237
238    match array.data_type() {
239        ArrowType::Int32 => coerce_i32(array.as_primitive(), target_type),
240        ArrowType::Int64 => coerce_i64(array.as_primitive(), target_type),
241        ArrowType::Boolean | ArrowType::Float32 | ArrowType::Float64 => Ok(array),
242        _ => unreachable!("Cannot coerce array of type {}", array.data_type()),
243    }
244}
245
246fn coerce_i32(array: &Int32Array, target_type: &ArrowType) -> Result<ArrayRef> {
247    Ok(match target_type {
248        ArrowType::UInt8 => {
249            let array = array.unary(|i| i as u8) as UInt8Array;
250            Arc::new(array) as ArrayRef
251        }
252        ArrowType::Int8 => {
253            let array = array.unary(|i| i as i8) as Int8Array;
254            Arc::new(array) as ArrayRef
255        }
256        ArrowType::UInt16 => {
257            let array = array.unary(|i| i as u16) as UInt16Array;
258            Arc::new(array) as ArrayRef
259        }
260        ArrowType::Int16 => {
261            let array = array.unary(|i| i as i16) as Int16Array;
262            Arc::new(array) as ArrayRef
263        }
264        ArrowType::Int32 => Arc::new(array.clone()),
265        // follow C++ implementation and use overflow/reinterpret cast from  i32 to u32 which will map
266        // `i32::MIN..0` to `(i32::MAX as u32)..u32::MAX`
267        ArrowType::UInt32 => Arc::new(UInt32Array::new(
268            array.values().inner().clone().into(),
269            array.nulls().cloned(),
270        )) as ArrayRef,
271        ArrowType::Date32 => Arc::new(array.reinterpret_cast::<Date32Type>()) as _,
272        ArrowType::Date64 => {
273            let array: Date64Array = array.unary(|x| x as i64 * 86_400_000);
274            Arc::new(array) as ArrayRef
275        }
276        ArrowType::Time32(TimeUnit::Second) => {
277            Arc::new(array.reinterpret_cast::<Time32SecondType>()) as ArrayRef
278        }
279        ArrowType::Time32(TimeUnit::Millisecond) => {
280            Arc::new(array.reinterpret_cast::<Time32MillisecondType>()) as ArrayRef
281        }
282        ArrowType::Timestamp(time_unit, timezone) => match time_unit {
283            TimeUnit::Second => {
284                let array: TimestampSecondArray = array
285                    .unary(|x| x as i64)
286                    .with_timezone_opt(timezone.clone());
287                Arc::new(array) as _
288            }
289            TimeUnit::Millisecond => {
290                let array: TimestampMillisecondArray = array
291                    .unary(|x| x as i64)
292                    .with_timezone_opt(timezone.clone());
293                Arc::new(array) as _
294            }
295            TimeUnit::Microsecond => {
296                let array: TimestampMicrosecondArray = array
297                    .unary(|x| x as i64)
298                    .with_timezone_opt(timezone.clone());
299                Arc::new(array) as _
300            }
301            TimeUnit::Nanosecond => {
302                let array: TimestampNanosecondArray = array
303                    .unary(|x| x as i64)
304                    .with_timezone_opt(timezone.clone());
305                Arc::new(array) as _
306            }
307        },
308        ArrowType::Decimal32(p, s) => {
309            let array = array
310                .reinterpret_cast::<Decimal32Type>()
311                .with_precision_and_scale(*p, *s)?;
312            Arc::new(array) as ArrayRef
313        }
314        ArrowType::Decimal64(p, s) => {
315            let array: Decimal64Array =
316                array.unary(|i| i as i64).with_precision_and_scale(*p, *s)?;
317            Arc::new(array) as ArrayRef
318        }
319        ArrowType::Decimal128(p, s) => {
320            let array: Decimal128Array = array
321                .unary(|i| i as i128)
322                .with_precision_and_scale(*p, *s)?;
323            Arc::new(array) as ArrayRef
324        }
325        ArrowType::Decimal256(p, s) => {
326            let array: Decimal256Array = array
327                .unary(|i| i256::from_i128(i as i128))
328                .with_precision_and_scale(*p, *s)?;
329            Arc::new(array) as ArrayRef
330        }
331        _ => unreachable!("Cannot coerce i32 to {target_type}"),
332    })
333}
334
335fn coerce_i64(array: &Int64Array, target_type: &ArrowType) -> Result<ArrayRef> {
336    Ok(match target_type {
337        ArrowType::Int64 => Arc::new(array.clone()) as _,
338        // follow C++ implementation and use overflow/reinterpret cast from i64 to u64 which will map
339        // `i64::MIN..0` to `(i64::MAX as u64)..u64::MAX`
340        ArrowType::UInt64 => Arc::new(UInt64Array::new(
341            array.values().inner().clone().into(),
342            array.nulls().cloned(),
343        )) as ArrayRef,
344        ArrowType::Date64 => Arc::new(array.reinterpret_cast::<Date64Type>()) as _,
345        ArrowType::Time64(TimeUnit::Microsecond) => {
346            Arc::new(array.reinterpret_cast::<Time64MicrosecondType>()) as _
347        }
348        ArrowType::Time64(TimeUnit::Nanosecond) => {
349            Arc::new(array.reinterpret_cast::<Time64NanosecondType>()) as _
350        }
351        ArrowType::Duration(unit) => match unit {
352            TimeUnit::Second => Arc::new(array.reinterpret_cast::<DurationSecondType>()) as _,
353            TimeUnit::Millisecond => {
354                Arc::new(array.reinterpret_cast::<DurationMillisecondType>()) as _
355            }
356            TimeUnit::Microsecond => {
357                Arc::new(array.reinterpret_cast::<DurationMicrosecondType>()) as _
358            }
359            TimeUnit::Nanosecond => {
360                Arc::new(array.reinterpret_cast::<DurationNanosecondType>()) as _
361            }
362        },
363        ArrowType::Timestamp(time_unit, timezone) => match time_unit {
364            TimeUnit::Second => {
365                let array = array
366                    .reinterpret_cast::<TimestampSecondType>()
367                    .with_timezone_opt(timezone.clone());
368                Arc::new(array) as _
369            }
370            TimeUnit::Millisecond => {
371                let array = array
372                    .reinterpret_cast::<TimestampMillisecondType>()
373                    .with_timezone_opt(timezone.clone());
374                Arc::new(array) as _
375            }
376            TimeUnit::Microsecond => {
377                let array = array
378                    .reinterpret_cast::<TimestampMicrosecondType>()
379                    .with_timezone_opt(timezone.clone());
380                Arc::new(array) as _
381            }
382            TimeUnit::Nanosecond => {
383                let array = array
384                    .reinterpret_cast::<TimestampNanosecondType>()
385                    .with_timezone_opt(timezone.clone());
386                Arc::new(array) as _
387            }
388        },
389        ArrowType::Decimal64(p, s) => {
390            let array = array
391                .reinterpret_cast::<Decimal64Type>()
392                .with_precision_and_scale(*p, *s)?;
393            Arc::new(array) as _
394        }
395        ArrowType::Decimal128(p, s) => {
396            let array: Decimal128Array = array
397                .unary(|i| i as i128)
398                .with_precision_and_scale(*p, *s)?;
399            Arc::new(array) as _
400        }
401        ArrowType::Decimal256(p, s) => {
402            let array: Decimal256Array = array
403                .unary(|i| i256::from_i128(i as i128))
404                .with_precision_and_scale(*p, *s)?;
405            Arc::new(array) as _
406        }
407        _ => unreachable!("Cannot coerce i64 to {target_type}"),
408    })
409}
410
/// Dispatches on the data type of `$array` and calls `pack_dictionary_impl`
/// with `$key` as the dictionary key type and the matching primitive value
/// type. Panics for any other data type.
macro_rules! pack_dictionary_helper {
    ($key:ty, $array:ident) => {{
        let physical = $array.data_type();
        match physical {
            ArrowType::Int32 => pack_dictionary_impl::<$key, Int32Type>($array.as_primitive()),
            ArrowType::Int64 => pack_dictionary_impl::<$key, Int64Type>($array.as_primitive()),
            ArrowType::Float32 => pack_dictionary_impl::<$key, Float32Type>($array.as_primitive()),
            ArrowType::Float64 => pack_dictionary_impl::<$key, Float64Type>($array.as_primitive()),
            _ => unreachable!("Invalid physical type"),
        }
    }};
}
422
423fn pack_dictionary(key: &ArrowType, values: &dyn Array) -> Result<ArrayRef> {
424    downcast_integer! {
425        key => (pack_dictionary_helper, values),
426        _ => unreachable!("Invalid key type"),
427    }
428}
429
430fn pack_dictionary_impl<K: ArrowDictionaryKeyType, V: ArrowPrimitiveType>(
431    values: &PrimitiveArray<V>,
432) -> Result<ArrayRef> {
433    let mut builder = PrimitiveDictionaryBuilder::<K, V>::with_capacity(1024, values.len());
434    builder.extend(values);
435    Ok(Arc::new(builder.finish()))
436}
437
438#[cfg(test)]
439mod tests {
440    use super::*;
441    use crate::arrow::array_reader::test_util::EmptyPageIterator;
442    use crate::arrow::arrow_reader::DEFAULT_BATCH_SIZE;
443    use crate::basic::Encoding;
444    use crate::column::page::Page;
445    use crate::data_type::{Int32Type, Int64Type};
446    use crate::schema::parser::parse_message_type;
447    use crate::schema::types::SchemaDescriptor;
448    use crate::util::InMemoryPageIterator;
449    use crate::util::test_common::rand_gen::make_pages;
450    use arrow::datatypes::ArrowPrimitiveType;
451    use arrow_array::{Array, Date32Array, PrimitiveArray};
452
453    use arrow::datatypes::DataType::{Date32, Decimal128};
454    use rand::distr::uniform::SampleUniform;
455    use std::collections::VecDeque;
456
457    #[allow(clippy::too_many_arguments)]
458    fn make_column_chunks<T: DataType>(
459        column_desc: ColumnDescPtr,
460        encoding: Encoding,
461        num_levels: usize,
462        min_value: T::T,
463        max_value: T::T,
464        def_levels: &mut Vec<i16>,
465        rep_levels: &mut Vec<i16>,
466        values: &mut Vec<T::T>,
467        page_lists: &mut Vec<Vec<Page>>,
468        use_v2: bool,
469        num_chunks: usize,
470    ) where
471        T::T: PartialOrd + SampleUniform + Copy,
472    {
473        for _i in 0..num_chunks {
474            let mut pages = VecDeque::new();
475            let mut data = Vec::new();
476            let mut page_def_levels = Vec::new();
477            let mut page_rep_levels = Vec::new();
478
479            make_pages::<T>(
480                column_desc.clone(),
481                encoding,
482                1,
483                num_levels,
484                min_value,
485                max_value,
486                &mut page_def_levels,
487                &mut page_rep_levels,
488                &mut data,
489                &mut pages,
490                use_v2,
491            );
492
493            def_levels.append(&mut page_def_levels);
494            rep_levels.append(&mut page_rep_levels);
495            values.append(&mut data);
496            page_lists.push(Vec::from(pages));
497        }
498    }
499
500    #[test]
501    fn test_primitive_array_reader_empty_pages() {
502        // Construct column schema
503        let message_type = "
504        message test_schema {
505          REQUIRED INT32 leaf;
506        }
507        ";
508
509        let schema = parse_message_type(message_type)
510            .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
511            .unwrap();
512
513        let mut array_reader = PrimitiveArrayReader::<Int32Type>::new(
514            Box::<EmptyPageIterator>::default(),
515            schema.column(0),
516            None,
517            DEFAULT_BATCH_SIZE,
518        )
519        .unwrap();
520
521        // expect no values to be read
522        let array = array_reader.next_batch(50).unwrap();
523        assert!(array.is_empty());
524    }
525
526    #[test]
527    fn test_primitive_array_reader_data() {
528        // Construct column schema
529        let message_type = "
530        message test_schema {
531          REQUIRED INT32 leaf;
532        }
533        ";
534
535        let schema = parse_message_type(message_type)
536            .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
537            .unwrap();
538
539        let column_desc = schema.column(0);
540
541        // Construct page iterator
542        {
543            let mut data = Vec::new();
544            let mut page_lists = Vec::new();
545            make_column_chunks::<Int32Type>(
546                column_desc.clone(),
547                Encoding::PLAIN,
548                100,
549                1,
550                200,
551                &mut Vec::new(),
552                &mut Vec::new(),
553                &mut data,
554                &mut page_lists,
555                true,
556                2,
557            );
558            let page_iterator = InMemoryPageIterator::new(page_lists);
559
560            let mut array_reader = PrimitiveArrayReader::<Int32Type>::new(
561                Box::new(page_iterator),
562                column_desc,
563                None,
564                DEFAULT_BATCH_SIZE,
565            )
566            .unwrap();
567
568            // Read first 50 values, which are all from the first column chunk
569            let array = array_reader.next_batch(50).unwrap();
570            let array = array.as_any().downcast_ref::<Int32Array>().unwrap();
571
572            assert_eq!(&Int32Array::from(data[0..50].to_vec()), array);
573
574            // Read next 100 values, the first 50 ones are from the first column chunk,
575            // and the last 50 ones are from the second column chunk
576            let array = array_reader.next_batch(100).unwrap();
577            let array = array.as_any().downcast_ref::<Int32Array>().unwrap();
578
579            assert_eq!(&Int32Array::from(data[50..150].to_vec()), array);
580
581            // Try to read 100 values, however there are only 50 values
582            let array = array_reader.next_batch(100).unwrap();
583            let array = array.as_any().downcast_ref::<Int32Array>().unwrap();
584
585            assert_eq!(&Int32Array::from(data[150..200].to_vec()), array);
586        }
587    }
588
589    macro_rules! test_primitive_array_reader_one_type {
590        (
591            $arrow_parquet_type:ty,
592            $physical_type:expr,
593            $converted_type_str:expr,
594            $result_arrow_type:ty,
595            $result_arrow_cast_type:ty,
596            $result_primitive_type:ty
597            $(, $timezone:expr)?
598        ) => {{
599            let message_type = format!(
600                "
601            message test_schema {{
602              REQUIRED {:?} leaf ({});
603          }}
604            ",
605                $physical_type, $converted_type_str
606            );
607            let schema = parse_message_type(&message_type)
608                .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
609                .unwrap();
610
611            let column_desc = schema.column(0);
612
613            // Construct page iterator
614            {
615                let mut data = Vec::new();
616                let mut page_lists = Vec::new();
617                make_column_chunks::<$arrow_parquet_type>(
618                    column_desc.clone(),
619                    Encoding::PLAIN,
620                    100,
621                    1,
622                    200,
623                    &mut Vec::new(),
624                    &mut Vec::new(),
625                    &mut data,
626                    &mut page_lists,
627                    true,
628                    2,
629                );
630                let page_iterator = InMemoryPageIterator::new(page_lists);
631                let mut array_reader = PrimitiveArrayReader::<$arrow_parquet_type>::new(
632                    Box::new(page_iterator),
633                    column_desc.clone(),
634                    None,
635                    DEFAULT_BATCH_SIZE,
636                )
637                .expect("Unable to get array reader");
638
639                let array = array_reader
640                    .next_batch(50)
641                    .expect("Unable to get batch from reader");
642
643                let result_data_type = <$result_arrow_type>::DATA_TYPE;
644                let array = array
645                    .as_any()
646                    .downcast_ref::<PrimitiveArray<$result_arrow_type>>()
647                    .expect(
648                        format!(
649                            "Unable to downcast {:?} to {:?}",
650                            array.data_type(),
651                            result_data_type
652                        )
653                        .as_str(),
654                    )
655                    $(.clone().with_timezone($timezone))?
656                    ;
657
658                // create expected array as primitive, and cast to result type
659                let expected = PrimitiveArray::<$result_arrow_cast_type>::from(
660                    data[0..50]
661                        .iter()
662                        .map(|x| *x as $result_primitive_type)
663                        .collect::<Vec<$result_primitive_type>>(),
664                );
665                let expected = Arc::new(expected) as ArrayRef;
666                let expected = arrow::compute::cast(&expected, &result_data_type)
667                    .expect("Unable to cast expected array");
668                assert_eq!(expected.data_type(), &result_data_type);
669                let expected = expected
670                    .as_any()
671                    .downcast_ref::<PrimitiveArray<$result_arrow_type>>()
672                    .expect(
673                        format!(
674                            "Unable to downcast expected {:?} to {:?}",
675                            expected.data_type(),
676                            result_data_type
677                        )
678                        .as_str(),
679                    )
680                    $(.clone().with_timezone($timezone))?
681                    ;
682                assert_eq!(expected, array);
683            }
684        }};
685    }
686
687    #[test]
688    fn test_primitive_array_reader_temporal_types() {
689        test_primitive_array_reader_one_type!(
690            crate::data_type::Int32Type,
691            PhysicalType::INT32,
692            "DATE",
693            arrow::datatypes::Date32Type,
694            arrow::datatypes::Int32Type,
695            i32
696        );
697        test_primitive_array_reader_one_type!(
698            crate::data_type::Int32Type,
699            PhysicalType::INT32,
700            "TIME_MILLIS",
701            arrow::datatypes::Time32MillisecondType,
702            arrow::datatypes::Int32Type,
703            i32
704        );
705        test_primitive_array_reader_one_type!(
706            crate::data_type::Int64Type,
707            PhysicalType::INT64,
708            "TIME_MICROS",
709            arrow::datatypes::Time64MicrosecondType,
710            arrow::datatypes::Int64Type,
711            i64
712        );
713        test_primitive_array_reader_one_type!(
714            crate::data_type::Int64Type,
715            PhysicalType::INT64,
716            "TIMESTAMP_MILLIS",
717            arrow::datatypes::TimestampMillisecondType,
718            arrow::datatypes::Int64Type,
719            i64,
720            "UTC"
721        );
722        test_primitive_array_reader_one_type!(
723            crate::data_type::Int64Type,
724            PhysicalType::INT64,
725            "TIMESTAMP_MICROS",
726            arrow::datatypes::TimestampMicrosecondType,
727            arrow::datatypes::Int64Type,
728            i64,
729            "UTC"
730        );
731    }
732
733    #[test]
734    fn test_primitive_array_reader_def_and_rep_levels() {
735        // Construct column schema
736        let message_type = "
737        message test_schema {
738            REPEATED Group test_mid {
739                OPTIONAL INT32 leaf;
740            }
741        }
742        ";
743
744        let schema = parse_message_type(message_type)
745            .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
746            .unwrap();
747
748        let column_desc = schema.column(0);
749
750        // Construct page iterator
751        {
752            let mut def_levels = Vec::new();
753            let mut rep_levels = Vec::new();
754            let mut page_lists = Vec::new();
755            make_column_chunks::<Int32Type>(
756                column_desc.clone(),
757                Encoding::PLAIN,
758                100,
759                1,
760                200,
761                &mut def_levels,
762                &mut rep_levels,
763                &mut Vec::new(),
764                &mut page_lists,
765                true,
766                2,
767            );
768
769            let page_iterator = InMemoryPageIterator::new(page_lists);
770
771            let mut array_reader = PrimitiveArrayReader::<Int32Type>::new(
772                Box::new(page_iterator),
773                column_desc,
774                None,
775                DEFAULT_BATCH_SIZE,
776            )
777            .unwrap();
778
779            let mut accu_len: usize = 0;
780
781            // Read first 50 values, which are all from the first column chunk
782            let array = array_reader.next_batch(50).unwrap();
783            assert_eq!(
784                Some(&def_levels[accu_len..(accu_len + array.len())]),
785                array_reader.get_def_levels()
786            );
787            assert_eq!(
788                Some(&rep_levels[accu_len..(accu_len + array.len())]),
789                array_reader.get_rep_levels()
790            );
791            accu_len += array.len();
792
793            // Read next 100 values, the first 50 ones are from the first column chunk,
794            // and the last 50 ones are from the second column chunk
795            let array = array_reader.next_batch(100).unwrap();
796            assert_eq!(
797                Some(&def_levels[accu_len..(accu_len + array.len())]),
798                array_reader.get_def_levels()
799            );
800            assert_eq!(
801                Some(&rep_levels[accu_len..(accu_len + array.len())]),
802                array_reader.get_rep_levels()
803            );
804            accu_len += array.len();
805
806            // Try to read 100 values, however there are only 50 values
807            let array = array_reader.next_batch(100).unwrap();
808            assert_eq!(
809                Some(&def_levels[accu_len..(accu_len + array.len())]),
810                array_reader.get_def_levels()
811            );
812            assert_eq!(
813                Some(&rep_levels[accu_len..(accu_len + array.len())]),
814                array_reader.get_rep_levels()
815            );
816        }
817    }
818
819    #[test]
820    fn test_primitive_array_reader_decimal_types() {
821        // parquet `INT32` to decimal
822        let message_type = "
823            message test_schema {
824                REQUIRED INT32 decimal1 (DECIMAL(8,2));
825        }
826        ";
827        let schema = parse_message_type(message_type)
828            .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
829            .unwrap();
830        let column_desc = schema.column(0);
831
832        // create the array reader
833        {
834            let mut data = Vec::new();
835            let mut page_lists = Vec::new();
836            make_column_chunks::<Int32Type>(
837                column_desc.clone(),
838                Encoding::PLAIN,
839                100,
840                -99999999,
841                99999999,
842                &mut Vec::new(),
843                &mut Vec::new(),
844                &mut data,
845                &mut page_lists,
846                true,
847                2,
848            );
849            let page_iterator = InMemoryPageIterator::new(page_lists);
850
851            let mut array_reader = PrimitiveArrayReader::<Int32Type>::new(
852                Box::new(page_iterator),
853                column_desc,
854                None,
855                DEFAULT_BATCH_SIZE,
856            )
857            .unwrap();
858
859            // read data from the reader
860            // the data type is decimal(8,2)
861            let array = array_reader.next_batch(50).unwrap();
862            assert_eq!(array.data_type(), &Decimal128(8, 2));
863            let array = array.as_any().downcast_ref::<Decimal128Array>().unwrap();
864            let data_decimal_array = data[0..50]
865                .iter()
866                .copied()
867                .map(|v| Some(v as i128))
868                .collect::<Decimal128Array>()
869                .with_precision_and_scale(8, 2)
870                .unwrap();
871            assert_eq!(array, &data_decimal_array);
872
873            // not equal with different data type(precision and scale)
874            let data_decimal_array = data[0..50]
875                .iter()
876                .copied()
877                .map(|v| Some(v as i128))
878                .collect::<Decimal128Array>()
879                .with_precision_and_scale(9, 0)
880                .unwrap();
881            assert_ne!(array, &data_decimal_array)
882        }
883
884        // parquet `INT64` to decimal
885        let message_type = "
886            message test_schema {
887                REQUIRED INT64 decimal1 (DECIMAL(18,4));
888        }
889        ";
890        let schema = parse_message_type(message_type)
891            .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
892            .unwrap();
893        let column_desc = schema.column(0);
894
895        // create the array reader
896        {
897            let mut data = Vec::new();
898            let mut page_lists = Vec::new();
899            make_column_chunks::<Int64Type>(
900                column_desc.clone(),
901                Encoding::PLAIN,
902                100,
903                -999999999999999999,
904                999999999999999999,
905                &mut Vec::new(),
906                &mut Vec::new(),
907                &mut data,
908                &mut page_lists,
909                true,
910                2,
911            );
912            let page_iterator = InMemoryPageIterator::new(page_lists);
913
914            let mut array_reader = PrimitiveArrayReader::<Int64Type>::new(
915                Box::new(page_iterator),
916                column_desc,
917                None,
918                DEFAULT_BATCH_SIZE,
919            )
920            .unwrap();
921
922            // read data from the reader
923            // the data type is decimal(18,4)
924            let array = array_reader.next_batch(50).unwrap();
925            assert_eq!(array.data_type(), &Decimal128(18, 4));
926            let array = array.as_any().downcast_ref::<Decimal128Array>().unwrap();
927            let data_decimal_array = data[0..50]
928                .iter()
929                .copied()
930                .map(|v| Some(v as i128))
931                .collect::<Decimal128Array>()
932                .with_precision_and_scale(18, 4)
933                .unwrap();
934            assert_eq!(array, &data_decimal_array);
935
936            // not equal with different data type(precision and scale)
937            let data_decimal_array = data[0..50]
938                .iter()
939                .copied()
940                .map(|v| Some(v as i128))
941                .collect::<Decimal128Array>()
942                .with_precision_and_scale(34, 0)
943                .unwrap();
944            assert_ne!(array, &data_decimal_array)
945        }
946    }
947
948    #[test]
949    fn test_primitive_array_reader_date32_type() {
950        // parquet `INT32` to date
951        let message_type = "
952            message test_schema {
953                REQUIRED INT32 date1 (DATE);
954        }
955        ";
956        let schema = parse_message_type(message_type)
957            .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
958            .unwrap();
959        let column_desc = schema.column(0);
960
961        // create the array reader
962        {
963            let mut data = Vec::new();
964            let mut page_lists = Vec::new();
965            make_column_chunks::<Int32Type>(
966                column_desc.clone(),
967                Encoding::PLAIN,
968                100,
969                -99999999,
970                99999999,
971                &mut Vec::new(),
972                &mut Vec::new(),
973                &mut data,
974                &mut page_lists,
975                true,
976                2,
977            );
978            let page_iterator = InMemoryPageIterator::new(page_lists);
979
980            let mut array_reader = PrimitiveArrayReader::<Int32Type>::new(
981                Box::new(page_iterator),
982                column_desc,
983                None,
984                DEFAULT_BATCH_SIZE,
985            )
986            .unwrap();
987
988            // read data from the reader
989            // the data type is date
990            let array = array_reader.next_batch(50).unwrap();
991            assert_eq!(array.data_type(), &Date32);
992            let array = array.as_any().downcast_ref::<Date32Array>().unwrap();
993            let data_date_array = data[0..50]
994                .iter()
995                .copied()
996                .map(Some)
997                .collect::<Date32Array>();
998            assert_eq!(array, &data_date_array);
999        }
1000    }
1001}