1use crate::arrow::array_reader::{ArrayReader, read_records, skip_records};
19use crate::arrow::record_reader::RecordReader;
20use crate::arrow::schema::parquet_to_arrow_field;
21use crate::basic::Type as PhysicalType;
22use crate::column::page::PageIterator;
23use crate::data_type::{DataType, Int96};
24use crate::errors::Result;
25use crate::schema::types::ColumnDescPtr;
26use arrow_array::{
27 Array, ArrayRef, BooleanArray, Date64Array, Decimal64Array, Decimal128Array, Decimal256Array,
28 Float32Array, Float64Array, Int8Array, Int16Array, Int32Array, Int64Array, PrimitiveArray,
29 UInt8Array, UInt16Array, builder::PrimitiveDictionaryBuilder, cast::AsArray, downcast_integer,
30 types::*,
31};
32use arrow_array::{
33 TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
34 TimestampSecondArray, UInt32Array, UInt64Array,
35};
36use arrow_buffer::{BooleanBuffer, Buffer, NullBuffer, ScalarBuffer, i256};
37use arrow_schema::{DataType as ArrowType, TimeUnit};
38use std::any::Any;
39use std::sync::Arc;
40
41pub trait IntoBuffer {
43 fn into_buffer(self, target_type: &ArrowType) -> Buffer;
44}
45
46macro_rules! native_buffer {
47 ($($t:ty),*) => {
48 $(impl IntoBuffer for Vec<$t> {
49 fn into_buffer(self, _target_type: &ArrowType) -> Buffer {
50 Buffer::from_vec(self)
51 }
52 })*
53 };
54}
55native_buffer!(i8, i16, i32, i64, u8, u16, u32, u64, f32, f64);
56
57impl IntoBuffer for Vec<bool> {
58 fn into_buffer(self, _target_type: &ArrowType) -> Buffer {
59 BooleanBuffer::from_iter(self).into_inner()
60 }
61}
62
63impl IntoBuffer for Vec<Int96> {
64 fn into_buffer(self, target_type: &ArrowType) -> Buffer {
65 let mut builder = Vec::with_capacity(self.len());
66 match target_type {
67 ArrowType::Timestamp(TimeUnit::Second, _) => {
68 builder.extend(self.iter().map(|x| x.to_seconds()));
69 }
70 ArrowType::Timestamp(TimeUnit::Millisecond, _) => {
71 builder.extend(self.iter().map(|x| x.to_millis()));
72 }
73 ArrowType::Timestamp(TimeUnit::Microsecond, _) => {
74 builder.extend(self.iter().map(|x| x.to_micros()));
75 }
76 ArrowType::Timestamp(TimeUnit::Nanosecond, _) => {
77 builder.extend(self.iter().map(|x| x.to_nanos()));
78 }
79 _ => unreachable!("Invalid target_type for Int96."),
80 }
81 Buffer::from_vec(builder)
82 }
83}
84
85pub struct PrimitiveArrayReader<T>
88where
89 T: DataType,
90 T::T: Copy + Default,
91 Vec<T::T>: IntoBuffer,
92{
93 data_type: ArrowType,
94 pages: Box<dyn PageIterator>,
95 def_levels_buffer: Option<Vec<i16>>,
96 rep_levels_buffer: Option<Vec<i16>>,
97 record_reader: RecordReader<T>,
98}
99
100impl<T> PrimitiveArrayReader<T>
101where
102 T: DataType,
103 T::T: Copy + Default,
104 Vec<T::T>: IntoBuffer,
105{
106 pub fn new(
110 pages: Box<dyn PageIterator>,
111 column_desc: ColumnDescPtr,
112 arrow_type: Option<ArrowType>,
113 batch_size: usize,
114 ) -> Result<Self> {
115 let data_type = match arrow_type {
117 Some(t) => t,
118 None => parquet_to_arrow_field(column_desc.as_ref())?
119 .data_type()
120 .clone(),
121 };
122
123 let record_reader = RecordReader::<T>::new(column_desc, batch_size);
124
125 Ok(Self {
126 data_type,
127 pages,
128 def_levels_buffer: None,
129 rep_levels_buffer: None,
130 record_reader,
131 })
132 }
133}
134
135impl<T> ArrayReader for PrimitiveArrayReader<T>
137where
138 T: DataType,
139 T::T: Copy + Default,
140 Vec<T::T>: IntoBuffer,
141{
142 fn as_any(&self) -> &dyn Any {
143 self
144 }
145
146 fn get_data_type(&self) -> &ArrowType {
148 &self.data_type
149 }
150
151 fn read_records(&mut self, batch_size: usize) -> Result<usize> {
152 read_records(&mut self.record_reader, self.pages.as_mut(), batch_size)
153 }
154
155 fn consume_batch(&mut self) -> Result<ArrayRef> {
156 let target_type = &self.data_type;
157
158 let record_data = self
161 .record_reader
162 .consume_record_data()
163 .into_buffer(target_type);
164
165 let len = self.record_reader.num_values();
166 let nulls = self
167 .record_reader
168 .consume_bitmap_buffer()
169 .and_then(|b| NullBuffer::from_unsliced_buffer(b, len));
170
171 let array: ArrayRef = match T::get_physical_type() {
172 PhysicalType::BOOLEAN => Arc::new(BooleanArray::new(
173 BooleanBuffer::new(record_data, 0, len),
174 nulls,
175 )),
176 PhysicalType::INT32 => Arc::new(Int32Array::new(
177 ScalarBuffer::new(record_data, 0, len),
178 nulls,
179 )),
180 PhysicalType::INT64 => Arc::new(Int64Array::new(
181 ScalarBuffer::new(record_data, 0, len),
182 nulls,
183 )),
184 PhysicalType::FLOAT => Arc::new(Float32Array::new(
185 ScalarBuffer::new(record_data, 0, len),
186 nulls,
187 )),
188 PhysicalType::DOUBLE => Arc::new(Float64Array::new(
189 ScalarBuffer::new(record_data, 0, len),
190 nulls,
191 )),
192 PhysicalType::INT96 => Arc::new(Int64Array::new(
193 ScalarBuffer::new(record_data, 0, len),
194 nulls,
195 )),
196 PhysicalType::BYTE_ARRAY | PhysicalType::FIXED_LEN_BYTE_ARRAY => {
197 unreachable!("PrimitiveArrayReaders don't support complex physical types");
198 }
199 };
200
201 let array = coerce_array(array, target_type)?;
203
204 self.def_levels_buffer = self.record_reader.consume_def_levels();
206 self.rep_levels_buffer = self.record_reader.consume_rep_levels();
207 self.record_reader.reset();
208 Ok(array)
209 }
210
211 fn skip_records(&mut self, num_records: usize) -> Result<usize> {
212 skip_records(&mut self.record_reader, self.pages.as_mut(), num_records)
213 }
214
215 fn get_def_levels(&self) -> Option<&[i16]> {
216 self.def_levels_buffer.as_deref()
217 }
218
219 fn get_rep_levels(&self) -> Option<&[i16]> {
220 self.rep_levels_buffer.as_deref()
221 }
222}
223
224fn coerce_array(array: ArrayRef, target_type: &ArrowType) -> Result<ArrayRef> {
228 if let ArrowType::Dictionary(key_type, value_type) = target_type {
229 let dictionary = pack_dictionary(key_type, array.as_ref())?;
230 let any_dictionary = dictionary.as_any_dictionary();
231
232 let coerced_values =
233 coerce_array(Arc::clone(any_dictionary.values()), value_type.as_ref())?;
234
235 return Ok(any_dictionary.with_values(coerced_values));
236 }
237
238 match array.data_type() {
239 ArrowType::Int32 => coerce_i32(array.as_primitive(), target_type),
240 ArrowType::Int64 => coerce_i64(array.as_primitive(), target_type),
241 ArrowType::Boolean | ArrowType::Float32 | ArrowType::Float64 => Ok(array),
242 _ => unreachable!("Cannot coerce array of type {}", array.data_type()),
243 }
244}
245
246fn coerce_i32(array: &Int32Array, target_type: &ArrowType) -> Result<ArrayRef> {
247 Ok(match target_type {
248 ArrowType::UInt8 => {
249 let array = array.unary(|i| i as u8) as UInt8Array;
250 Arc::new(array) as ArrayRef
251 }
252 ArrowType::Int8 => {
253 let array = array.unary(|i| i as i8) as Int8Array;
254 Arc::new(array) as ArrayRef
255 }
256 ArrowType::UInt16 => {
257 let array = array.unary(|i| i as u16) as UInt16Array;
258 Arc::new(array) as ArrayRef
259 }
260 ArrowType::Int16 => {
261 let array = array.unary(|i| i as i16) as Int16Array;
262 Arc::new(array) as ArrayRef
263 }
264 ArrowType::Int32 => Arc::new(array.clone()),
265 ArrowType::UInt32 => Arc::new(UInt32Array::new(
268 array.values().inner().clone().into(),
269 array.nulls().cloned(),
270 )) as ArrayRef,
271 ArrowType::Date32 => Arc::new(array.reinterpret_cast::<Date32Type>()) as _,
272 ArrowType::Date64 => {
273 let array: Date64Array = array.unary(|x| x as i64 * 86_400_000);
274 Arc::new(array) as ArrayRef
275 }
276 ArrowType::Time32(TimeUnit::Second) => {
277 Arc::new(array.reinterpret_cast::<Time32SecondType>()) as ArrayRef
278 }
279 ArrowType::Time32(TimeUnit::Millisecond) => {
280 Arc::new(array.reinterpret_cast::<Time32MillisecondType>()) as ArrayRef
281 }
282 ArrowType::Timestamp(time_unit, timezone) => match time_unit {
283 TimeUnit::Second => {
284 let array: TimestampSecondArray = array
285 .unary(|x| x as i64)
286 .with_timezone_opt(timezone.clone());
287 Arc::new(array) as _
288 }
289 TimeUnit::Millisecond => {
290 let array: TimestampMillisecondArray = array
291 .unary(|x| x as i64)
292 .with_timezone_opt(timezone.clone());
293 Arc::new(array) as _
294 }
295 TimeUnit::Microsecond => {
296 let array: TimestampMicrosecondArray = array
297 .unary(|x| x as i64)
298 .with_timezone_opt(timezone.clone());
299 Arc::new(array) as _
300 }
301 TimeUnit::Nanosecond => {
302 let array: TimestampNanosecondArray = array
303 .unary(|x| x as i64)
304 .with_timezone_opt(timezone.clone());
305 Arc::new(array) as _
306 }
307 },
308 ArrowType::Decimal32(p, s) => {
309 let array = array
310 .reinterpret_cast::<Decimal32Type>()
311 .with_precision_and_scale(*p, *s)?;
312 Arc::new(array) as ArrayRef
313 }
314 ArrowType::Decimal64(p, s) => {
315 let array: Decimal64Array =
316 array.unary(|i| i as i64).with_precision_and_scale(*p, *s)?;
317 Arc::new(array) as ArrayRef
318 }
319 ArrowType::Decimal128(p, s) => {
320 let array: Decimal128Array = array
321 .unary(|i| i as i128)
322 .with_precision_and_scale(*p, *s)?;
323 Arc::new(array) as ArrayRef
324 }
325 ArrowType::Decimal256(p, s) => {
326 let array: Decimal256Array = array
327 .unary(|i| i256::from_i128(i as i128))
328 .with_precision_and_scale(*p, *s)?;
329 Arc::new(array) as ArrayRef
330 }
331 _ => unreachable!("Cannot coerce i32 to {target_type}"),
332 })
333}
334
335fn coerce_i64(array: &Int64Array, target_type: &ArrowType) -> Result<ArrayRef> {
336 Ok(match target_type {
337 ArrowType::Int64 => Arc::new(array.clone()) as _,
338 ArrowType::UInt64 => Arc::new(UInt64Array::new(
341 array.values().inner().clone().into(),
342 array.nulls().cloned(),
343 )) as ArrayRef,
344 ArrowType::Date64 => Arc::new(array.reinterpret_cast::<Date64Type>()) as _,
345 ArrowType::Time64(TimeUnit::Microsecond) => {
346 Arc::new(array.reinterpret_cast::<Time64MicrosecondType>()) as _
347 }
348 ArrowType::Time64(TimeUnit::Nanosecond) => {
349 Arc::new(array.reinterpret_cast::<Time64NanosecondType>()) as _
350 }
351 ArrowType::Duration(unit) => match unit {
352 TimeUnit::Second => Arc::new(array.reinterpret_cast::<DurationSecondType>()) as _,
353 TimeUnit::Millisecond => {
354 Arc::new(array.reinterpret_cast::<DurationMillisecondType>()) as _
355 }
356 TimeUnit::Microsecond => {
357 Arc::new(array.reinterpret_cast::<DurationMicrosecondType>()) as _
358 }
359 TimeUnit::Nanosecond => {
360 Arc::new(array.reinterpret_cast::<DurationNanosecondType>()) as _
361 }
362 },
363 ArrowType::Timestamp(time_unit, timezone) => match time_unit {
364 TimeUnit::Second => {
365 let array = array
366 .reinterpret_cast::<TimestampSecondType>()
367 .with_timezone_opt(timezone.clone());
368 Arc::new(array) as _
369 }
370 TimeUnit::Millisecond => {
371 let array = array
372 .reinterpret_cast::<TimestampMillisecondType>()
373 .with_timezone_opt(timezone.clone());
374 Arc::new(array) as _
375 }
376 TimeUnit::Microsecond => {
377 let array = array
378 .reinterpret_cast::<TimestampMicrosecondType>()
379 .with_timezone_opt(timezone.clone());
380 Arc::new(array) as _
381 }
382 TimeUnit::Nanosecond => {
383 let array = array
384 .reinterpret_cast::<TimestampNanosecondType>()
385 .with_timezone_opt(timezone.clone());
386 Arc::new(array) as _
387 }
388 },
389 ArrowType::Decimal64(p, s) => {
390 let array = array
391 .reinterpret_cast::<Decimal64Type>()
392 .with_precision_and_scale(*p, *s)?;
393 Arc::new(array) as _
394 }
395 ArrowType::Decimal128(p, s) => {
396 let array: Decimal128Array = array
397 .unary(|i| i as i128)
398 .with_precision_and_scale(*p, *s)?;
399 Arc::new(array) as _
400 }
401 ArrowType::Decimal256(p, s) => {
402 let array: Decimal256Array = array
403 .unary(|i| i256::from_i128(i as i128))
404 .with_precision_and_scale(*p, *s)?;
405 Arc::new(array) as _
406 }
407 _ => unreachable!("Cannot coerce i64 to {target_type}"),
408 })
409}
410
/// Dispatches on the value type of the array bound to `$arr` and packs it into
/// a dictionary array whose keys are of type `$key`.
///
/// Only `Int32`, `Int64`, `Float32` and `Float64` value arrays are handled; any
/// other value type panics.
macro_rules! pack_dictionary_helper {
    ($key:ty, $arr:ident) => {
        match $arr.data_type() {
            ArrowType::Int32 => pack_dictionary_impl::<$key, Int32Type>($arr.as_primitive()),
            ArrowType::Int64 => pack_dictionary_impl::<$key, Int64Type>($arr.as_primitive()),
            ArrowType::Float32 => pack_dictionary_impl::<$key, Float32Type>($arr.as_primitive()),
            ArrowType::Float64 => pack_dictionary_impl::<$key, Float64Type>($arr.as_primitive()),
            _ => unreachable!("Invalid physical type"),
        }
    };
}
422
/// Packs `values` into a dictionary array whose keys have the integer arrow
/// type `key`.
///
/// `downcast_integer!` matches `key` against the integer arrow types and
/// invokes `pack_dictionary_helper!` with the corresponding key type; any
/// non-integer `key` panics.
fn pack_dictionary(key: &ArrowType, values: &dyn Array) -> Result<ArrayRef> {
    downcast_integer! {
        key => (pack_dictionary_helper, values),
        _ => unreachable!("Invalid key type"),
    }
}
429
430fn pack_dictionary_impl<K: ArrowDictionaryKeyType, V: ArrowPrimitiveType>(
431 values: &PrimitiveArray<V>,
432) -> Result<ArrayRef> {
433 let mut builder = PrimitiveDictionaryBuilder::<K, V>::with_capacity(1024, values.len());
434 builder.extend(values);
435 Ok(Arc::new(builder.finish()))
436}
437
#[cfg(test)]
mod tests {
    use super::*;
    use crate::arrow::array_reader::test_util::EmptyPageIterator;
    use crate::arrow::arrow_reader::DEFAULT_BATCH_SIZE;
    use crate::basic::Encoding;
    use crate::column::page::Page;
    use crate::data_type::{Int32Type, Int64Type};
    use crate::schema::parser::parse_message_type;
    use crate::schema::types::SchemaDescriptor;
    use crate::util::InMemoryPageIterator;
    use crate::util::test_common::rand_gen::make_pages;
    use arrow::datatypes::ArrowPrimitiveType;
    use arrow_array::{Array, Date32Array, PrimitiveArray};

    use arrow::datatypes::DataType::{Date32, Decimal128};
    use rand::distr::uniform::SampleUniform;
    use std::collections::VecDeque;

    /// Generates `num_chunks` column chunks of random values bounded by
    /// `min_value` and `max_value`, calling `make_pages` once per chunk.
    ///
    /// The generated definition levels, repetition levels and values of every
    /// chunk are appended to `def_levels`, `rep_levels` and `values`. Each
    /// chunk's pages are pushed as one entry of `page_lists`.
    // The argument list mirrors the knobs forwarded to `make_pages`.
    #[allow(clippy::too_many_arguments)]
    fn make_column_chunks<T: DataType>(
        column_desc: ColumnDescPtr,
        encoding: Encoding,
        num_levels: usize,
        min_value: T::T,
        max_value: T::T,
        def_levels: &mut Vec<i16>,
        rep_levels: &mut Vec<i16>,
        values: &mut Vec<T::T>,
        page_lists: &mut Vec<Vec<Page>>,
        use_v2: bool,
        num_chunks: usize,
    ) where
        T::T: PartialOrd + SampleUniform + Copy,
    {
        for _ in 0..num_chunks {
            let mut pages = VecDeque::new();
            let mut data = Vec::new();
            let mut page_def_levels = Vec::new();
            let mut page_rep_levels = Vec::new();

            make_pages::<T>(
                column_desc.clone(),
                encoding,
                1,
                num_levels,
                min_value,
                max_value,
                &mut page_def_levels,
                &mut page_rep_levels,
                &mut data,
                &mut pages,
                use_v2,
            );

            def_levels.append(&mut page_def_levels);
            rep_levels.append(&mut page_rep_levels);
            values.append(&mut data);
            page_lists.push(Vec::from(pages));
        }
    }

    #[test]
    fn test_primitive_array_reader_empty_pages() {
        // Construct column schema
        let message_type = "
        message test_schema {
          REQUIRED INT32 leaf;
        }
        ";

        let schema = parse_message_type(message_type)
            .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
            .unwrap();

        let mut array_reader = PrimitiveArrayReader::<Int32Type>::new(
            Box::<EmptyPageIterator>::default(),
            schema.column(0),
            None,
            DEFAULT_BATCH_SIZE,
        )
        .unwrap();

        // Reading from a reader without pages yields an empty array.
        let array = array_reader.next_batch(50).unwrap();
        assert!(array.is_empty());
    }

    #[test]
    fn test_primitive_array_reader_data() {
        // Construct column schema
        let message_type = "
        message test_schema {
          REQUIRED INT32 leaf;
        }
        ";

        let schema = parse_message_type(message_type)
            .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
            .unwrap();

        let column_desc = schema.column(0);

        // Two chunks of 100 values each, read back across chunk boundaries.
        {
            let mut data = Vec::new();
            let mut page_lists = Vec::new();
            make_column_chunks::<Int32Type>(
                column_desc.clone(),
                Encoding::PLAIN,
                100,
                1,
                200,
                &mut Vec::new(),
                &mut Vec::new(),
                &mut data,
                &mut page_lists,
                true,
                2,
            );
            let page_iterator = InMemoryPageIterator::new(page_lists);

            let mut array_reader = PrimitiveArrayReader::<Int32Type>::new(
                Box::new(page_iterator),
                column_desc,
                None,
                DEFAULT_BATCH_SIZE,
            )
            .unwrap();

            // Read first 50 values, which are all from the first column chunk
            let array = array_reader.next_batch(50).unwrap();
            let array = array.as_any().downcast_ref::<Int32Array>().unwrap();

            assert_eq!(&Int32Array::from(data[0..50].to_vec()), array);

            // Read next 100 values, the first 50 ones are from the first column chunk,
            // and the last 50 ones are from the second column chunk
            let array = array_reader.next_batch(100).unwrap();
            let array = array.as_any().downcast_ref::<Int32Array>().unwrap();

            assert_eq!(&Int32Array::from(data[50..150].to_vec()), array);

            // Try to read 100 values, however there are only 50 values
            let array = array_reader.next_batch(100).unwrap();
            let array = array.as_any().downcast_ref::<Int32Array>().unwrap();

            assert_eq!(&Int32Array::from(data[150..200].to_vec()), array);
        }
    }

    /// Reads one batch of a `REQUIRED` column with the given physical and
    /// converted type and checks it against the generated values cast to the
    /// expected arrow type (optionally with a timezone).
    macro_rules! test_primitive_array_reader_one_type {
        (
            $arrow_parquet_type:ty,
            $physical_type:expr,
            $converted_type_str:expr,
            $result_arrow_type:ty,
            $result_arrow_cast_type:ty,
            $result_primitive_type:ty
            $(, $timezone:expr)?
        ) => {{
            let message_type = format!(
                "
            message test_schema {{
              REQUIRED {:?} leaf ({});
            }}
            ",
                $physical_type, $converted_type_str
            );
            let schema = parse_message_type(&message_type)
                .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
                .unwrap();

            let column_desc = schema.column(0);

            // Construct page iterator
            {
                let mut data = Vec::new();
                let mut page_lists = Vec::new();
                make_column_chunks::<$arrow_parquet_type>(
                    column_desc.clone(),
                    Encoding::PLAIN,
                    100,
                    1,
                    200,
                    &mut Vec::new(),
                    &mut Vec::new(),
                    &mut data,
                    &mut page_lists,
                    true,
                    2,
                );
                let page_iterator = InMemoryPageIterator::new(page_lists);
                let mut array_reader = PrimitiveArrayReader::<$arrow_parquet_type>::new(
                    Box::new(page_iterator),
                    column_desc.clone(),
                    None,
                    DEFAULT_BATCH_SIZE,
                )
                .expect("Unable to get array reader");

                let array = array_reader
                    .next_batch(50)
                    .expect("Unable to get batch from reader");

                let result_data_type = <$result_arrow_type>::DATA_TYPE;
                // `unwrap_or_else` only formats the message on failure.
                let array = array
                    .as_any()
                    .downcast_ref::<PrimitiveArray<$result_arrow_type>>()
                    .unwrap_or_else(|| {
                        panic!(
                            "Unable to downcast {:?} to {:?}",
                            array.data_type(),
                            result_data_type
                        )
                    })
                    $(.clone().with_timezone($timezone))?
                    ;

                // create expected array as primitive, and cast to result type
                let expected = PrimitiveArray::<$result_arrow_cast_type>::from(
                    data[0..50]
                        .iter()
                        .map(|x| *x as $result_primitive_type)
                        .collect::<Vec<$result_primitive_type>>(),
                );
                let expected = Arc::new(expected) as ArrayRef;
                let expected = arrow::compute::cast(&expected, &result_data_type)
                    .expect("Unable to cast expected array");
                assert_eq!(expected.data_type(), &result_data_type);
                let expected = expected
                    .as_any()
                    .downcast_ref::<PrimitiveArray<$result_arrow_type>>()
                    .unwrap_or_else(|| {
                        panic!(
                            "Unable to downcast expected {:?} to {:?}",
                            expected.data_type(),
                            result_data_type
                        )
                    })
                    $(.clone().with_timezone($timezone))?
                    ;
                assert_eq!(expected, array);
            }
        }};
    }

    #[test]
    fn test_primitive_array_reader_temporal_types() {
        test_primitive_array_reader_one_type!(
            crate::data_type::Int32Type,
            PhysicalType::INT32,
            "DATE",
            arrow::datatypes::Date32Type,
            arrow::datatypes::Int32Type,
            i32
        );
        test_primitive_array_reader_one_type!(
            crate::data_type::Int32Type,
            PhysicalType::INT32,
            "TIME_MILLIS",
            arrow::datatypes::Time32MillisecondType,
            arrow::datatypes::Int32Type,
            i32
        );
        test_primitive_array_reader_one_type!(
            crate::data_type::Int64Type,
            PhysicalType::INT64,
            "TIME_MICROS",
            arrow::datatypes::Time64MicrosecondType,
            arrow::datatypes::Int64Type,
            i64
        );
        test_primitive_array_reader_one_type!(
            crate::data_type::Int64Type,
            PhysicalType::INT64,
            "TIMESTAMP_MILLIS",
            arrow::datatypes::TimestampMillisecondType,
            arrow::datatypes::Int64Type,
            i64,
            "UTC"
        );
        test_primitive_array_reader_one_type!(
            crate::data_type::Int64Type,
            PhysicalType::INT64,
            "TIMESTAMP_MICROS",
            arrow::datatypes::TimestampMicrosecondType,
            arrow::datatypes::Int64Type,
            i64,
            "UTC"
        );
    }

    #[test]
    fn test_primitive_array_reader_def_and_rep_levels() {
        // Construct column schema
        let message_type = "
        message test_schema {
            REPEATED Group test_mid {
                OPTIONAL INT32 leaf;
            }
        }
        ";

        let schema = parse_message_type(message_type)
            .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
            .unwrap();

        let column_desc = schema.column(0);

        // Construct page iterator
        {
            let mut def_levels = Vec::new();
            let mut rep_levels = Vec::new();
            let mut page_lists = Vec::new();
            make_column_chunks::<Int32Type>(
                column_desc.clone(),
                Encoding::PLAIN,
                100,
                1,
                200,
                &mut def_levels,
                &mut rep_levels,
                &mut Vec::new(),
                &mut page_lists,
                true,
                2,
            );

            let page_iterator = InMemoryPageIterator::new(page_lists);

            let mut array_reader = PrimitiveArrayReader::<Int32Type>::new(
                Box::new(page_iterator),
                column_desc,
                None,
                DEFAULT_BATCH_SIZE,
            )
            .unwrap();

            // Running offset into the generated levels covered by earlier batches.
            let mut accu_len: usize = 0;

            // Read first 50 values, which are all from the first column chunk
            let array = array_reader.next_batch(50).unwrap();
            assert_eq!(
                Some(&def_levels[accu_len..(accu_len + array.len())]),
                array_reader.get_def_levels()
            );
            assert_eq!(
                Some(&rep_levels[accu_len..(accu_len + array.len())]),
                array_reader.get_rep_levels()
            );
            accu_len += array.len();

            // Read next 100 values, the first 50 ones are from the first column chunk,
            // and the last 50 ones are from the second column chunk
            let array = array_reader.next_batch(100).unwrap();
            assert_eq!(
                Some(&def_levels[accu_len..(accu_len + array.len())]),
                array_reader.get_def_levels()
            );
            assert_eq!(
                Some(&rep_levels[accu_len..(accu_len + array.len())]),
                array_reader.get_rep_levels()
            );
            accu_len += array.len();

            // Try to read 100 values, however there are only 50 values
            let array = array_reader.next_batch(100).unwrap();
            assert_eq!(
                Some(&def_levels[accu_len..(accu_len + array.len())]),
                array_reader.get_def_levels()
            );
            assert_eq!(
                Some(&rep_levels[accu_len..(accu_len + array.len())]),
                array_reader.get_rep_levels()
            );
        }
    }

    #[test]
    fn test_primitive_array_reader_decimal_types() {
        // parquet `INT32` to decimal
        let message_type = "
            message test_schema {
                REQUIRED INT32 decimal1 (DECIMAL(8,2));
        }
        ";
        let schema = parse_message_type(message_type)
            .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
            .unwrap();
        let column_desc = schema.column(0);

        // create the array reader
        {
            let mut data = Vec::new();
            let mut page_lists = Vec::new();
            make_column_chunks::<Int32Type>(
                column_desc.clone(),
                Encoding::PLAIN,
                100,
                -99999999,
                99999999,
                &mut Vec::new(),
                &mut Vec::new(),
                &mut data,
                &mut page_lists,
                true,
                2,
            );
            let page_iterator = InMemoryPageIterator::new(page_lists);

            let mut array_reader = PrimitiveArrayReader::<Int32Type>::new(
                Box::new(page_iterator),
                column_desc,
                None,
                DEFAULT_BATCH_SIZE,
            )
            .unwrap();

            // read data from the reader
            // the data type is decimal(8,2)
            let array = array_reader.next_batch(50).unwrap();
            assert_eq!(array.data_type(), &Decimal128(8, 2));
            let array = array.as_any().downcast_ref::<Decimal128Array>().unwrap();
            let data_decimal_array = data[0..50]
                .iter()
                .copied()
                .map(|v| Some(v as i128))
                .collect::<Decimal128Array>()
                .with_precision_and_scale(8, 2)
                .unwrap();
            assert_eq!(array, &data_decimal_array);

            // not equal with different data type(precision and scale)
            let data_decimal_array = data[0..50]
                .iter()
                .copied()
                .map(|v| Some(v as i128))
                .collect::<Decimal128Array>()
                .with_precision_and_scale(9, 0)
                .unwrap();
            assert_ne!(array, &data_decimal_array)
        }

        // parquet `INT64` to decimal
        let message_type = "
            message test_schema {
                REQUIRED INT64 decimal1 (DECIMAL(18,4));
        }
        ";
        let schema = parse_message_type(message_type)
            .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
            .unwrap();
        let column_desc = schema.column(0);

        // create the array reader
        {
            let mut data = Vec::new();
            let mut page_lists = Vec::new();
            make_column_chunks::<Int64Type>(
                column_desc.clone(),
                Encoding::PLAIN,
                100,
                -999999999999999999,
                999999999999999999,
                &mut Vec::new(),
                &mut Vec::new(),
                &mut data,
                &mut page_lists,
                true,
                2,
            );
            let page_iterator = InMemoryPageIterator::new(page_lists);

            let mut array_reader = PrimitiveArrayReader::<Int64Type>::new(
                Box::new(page_iterator),
                column_desc,
                None,
                DEFAULT_BATCH_SIZE,
            )
            .unwrap();

            // read data from the reader
            // the data type is decimal(18,4)
            let array = array_reader.next_batch(50).unwrap();
            assert_eq!(array.data_type(), &Decimal128(18, 4));
            let array = array.as_any().downcast_ref::<Decimal128Array>().unwrap();
            let data_decimal_array = data[0..50]
                .iter()
                .copied()
                .map(|v| Some(v as i128))
                .collect::<Decimal128Array>()
                .with_precision_and_scale(18, 4)
                .unwrap();
            assert_eq!(array, &data_decimal_array);

            // not equal with different data type(precision and scale)
            let data_decimal_array = data[0..50]
                .iter()
                .copied()
                .map(|v| Some(v as i128))
                .collect::<Decimal128Array>()
                .with_precision_and_scale(34, 0)
                .unwrap();
            assert_ne!(array, &data_decimal_array)
        }
    }

    #[test]
    fn test_primitive_array_reader_date32_type() {
        // parquet `INT32` to date
        let message_type = "
            message test_schema {
                REQUIRED INT32 date1 (DATE);
        }
        ";
        let schema = parse_message_type(message_type)
            .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
            .unwrap();
        let column_desc = schema.column(0);

        // create the array reader
        {
            let mut data = Vec::new();
            let mut page_lists = Vec::new();
            make_column_chunks::<Int32Type>(
                column_desc.clone(),
                Encoding::PLAIN,
                100,
                -99999999,
                99999999,
                &mut Vec::new(),
                &mut Vec::new(),
                &mut data,
                &mut page_lists,
                true,
                2,
            );
            let page_iterator = InMemoryPageIterator::new(page_lists);

            let mut array_reader = PrimitiveArrayReader::<Int32Type>::new(
                Box::new(page_iterator),
                column_desc,
                None,
                DEFAULT_BATCH_SIZE,
            )
            .unwrap();

            // read data from the reader
            // the data type is date
            let array = array_reader.next_batch(50).unwrap();
            assert_eq!(array.data_type(), &Date32);
            let array = array.as_any().downcast_ref::<Date32Array>().unwrap();
            let data_date_array = data[0..50]
                .iter()
                .copied()
                .map(Some)
                .collect::<Date32Array>();
            assert_eq!(array, &data_date_array);
        }
    }
}