1use crate::arrow::array_reader::{read_records, skip_records, ArrayReader};
19use crate::arrow::record_reader::RecordReader;
20use crate::arrow::schema::parquet_to_arrow_field;
21use crate::basic::Type as PhysicalType;
22use crate::column::page::PageIterator;
23use crate::data_type::{DataType, Int96};
24use crate::errors::{ParquetError, Result};
25use crate::schema::types::ColumnDescPtr;
26use arrow_array::{
27 builder::{
28 TimestampMicrosecondBufferBuilder, TimestampMillisecondBufferBuilder,
29 TimestampNanosecondBufferBuilder, TimestampSecondBufferBuilder,
30 },
31 ArrayRef, BooleanArray, Decimal128Array, Decimal256Array, Float32Array, Float64Array,
32 Int16Array, Int32Array, Int64Array, Int8Array, TimestampMicrosecondArray,
33 TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray, UInt16Array,
34 UInt32Array, UInt64Array, UInt8Array,
35};
36use arrow_buffer::{i256, BooleanBuffer, Buffer};
37use arrow_data::ArrayDataBuilder;
38use arrow_schema::{DataType as ArrowType, TimeUnit};
39use std::any::Any;
40use std::sync::Arc;
41
/// Conversion of a container of decoded values into an arrow [`Buffer`].
///
/// Implemented in this file for `Vec`s of native numeric types, `Vec<bool>` and
/// `Vec<Int96>`.
pub trait IntoBuffer {
    /// Consumes `self` and returns the resulting [`Buffer`].
    ///
    /// `target_type` is the arrow type the caller intends to produce. Of the
    /// implementations in this file only `Vec<Int96>` inspects it; the others
    /// ignore it.
    fn into_buffer(self, target_type: &ArrowType) -> Buffer;
}
46
47macro_rules! native_buffer {
48 ($($t:ty),*) => {
49 $(impl IntoBuffer for Vec<$t> {
50 fn into_buffer(self, _target_type: &ArrowType) -> Buffer {
51 Buffer::from_vec(self)
52 }
53 })*
54 };
55}
56native_buffer!(i8, i16, i32, i64, u8, u16, u32, u64, f32, f64);
57
58impl IntoBuffer for Vec<bool> {
59 fn into_buffer(self, _target_type: &ArrowType) -> Buffer {
60 BooleanBuffer::from_iter(self).into_inner()
61 }
62}
63
64impl IntoBuffer for Vec<Int96> {
65 fn into_buffer(self, target_type: &ArrowType) -> Buffer {
66 match target_type {
67 ArrowType::Timestamp(TimeUnit::Second, _) => {
68 let mut builder = TimestampSecondBufferBuilder::new(self.len());
69 for v in self {
70 builder.append(v.to_seconds())
71 }
72 builder.finish()
73 }
74 ArrowType::Timestamp(TimeUnit::Millisecond, _) => {
75 let mut builder = TimestampMillisecondBufferBuilder::new(self.len());
76 for v in self {
77 builder.append(v.to_millis())
78 }
79 builder.finish()
80 }
81 ArrowType::Timestamp(TimeUnit::Microsecond, _) => {
82 let mut builder = TimestampMicrosecondBufferBuilder::new(self.len());
83 for v in self {
84 builder.append(v.to_micros())
85 }
86 builder.finish()
87 }
88 ArrowType::Timestamp(TimeUnit::Nanosecond, _) => {
89 let mut builder = TimestampNanosecondBufferBuilder::new(self.len());
90 for v in self {
91 builder.append(v.to_nanos())
92 }
93 builder.finish()
94 }
95 _ => unreachable!("Invalid target_type for Int96."),
96 }
97 }
98}
99
100pub struct PrimitiveArrayReader<T>
103where
104 T: DataType,
105 T::T: Copy + Default,
106 Vec<T::T>: IntoBuffer,
107{
108 data_type: ArrowType,
109 pages: Box<dyn PageIterator>,
110 def_levels_buffer: Option<Vec<i16>>,
111 rep_levels_buffer: Option<Vec<i16>>,
112 record_reader: RecordReader<T>,
113}
114
115impl<T> PrimitiveArrayReader<T>
116where
117 T: DataType,
118 T::T: Copy + Default,
119 Vec<T::T>: IntoBuffer,
120{
121 pub fn new(
123 pages: Box<dyn PageIterator>,
124 column_desc: ColumnDescPtr,
125 arrow_type: Option<ArrowType>,
126 ) -> Result<Self> {
127 let data_type = match arrow_type {
129 Some(t) => t,
130 None => parquet_to_arrow_field(column_desc.as_ref())?
131 .data_type()
132 .clone(),
133 };
134
135 let record_reader = RecordReader::<T>::new(column_desc);
136
137 Ok(Self {
138 data_type,
139 pages,
140 def_levels_buffer: None,
141 rep_levels_buffer: None,
142 record_reader,
143 })
144 }
145}
146
147impl<T> ArrayReader for PrimitiveArrayReader<T>
149where
150 T: DataType,
151 T::T: Copy + Default,
152 Vec<T::T>: IntoBuffer,
153{
154 fn as_any(&self) -> &dyn Any {
155 self
156 }
157
158 fn get_data_type(&self) -> &ArrowType {
160 &self.data_type
161 }
162
163 fn read_records(&mut self, batch_size: usize) -> Result<usize> {
164 read_records(&mut self.record_reader, self.pages.as_mut(), batch_size)
165 }
166
167 fn consume_batch(&mut self) -> Result<ArrayRef> {
168 let target_type = &self.data_type;
169 let arrow_data_type = match T::get_physical_type() {
170 PhysicalType::BOOLEAN => ArrowType::Boolean,
171 PhysicalType::INT32 => {
172 match target_type {
173 ArrowType::UInt32 => {
174 ArrowType::UInt32
177 }
178 _ => ArrowType::Int32,
179 }
180 }
181 PhysicalType::INT64 => {
182 match target_type {
183 ArrowType::UInt64 => {
184 ArrowType::UInt64
187 }
188 _ => ArrowType::Int64,
189 }
190 }
191 PhysicalType::FLOAT => ArrowType::Float32,
192 PhysicalType::DOUBLE => ArrowType::Float64,
193 PhysicalType::INT96 => match target_type {
194 ArrowType::Timestamp(TimeUnit::Second, _) => target_type.clone(),
195 ArrowType::Timestamp(TimeUnit::Millisecond, _) => target_type.clone(),
196 ArrowType::Timestamp(TimeUnit::Microsecond, _) => target_type.clone(),
197 ArrowType::Timestamp(TimeUnit::Nanosecond, _) => target_type.clone(),
198 _ => unreachable!("INT96 must be a timestamp."),
199 },
200 PhysicalType::BYTE_ARRAY | PhysicalType::FIXED_LEN_BYTE_ARRAY => {
201 unreachable!("PrimitiveArrayReaders don't support complex physical types");
202 }
203 };
204
205 let record_data = self
209 .record_reader
210 .consume_record_data()
211 .into_buffer(target_type);
212
213 let array_data = ArrayDataBuilder::new(arrow_data_type)
214 .len(self.record_reader.num_values())
215 .add_buffer(record_data)
216 .null_bit_buffer(self.record_reader.consume_bitmap_buffer());
217
218 let array_data = unsafe { array_data.build_unchecked() };
219 let array: ArrayRef = match T::get_physical_type() {
220 PhysicalType::BOOLEAN => Arc::new(BooleanArray::from(array_data)),
221 PhysicalType::INT32 => match array_data.data_type() {
222 ArrowType::UInt32 => Arc::new(UInt32Array::from(array_data)),
223 ArrowType::Int32 => Arc::new(Int32Array::from(array_data)),
224 _ => unreachable!(),
225 },
226 PhysicalType::INT64 => match array_data.data_type() {
227 ArrowType::UInt64 => Arc::new(UInt64Array::from(array_data)),
228 ArrowType::Int64 => Arc::new(Int64Array::from(array_data)),
229 _ => unreachable!(),
230 },
231 PhysicalType::FLOAT => Arc::new(Float32Array::from(array_data)),
232 PhysicalType::DOUBLE => Arc::new(Float64Array::from(array_data)),
233 PhysicalType::INT96 => match target_type {
234 ArrowType::Timestamp(TimeUnit::Second, _) => {
235 Arc::new(TimestampSecondArray::from(array_data))
236 }
237 ArrowType::Timestamp(TimeUnit::Millisecond, _) => {
238 Arc::new(TimestampMillisecondArray::from(array_data))
239 }
240 ArrowType::Timestamp(TimeUnit::Microsecond, _) => {
241 Arc::new(TimestampMicrosecondArray::from(array_data))
242 }
243 ArrowType::Timestamp(TimeUnit::Nanosecond, _) => {
244 Arc::new(TimestampNanosecondArray::from(array_data))
245 }
246 _ => unreachable!("INT96 must be a timestamp."),
247 },
248
249 PhysicalType::BYTE_ARRAY | PhysicalType::FIXED_LEN_BYTE_ARRAY => {
250 unreachable!("PrimitiveArrayReaders don't support complex physical types");
251 }
252 };
253
254 let array = match target_type {
265 ArrowType::UInt8 if *(array.data_type()) == ArrowType::Int32 => {
273 let array = array
274 .as_any()
275 .downcast_ref::<Int32Array>()
276 .unwrap()
277 .unary(|i| i as u8) as UInt8Array;
278 Arc::new(array) as ArrayRef
279 }
280 ArrowType::Int8 if *(array.data_type()) == ArrowType::Int32 => {
281 let array = array
282 .as_any()
283 .downcast_ref::<Int32Array>()
284 .unwrap()
285 .unary(|i| i as i8) as Int8Array;
286 Arc::new(array) as ArrayRef
287 }
288 ArrowType::UInt16 if *(array.data_type()) == ArrowType::Int32 => {
289 let array = array
290 .as_any()
291 .downcast_ref::<Int32Array>()
292 .unwrap()
293 .unary(|i| i as u16) as UInt16Array;
294 Arc::new(array) as ArrayRef
295 }
296 ArrowType::Int16 if *(array.data_type()) == ArrowType::Int32 => {
297 let array = array
298 .as_any()
299 .downcast_ref::<Int32Array>()
300 .unwrap()
301 .unary(|i| i as i16) as Int16Array;
302 Arc::new(array) as ArrayRef
303 }
304 ArrowType::Date64 if *(array.data_type()) == ArrowType::Int32 => {
305 let a = arrow_cast::cast(&array, &ArrowType::Date32)?;
307 arrow_cast::cast(&a, target_type)?
308 }
309 ArrowType::Decimal128(p, s) => {
310 let array = match array.data_type() {
314 ArrowType::Int32 => array
315 .as_any()
316 .downcast_ref::<Int32Array>()
317 .unwrap()
318 .unary(|i| i as i128)
319 as Decimal128Array,
320 ArrowType::Int64 => array
321 .as_any()
322 .downcast_ref::<Int64Array>()
323 .unwrap()
324 .unary(|i| i as i128)
325 as Decimal128Array,
326 _ => {
327 return Err(arrow_err!(
328 "Cannot convert {:?} to decimal",
329 array.data_type()
330 ));
331 }
332 }
333 .with_precision_and_scale(*p, *s)?;
334
335 Arc::new(array) as ArrayRef
336 }
337 ArrowType::Decimal256(p, s) => {
338 let array = match array.data_type() {
340 ArrowType::Int32 => array
341 .as_any()
342 .downcast_ref::<Int32Array>()
343 .unwrap()
344 .unary(|i| i256::from_i128(i as i128))
345 as Decimal256Array,
346 ArrowType::Int64 => array
347 .as_any()
348 .downcast_ref::<Int64Array>()
349 .unwrap()
350 .unary(|i| i256::from_i128(i as i128))
351 as Decimal256Array,
352 _ => {
353 return Err(arrow_err!(
354 "Cannot convert {:?} to decimal",
355 array.data_type()
356 ));
357 }
358 }
359 .with_precision_and_scale(*p, *s)?;
360
361 Arc::new(array) as ArrayRef
362 }
363 ArrowType::Dictionary(_, value_type) => match value_type.as_ref() {
364 ArrowType::Decimal128(p, s) => {
365 let array = match array.data_type() {
366 ArrowType::Int32 => array
367 .as_any()
368 .downcast_ref::<Int32Array>()
369 .unwrap()
370 .unary(|i| i as i128)
371 as Decimal128Array,
372 ArrowType::Int64 => array
373 .as_any()
374 .downcast_ref::<Int64Array>()
375 .unwrap()
376 .unary(|i| i as i128)
377 as Decimal128Array,
378 _ => {
379 return Err(arrow_err!(
380 "Cannot convert {:?} to decimal dictionary",
381 array.data_type()
382 ));
383 }
384 }
385 .with_precision_and_scale(*p, *s)?;
386
387 arrow_cast::cast(&array, target_type)?
388 }
389 ArrowType::Decimal256(p, s) => {
390 let array = match array.data_type() {
391 ArrowType::Int32 => array
392 .as_any()
393 .downcast_ref::<Int32Array>()
394 .unwrap()
395 .unary(i256::from)
396 as Decimal256Array,
397 ArrowType::Int64 => array
398 .as_any()
399 .downcast_ref::<Int64Array>()
400 .unwrap()
401 .unary(i256::from)
402 as Decimal256Array,
403 _ => {
404 return Err(arrow_err!(
405 "Cannot convert {:?} to decimal dictionary",
406 array.data_type()
407 ));
408 }
409 }
410 .with_precision_and_scale(*p, *s)?;
411
412 arrow_cast::cast(&array, target_type)?
413 }
414 _ => arrow_cast::cast(&array, target_type)?,
415 },
416 _ => arrow_cast::cast(&array, target_type)?,
417 };
418
419 self.def_levels_buffer = self.record_reader.consume_def_levels();
421 self.rep_levels_buffer = self.record_reader.consume_rep_levels();
422 self.record_reader.reset();
423 Ok(array)
424 }
425
426 fn skip_records(&mut self, num_records: usize) -> Result<usize> {
427 skip_records(&mut self.record_reader, self.pages.as_mut(), num_records)
428 }
429
430 fn get_def_levels(&self) -> Option<&[i16]> {
431 self.def_levels_buffer.as_deref()
432 }
433
434 fn get_rep_levels(&self) -> Option<&[i16]> {
435 self.rep_levels_buffer.as_deref()
436 }
437}
438
#[cfg(test)]
mod tests {
    use super::*;
    use crate::arrow::array_reader::test_util::EmptyPageIterator;
    use crate::basic::Encoding;
    use crate::column::page::Page;
    use crate::data_type::{Int32Type, Int64Type};
    use crate::schema::parser::parse_message_type;
    use crate::schema::types::SchemaDescriptor;
    use crate::util::test_common::rand_gen::make_pages;
    use crate::util::InMemoryPageIterator;
    use arrow::datatypes::ArrowPrimitiveType;
    use arrow_array::{Array, Date32Array, PrimitiveArray};

    use arrow::datatypes::DataType::{Date32, Decimal128};
    use rand::distr::uniform::SampleUniform;
    use std::collections::VecDeque;

    /// Parses `message_type` and returns the descriptor of its first column.
    fn first_column(message_type: &str) -> ColumnDescPtr {
        let root = parse_message_type(message_type).unwrap();
        SchemaDescriptor::new(Arc::new(root)).column(0)
    }

    /// Generates `num_chunks` column chunks of one page each via `make_pages`,
    /// appending the generated levels and values to the output vectors and
    /// pushing each chunk's pages onto `page_lists`.
    // The argument list mirrors `make_pages`, hence the lint exemption.
    #[allow(clippy::too_many_arguments)]
    fn make_column_chunks<T: DataType>(
        column_desc: ColumnDescPtr,
        encoding: Encoding,
        num_levels: usize,
        min_value: T::T,
        max_value: T::T,
        def_levels: &mut Vec<i16>,
        rep_levels: &mut Vec<i16>,
        values: &mut Vec<T::T>,
        page_lists: &mut Vec<Vec<Page>>,
        use_v2: bool,
        num_chunks: usize,
    ) where
        T::T: PartialOrd + SampleUniform + Copy,
    {
        for _ in 0..num_chunks {
            let mut chunk_pages = VecDeque::new();
            let mut chunk_values = Vec::new();
            let mut chunk_def_levels = Vec::new();
            let mut chunk_rep_levels = Vec::new();

            make_pages::<T>(
                column_desc.clone(),
                encoding,
                1,
                num_levels,
                min_value,
                max_value,
                &mut chunk_def_levels,
                &mut chunk_rep_levels,
                &mut chunk_values,
                &mut chunk_pages,
                use_v2,
            );

            def_levels.extend(chunk_def_levels);
            rep_levels.extend(chunk_rep_levels);
            values.extend(chunk_values);
            page_lists.push(chunk_pages.into_iter().collect());
        }
    }

    /// A reader over an empty page iterator yields an empty array.
    #[test]
    fn test_primitive_array_reader_empty_pages() {
        let column_desc = first_column(
            "
        message test_schema {
          REQUIRED INT32 leaf;
        }
        ",
        );

        let pages = Box::<EmptyPageIterator>::default();
        let mut array_reader =
            PrimitiveArrayReader::<Int32Type>::new(pages, column_desc, None).unwrap();

        let array = array_reader.next_batch(50).unwrap();
        assert!(array.is_empty());
    }

    /// Batches read back from two 100-value chunks match the generated data,
    /// including a batch that spans the chunk boundary and a final short batch.
    #[test]
    fn test_primitive_array_reader_data() {
        let column_desc = first_column(
            "
        message test_schema {
          REQUIRED INT32 leaf;
        }
        ",
        );

        let mut data = Vec::new();
        let mut page_lists = Vec::new();
        make_column_chunks::<Int32Type>(
            column_desc.clone(),
            Encoding::PLAIN,
            100,
            1,
            200,
            &mut Vec::new(),
            &mut Vec::new(),
            &mut data,
            &mut page_lists,
            true,
            2,
        );
        let page_iterator = InMemoryPageIterator::new(page_lists);

        let mut array_reader =
            PrimitiveArrayReader::<Int32Type>::new(Box::new(page_iterator), column_desc, None)
                .unwrap();

        // (requested batch size, rows of `data` the batch must contain)
        let batches: [(usize, std::ops::Range<usize>); 3] =
            [(50, 0..50), (100, 50..150), (100, 150..200)];
        for (batch_size, expected_rows) in batches {
            let batch = array_reader.next_batch(batch_size).unwrap();
            let actual = batch.as_any().downcast_ref::<Int32Array>().unwrap();

            let expected = Int32Array::from(data[expected_rows].to_vec());
            assert_eq!(&expected, actual);
        }
    }

    // Reads 50 values of one physical/converted type combination and compares
    // them with the generated data cast to the expected arrow type. The optional
    // trailing argument is a timezone applied to both sides before comparing.
    macro_rules! test_primitive_array_reader_one_type {
        (
            $parquet_type:ty,
            $physical_type:expr,
            $converted_type_str:expr,
            $result_arrow_type:ty,
            $result_arrow_cast_type:ty,
            $result_primitive_type:ty
            $(, $timezone:expr)?
        ) => {{
            let message_type = format!(
                "
            message test_schema {{
              REQUIRED {:?} leaf ({});
          }}
            ",
                $physical_type, $converted_type_str
            );
            let column_desc = first_column(&message_type);

            let mut data = Vec::new();
            let mut page_lists = Vec::new();
            make_column_chunks::<$parquet_type>(
                column_desc.clone(),
                Encoding::PLAIN,
                100,
                1,
                200,
                &mut Vec::new(),
                &mut Vec::new(),
                &mut data,
                &mut page_lists,
                true,
                2,
            );
            let page_iterator = InMemoryPageIterator::new(page_lists);
            let mut array_reader = PrimitiveArrayReader::<$parquet_type>::new(
                Box::new(page_iterator),
                column_desc,
                None,
            )
            .expect("Unable to get array reader");

            let array = array_reader
                .next_batch(50)
                .expect("Unable to get batch from reader");

            let result_data_type = <$result_arrow_type>::DATA_TYPE;
            let array = array
                .as_any()
                .downcast_ref::<PrimitiveArray<$result_arrow_type>>()
                .unwrap_or_else(|| {
                    panic!(
                        "Unable to downcast {:?} to {:?}",
                        array.data_type(),
                        result_data_type
                    )
                })
                $(.clone().with_timezone($timezone))?
                ;

            let expected_values: Vec<$result_primitive_type> = data[0..50]
                .iter()
                .map(|x| *x as $result_primitive_type)
                .collect();
            let expected =
                Arc::new(PrimitiveArray::<$result_arrow_cast_type>::from(expected_values))
                    as ArrayRef;
            let expected = arrow::compute::cast(&expected, &result_data_type)
                .expect("Unable to cast expected array");
            assert_eq!(expected.data_type(), &result_data_type);
            let expected = expected
                .as_any()
                .downcast_ref::<PrimitiveArray<$result_arrow_type>>()
                .unwrap_or_else(|| {
                    panic!(
                        "Unable to downcast expected {:?} to {:?}",
                        expected.data_type(),
                        result_data_type
                    )
                })
                $(.clone().with_timezone($timezone))?
                ;
            assert_eq!(expected, array);
        }};
    }

    /// Date, time and timestamp converted types come back as the matching
    /// arrow temporal types.
    #[test]
    fn test_primitive_array_reader_temporal_types() {
        test_primitive_array_reader_one_type!(
            crate::data_type::Int32Type,
            PhysicalType::INT32,
            "DATE",
            arrow::datatypes::Date32Type,
            arrow::datatypes::Int32Type,
            i32
        );
        test_primitive_array_reader_one_type!(
            crate::data_type::Int32Type,
            PhysicalType::INT32,
            "TIME_MILLIS",
            arrow::datatypes::Time32MillisecondType,
            arrow::datatypes::Int32Type,
            i32
        );
        test_primitive_array_reader_one_type!(
            crate::data_type::Int64Type,
            PhysicalType::INT64,
            "TIME_MICROS",
            arrow::datatypes::Time64MicrosecondType,
            arrow::datatypes::Int64Type,
            i64
        );
        test_primitive_array_reader_one_type!(
            crate::data_type::Int64Type,
            PhysicalType::INT64,
            "TIMESTAMP_MILLIS",
            arrow::datatypes::TimestampMillisecondType,
            arrow::datatypes::Int64Type,
            i64,
            "UTC"
        );
        test_primitive_array_reader_one_type!(
            crate::data_type::Int64Type,
            PhysicalType::INT64,
            "TIMESTAMP_MICROS",
            arrow::datatypes::TimestampMicrosecondType,
            arrow::datatypes::Int64Type,
            i64,
            "UTC"
        );
    }

    /// After each batch, the reader exposes exactly the definition and
    /// repetition levels that belong to that batch.
    #[test]
    fn test_primitive_array_reader_def_and_rep_levels() {
        let column_desc = first_column(
            "
        message test_schema {
            REPEATED Group test_mid {
                OPTIONAL INT32 leaf;
            }
        }
        ",
        );

        let mut def_levels = Vec::new();
        let mut rep_levels = Vec::new();
        let mut page_lists = Vec::new();
        make_column_chunks::<Int32Type>(
            column_desc.clone(),
            Encoding::PLAIN,
            100,
            1,
            200,
            &mut def_levels,
            &mut rep_levels,
            &mut Vec::new(),
            &mut page_lists,
            true,
            2,
        );

        let page_iterator = InMemoryPageIterator::new(page_lists);

        let mut array_reader =
            PrimitiveArrayReader::<Int32Type>::new(Box::new(page_iterator), column_desc, None)
                .unwrap();

        // Number of levels already accounted for by previous batches.
        let mut accu_len: usize = 0;
        let batch_sizes: [usize; 3] = [50, 100, 100];
        for batch_size in batch_sizes {
            let array = array_reader.next_batch(batch_size).unwrap();
            let batch_levels = accu_len..(accu_len + array.len());

            assert_eq!(
                Some(&def_levels[batch_levels.clone()]),
                array_reader.get_def_levels()
            );
            assert_eq!(
                Some(&rep_levels[batch_levels]),
                array_reader.get_rep_levels()
            );
            accu_len += array.len();
        }
    }

    /// INT32 and INT64 columns annotated as DECIMAL are read as `Decimal128`
    /// with the annotated precision and scale.
    #[test]
    fn test_primitive_array_reader_decimal_types() {
        // INT32-backed decimal.
        {
            let column_desc = first_column(
                "
        message test_schema {
            REQUIRED INT32 decimal1 (DECIMAL(8,2));
        }
        ",
            );

            let mut data = Vec::new();
            let mut page_lists = Vec::new();
            make_column_chunks::<Int32Type>(
                column_desc.clone(),
                Encoding::PLAIN,
                100,
                -99999999,
                99999999,
                &mut Vec::new(),
                &mut Vec::new(),
                &mut data,
                &mut page_lists,
                true,
                2,
            );
            let page_iterator = InMemoryPageIterator::new(page_lists);

            let mut array_reader =
                PrimitiveArrayReader::<Int32Type>::new(Box::new(page_iterator), column_desc, None)
                    .unwrap();

            let batch = array_reader.next_batch(50).unwrap();
            assert_eq!(batch.data_type(), &Decimal128(8, 2));
            let actual = batch.as_any().downcast_ref::<Decimal128Array>().unwrap();

            // The first 50 generated values as a decimal array with the given
            // precision and scale.
            let expected_with = |precision, scale| {
                data[0..50]
                    .iter()
                    .map(|&v| Some(i128::from(v)))
                    .collect::<Decimal128Array>()
                    .with_precision_and_scale(precision, scale)
                    .unwrap()
            };

            assert_eq!(actual, &expected_with(8, 2));
            // Same values with a different precision/scale must not compare equal.
            assert_ne!(actual, &expected_with(9, 0));
        }

        // INT64-backed decimal.
        {
            let column_desc = first_column(
                "
        message test_schema {
            REQUIRED INT64 decimal1 (DECIMAL(18,4));
        }
        ",
            );

            let mut data = Vec::new();
            let mut page_lists = Vec::new();
            make_column_chunks::<Int64Type>(
                column_desc.clone(),
                Encoding::PLAIN,
                100,
                -999999999999999999,
                999999999999999999,
                &mut Vec::new(),
                &mut Vec::new(),
                &mut data,
                &mut page_lists,
                true,
                2,
            );
            let page_iterator = InMemoryPageIterator::new(page_lists);

            let mut array_reader =
                PrimitiveArrayReader::<Int64Type>::new(Box::new(page_iterator), column_desc, None)
                    .unwrap();

            let batch = array_reader.next_batch(50).unwrap();
            assert_eq!(batch.data_type(), &Decimal128(18, 4));
            let actual = batch.as_any().downcast_ref::<Decimal128Array>().unwrap();

            let expected_with = |precision, scale| {
                data[0..50]
                    .iter()
                    .map(|&v| Some(i128::from(v)))
                    .collect::<Decimal128Array>()
                    .with_precision_and_scale(precision, scale)
                    .unwrap()
            };

            assert_eq!(actual, &expected_with(18, 4));
            // Same values with a different precision/scale must not compare equal.
            assert_ne!(actual, &expected_with(34, 0));
        }
    }

    /// An INT32 column annotated as DATE is read as `Date32` with the raw values.
    #[test]
    fn test_primitive_array_reader_date32_type() {
        let column_desc = first_column(
            "
        message test_schema {
            REQUIRED INT32 date1 (DATE);
        }
        ",
        );

        let mut data = Vec::new();
        let mut page_lists = Vec::new();
        make_column_chunks::<Int32Type>(
            column_desc.clone(),
            Encoding::PLAIN,
            100,
            -99999999,
            99999999,
            &mut Vec::new(),
            &mut Vec::new(),
            &mut data,
            &mut page_lists,
            true,
            2,
        );
        let page_iterator = InMemoryPageIterator::new(page_lists);

        let mut array_reader =
            PrimitiveArrayReader::<Int32Type>::new(Box::new(page_iterator), column_desc, None)
                .unwrap();

        let batch = array_reader.next_batch(50).unwrap();
        assert_eq!(batch.data_type(), &Date32);
        let actual = batch.as_any().downcast_ref::<Date32Array>().unwrap();

        let expected: Date32Array = data[0..50].iter().copied().map(Some).collect();
        assert_eq!(actual, &expected);
    }
}