1use crate::arrow::array_reader::{read_records, skip_records, ArrayReader};
19use crate::arrow::record_reader::RecordReader;
20use crate::arrow::schema::parquet_to_arrow_field;
21use crate::basic::Type as PhysicalType;
22use crate::column::page::PageIterator;
23use crate::data_type::{DataType, Int96};
24use crate::errors::{ParquetError, Result};
25use crate::schema::types::ColumnDescPtr;
26use arrow_array::{
27 builder::{
28 TimestampMicrosecondBufferBuilder, TimestampMillisecondBufferBuilder,
29 TimestampNanosecondBufferBuilder, TimestampSecondBufferBuilder,
30 },
31 ArrayRef, BooleanArray, Decimal128Array, Decimal256Array, Float32Array, Float64Array,
32 Int32Array, Int64Array, TimestampMicrosecondArray, TimestampMillisecondArray,
33 TimestampNanosecondArray, TimestampSecondArray, UInt32Array, UInt64Array,
34};
35use arrow_buffer::{i256, BooleanBuffer, Buffer};
36use arrow_data::ArrayDataBuilder;
37use arrow_schema::{DataType as ArrowType, TimeUnit};
38use std::any::Any;
39use std::sync::Arc;
40
/// Converts a vector of decoded Parquet values into an Arrow [`Buffer`].
pub trait IntoBuffer {
    /// Consumes `self` and returns its values as a [`Buffer`].
    ///
    /// `target_type` is the Arrow type the values will be exposed as. Most
    /// implementations in this module ignore it; the `Vec<Int96>`
    /// implementation uses it to choose the timestamp unit.
    fn into_buffer(self, target_type: &ArrowType) -> Buffer;
}
45
46macro_rules! native_buffer {
47 ($($t:ty),*) => {
48 $(impl IntoBuffer for Vec<$t> {
49 fn into_buffer(self, _target_type: &ArrowType) -> Buffer {
50 Buffer::from_vec(self)
51 }
52 })*
53 };
54}
55native_buffer!(i8, i16, i32, i64, u8, u16, u32, u64, f32, f64);
56
57impl IntoBuffer for Vec<bool> {
58 fn into_buffer(self, _target_type: &ArrowType) -> Buffer {
59 BooleanBuffer::from_iter(self).into_inner()
60 }
61}
62
63impl IntoBuffer for Vec<Int96> {
64 fn into_buffer(self, target_type: &ArrowType) -> Buffer {
65 match target_type {
66 ArrowType::Timestamp(TimeUnit::Second, _) => {
67 let mut builder = TimestampSecondBufferBuilder::new(self.len());
68 for v in self {
69 builder.append(v.to_seconds())
70 }
71 builder.finish()
72 }
73 ArrowType::Timestamp(TimeUnit::Millisecond, _) => {
74 let mut builder = TimestampMillisecondBufferBuilder::new(self.len());
75 for v in self {
76 builder.append(v.to_millis())
77 }
78 builder.finish()
79 }
80 ArrowType::Timestamp(TimeUnit::Microsecond, _) => {
81 let mut builder = TimestampMicrosecondBufferBuilder::new(self.len());
82 for v in self {
83 builder.append(v.to_micros())
84 }
85 builder.finish()
86 }
87 ArrowType::Timestamp(TimeUnit::Nanosecond, _) => {
88 let mut builder = TimestampNanosecondBufferBuilder::new(self.len());
89 for v in self {
90 builder.append(v.to_nanos())
91 }
92 builder.finish()
93 }
94 _ => unreachable!("Invalid target_type for Int96."),
95 }
96 }
97}
98
/// Leaf array reader that decodes a single primitive (non byte-array) Parquet
/// column into Arrow arrays.
///
/// Records are read from `pages` into `record_reader`; each call to
/// `consume_batch` turns the buffered values into an array of `data_type`.
pub struct PrimitiveArrayReader<T>
where
    T: DataType,
    T::T: Copy + Default,
    Vec<T::T>: IntoBuffer,
{
    /// Arrow type of the arrays produced by this reader.
    data_type: ArrowType,
    /// Source of the column's pages.
    pages: Box<dyn PageIterator>,
    /// Definition levels of the most recently consumed batch, if any.
    def_levels_buffer: Option<Vec<i16>>,
    /// Repetition levels of the most recently consumed batch, if any.
    rep_levels_buffer: Option<Vec<i16>>,
    /// Buffers decoded values, levels and the null bitmap between reads.
    record_reader: RecordReader<T>,
}
113
114impl<T> PrimitiveArrayReader<T>
115where
116 T: DataType,
117 T::T: Copy + Default,
118 Vec<T::T>: IntoBuffer,
119{
120 pub fn new(
122 pages: Box<dyn PageIterator>,
123 column_desc: ColumnDescPtr,
124 arrow_type: Option<ArrowType>,
125 ) -> Result<Self> {
126 let data_type = match arrow_type {
128 Some(t) => t,
129 None => parquet_to_arrow_field(column_desc.as_ref())?
130 .data_type()
131 .clone(),
132 };
133
134 let record_reader = RecordReader::<T>::new(column_desc);
135
136 Ok(Self {
137 data_type,
138 pages,
139 def_levels_buffer: None,
140 rep_levels_buffer: None,
141 record_reader,
142 })
143 }
144}
145
146impl<T> ArrayReader for PrimitiveArrayReader<T>
148where
149 T: DataType,
150 T::T: Copy + Default,
151 Vec<T::T>: IntoBuffer,
152{
153 fn as_any(&self) -> &dyn Any {
154 self
155 }
156
157 fn get_data_type(&self) -> &ArrowType {
159 &self.data_type
160 }
161
162 fn read_records(&mut self, batch_size: usize) -> Result<usize> {
163 read_records(&mut self.record_reader, self.pages.as_mut(), batch_size)
164 }
165
166 fn consume_batch(&mut self) -> Result<ArrayRef> {
167 let target_type = &self.data_type;
168 let arrow_data_type = match T::get_physical_type() {
169 PhysicalType::BOOLEAN => ArrowType::Boolean,
170 PhysicalType::INT32 => {
171 match target_type {
172 ArrowType::UInt32 => {
173 ArrowType::UInt32
176 }
177 _ => ArrowType::Int32,
178 }
179 }
180 PhysicalType::INT64 => {
181 match target_type {
182 ArrowType::UInt64 => {
183 ArrowType::UInt64
186 }
187 _ => ArrowType::Int64,
188 }
189 }
190 PhysicalType::FLOAT => ArrowType::Float32,
191 PhysicalType::DOUBLE => ArrowType::Float64,
192 PhysicalType::INT96 => match target_type {
193 ArrowType::Timestamp(TimeUnit::Second, _) => target_type.clone(),
194 ArrowType::Timestamp(TimeUnit::Millisecond, _) => target_type.clone(),
195 ArrowType::Timestamp(TimeUnit::Microsecond, _) => target_type.clone(),
196 ArrowType::Timestamp(TimeUnit::Nanosecond, _) => target_type.clone(),
197 _ => unreachable!("INT96 must be a timestamp."),
198 },
199 PhysicalType::BYTE_ARRAY | PhysicalType::FIXED_LEN_BYTE_ARRAY => {
200 unreachable!("PrimitiveArrayReaders don't support complex physical types");
201 }
202 };
203
204 let record_data = self
208 .record_reader
209 .consume_record_data()
210 .into_buffer(target_type);
211
212 let array_data = ArrayDataBuilder::new(arrow_data_type)
213 .len(self.record_reader.num_values())
214 .add_buffer(record_data)
215 .null_bit_buffer(self.record_reader.consume_bitmap_buffer());
216
217 let array_data = unsafe { array_data.build_unchecked() };
218 let array: ArrayRef = match T::get_physical_type() {
219 PhysicalType::BOOLEAN => Arc::new(BooleanArray::from(array_data)),
220 PhysicalType::INT32 => match array_data.data_type() {
221 ArrowType::UInt32 => Arc::new(UInt32Array::from(array_data)),
222 ArrowType::Int32 => Arc::new(Int32Array::from(array_data)),
223 _ => unreachable!(),
224 },
225 PhysicalType::INT64 => match array_data.data_type() {
226 ArrowType::UInt64 => Arc::new(UInt64Array::from(array_data)),
227 ArrowType::Int64 => Arc::new(Int64Array::from(array_data)),
228 _ => unreachable!(),
229 },
230 PhysicalType::FLOAT => Arc::new(Float32Array::from(array_data)),
231 PhysicalType::DOUBLE => Arc::new(Float64Array::from(array_data)),
232 PhysicalType::INT96 => match target_type {
233 ArrowType::Timestamp(TimeUnit::Second, _) => {
234 Arc::new(TimestampSecondArray::from(array_data))
235 }
236 ArrowType::Timestamp(TimeUnit::Millisecond, _) => {
237 Arc::new(TimestampMillisecondArray::from(array_data))
238 }
239 ArrowType::Timestamp(TimeUnit::Microsecond, _) => {
240 Arc::new(TimestampMicrosecondArray::from(array_data))
241 }
242 ArrowType::Timestamp(TimeUnit::Nanosecond, _) => {
243 Arc::new(TimestampNanosecondArray::from(array_data))
244 }
245 _ => unreachable!("INT96 must be a timestamp."),
246 },
247
248 PhysicalType::BYTE_ARRAY | PhysicalType::FIXED_LEN_BYTE_ARRAY => {
249 unreachable!("PrimitiveArrayReaders don't support complex physical types");
250 }
251 };
252
253 let array = match target_type {
264 ArrowType::Date64 if *(array.data_type()) == ArrowType::Int32 => {
265 let a = arrow_cast::cast(&array, &ArrowType::Date32)?;
267 arrow_cast::cast(&a, target_type)?
268 }
269 ArrowType::Decimal128(p, s) => {
270 let array = match array.data_type() {
274 ArrowType::Int32 => array
275 .as_any()
276 .downcast_ref::<Int32Array>()
277 .unwrap()
278 .unary(|i| i as i128)
279 as Decimal128Array,
280 ArrowType::Int64 => array
281 .as_any()
282 .downcast_ref::<Int64Array>()
283 .unwrap()
284 .unary(|i| i as i128)
285 as Decimal128Array,
286 _ => {
287 return Err(arrow_err!(
288 "Cannot convert {:?} to decimal",
289 array.data_type()
290 ));
291 }
292 }
293 .with_precision_and_scale(*p, *s)?;
294
295 Arc::new(array) as ArrayRef
296 }
297 ArrowType::Decimal256(p, s) => {
298 let array = match array.data_type() {
300 ArrowType::Int32 => array
301 .as_any()
302 .downcast_ref::<Int32Array>()
303 .unwrap()
304 .unary(|i| i256::from_i128(i as i128))
305 as Decimal256Array,
306 ArrowType::Int64 => array
307 .as_any()
308 .downcast_ref::<Int64Array>()
309 .unwrap()
310 .unary(|i| i256::from_i128(i as i128))
311 as Decimal256Array,
312 _ => {
313 return Err(arrow_err!(
314 "Cannot convert {:?} to decimal",
315 array.data_type()
316 ));
317 }
318 }
319 .with_precision_and_scale(*p, *s)?;
320
321 Arc::new(array) as ArrayRef
322 }
323 ArrowType::Dictionary(_, value_type) => match value_type.as_ref() {
324 ArrowType::Decimal128(p, s) => {
325 let array = match array.data_type() {
326 ArrowType::Int32 => array
327 .as_any()
328 .downcast_ref::<Int32Array>()
329 .unwrap()
330 .unary(|i| i as i128)
331 as Decimal128Array,
332 ArrowType::Int64 => array
333 .as_any()
334 .downcast_ref::<Int64Array>()
335 .unwrap()
336 .unary(|i| i as i128)
337 as Decimal128Array,
338 _ => {
339 return Err(arrow_err!(
340 "Cannot convert {:?} to decimal dictionary",
341 array.data_type()
342 ));
343 }
344 }
345 .with_precision_and_scale(*p, *s)?;
346
347 arrow_cast::cast(&array, target_type)?
348 }
349 ArrowType::Decimal256(p, s) => {
350 let array = match array.data_type() {
351 ArrowType::Int32 => array
352 .as_any()
353 .downcast_ref::<Int32Array>()
354 .unwrap()
355 .unary(i256::from)
356 as Decimal256Array,
357 ArrowType::Int64 => array
358 .as_any()
359 .downcast_ref::<Int64Array>()
360 .unwrap()
361 .unary(i256::from)
362 as Decimal256Array,
363 _ => {
364 return Err(arrow_err!(
365 "Cannot convert {:?} to decimal dictionary",
366 array.data_type()
367 ));
368 }
369 }
370 .with_precision_and_scale(*p, *s)?;
371
372 arrow_cast::cast(&array, target_type)?
373 }
374 _ => arrow_cast::cast(&array, target_type)?,
375 },
376 _ => arrow_cast::cast(&array, target_type)?,
377 };
378
379 self.def_levels_buffer = self.record_reader.consume_def_levels();
381 self.rep_levels_buffer = self.record_reader.consume_rep_levels();
382 self.record_reader.reset();
383 Ok(array)
384 }
385
386 fn skip_records(&mut self, num_records: usize) -> Result<usize> {
387 skip_records(&mut self.record_reader, self.pages.as_mut(), num_records)
388 }
389
390 fn get_def_levels(&self) -> Option<&[i16]> {
391 self.def_levels_buffer.as_deref()
392 }
393
394 fn get_rep_levels(&self) -> Option<&[i16]> {
395 self.rep_levels_buffer.as_deref()
396 }
397}
398
#[cfg(test)]
mod tests {
    use super::*;
    use crate::arrow::array_reader::test_util::EmptyPageIterator;
    use crate::basic::Encoding;
    use crate::column::page::Page;
    use crate::data_type::{Int32Type, Int64Type};
    use crate::schema::parser::parse_message_type;
    use crate::schema::types::SchemaDescriptor;
    use crate::util::test_common::rand_gen::make_pages;
    use crate::util::InMemoryPageIterator;
    use arrow::datatypes::ArrowPrimitiveType;
    use arrow_array::{Array, Date32Array, PrimitiveArray};

    use arrow::datatypes::DataType::{Date32, Decimal128};
    use rand::distr::uniform::SampleUniform;
    use std::collections::VecDeque;

    /// Generates `num_chunks` column chunks of random data for `column_desc`.
    ///
    /// Each chunk comes from one `make_pages` call. The generated levels and
    /// values are appended to `def_levels`, `rep_levels` and `values`, and the
    /// chunk's pages are pushed onto `page_lists`.
    // The helper forwards most of `make_pages`' knobs; a builder would be overkill.
    #[allow(clippy::too_many_arguments)]
    fn make_column_chunks<T: DataType>(
        column_desc: ColumnDescPtr,
        encoding: Encoding,
        num_levels: usize,
        min_value: T::T,
        max_value: T::T,
        def_levels: &mut Vec<i16>,
        rep_levels: &mut Vec<i16>,
        values: &mut Vec<T::T>,
        page_lists: &mut Vec<Vec<Page>>,
        use_v2: bool,
        num_chunks: usize,
    ) where
        T::T: PartialOrd + SampleUniform + Copy,
    {
        for _i in 0..num_chunks {
            let mut pages = VecDeque::new();
            let mut data = Vec::new();
            let mut page_def_levels = Vec::new();
            let mut page_rep_levels = Vec::new();

            make_pages::<T>(
                column_desc.clone(),
                encoding,
                1,
                num_levels,
                min_value,
                max_value,
                &mut page_def_levels,
                &mut page_rep_levels,
                &mut data,
                &mut pages,
                use_v2,
            );

            def_levels.append(&mut page_def_levels);
            rep_levels.append(&mut page_rep_levels);
            values.append(&mut data);
            page_lists.push(Vec::from(pages));
        }
    }

    /// A reader over no pages yields an empty array.
    #[test]
    fn test_primitive_array_reader_empty_pages() {
        // Construct column schema
        let message_type = "
        message test_schema {
          REQUIRED INT32 leaf;
        }
        ";

        let schema = parse_message_type(message_type)
            .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
            .unwrap();

        let mut array_reader = PrimitiveArrayReader::<Int32Type>::new(
            Box::<EmptyPageIterator>::default(),
            schema.column(0),
            None,
        )
        .unwrap();

        // Expect no values to be read
        let array = array_reader.next_batch(50).unwrap();
        assert!(array.is_empty());
    }

    /// Batches spanning two column chunks return the right slices of data.
    #[test]
    fn test_primitive_array_reader_data() {
        // Construct column schema
        let message_type = "
        message test_schema {
          REQUIRED INT32 leaf;
        }
        ";

        let schema = parse_message_type(message_type)
            .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
            .unwrap();

        let column_desc = schema.column(0);

        {
            // Two chunks of 100 values each
            let mut data = Vec::new();
            let mut page_lists = Vec::new();
            make_column_chunks::<Int32Type>(
                column_desc.clone(),
                Encoding::PLAIN,
                100,
                1,
                200,
                &mut Vec::new(),
                &mut Vec::new(),
                &mut data,
                &mut page_lists,
                true,
                2,
            );
            let page_iterator = InMemoryPageIterator::new(page_lists);

            let mut array_reader =
                PrimitiveArrayReader::<Int32Type>::new(Box::new(page_iterator), column_desc, None)
                    .unwrap();

            // First 50 values, all from the first chunk
            let array = array_reader.next_batch(50).unwrap();
            let array = array.as_any().downcast_ref::<Int32Array>().unwrap();

            assert_eq!(&Int32Array::from(data[0..50].to_vec()), array);

            // Next 100 values, spanning both chunks
            let array = array_reader.next_batch(100).unwrap();
            let array = array.as_any().downcast_ref::<Int32Array>().unwrap();

            assert_eq!(&Int32Array::from(data[50..150].to_vec()), array);

            // Only 50 values remain
            let array = array_reader.next_batch(100).unwrap();
            let array = array.as_any().downcast_ref::<Int32Array>().unwrap();

            assert_eq!(&Int32Array::from(data[150..200].to_vec()), array);
        }
    }

    /// Reads a single `REQUIRED` column annotated with `$converted_type_str`
    /// and checks the first 50 values against the generated data cast to
    /// `$result_arrow_type` (optionally with `$timezone` applied).
    macro_rules! test_primitive_array_reader_one_type {
        (
            $arrow_parquet_type:ty,
            $physical_type:expr,
            $converted_type_str:expr,
            $result_arrow_type:ty,
            $result_arrow_cast_type:ty,
            $result_primitive_type:ty
            $(, $timezone:expr)?
        ) => {{
            let message_type = format!(
                "
            message test_schema {{
              REQUIRED {:?} leaf ({});
          }}
          ",
                $physical_type, $converted_type_str
            );
            let schema = parse_message_type(&message_type)
                .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
                .unwrap();

            let column_desc = schema.column(0);

            {
                // Two chunks of 100 values each
                let mut data = Vec::new();
                let mut page_lists = Vec::new();
                make_column_chunks::<$arrow_parquet_type>(
                    column_desc.clone(),
                    Encoding::PLAIN,
                    100,
                    1,
                    200,
                    &mut Vec::new(),
                    &mut Vec::new(),
                    &mut data,
                    &mut page_lists,
                    true,
                    2,
                );
                let page_iterator = InMemoryPageIterator::new(page_lists);
                let mut array_reader = PrimitiveArrayReader::<$arrow_parquet_type>::new(
                    Box::new(page_iterator),
                    column_desc.clone(),
                    None,
                )
                .expect("Unable to get array reader");

                let array = array_reader
                    .next_batch(50)
                    .expect("Unable to get batch from reader");

                let result_data_type = <$result_arrow_type>::DATA_TYPE;
                // Format the failure message lazily, only if the downcast fails.
                let array = array
                    .as_any()
                    .downcast_ref::<PrimitiveArray<$result_arrow_type>>()
                    .unwrap_or_else(|| {
                        panic!(
                            "Unable to downcast {:?} to {:?}",
                            array.data_type(),
                            result_data_type
                        )
                    })
                    $(.clone().with_timezone($timezone))?
                    ;

                // Build the expected array the same way: raw values, then cast.
                let expected = PrimitiveArray::<$result_arrow_cast_type>::from(
                    data[0..50]
                        .iter()
                        .map(|x| *x as $result_primitive_type)
                        .collect::<Vec<$result_primitive_type>>(),
                );
                let expected = Arc::new(expected) as ArrayRef;
                let expected = arrow::compute::cast(&expected, &result_data_type)
                    .expect("Unable to cast expected array");
                assert_eq!(expected.data_type(), &result_data_type);
                let expected = expected
                    .as_any()
                    .downcast_ref::<PrimitiveArray<$result_arrow_type>>()
                    .unwrap_or_else(|| {
                        panic!(
                            "Unable to downcast expected {:?} to {:?}",
                            expected.data_type(),
                            result_data_type
                        )
                    })
                    $(.clone().with_timezone($timezone))?
                    ;
                assert_eq!(expected, array);
            }
        }};
    }

    /// Temporal logical types map to the expected Arrow temporal arrays.
    #[test]
    fn test_primitive_array_reader_temporal_types() {
        test_primitive_array_reader_one_type!(
            crate::data_type::Int32Type,
            PhysicalType::INT32,
            "DATE",
            arrow::datatypes::Date32Type,
            arrow::datatypes::Int32Type,
            i32
        );
        test_primitive_array_reader_one_type!(
            crate::data_type::Int32Type,
            PhysicalType::INT32,
            "TIME_MILLIS",
            arrow::datatypes::Time32MillisecondType,
            arrow::datatypes::Int32Type,
            i32
        );
        test_primitive_array_reader_one_type!(
            crate::data_type::Int64Type,
            PhysicalType::INT64,
            "TIME_MICROS",
            arrow::datatypes::Time64MicrosecondType,
            arrow::datatypes::Int64Type,
            i64
        );
        test_primitive_array_reader_one_type!(
            crate::data_type::Int64Type,
            PhysicalType::INT64,
            "TIMESTAMP_MILLIS",
            arrow::datatypes::TimestampMillisecondType,
            arrow::datatypes::Int64Type,
            i64,
            "UTC"
        );
        test_primitive_array_reader_one_type!(
            crate::data_type::Int64Type,
            PhysicalType::INT64,
            "TIMESTAMP_MICROS",
            arrow::datatypes::TimestampMicrosecondType,
            arrow::datatypes::Int64Type,
            i64,
            "UTC"
        );
    }

    /// Definition and repetition levels exposed after each batch match the
    /// generated levels for the same slice.
    #[test]
    fn test_primitive_array_reader_def_and_rep_levels() {
        // Construct column schema
        let message_type = "
        message test_schema {
            REPEATED Group test_mid {
                OPTIONAL INT32 leaf;
            }
        }
        ";

        let schema = parse_message_type(message_type)
            .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
            .unwrap();

        let column_desc = schema.column(0);

        {
            // Two chunks of 100 levels each
            let mut def_levels = Vec::new();
            let mut rep_levels = Vec::new();
            let mut page_lists = Vec::new();
            make_column_chunks::<Int32Type>(
                column_desc.clone(),
                Encoding::PLAIN,
                100,
                1,
                200,
                &mut def_levels,
                &mut rep_levels,
                &mut Vec::new(),
                &mut page_lists,
                true,
                2,
            );

            let page_iterator = InMemoryPageIterator::new(page_lists);

            let mut array_reader =
                PrimitiveArrayReader::<Int32Type>::new(Box::new(page_iterator), column_desc, None)
                    .unwrap();

            let mut accu_len: usize = 0;

            // First batch
            let array = array_reader.next_batch(50).unwrap();
            assert_eq!(
                Some(&def_levels[accu_len..(accu_len + array.len())]),
                array_reader.get_def_levels()
            );
            assert_eq!(
                Some(&rep_levels[accu_len..(accu_len + array.len())]),
                array_reader.get_rep_levels()
            );
            accu_len += array.len();

            // Second batch, spanning both chunks
            let array = array_reader.next_batch(100).unwrap();
            assert_eq!(
                Some(&def_levels[accu_len..(accu_len + array.len())]),
                array_reader.get_def_levels()
            );
            assert_eq!(
                Some(&rep_levels[accu_len..(accu_len + array.len())]),
                array_reader.get_rep_levels()
            );
            accu_len += array.len();

            // Final batch with whatever remains
            let array = array_reader.next_batch(100).unwrap();
            assert_eq!(
                Some(&def_levels[accu_len..(accu_len + array.len())]),
                array_reader.get_def_levels()
            );
            assert_eq!(
                Some(&rep_levels[accu_len..(accu_len + array.len())]),
                array_reader.get_rep_levels()
            );
        }
    }

    /// INT32 and INT64 DECIMAL columns are read as Decimal128 arrays carrying
    /// the schema's precision and scale.
    #[test]
    fn test_primitive_array_reader_decimal_types() {
        // INT32 decimal
        let message_type = "
        message test_schema {
            REQUIRED INT32 decimal1 (DECIMAL(8,2));
        }
        ";
        let schema = parse_message_type(message_type)
            .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
            .unwrap();
        let column_desc = schema.column(0);

        {
            let mut data = Vec::new();
            let mut page_lists = Vec::new();
            make_column_chunks::<Int32Type>(
                column_desc.clone(),
                Encoding::PLAIN,
                100,
                -99999999,
                99999999,
                &mut Vec::new(),
                &mut Vec::new(),
                &mut data,
                &mut page_lists,
                true,
                2,
            );
            let page_iterator = InMemoryPageIterator::new(page_lists);

            let mut array_reader =
                PrimitiveArrayReader::<Int32Type>::new(Box::new(page_iterator), column_desc, None)
                    .unwrap();

            let array = array_reader.next_batch(50).unwrap();
            assert_eq!(array.data_type(), &Decimal128(8, 2));
            let array = array.as_any().downcast_ref::<Decimal128Array>().unwrap();
            let data_decimal_array = data[0..50]
                .iter()
                .copied()
                .map(|v| Some(v as i128))
                .collect::<Decimal128Array>()
                .with_precision_and_scale(8, 2)
                .unwrap();
            assert_eq!(array, &data_decimal_array);

            // A different precision/scale must not compare equal
            let data_decimal_array = data[0..50]
                .iter()
                .copied()
                .map(|v| Some(v as i128))
                .collect::<Decimal128Array>()
                .with_precision_and_scale(9, 0)
                .unwrap();
            assert_ne!(array, &data_decimal_array)
        }

        // INT64 decimal
        let message_type = "
        message test_schema {
            REQUIRED INT64 decimal1 (DECIMAL(18,4));
        }
        ";
        let schema = parse_message_type(message_type)
            .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
            .unwrap();
        let column_desc = schema.column(0);

        {
            let mut data = Vec::new();
            let mut page_lists = Vec::new();
            make_column_chunks::<Int64Type>(
                column_desc.clone(),
                Encoding::PLAIN,
                100,
                -999999999999999999,
                999999999999999999,
                &mut Vec::new(),
                &mut Vec::new(),
                &mut data,
                &mut page_lists,
                true,
                2,
            );
            let page_iterator = InMemoryPageIterator::new(page_lists);

            let mut array_reader =
                PrimitiveArrayReader::<Int64Type>::new(Box::new(page_iterator), column_desc, None)
                    .unwrap();

            let array = array_reader.next_batch(50).unwrap();
            assert_eq!(array.data_type(), &Decimal128(18, 4));
            let array = array.as_any().downcast_ref::<Decimal128Array>().unwrap();
            let data_decimal_array = data[0..50]
                .iter()
                .copied()
                .map(|v| Some(v as i128))
                .collect::<Decimal128Array>()
                .with_precision_and_scale(18, 4)
                .unwrap();
            assert_eq!(array, &data_decimal_array);

            // A different precision/scale must not compare equal
            let data_decimal_array = data[0..50]
                .iter()
                .copied()
                .map(|v| Some(v as i128))
                .collect::<Decimal128Array>()
                .with_precision_and_scale(34, 0)
                .unwrap();
            assert_ne!(array, &data_decimal_array)
        }
    }

    /// An INT32 DATE column is read as a Date32 array of the raw day values.
    #[test]
    fn test_primitive_array_reader_date32_type() {
        let message_type = "
        message test_schema {
            REQUIRED INT32 date1 (DATE);
        }
        ";
        let schema = parse_message_type(message_type)
            .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
            .unwrap();
        let column_desc = schema.column(0);

        {
            let mut data = Vec::new();
            let mut page_lists = Vec::new();
            make_column_chunks::<Int32Type>(
                column_desc.clone(),
                Encoding::PLAIN,
                100,
                -99999999,
                99999999,
                &mut Vec::new(),
                &mut Vec::new(),
                &mut data,
                &mut page_lists,
                true,
                2,
            );
            let page_iterator = InMemoryPageIterator::new(page_lists);

            let mut array_reader =
                PrimitiveArrayReader::<Int32Type>::new(Box::new(page_iterator), column_desc, None)
                    .unwrap();

            let array = array_reader.next_batch(50).unwrap();
            assert_eq!(array.data_type(), &Date32);
            let array = array.as_any().downcast_ref::<Date32Array>().unwrap();
            let data_date_array = data[0..50]
                .iter()
                .copied()
                .map(Some)
                .collect::<Date32Array>();
            assert_eq!(array, &data_date_array);
        }
    }
}