parquet/arrow/array_reader/
primitive_array.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::arrow::array_reader::{ArrayReader, read_records, skip_records};
19use crate::arrow::record_reader::RecordReader;
20use crate::arrow::schema::parquet_to_arrow_field;
21use crate::basic::Type as PhysicalType;
22use crate::column::page::PageIterator;
23use crate::data_type::{DataType, Int96};
24use crate::errors::{ParquetError, Result};
25use crate::schema::types::ColumnDescPtr;
26use arrow_array::{
27    ArrayRef, BooleanArray, Decimal32Array, Decimal64Array, Decimal128Array, Decimal256Array,
28    Float32Array, Float64Array, Int8Array, Int16Array, Int32Array, Int64Array,
29    TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
30    TimestampSecondArray, UInt8Array, UInt16Array, UInt32Array, UInt64Array,
31    builder::{
32        TimestampMicrosecondBufferBuilder, TimestampMillisecondBufferBuilder,
33        TimestampNanosecondBufferBuilder, TimestampSecondBufferBuilder,
34    },
35};
36use arrow_buffer::{BooleanBuffer, Buffer, i256};
37use arrow_data::ArrayDataBuilder;
38use arrow_schema::{DataType as ArrowType, TimeUnit};
39use std::any::Any;
40use std::sync::Arc;
41
42/// Provides conversion from `Vec<T>` to `Buffer`
43pub trait IntoBuffer {
44    fn into_buffer(self, target_type: &ArrowType) -> Buffer;
45}
46
47macro_rules! native_buffer {
48    ($($t:ty),*) => {
49        $(impl IntoBuffer for Vec<$t> {
50            fn into_buffer(self, _target_type: &ArrowType) -> Buffer {
51                Buffer::from_vec(self)
52            }
53        })*
54    };
55}
56native_buffer!(i8, i16, i32, i64, u8, u16, u32, u64, f32, f64);
57
58impl IntoBuffer for Vec<bool> {
59    fn into_buffer(self, _target_type: &ArrowType) -> Buffer {
60        BooleanBuffer::from_iter(self).into_inner()
61    }
62}
63
64impl IntoBuffer for Vec<Int96> {
65    fn into_buffer(self, target_type: &ArrowType) -> Buffer {
66        match target_type {
67            ArrowType::Timestamp(TimeUnit::Second, _) => {
68                let mut builder = TimestampSecondBufferBuilder::new(self.len());
69                for v in self {
70                    builder.append(v.to_seconds())
71                }
72                builder.finish()
73            }
74            ArrowType::Timestamp(TimeUnit::Millisecond, _) => {
75                let mut builder = TimestampMillisecondBufferBuilder::new(self.len());
76                for v in self {
77                    builder.append(v.to_millis())
78                }
79                builder.finish()
80            }
81            ArrowType::Timestamp(TimeUnit::Microsecond, _) => {
82                let mut builder = TimestampMicrosecondBufferBuilder::new(self.len());
83                for v in self {
84                    builder.append(v.to_micros())
85                }
86                builder.finish()
87            }
88            ArrowType::Timestamp(TimeUnit::Nanosecond, _) => {
89                let mut builder = TimestampNanosecondBufferBuilder::new(self.len());
90                for v in self {
91                    builder.append(v.to_nanos())
92                }
93                builder.finish()
94            }
95            _ => unreachable!("Invalid target_type for Int96."),
96        }
97    }
98}
99
100/// Primitive array readers are leaves of array reader tree. They accept page iterator
101/// and read them into primitive arrays.
102pub struct PrimitiveArrayReader<T>
103where
104    T: DataType,
105    T::T: Copy + Default,
106    Vec<T::T>: IntoBuffer,
107{
108    data_type: ArrowType,
109    pages: Box<dyn PageIterator>,
110    def_levels_buffer: Option<Vec<i16>>,
111    rep_levels_buffer: Option<Vec<i16>>,
112    record_reader: RecordReader<T>,
113}
114
115impl<T> PrimitiveArrayReader<T>
116where
117    T: DataType,
118    T::T: Copy + Default,
119    Vec<T::T>: IntoBuffer,
120{
121    /// Construct primitive array reader.
122    pub fn new(
123        pages: Box<dyn PageIterator>,
124        column_desc: ColumnDescPtr,
125        arrow_type: Option<ArrowType>,
126    ) -> Result<Self> {
127        // Check if Arrow type is specified, else create it from Parquet type
128        let data_type = match arrow_type {
129            Some(t) => t,
130            None => parquet_to_arrow_field(column_desc.as_ref())?
131                .data_type()
132                .clone(),
133        };
134
135        let record_reader = RecordReader::<T>::new(column_desc);
136
137        Ok(Self {
138            data_type,
139            pages,
140            def_levels_buffer: None,
141            rep_levels_buffer: None,
142            record_reader,
143        })
144    }
145}
146
147/// Implementation of primitive array reader.
148impl<T> ArrayReader for PrimitiveArrayReader<T>
149where
150    T: DataType,
151    T::T: Copy + Default,
152    Vec<T::T>: IntoBuffer,
153{
154    fn as_any(&self) -> &dyn Any {
155        self
156    }
157
158    /// Returns data type of primitive array.
159    fn get_data_type(&self) -> &ArrowType {
160        &self.data_type
161    }
162
163    fn read_records(&mut self, batch_size: usize) -> Result<usize> {
164        read_records(&mut self.record_reader, self.pages.as_mut(), batch_size)
165    }
166
167    fn consume_batch(&mut self) -> Result<ArrayRef> {
168        let target_type = &self.data_type;
169        let arrow_data_type = match T::get_physical_type() {
170            PhysicalType::BOOLEAN => ArrowType::Boolean,
171            PhysicalType::INT32 => {
172                match target_type {
173                    ArrowType::UInt32 => {
174                        // follow C++ implementation and use overflow/reinterpret cast from  i32 to u32 which will map
175                        // `i32::MIN..0` to `(i32::MAX as u32)..u32::MAX`
176                        ArrowType::UInt32
177                    }
178                    ArrowType::Decimal32(_, _) => target_type.clone(),
179                    _ => ArrowType::Int32,
180                }
181            }
182            PhysicalType::INT64 => {
183                match target_type {
184                    ArrowType::UInt64 => {
185                        // follow C++ implementation and use overflow/reinterpret cast from  i64 to u64 which will map
186                        // `i64::MIN..0` to `(i64::MAX as u64)..u64::MAX`
187                        ArrowType::UInt64
188                    }
189                    ArrowType::Decimal64(_, _) => target_type.clone(),
190                    _ => ArrowType::Int64,
191                }
192            }
193            PhysicalType::FLOAT => ArrowType::Float32,
194            PhysicalType::DOUBLE => ArrowType::Float64,
195            PhysicalType::INT96 => match target_type {
196                ArrowType::Timestamp(TimeUnit::Second, _) => target_type.clone(),
197                ArrowType::Timestamp(TimeUnit::Millisecond, _) => target_type.clone(),
198                ArrowType::Timestamp(TimeUnit::Microsecond, _) => target_type.clone(),
199                ArrowType::Timestamp(TimeUnit::Nanosecond, _) => target_type.clone(),
200                _ => unreachable!("INT96 must be a timestamp."),
201            },
202            PhysicalType::BYTE_ARRAY | PhysicalType::FIXED_LEN_BYTE_ARRAY => {
203                unreachable!("PrimitiveArrayReaders don't support complex physical types");
204            }
205        };
206
207        // Convert to arrays by using the Parquet physical type.
208        // The physical types are then cast to Arrow types if necessary
209
210        let record_data = self
211            .record_reader
212            .consume_record_data()
213            .into_buffer(target_type);
214
215        let array_data = ArrayDataBuilder::new(arrow_data_type)
216            .len(self.record_reader.num_values())
217            .add_buffer(record_data)
218            .null_bit_buffer(self.record_reader.consume_bitmap_buffer());
219
220        let array_data = unsafe { array_data.build_unchecked() };
221        let array: ArrayRef = match T::get_physical_type() {
222            PhysicalType::BOOLEAN => Arc::new(BooleanArray::from(array_data)),
223            PhysicalType::INT32 => match array_data.data_type() {
224                ArrowType::UInt32 => Arc::new(UInt32Array::from(array_data)),
225                ArrowType::Int32 => Arc::new(Int32Array::from(array_data)),
226                ArrowType::Decimal32(_, _) => Arc::new(Decimal32Array::from(array_data)),
227                _ => unreachable!(),
228            },
229            PhysicalType::INT64 => match array_data.data_type() {
230                ArrowType::UInt64 => Arc::new(UInt64Array::from(array_data)),
231                ArrowType::Int64 => Arc::new(Int64Array::from(array_data)),
232                ArrowType::Decimal64(_, _) => Arc::new(Decimal64Array::from(array_data)),
233                _ => unreachable!(),
234            },
235            PhysicalType::FLOAT => Arc::new(Float32Array::from(array_data)),
236            PhysicalType::DOUBLE => Arc::new(Float64Array::from(array_data)),
237            PhysicalType::INT96 => match target_type {
238                ArrowType::Timestamp(TimeUnit::Second, _) => {
239                    Arc::new(TimestampSecondArray::from(array_data))
240                }
241                ArrowType::Timestamp(TimeUnit::Millisecond, _) => {
242                    Arc::new(TimestampMillisecondArray::from(array_data))
243                }
244                ArrowType::Timestamp(TimeUnit::Microsecond, _) => {
245                    Arc::new(TimestampMicrosecondArray::from(array_data))
246                }
247                ArrowType::Timestamp(TimeUnit::Nanosecond, _) => {
248                    Arc::new(TimestampNanosecondArray::from(array_data))
249                }
250                _ => unreachable!("INT96 must be a timestamp."),
251            },
252
253            PhysicalType::BYTE_ARRAY | PhysicalType::FIXED_LEN_BYTE_ARRAY => {
254                unreachable!("PrimitiveArrayReaders don't support complex physical types");
255            }
256        };
257
258        // cast to Arrow type
259        // We make a strong assumption here that the casts should be infallible.
260        // If the cast fails because of incompatible datatypes, then there might
261        // be a bigger problem with how Arrow schemas are converted to Parquet.
262        //
263        // As there is not always a 1:1 mapping between Arrow and Parquet, there
264        // are datatypes which we must convert explicitly.
265        // These are:
266        // - date64: cast int32 to date32, then date32 to date64.
267        // - decimal: cast int32 to decimal, int64 to decimal
268        let array = match target_type {
269            // Using `arrow_cast::cast` has been found to be very slow for converting
270            // INT32 physical type to lower bitwidth logical types. Since rust casts
271            // are infallible, instead use `unary` which is much faster (by up to 40%).
272            // One consequence of this approach is that some malformed integer columns
273            // will return (an arguably correct) result rather than null.
274            // See https://github.com/apache/arrow-rs/issues/7040 for a discussion of this
275            // issue.
276            ArrowType::UInt8 if *(array.data_type()) == ArrowType::Int32 => {
277                let array = array
278                    .as_any()
279                    .downcast_ref::<Int32Array>()
280                    .unwrap()
281                    .unary(|i| i as u8) as UInt8Array;
282                Arc::new(array) as ArrayRef
283            }
284            ArrowType::Int8 if *(array.data_type()) == ArrowType::Int32 => {
285                let array = array
286                    .as_any()
287                    .downcast_ref::<Int32Array>()
288                    .unwrap()
289                    .unary(|i| i as i8) as Int8Array;
290                Arc::new(array) as ArrayRef
291            }
292            ArrowType::UInt16 if *(array.data_type()) == ArrowType::Int32 => {
293                let array = array
294                    .as_any()
295                    .downcast_ref::<Int32Array>()
296                    .unwrap()
297                    .unary(|i| i as u16) as UInt16Array;
298                Arc::new(array) as ArrayRef
299            }
300            ArrowType::Int16 if *(array.data_type()) == ArrowType::Int32 => {
301                let array = array
302                    .as_any()
303                    .downcast_ref::<Int32Array>()
304                    .unwrap()
305                    .unary(|i| i as i16) as Int16Array;
306                Arc::new(array) as ArrayRef
307            }
308            ArrowType::Date64 if *(array.data_type()) == ArrowType::Int32 => {
309                // this is cheap as it internally reinterprets the data
310                let a = arrow_cast::cast(&array, &ArrowType::Date32)?;
311                arrow_cast::cast(&a, target_type)?
312            }
313            ArrowType::Decimal64(p, s) if *(array.data_type()) == ArrowType::Int32 => {
314                // Apply conversion to all elements regardless of null slots as the conversion
315                // to `i64` is infallible. This improves performance by avoiding a branch in
316                // the inner loop (see docs for `PrimitiveArray::unary`).
317                let array = match array.data_type() {
318                    ArrowType::Int32 => array
319                        .as_any()
320                        .downcast_ref::<Int32Array>()
321                        .unwrap()
322                        .unary(|i| i as i64)
323                        as Decimal64Array,
324                    _ => {
325                        return Err(arrow_err!(
326                            "Cannot convert {:?} to decimal",
327                            array.data_type()
328                        ));
329                    }
330                }
331                .with_precision_and_scale(*p, *s)?;
332
333                Arc::new(array) as ArrayRef
334            }
335            ArrowType::Decimal128(p, s) => {
336                // See above comment. Conversion to `i128` is likewise infallible.
337                let array = match array.data_type() {
338                    ArrowType::Int32 => array
339                        .as_any()
340                        .downcast_ref::<Int32Array>()
341                        .unwrap()
342                        .unary(|i| i as i128)
343                        as Decimal128Array,
344                    ArrowType::Int64 => array
345                        .as_any()
346                        .downcast_ref::<Int64Array>()
347                        .unwrap()
348                        .unary(|i| i as i128)
349                        as Decimal128Array,
350                    _ => {
351                        return Err(arrow_err!(
352                            "Cannot convert {:?} to decimal",
353                            array.data_type()
354                        ));
355                    }
356                }
357                .with_precision_and_scale(*p, *s)?;
358
359                Arc::new(array) as ArrayRef
360            }
361            ArrowType::Decimal256(p, s) => {
362                // See above comment. Conversion to `i256` is likewise infallible.
363                let array = match array.data_type() {
364                    ArrowType::Int32 => array
365                        .as_any()
366                        .downcast_ref::<Int32Array>()
367                        .unwrap()
368                        .unary(|i| i256::from_i128(i as i128))
369                        as Decimal256Array,
370                    ArrowType::Int64 => array
371                        .as_any()
372                        .downcast_ref::<Int64Array>()
373                        .unwrap()
374                        .unary(|i| i256::from_i128(i as i128))
375                        as Decimal256Array,
376                    _ => {
377                        return Err(arrow_err!(
378                            "Cannot convert {:?} to decimal",
379                            array.data_type()
380                        ));
381                    }
382                }
383                .with_precision_and_scale(*p, *s)?;
384
385                Arc::new(array) as ArrayRef
386            }
387            ArrowType::Dictionary(_, value_type) => match value_type.as_ref() {
388                ArrowType::Decimal32(p, s) => {
389                    let array = match array.data_type() {
390                        ArrowType::Int32 => array
391                            .as_any()
392                            .downcast_ref::<Int32Array>()
393                            .unwrap()
394                            .unary(|i| i)
395                            as Decimal32Array,
396                        _ => {
397                            return Err(arrow_err!(
398                                "Cannot convert {:?} to decimal dictionary",
399                                array.data_type()
400                            ));
401                        }
402                    }
403                    .with_precision_and_scale(*p, *s)?;
404
405                    arrow_cast::cast(&array, target_type)?
406                }
407                ArrowType::Decimal64(p, s) => {
408                    let array = match array.data_type() {
409                        ArrowType::Int32 => array
410                            .as_any()
411                            .downcast_ref::<Int32Array>()
412                            .unwrap()
413                            .unary(|i| i as i64)
414                            as Decimal64Array,
415                        ArrowType::Int64 => array
416                            .as_any()
417                            .downcast_ref::<Int64Array>()
418                            .unwrap()
419                            .unary(|i| i)
420                            as Decimal64Array,
421                        _ => {
422                            return Err(arrow_err!(
423                                "Cannot convert {:?} to decimal dictionary",
424                                array.data_type()
425                            ));
426                        }
427                    }
428                    .with_precision_and_scale(*p, *s)?;
429
430                    arrow_cast::cast(&array, target_type)?
431                }
432                ArrowType::Decimal128(p, s) => {
433                    let array = match array.data_type() {
434                        ArrowType::Int32 => array
435                            .as_any()
436                            .downcast_ref::<Int32Array>()
437                            .unwrap()
438                            .unary(|i| i as i128)
439                            as Decimal128Array,
440                        ArrowType::Int64 => array
441                            .as_any()
442                            .downcast_ref::<Int64Array>()
443                            .unwrap()
444                            .unary(|i| i as i128)
445                            as Decimal128Array,
446                        _ => {
447                            return Err(arrow_err!(
448                                "Cannot convert {:?} to decimal dictionary",
449                                array.data_type()
450                            ));
451                        }
452                    }
453                    .with_precision_and_scale(*p, *s)?;
454
455                    arrow_cast::cast(&array, target_type)?
456                }
457                ArrowType::Decimal256(p, s) => {
458                    let array = match array.data_type() {
459                        ArrowType::Int32 => array
460                            .as_any()
461                            .downcast_ref::<Int32Array>()
462                            .unwrap()
463                            .unary(i256::from)
464                            as Decimal256Array,
465                        ArrowType::Int64 => array
466                            .as_any()
467                            .downcast_ref::<Int64Array>()
468                            .unwrap()
469                            .unary(i256::from)
470                            as Decimal256Array,
471                        _ => {
472                            return Err(arrow_err!(
473                                "Cannot convert {:?} to decimal dictionary",
474                                array.data_type()
475                            ));
476                        }
477                    }
478                    .with_precision_and_scale(*p, *s)?;
479
480                    arrow_cast::cast(&array, target_type)?
481                }
482                _ => arrow_cast::cast(&array, target_type)?,
483            },
484            _ => arrow_cast::cast(&array, target_type)?,
485        };
486
487        // save definition and repetition buffers
488        self.def_levels_buffer = self.record_reader.consume_def_levels();
489        self.rep_levels_buffer = self.record_reader.consume_rep_levels();
490        self.record_reader.reset();
491        Ok(array)
492    }
493
494    fn skip_records(&mut self, num_records: usize) -> Result<usize> {
495        skip_records(&mut self.record_reader, self.pages.as_mut(), num_records)
496    }
497
498    fn get_def_levels(&self) -> Option<&[i16]> {
499        self.def_levels_buffer.as_deref()
500    }
501
502    fn get_rep_levels(&self) -> Option<&[i16]> {
503        self.rep_levels_buffer.as_deref()
504    }
505}
506
507#[cfg(test)]
508mod tests {
509    use super::*;
510    use crate::arrow::array_reader::test_util::EmptyPageIterator;
511    use crate::basic::Encoding;
512    use crate::column::page::Page;
513    use crate::data_type::{Int32Type, Int64Type};
514    use crate::schema::parser::parse_message_type;
515    use crate::schema::types::SchemaDescriptor;
516    use crate::util::InMemoryPageIterator;
517    use crate::util::test_common::rand_gen::make_pages;
518    use arrow::datatypes::ArrowPrimitiveType;
519    use arrow_array::{Array, Date32Array, PrimitiveArray};
520
521    use arrow::datatypes::DataType::{Date32, Decimal128};
522    use rand::distr::uniform::SampleUniform;
523    use std::collections::VecDeque;
524
525    #[allow(clippy::too_many_arguments)]
526    fn make_column_chunks<T: DataType>(
527        column_desc: ColumnDescPtr,
528        encoding: Encoding,
529        num_levels: usize,
530        min_value: T::T,
531        max_value: T::T,
532        def_levels: &mut Vec<i16>,
533        rep_levels: &mut Vec<i16>,
534        values: &mut Vec<T::T>,
535        page_lists: &mut Vec<Vec<Page>>,
536        use_v2: bool,
537        num_chunks: usize,
538    ) where
539        T::T: PartialOrd + SampleUniform + Copy,
540    {
541        for _i in 0..num_chunks {
542            let mut pages = VecDeque::new();
543            let mut data = Vec::new();
544            let mut page_def_levels = Vec::new();
545            let mut page_rep_levels = Vec::new();
546
547            make_pages::<T>(
548                column_desc.clone(),
549                encoding,
550                1,
551                num_levels,
552                min_value,
553                max_value,
554                &mut page_def_levels,
555                &mut page_rep_levels,
556                &mut data,
557                &mut pages,
558                use_v2,
559            );
560
561            def_levels.append(&mut page_def_levels);
562            rep_levels.append(&mut page_rep_levels);
563            values.append(&mut data);
564            page_lists.push(Vec::from(pages));
565        }
566    }
567
568    #[test]
569    fn test_primitive_array_reader_empty_pages() {
570        // Construct column schema
571        let message_type = "
572        message test_schema {
573          REQUIRED INT32 leaf;
574        }
575        ";
576
577        let schema = parse_message_type(message_type)
578            .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
579            .unwrap();
580
581        let mut array_reader = PrimitiveArrayReader::<Int32Type>::new(
582            Box::<EmptyPageIterator>::default(),
583            schema.column(0),
584            None,
585        )
586        .unwrap();
587
588        // expect no values to be read
589        let array = array_reader.next_batch(50).unwrap();
590        assert!(array.is_empty());
591    }
592
593    #[test]
594    fn test_primitive_array_reader_data() {
595        // Construct column schema
596        let message_type = "
597        message test_schema {
598          REQUIRED INT32 leaf;
599        }
600        ";
601
602        let schema = parse_message_type(message_type)
603            .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
604            .unwrap();
605
606        let column_desc = schema.column(0);
607
608        // Construct page iterator
609        {
610            let mut data = Vec::new();
611            let mut page_lists = Vec::new();
612            make_column_chunks::<Int32Type>(
613                column_desc.clone(),
614                Encoding::PLAIN,
615                100,
616                1,
617                200,
618                &mut Vec::new(),
619                &mut Vec::new(),
620                &mut data,
621                &mut page_lists,
622                true,
623                2,
624            );
625            let page_iterator = InMemoryPageIterator::new(page_lists);
626
627            let mut array_reader =
628                PrimitiveArrayReader::<Int32Type>::new(Box::new(page_iterator), column_desc, None)
629                    .unwrap();
630
631            // Read first 50 values, which are all from the first column chunk
632            let array = array_reader.next_batch(50).unwrap();
633            let array = array.as_any().downcast_ref::<Int32Array>().unwrap();
634
635            assert_eq!(&Int32Array::from(data[0..50].to_vec()), array);
636
637            // Read next 100 values, the first 50 ones are from the first column chunk,
638            // and the last 50 ones are from the second column chunk
639            let array = array_reader.next_batch(100).unwrap();
640            let array = array.as_any().downcast_ref::<Int32Array>().unwrap();
641
642            assert_eq!(&Int32Array::from(data[50..150].to_vec()), array);
643
644            // Try to read 100 values, however there are only 50 values
645            let array = array_reader.next_batch(100).unwrap();
646            let array = array.as_any().downcast_ref::<Int32Array>().unwrap();
647
648            assert_eq!(&Int32Array::from(data[150..200].to_vec()), array);
649        }
650    }
651
652    macro_rules! test_primitive_array_reader_one_type {
653        (
654            $arrow_parquet_type:ty,
655            $physical_type:expr,
656            $converted_type_str:expr,
657            $result_arrow_type:ty,
658            $result_arrow_cast_type:ty,
659            $result_primitive_type:ty
660            $(, $timezone:expr)?
661        ) => {{
662            let message_type = format!(
663                "
664            message test_schema {{
665              REQUIRED {:?} leaf ({});
666          }}
667            ",
668                $physical_type, $converted_type_str
669            );
670            let schema = parse_message_type(&message_type)
671                .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
672                .unwrap();
673
674            let column_desc = schema.column(0);
675
676            // Construct page iterator
677            {
678                let mut data = Vec::new();
679                let mut page_lists = Vec::new();
680                make_column_chunks::<$arrow_parquet_type>(
681                    column_desc.clone(),
682                    Encoding::PLAIN,
683                    100,
684                    1,
685                    200,
686                    &mut Vec::new(),
687                    &mut Vec::new(),
688                    &mut data,
689                    &mut page_lists,
690                    true,
691                    2,
692                );
693                let page_iterator = InMemoryPageIterator::new(page_lists);
694                let mut array_reader = PrimitiveArrayReader::<$arrow_parquet_type>::new(
695                    Box::new(page_iterator),
696                    column_desc.clone(),
697                    None,
698                )
699                .expect("Unable to get array reader");
700
701                let array = array_reader
702                    .next_batch(50)
703                    .expect("Unable to get batch from reader");
704
705                let result_data_type = <$result_arrow_type>::DATA_TYPE;
706                let array = array
707                    .as_any()
708                    .downcast_ref::<PrimitiveArray<$result_arrow_type>>()
709                    .expect(
710                        format!(
711                            "Unable to downcast {:?} to {:?}",
712                            array.data_type(),
713                            result_data_type
714                        )
715                        .as_str(),
716                    )
717                    $(.clone().with_timezone($timezone))?
718                    ;
719
720                // create expected array as primitive, and cast to result type
721                let expected = PrimitiveArray::<$result_arrow_cast_type>::from(
722                    data[0..50]
723                        .iter()
724                        .map(|x| *x as $result_primitive_type)
725                        .collect::<Vec<$result_primitive_type>>(),
726                );
727                let expected = Arc::new(expected) as ArrayRef;
728                let expected = arrow::compute::cast(&expected, &result_data_type)
729                    .expect("Unable to cast expected array");
730                assert_eq!(expected.data_type(), &result_data_type);
731                let expected = expected
732                    .as_any()
733                    .downcast_ref::<PrimitiveArray<$result_arrow_type>>()
734                    .expect(
735                        format!(
736                            "Unable to downcast expected {:?} to {:?}",
737                            expected.data_type(),
738                            result_data_type
739                        )
740                        .as_str(),
741                    )
742                    $(.clone().with_timezone($timezone))?
743                    ;
744                assert_eq!(expected, array);
745            }
746        }};
747    }
748
749    #[test]
750    fn test_primitive_array_reader_temporal_types() {
751        test_primitive_array_reader_one_type!(
752            crate::data_type::Int32Type,
753            PhysicalType::INT32,
754            "DATE",
755            arrow::datatypes::Date32Type,
756            arrow::datatypes::Int32Type,
757            i32
758        );
759        test_primitive_array_reader_one_type!(
760            crate::data_type::Int32Type,
761            PhysicalType::INT32,
762            "TIME_MILLIS",
763            arrow::datatypes::Time32MillisecondType,
764            arrow::datatypes::Int32Type,
765            i32
766        );
767        test_primitive_array_reader_one_type!(
768            crate::data_type::Int64Type,
769            PhysicalType::INT64,
770            "TIME_MICROS",
771            arrow::datatypes::Time64MicrosecondType,
772            arrow::datatypes::Int64Type,
773            i64
774        );
775        test_primitive_array_reader_one_type!(
776            crate::data_type::Int64Type,
777            PhysicalType::INT64,
778            "TIMESTAMP_MILLIS",
779            arrow::datatypes::TimestampMillisecondType,
780            arrow::datatypes::Int64Type,
781            i64,
782            "UTC"
783        );
784        test_primitive_array_reader_one_type!(
785            crate::data_type::Int64Type,
786            PhysicalType::INT64,
787            "TIMESTAMP_MICROS",
788            arrow::datatypes::TimestampMicrosecondType,
789            arrow::datatypes::Int64Type,
790            i64,
791            "UTC"
792        );
793    }
794
795    #[test]
796    fn test_primitive_array_reader_def_and_rep_levels() {
797        // Construct column schema
798        let message_type = "
799        message test_schema {
800            REPEATED Group test_mid {
801                OPTIONAL INT32 leaf;
802            }
803        }
804        ";
805
806        let schema = parse_message_type(message_type)
807            .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
808            .unwrap();
809
810        let column_desc = schema.column(0);
811
812        // Construct page iterator
813        {
814            let mut def_levels = Vec::new();
815            let mut rep_levels = Vec::new();
816            let mut page_lists = Vec::new();
817            make_column_chunks::<Int32Type>(
818                column_desc.clone(),
819                Encoding::PLAIN,
820                100,
821                1,
822                200,
823                &mut def_levels,
824                &mut rep_levels,
825                &mut Vec::new(),
826                &mut page_lists,
827                true,
828                2,
829            );
830
831            let page_iterator = InMemoryPageIterator::new(page_lists);
832
833            let mut array_reader =
834                PrimitiveArrayReader::<Int32Type>::new(Box::new(page_iterator), column_desc, None)
835                    .unwrap();
836
837            let mut accu_len: usize = 0;
838
839            // Read first 50 values, which are all from the first column chunk
840            let array = array_reader.next_batch(50).unwrap();
841            assert_eq!(
842                Some(&def_levels[accu_len..(accu_len + array.len())]),
843                array_reader.get_def_levels()
844            );
845            assert_eq!(
846                Some(&rep_levels[accu_len..(accu_len + array.len())]),
847                array_reader.get_rep_levels()
848            );
849            accu_len += array.len();
850
851            // Read next 100 values, the first 50 ones are from the first column chunk,
852            // and the last 50 ones are from the second column chunk
853            let array = array_reader.next_batch(100).unwrap();
854            assert_eq!(
855                Some(&def_levels[accu_len..(accu_len + array.len())]),
856                array_reader.get_def_levels()
857            );
858            assert_eq!(
859                Some(&rep_levels[accu_len..(accu_len + array.len())]),
860                array_reader.get_rep_levels()
861            );
862            accu_len += array.len();
863
864            // Try to read 100 values, however there are only 50 values
865            let array = array_reader.next_batch(100).unwrap();
866            assert_eq!(
867                Some(&def_levels[accu_len..(accu_len + array.len())]),
868                array_reader.get_def_levels()
869            );
870            assert_eq!(
871                Some(&rep_levels[accu_len..(accu_len + array.len())]),
872                array_reader.get_rep_levels()
873            );
874        }
875    }
876
877    #[test]
878    fn test_primitive_array_reader_decimal_types() {
879        // parquet `INT32` to decimal
880        let message_type = "
881            message test_schema {
882                REQUIRED INT32 decimal1 (DECIMAL(8,2));
883        }
884        ";
885        let schema = parse_message_type(message_type)
886            .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
887            .unwrap();
888        let column_desc = schema.column(0);
889
890        // create the array reader
891        {
892            let mut data = Vec::new();
893            let mut page_lists = Vec::new();
894            make_column_chunks::<Int32Type>(
895                column_desc.clone(),
896                Encoding::PLAIN,
897                100,
898                -99999999,
899                99999999,
900                &mut Vec::new(),
901                &mut Vec::new(),
902                &mut data,
903                &mut page_lists,
904                true,
905                2,
906            );
907            let page_iterator = InMemoryPageIterator::new(page_lists);
908
909            let mut array_reader =
910                PrimitiveArrayReader::<Int32Type>::new(Box::new(page_iterator), column_desc, None)
911                    .unwrap();
912
913            // read data from the reader
914            // the data type is decimal(8,2)
915            let array = array_reader.next_batch(50).unwrap();
916            assert_eq!(array.data_type(), &Decimal128(8, 2));
917            let array = array.as_any().downcast_ref::<Decimal128Array>().unwrap();
918            let data_decimal_array = data[0..50]
919                .iter()
920                .copied()
921                .map(|v| Some(v as i128))
922                .collect::<Decimal128Array>()
923                .with_precision_and_scale(8, 2)
924                .unwrap();
925            assert_eq!(array, &data_decimal_array);
926
927            // not equal with different data type(precision and scale)
928            let data_decimal_array = data[0..50]
929                .iter()
930                .copied()
931                .map(|v| Some(v as i128))
932                .collect::<Decimal128Array>()
933                .with_precision_and_scale(9, 0)
934                .unwrap();
935            assert_ne!(array, &data_decimal_array)
936        }
937
938        // parquet `INT64` to decimal
939        let message_type = "
940            message test_schema {
941                REQUIRED INT64 decimal1 (DECIMAL(18,4));
942        }
943        ";
944        let schema = parse_message_type(message_type)
945            .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
946            .unwrap();
947        let column_desc = schema.column(0);
948
949        // create the array reader
950        {
951            let mut data = Vec::new();
952            let mut page_lists = Vec::new();
953            make_column_chunks::<Int64Type>(
954                column_desc.clone(),
955                Encoding::PLAIN,
956                100,
957                -999999999999999999,
958                999999999999999999,
959                &mut Vec::new(),
960                &mut Vec::new(),
961                &mut data,
962                &mut page_lists,
963                true,
964                2,
965            );
966            let page_iterator = InMemoryPageIterator::new(page_lists);
967
968            let mut array_reader =
969                PrimitiveArrayReader::<Int64Type>::new(Box::new(page_iterator), column_desc, None)
970                    .unwrap();
971
972            // read data from the reader
973            // the data type is decimal(18,4)
974            let array = array_reader.next_batch(50).unwrap();
975            assert_eq!(array.data_type(), &Decimal128(18, 4));
976            let array = array.as_any().downcast_ref::<Decimal128Array>().unwrap();
977            let data_decimal_array = data[0..50]
978                .iter()
979                .copied()
980                .map(|v| Some(v as i128))
981                .collect::<Decimal128Array>()
982                .with_precision_and_scale(18, 4)
983                .unwrap();
984            assert_eq!(array, &data_decimal_array);
985
986            // not equal with different data type(precision and scale)
987            let data_decimal_array = data[0..50]
988                .iter()
989                .copied()
990                .map(|v| Some(v as i128))
991                .collect::<Decimal128Array>()
992                .with_precision_and_scale(34, 0)
993                .unwrap();
994            assert_ne!(array, &data_decimal_array)
995        }
996    }
997
998    #[test]
999    fn test_primitive_array_reader_date32_type() {
1000        // parquet `INT32` to date
1001        let message_type = "
1002            message test_schema {
1003                REQUIRED INT32 date1 (DATE);
1004        }
1005        ";
1006        let schema = parse_message_type(message_type)
1007            .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
1008            .unwrap();
1009        let column_desc = schema.column(0);
1010
1011        // create the array reader
1012        {
1013            let mut data = Vec::new();
1014            let mut page_lists = Vec::new();
1015            make_column_chunks::<Int32Type>(
1016                column_desc.clone(),
1017                Encoding::PLAIN,
1018                100,
1019                -99999999,
1020                99999999,
1021                &mut Vec::new(),
1022                &mut Vec::new(),
1023                &mut data,
1024                &mut page_lists,
1025                true,
1026                2,
1027            );
1028            let page_iterator = InMemoryPageIterator::new(page_lists);
1029
1030            let mut array_reader =
1031                PrimitiveArrayReader::<Int32Type>::new(Box::new(page_iterator), column_desc, None)
1032                    .unwrap();
1033
1034            // read data from the reader
1035            // the data type is date
1036            let array = array_reader.next_batch(50).unwrap();
1037            assert_eq!(array.data_type(), &Date32);
1038            let array = array.as_any().downcast_ref::<Date32Array>().unwrap();
1039            let data_date_array = data[0..50]
1040                .iter()
1041                .copied()
1042                .map(Some)
1043                .collect::<Date32Array>();
1044            assert_eq!(array, &data_date_array);
1045        }
1046    }
1047}