parquet/arrow/array_reader/
primitive_array.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::arrow::array_reader::{read_records, skip_records, ArrayReader};
19use crate::arrow::record_reader::RecordReader;
20use crate::arrow::schema::parquet_to_arrow_field;
21use crate::basic::Type as PhysicalType;
22use crate::column::page::PageIterator;
23use crate::data_type::{DataType, Int96};
24use crate::errors::{ParquetError, Result};
25use crate::schema::types::ColumnDescPtr;
26use arrow_array::{
27    builder::{
28        TimestampMicrosecondBufferBuilder, TimestampMillisecondBufferBuilder,
29        TimestampNanosecondBufferBuilder, TimestampSecondBufferBuilder,
30    },
31    ArrayRef, BooleanArray, Decimal128Array, Decimal256Array, Float32Array, Float64Array,
32    Int16Array, Int32Array, Int64Array, Int8Array, TimestampMicrosecondArray,
33    TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray, UInt16Array,
34    UInt32Array, UInt64Array, UInt8Array,
35};
36use arrow_buffer::{i256, BooleanBuffer, Buffer};
37use arrow_data::ArrayDataBuilder;
38use arrow_schema::{DataType as ArrowType, TimeUnit};
39use std::any::Any;
40use std::sync::Arc;
41
/// Provides conversion from `Vec<T>` to `Buffer`
///
/// Implemented in this file for vectors of the native numeric types, for
/// `Vec<bool>` and for `Vec<Int96>`.
pub trait IntoBuffer {
    /// Consumes `self` and returns its values as a [`Buffer`].
    ///
    /// `target_type` is the Arrow type being produced. Of the implementations in
    /// this file only `Vec<Int96>` inspects it (to select the timestamp unit);
    /// the others ignore it.
    fn into_buffer(self, target_type: &ArrowType) -> Buffer;
}
46
47macro_rules! native_buffer {
48    ($($t:ty),*) => {
49        $(impl IntoBuffer for Vec<$t> {
50            fn into_buffer(self, _target_type: &ArrowType) -> Buffer {
51                Buffer::from_vec(self)
52            }
53        })*
54    };
55}
56native_buffer!(i8, i16, i32, i64, u8, u16, u32, u64, f32, f64);
57
58impl IntoBuffer for Vec<bool> {
59    fn into_buffer(self, _target_type: &ArrowType) -> Buffer {
60        BooleanBuffer::from_iter(self).into_inner()
61    }
62}
63
64impl IntoBuffer for Vec<Int96> {
65    fn into_buffer(self, target_type: &ArrowType) -> Buffer {
66        match target_type {
67            ArrowType::Timestamp(TimeUnit::Second, _) => {
68                let mut builder = TimestampSecondBufferBuilder::new(self.len());
69                for v in self {
70                    builder.append(v.to_seconds())
71                }
72                builder.finish()
73            }
74            ArrowType::Timestamp(TimeUnit::Millisecond, _) => {
75                let mut builder = TimestampMillisecondBufferBuilder::new(self.len());
76                for v in self {
77                    builder.append(v.to_millis())
78                }
79                builder.finish()
80            }
81            ArrowType::Timestamp(TimeUnit::Microsecond, _) => {
82                let mut builder = TimestampMicrosecondBufferBuilder::new(self.len());
83                for v in self {
84                    builder.append(v.to_micros())
85                }
86                builder.finish()
87            }
88            ArrowType::Timestamp(TimeUnit::Nanosecond, _) => {
89                let mut builder = TimestampNanosecondBufferBuilder::new(self.len());
90                for v in self {
91                    builder.append(v.to_nanos())
92                }
93                builder.finish()
94            }
95            _ => unreachable!("Invalid target_type for Int96."),
96        }
97    }
98}
99
/// Primitive array readers are the leaves of the array reader tree. They accept a
/// page iterator and read its pages into primitive arrays.
///
/// `T` is the Parquet data type of the column being read; `Vec<T::T>` must be
/// convertible to an Arrow buffer through [`IntoBuffer`].
pub struct PrimitiveArrayReader<T>
where
    T: DataType,
    T::T: Copy + Default,
    Vec<T::T>: IntoBuffer,
{
    /// Arrow type reported by `get_data_type` and used as the conversion target
    /// in `consume_batch`.
    data_type: ArrowType,
    /// Page source passed to `read_records` / `skip_records`.
    pages: Box<dyn PageIterator>,
    /// Definition levels saved by the most recent `consume_batch` (initially `None`).
    def_levels_buffer: Option<Vec<i16>>,
    /// Repetition levels saved by the most recent `consume_batch` (initially `None`).
    rep_levels_buffer: Option<Vec<i16>>,
    /// Record reader that accumulates values and levels between `consume_batch` calls.
    record_reader: RecordReader<T>,
}
114
115impl<T> PrimitiveArrayReader<T>
116where
117    T: DataType,
118    T::T: Copy + Default,
119    Vec<T::T>: IntoBuffer,
120{
121    /// Construct primitive array reader.
122    pub fn new(
123        pages: Box<dyn PageIterator>,
124        column_desc: ColumnDescPtr,
125        arrow_type: Option<ArrowType>,
126    ) -> Result<Self> {
127        // Check if Arrow type is specified, else create it from Parquet type
128        let data_type = match arrow_type {
129            Some(t) => t,
130            None => parquet_to_arrow_field(column_desc.as_ref())?
131                .data_type()
132                .clone(),
133        };
134
135        let record_reader = RecordReader::<T>::new(column_desc);
136
137        Ok(Self {
138            data_type,
139            pages,
140            def_levels_buffer: None,
141            rep_levels_buffer: None,
142            record_reader,
143        })
144    }
145}
146
147/// Implementation of primitive array reader.
148impl<T> ArrayReader for PrimitiveArrayReader<T>
149where
150    T: DataType,
151    T::T: Copy + Default,
152    Vec<T::T>: IntoBuffer,
153{
154    fn as_any(&self) -> &dyn Any {
155        self
156    }
157
158    /// Returns data type of primitive array.
159    fn get_data_type(&self) -> &ArrowType {
160        &self.data_type
161    }
162
163    fn read_records(&mut self, batch_size: usize) -> Result<usize> {
164        read_records(&mut self.record_reader, self.pages.as_mut(), batch_size)
165    }
166
167    fn consume_batch(&mut self) -> Result<ArrayRef> {
168        let target_type = &self.data_type;
169        let arrow_data_type = match T::get_physical_type() {
170            PhysicalType::BOOLEAN => ArrowType::Boolean,
171            PhysicalType::INT32 => {
172                match target_type {
173                    ArrowType::UInt32 => {
174                        // follow C++ implementation and use overflow/reinterpret cast from  i32 to u32 which will map
175                        // `i32::MIN..0` to `(i32::MAX as u32)..u32::MAX`
176                        ArrowType::UInt32
177                    }
178                    _ => ArrowType::Int32,
179                }
180            }
181            PhysicalType::INT64 => {
182                match target_type {
183                    ArrowType::UInt64 => {
184                        // follow C++ implementation and use overflow/reinterpret cast from  i64 to u64 which will map
185                        // `i64::MIN..0` to `(i64::MAX as u64)..u64::MAX`
186                        ArrowType::UInt64
187                    }
188                    _ => ArrowType::Int64,
189                }
190            }
191            PhysicalType::FLOAT => ArrowType::Float32,
192            PhysicalType::DOUBLE => ArrowType::Float64,
193            PhysicalType::INT96 => match target_type {
194                ArrowType::Timestamp(TimeUnit::Second, _) => target_type.clone(),
195                ArrowType::Timestamp(TimeUnit::Millisecond, _) => target_type.clone(),
196                ArrowType::Timestamp(TimeUnit::Microsecond, _) => target_type.clone(),
197                ArrowType::Timestamp(TimeUnit::Nanosecond, _) => target_type.clone(),
198                _ => unreachable!("INT96 must be a timestamp."),
199            },
200            PhysicalType::BYTE_ARRAY | PhysicalType::FIXED_LEN_BYTE_ARRAY => {
201                unreachable!("PrimitiveArrayReaders don't support complex physical types");
202            }
203        };
204
205        // Convert to arrays by using the Parquet physical type.
206        // The physical types are then cast to Arrow types if necessary
207
208        let record_data = self
209            .record_reader
210            .consume_record_data()
211            .into_buffer(target_type);
212
213        let array_data = ArrayDataBuilder::new(arrow_data_type)
214            .len(self.record_reader.num_values())
215            .add_buffer(record_data)
216            .null_bit_buffer(self.record_reader.consume_bitmap_buffer());
217
218        let array_data = unsafe { array_data.build_unchecked() };
219        let array: ArrayRef = match T::get_physical_type() {
220            PhysicalType::BOOLEAN => Arc::new(BooleanArray::from(array_data)),
221            PhysicalType::INT32 => match array_data.data_type() {
222                ArrowType::UInt32 => Arc::new(UInt32Array::from(array_data)),
223                ArrowType::Int32 => Arc::new(Int32Array::from(array_data)),
224                _ => unreachable!(),
225            },
226            PhysicalType::INT64 => match array_data.data_type() {
227                ArrowType::UInt64 => Arc::new(UInt64Array::from(array_data)),
228                ArrowType::Int64 => Arc::new(Int64Array::from(array_data)),
229                _ => unreachable!(),
230            },
231            PhysicalType::FLOAT => Arc::new(Float32Array::from(array_data)),
232            PhysicalType::DOUBLE => Arc::new(Float64Array::from(array_data)),
233            PhysicalType::INT96 => match target_type {
234                ArrowType::Timestamp(TimeUnit::Second, _) => {
235                    Arc::new(TimestampSecondArray::from(array_data))
236                }
237                ArrowType::Timestamp(TimeUnit::Millisecond, _) => {
238                    Arc::new(TimestampMillisecondArray::from(array_data))
239                }
240                ArrowType::Timestamp(TimeUnit::Microsecond, _) => {
241                    Arc::new(TimestampMicrosecondArray::from(array_data))
242                }
243                ArrowType::Timestamp(TimeUnit::Nanosecond, _) => {
244                    Arc::new(TimestampNanosecondArray::from(array_data))
245                }
246                _ => unreachable!("INT96 must be a timestamp."),
247            },
248
249            PhysicalType::BYTE_ARRAY | PhysicalType::FIXED_LEN_BYTE_ARRAY => {
250                unreachable!("PrimitiveArrayReaders don't support complex physical types");
251            }
252        };
253
254        // cast to Arrow type
255        // We make a strong assumption here that the casts should be infallible.
256        // If the cast fails because of incompatible datatypes, then there might
257        // be a bigger problem with how Arrow schemas are converted to Parquet.
258        //
259        // As there is not always a 1:1 mapping between Arrow and Parquet, there
260        // are datatypes which we must convert explicitly.
261        // These are:
262        // - date64: cast int32 to date32, then date32 to date64.
263        // - decimal: cast int32 to decimal, int64 to decimal
264        let array = match target_type {
265            // Using `arrow_cast::cast` has been found to be very slow for converting
266            // INT32 physical type to lower bitwidth logical types. Since rust casts
267            // are infallible, instead use `unary` which is much faster (by up to 40%).
268            // One consequence of this approach is that some malformed integer columns
269            // will return (an arguably correct) result rather than null.
270            // See https://github.com/apache/arrow-rs/issues/7040 for a discussion of this
271            // issue.
272            ArrowType::UInt8 if *(array.data_type()) == ArrowType::Int32 => {
273                let array = array
274                    .as_any()
275                    .downcast_ref::<Int32Array>()
276                    .unwrap()
277                    .unary(|i| i as u8) as UInt8Array;
278                Arc::new(array) as ArrayRef
279            }
280            ArrowType::Int8 if *(array.data_type()) == ArrowType::Int32 => {
281                let array = array
282                    .as_any()
283                    .downcast_ref::<Int32Array>()
284                    .unwrap()
285                    .unary(|i| i as i8) as Int8Array;
286                Arc::new(array) as ArrayRef
287            }
288            ArrowType::UInt16 if *(array.data_type()) == ArrowType::Int32 => {
289                let array = array
290                    .as_any()
291                    .downcast_ref::<Int32Array>()
292                    .unwrap()
293                    .unary(|i| i as u16) as UInt16Array;
294                Arc::new(array) as ArrayRef
295            }
296            ArrowType::Int16 if *(array.data_type()) == ArrowType::Int32 => {
297                let array = array
298                    .as_any()
299                    .downcast_ref::<Int32Array>()
300                    .unwrap()
301                    .unary(|i| i as i16) as Int16Array;
302                Arc::new(array) as ArrayRef
303            }
304            ArrowType::Date64 if *(array.data_type()) == ArrowType::Int32 => {
305                // this is cheap as it internally reinterprets the data
306                let a = arrow_cast::cast(&array, &ArrowType::Date32)?;
307                arrow_cast::cast(&a, target_type)?
308            }
309            ArrowType::Decimal128(p, s) => {
310                // Apply conversion to all elements regardless of null slots as the conversion
311                // to `i128` is infallible. This improves performance by avoiding a branch in
312                // the inner loop (see docs for `PrimitiveArray::unary`).
313                let array = match array.data_type() {
314                    ArrowType::Int32 => array
315                        .as_any()
316                        .downcast_ref::<Int32Array>()
317                        .unwrap()
318                        .unary(|i| i as i128)
319                        as Decimal128Array,
320                    ArrowType::Int64 => array
321                        .as_any()
322                        .downcast_ref::<Int64Array>()
323                        .unwrap()
324                        .unary(|i| i as i128)
325                        as Decimal128Array,
326                    _ => {
327                        return Err(arrow_err!(
328                            "Cannot convert {:?} to decimal",
329                            array.data_type()
330                        ));
331                    }
332                }
333                .with_precision_and_scale(*p, *s)?;
334
335                Arc::new(array) as ArrayRef
336            }
337            ArrowType::Decimal256(p, s) => {
338                // See above comment. Conversion to `i256` is likewise infallible.
339                let array = match array.data_type() {
340                    ArrowType::Int32 => array
341                        .as_any()
342                        .downcast_ref::<Int32Array>()
343                        .unwrap()
344                        .unary(|i| i256::from_i128(i as i128))
345                        as Decimal256Array,
346                    ArrowType::Int64 => array
347                        .as_any()
348                        .downcast_ref::<Int64Array>()
349                        .unwrap()
350                        .unary(|i| i256::from_i128(i as i128))
351                        as Decimal256Array,
352                    _ => {
353                        return Err(arrow_err!(
354                            "Cannot convert {:?} to decimal",
355                            array.data_type()
356                        ));
357                    }
358                }
359                .with_precision_and_scale(*p, *s)?;
360
361                Arc::new(array) as ArrayRef
362            }
363            ArrowType::Dictionary(_, value_type) => match value_type.as_ref() {
364                ArrowType::Decimal128(p, s) => {
365                    let array = match array.data_type() {
366                        ArrowType::Int32 => array
367                            .as_any()
368                            .downcast_ref::<Int32Array>()
369                            .unwrap()
370                            .unary(|i| i as i128)
371                            as Decimal128Array,
372                        ArrowType::Int64 => array
373                            .as_any()
374                            .downcast_ref::<Int64Array>()
375                            .unwrap()
376                            .unary(|i| i as i128)
377                            as Decimal128Array,
378                        _ => {
379                            return Err(arrow_err!(
380                                "Cannot convert {:?} to decimal dictionary",
381                                array.data_type()
382                            ));
383                        }
384                    }
385                    .with_precision_and_scale(*p, *s)?;
386
387                    arrow_cast::cast(&array, target_type)?
388                }
389                ArrowType::Decimal256(p, s) => {
390                    let array = match array.data_type() {
391                        ArrowType::Int32 => array
392                            .as_any()
393                            .downcast_ref::<Int32Array>()
394                            .unwrap()
395                            .unary(i256::from)
396                            as Decimal256Array,
397                        ArrowType::Int64 => array
398                            .as_any()
399                            .downcast_ref::<Int64Array>()
400                            .unwrap()
401                            .unary(i256::from)
402                            as Decimal256Array,
403                        _ => {
404                            return Err(arrow_err!(
405                                "Cannot convert {:?} to decimal dictionary",
406                                array.data_type()
407                            ));
408                        }
409                    }
410                    .with_precision_and_scale(*p, *s)?;
411
412                    arrow_cast::cast(&array, target_type)?
413                }
414                _ => arrow_cast::cast(&array, target_type)?,
415            },
416            _ => arrow_cast::cast(&array, target_type)?,
417        };
418
419        // save definition and repetition buffers
420        self.def_levels_buffer = self.record_reader.consume_def_levels();
421        self.rep_levels_buffer = self.record_reader.consume_rep_levels();
422        self.record_reader.reset();
423        Ok(array)
424    }
425
426    fn skip_records(&mut self, num_records: usize) -> Result<usize> {
427        skip_records(&mut self.record_reader, self.pages.as_mut(), num_records)
428    }
429
430    fn get_def_levels(&self) -> Option<&[i16]> {
431        self.def_levels_buffer.as_deref()
432    }
433
434    fn get_rep_levels(&self) -> Option<&[i16]> {
435        self.rep_levels_buffer.as_deref()
436    }
437}
438
439#[cfg(test)]
440mod tests {
441    use super::*;
442    use crate::arrow::array_reader::test_util::EmptyPageIterator;
443    use crate::basic::Encoding;
444    use crate::column::page::Page;
445    use crate::data_type::{Int32Type, Int64Type};
446    use crate::schema::parser::parse_message_type;
447    use crate::schema::types::SchemaDescriptor;
448    use crate::util::test_common::rand_gen::make_pages;
449    use crate::util::InMemoryPageIterator;
450    use arrow::datatypes::ArrowPrimitiveType;
451    use arrow_array::{Array, Date32Array, PrimitiveArray};
452
453    use arrow::datatypes::DataType::{Date32, Decimal128};
454    use rand::distr::uniform::SampleUniform;
455    use std::collections::VecDeque;
456
457    #[allow(clippy::too_many_arguments)]
458    fn make_column_chunks<T: DataType>(
459        column_desc: ColumnDescPtr,
460        encoding: Encoding,
461        num_levels: usize,
462        min_value: T::T,
463        max_value: T::T,
464        def_levels: &mut Vec<i16>,
465        rep_levels: &mut Vec<i16>,
466        values: &mut Vec<T::T>,
467        page_lists: &mut Vec<Vec<Page>>,
468        use_v2: bool,
469        num_chunks: usize,
470    ) where
471        T::T: PartialOrd + SampleUniform + Copy,
472    {
473        for _i in 0..num_chunks {
474            let mut pages = VecDeque::new();
475            let mut data = Vec::new();
476            let mut page_def_levels = Vec::new();
477            let mut page_rep_levels = Vec::new();
478
479            make_pages::<T>(
480                column_desc.clone(),
481                encoding,
482                1,
483                num_levels,
484                min_value,
485                max_value,
486                &mut page_def_levels,
487                &mut page_rep_levels,
488                &mut data,
489                &mut pages,
490                use_v2,
491            );
492
493            def_levels.append(&mut page_def_levels);
494            rep_levels.append(&mut page_rep_levels);
495            values.append(&mut data);
496            page_lists.push(Vec::from(pages));
497        }
498    }
499
500    #[test]
501    fn test_primitive_array_reader_empty_pages() {
502        // Construct column schema
503        let message_type = "
504        message test_schema {
505          REQUIRED INT32 leaf;
506        }
507        ";
508
509        let schema = parse_message_type(message_type)
510            .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
511            .unwrap();
512
513        let mut array_reader = PrimitiveArrayReader::<Int32Type>::new(
514            Box::<EmptyPageIterator>::default(),
515            schema.column(0),
516            None,
517        )
518        .unwrap();
519
520        // expect no values to be read
521        let array = array_reader.next_batch(50).unwrap();
522        assert!(array.is_empty());
523    }
524
525    #[test]
526    fn test_primitive_array_reader_data() {
527        // Construct column schema
528        let message_type = "
529        message test_schema {
530          REQUIRED INT32 leaf;
531        }
532        ";
533
534        let schema = parse_message_type(message_type)
535            .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
536            .unwrap();
537
538        let column_desc = schema.column(0);
539
540        // Construct page iterator
541        {
542            let mut data = Vec::new();
543            let mut page_lists = Vec::new();
544            make_column_chunks::<Int32Type>(
545                column_desc.clone(),
546                Encoding::PLAIN,
547                100,
548                1,
549                200,
550                &mut Vec::new(),
551                &mut Vec::new(),
552                &mut data,
553                &mut page_lists,
554                true,
555                2,
556            );
557            let page_iterator = InMemoryPageIterator::new(page_lists);
558
559            let mut array_reader =
560                PrimitiveArrayReader::<Int32Type>::new(Box::new(page_iterator), column_desc, None)
561                    .unwrap();
562
563            // Read first 50 values, which are all from the first column chunk
564            let array = array_reader.next_batch(50).unwrap();
565            let array = array.as_any().downcast_ref::<Int32Array>().unwrap();
566
567            assert_eq!(&Int32Array::from(data[0..50].to_vec()), array);
568
569            // Read next 100 values, the first 50 ones are from the first column chunk,
570            // and the last 50 ones are from the second column chunk
571            let array = array_reader.next_batch(100).unwrap();
572            let array = array.as_any().downcast_ref::<Int32Array>().unwrap();
573
574            assert_eq!(&Int32Array::from(data[50..150].to_vec()), array);
575
576            // Try to read 100 values, however there are only 50 values
577            let array = array_reader.next_batch(100).unwrap();
578            let array = array.as_any().downcast_ref::<Int32Array>().unwrap();
579
580            assert_eq!(&Int32Array::from(data[150..200].to_vec()), array);
581        }
582    }
583
584    macro_rules! test_primitive_array_reader_one_type {
585        (
586            $arrow_parquet_type:ty,
587            $physical_type:expr,
588            $converted_type_str:expr,
589            $result_arrow_type:ty,
590            $result_arrow_cast_type:ty,
591            $result_primitive_type:ty
592            $(, $timezone:expr)?
593        ) => {{
594            let message_type = format!(
595                "
596            message test_schema {{
597              REQUIRED {:?} leaf ({});
598          }}
599            ",
600                $physical_type, $converted_type_str
601            );
602            let schema = parse_message_type(&message_type)
603                .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
604                .unwrap();
605
606            let column_desc = schema.column(0);
607
608            // Construct page iterator
609            {
610                let mut data = Vec::new();
611                let mut page_lists = Vec::new();
612                make_column_chunks::<$arrow_parquet_type>(
613                    column_desc.clone(),
614                    Encoding::PLAIN,
615                    100,
616                    1,
617                    200,
618                    &mut Vec::new(),
619                    &mut Vec::new(),
620                    &mut data,
621                    &mut page_lists,
622                    true,
623                    2,
624                );
625                let page_iterator = InMemoryPageIterator::new(page_lists);
626                let mut array_reader = PrimitiveArrayReader::<$arrow_parquet_type>::new(
627                    Box::new(page_iterator),
628                    column_desc.clone(),
629                    None,
630                )
631                .expect("Unable to get array reader");
632
633                let array = array_reader
634                    .next_batch(50)
635                    .expect("Unable to get batch from reader");
636
637                let result_data_type = <$result_arrow_type>::DATA_TYPE;
638                let array = array
639                    .as_any()
640                    .downcast_ref::<PrimitiveArray<$result_arrow_type>>()
641                    .expect(
642                        format!(
643                            "Unable to downcast {:?} to {:?}",
644                            array.data_type(),
645                            result_data_type
646                        )
647                        .as_str(),
648                    )
649                    $(.clone().with_timezone($timezone))?
650                    ;
651
652                // create expected array as primitive, and cast to result type
653                let expected = PrimitiveArray::<$result_arrow_cast_type>::from(
654                    data[0..50]
655                        .iter()
656                        .map(|x| *x as $result_primitive_type)
657                        .collect::<Vec<$result_primitive_type>>(),
658                );
659                let expected = Arc::new(expected) as ArrayRef;
660                let expected = arrow::compute::cast(&expected, &result_data_type)
661                    .expect("Unable to cast expected array");
662                assert_eq!(expected.data_type(), &result_data_type);
663                let expected = expected
664                    .as_any()
665                    .downcast_ref::<PrimitiveArray<$result_arrow_type>>()
666                    .expect(
667                        format!(
668                            "Unable to downcast expected {:?} to {:?}",
669                            expected.data_type(),
670                            result_data_type
671                        )
672                        .as_str(),
673                    )
674                    $(.clone().with_timezone($timezone))?
675                    ;
676                assert_eq!(expected, array);
677            }
678        }};
679    }
680
681    #[test]
682    fn test_primitive_array_reader_temporal_types() {
683        test_primitive_array_reader_one_type!(
684            crate::data_type::Int32Type,
685            PhysicalType::INT32,
686            "DATE",
687            arrow::datatypes::Date32Type,
688            arrow::datatypes::Int32Type,
689            i32
690        );
691        test_primitive_array_reader_one_type!(
692            crate::data_type::Int32Type,
693            PhysicalType::INT32,
694            "TIME_MILLIS",
695            arrow::datatypes::Time32MillisecondType,
696            arrow::datatypes::Int32Type,
697            i32
698        );
699        test_primitive_array_reader_one_type!(
700            crate::data_type::Int64Type,
701            PhysicalType::INT64,
702            "TIME_MICROS",
703            arrow::datatypes::Time64MicrosecondType,
704            arrow::datatypes::Int64Type,
705            i64
706        );
707        test_primitive_array_reader_one_type!(
708            crate::data_type::Int64Type,
709            PhysicalType::INT64,
710            "TIMESTAMP_MILLIS",
711            arrow::datatypes::TimestampMillisecondType,
712            arrow::datatypes::Int64Type,
713            i64,
714            "UTC"
715        );
716        test_primitive_array_reader_one_type!(
717            crate::data_type::Int64Type,
718            PhysicalType::INT64,
719            "TIMESTAMP_MICROS",
720            arrow::datatypes::TimestampMicrosecondType,
721            arrow::datatypes::Int64Type,
722            i64,
723            "UTC"
724        );
725    }
726
727    #[test]
728    fn test_primitive_array_reader_def_and_rep_levels() {
729        // Construct column schema
730        let message_type = "
731        message test_schema {
732            REPEATED Group test_mid {
733                OPTIONAL INT32 leaf;
734            }
735        }
736        ";
737
738        let schema = parse_message_type(message_type)
739            .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
740            .unwrap();
741
742        let column_desc = schema.column(0);
743
744        // Construct page iterator
745        {
746            let mut def_levels = Vec::new();
747            let mut rep_levels = Vec::new();
748            let mut page_lists = Vec::new();
749            make_column_chunks::<Int32Type>(
750                column_desc.clone(),
751                Encoding::PLAIN,
752                100,
753                1,
754                200,
755                &mut def_levels,
756                &mut rep_levels,
757                &mut Vec::new(),
758                &mut page_lists,
759                true,
760                2,
761            );
762
763            let page_iterator = InMemoryPageIterator::new(page_lists);
764
765            let mut array_reader =
766                PrimitiveArrayReader::<Int32Type>::new(Box::new(page_iterator), column_desc, None)
767                    .unwrap();
768
769            let mut accu_len: usize = 0;
770
771            // Read first 50 values, which are all from the first column chunk
772            let array = array_reader.next_batch(50).unwrap();
773            assert_eq!(
774                Some(&def_levels[accu_len..(accu_len + array.len())]),
775                array_reader.get_def_levels()
776            );
777            assert_eq!(
778                Some(&rep_levels[accu_len..(accu_len + array.len())]),
779                array_reader.get_rep_levels()
780            );
781            accu_len += array.len();
782
783            // Read next 100 values, the first 50 ones are from the first column chunk,
784            // and the last 50 ones are from the second column chunk
785            let array = array_reader.next_batch(100).unwrap();
786            assert_eq!(
787                Some(&def_levels[accu_len..(accu_len + array.len())]),
788                array_reader.get_def_levels()
789            );
790            assert_eq!(
791                Some(&rep_levels[accu_len..(accu_len + array.len())]),
792                array_reader.get_rep_levels()
793            );
794            accu_len += array.len();
795
796            // Try to read 100 values, however there are only 50 values
797            let array = array_reader.next_batch(100).unwrap();
798            assert_eq!(
799                Some(&def_levels[accu_len..(accu_len + array.len())]),
800                array_reader.get_def_levels()
801            );
802            assert_eq!(
803                Some(&rep_levels[accu_len..(accu_len + array.len())]),
804                array_reader.get_rep_levels()
805            );
806        }
807    }
808
809    #[test]
810    fn test_primitive_array_reader_decimal_types() {
811        // parquet `INT32` to decimal
812        let message_type = "
813            message test_schema {
814                REQUIRED INT32 decimal1 (DECIMAL(8,2));
815        }
816        ";
817        let schema = parse_message_type(message_type)
818            .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
819            .unwrap();
820        let column_desc = schema.column(0);
821
822        // create the array reader
823        {
824            let mut data = Vec::new();
825            let mut page_lists = Vec::new();
826            make_column_chunks::<Int32Type>(
827                column_desc.clone(),
828                Encoding::PLAIN,
829                100,
830                -99999999,
831                99999999,
832                &mut Vec::new(),
833                &mut Vec::new(),
834                &mut data,
835                &mut page_lists,
836                true,
837                2,
838            );
839            let page_iterator = InMemoryPageIterator::new(page_lists);
840
841            let mut array_reader =
842                PrimitiveArrayReader::<Int32Type>::new(Box::new(page_iterator), column_desc, None)
843                    .unwrap();
844
845            // read data from the reader
846            // the data type is decimal(8,2)
847            let array = array_reader.next_batch(50).unwrap();
848            assert_eq!(array.data_type(), &Decimal128(8, 2));
849            let array = array.as_any().downcast_ref::<Decimal128Array>().unwrap();
850            let data_decimal_array = data[0..50]
851                .iter()
852                .copied()
853                .map(|v| Some(v as i128))
854                .collect::<Decimal128Array>()
855                .with_precision_and_scale(8, 2)
856                .unwrap();
857            assert_eq!(array, &data_decimal_array);
858
859            // not equal with different data type(precision and scale)
860            let data_decimal_array = data[0..50]
861                .iter()
862                .copied()
863                .map(|v| Some(v as i128))
864                .collect::<Decimal128Array>()
865                .with_precision_and_scale(9, 0)
866                .unwrap();
867            assert_ne!(array, &data_decimal_array)
868        }
869
870        // parquet `INT64` to decimal
871        let message_type = "
872            message test_schema {
873                REQUIRED INT64 decimal1 (DECIMAL(18,4));
874        }
875        ";
876        let schema = parse_message_type(message_type)
877            .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
878            .unwrap();
879        let column_desc = schema.column(0);
880
881        // create the array reader
882        {
883            let mut data = Vec::new();
884            let mut page_lists = Vec::new();
885            make_column_chunks::<Int64Type>(
886                column_desc.clone(),
887                Encoding::PLAIN,
888                100,
889                -999999999999999999,
890                999999999999999999,
891                &mut Vec::new(),
892                &mut Vec::new(),
893                &mut data,
894                &mut page_lists,
895                true,
896                2,
897            );
898            let page_iterator = InMemoryPageIterator::new(page_lists);
899
900            let mut array_reader =
901                PrimitiveArrayReader::<Int64Type>::new(Box::new(page_iterator), column_desc, None)
902                    .unwrap();
903
904            // read data from the reader
905            // the data type is decimal(18,4)
906            let array = array_reader.next_batch(50).unwrap();
907            assert_eq!(array.data_type(), &Decimal128(18, 4));
908            let array = array.as_any().downcast_ref::<Decimal128Array>().unwrap();
909            let data_decimal_array = data[0..50]
910                .iter()
911                .copied()
912                .map(|v| Some(v as i128))
913                .collect::<Decimal128Array>()
914                .with_precision_and_scale(18, 4)
915                .unwrap();
916            assert_eq!(array, &data_decimal_array);
917
918            // not equal with different data type(precision and scale)
919            let data_decimal_array = data[0..50]
920                .iter()
921                .copied()
922                .map(|v| Some(v as i128))
923                .collect::<Decimal128Array>()
924                .with_precision_and_scale(34, 0)
925                .unwrap();
926            assert_ne!(array, &data_decimal_array)
927        }
928    }
929
930    #[test]
931    fn test_primitive_array_reader_date32_type() {
932        // parquet `INT32` to date
933        let message_type = "
934            message test_schema {
935                REQUIRED INT32 date1 (DATE);
936        }
937        ";
938        let schema = parse_message_type(message_type)
939            .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
940            .unwrap();
941        let column_desc = schema.column(0);
942
943        // create the array reader
944        {
945            let mut data = Vec::new();
946            let mut page_lists = Vec::new();
947            make_column_chunks::<Int32Type>(
948                column_desc.clone(),
949                Encoding::PLAIN,
950                100,
951                -99999999,
952                99999999,
953                &mut Vec::new(),
954                &mut Vec::new(),
955                &mut data,
956                &mut page_lists,
957                true,
958                2,
959            );
960            let page_iterator = InMemoryPageIterator::new(page_lists);
961
962            let mut array_reader =
963                PrimitiveArrayReader::<Int32Type>::new(Box::new(page_iterator), column_desc, None)
964                    .unwrap();
965
966            // read data from the reader
967            // the data type is date
968            let array = array_reader.next_batch(50).unwrap();
969            assert_eq!(array.data_type(), &Date32);
970            let array = array.as_any().downcast_ref::<Date32Array>().unwrap();
971            let data_date_array = data[0..50]
972                .iter()
973                .copied()
974                .map(Some)
975                .collect::<Date32Array>();
976            assert_eq!(array, &data_date_array);
977        }
978    }
979}