1use crate::arrow::array_reader::{ArrayReader, read_records, skip_records};
19use crate::arrow::record_reader::RecordReader;
20use crate::arrow::schema::parquet_to_arrow_field;
21use crate::basic::Type as PhysicalType;
22use crate::column::page::PageIterator;
23use crate::data_type::{DataType, Int96};
24use crate::errors::{ParquetError, Result};
25use crate::schema::types::ColumnDescPtr;
26use arrow_array::{
27 ArrayRef, BooleanArray, Decimal32Array, Decimal64Array, Decimal128Array, Decimal256Array,
28 Float32Array, Float64Array, Int8Array, Int16Array, Int32Array, Int64Array,
29 TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
30 TimestampSecondArray, UInt8Array, UInt16Array, UInt32Array, UInt64Array,
31 builder::{
32 TimestampMicrosecondBufferBuilder, TimestampMillisecondBufferBuilder,
33 TimestampNanosecondBufferBuilder, TimestampSecondBufferBuilder,
34 },
35};
36use arrow_buffer::{BooleanBuffer, Buffer, i256};
37use arrow_data::ArrayDataBuilder;
38use arrow_schema::{DataType as ArrowType, TimeUnit};
39use std::any::Any;
40use std::sync::Arc;
41
/// Conversion of an owned container of decoded values into an arrow [`Buffer`].
pub trait IntoBuffer {
    /// Consumes `self` and returns its contents as a [`Buffer`].
    ///
    /// `target_type` is the arrow type being produced. The implementations in this file
    /// either ignore it (native types, `bool`) or use it to select the timestamp unit
    /// (`Vec<Int96>`).
    fn into_buffer(self, target_type: &ArrowType) -> Buffer;
}
46
// Implements `IntoBuffer` for `Vec<$t>` for every listed type by handing the vector to
// `Buffer::from_vec`. The target arrow type is not consulted for these types.
macro_rules! native_buffer {
    ($($t:ty),*) => {
        $(impl IntoBuffer for Vec<$t> {
            fn into_buffer(self, _target_type: &ArrowType) -> Buffer {
                Buffer::from_vec(self)
            }
        })*
    };
}
// Integer and floating point element types that get the pass-through implementation.
native_buffer!(i8, i16, i32, i64, u8, u16, u32, u64, f32, f64);
57
58impl IntoBuffer for Vec<bool> {
59 fn into_buffer(self, _target_type: &ArrowType) -> Buffer {
60 BooleanBuffer::from_iter(self).into_inner()
61 }
62}
63
64impl IntoBuffer for Vec<Int96> {
65 fn into_buffer(self, target_type: &ArrowType) -> Buffer {
66 match target_type {
67 ArrowType::Timestamp(TimeUnit::Second, _) => {
68 let mut builder = TimestampSecondBufferBuilder::new(self.len());
69 for v in self {
70 builder.append(v.to_seconds())
71 }
72 builder.finish()
73 }
74 ArrowType::Timestamp(TimeUnit::Millisecond, _) => {
75 let mut builder = TimestampMillisecondBufferBuilder::new(self.len());
76 for v in self {
77 builder.append(v.to_millis())
78 }
79 builder.finish()
80 }
81 ArrowType::Timestamp(TimeUnit::Microsecond, _) => {
82 let mut builder = TimestampMicrosecondBufferBuilder::new(self.len());
83 for v in self {
84 builder.append(v.to_micros())
85 }
86 builder.finish()
87 }
88 ArrowType::Timestamp(TimeUnit::Nanosecond, _) => {
89 let mut builder = TimestampNanosecondBufferBuilder::new(self.len());
90 for v in self {
91 builder.append(v.to_nanos())
92 }
93 builder.finish()
94 }
95 _ => unreachable!("Invalid target_type for Int96."),
96 }
97 }
98}
99
/// Array reader that decodes a parquet column through a [`RecordReader<T>`] and exposes the
/// result as an arrow array of `data_type`.
///
/// The bounds require the decoded value vector (`Vec<T::T>`) to be convertible into an
/// arrow buffer via [`IntoBuffer`].
pub struct PrimitiveArrayReader<T>
where
    T: DataType,
    T::T: Copy + Default,
    Vec<T::T>: IntoBuffer,
{
    /// Arrow type reported by `get_data_type` and used as the conversion target.
    data_type: ArrowType,
    /// Source of pages handed to the record reader when reading or skipping records.
    pages: Box<dyn PageIterator>,
    /// Definition levels taken from the record reader by the last `consume_batch`.
    def_levels_buffer: Option<Vec<i16>>,
    /// Repetition levels taken from the record reader by the last `consume_batch`.
    rep_levels_buffer: Option<Vec<i16>>,
    /// Accumulates decoded values, levels and the validity bitmap between batches.
    record_reader: RecordReader<T>,
}
114
115impl<T> PrimitiveArrayReader<T>
116where
117 T: DataType,
118 T::T: Copy + Default,
119 Vec<T::T>: IntoBuffer,
120{
121 pub fn new(
123 pages: Box<dyn PageIterator>,
124 column_desc: ColumnDescPtr,
125 arrow_type: Option<ArrowType>,
126 ) -> Result<Self> {
127 let data_type = match arrow_type {
129 Some(t) => t,
130 None => parquet_to_arrow_field(column_desc.as_ref())?
131 .data_type()
132 .clone(),
133 };
134
135 let record_reader = RecordReader::<T>::new(column_desc);
136
137 Ok(Self {
138 data_type,
139 pages,
140 def_levels_buffer: None,
141 rep_levels_buffer: None,
142 record_reader,
143 })
144 }
145}
146
/// [`ArrayReader`] implementation for primitive columns.
impl<T> ArrayReader for PrimitiveArrayReader<T>
where
    T: DataType,
    T::T: Copy + Default,
    Vec<T::T>: IntoBuffer,
{
    /// Returns `self` as `&dyn Any` so callers can downcast to the concrete reader.
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Returns the arrow type this reader produces.
    fn get_data_type(&self) -> &ArrowType {
        &self.data_type
    }

    /// Delegates to the module-level `read_records` helper with this reader's record reader
    /// and page iterator, returning whatever that helper returns.
    fn read_records(&mut self, batch_size: usize) -> Result<usize> {
        read_records(&mut self.record_reader, self.pages.as_mut(), batch_size)
    }

    /// Takes everything buffered in the record reader and turns it into an arrow array of
    /// the target type.
    ///
    /// The method works in three stages: pick the arrow type that matches the physical
    /// buffer layout, wrap the consumed buffers in an array of that type, then convert that
    /// array to `self.data_type`. Finally the definition/repetition levels are stashed on
    /// `self` and the record reader is reset.
    ///
    /// # Errors
    ///
    /// Returns an error when `arrow_cast::cast` fails, when `with_precision_and_scale`
    /// rejects the requested precision/scale, or when a decimal target is requested for a
    /// physical array that is neither `Int32` nor `Int64`.
    ///
    /// # Panics
    ///
    /// Hits `unreachable!` for INT96 with a non-timestamp target and for BYTE_ARRAY /
    /// FIXED_LEN_BYTE_ARRAY physical types.
    fn consume_batch(&mut self) -> Result<ArrayRef> {
        let target_type = &self.data_type;
        // Stage 1: choose the arrow type used to interpret the raw value buffer.
        let arrow_data_type = match T::get_physical_type() {
            PhysicalType::BOOLEAN => ArrowType::Boolean,
            PhysicalType::INT32 => {
                match target_type {
                    ArrowType::UInt32 => {
                        // The i32 value buffer is labelled UInt32 directly, i.e. the bits
                        // are reinterpreted rather than converted.
                        ArrowType::UInt32
                    }
                    ArrowType::Decimal32(_, _) => target_type.clone(),
                    _ => ArrowType::Int32,
                }
            }
            PhysicalType::INT64 => {
                match target_type {
                    ArrowType::UInt64 => {
                        // Same bit reinterpretation as the UInt32 case, for i64 values.
                        ArrowType::UInt64
                    }
                    ArrowType::Decimal64(_, _) => target_type.clone(),
                    _ => ArrowType::Int64,
                }
            }
            PhysicalType::FLOAT => ArrowType::Float32,
            PhysicalType::DOUBLE => ArrowType::Float64,
            // INT96 values are converted to the target unit by `into_buffer` below, so the
            // buffer already has the target timestamp type.
            PhysicalType::INT96 => match target_type {
                ArrowType::Timestamp(TimeUnit::Second, _) => target_type.clone(),
                ArrowType::Timestamp(TimeUnit::Millisecond, _) => target_type.clone(),
                ArrowType::Timestamp(TimeUnit::Microsecond, _) => target_type.clone(),
                ArrowType::Timestamp(TimeUnit::Nanosecond, _) => target_type.clone(),
                _ => unreachable!("INT96 must be a timestamp."),
            },
            PhysicalType::BYTE_ARRAY | PhysicalType::FIXED_LEN_BYTE_ARRAY => {
                unreachable!("PrimitiveArrayReaders don't support complex physical types");
            }
        };

        // Stage 2: take the decoded values out of the record reader as an arrow buffer.
        let record_data = self
            .record_reader
            .consume_record_data()
            .into_buffer(target_type);

        let array_data = ArrayDataBuilder::new(arrow_data_type)
            .len(self.record_reader.num_values())
            .add_buffer(record_data)
            .null_bit_buffer(self.record_reader.consume_bitmap_buffer());

        // NOTE(review): `build_unchecked` skips validation; this presumably relies on the
        // record reader producing buffers consistent with `arrow_data_type` and the length
        // set above — TODO confirm and record the invariant in a `SAFETY` comment.
        let array_data = unsafe { array_data.build_unchecked() };
        // Wrap the array data in the concrete array type matching `arrow_data_type`.
        let array: ArrayRef = match T::get_physical_type() {
            PhysicalType::BOOLEAN => Arc::new(BooleanArray::from(array_data)),
            PhysicalType::INT32 => match array_data.data_type() {
                ArrowType::UInt32 => Arc::new(UInt32Array::from(array_data)),
                ArrowType::Int32 => Arc::new(Int32Array::from(array_data)),
                ArrowType::Decimal32(_, _) => Arc::new(Decimal32Array::from(array_data)),
                _ => unreachable!(),
            },
            PhysicalType::INT64 => match array_data.data_type() {
                ArrowType::UInt64 => Arc::new(UInt64Array::from(array_data)),
                ArrowType::Int64 => Arc::new(Int64Array::from(array_data)),
                ArrowType::Decimal64(_, _) => Arc::new(Decimal64Array::from(array_data)),
                _ => unreachable!(),
            },
            PhysicalType::FLOAT => Arc::new(Float32Array::from(array_data)),
            PhysicalType::DOUBLE => Arc::new(Float64Array::from(array_data)),
            PhysicalType::INT96 => match target_type {
                ArrowType::Timestamp(TimeUnit::Second, _) => {
                    Arc::new(TimestampSecondArray::from(array_data))
                }
                ArrowType::Timestamp(TimeUnit::Millisecond, _) => {
                    Arc::new(TimestampMillisecondArray::from(array_data))
                }
                ArrowType::Timestamp(TimeUnit::Microsecond, _) => {
                    Arc::new(TimestampMicrosecondArray::from(array_data))
                }
                ArrowType::Timestamp(TimeUnit::Nanosecond, _) => {
                    Arc::new(TimestampNanosecondArray::from(array_data))
                }
                _ => unreachable!("INT96 must be a timestamp."),
            },

            PhysicalType::BYTE_ARRAY | PhysicalType::FIXED_LEN_BYTE_ARRAY => {
                unreachable!("PrimitiveArrayReaders don't support complex physical types");
            }
        };

        // Stage 3: convert the physical array to the requested target type. Narrow integer
        // and decimal targets are handled explicitly; everything else goes through
        // `arrow_cast::cast`.
        let array = match target_type {
            // The narrow integer arms use a plain `as` cast per element, so values outside
            // the target range are truncated instead of raising an error.
            ArrowType::UInt8 if *(array.data_type()) == ArrowType::Int32 => {
                let array = array
                    .as_any()
                    .downcast_ref::<Int32Array>()
                    .unwrap()
                    .unary(|i| i as u8) as UInt8Array;
                Arc::new(array) as ArrayRef
            }
            ArrowType::Int8 if *(array.data_type()) == ArrowType::Int32 => {
                let array = array
                    .as_any()
                    .downcast_ref::<Int32Array>()
                    .unwrap()
                    .unary(|i| i as i8) as Int8Array;
                Arc::new(array) as ArrayRef
            }
            ArrowType::UInt16 if *(array.data_type()) == ArrowType::Int32 => {
                let array = array
                    .as_any()
                    .downcast_ref::<Int32Array>()
                    .unwrap()
                    .unary(|i| i as u16) as UInt16Array;
                Arc::new(array) as ArrayRef
            }
            ArrowType::Int16 if *(array.data_type()) == ArrowType::Int32 => {
                let array = array
                    .as_any()
                    .downcast_ref::<Int32Array>()
                    .unwrap()
                    .unary(|i| i as i16) as Int16Array;
                Arc::new(array) as ArrayRef
            }
            // Date64 from INT32 is reached in two steps: Int32 -> Date32 -> target.
            ArrowType::Date64 if *(array.data_type()) == ArrowType::Int32 => {
                let a = arrow_cast::cast(&array, &ArrowType::Date32)?;
                arrow_cast::cast(&a, target_type)?
            }
            // INT32 widened to a 64-bit decimal; the guard makes the inner fallback arm
            // unreachable in practice.
            ArrowType::Decimal64(p, s) if *(array.data_type()) == ArrowType::Int32 => {
                let array = match array.data_type() {
                    ArrowType::Int32 => array
                        .as_any()
                        .downcast_ref::<Int32Array>()
                        .unwrap()
                        .unary(|i| i as i64)
                        as Decimal64Array,
                    _ => {
                        return Err(arrow_err!(
                            "Cannot convert {:?} to decimal",
                            array.data_type()
                        ));
                    }
                }
                .with_precision_and_scale(*p, *s)?;

                Arc::new(array) as ArrayRef
            }
            // This arm has no guard, so any physical array can reach it; only Int32 and
            // Int64 are convertible, anything else is reported as an error.
            ArrowType::Decimal128(p, s) => {
                let array = match array.data_type() {
                    ArrowType::Int32 => array
                        .as_any()
                        .downcast_ref::<Int32Array>()
                        .unwrap()
                        .unary(|i| i as i128)
                        as Decimal128Array,
                    ArrowType::Int64 => array
                        .as_any()
                        .downcast_ref::<Int64Array>()
                        .unwrap()
                        .unary(|i| i as i128)
                        as Decimal128Array,
                    _ => {
                        return Err(arrow_err!(
                            "Cannot convert {:?} to decimal",
                            array.data_type()
                        ));
                    }
                }
                .with_precision_and_scale(*p, *s)?;

                Arc::new(array) as ArrayRef
            }
            // Same shape as the Decimal128 arm, widening through i128 into i256.
            ArrowType::Decimal256(p, s) => {
                let array = match array.data_type() {
                    ArrowType::Int32 => array
                        .as_any()
                        .downcast_ref::<Int32Array>()
                        .unwrap()
                        .unary(|i| i256::from_i128(i as i128))
                        as Decimal256Array,
                    ArrowType::Int64 => array
                        .as_any()
                        .downcast_ref::<Int64Array>()
                        .unwrap()
                        .unary(|i| i256::from_i128(i as i128))
                        as Decimal256Array,
                    _ => {
                        return Err(arrow_err!(
                            "Cannot convert {:?} to decimal",
                            array.data_type()
                        ));
                    }
                }
                .with_precision_and_scale(*p, *s)?;

                Arc::new(array) as ArrayRef
            }
            // Dictionary targets with decimal values: build the decimal array first, then
            // let `arrow_cast::cast` produce the dictionary representation.
            ArrowType::Dictionary(_, value_type) => match value_type.as_ref() {
                ArrowType::Decimal32(p, s) => {
                    let array = match array.data_type() {
                        ArrowType::Int32 => array
                            .as_any()
                            .downcast_ref::<Int32Array>()
                            .unwrap()
                            .unary(|i| i)
                            as Decimal32Array,
                        _ => {
                            return Err(arrow_err!(
                                "Cannot convert {:?} to decimal dictionary",
                                array.data_type()
                            ));
                        }
                    }
                    .with_precision_and_scale(*p, *s)?;

                    arrow_cast::cast(&array, target_type)?
                }
                ArrowType::Decimal64(p, s) => {
                    let array = match array.data_type() {
                        ArrowType::Int32 => array
                            .as_any()
                            .downcast_ref::<Int32Array>()
                            .unwrap()
                            .unary(|i| i as i64)
                            as Decimal64Array,
                        ArrowType::Int64 => array
                            .as_any()
                            .downcast_ref::<Int64Array>()
                            .unwrap()
                            .unary(|i| i)
                            as Decimal64Array,
                        _ => {
                            return Err(arrow_err!(
                                "Cannot convert {:?} to decimal dictionary",
                                array.data_type()
                            ));
                        }
                    }
                    .with_precision_and_scale(*p, *s)?;

                    arrow_cast::cast(&array, target_type)?
                }
                ArrowType::Decimal128(p, s) => {
                    let array = match array.data_type() {
                        ArrowType::Int32 => array
                            .as_any()
                            .downcast_ref::<Int32Array>()
                            .unwrap()
                            .unary(|i| i as i128)
                            as Decimal128Array,
                        ArrowType::Int64 => array
                            .as_any()
                            .downcast_ref::<Int64Array>()
                            .unwrap()
                            .unary(|i| i as i128)
                            as Decimal128Array,
                        _ => {
                            return Err(arrow_err!(
                                "Cannot convert {:?} to decimal dictionary",
                                array.data_type()
                            ));
                        }
                    }
                    .with_precision_and_scale(*p, *s)?;

                    arrow_cast::cast(&array, target_type)?
                }
                ArrowType::Decimal256(p, s) => {
                    let array = match array.data_type() {
                        ArrowType::Int32 => array
                            .as_any()
                            .downcast_ref::<Int32Array>()
                            .unwrap()
                            .unary(i256::from)
                            as Decimal256Array,
                        ArrowType::Int64 => array
                            .as_any()
                            .downcast_ref::<Int64Array>()
                            .unwrap()
                            .unary(i256::from)
                            as Decimal256Array,
                        _ => {
                            return Err(arrow_err!(
                                "Cannot convert {:?} to decimal dictionary",
                                array.data_type()
                            ));
                        }
                    }
                    .with_precision_and_scale(*p, *s)?;

                    arrow_cast::cast(&array, target_type)?
                }
                // Non-decimal dictionary values use the generic cast.
                _ => arrow_cast::cast(&array, target_type)?,
            },
            // Every remaining target type uses the generic cast.
            _ => arrow_cast::cast(&array, target_type)?,
        };

        // Keep the levels of this batch available through `get_def_levels` /
        // `get_rep_levels`, then clear the record reader for the next batch.
        self.def_levels_buffer = self.record_reader.consume_def_levels();
        self.rep_levels_buffer = self.record_reader.consume_rep_levels();
        self.record_reader.reset();
        Ok(array)
    }

    /// Delegates to the module-level `skip_records` helper with this reader's record reader
    /// and page iterator, returning whatever that helper returns.
    fn skip_records(&mut self, num_records: usize) -> Result<usize> {
        skip_records(&mut self.record_reader, self.pages.as_mut(), num_records)
    }

    /// Definition levels stored by the most recent `consume_batch`, if any.
    fn get_def_levels(&self) -> Option<&[i16]> {
        self.def_levels_buffer.as_deref()
    }

    /// Repetition levels stored by the most recent `consume_batch`, if any.
    fn get_rep_levels(&self) -> Option<&[i16]> {
        self.rep_levels_buffer.as_deref()
    }
}
506
507#[cfg(test)]
508mod tests {
509 use super::*;
510 use crate::arrow::array_reader::test_util::EmptyPageIterator;
511 use crate::basic::Encoding;
512 use crate::column::page::Page;
513 use crate::data_type::{Int32Type, Int64Type};
514 use crate::schema::parser::parse_message_type;
515 use crate::schema::types::SchemaDescriptor;
516 use crate::util::InMemoryPageIterator;
517 use crate::util::test_common::rand_gen::make_pages;
518 use arrow::datatypes::ArrowPrimitiveType;
519 use arrow_array::{Array, Date32Array, PrimitiveArray};
520
521 use arrow::datatypes::DataType::{Date32, Decimal128};
522 use rand::distr::uniform::SampleUniform;
523 use std::collections::VecDeque;
524
525 #[allow(clippy::too_many_arguments)]
526 fn make_column_chunks<T: DataType>(
527 column_desc: ColumnDescPtr,
528 encoding: Encoding,
529 num_levels: usize,
530 min_value: T::T,
531 max_value: T::T,
532 def_levels: &mut Vec<i16>,
533 rep_levels: &mut Vec<i16>,
534 values: &mut Vec<T::T>,
535 page_lists: &mut Vec<Vec<Page>>,
536 use_v2: bool,
537 num_chunks: usize,
538 ) where
539 T::T: PartialOrd + SampleUniform + Copy,
540 {
541 for _i in 0..num_chunks {
542 let mut pages = VecDeque::new();
543 let mut data = Vec::new();
544 let mut page_def_levels = Vec::new();
545 let mut page_rep_levels = Vec::new();
546
547 make_pages::<T>(
548 column_desc.clone(),
549 encoding,
550 1,
551 num_levels,
552 min_value,
553 max_value,
554 &mut page_def_levels,
555 &mut page_rep_levels,
556 &mut data,
557 &mut pages,
558 use_v2,
559 );
560
561 def_levels.append(&mut page_def_levels);
562 rep_levels.append(&mut page_rep_levels);
563 values.append(&mut data);
564 page_lists.push(Vec::from(pages));
565 }
566 }
567
568 #[test]
569 fn test_primitive_array_reader_empty_pages() {
570 let message_type = "
572 message test_schema {
573 REQUIRED INT32 leaf;
574 }
575 ";
576
577 let schema = parse_message_type(message_type)
578 .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
579 .unwrap();
580
581 let mut array_reader = PrimitiveArrayReader::<Int32Type>::new(
582 Box::<EmptyPageIterator>::default(),
583 schema.column(0),
584 None,
585 )
586 .unwrap();
587
588 let array = array_reader.next_batch(50).unwrap();
590 assert!(array.is_empty());
591 }
592
593 #[test]
594 fn test_primitive_array_reader_data() {
595 let message_type = "
597 message test_schema {
598 REQUIRED INT32 leaf;
599 }
600 ";
601
602 let schema = parse_message_type(message_type)
603 .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
604 .unwrap();
605
606 let column_desc = schema.column(0);
607
608 {
610 let mut data = Vec::new();
611 let mut page_lists = Vec::new();
612 make_column_chunks::<Int32Type>(
613 column_desc.clone(),
614 Encoding::PLAIN,
615 100,
616 1,
617 200,
618 &mut Vec::new(),
619 &mut Vec::new(),
620 &mut data,
621 &mut page_lists,
622 true,
623 2,
624 );
625 let page_iterator = InMemoryPageIterator::new(page_lists);
626
627 let mut array_reader =
628 PrimitiveArrayReader::<Int32Type>::new(Box::new(page_iterator), column_desc, None)
629 .unwrap();
630
631 let array = array_reader.next_batch(50).unwrap();
633 let array = array.as_any().downcast_ref::<Int32Array>().unwrap();
634
635 assert_eq!(&Int32Array::from(data[0..50].to_vec()), array);
636
637 let array = array_reader.next_batch(100).unwrap();
640 let array = array.as_any().downcast_ref::<Int32Array>().unwrap();
641
642 assert_eq!(&Int32Array::from(data[50..150].to_vec()), array);
643
644 let array = array_reader.next_batch(100).unwrap();
646 let array = array.as_any().downcast_ref::<Int32Array>().unwrap();
647
648 assert_eq!(&Int32Array::from(data[150..200].to_vec()), array);
649 }
650 }
651
652 macro_rules! test_primitive_array_reader_one_type {
653 (
654 $arrow_parquet_type:ty,
655 $physical_type:expr,
656 $converted_type_str:expr,
657 $result_arrow_type:ty,
658 $result_arrow_cast_type:ty,
659 $result_primitive_type:ty
660 $(, $timezone:expr)?
661 ) => {{
662 let message_type = format!(
663 "
664 message test_schema {{
665 REQUIRED {:?} leaf ({});
666 }}
667 ",
668 $physical_type, $converted_type_str
669 );
670 let schema = parse_message_type(&message_type)
671 .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
672 .unwrap();
673
674 let column_desc = schema.column(0);
675
676 {
678 let mut data = Vec::new();
679 let mut page_lists = Vec::new();
680 make_column_chunks::<$arrow_parquet_type>(
681 column_desc.clone(),
682 Encoding::PLAIN,
683 100,
684 1,
685 200,
686 &mut Vec::new(),
687 &mut Vec::new(),
688 &mut data,
689 &mut page_lists,
690 true,
691 2,
692 );
693 let page_iterator = InMemoryPageIterator::new(page_lists);
694 let mut array_reader = PrimitiveArrayReader::<$arrow_parquet_type>::new(
695 Box::new(page_iterator),
696 column_desc.clone(),
697 None,
698 )
699 .expect("Unable to get array reader");
700
701 let array = array_reader
702 .next_batch(50)
703 .expect("Unable to get batch from reader");
704
705 let result_data_type = <$result_arrow_type>::DATA_TYPE;
706 let array = array
707 .as_any()
708 .downcast_ref::<PrimitiveArray<$result_arrow_type>>()
709 .expect(
710 format!(
711 "Unable to downcast {:?} to {:?}",
712 array.data_type(),
713 result_data_type
714 )
715 .as_str(),
716 )
717 $(.clone().with_timezone($timezone))?
718 ;
719
720 let expected = PrimitiveArray::<$result_arrow_cast_type>::from(
722 data[0..50]
723 .iter()
724 .map(|x| *x as $result_primitive_type)
725 .collect::<Vec<$result_primitive_type>>(),
726 );
727 let expected = Arc::new(expected) as ArrayRef;
728 let expected = arrow::compute::cast(&expected, &result_data_type)
729 .expect("Unable to cast expected array");
730 assert_eq!(expected.data_type(), &result_data_type);
731 let expected = expected
732 .as_any()
733 .downcast_ref::<PrimitiveArray<$result_arrow_type>>()
734 .expect(
735 format!(
736 "Unable to downcast expected {:?} to {:?}",
737 expected.data_type(),
738 result_data_type
739 )
740 .as_str(),
741 )
742 $(.clone().with_timezone($timezone))?
743 ;
744 assert_eq!(expected, array);
745 }
746 }};
747 }
748
    /// Runs the single-type round-trip check for each temporal annotation listed below.
    #[test]
    fn test_primitive_array_reader_temporal_types() {
        // INT32 annotated DATE is expected back as Date32.
        test_primitive_array_reader_one_type!(
            crate::data_type::Int32Type,
            PhysicalType::INT32,
            "DATE",
            arrow::datatypes::Date32Type,
            arrow::datatypes::Int32Type,
            i32
        );
        // INT32 annotated TIME_MILLIS is expected back as Time32 (milliseconds).
        test_primitive_array_reader_one_type!(
            crate::data_type::Int32Type,
            PhysicalType::INT32,
            "TIME_MILLIS",
            arrow::datatypes::Time32MillisecondType,
            arrow::datatypes::Int32Type,
            i32
        );
        // INT64 annotated TIME_MICROS is expected back as Time64 (microseconds).
        test_primitive_array_reader_one_type!(
            crate::data_type::Int64Type,
            PhysicalType::INT64,
            "TIME_MICROS",
            arrow::datatypes::Time64MicrosecondType,
            arrow::datatypes::Int64Type,
            i64
        );
        // The timestamp cases pass "UTC", which the macro applies to both the actual and
        // the expected array via `with_timezone` before comparing.
        test_primitive_array_reader_one_type!(
            crate::data_type::Int64Type,
            PhysicalType::INT64,
            "TIMESTAMP_MILLIS",
            arrow::datatypes::TimestampMillisecondType,
            arrow::datatypes::Int64Type,
            i64,
            "UTC"
        );
        test_primitive_array_reader_one_type!(
            crate::data_type::Int64Type,
            PhysicalType::INT64,
            "TIMESTAMP_MICROS",
            arrow::datatypes::TimestampMicrosecondType,
            arrow::datatypes::Int64Type,
            i64,
            "UTC"
        );
    }
794
795 #[test]
796 fn test_primitive_array_reader_def_and_rep_levels() {
797 let message_type = "
799 message test_schema {
800 REPEATED Group test_mid {
801 OPTIONAL INT32 leaf;
802 }
803 }
804 ";
805
806 let schema = parse_message_type(message_type)
807 .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
808 .unwrap();
809
810 let column_desc = schema.column(0);
811
812 {
814 let mut def_levels = Vec::new();
815 let mut rep_levels = Vec::new();
816 let mut page_lists = Vec::new();
817 make_column_chunks::<Int32Type>(
818 column_desc.clone(),
819 Encoding::PLAIN,
820 100,
821 1,
822 200,
823 &mut def_levels,
824 &mut rep_levels,
825 &mut Vec::new(),
826 &mut page_lists,
827 true,
828 2,
829 );
830
831 let page_iterator = InMemoryPageIterator::new(page_lists);
832
833 let mut array_reader =
834 PrimitiveArrayReader::<Int32Type>::new(Box::new(page_iterator), column_desc, None)
835 .unwrap();
836
837 let mut accu_len: usize = 0;
838
839 let array = array_reader.next_batch(50).unwrap();
841 assert_eq!(
842 Some(&def_levels[accu_len..(accu_len + array.len())]),
843 array_reader.get_def_levels()
844 );
845 assert_eq!(
846 Some(&rep_levels[accu_len..(accu_len + array.len())]),
847 array_reader.get_rep_levels()
848 );
849 accu_len += array.len();
850
851 let array = array_reader.next_batch(100).unwrap();
854 assert_eq!(
855 Some(&def_levels[accu_len..(accu_len + array.len())]),
856 array_reader.get_def_levels()
857 );
858 assert_eq!(
859 Some(&rep_levels[accu_len..(accu_len + array.len())]),
860 array_reader.get_rep_levels()
861 );
862 accu_len += array.len();
863
864 let array = array_reader.next_batch(100).unwrap();
866 assert_eq!(
867 Some(&def_levels[accu_len..(accu_len + array.len())]),
868 array_reader.get_def_levels()
869 );
870 assert_eq!(
871 Some(&rep_levels[accu_len..(accu_len + array.len())]),
872 array_reader.get_rep_levels()
873 );
874 }
875 }
876
877 #[test]
878 fn test_primitive_array_reader_decimal_types() {
879 let message_type = "
881 message test_schema {
882 REQUIRED INT32 decimal1 (DECIMAL(8,2));
883 }
884 ";
885 let schema = parse_message_type(message_type)
886 .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
887 .unwrap();
888 let column_desc = schema.column(0);
889
890 {
892 let mut data = Vec::new();
893 let mut page_lists = Vec::new();
894 make_column_chunks::<Int32Type>(
895 column_desc.clone(),
896 Encoding::PLAIN,
897 100,
898 -99999999,
899 99999999,
900 &mut Vec::new(),
901 &mut Vec::new(),
902 &mut data,
903 &mut page_lists,
904 true,
905 2,
906 );
907 let page_iterator = InMemoryPageIterator::new(page_lists);
908
909 let mut array_reader =
910 PrimitiveArrayReader::<Int32Type>::new(Box::new(page_iterator), column_desc, None)
911 .unwrap();
912
913 let array = array_reader.next_batch(50).unwrap();
916 assert_eq!(array.data_type(), &Decimal128(8, 2));
917 let array = array.as_any().downcast_ref::<Decimal128Array>().unwrap();
918 let data_decimal_array = data[0..50]
919 .iter()
920 .copied()
921 .map(|v| Some(v as i128))
922 .collect::<Decimal128Array>()
923 .with_precision_and_scale(8, 2)
924 .unwrap();
925 assert_eq!(array, &data_decimal_array);
926
927 let data_decimal_array = data[0..50]
929 .iter()
930 .copied()
931 .map(|v| Some(v as i128))
932 .collect::<Decimal128Array>()
933 .with_precision_and_scale(9, 0)
934 .unwrap();
935 assert_ne!(array, &data_decimal_array)
936 }
937
938 let message_type = "
940 message test_schema {
941 REQUIRED INT64 decimal1 (DECIMAL(18,4));
942 }
943 ";
944 let schema = parse_message_type(message_type)
945 .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
946 .unwrap();
947 let column_desc = schema.column(0);
948
949 {
951 let mut data = Vec::new();
952 let mut page_lists = Vec::new();
953 make_column_chunks::<Int64Type>(
954 column_desc.clone(),
955 Encoding::PLAIN,
956 100,
957 -999999999999999999,
958 999999999999999999,
959 &mut Vec::new(),
960 &mut Vec::new(),
961 &mut data,
962 &mut page_lists,
963 true,
964 2,
965 );
966 let page_iterator = InMemoryPageIterator::new(page_lists);
967
968 let mut array_reader =
969 PrimitiveArrayReader::<Int64Type>::new(Box::new(page_iterator), column_desc, None)
970 .unwrap();
971
972 let array = array_reader.next_batch(50).unwrap();
975 assert_eq!(array.data_type(), &Decimal128(18, 4));
976 let array = array.as_any().downcast_ref::<Decimal128Array>().unwrap();
977 let data_decimal_array = data[0..50]
978 .iter()
979 .copied()
980 .map(|v| Some(v as i128))
981 .collect::<Decimal128Array>()
982 .with_precision_and_scale(18, 4)
983 .unwrap();
984 assert_eq!(array, &data_decimal_array);
985
986 let data_decimal_array = data[0..50]
988 .iter()
989 .copied()
990 .map(|v| Some(v as i128))
991 .collect::<Decimal128Array>()
992 .with_precision_and_scale(34, 0)
993 .unwrap();
994 assert_ne!(array, &data_decimal_array)
995 }
996 }
997
998 #[test]
999 fn test_primitive_array_reader_date32_type() {
1000 let message_type = "
1002 message test_schema {
1003 REQUIRED INT32 date1 (DATE);
1004 }
1005 ";
1006 let schema = parse_message_type(message_type)
1007 .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
1008 .unwrap();
1009 let column_desc = schema.column(0);
1010
1011 {
1013 let mut data = Vec::new();
1014 let mut page_lists = Vec::new();
1015 make_column_chunks::<Int32Type>(
1016 column_desc.clone(),
1017 Encoding::PLAIN,
1018 100,
1019 -99999999,
1020 99999999,
1021 &mut Vec::new(),
1022 &mut Vec::new(),
1023 &mut data,
1024 &mut page_lists,
1025 true,
1026 2,
1027 );
1028 let page_iterator = InMemoryPageIterator::new(page_lists);
1029
1030 let mut array_reader =
1031 PrimitiveArrayReader::<Int32Type>::new(Box::new(page_iterator), column_desc, None)
1032 .unwrap();
1033
1034 let array = array_reader.next_batch(50).unwrap();
1037 assert_eq!(array.data_type(), &Date32);
1038 let array = array.as_any().downcast_ref::<Date32Array>().unwrap();
1039 let data_date_array = data[0..50]
1040 .iter()
1041 .copied()
1042 .map(Some)
1043 .collect::<Date32Array>();
1044 assert_eq!(array, &data_date_array);
1045 }
1046 }
1047}