1use crate::arrow::array_reader::{ArrayReader, read_records, skip_records};
19use crate::arrow::record_reader::RecordReader;
20use crate::arrow::schema::parquet_to_arrow_field;
21use crate::basic::Type as PhysicalType;
22use crate::column::page::PageIterator;
23use crate::data_type::{DataType, Int96};
24use crate::errors::Result;
25use crate::schema::types::ColumnDescPtr;
26use arrow_array::{
27 Array, ArrayRef, BooleanArray, Date64Array, Decimal64Array, Decimal128Array, Decimal256Array,
28 Float32Array, Float64Array, Int8Array, Int16Array, Int32Array, Int64Array, PrimitiveArray,
29 UInt8Array, UInt16Array, builder::PrimitiveDictionaryBuilder, cast::AsArray, downcast_integer,
30 types::*,
31};
32use arrow_array::{
33 TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
34 TimestampSecondArray, UInt32Array, UInt64Array,
35};
36use arrow_buffer::{BooleanBuffer, Buffer, NullBuffer, ScalarBuffer, i256};
37use arrow_schema::{DataType as ArrowType, TimeUnit};
38use std::any::Any;
39use std::sync::Arc;
40
41pub trait IntoBuffer {
43 fn into_buffer(self, target_type: &ArrowType) -> Buffer;
44}
45
46macro_rules! native_buffer {
47 ($($t:ty),*) => {
48 $(impl IntoBuffer for Vec<$t> {
49 fn into_buffer(self, _target_type: &ArrowType) -> Buffer {
50 Buffer::from_vec(self)
51 }
52 })*
53 };
54}
55native_buffer!(i8, i16, i32, i64, u8, u16, u32, u64, f32, f64);
56
57impl IntoBuffer for Vec<bool> {
58 fn into_buffer(self, _target_type: &ArrowType) -> Buffer {
59 BooleanBuffer::from_iter(self).into_inner()
60 }
61}
62
63impl IntoBuffer for Vec<Int96> {
64 fn into_buffer(self, target_type: &ArrowType) -> Buffer {
65 let mut builder = Vec::with_capacity(self.len());
66 match target_type {
67 ArrowType::Timestamp(TimeUnit::Second, _) => {
68 builder.extend(self.iter().map(|x| x.to_seconds()));
69 }
70 ArrowType::Timestamp(TimeUnit::Millisecond, _) => {
71 builder.extend(self.iter().map(|x| x.to_millis()));
72 }
73 ArrowType::Timestamp(TimeUnit::Microsecond, _) => {
74 builder.extend(self.iter().map(|x| x.to_micros()));
75 }
76 ArrowType::Timestamp(TimeUnit::Nanosecond, _) => {
77 builder.extend(self.iter().map(|x| x.to_nanos()));
78 }
79 _ => unreachable!("Invalid target_type for Int96."),
80 }
81 Buffer::from_vec(builder)
82 }
83}
84
85pub struct PrimitiveArrayReader<T>
88where
89 T: DataType,
90 T::T: Copy + Default,
91 Vec<T::T>: IntoBuffer,
92{
93 data_type: ArrowType,
94 pages: Box<dyn PageIterator>,
95 def_levels_buffer: Option<Vec<i16>>,
96 rep_levels_buffer: Option<Vec<i16>>,
97 record_reader: RecordReader<T>,
98}
99
100impl<T> PrimitiveArrayReader<T>
101where
102 T: DataType,
103 T::T: Copy + Default,
104 Vec<T::T>: IntoBuffer,
105{
106 pub fn new(
108 pages: Box<dyn PageIterator>,
109 column_desc: ColumnDescPtr,
110 arrow_type: Option<ArrowType>,
111 ) -> Result<Self> {
112 let data_type = match arrow_type {
114 Some(t) => t,
115 None => parquet_to_arrow_field(column_desc.as_ref())?
116 .data_type()
117 .clone(),
118 };
119
120 let record_reader = RecordReader::<T>::new(column_desc);
121
122 Ok(Self {
123 data_type,
124 pages,
125 def_levels_buffer: None,
126 rep_levels_buffer: None,
127 record_reader,
128 })
129 }
130}
131
132impl<T> ArrayReader for PrimitiveArrayReader<T>
134where
135 T: DataType,
136 T::T: Copy + Default,
137 Vec<T::T>: IntoBuffer,
138{
139 fn as_any(&self) -> &dyn Any {
140 self
141 }
142
143 fn get_data_type(&self) -> &ArrowType {
145 &self.data_type
146 }
147
148 fn read_records(&mut self, batch_size: usize) -> Result<usize> {
149 read_records(&mut self.record_reader, self.pages.as_mut(), batch_size)
150 }
151
152 fn consume_batch(&mut self) -> Result<ArrayRef> {
153 let target_type = &self.data_type;
154
155 let record_data = self
158 .record_reader
159 .consume_record_data()
160 .into_buffer(target_type);
161
162 let len = self.record_reader.num_values();
163 let nulls = self
164 .record_reader
165 .consume_bitmap_buffer()
166 .map(|b| NullBuffer::new(BooleanBuffer::new(b, 0, len)));
167
168 let array: ArrayRef = match T::get_physical_type() {
169 PhysicalType::BOOLEAN => Arc::new(BooleanArray::new(
170 BooleanBuffer::new(record_data, 0, len),
171 nulls,
172 )),
173 PhysicalType::INT32 => Arc::new(Int32Array::new(
174 ScalarBuffer::new(record_data, 0, len),
175 nulls,
176 )),
177 PhysicalType::INT64 => Arc::new(Int64Array::new(
178 ScalarBuffer::new(record_data, 0, len),
179 nulls,
180 )),
181 PhysicalType::FLOAT => Arc::new(Float32Array::new(
182 ScalarBuffer::new(record_data, 0, len),
183 nulls,
184 )),
185 PhysicalType::DOUBLE => Arc::new(Float64Array::new(
186 ScalarBuffer::new(record_data, 0, len),
187 nulls,
188 )),
189 PhysicalType::INT96 => Arc::new(Int64Array::new(
190 ScalarBuffer::new(record_data, 0, len),
191 nulls,
192 )),
193 PhysicalType::BYTE_ARRAY | PhysicalType::FIXED_LEN_BYTE_ARRAY => {
194 unreachable!("PrimitiveArrayReaders don't support complex physical types");
195 }
196 };
197
198 let array = coerce_array(array, target_type)?;
200
201 self.def_levels_buffer = self.record_reader.consume_def_levels();
203 self.rep_levels_buffer = self.record_reader.consume_rep_levels();
204 self.record_reader.reset();
205 Ok(array)
206 }
207
208 fn skip_records(&mut self, num_records: usize) -> Result<usize> {
209 skip_records(&mut self.record_reader, self.pages.as_mut(), num_records)
210 }
211
212 fn get_def_levels(&self) -> Option<&[i16]> {
213 self.def_levels_buffer.as_deref()
214 }
215
216 fn get_rep_levels(&self) -> Option<&[i16]> {
217 self.rep_levels_buffer.as_deref()
218 }
219}
220
221fn coerce_array(array: ArrayRef, target_type: &ArrowType) -> Result<ArrayRef> {
225 if let ArrowType::Dictionary(key_type, value_type) = target_type {
226 let dictionary = pack_dictionary(key_type, array.as_ref())?;
227 let any_dictionary = dictionary.as_any_dictionary();
228
229 let coerced_values =
230 coerce_array(Arc::clone(any_dictionary.values()), value_type.as_ref())?;
231
232 return Ok(any_dictionary.with_values(coerced_values));
233 }
234
235 match array.data_type() {
236 ArrowType::Int32 => coerce_i32(array.as_primitive(), target_type),
237 ArrowType::Int64 => coerce_i64(array.as_primitive(), target_type),
238 ArrowType::Boolean | ArrowType::Float32 | ArrowType::Float64 => Ok(array),
239 _ => unreachable!("Cannot coerce array of type {}", array.data_type()),
240 }
241}
242
243fn coerce_i32(array: &Int32Array, target_type: &ArrowType) -> Result<ArrayRef> {
244 Ok(match target_type {
245 ArrowType::UInt8 => {
246 let array = array.unary(|i| i as u8) as UInt8Array;
247 Arc::new(array) as ArrayRef
248 }
249 ArrowType::Int8 => {
250 let array = array.unary(|i| i as i8) as Int8Array;
251 Arc::new(array) as ArrayRef
252 }
253 ArrowType::UInt16 => {
254 let array = array.unary(|i| i as u16) as UInt16Array;
255 Arc::new(array) as ArrayRef
256 }
257 ArrowType::Int16 => {
258 let array = array.unary(|i| i as i16) as Int16Array;
259 Arc::new(array) as ArrayRef
260 }
261 ArrowType::Int32 => Arc::new(array.clone()),
262 ArrowType::UInt32 => Arc::new(UInt32Array::new(
265 array.values().inner().clone().into(),
266 array.nulls().cloned(),
267 )) as ArrayRef,
268 ArrowType::Date32 => Arc::new(array.reinterpret_cast::<Date32Type>()) as _,
269 ArrowType::Date64 => {
270 let array: Date64Array = array.unary(|x| x as i64 * 86_400_000);
271 Arc::new(array) as ArrayRef
272 }
273 ArrowType::Time32(TimeUnit::Second) => {
274 Arc::new(array.reinterpret_cast::<Time32SecondType>()) as ArrayRef
275 }
276 ArrowType::Time32(TimeUnit::Millisecond) => {
277 Arc::new(array.reinterpret_cast::<Time32MillisecondType>()) as ArrayRef
278 }
279 ArrowType::Timestamp(time_unit, timezone) => match time_unit {
280 TimeUnit::Second => {
281 let array: TimestampSecondArray = array
282 .unary(|x| x as i64)
283 .with_timezone_opt(timezone.clone());
284 Arc::new(array) as _
285 }
286 TimeUnit::Millisecond => {
287 let array: TimestampMillisecondArray = array
288 .unary(|x| x as i64)
289 .with_timezone_opt(timezone.clone());
290 Arc::new(array) as _
291 }
292 TimeUnit::Microsecond => {
293 let array: TimestampMicrosecondArray = array
294 .unary(|x| x as i64)
295 .with_timezone_opt(timezone.clone());
296 Arc::new(array) as _
297 }
298 TimeUnit::Nanosecond => {
299 let array: TimestampNanosecondArray = array
300 .unary(|x| x as i64)
301 .with_timezone_opt(timezone.clone());
302 Arc::new(array) as _
303 }
304 },
305 ArrowType::Decimal32(p, s) => {
306 let array = array
307 .reinterpret_cast::<Decimal32Type>()
308 .with_precision_and_scale(*p, *s)?;
309 Arc::new(array) as ArrayRef
310 }
311 ArrowType::Decimal64(p, s) => {
312 let array: Decimal64Array =
313 array.unary(|i| i as i64).with_precision_and_scale(*p, *s)?;
314 Arc::new(array) as ArrayRef
315 }
316 ArrowType::Decimal128(p, s) => {
317 let array: Decimal128Array = array
318 .unary(|i| i as i128)
319 .with_precision_and_scale(*p, *s)?;
320 Arc::new(array) as ArrayRef
321 }
322 ArrowType::Decimal256(p, s) => {
323 let array: Decimal256Array = array
324 .unary(|i| i256::from_i128(i as i128))
325 .with_precision_and_scale(*p, *s)?;
326 Arc::new(array) as ArrayRef
327 }
328 _ => unreachable!("Cannot coerce i32 to {target_type}"),
329 })
330}
331
332fn coerce_i64(array: &Int64Array, target_type: &ArrowType) -> Result<ArrayRef> {
333 Ok(match target_type {
334 ArrowType::Int64 => Arc::new(array.clone()) as _,
335 ArrowType::UInt64 => Arc::new(UInt64Array::new(
338 array.values().inner().clone().into(),
339 array.nulls().cloned(),
340 )) as ArrayRef,
341 ArrowType::Date64 => Arc::new(array.reinterpret_cast::<Date64Type>()) as _,
342 ArrowType::Time64(TimeUnit::Microsecond) => {
343 Arc::new(array.reinterpret_cast::<Time64MicrosecondType>()) as _
344 }
345 ArrowType::Time64(TimeUnit::Nanosecond) => {
346 Arc::new(array.reinterpret_cast::<Time64NanosecondType>()) as _
347 }
348 ArrowType::Duration(unit) => match unit {
349 TimeUnit::Second => Arc::new(array.reinterpret_cast::<DurationSecondType>()) as _,
350 TimeUnit::Millisecond => {
351 Arc::new(array.reinterpret_cast::<DurationMillisecondType>()) as _
352 }
353 TimeUnit::Microsecond => {
354 Arc::new(array.reinterpret_cast::<DurationMicrosecondType>()) as _
355 }
356 TimeUnit::Nanosecond => {
357 Arc::new(array.reinterpret_cast::<DurationNanosecondType>()) as _
358 }
359 },
360 ArrowType::Timestamp(time_unit, timezone) => match time_unit {
361 TimeUnit::Second => {
362 let array = array
363 .reinterpret_cast::<TimestampSecondType>()
364 .with_timezone_opt(timezone.clone());
365 Arc::new(array) as _
366 }
367 TimeUnit::Millisecond => {
368 let array = array
369 .reinterpret_cast::<TimestampMillisecondType>()
370 .with_timezone_opt(timezone.clone());
371 Arc::new(array) as _
372 }
373 TimeUnit::Microsecond => {
374 let array = array
375 .reinterpret_cast::<TimestampMicrosecondType>()
376 .with_timezone_opt(timezone.clone());
377 Arc::new(array) as _
378 }
379 TimeUnit::Nanosecond => {
380 let array = array
381 .reinterpret_cast::<TimestampNanosecondType>()
382 .with_timezone_opt(timezone.clone());
383 Arc::new(array) as _
384 }
385 },
386 ArrowType::Decimal64(p, s) => {
387 let array = array
388 .reinterpret_cast::<Decimal64Type>()
389 .with_precision_and_scale(*p, *s)?;
390 Arc::new(array) as _
391 }
392 ArrowType::Decimal128(p, s) => {
393 let array: Decimal128Array = array
394 .unary(|i| i as i128)
395 .with_precision_and_scale(*p, *s)?;
396 Arc::new(array) as _
397 }
398 ArrowType::Decimal256(p, s) => {
399 let array: Decimal256Array = array
400 .unary(|i| i256::from_i128(i as i128))
401 .with_precision_and_scale(*p, *s)?;
402 Arc::new(array) as _
403 }
404 _ => unreachable!("Cannot coerce i64 to {target_type}"),
405 })
406}
407
408macro_rules! pack_dictionary_helper {
409 ($t:ty, $values:ident) => {
410 match $values.data_type() {
411 ArrowType::Int32 => pack_dictionary_impl::<$t, Int32Type>($values.as_primitive()),
412 ArrowType::Int64 => pack_dictionary_impl::<$t, Int64Type>($values.as_primitive()),
413 ArrowType::Float32 => pack_dictionary_impl::<$t, Float32Type>($values.as_primitive()),
414 ArrowType::Float64 => pack_dictionary_impl::<$t, Float64Type>($values.as_primitive()),
415 _ => unreachable!("Invalid physical type"),
416 }
417 };
418}
419
420fn pack_dictionary(key: &ArrowType, values: &dyn Array) -> Result<ArrayRef> {
421 downcast_integer! {
422 key => (pack_dictionary_helper, values),
423 _ => unreachable!("Invalid key type"),
424 }
425}
426
427fn pack_dictionary_impl<K: ArrowDictionaryKeyType, V: ArrowPrimitiveType>(
428 values: &PrimitiveArray<V>,
429) -> Result<ArrayRef> {
430 let mut builder = PrimitiveDictionaryBuilder::<K, V>::with_capacity(1024, values.len());
431 builder.extend(values);
432 Ok(Arc::new(builder.finish()))
433}
434
#[cfg(test)]
mod tests {
    use super::*;
    use crate::arrow::array_reader::test_util::EmptyPageIterator;
    use crate::basic::Encoding;
    use crate::column::page::Page;
    use crate::data_type::{Int32Type, Int64Type};
    use crate::schema::parser::parse_message_type;
    use crate::schema::types::SchemaDescriptor;
    use crate::util::InMemoryPageIterator;
    use crate::util::test_common::rand_gen::make_pages;
    use arrow::datatypes::ArrowPrimitiveType;
    use arrow_array::{Array, Date32Array, PrimitiveArray};

    use arrow::datatypes::DataType::{Date32, Decimal128};
    use rand::distr::uniform::SampleUniform;
    use std::collections::VecDeque;

    /// Generates `num_chunks` column chunks of random data for `column_desc`.
    ///
    /// The generated levels and values are appended to `def_levels`, `rep_levels` and
    /// `values`, and each chunk's pages are pushed onto `page_lists`.
    // Every argument is forwarded to `make_pages`, so the long parameter list is intentional.
    #[allow(clippy::too_many_arguments)]
    fn make_column_chunks<T: DataType>(
        column_desc: ColumnDescPtr,
        encoding: Encoding,
        num_levels: usize,
        min_value: T::T,
        max_value: T::T,
        def_levels: &mut Vec<i16>,
        rep_levels: &mut Vec<i16>,
        values: &mut Vec<T::T>,
        page_lists: &mut Vec<Vec<Page>>,
        use_v2: bool,
        num_chunks: usize,
    ) where
        T::T: PartialOrd + SampleUniform + Copy,
    {
        for _i in 0..num_chunks {
            let mut pages = VecDeque::new();
            let mut data = Vec::new();
            let mut page_def_levels = Vec::new();
            let mut page_rep_levels = Vec::new();

            make_pages::<T>(
                column_desc.clone(),
                encoding,
                1, // presumably the number of pages per chunk
                num_levels,
                min_value,
                max_value,
                &mut page_def_levels,
                &mut page_rep_levels,
                &mut data,
                &mut pages,
                use_v2,
            );

            def_levels.append(&mut page_def_levels);
            rep_levels.append(&mut page_rep_levels);
            values.append(&mut data);
            page_lists.push(Vec::from(pages));
        }
    }

    #[test]
    fn test_primitive_array_reader_empty_pages() {
        // Construct column schema
        let message_type = "
        message test_schema {
          REQUIRED INT32 leaf;
        }
        ";

        let schema = parse_message_type(message_type)
            .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
            .unwrap();

        let mut array_reader = PrimitiveArrayReader::<Int32Type>::new(
            Box::<EmptyPageIterator>::default(),
            schema.column(0),
            None,
        )
        .unwrap();

        // A reader with no pages yields an empty batch rather than an error.
        let array = array_reader.next_batch(50).unwrap();
        assert!(array.is_empty());
    }

    #[test]
    fn test_primitive_array_reader_data() {
        // Construct column schema
        let message_type = "
        message test_schema {
          REQUIRED INT32 leaf;
        }
        ";

        let schema = parse_message_type(message_type)
            .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
            .unwrap();

        let column_desc = schema.column(0);

        // Two chunks of 100 values each, read back in batches that cross the chunk boundary.
        let mut data = Vec::new();
        let mut page_lists = Vec::new();
        make_column_chunks::<Int32Type>(
            column_desc.clone(),
            Encoding::PLAIN,
            100,
            1,
            200,
            &mut Vec::new(),
            &mut Vec::new(),
            &mut data,
            &mut page_lists,
            true,
            2,
        );
        let page_iterator = InMemoryPageIterator::new(page_lists);

        let mut array_reader =
            PrimitiveArrayReader::<Int32Type>::new(Box::new(page_iterator), column_desc, None)
                .unwrap();

        // (requested batch size, slice of `data` expected back); the last request is
        // truncated to the 50 remaining values.
        let expected_batches = [(50, 0..50_usize), (100, 50..150), (100, 150..200)];
        for (batch_size, range) in expected_batches {
            let array = array_reader.next_batch(batch_size).unwrap();
            let array = array.as_any().downcast_ref::<Int32Array>().unwrap();
            assert_eq!(&Int32Array::from(data[range].to_vec()), array);
        }
    }

    /// Reads 50 values of a REQUIRED column with the given physical/converted type and checks
    /// they decode to `$result_arrow_type`, optionally with `$timezone` attached.
    macro_rules! test_primitive_array_reader_one_type {
        (
            $arrow_parquet_type:ty,
            $physical_type:expr,
            $converted_type_str:expr,
            $result_arrow_type:ty,
            $result_arrow_cast_type:ty,
            $result_primitive_type:ty
            $(, $timezone:expr)?
        ) => {{
            let message_type = format!(
                "
            message test_schema {{
              REQUIRED {:?} leaf ({});
          }}
          ",
                $physical_type, $converted_type_str
            );
            let schema = parse_message_type(&message_type)
                .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
                .unwrap();

            let column_desc = schema.column(0);

            // Construct page iterator
            {
                let mut data = Vec::new();
                let mut page_lists = Vec::new();
                make_column_chunks::<$arrow_parquet_type>(
                    column_desc.clone(),
                    Encoding::PLAIN,
                    100,
                    1,
                    200,
                    &mut Vec::new(),
                    &mut Vec::new(),
                    &mut data,
                    &mut page_lists,
                    true,
                    2,
                );
                let page_iterator = InMemoryPageIterator::new(page_lists);
                let mut array_reader = PrimitiveArrayReader::<$arrow_parquet_type>::new(
                    Box::new(page_iterator),
                    column_desc.clone(),
                    None,
                )
                .expect("Unable to get array reader");

                let array = array_reader
                    .next_batch(50)
                    .expect("Unable to get batch from reader");

                let result_data_type = <$result_arrow_type>::DATA_TYPE;
                // The panic message is only formatted when the downcast actually fails.
                let array = array
                    .as_any()
                    .downcast_ref::<PrimitiveArray<$result_arrow_type>>()
                    .unwrap_or_else(|| {
                        panic!(
                            "Unable to downcast {:?} to {:?}",
                            array.data_type(),
                            result_data_type
                        )
                    })
                    $(.clone().with_timezone($timezone))?
                    ;

                // Build the expected array from the raw values, then cast to the result type.
                let expected = PrimitiveArray::<$result_arrow_cast_type>::from(
                    data[0..50]
                        .iter()
                        .map(|x| *x as $result_primitive_type)
                        .collect::<Vec<$result_primitive_type>>(),
                );
                let expected = Arc::new(expected) as ArrayRef;
                let expected = arrow::compute::cast(&expected, &result_data_type)
                    .expect("Unable to cast expected array");
                assert_eq!(expected.data_type(), &result_data_type);
                let expected = expected
                    .as_any()
                    .downcast_ref::<PrimitiveArray<$result_arrow_type>>()
                    .unwrap_or_else(|| {
                        panic!(
                            "Unable to downcast expected {:?} to {:?}",
                            expected.data_type(),
                            result_data_type
                        )
                    })
                    $(.clone().with_timezone($timezone))?
                    ;
                assert_eq!(expected, array);
            }
        }};
    }

    #[test]
    fn test_primitive_array_reader_temporal_types() {
        test_primitive_array_reader_one_type!(
            crate::data_type::Int32Type,
            PhysicalType::INT32,
            "DATE",
            arrow::datatypes::Date32Type,
            arrow::datatypes::Int32Type,
            i32
        );
        test_primitive_array_reader_one_type!(
            crate::data_type::Int32Type,
            PhysicalType::INT32,
            "TIME_MILLIS",
            arrow::datatypes::Time32MillisecondType,
            arrow::datatypes::Int32Type,
            i32
        );
        test_primitive_array_reader_one_type!(
            crate::data_type::Int64Type,
            PhysicalType::INT64,
            "TIME_MICROS",
            arrow::datatypes::Time64MicrosecondType,
            arrow::datatypes::Int64Type,
            i64
        );
        test_primitive_array_reader_one_type!(
            crate::data_type::Int64Type,
            PhysicalType::INT64,
            "TIMESTAMP_MILLIS",
            arrow::datatypes::TimestampMillisecondType,
            arrow::datatypes::Int64Type,
            i64,
            "UTC"
        );
        test_primitive_array_reader_one_type!(
            crate::data_type::Int64Type,
            PhysicalType::INT64,
            "TIMESTAMP_MICROS",
            arrow::datatypes::TimestampMicrosecondType,
            arrow::datatypes::Int64Type,
            i64,
            "UTC"
        );
    }

    #[test]
    fn test_primitive_array_reader_def_and_rep_levels() {
        // Construct column schema
        let message_type = "
        message test_schema {
            REPEATED Group test_mid {
                OPTIONAL INT32 leaf;
            }
        }
        ";

        let schema = parse_message_type(message_type)
            .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
            .unwrap();

        let column_desc = schema.column(0);

        // Construct page iterator
        let mut def_levels = Vec::new();
        let mut rep_levels = Vec::new();
        let mut page_lists = Vec::new();
        make_column_chunks::<Int32Type>(
            column_desc.clone(),
            Encoding::PLAIN,
            100,
            1,
            200,
            &mut def_levels,
            &mut rep_levels,
            &mut Vec::new(),
            &mut page_lists,
            true,
            2,
        );

        let page_iterator = InMemoryPageIterator::new(page_lists);

        let mut array_reader =
            PrimitiveArrayReader::<Int32Type>::new(Box::new(page_iterator), column_desc, None)
                .unwrap();

        // After each batch, the reader must expose exactly that batch's slice of the levels.
        let mut accu_len: usize = 0;
        for batch_size in [50, 100, 100] {
            let array = array_reader.next_batch(batch_size).unwrap();
            let batch_levels = accu_len..(accu_len + array.len());
            assert_eq!(
                Some(&def_levels[batch_levels.clone()]),
                array_reader.get_def_levels()
            );
            assert_eq!(
                Some(&rep_levels[batch_levels]),
                array_reader.get_rep_levels()
            );
            accu_len += array.len();
        }
    }

    #[test]
    fn test_primitive_array_reader_decimal_types() {
        // INT32-backed decimal
        let message_type = "
        message test_schema {
            REQUIRED INT32 decimal1 (DECIMAL(8,2));
        }
        ";
        let schema = parse_message_type(message_type)
            .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
            .unwrap();
        let column_desc = schema.column(0);

        {
            let mut data = Vec::new();
            let mut page_lists = Vec::new();
            make_column_chunks::<Int32Type>(
                column_desc.clone(),
                Encoding::PLAIN,
                100,
                -99999999,
                99999999,
                &mut Vec::new(),
                &mut Vec::new(),
                &mut data,
                &mut page_lists,
                true,
                2,
            );
            let page_iterator = InMemoryPageIterator::new(page_lists);

            let mut array_reader =
                PrimitiveArrayReader::<Int32Type>::new(Box::new(page_iterator), column_desc, None)
                    .unwrap();

            // The decimal type comes from the parquet logical type.
            let array = array_reader.next_batch(50).unwrap();
            assert_eq!(array.data_type(), &Decimal128(8, 2));
            let array = array.as_any().downcast_ref::<Decimal128Array>().unwrap();
            let data_decimal_array = data[0..50]
                .iter()
                .copied()
                .map(|v| Some(v as i128))
                .collect::<Decimal128Array>()
                .with_precision_and_scale(8, 2)
                .unwrap();
            assert_eq!(array, &data_decimal_array);

            // Same values with a different precision/scale must not compare equal.
            let data_decimal_array = data[0..50]
                .iter()
                .copied()
                .map(|v| Some(v as i128))
                .collect::<Decimal128Array>()
                .with_precision_and_scale(9, 0)
                .unwrap();
            assert_ne!(array, &data_decimal_array);
        }

        // INT64-backed decimal
        let message_type = "
        message test_schema {
            REQUIRED INT64 decimal1 (DECIMAL(18,4));
        }
        ";
        let schema = parse_message_type(message_type)
            .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
            .unwrap();
        let column_desc = schema.column(0);

        {
            let mut data = Vec::new();
            let mut page_lists = Vec::new();
            make_column_chunks::<Int64Type>(
                column_desc.clone(),
                Encoding::PLAIN,
                100,
                -999999999999999999,
                999999999999999999,
                &mut Vec::new(),
                &mut Vec::new(),
                &mut data,
                &mut page_lists,
                true,
                2,
            );
            let page_iterator = InMemoryPageIterator::new(page_lists);

            let mut array_reader =
                PrimitiveArrayReader::<Int64Type>::new(Box::new(page_iterator), column_desc, None)
                    .unwrap();

            let array = array_reader.next_batch(50).unwrap();
            assert_eq!(array.data_type(), &Decimal128(18, 4));
            let array = array.as_any().downcast_ref::<Decimal128Array>().unwrap();
            let data_decimal_array = data[0..50]
                .iter()
                .copied()
                .map(|v| Some(v as i128))
                .collect::<Decimal128Array>()
                .with_precision_and_scale(18, 4)
                .unwrap();
            assert_eq!(array, &data_decimal_array);

            // Same values with a different precision/scale must not compare equal.
            let data_decimal_array = data[0..50]
                .iter()
                .copied()
                .map(|v| Some(v as i128))
                .collect::<Decimal128Array>()
                .with_precision_and_scale(34, 0)
                .unwrap();
            assert_ne!(array, &data_decimal_array);
        }
    }

    #[test]
    fn test_primitive_array_reader_date32_type() {
        let message_type = "
        message test_schema {
            REQUIRED INT32 date1 (DATE);
        }
        ";
        let schema = parse_message_type(message_type)
            .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
            .unwrap();
        let column_desc = schema.column(0);

        {
            let mut data = Vec::new();
            let mut page_lists = Vec::new();
            make_column_chunks::<Int32Type>(
                column_desc.clone(),
                Encoding::PLAIN,
                100,
                -99999999,
                99999999,
                &mut Vec::new(),
                &mut Vec::new(),
                &mut data,
                &mut page_lists,
                true,
                2,
            );
            let page_iterator = InMemoryPageIterator::new(page_lists);

            let mut array_reader =
                PrimitiveArrayReader::<Int32Type>::new(Box::new(page_iterator), column_desc, None)
                    .unwrap();

            // DATE columns decode directly to Date32 with the raw day counts.
            let array = array_reader.next_batch(50).unwrap();
            assert_eq!(array.data_type(), &Date32);
            let array = array.as_any().downcast_ref::<Date32Array>().unwrap();
            let data_date_array = data[0..50]
                .iter()
                .copied()
                .map(Some)
                .collect::<Date32Array>();
            assert_eq!(array, &data_date_array);
        }
    }
}