1use crate::arrow::array_reader::{read_records, skip_records, ArrayReader};
19use crate::arrow::record_reader::RecordReader;
20use crate::arrow::schema::parquet_to_arrow_field;
21use crate::basic::Type as PhysicalType;
22use crate::column::page::PageIterator;
23use crate::data_type::{DataType, Int96};
24use crate::errors::{ParquetError, Result};
25use crate::schema::types::ColumnDescPtr;
26use arrow_array::Decimal256Array;
27use arrow_array::{
28 builder::TimestampNanosecondBufferBuilder, ArrayRef, BooleanArray, Decimal128Array,
29 Float32Array, Float64Array, Int32Array, Int64Array, TimestampNanosecondArray, UInt32Array,
30 UInt64Array,
31};
32use arrow_buffer::{i256, BooleanBuffer, Buffer};
33use arrow_data::ArrayDataBuilder;
34use arrow_schema::{DataType as ArrowType, TimeUnit};
35use std::any::Any;
36use std::sync::Arc;
37
38pub trait IntoBuffer {
40 fn into_buffer(self) -> Buffer;
41}
42
43macro_rules! native_buffer {
44 ($($t:ty),*) => {
45 $(impl IntoBuffer for Vec<$t> {
46 fn into_buffer(self) -> Buffer {
47 Buffer::from_vec(self)
48 }
49 })*
50 };
51}
52native_buffer!(i8, i16, i32, i64, u8, u16, u32, u64, f32, f64);
53
54impl IntoBuffer for Vec<bool> {
55 fn into_buffer(self) -> Buffer {
56 BooleanBuffer::from_iter(self).into_inner()
57 }
58}
59
60impl IntoBuffer for Vec<Int96> {
61 fn into_buffer(self) -> Buffer {
62 let mut builder = TimestampNanosecondBufferBuilder::new(self.len());
63 for v in self {
64 builder.append(v.to_nanos())
65 }
66 builder.finish()
67 }
68}
69
70pub struct PrimitiveArrayReader<T>
73where
74 T: DataType,
75 T::T: Copy + Default,
76 Vec<T::T>: IntoBuffer,
77{
78 data_type: ArrowType,
79 pages: Box<dyn PageIterator>,
80 def_levels_buffer: Option<Vec<i16>>,
81 rep_levels_buffer: Option<Vec<i16>>,
82 record_reader: RecordReader<T>,
83}
84
85impl<T> PrimitiveArrayReader<T>
86where
87 T: DataType,
88 T::T: Copy + Default,
89 Vec<T::T>: IntoBuffer,
90{
91 pub fn new(
93 pages: Box<dyn PageIterator>,
94 column_desc: ColumnDescPtr,
95 arrow_type: Option<ArrowType>,
96 ) -> Result<Self> {
97 let data_type = match arrow_type {
99 Some(t) => t,
100 None => parquet_to_arrow_field(column_desc.as_ref())?
101 .data_type()
102 .clone(),
103 };
104
105 let record_reader = RecordReader::<T>::new(column_desc);
106
107 Ok(Self {
108 data_type,
109 pages,
110 def_levels_buffer: None,
111 rep_levels_buffer: None,
112 record_reader,
113 })
114 }
115}
116
117impl<T> ArrayReader for PrimitiveArrayReader<T>
119where
120 T: DataType,
121 T::T: Copy + Default,
122 Vec<T::T>: IntoBuffer,
123{
124 fn as_any(&self) -> &dyn Any {
125 self
126 }
127
128 fn get_data_type(&self) -> &ArrowType {
130 &self.data_type
131 }
132
133 fn read_records(&mut self, batch_size: usize) -> Result<usize> {
134 read_records(&mut self.record_reader, self.pages.as_mut(), batch_size)
135 }
136
137 fn consume_batch(&mut self) -> Result<ArrayRef> {
138 let target_type = &self.data_type;
139 let arrow_data_type = match T::get_physical_type() {
140 PhysicalType::BOOLEAN => ArrowType::Boolean,
141 PhysicalType::INT32 => {
142 match target_type {
143 ArrowType::UInt32 => {
144 ArrowType::UInt32
147 }
148 _ => ArrowType::Int32,
149 }
150 }
151 PhysicalType::INT64 => {
152 match target_type {
153 ArrowType::UInt64 => {
154 ArrowType::UInt64
157 }
158 _ => ArrowType::Int64,
159 }
160 }
161 PhysicalType::FLOAT => ArrowType::Float32,
162 PhysicalType::DOUBLE => ArrowType::Float64,
163 PhysicalType::INT96 => match target_type {
164 ArrowType::Timestamp(TimeUnit::Nanosecond, _) => target_type.clone(),
165 _ => unreachable!("INT96 must be timestamp nanosecond"),
166 },
167 PhysicalType::BYTE_ARRAY | PhysicalType::FIXED_LEN_BYTE_ARRAY => {
168 unreachable!("PrimitiveArrayReaders don't support complex physical types");
169 }
170 };
171
172 let record_data = self.record_reader.consume_record_data().into_buffer();
176
177 let array_data = ArrayDataBuilder::new(arrow_data_type)
178 .len(self.record_reader.num_values())
179 .add_buffer(record_data)
180 .null_bit_buffer(self.record_reader.consume_bitmap_buffer());
181
182 let array_data = unsafe { array_data.build_unchecked() };
183 let array: ArrayRef = match T::get_physical_type() {
184 PhysicalType::BOOLEAN => Arc::new(BooleanArray::from(array_data)),
185 PhysicalType::INT32 => match array_data.data_type() {
186 ArrowType::UInt32 => Arc::new(UInt32Array::from(array_data)),
187 ArrowType::Int32 => Arc::new(Int32Array::from(array_data)),
188 _ => unreachable!(),
189 },
190 PhysicalType::INT64 => match array_data.data_type() {
191 ArrowType::UInt64 => Arc::new(UInt64Array::from(array_data)),
192 ArrowType::Int64 => Arc::new(Int64Array::from(array_data)),
193 _ => unreachable!(),
194 },
195 PhysicalType::FLOAT => Arc::new(Float32Array::from(array_data)),
196 PhysicalType::DOUBLE => Arc::new(Float64Array::from(array_data)),
197 PhysicalType::INT96 => Arc::new(TimestampNanosecondArray::from(array_data)),
198 PhysicalType::BYTE_ARRAY | PhysicalType::FIXED_LEN_BYTE_ARRAY => {
199 unreachable!("PrimitiveArrayReaders don't support complex physical types");
200 }
201 };
202
203 let array = match target_type {
214 ArrowType::Date64 if *(array.data_type()) == ArrowType::Int32 => {
215 let a = arrow_cast::cast(&array, &ArrowType::Date32)?;
217 arrow_cast::cast(&a, target_type)?
218 }
219 ArrowType::Decimal128(p, s) => {
220 let array = match array.data_type() {
224 ArrowType::Int32 => array
225 .as_any()
226 .downcast_ref::<Int32Array>()
227 .unwrap()
228 .unary(|i| i as i128)
229 as Decimal128Array,
230 ArrowType::Int64 => array
231 .as_any()
232 .downcast_ref::<Int64Array>()
233 .unwrap()
234 .unary(|i| i as i128)
235 as Decimal128Array,
236 _ => {
237 return Err(arrow_err!(
238 "Cannot convert {:?} to decimal",
239 array.data_type()
240 ));
241 }
242 }
243 .with_precision_and_scale(*p, *s)?;
244
245 Arc::new(array) as ArrayRef
246 }
247 ArrowType::Decimal256(p, s) => {
248 let array = match array.data_type() {
250 ArrowType::Int32 => array
251 .as_any()
252 .downcast_ref::<Int32Array>()
253 .unwrap()
254 .unary(|i| i256::from_i128(i as i128))
255 as Decimal256Array,
256 ArrowType::Int64 => array
257 .as_any()
258 .downcast_ref::<Int64Array>()
259 .unwrap()
260 .unary(|i| i256::from_i128(i as i128))
261 as Decimal256Array,
262 _ => {
263 return Err(arrow_err!(
264 "Cannot convert {:?} to decimal",
265 array.data_type()
266 ));
267 }
268 }
269 .with_precision_and_scale(*p, *s)?;
270
271 Arc::new(array) as ArrayRef
272 }
273 ArrowType::Dictionary(_, value_type) => match value_type.as_ref() {
274 ArrowType::Decimal128(p, s) => {
275 let array = match array.data_type() {
276 ArrowType::Int32 => array
277 .as_any()
278 .downcast_ref::<Int32Array>()
279 .unwrap()
280 .unary(|i| i as i128)
281 as Decimal128Array,
282 ArrowType::Int64 => array
283 .as_any()
284 .downcast_ref::<Int64Array>()
285 .unwrap()
286 .unary(|i| i as i128)
287 as Decimal128Array,
288 _ => {
289 return Err(arrow_err!(
290 "Cannot convert {:?} to decimal dictionary",
291 array.data_type()
292 ));
293 }
294 }
295 .with_precision_and_scale(*p, *s)?;
296
297 arrow_cast::cast(&array, target_type)?
298 }
299 ArrowType::Decimal256(p, s) => {
300 let array = match array.data_type() {
301 ArrowType::Int32 => array
302 .as_any()
303 .downcast_ref::<Int32Array>()
304 .unwrap()
305 .unary(i256::from)
306 as Decimal256Array,
307 ArrowType::Int64 => array
308 .as_any()
309 .downcast_ref::<Int64Array>()
310 .unwrap()
311 .unary(i256::from)
312 as Decimal256Array,
313 _ => {
314 return Err(arrow_err!(
315 "Cannot convert {:?} to decimal dictionary",
316 array.data_type()
317 ));
318 }
319 }
320 .with_precision_and_scale(*p, *s)?;
321
322 arrow_cast::cast(&array, target_type)?
323 }
324 _ => arrow_cast::cast(&array, target_type)?,
325 },
326 _ => arrow_cast::cast(&array, target_type)?,
327 };
328
329 self.def_levels_buffer = self.record_reader.consume_def_levels();
331 self.rep_levels_buffer = self.record_reader.consume_rep_levels();
332 self.record_reader.reset();
333 Ok(array)
334 }
335
336 fn skip_records(&mut self, num_records: usize) -> Result<usize> {
337 skip_records(&mut self.record_reader, self.pages.as_mut(), num_records)
338 }
339
340 fn get_def_levels(&self) -> Option<&[i16]> {
341 self.def_levels_buffer.as_deref()
342 }
343
344 fn get_rep_levels(&self) -> Option<&[i16]> {
345 self.rep_levels_buffer.as_deref()
346 }
347}
348
349#[cfg(test)]
350mod tests {
351 use super::*;
352 use crate::arrow::array_reader::test_util::EmptyPageIterator;
353 use crate::basic::Encoding;
354 use crate::column::page::Page;
355 use crate::data_type::{Int32Type, Int64Type};
356 use crate::schema::parser::parse_message_type;
357 use crate::schema::types::SchemaDescriptor;
358 use crate::util::test_common::rand_gen::make_pages;
359 use crate::util::InMemoryPageIterator;
360 use arrow::datatypes::ArrowPrimitiveType;
361 use arrow_array::{Array, Date32Array, PrimitiveArray};
362
363 use arrow::datatypes::DataType::{Date32, Decimal128};
364 use rand::distributions::uniform::SampleUniform;
365 use std::collections::VecDeque;
366
367 #[allow(clippy::too_many_arguments)]
368 fn make_column_chunks<T: DataType>(
369 column_desc: ColumnDescPtr,
370 encoding: Encoding,
371 num_levels: usize,
372 min_value: T::T,
373 max_value: T::T,
374 def_levels: &mut Vec<i16>,
375 rep_levels: &mut Vec<i16>,
376 values: &mut Vec<T::T>,
377 page_lists: &mut Vec<Vec<Page>>,
378 use_v2: bool,
379 num_chunks: usize,
380 ) where
381 T::T: PartialOrd + SampleUniform + Copy,
382 {
383 for _i in 0..num_chunks {
384 let mut pages = VecDeque::new();
385 let mut data = Vec::new();
386 let mut page_def_levels = Vec::new();
387 let mut page_rep_levels = Vec::new();
388
389 make_pages::<T>(
390 column_desc.clone(),
391 encoding,
392 1,
393 num_levels,
394 min_value,
395 max_value,
396 &mut page_def_levels,
397 &mut page_rep_levels,
398 &mut data,
399 &mut pages,
400 use_v2,
401 );
402
403 def_levels.append(&mut page_def_levels);
404 rep_levels.append(&mut page_rep_levels);
405 values.append(&mut data);
406 page_lists.push(Vec::from(pages));
407 }
408 }
409
410 #[test]
411 fn test_primitive_array_reader_empty_pages() {
412 let message_type = "
414 message test_schema {
415 REQUIRED INT32 leaf;
416 }
417 ";
418
419 let schema = parse_message_type(message_type)
420 .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
421 .unwrap();
422
423 let mut array_reader = PrimitiveArrayReader::<Int32Type>::new(
424 Box::<EmptyPageIterator>::default(),
425 schema.column(0),
426 None,
427 )
428 .unwrap();
429
430 let array = array_reader.next_batch(50).unwrap();
432 assert!(array.is_empty());
433 }
434
435 #[test]
436 fn test_primitive_array_reader_data() {
437 let message_type = "
439 message test_schema {
440 REQUIRED INT32 leaf;
441 }
442 ";
443
444 let schema = parse_message_type(message_type)
445 .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
446 .unwrap();
447
448 let column_desc = schema.column(0);
449
450 {
452 let mut data = Vec::new();
453 let mut page_lists = Vec::new();
454 make_column_chunks::<Int32Type>(
455 column_desc.clone(),
456 Encoding::PLAIN,
457 100,
458 1,
459 200,
460 &mut Vec::new(),
461 &mut Vec::new(),
462 &mut data,
463 &mut page_lists,
464 true,
465 2,
466 );
467 let page_iterator = InMemoryPageIterator::new(page_lists);
468
469 let mut array_reader =
470 PrimitiveArrayReader::<Int32Type>::new(Box::new(page_iterator), column_desc, None)
471 .unwrap();
472
473 let array = array_reader.next_batch(50).unwrap();
475 let array = array.as_any().downcast_ref::<Int32Array>().unwrap();
476
477 assert_eq!(&Int32Array::from(data[0..50].to_vec()), array);
478
479 let array = array_reader.next_batch(100).unwrap();
482 let array = array.as_any().downcast_ref::<Int32Array>().unwrap();
483
484 assert_eq!(&Int32Array::from(data[50..150].to_vec()), array);
485
486 let array = array_reader.next_batch(100).unwrap();
488 let array = array.as_any().downcast_ref::<Int32Array>().unwrap();
489
490 assert_eq!(&Int32Array::from(data[150..200].to_vec()), array);
491 }
492 }
493
494 macro_rules! test_primitive_array_reader_one_type {
495 (
496 $arrow_parquet_type:ty,
497 $physical_type:expr,
498 $converted_type_str:expr,
499 $result_arrow_type:ty,
500 $result_arrow_cast_type:ty,
501 $result_primitive_type:ty
502 $(, $timezone:expr)?
503 ) => {{
504 let message_type = format!(
505 "
506 message test_schema {{
507 REQUIRED {:?} leaf ({});
508 }}
509 ",
510 $physical_type, $converted_type_str
511 );
512 let schema = parse_message_type(&message_type)
513 .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
514 .unwrap();
515
516 let column_desc = schema.column(0);
517
518 {
520 let mut data = Vec::new();
521 let mut page_lists = Vec::new();
522 make_column_chunks::<$arrow_parquet_type>(
523 column_desc.clone(),
524 Encoding::PLAIN,
525 100,
526 1,
527 200,
528 &mut Vec::new(),
529 &mut Vec::new(),
530 &mut data,
531 &mut page_lists,
532 true,
533 2,
534 );
535 let page_iterator = InMemoryPageIterator::new(page_lists);
536 let mut array_reader = PrimitiveArrayReader::<$arrow_parquet_type>::new(
537 Box::new(page_iterator),
538 column_desc.clone(),
539 None,
540 )
541 .expect("Unable to get array reader");
542
543 let array = array_reader
544 .next_batch(50)
545 .expect("Unable to get batch from reader");
546
547 let result_data_type = <$result_arrow_type>::DATA_TYPE;
548 let array = array
549 .as_any()
550 .downcast_ref::<PrimitiveArray<$result_arrow_type>>()
551 .expect(
552 format!(
553 "Unable to downcast {:?} to {:?}",
554 array.data_type(),
555 result_data_type
556 )
557 .as_str(),
558 )
559 $(.clone().with_timezone($timezone))?
560 ;
561
562 let expected = PrimitiveArray::<$result_arrow_cast_type>::from(
564 data[0..50]
565 .iter()
566 .map(|x| *x as $result_primitive_type)
567 .collect::<Vec<$result_primitive_type>>(),
568 );
569 let expected = Arc::new(expected) as ArrayRef;
570 let expected = arrow::compute::cast(&expected, &result_data_type)
571 .expect("Unable to cast expected array");
572 assert_eq!(expected.data_type(), &result_data_type);
573 let expected = expected
574 .as_any()
575 .downcast_ref::<PrimitiveArray<$result_arrow_type>>()
576 .expect(
577 format!(
578 "Unable to downcast expected {:?} to {:?}",
579 expected.data_type(),
580 result_data_type
581 )
582 .as_str(),
583 )
584 $(.clone().with_timezone($timezone))?
585 ;
586 assert_eq!(expected, array);
587 }
588 }};
589 }
590
591 #[test]
592 fn test_primitive_array_reader_temporal_types() {
593 test_primitive_array_reader_one_type!(
594 crate::data_type::Int32Type,
595 PhysicalType::INT32,
596 "DATE",
597 arrow::datatypes::Date32Type,
598 arrow::datatypes::Int32Type,
599 i32
600 );
601 test_primitive_array_reader_one_type!(
602 crate::data_type::Int32Type,
603 PhysicalType::INT32,
604 "TIME_MILLIS",
605 arrow::datatypes::Time32MillisecondType,
606 arrow::datatypes::Int32Type,
607 i32
608 );
609 test_primitive_array_reader_one_type!(
610 crate::data_type::Int64Type,
611 PhysicalType::INT64,
612 "TIME_MICROS",
613 arrow::datatypes::Time64MicrosecondType,
614 arrow::datatypes::Int64Type,
615 i64
616 );
617 test_primitive_array_reader_one_type!(
618 crate::data_type::Int64Type,
619 PhysicalType::INT64,
620 "TIMESTAMP_MILLIS",
621 arrow::datatypes::TimestampMillisecondType,
622 arrow::datatypes::Int64Type,
623 i64,
624 "UTC"
625 );
626 test_primitive_array_reader_one_type!(
627 crate::data_type::Int64Type,
628 PhysicalType::INT64,
629 "TIMESTAMP_MICROS",
630 arrow::datatypes::TimestampMicrosecondType,
631 arrow::datatypes::Int64Type,
632 i64,
633 "UTC"
634 );
635 }
636
637 #[test]
638 fn test_primitive_array_reader_def_and_rep_levels() {
639 let message_type = "
641 message test_schema {
642 REPEATED Group test_mid {
643 OPTIONAL INT32 leaf;
644 }
645 }
646 ";
647
648 let schema = parse_message_type(message_type)
649 .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
650 .unwrap();
651
652 let column_desc = schema.column(0);
653
654 {
656 let mut def_levels = Vec::new();
657 let mut rep_levels = Vec::new();
658 let mut page_lists = Vec::new();
659 make_column_chunks::<Int32Type>(
660 column_desc.clone(),
661 Encoding::PLAIN,
662 100,
663 1,
664 200,
665 &mut def_levels,
666 &mut rep_levels,
667 &mut Vec::new(),
668 &mut page_lists,
669 true,
670 2,
671 );
672
673 let page_iterator = InMemoryPageIterator::new(page_lists);
674
675 let mut array_reader =
676 PrimitiveArrayReader::<Int32Type>::new(Box::new(page_iterator), column_desc, None)
677 .unwrap();
678
679 let mut accu_len: usize = 0;
680
681 let array = array_reader.next_batch(50).unwrap();
683 assert_eq!(
684 Some(&def_levels[accu_len..(accu_len + array.len())]),
685 array_reader.get_def_levels()
686 );
687 assert_eq!(
688 Some(&rep_levels[accu_len..(accu_len + array.len())]),
689 array_reader.get_rep_levels()
690 );
691 accu_len += array.len();
692
693 let array = array_reader.next_batch(100).unwrap();
696 assert_eq!(
697 Some(&def_levels[accu_len..(accu_len + array.len())]),
698 array_reader.get_def_levels()
699 );
700 assert_eq!(
701 Some(&rep_levels[accu_len..(accu_len + array.len())]),
702 array_reader.get_rep_levels()
703 );
704 accu_len += array.len();
705
706 let array = array_reader.next_batch(100).unwrap();
708 assert_eq!(
709 Some(&def_levels[accu_len..(accu_len + array.len())]),
710 array_reader.get_def_levels()
711 );
712 assert_eq!(
713 Some(&rep_levels[accu_len..(accu_len + array.len())]),
714 array_reader.get_rep_levels()
715 );
716 }
717 }
718
719 #[test]
720 fn test_primitive_array_reader_decimal_types() {
721 let message_type = "
723 message test_schema {
724 REQUIRED INT32 decimal1 (DECIMAL(8,2));
725 }
726 ";
727 let schema = parse_message_type(message_type)
728 .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
729 .unwrap();
730 let column_desc = schema.column(0);
731
732 {
734 let mut data = Vec::new();
735 let mut page_lists = Vec::new();
736 make_column_chunks::<Int32Type>(
737 column_desc.clone(),
738 Encoding::PLAIN,
739 100,
740 -99999999,
741 99999999,
742 &mut Vec::new(),
743 &mut Vec::new(),
744 &mut data,
745 &mut page_lists,
746 true,
747 2,
748 );
749 let page_iterator = InMemoryPageIterator::new(page_lists);
750
751 let mut array_reader =
752 PrimitiveArrayReader::<Int32Type>::new(Box::new(page_iterator), column_desc, None)
753 .unwrap();
754
755 let array = array_reader.next_batch(50).unwrap();
758 assert_eq!(array.data_type(), &Decimal128(8, 2));
759 let array = array.as_any().downcast_ref::<Decimal128Array>().unwrap();
760 let data_decimal_array = data[0..50]
761 .iter()
762 .copied()
763 .map(|v| Some(v as i128))
764 .collect::<Decimal128Array>()
765 .with_precision_and_scale(8, 2)
766 .unwrap();
767 assert_eq!(array, &data_decimal_array);
768
769 let data_decimal_array = data[0..50]
771 .iter()
772 .copied()
773 .map(|v| Some(v as i128))
774 .collect::<Decimal128Array>()
775 .with_precision_and_scale(9, 0)
776 .unwrap();
777 assert_ne!(array, &data_decimal_array)
778 }
779
780 let message_type = "
782 message test_schema {
783 REQUIRED INT64 decimal1 (DECIMAL(18,4));
784 }
785 ";
786 let schema = parse_message_type(message_type)
787 .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
788 .unwrap();
789 let column_desc = schema.column(0);
790
791 {
793 let mut data = Vec::new();
794 let mut page_lists = Vec::new();
795 make_column_chunks::<Int64Type>(
796 column_desc.clone(),
797 Encoding::PLAIN,
798 100,
799 -999999999999999999,
800 999999999999999999,
801 &mut Vec::new(),
802 &mut Vec::new(),
803 &mut data,
804 &mut page_lists,
805 true,
806 2,
807 );
808 let page_iterator = InMemoryPageIterator::new(page_lists);
809
810 let mut array_reader =
811 PrimitiveArrayReader::<Int64Type>::new(Box::new(page_iterator), column_desc, None)
812 .unwrap();
813
814 let array = array_reader.next_batch(50).unwrap();
817 assert_eq!(array.data_type(), &Decimal128(18, 4));
818 let array = array.as_any().downcast_ref::<Decimal128Array>().unwrap();
819 let data_decimal_array = data[0..50]
820 .iter()
821 .copied()
822 .map(|v| Some(v as i128))
823 .collect::<Decimal128Array>()
824 .with_precision_and_scale(18, 4)
825 .unwrap();
826 assert_eq!(array, &data_decimal_array);
827
828 let data_decimal_array = data[0..50]
830 .iter()
831 .copied()
832 .map(|v| Some(v as i128))
833 .collect::<Decimal128Array>()
834 .with_precision_and_scale(34, 0)
835 .unwrap();
836 assert_ne!(array, &data_decimal_array)
837 }
838 }
839
840 #[test]
841 fn test_primitive_array_reader_date32_type() {
842 let message_type = "
844 message test_schema {
845 REQUIRED INT32 date1 (DATE);
846 }
847 ";
848 let schema = parse_message_type(message_type)
849 .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
850 .unwrap();
851 let column_desc = schema.column(0);
852
853 {
855 let mut data = Vec::new();
856 let mut page_lists = Vec::new();
857 make_column_chunks::<Int32Type>(
858 column_desc.clone(),
859 Encoding::PLAIN,
860 100,
861 -99999999,
862 99999999,
863 &mut Vec::new(),
864 &mut Vec::new(),
865 &mut data,
866 &mut page_lists,
867 true,
868 2,
869 );
870 let page_iterator = InMemoryPageIterator::new(page_lists);
871
872 let mut array_reader =
873 PrimitiveArrayReader::<Int32Type>::new(Box::new(page_iterator), column_desc, None)
874 .unwrap();
875
876 let array = array_reader.next_batch(50).unwrap();
879 assert_eq!(array.data_type(), &Date32);
880 let array = array.as_any().downcast_ref::<Date32Array>().unwrap();
881 let data_date_array = data[0..50]
882 .iter()
883 .copied()
884 .map(Some)
885 .collect::<Date32Array>();
886 assert_eq!(array, &data_date_array);
887 }
888 }
889}