parquet/arrow/array_reader/
primitive_array.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::arrow::array_reader::{read_records, skip_records, ArrayReader};
19use crate::arrow::record_reader::RecordReader;
20use crate::arrow::schema::parquet_to_arrow_field;
21use crate::basic::Type as PhysicalType;
22use crate::column::page::PageIterator;
23use crate::data_type::{DataType, Int96};
24use crate::errors::{ParquetError, Result};
25use crate::schema::types::ColumnDescPtr;
26use arrow_array::{
27    builder::{
28        TimestampMicrosecondBufferBuilder, TimestampMillisecondBufferBuilder,
29        TimestampNanosecondBufferBuilder, TimestampSecondBufferBuilder,
30    },
31    ArrayRef, BooleanArray, Decimal128Array, Decimal256Array, Float32Array, Float64Array,
32    Int32Array, Int64Array, TimestampMicrosecondArray, TimestampMillisecondArray,
33    TimestampNanosecondArray, TimestampSecondArray, UInt32Array, UInt64Array,
34};
35use arrow_buffer::{i256, BooleanBuffer, Buffer};
36use arrow_data::ArrayDataBuilder;
37use arrow_schema::{DataType as ArrowType, TimeUnit};
38use std::any::Any;
39use std::sync::Arc;
40
/// Provides conversion from `Vec<T>` to `Buffer`.
///
/// Implemented in this file for vectors of the native numeric types, for
/// `Vec<bool>` and for `Vec<Int96>`.
pub trait IntoBuffer {
    /// Consumes the vector and returns its values as an Arrow `Buffer`.
    ///
    /// `target_type` is the Arrow type the caller wants to end up with.
    /// Implementations whose output does not depend on it ignore the argument.
    fn into_buffer(self, target_type: &ArrowType) -> Buffer;
}
45
46macro_rules! native_buffer {
47    ($($t:ty),*) => {
48        $(impl IntoBuffer for Vec<$t> {
49            fn into_buffer(self, _target_type: &ArrowType) -> Buffer {
50                Buffer::from_vec(self)
51            }
52        })*
53    };
54}
55native_buffer!(i8, i16, i32, i64, u8, u16, u32, u64, f32, f64);
56
57impl IntoBuffer for Vec<bool> {
58    fn into_buffer(self, _target_type: &ArrowType) -> Buffer {
59        BooleanBuffer::from_iter(self).into_inner()
60    }
61}
62
63impl IntoBuffer for Vec<Int96> {
64    fn into_buffer(self, target_type: &ArrowType) -> Buffer {
65        match target_type {
66            ArrowType::Timestamp(TimeUnit::Second, _) => {
67                let mut builder = TimestampSecondBufferBuilder::new(self.len());
68                for v in self {
69                    builder.append(v.to_seconds())
70                }
71                builder.finish()
72            }
73            ArrowType::Timestamp(TimeUnit::Millisecond, _) => {
74                let mut builder = TimestampMillisecondBufferBuilder::new(self.len());
75                for v in self {
76                    builder.append(v.to_millis())
77                }
78                builder.finish()
79            }
80            ArrowType::Timestamp(TimeUnit::Microsecond, _) => {
81                let mut builder = TimestampMicrosecondBufferBuilder::new(self.len());
82                for v in self {
83                    builder.append(v.to_micros())
84                }
85                builder.finish()
86            }
87            ArrowType::Timestamp(TimeUnit::Nanosecond, _) => {
88                let mut builder = TimestampNanosecondBufferBuilder::new(self.len());
89                for v in self {
90                    builder.append(v.to_nanos())
91                }
92                builder.finish()
93            }
94            _ => unreachable!("Invalid target_type for Int96."),
95        }
96    }
97}
98
/// Primitive array readers are leaves of array reader tree. They accept page iterator
/// and read them into primitive arrays.
///
/// `T` is the Parquet data type of the column; its physical type selects the
/// Arrow array that `consume_batch` builds before converting to `data_type`.
pub struct PrimitiveArrayReader<T>
where
    T: DataType,
    T::T: Copy + Default,
    Vec<T::T>: IntoBuffer,
{
    /// Arrow type of the arrays returned by `consume_batch`.
    data_type: ArrowType,
    /// Source of the column's pages, handed to the record reader on every read/skip.
    pages: Box<dyn PageIterator>,
    /// Definition levels captured by the most recent `consume_batch`, if any.
    def_levels_buffer: Option<Vec<i16>>,
    /// Repetition levels captured by the most recent `consume_batch`, if any.
    rep_levels_buffer: Option<Vec<i16>>,
    /// Holds values and levels read so far until `consume_batch` drains and resets it.
    record_reader: RecordReader<T>,
}
113
114impl<T> PrimitiveArrayReader<T>
115where
116    T: DataType,
117    T::T: Copy + Default,
118    Vec<T::T>: IntoBuffer,
119{
120    /// Construct primitive array reader.
121    pub fn new(
122        pages: Box<dyn PageIterator>,
123        column_desc: ColumnDescPtr,
124        arrow_type: Option<ArrowType>,
125    ) -> Result<Self> {
126        // Check if Arrow type is specified, else create it from Parquet type
127        let data_type = match arrow_type {
128            Some(t) => t,
129            None => parquet_to_arrow_field(column_desc.as_ref())?
130                .data_type()
131                .clone(),
132        };
133
134        let record_reader = RecordReader::<T>::new(column_desc);
135
136        Ok(Self {
137            data_type,
138            pages,
139            def_levels_buffer: None,
140            rep_levels_buffer: None,
141            record_reader,
142        })
143    }
144}
145
146/// Implementation of primitive array reader.
147impl<T> ArrayReader for PrimitiveArrayReader<T>
148where
149    T: DataType,
150    T::T: Copy + Default,
151    Vec<T::T>: IntoBuffer,
152{
153    fn as_any(&self) -> &dyn Any {
154        self
155    }
156
157    /// Returns data type of primitive array.
158    fn get_data_type(&self) -> &ArrowType {
159        &self.data_type
160    }
161
162    fn read_records(&mut self, batch_size: usize) -> Result<usize> {
163        read_records(&mut self.record_reader, self.pages.as_mut(), batch_size)
164    }
165
166    fn consume_batch(&mut self) -> Result<ArrayRef> {
167        let target_type = &self.data_type;
168        let arrow_data_type = match T::get_physical_type() {
169            PhysicalType::BOOLEAN => ArrowType::Boolean,
170            PhysicalType::INT32 => {
171                match target_type {
172                    ArrowType::UInt32 => {
173                        // follow C++ implementation and use overflow/reinterpret cast from  i32 to u32 which will map
174                        // `i32::MIN..0` to `(i32::MAX as u32)..u32::MAX`
175                        ArrowType::UInt32
176                    }
177                    _ => ArrowType::Int32,
178                }
179            }
180            PhysicalType::INT64 => {
181                match target_type {
182                    ArrowType::UInt64 => {
183                        // follow C++ implementation and use overflow/reinterpret cast from  i64 to u64 which will map
184                        // `i64::MIN..0` to `(i64::MAX as u64)..u64::MAX`
185                        ArrowType::UInt64
186                    }
187                    _ => ArrowType::Int64,
188                }
189            }
190            PhysicalType::FLOAT => ArrowType::Float32,
191            PhysicalType::DOUBLE => ArrowType::Float64,
192            PhysicalType::INT96 => match target_type {
193                ArrowType::Timestamp(TimeUnit::Second, _) => target_type.clone(),
194                ArrowType::Timestamp(TimeUnit::Millisecond, _) => target_type.clone(),
195                ArrowType::Timestamp(TimeUnit::Microsecond, _) => target_type.clone(),
196                ArrowType::Timestamp(TimeUnit::Nanosecond, _) => target_type.clone(),
197                _ => unreachable!("INT96 must be a timestamp."),
198            },
199            PhysicalType::BYTE_ARRAY | PhysicalType::FIXED_LEN_BYTE_ARRAY => {
200                unreachable!("PrimitiveArrayReaders don't support complex physical types");
201            }
202        };
203
204        // Convert to arrays by using the Parquet physical type.
205        // The physical types are then cast to Arrow types if necessary
206
207        let record_data = self
208            .record_reader
209            .consume_record_data()
210            .into_buffer(target_type);
211
212        let array_data = ArrayDataBuilder::new(arrow_data_type)
213            .len(self.record_reader.num_values())
214            .add_buffer(record_data)
215            .null_bit_buffer(self.record_reader.consume_bitmap_buffer());
216
217        let array_data = unsafe { array_data.build_unchecked() };
218        let array: ArrayRef = match T::get_physical_type() {
219            PhysicalType::BOOLEAN => Arc::new(BooleanArray::from(array_data)),
220            PhysicalType::INT32 => match array_data.data_type() {
221                ArrowType::UInt32 => Arc::new(UInt32Array::from(array_data)),
222                ArrowType::Int32 => Arc::new(Int32Array::from(array_data)),
223                _ => unreachable!(),
224            },
225            PhysicalType::INT64 => match array_data.data_type() {
226                ArrowType::UInt64 => Arc::new(UInt64Array::from(array_data)),
227                ArrowType::Int64 => Arc::new(Int64Array::from(array_data)),
228                _ => unreachable!(),
229            },
230            PhysicalType::FLOAT => Arc::new(Float32Array::from(array_data)),
231            PhysicalType::DOUBLE => Arc::new(Float64Array::from(array_data)),
232            PhysicalType::INT96 => match target_type {
233                ArrowType::Timestamp(TimeUnit::Second, _) => {
234                    Arc::new(TimestampSecondArray::from(array_data))
235                }
236                ArrowType::Timestamp(TimeUnit::Millisecond, _) => {
237                    Arc::new(TimestampMillisecondArray::from(array_data))
238                }
239                ArrowType::Timestamp(TimeUnit::Microsecond, _) => {
240                    Arc::new(TimestampMicrosecondArray::from(array_data))
241                }
242                ArrowType::Timestamp(TimeUnit::Nanosecond, _) => {
243                    Arc::new(TimestampNanosecondArray::from(array_data))
244                }
245                _ => unreachable!("INT96 must be a timestamp."),
246            },
247
248            PhysicalType::BYTE_ARRAY | PhysicalType::FIXED_LEN_BYTE_ARRAY => {
249                unreachable!("PrimitiveArrayReaders don't support complex physical types");
250            }
251        };
252
253        // cast to Arrow type
254        // We make a strong assumption here that the casts should be infallible.
255        // If the cast fails because of incompatible datatypes, then there might
256        // be a bigger problem with how Arrow schemas are converted to Parquet.
257        //
258        // As there is not always a 1:1 mapping between Arrow and Parquet, there
259        // are datatypes which we must convert explicitly.
260        // These are:
261        // - date64: cast int32 to date32, then date32 to date64.
262        // - decimal: cast int32 to decimal, int64 to decimal
263        let array = match target_type {
264            ArrowType::Date64 if *(array.data_type()) == ArrowType::Int32 => {
265                // this is cheap as it internally reinterprets the data
266                let a = arrow_cast::cast(&array, &ArrowType::Date32)?;
267                arrow_cast::cast(&a, target_type)?
268            }
269            ArrowType::Decimal128(p, s) => {
270                // Apply conversion to all elements regardless of null slots as the conversion
271                // to `i128` is infallible. This improves performance by avoiding a branch in
272                // the inner loop (see docs for `PrimitiveArray::unary`).
273                let array = match array.data_type() {
274                    ArrowType::Int32 => array
275                        .as_any()
276                        .downcast_ref::<Int32Array>()
277                        .unwrap()
278                        .unary(|i| i as i128)
279                        as Decimal128Array,
280                    ArrowType::Int64 => array
281                        .as_any()
282                        .downcast_ref::<Int64Array>()
283                        .unwrap()
284                        .unary(|i| i as i128)
285                        as Decimal128Array,
286                    _ => {
287                        return Err(arrow_err!(
288                            "Cannot convert {:?} to decimal",
289                            array.data_type()
290                        ));
291                    }
292                }
293                .with_precision_and_scale(*p, *s)?;
294
295                Arc::new(array) as ArrayRef
296            }
297            ArrowType::Decimal256(p, s) => {
298                // See above comment. Conversion to `i256` is likewise infallible.
299                let array = match array.data_type() {
300                    ArrowType::Int32 => array
301                        .as_any()
302                        .downcast_ref::<Int32Array>()
303                        .unwrap()
304                        .unary(|i| i256::from_i128(i as i128))
305                        as Decimal256Array,
306                    ArrowType::Int64 => array
307                        .as_any()
308                        .downcast_ref::<Int64Array>()
309                        .unwrap()
310                        .unary(|i| i256::from_i128(i as i128))
311                        as Decimal256Array,
312                    _ => {
313                        return Err(arrow_err!(
314                            "Cannot convert {:?} to decimal",
315                            array.data_type()
316                        ));
317                    }
318                }
319                .with_precision_and_scale(*p, *s)?;
320
321                Arc::new(array) as ArrayRef
322            }
323            ArrowType::Dictionary(_, value_type) => match value_type.as_ref() {
324                ArrowType::Decimal128(p, s) => {
325                    let array = match array.data_type() {
326                        ArrowType::Int32 => array
327                            .as_any()
328                            .downcast_ref::<Int32Array>()
329                            .unwrap()
330                            .unary(|i| i as i128)
331                            as Decimal128Array,
332                        ArrowType::Int64 => array
333                            .as_any()
334                            .downcast_ref::<Int64Array>()
335                            .unwrap()
336                            .unary(|i| i as i128)
337                            as Decimal128Array,
338                        _ => {
339                            return Err(arrow_err!(
340                                "Cannot convert {:?} to decimal dictionary",
341                                array.data_type()
342                            ));
343                        }
344                    }
345                    .with_precision_and_scale(*p, *s)?;
346
347                    arrow_cast::cast(&array, target_type)?
348                }
349                ArrowType::Decimal256(p, s) => {
350                    let array = match array.data_type() {
351                        ArrowType::Int32 => array
352                            .as_any()
353                            .downcast_ref::<Int32Array>()
354                            .unwrap()
355                            .unary(i256::from)
356                            as Decimal256Array,
357                        ArrowType::Int64 => array
358                            .as_any()
359                            .downcast_ref::<Int64Array>()
360                            .unwrap()
361                            .unary(i256::from)
362                            as Decimal256Array,
363                        _ => {
364                            return Err(arrow_err!(
365                                "Cannot convert {:?} to decimal dictionary",
366                                array.data_type()
367                            ));
368                        }
369                    }
370                    .with_precision_and_scale(*p, *s)?;
371
372                    arrow_cast::cast(&array, target_type)?
373                }
374                _ => arrow_cast::cast(&array, target_type)?,
375            },
376            _ => arrow_cast::cast(&array, target_type)?,
377        };
378
379        // save definition and repetition buffers
380        self.def_levels_buffer = self.record_reader.consume_def_levels();
381        self.rep_levels_buffer = self.record_reader.consume_rep_levels();
382        self.record_reader.reset();
383        Ok(array)
384    }
385
386    fn skip_records(&mut self, num_records: usize) -> Result<usize> {
387        skip_records(&mut self.record_reader, self.pages.as_mut(), num_records)
388    }
389
390    fn get_def_levels(&self) -> Option<&[i16]> {
391        self.def_levels_buffer.as_deref()
392    }
393
394    fn get_rep_levels(&self) -> Option<&[i16]> {
395        self.rep_levels_buffer.as_deref()
396    }
397}
398
399#[cfg(test)]
400mod tests {
401    use super::*;
402    use crate::arrow::array_reader::test_util::EmptyPageIterator;
403    use crate::basic::Encoding;
404    use crate::column::page::Page;
405    use crate::data_type::{Int32Type, Int64Type};
406    use crate::schema::parser::parse_message_type;
407    use crate::schema::types::SchemaDescriptor;
408    use crate::util::test_common::rand_gen::make_pages;
409    use crate::util::InMemoryPageIterator;
410    use arrow::datatypes::ArrowPrimitiveType;
411    use arrow_array::{Array, Date32Array, PrimitiveArray};
412
413    use arrow::datatypes::DataType::{Date32, Decimal128};
414    use rand::distr::uniform::SampleUniform;
415    use std::collections::VecDeque;
416
417    #[allow(clippy::too_many_arguments)]
418    fn make_column_chunks<T: DataType>(
419        column_desc: ColumnDescPtr,
420        encoding: Encoding,
421        num_levels: usize,
422        min_value: T::T,
423        max_value: T::T,
424        def_levels: &mut Vec<i16>,
425        rep_levels: &mut Vec<i16>,
426        values: &mut Vec<T::T>,
427        page_lists: &mut Vec<Vec<Page>>,
428        use_v2: bool,
429        num_chunks: usize,
430    ) where
431        T::T: PartialOrd + SampleUniform + Copy,
432    {
433        for _i in 0..num_chunks {
434            let mut pages = VecDeque::new();
435            let mut data = Vec::new();
436            let mut page_def_levels = Vec::new();
437            let mut page_rep_levels = Vec::new();
438
439            make_pages::<T>(
440                column_desc.clone(),
441                encoding,
442                1,
443                num_levels,
444                min_value,
445                max_value,
446                &mut page_def_levels,
447                &mut page_rep_levels,
448                &mut data,
449                &mut pages,
450                use_v2,
451            );
452
453            def_levels.append(&mut page_def_levels);
454            rep_levels.append(&mut page_rep_levels);
455            values.append(&mut data);
456            page_lists.push(Vec::from(pages));
457        }
458    }
459
460    #[test]
461    fn test_primitive_array_reader_empty_pages() {
462        // Construct column schema
463        let message_type = "
464        message test_schema {
465          REQUIRED INT32 leaf;
466        }
467        ";
468
469        let schema = parse_message_type(message_type)
470            .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
471            .unwrap();
472
473        let mut array_reader = PrimitiveArrayReader::<Int32Type>::new(
474            Box::<EmptyPageIterator>::default(),
475            schema.column(0),
476            None,
477        )
478        .unwrap();
479
480        // expect no values to be read
481        let array = array_reader.next_batch(50).unwrap();
482        assert!(array.is_empty());
483    }
484
485    #[test]
486    fn test_primitive_array_reader_data() {
487        // Construct column schema
488        let message_type = "
489        message test_schema {
490          REQUIRED INT32 leaf;
491        }
492        ";
493
494        let schema = parse_message_type(message_type)
495            .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
496            .unwrap();
497
498        let column_desc = schema.column(0);
499
500        // Construct page iterator
501        {
502            let mut data = Vec::new();
503            let mut page_lists = Vec::new();
504            make_column_chunks::<Int32Type>(
505                column_desc.clone(),
506                Encoding::PLAIN,
507                100,
508                1,
509                200,
510                &mut Vec::new(),
511                &mut Vec::new(),
512                &mut data,
513                &mut page_lists,
514                true,
515                2,
516            );
517            let page_iterator = InMemoryPageIterator::new(page_lists);
518
519            let mut array_reader =
520                PrimitiveArrayReader::<Int32Type>::new(Box::new(page_iterator), column_desc, None)
521                    .unwrap();
522
523            // Read first 50 values, which are all from the first column chunk
524            let array = array_reader.next_batch(50).unwrap();
525            let array = array.as_any().downcast_ref::<Int32Array>().unwrap();
526
527            assert_eq!(&Int32Array::from(data[0..50].to_vec()), array);
528
529            // Read next 100 values, the first 50 ones are from the first column chunk,
530            // and the last 50 ones are from the second column chunk
531            let array = array_reader.next_batch(100).unwrap();
532            let array = array.as_any().downcast_ref::<Int32Array>().unwrap();
533
534            assert_eq!(&Int32Array::from(data[50..150].to_vec()), array);
535
536            // Try to read 100 values, however there are only 50 values
537            let array = array_reader.next_batch(100).unwrap();
538            let array = array.as_any().downcast_ref::<Int32Array>().unwrap();
539
540            assert_eq!(&Int32Array::from(data[150..200].to_vec()), array);
541        }
542    }
543
544    macro_rules! test_primitive_array_reader_one_type {
545        (
546            $arrow_parquet_type:ty,
547            $physical_type:expr,
548            $converted_type_str:expr,
549            $result_arrow_type:ty,
550            $result_arrow_cast_type:ty,
551            $result_primitive_type:ty
552            $(, $timezone:expr)?
553        ) => {{
554            let message_type = format!(
555                "
556            message test_schema {{
557              REQUIRED {:?} leaf ({});
558          }}
559            ",
560                $physical_type, $converted_type_str
561            );
562            let schema = parse_message_type(&message_type)
563                .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
564                .unwrap();
565
566            let column_desc = schema.column(0);
567
568            // Construct page iterator
569            {
570                let mut data = Vec::new();
571                let mut page_lists = Vec::new();
572                make_column_chunks::<$arrow_parquet_type>(
573                    column_desc.clone(),
574                    Encoding::PLAIN,
575                    100,
576                    1,
577                    200,
578                    &mut Vec::new(),
579                    &mut Vec::new(),
580                    &mut data,
581                    &mut page_lists,
582                    true,
583                    2,
584                );
585                let page_iterator = InMemoryPageIterator::new(page_lists);
586                let mut array_reader = PrimitiveArrayReader::<$arrow_parquet_type>::new(
587                    Box::new(page_iterator),
588                    column_desc.clone(),
589                    None,
590                )
591                .expect("Unable to get array reader");
592
593                let array = array_reader
594                    .next_batch(50)
595                    .expect("Unable to get batch from reader");
596
597                let result_data_type = <$result_arrow_type>::DATA_TYPE;
598                let array = array
599                    .as_any()
600                    .downcast_ref::<PrimitiveArray<$result_arrow_type>>()
601                    .expect(
602                        format!(
603                            "Unable to downcast {:?} to {:?}",
604                            array.data_type(),
605                            result_data_type
606                        )
607                        .as_str(),
608                    )
609                    $(.clone().with_timezone($timezone))?
610                    ;
611
612                // create expected array as primitive, and cast to result type
613                let expected = PrimitiveArray::<$result_arrow_cast_type>::from(
614                    data[0..50]
615                        .iter()
616                        .map(|x| *x as $result_primitive_type)
617                        .collect::<Vec<$result_primitive_type>>(),
618                );
619                let expected = Arc::new(expected) as ArrayRef;
620                let expected = arrow::compute::cast(&expected, &result_data_type)
621                    .expect("Unable to cast expected array");
622                assert_eq!(expected.data_type(), &result_data_type);
623                let expected = expected
624                    .as_any()
625                    .downcast_ref::<PrimitiveArray<$result_arrow_type>>()
626                    .expect(
627                        format!(
628                            "Unable to downcast expected {:?} to {:?}",
629                            expected.data_type(),
630                            result_data_type
631                        )
632                        .as_str(),
633                    )
634                    $(.clone().with_timezone($timezone))?
635                    ;
636                assert_eq!(expected, array);
637            }
638        }};
639    }
640
641    #[test]
642    fn test_primitive_array_reader_temporal_types() {
643        test_primitive_array_reader_one_type!(
644            crate::data_type::Int32Type,
645            PhysicalType::INT32,
646            "DATE",
647            arrow::datatypes::Date32Type,
648            arrow::datatypes::Int32Type,
649            i32
650        );
651        test_primitive_array_reader_one_type!(
652            crate::data_type::Int32Type,
653            PhysicalType::INT32,
654            "TIME_MILLIS",
655            arrow::datatypes::Time32MillisecondType,
656            arrow::datatypes::Int32Type,
657            i32
658        );
659        test_primitive_array_reader_one_type!(
660            crate::data_type::Int64Type,
661            PhysicalType::INT64,
662            "TIME_MICROS",
663            arrow::datatypes::Time64MicrosecondType,
664            arrow::datatypes::Int64Type,
665            i64
666        );
667        test_primitive_array_reader_one_type!(
668            crate::data_type::Int64Type,
669            PhysicalType::INT64,
670            "TIMESTAMP_MILLIS",
671            arrow::datatypes::TimestampMillisecondType,
672            arrow::datatypes::Int64Type,
673            i64,
674            "UTC"
675        );
676        test_primitive_array_reader_one_type!(
677            crate::data_type::Int64Type,
678            PhysicalType::INT64,
679            "TIMESTAMP_MICROS",
680            arrow::datatypes::TimestampMicrosecondType,
681            arrow::datatypes::Int64Type,
682            i64,
683            "UTC"
684        );
685    }
686
687    #[test]
688    fn test_primitive_array_reader_def_and_rep_levels() {
689        // Construct column schema
690        let message_type = "
691        message test_schema {
692            REPEATED Group test_mid {
693                OPTIONAL INT32 leaf;
694            }
695        }
696        ";
697
698        let schema = parse_message_type(message_type)
699            .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
700            .unwrap();
701
702        let column_desc = schema.column(0);
703
704        // Construct page iterator
705        {
706            let mut def_levels = Vec::new();
707            let mut rep_levels = Vec::new();
708            let mut page_lists = Vec::new();
709            make_column_chunks::<Int32Type>(
710                column_desc.clone(),
711                Encoding::PLAIN,
712                100,
713                1,
714                200,
715                &mut def_levels,
716                &mut rep_levels,
717                &mut Vec::new(),
718                &mut page_lists,
719                true,
720                2,
721            );
722
723            let page_iterator = InMemoryPageIterator::new(page_lists);
724
725            let mut array_reader =
726                PrimitiveArrayReader::<Int32Type>::new(Box::new(page_iterator), column_desc, None)
727                    .unwrap();
728
729            let mut accu_len: usize = 0;
730
731            // Read first 50 values, which are all from the first column chunk
732            let array = array_reader.next_batch(50).unwrap();
733            assert_eq!(
734                Some(&def_levels[accu_len..(accu_len + array.len())]),
735                array_reader.get_def_levels()
736            );
737            assert_eq!(
738                Some(&rep_levels[accu_len..(accu_len + array.len())]),
739                array_reader.get_rep_levels()
740            );
741            accu_len += array.len();
742
743            // Read next 100 values, the first 50 ones are from the first column chunk,
744            // and the last 50 ones are from the second column chunk
745            let array = array_reader.next_batch(100).unwrap();
746            assert_eq!(
747                Some(&def_levels[accu_len..(accu_len + array.len())]),
748                array_reader.get_def_levels()
749            );
750            assert_eq!(
751                Some(&rep_levels[accu_len..(accu_len + array.len())]),
752                array_reader.get_rep_levels()
753            );
754            accu_len += array.len();
755
756            // Try to read 100 values, however there are only 50 values
757            let array = array_reader.next_batch(100).unwrap();
758            assert_eq!(
759                Some(&def_levels[accu_len..(accu_len + array.len())]),
760                array_reader.get_def_levels()
761            );
762            assert_eq!(
763                Some(&rep_levels[accu_len..(accu_len + array.len())]),
764                array_reader.get_rep_levels()
765            );
766        }
767    }
768
769    #[test]
770    fn test_primitive_array_reader_decimal_types() {
771        // parquet `INT32` to decimal
772        let message_type = "
773            message test_schema {
774                REQUIRED INT32 decimal1 (DECIMAL(8,2));
775        }
776        ";
777        let schema = parse_message_type(message_type)
778            .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
779            .unwrap();
780        let column_desc = schema.column(0);
781
782        // create the array reader
783        {
784            let mut data = Vec::new();
785            let mut page_lists = Vec::new();
786            make_column_chunks::<Int32Type>(
787                column_desc.clone(),
788                Encoding::PLAIN,
789                100,
790                -99999999,
791                99999999,
792                &mut Vec::new(),
793                &mut Vec::new(),
794                &mut data,
795                &mut page_lists,
796                true,
797                2,
798            );
799            let page_iterator = InMemoryPageIterator::new(page_lists);
800
801            let mut array_reader =
802                PrimitiveArrayReader::<Int32Type>::new(Box::new(page_iterator), column_desc, None)
803                    .unwrap();
804
805            // read data from the reader
806            // the data type is decimal(8,2)
807            let array = array_reader.next_batch(50).unwrap();
808            assert_eq!(array.data_type(), &Decimal128(8, 2));
809            let array = array.as_any().downcast_ref::<Decimal128Array>().unwrap();
810            let data_decimal_array = data[0..50]
811                .iter()
812                .copied()
813                .map(|v| Some(v as i128))
814                .collect::<Decimal128Array>()
815                .with_precision_and_scale(8, 2)
816                .unwrap();
817            assert_eq!(array, &data_decimal_array);
818
819            // not equal with different data type(precision and scale)
820            let data_decimal_array = data[0..50]
821                .iter()
822                .copied()
823                .map(|v| Some(v as i128))
824                .collect::<Decimal128Array>()
825                .with_precision_and_scale(9, 0)
826                .unwrap();
827            assert_ne!(array, &data_decimal_array)
828        }
829
830        // parquet `INT64` to decimal
831        let message_type = "
832            message test_schema {
833                REQUIRED INT64 decimal1 (DECIMAL(18,4));
834        }
835        ";
836        let schema = parse_message_type(message_type)
837            .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
838            .unwrap();
839        let column_desc = schema.column(0);
840
841        // create the array reader
842        {
843            let mut data = Vec::new();
844            let mut page_lists = Vec::new();
845            make_column_chunks::<Int64Type>(
846                column_desc.clone(),
847                Encoding::PLAIN,
848                100,
849                -999999999999999999,
850                999999999999999999,
851                &mut Vec::new(),
852                &mut Vec::new(),
853                &mut data,
854                &mut page_lists,
855                true,
856                2,
857            );
858            let page_iterator = InMemoryPageIterator::new(page_lists);
859
860            let mut array_reader =
861                PrimitiveArrayReader::<Int64Type>::new(Box::new(page_iterator), column_desc, None)
862                    .unwrap();
863
864            // read data from the reader
865            // the data type is decimal(18,4)
866            let array = array_reader.next_batch(50).unwrap();
867            assert_eq!(array.data_type(), &Decimal128(18, 4));
868            let array = array.as_any().downcast_ref::<Decimal128Array>().unwrap();
869            let data_decimal_array = data[0..50]
870                .iter()
871                .copied()
872                .map(|v| Some(v as i128))
873                .collect::<Decimal128Array>()
874                .with_precision_and_scale(18, 4)
875                .unwrap();
876            assert_eq!(array, &data_decimal_array);
877
878            // not equal with different data type(precision and scale)
879            let data_decimal_array = data[0..50]
880                .iter()
881                .copied()
882                .map(|v| Some(v as i128))
883                .collect::<Decimal128Array>()
884                .with_precision_and_scale(34, 0)
885                .unwrap();
886            assert_ne!(array, &data_decimal_array)
887        }
888    }
889
890    #[test]
891    fn test_primitive_array_reader_date32_type() {
892        // parquet `INT32` to date
893        let message_type = "
894            message test_schema {
895                REQUIRED INT32 date1 (DATE);
896        }
897        ";
898        let schema = parse_message_type(message_type)
899            .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
900            .unwrap();
901        let column_desc = schema.column(0);
902
903        // create the array reader
904        {
905            let mut data = Vec::new();
906            let mut page_lists = Vec::new();
907            make_column_chunks::<Int32Type>(
908                column_desc.clone(),
909                Encoding::PLAIN,
910                100,
911                -99999999,
912                99999999,
913                &mut Vec::new(),
914                &mut Vec::new(),
915                &mut data,
916                &mut page_lists,
917                true,
918                2,
919            );
920            let page_iterator = InMemoryPageIterator::new(page_lists);
921
922            let mut array_reader =
923                PrimitiveArrayReader::<Int32Type>::new(Box::new(page_iterator), column_desc, None)
924                    .unwrap();
925
926            // read data from the reader
927            // the data type is date
928            let array = array_reader.next_batch(50).unwrap();
929            assert_eq!(array.data_type(), &Date32);
930            let array = array.as_any().downcast_ref::<Date32Array>().unwrap();
931            let data_date_array = data[0..50]
932                .iter()
933                .copied()
934                .map(Some)
935                .collect::<Date32Array>();
936            assert_eq!(array, &data_date_array);
937        }
938    }
939}