1use bytes::Bytes;
21
22use super::page::{Page, PageReader};
23use crate::basic::*;
24use crate::column::reader::decoder::{
25 ColumnValueDecoder, ColumnValueDecoderImpl, DefinitionLevelDecoder, DefinitionLevelDecoderImpl,
26 RepetitionLevelDecoder, RepetitionLevelDecoderImpl,
27};
28use crate::data_type::*;
29use crate::errors::{ParquetError, Result};
30use crate::schema::types::ColumnDescPtr;
31use crate::util::bit_util::{ceil, num_required_bits, read_num_bytes};
32
33pub(crate) mod decoder;
34
/// Type-erased column reader.
///
/// There is one variant per Parquet physical type; each variant wraps the
/// [`ColumnReaderImpl`] specialised for the matching data type. Use
/// [`get_column_reader`] to build one from a column descriptor and
/// [`get_typed_column_reader`] to recover the typed reader.
pub enum ColumnReader {
    /// Reader for `BOOLEAN` columns.
    BoolColumnReader(ColumnReaderImpl<BoolType>),
    /// Reader for `INT32` columns.
    Int32ColumnReader(ColumnReaderImpl<Int32Type>),
    /// Reader for `INT64` columns.
    Int64ColumnReader(ColumnReaderImpl<Int64Type>),
    /// Reader for `INT96` columns.
    Int96ColumnReader(ColumnReaderImpl<Int96Type>),
    /// Reader for `FLOAT` columns.
    FloatColumnReader(ColumnReaderImpl<FloatType>),
    /// Reader for `DOUBLE` columns.
    DoubleColumnReader(ColumnReaderImpl<DoubleType>),
    /// Reader for `BYTE_ARRAY` columns.
    ByteArrayColumnReader(ColumnReaderImpl<ByteArrayType>),
    /// Reader for `FIXED_LEN_BYTE_ARRAY` columns.
    FixedLenByteArrayColumnReader(ColumnReaderImpl<FixedLenByteArrayType>),
}
54
55pub fn get_column_reader(
58 col_descr: ColumnDescPtr,
59 col_page_reader: Box<dyn PageReader>,
60) -> ColumnReader {
61 match col_descr.physical_type() {
62 Type::BOOLEAN => {
63 ColumnReader::BoolColumnReader(ColumnReaderImpl::new(col_descr, col_page_reader))
64 }
65 Type::INT32 => {
66 ColumnReader::Int32ColumnReader(ColumnReaderImpl::new(col_descr, col_page_reader))
67 }
68 Type::INT64 => {
69 ColumnReader::Int64ColumnReader(ColumnReaderImpl::new(col_descr, col_page_reader))
70 }
71 Type::INT96 => {
72 ColumnReader::Int96ColumnReader(ColumnReaderImpl::new(col_descr, col_page_reader))
73 }
74 Type::FLOAT => {
75 ColumnReader::FloatColumnReader(ColumnReaderImpl::new(col_descr, col_page_reader))
76 }
77 Type::DOUBLE => {
78 ColumnReader::DoubleColumnReader(ColumnReaderImpl::new(col_descr, col_page_reader))
79 }
80 Type::BYTE_ARRAY => {
81 ColumnReader::ByteArrayColumnReader(ColumnReaderImpl::new(col_descr, col_page_reader))
82 }
83 Type::FIXED_LEN_BYTE_ARRAY => ColumnReader::FixedLenByteArrayColumnReader(
84 ColumnReaderImpl::new(col_descr, col_page_reader),
85 ),
86 }
87}
88
89pub fn get_typed_column_reader<T: DataType>(col_reader: ColumnReader) -> ColumnReaderImpl<T> {
94 T::get_column_reader(col_reader).unwrap_or_else(|| {
95 panic!(
96 "Failed to convert column reader into a typed column reader for `{}` type",
97 T::get_physical_type()
98 )
99 })
100}
101
/// Typed column reader for the data type `T`.
///
/// This is [`GenericColumnReader`] with the default repetition level,
/// definition level and value decoder implementations plugged in.
pub type ColumnReaderImpl<T> = GenericColumnReader<
    RepetitionLevelDecoderImpl,
    DefinitionLevelDecoderImpl,
    ColumnValueDecoderImpl<T>,
>;
108
/// Reads the pages of a single column, decoding repetition levels, definition
/// levels and values.
///
/// `R`, `D` and `V` are the repetition level decoder, definition level decoder
/// and value decoder respectively.
pub struct GenericColumnReader<R, D, V> {
    /// Descriptor of the column being read; supplies the maximum levels.
    descr: ColumnDescPtr,

    /// Source of the pages for this column.
    page_reader: Box<dyn PageReader>,

    /// Number of values (levels) in the currently loaded data page, taken from
    /// the page's `num_values`.
    num_buffered_values: usize,

    /// Number of levels of the currently loaded data page that have already
    /// been consumed. Reset to 0 whenever a new data page is loaded.
    num_decoded_values: usize,

    /// Result of `page_reader.at_record_boundary()` for the current page.
    /// Only updated for columns with repetition levels. When set, reaching the
    /// end of the page's levels flushes the partially read record.
    has_record_delimiter: bool,

    /// Definition level decoder, if any.
    def_level_decoder: Option<D>,

    /// Repetition level decoder, if any.
    rep_level_decoder: Option<R>,

    /// Decoder for the column values (and dictionary pages).
    values_decoder: V,
}
138
139impl<V> GenericColumnReader<RepetitionLevelDecoderImpl, DefinitionLevelDecoderImpl, V>
140where
141 V: ColumnValueDecoder,
142{
143 pub fn new(descr: ColumnDescPtr, page_reader: Box<dyn PageReader>) -> Self {
145 let values_decoder = V::new(&descr);
146
147 let def_level_decoder = (descr.max_def_level() != 0)
148 .then(|| DefinitionLevelDecoderImpl::new(descr.max_def_level()));
149
150 let rep_level_decoder = (descr.max_rep_level() != 0)
151 .then(|| RepetitionLevelDecoderImpl::new(descr.max_rep_level()));
152
153 Self::new_with_decoders(
154 descr,
155 page_reader,
156 values_decoder,
157 def_level_decoder,
158 rep_level_decoder,
159 )
160 }
161}
162
163impl<R, D, V> GenericColumnReader<R, D, V>
164where
165 R: RepetitionLevelDecoder,
166 D: DefinitionLevelDecoder,
167 V: ColumnValueDecoder,
168{
169 pub(crate) fn new_with_decoders(
170 descr: ColumnDescPtr,
171 page_reader: Box<dyn PageReader>,
172 values_decoder: V,
173 def_level_decoder: Option<D>,
174 rep_level_decoder: Option<R>,
175 ) -> Self {
176 Self {
177 descr,
178 def_level_decoder,
179 rep_level_decoder,
180 page_reader,
181 num_buffered_values: 0,
182 num_decoded_values: 0,
183 values_decoder,
184 has_record_delimiter: false,
185 }
186 }
187
/// Reads up to `max_records` records into the supplied buffers, loading new
/// pages as needed.
///
/// Returns `(records_read, values_read, levels_read)` summed over all pages
/// touched by this call. Note the order: values come before levels.
///
/// # Errors
///
/// Returns an error if
/// - the column has a repetition (definition) level decoder but `rep_levels`
///   (`def_levels`) is `None`,
/// - the repetition level decoder makes no progress,
/// - fewer definition levels or values are decoded than expected, or
/// - a page or a decoder reports an error.
///
/// # Panics
///
/// Panics if the end of a page with a record delimiter is reached while the
/// requested number of records has already been read (internal sanity check).
pub fn read_records(
    &mut self,
    max_records: usize,
    mut def_levels: Option<&mut D::Buffer>,
    mut rep_levels: Option<&mut R::Buffer>,
    values: &mut V::Buffer,
) -> Result<(usize, usize, usize)> {
    let mut total_records_read = 0;
    let mut total_levels_read = 0;
    let mut total_values_read = 0;

    // `has_next` loads the next data page once the current one is exhausted.
    while total_records_read < max_records && self.has_next()? {
        let remaining_records = max_records - total_records_read;
        let remaining_levels = self.num_buffered_values - self.num_decoded_values;

        // Step 1: work out how many records and levels this iteration covers.
        let (records_read, levels_to_read) = match self.rep_level_decoder.as_mut() {
            Some(reader) => {
                let out = rep_levels
                    .as_mut()
                    .ok_or_else(|| general_err!("must specify repetition levels"))?;

                let (mut records_read, levels_read) =
                    reader.read_rep_levels(out, remaining_records, remaining_levels)?;

                // Still looping means there are levels left, so no progress at all is an error.
                if records_read == 0 && levels_read == 0 {
                    return Err(general_err!(
                        "Insufficient repetition levels read from column"
                    ));
                }
                // The whole page was consumed and the page ends on a record boundary:
                // count the trailing, not yet terminated record as well.
                if levels_read == remaining_levels && self.has_record_delimiter {
                    assert!(records_read < remaining_records); records_read += reader.flush_partial() as usize;
                }
                (records_read, levels_read)
            }
            None => {
                // Without repetition levels every level is its own record.
                let min = remaining_records.min(remaining_levels);
                (min, min)
            }
        };

        // Step 2: decode the definition levels to learn how many values follow.
        let values_to_read = match self.def_level_decoder.as_mut() {
            Some(reader) => {
                let out = def_levels
                    .as_mut()
                    .ok_or_else(|| general_err!("must specify definition levels"))?;

                let (values_read, levels_read) = reader.read_def_levels(out, levels_to_read)?;

                if levels_read != levels_to_read {
                    return Err(general_err!(
                        "insufficient definition levels read from column - expected {levels_to_read}, got {levels_read}"
                    ));
                }

                values_read
            }
            // Without definition levels every level carries a value.
            None => levels_to_read,
        };

        // Step 3: decode the values themselves.
        let values_read = self.values_decoder.read(values, values_to_read)?;

        if values_read != values_to_read {
            return Err(general_err!(
                "insufficient values read from column - expected: {values_to_read}, got: {values_read}",
            ));
        }

        self.num_decoded_values += levels_to_read;
        total_records_read += records_read;
        total_levels_read += levels_to_read;
        total_values_read += values_read;
    }

    Ok((total_records_read, total_values_read, total_levels_read))
}
281
/// Skips up to `num_records` records and returns how many were actually
/// skipped.
///
/// Fewer records than requested are skipped only when the page reader runs
/// out of pages. Whole pages are skipped without decoding when their row
/// count is known and does not exceed the number of records still to skip.
///
/// # Errors
///
/// Returns an error if the page reader or a decoder fails, if the number of
/// skipped definition levels differs from the number of skipped repetition
/// levels, or if the value decoder skips a different number of values than
/// requested.
///
/// # Panics
///
/// Panics if the end of a page with a record delimiter is reached while the
/// requested number of records has already been skipped (internal sanity
/// check).
pub fn skip_records(&mut self, num_records: usize) -> Result<usize> {
    let mut remaining_records = num_records;
    while remaining_records != 0 {
        // The current page (if any) is fully consumed: decide what to do with the next one.
        if self.num_buffered_values == self.num_decoded_values {
            let metadata = match self.page_reader.peek_next_page()? {
                None => return Ok(num_records - remaining_records),
                Some(metadata) => metadata,
            };

            // Dictionary pages cannot be skipped; they must be read.
            if metadata.is_dict {
                self.read_dictionary_page()?;
                continue;
            }

            // Row count of the next page: taken from the metadata when present;
            // otherwise, without a repetition level decoder, the level count is used.
            let rows = metadata.num_rows.or_else(|| {
                self.rep_level_decoder
                    .is_none()
                    .then_some(metadata.num_levels)?
            });

            // The page fits entirely into what is left to skip: drop it undecoded.
            if let Some(rows) = rows {
                if rows <= remaining_records {
                    self.page_reader.skip_next_page()?;
                    remaining_records -= rows;
                    continue;
                }
            }
            // Otherwise load the page so that part of it can be skipped below.
            if !self.read_new_page()? {
                return Ok(num_records - remaining_records);
            }
        }

        // Levels left in the currently loaded data page.
        let remaining_levels = self.num_buffered_values - self.num_decoded_values;

        let (records_read, rep_levels_read) = match self.rep_level_decoder.as_mut() {
            Some(decoder) => {
                let (mut records_read, levels_read) =
                    decoder.skip_rep_levels(remaining_records, remaining_levels)?;

                // The whole page was consumed and the page ends on a record boundary:
                // count the trailing, not yet terminated record as well.
                if levels_read == remaining_levels && self.has_record_delimiter {
                    assert!(records_read < remaining_records); records_read += decoder.flush_partial() as usize;
                }

                (records_read, levels_read)
            }
            None => {
                // Without repetition levels every level is its own record.
                let levels = remaining_levels.min(remaining_records);
                (levels, levels)
            }
        };

        self.num_decoded_values += rep_levels_read;
        remaining_records -= records_read;

        // The page is exhausted, so the definition level and value decoders
        // are not advanced.
        if self.num_buffered_values == self.num_decoded_values {
            continue;
        }

        let (values_read, def_levels_read) = match self.def_level_decoder.as_mut() {
            Some(decoder) => decoder.skip_def_levels(rep_levels_read)?,
            // Without definition levels every level carries a value.
            None => (rep_levels_read, rep_levels_read),
        };

        if rep_levels_read != def_levels_read {
            return Err(general_err!(
                "levels mismatch, read {} repetition levels and {} definition levels",
                rep_levels_read,
                def_levels_read
            ));
        }

        let values = self.values_decoder.skip_values(values_read)?;
        if values != values_read {
            return Err(general_err!(
                "skipped {} values, expected {}",
                values,
                values_read
            ));
        }
    }
    Ok(num_records - remaining_records)
}
383
384 fn read_dictionary_page(&mut self) -> Result<()> {
387 match self.page_reader.get_next_page()? {
388 Some(Page::DictionaryPage {
389 buf,
390 num_values,
391 encoding,
392 is_sorted,
393 }) => self
394 .values_decoder
395 .set_dict(buf, num_values, encoding, is_sorted),
396 _ => Err(ParquetError::General(
397 "Invalid page. Expecting dictionary page".to_string(),
398 )),
399 }
400 }
401
402 fn read_new_page(&mut self) -> Result<bool> {
405 loop {
406 match self.page_reader.get_next_page()? {
407 None => return Ok(false),
409 Some(current_page) => {
410 match current_page {
411 Page::DictionaryPage {
413 buf,
414 num_values,
415 encoding,
416 is_sorted,
417 } => {
418 self.values_decoder
419 .set_dict(buf, num_values, encoding, is_sorted)?;
420 continue;
421 }
422 Page::DataPage {
424 buf,
425 num_values,
426 encoding,
427 def_level_encoding,
428 rep_level_encoding,
429 statistics: _,
430 } => {
431 self.num_buffered_values = num_values as _;
432 self.num_decoded_values = 0;
433
434 let max_rep_level = self.descr.max_rep_level();
435 let max_def_level = self.descr.max_def_level();
436
437 let mut offset = 0;
438
439 if max_rep_level > 0 {
440 let (bytes_read, level_data) = parse_v1_level(
441 max_rep_level,
442 num_values,
443 rep_level_encoding,
444 buf.slice(offset..),
445 )?;
446 offset += bytes_read;
447
448 self.has_record_delimiter =
449 self.page_reader.at_record_boundary()?;
450
451 self.rep_level_decoder
452 .as_mut()
453 .unwrap()
454 .set_data(rep_level_encoding, level_data);
455 }
456
457 if max_def_level > 0 {
458 let (bytes_read, level_data) = parse_v1_level(
459 max_def_level,
460 num_values,
461 def_level_encoding,
462 buf.slice(offset..),
463 )?;
464 offset += bytes_read;
465
466 self.def_level_decoder
467 .as_mut()
468 .unwrap()
469 .set_data(def_level_encoding, level_data);
470 }
471
472 self.values_decoder.set_data(
473 encoding,
474 buf.slice(offset..),
475 num_values as usize,
476 None,
477 )?;
478 return Ok(true);
479 }
480 Page::DataPageV2 {
482 buf,
483 num_values,
484 encoding,
485 num_nulls,
486 num_rows: _,
487 def_levels_byte_len,
488 rep_levels_byte_len,
489 is_compressed: _,
490 statistics: _,
491 } => {
492 if num_nulls > num_values {
493 return Err(general_err!(
494 "more nulls than values in page, contained {} values and {} nulls",
495 num_values,
496 num_nulls
497 ));
498 }
499
500 self.num_buffered_values = num_values as _;
501 self.num_decoded_values = 0;
502
503 if self.descr.max_rep_level() > 0 {
506 self.has_record_delimiter =
510 self.page_reader.at_record_boundary()?;
511
512 self.rep_level_decoder.as_mut().unwrap().set_data(
513 Encoding::RLE,
514 buf.slice(..rep_levels_byte_len as usize),
515 );
516 }
517
518 if self.descr.max_def_level() > 0 {
521 self.def_level_decoder.as_mut().unwrap().set_data(
522 Encoding::RLE,
523 buf.slice(
524 rep_levels_byte_len as usize
525 ..(rep_levels_byte_len + def_levels_byte_len) as usize,
526 ),
527 );
528 }
529
530 self.values_decoder.set_data(
531 encoding,
532 buf.slice((rep_levels_byte_len + def_levels_byte_len) as usize..),
533 num_values as usize,
534 Some((num_values - num_nulls) as usize),
535 )?;
536 return Ok(true);
537 }
538 };
539 }
540 }
541 }
542 }
543
544 #[inline]
548 pub(crate) fn has_next(&mut self) -> Result<bool> {
549 if self.num_buffered_values == 0 || self.num_buffered_values == self.num_decoded_values {
550 if !self.read_new_page()? {
553 Ok(false)
554 } else {
555 Ok(self.num_buffered_values != 0)
556 }
557 } else {
558 Ok(true)
559 }
560 }
561}
562
563fn parse_v1_level(
564 max_level: i16,
565 num_buffered_values: u32,
566 encoding: Encoding,
567 buf: Bytes,
568) -> Result<(usize, Bytes)> {
569 match encoding {
570 Encoding::RLE => {
571 let i32_size = std::mem::size_of::<i32>();
572 let data_size = read_num_bytes::<i32>(i32_size, buf.as_ref()) as usize;
573 Ok((
574 i32_size + data_size,
575 buf.slice(i32_size..i32_size + data_size),
576 ))
577 }
578 #[allow(deprecated)]
579 Encoding::BIT_PACKED => {
580 let bit_width = num_required_bits(max_level as u64);
581 let num_bytes = ceil(num_buffered_values as usize * bit_width as usize, 8);
582 Ok((num_bytes, buf.slice(..num_bytes)))
583 }
584 _ => Err(general_err!("invalid level encoding: {}", encoding)),
585 }
586}
587
588#[cfg(test)]
589mod tests {
590 use super::*;
591
592 use rand::distr::uniform::SampleUniform;
593 use std::{collections::VecDeque, sync::Arc};
594
595 use crate::basic::Type as PhysicalType;
596 use crate::schema::types::{ColumnDescriptor, ColumnPath, Type as SchemaType};
597 use crate::util::test_common::page_util::InMemoryPageReader;
598 use crate::util::test_common::rand_gen::make_pages;
599
// Defaults shared by most test cases below.
// Passed as `num_levels` to the page generator.
const NUM_LEVELS: usize = 128;
// Passed as `num_pages` to the page generator.
const NUM_PAGES: usize = 2;
// Maximum definition level used for the column descriptor.
const MAX_DEF_LEVEL: i16 = 5;
// Maximum repetition level used for the column descriptor.
const MAX_REP_LEVEL: i16 = 5;
604
// Declares one `#[test]` function for a column of the given primitive type.
//
// Arguments: test name, primitive type (`i32` or `i64`), name of the
// `ColumnReaderTester` method to run, max definition level, max repetition
// level, number of pages, number of levels, batch size, and the min/max values
// handed to the tester method. Each arm forwards to `test_internal!` with the
// matching data type and schema type constructor.
macro_rules! test {
    // Branch for generating i32 cases
    ($test_func:ident, i32, $func:ident, $def_level:expr, $rep_level:expr,
     $num_pages:expr, $num_levels:expr, $batch_size:expr, $min:expr, $max:expr) => {
        test_internal!(
            $test_func,
            Int32Type,
            get_test_int32_type,
            $func,
            $def_level,
            $rep_level,
            $num_pages,
            $num_levels,
            $batch_size,
            $min,
            $max
        );
    };
    // Branch for generating i64 cases
    ($test_func:ident, i64, $func:ident, $def_level:expr, $rep_level:expr,
     $num_pages:expr, $num_levels:expr, $batch_size:expr, $min:expr, $max:expr) => {
        test_internal!(
            $test_func,
            Int64Type,
            get_test_int64_type,
            $func,
            $def_level,
            $rep_level,
            $num_pages,
            $num_levels,
            $batch_size,
            $min,
            $max
        );
    };
}
642
// Expands to a `#[test]` function named `$test_func`.
//
// The generated test builds a column descriptor from the schema type returned
// by `$pty()`, using the given max definition/repetition levels and an empty
// column path, then runs `ColumnReaderTester::<$ty>::$func` with the remaining
// arguments.
macro_rules! test_internal {
    ($test_func:ident, $ty:ident, $pty:ident, $func:ident, $def_level:expr,
     $rep_level:expr, $num_pages:expr, $num_levels:expr, $batch_size:expr,
     $min:expr, $max:expr) => {
        #[test]
        fn $test_func() {
            let desc = Arc::new(ColumnDescriptor::new(
                Arc::new($pty()),
                $def_level,
                $rep_level,
                ColumnPath::new(Vec::new()),
            ));
            let mut tester = ColumnReaderTester::<$ty>::new();
            tester.$func(desc, $num_pages, $num_levels, $batch_size, $min, $max);
        }
    };
}
660
// PLAIN encoded INT32 columns with definition and repetition levels,
// v1 and v2 data pages.

// Batch size of 16.
test!(
    test_read_plain_v1_int32,
    i32,
    plain_v1,
    MAX_DEF_LEVEL,
    MAX_REP_LEVEL,
    NUM_PAGES,
    NUM_LEVELS,
    16,
    i32::MIN,
    i32::MAX
);
test!(
    test_read_plain_v2_int32,
    i32,
    plain_v2,
    MAX_DEF_LEVEL,
    MAX_REP_LEVEL,
    NUM_PAGES,
    NUM_LEVELS,
    16,
    i32::MIN,
    i32::MAX
);

// Batch size of 17, which does not evenly divide NUM_LEVELS.
test!(
    test_read_plain_v1_int32_uneven,
    i32,
    plain_v1,
    MAX_DEF_LEVEL,
    MAX_REP_LEVEL,
    NUM_PAGES,
    NUM_LEVELS,
    17,
    i32::MIN,
    i32::MAX
);
test!(
    test_read_plain_v2_int32_uneven,
    i32,
    plain_v2,
    MAX_DEF_LEVEL,
    MAX_REP_LEVEL,
    NUM_PAGES,
    NUM_LEVELS,
    17,
    i32::MIN,
    i32::MAX
);

// Batch size of 512, which is larger than NUM_LEVELS.
test!(
    test_read_plain_v1_int32_multi_page,
    i32,
    plain_v1,
    MAX_DEF_LEVEL,
    MAX_REP_LEVEL,
    NUM_PAGES,
    NUM_LEVELS,
    512,
    i32::MIN,
    i32::MAX
);
test!(
    test_read_plain_v2_int32_multi_page,
    i32,
    plain_v2,
    MAX_DEF_LEVEL,
    MAX_REP_LEVEL,
    NUM_PAGES,
    NUM_LEVELS,
    512,
    i32::MIN,
    i32::MAX
);
735
// PLAIN encoded INT32 columns whose descriptor has a max definition level and
// a max repetition level of 0.
test!(
    test_read_plain_v1_int32_required_non_repeated,
    i32,
    plain_v1,
    0,
    0,
    NUM_PAGES,
    NUM_LEVELS,
    16,
    i32::MIN,
    i32::MAX
);
test!(
    test_read_plain_v2_int32_required_non_repeated,
    i32,
    plain_v2,
    0,
    0,
    NUM_PAGES,
    NUM_LEVELS,
    16,
    i32::MIN,
    i32::MAX
);
761
// PLAIN encoded INT64 columns, v1 and v2 data pages. The first three pairs use
// a max definition and repetition level of 1.

// Batch size of 16.
test!(
    test_read_plain_v1_int64,
    i64,
    plain_v1,
    1,
    1,
    NUM_PAGES,
    NUM_LEVELS,
    16,
    i64::MIN,
    i64::MAX
);
test!(
    test_read_plain_v2_int64,
    i64,
    plain_v2,
    1,
    1,
    NUM_PAGES,
    NUM_LEVELS,
    16,
    i64::MIN,
    i64::MAX
);

// Batch size of 17, which does not evenly divide NUM_LEVELS.
test!(
    test_read_plain_v1_int64_uneven,
    i64,
    plain_v1,
    1,
    1,
    NUM_PAGES,
    NUM_LEVELS,
    17,
    i64::MIN,
    i64::MAX
);
test!(
    test_read_plain_v2_int64_uneven,
    i64,
    plain_v2,
    1,
    1,
    NUM_PAGES,
    NUM_LEVELS,
    17,
    i64::MIN,
    i64::MAX
);

// Batch size of 512, which is larger than NUM_LEVELS.
test!(
    test_read_plain_v1_int64_multi_page,
    i64,
    plain_v1,
    1,
    1,
    NUM_PAGES,
    NUM_LEVELS,
    512,
    i64::MIN,
    i64::MAX
);
test!(
    test_read_plain_v2_int64_multi_page,
    i64,
    plain_v2,
    1,
    1,
    NUM_PAGES,
    NUM_LEVELS,
    512,
    i64::MIN,
    i64::MAX
);

// Max definition level and max repetition level of 0.
test!(
    test_read_plain_v1_int64_required_non_repeated,
    i64,
    plain_v1,
    0,
    0,
    NUM_PAGES,
    NUM_LEVELS,
    16,
    i64::MIN,
    i64::MAX
);
test!(
    test_read_plain_v2_int64_required_non_repeated,
    i64,
    plain_v2,
    0,
    0,
    NUM_PAGES,
    NUM_LEVELS,
    16,
    i64::MIN,
    i64::MAX
);
862
// RLE_DICTIONARY encoded columns, v1 and v2 data pages. All cases pass 0 and 3
// as the min/max values to the tester.

// Small input: 2 pages and 2 levels.
test!(
    test_read_dict_v1_int32_small,
    i32,
    dict_v1,
    MAX_DEF_LEVEL,
    MAX_REP_LEVEL,
    2,
    2,
    16,
    0,
    3
);
test!(
    test_read_dict_v2_int32_small,
    i32,
    dict_v2,
    MAX_DEF_LEVEL,
    MAX_REP_LEVEL,
    2,
    2,
    16,
    0,
    3
);

// Batch size of 16.
test!(
    test_read_dict_v1_int32,
    i32,
    dict_v1,
    MAX_DEF_LEVEL,
    MAX_REP_LEVEL,
    NUM_PAGES,
    NUM_LEVELS,
    16,
    0,
    3
);
test!(
    test_read_dict_v2_int32,
    i32,
    dict_v2,
    MAX_DEF_LEVEL,
    MAX_REP_LEVEL,
    NUM_PAGES,
    NUM_LEVELS,
    16,
    0,
    3
);

// Batch size of 17, which does not evenly divide NUM_LEVELS.
test!(
    test_read_dict_v1_int32_uneven,
    i32,
    dict_v1,
    MAX_DEF_LEVEL,
    MAX_REP_LEVEL,
    NUM_PAGES,
    NUM_LEVELS,
    17,
    0,
    3
);
test!(
    test_read_dict_v2_int32_uneven,
    i32,
    dict_v2,
    MAX_DEF_LEVEL,
    MAX_REP_LEVEL,
    NUM_PAGES,
    NUM_LEVELS,
    17,
    0,
    3
);

// Batch size of 512, which is larger than NUM_LEVELS.
test!(
    test_read_dict_v1_int32_multi_page,
    i32,
    dict_v1,
    MAX_DEF_LEVEL,
    MAX_REP_LEVEL,
    NUM_PAGES,
    NUM_LEVELS,
    512,
    0,
    3
);
test!(
    test_read_dict_v2_int32_multi_page,
    i32,
    dict_v2,
    MAX_DEF_LEVEL,
    MAX_REP_LEVEL,
    NUM_PAGES,
    NUM_LEVELS,
    512,
    0,
    3
);

// INT64 columns, batch size of 16.
test!(
    test_read_dict_v1_int64,
    i64,
    dict_v1,
    MAX_DEF_LEVEL,
    MAX_REP_LEVEL,
    NUM_PAGES,
    NUM_LEVELS,
    16,
    0,
    3
);
test!(
    test_read_dict_v2_int64,
    i64,
    dict_v2,
    MAX_DEF_LEVEL,
    MAX_REP_LEVEL,
    NUM_PAGES,
    NUM_LEVELS,
    16,
    0,
    3
);
987
/// Reads a column whose max definition and repetition levels are both 0.
#[test]
fn test_read_batch_values_only() {
    let (max_def_level, max_rep_level) = (0, 0);
    test_read_batch_int32(16, max_def_level, max_rep_level);
}
992
/// Reads a column with definition levels but a max repetition level of 0.
#[test]
fn test_read_batch_values_def_levels() {
    let max_rep_level = 0;
    test_read_batch_int32(16, MAX_DEF_LEVEL, max_rep_level);
}
997
/// Reads a column with repetition levels but a max definition level of 0.
#[test]
fn test_read_batch_values_rep_levels() {
    let max_def_level = 0;
    test_read_batch_int32(16, max_def_level, MAX_REP_LEVEL);
}
1002
/// Reads a column with both definition and repetition levels, using a batch
/// size of 128.
#[test]
fn test_read_batch_values_def_rep_levels() {
    let batch_size = 128;
    test_read_batch_int32(batch_size, MAX_DEF_LEVEL, MAX_REP_LEVEL);
}
1007
/// Reads two pages of four levels each with a batch size of five, so that a
/// single batch has to continue into a newly buffered page.
#[test]
fn test_read_batch_adjust_after_buffering_page() {
    const PAGES: usize = 2;
    const LEVELS: usize = 4;
    // Deliberately larger than LEVELS.
    const BATCH: usize = 5;

    // Column with a max definition level and max repetition level of 1.
    let desc = Arc::new(ColumnDescriptor::new(
        Arc::new(get_test_int32_type()),
        1,
        1,
        ColumnPath::new(Vec::new()),
    ));

    let mut tester = ColumnReaderTester::<Int32Type>::new();
    tester.test_read_batch(
        desc,
        Encoding::RLE_DICTIONARY,
        PAGES,
        LEVELS,
        BATCH,
        i32::MIN,
        i32::MAX,
        false,
    );
}
1040
1041 fn get_test_int32_type() -> SchemaType {
1087 SchemaType::primitive_type_builder("a", PhysicalType::INT32)
1088 .with_repetition(Repetition::REQUIRED)
1089 .with_converted_type(ConvertedType::INT_32)
1090 .with_length(-1)
1091 .build()
1092 .expect("build() should be OK")
1093 }
1094
1095 fn get_test_int64_type() -> SchemaType {
1097 SchemaType::primitive_type_builder("a", PhysicalType::INT64)
1098 .with_repetition(Repetition::REQUIRED)
1099 .with_converted_type(ConvertedType::INT_64)
1100 .with_length(-1)
1101 .build()
1102 .expect("build() should be OK")
1103 }
1104
1105 fn test_read_batch_int32(batch_size: usize, max_def_level: i16, max_rep_level: i16) {
1110 let primitive_type = get_test_int32_type();
1111
1112 let desc = Arc::new(ColumnDescriptor::new(
1113 Arc::new(primitive_type),
1114 max_def_level,
1115 max_rep_level,
1116 ColumnPath::new(Vec::new()),
1117 ));
1118
1119 let mut tester = ColumnReaderTester::<Int32Type>::new();
1120 tester.test_read_batch(
1121 desc,
1122 Encoding::RLE_DICTIONARY,
1123 NUM_PAGES,
1124 NUM_LEVELS,
1125 batch_size,
1126 i32::MIN,
1127 i32::MAX,
1128 false,
1129 );
1130 }
1131
/// Test harness that generates pages for a column of type `T`, reads them back
/// through a column reader and compares the result with the generated data.
struct ColumnReaderTester<T: DataType>
where
    T::T: PartialOrd + SampleUniform + Copy,
{
    // Repetition levels filled in by the page generator (expected output).
    rep_levels: Vec<i16>,
    // Definition levels filled in by the page generator (expected output).
    def_levels: Vec<i16>,
    // Values filled in by the page generator (expected output).
    values: Vec<T::T>,
}
1140
1141 impl<T: DataType> ColumnReaderTester<T>
1142 where
1143 T::T: PartialOrd + SampleUniform + Copy,
1144 {
1145 pub fn new() -> Self {
1146 Self {
1147 rep_levels: Vec::new(),
1148 def_levels: Vec::new(),
1149 values: Vec::new(),
1150 }
1151 }
1152
1153 fn plain_v1(
1155 &mut self,
1156 desc: ColumnDescPtr,
1157 num_pages: usize,
1158 num_levels: usize,
1159 batch_size: usize,
1160 min: T::T,
1161 max: T::T,
1162 ) {
1163 self.test_read_batch_general(
1164 desc,
1165 Encoding::PLAIN,
1166 num_pages,
1167 num_levels,
1168 batch_size,
1169 min,
1170 max,
1171 false,
1172 );
1173 }
1174
1175 fn plain_v2(
1177 &mut self,
1178 desc: ColumnDescPtr,
1179 num_pages: usize,
1180 num_levels: usize,
1181 batch_size: usize,
1182 min: T::T,
1183 max: T::T,
1184 ) {
1185 self.test_read_batch_general(
1186 desc,
1187 Encoding::PLAIN,
1188 num_pages,
1189 num_levels,
1190 batch_size,
1191 min,
1192 max,
1193 true,
1194 );
1195 }
1196
1197 fn dict_v1(
1199 &mut self,
1200 desc: ColumnDescPtr,
1201 num_pages: usize,
1202 num_levels: usize,
1203 batch_size: usize,
1204 min: T::T,
1205 max: T::T,
1206 ) {
1207 self.test_read_batch_general(
1208 desc,
1209 Encoding::RLE_DICTIONARY,
1210 num_pages,
1211 num_levels,
1212 batch_size,
1213 min,
1214 max,
1215 false,
1216 );
1217 }
1218
1219 fn dict_v2(
1221 &mut self,
1222 desc: ColumnDescPtr,
1223 num_pages: usize,
1224 num_levels: usize,
1225 batch_size: usize,
1226 min: T::T,
1227 max: T::T,
1228 ) {
1229 self.test_read_batch_general(
1230 desc,
1231 Encoding::RLE_DICTIONARY,
1232 num_pages,
1233 num_levels,
1234 batch_size,
1235 min,
1236 max,
1237 true,
1238 );
1239 }
1240
1241 #[allow(clippy::too_many_arguments)]
1244 fn test_read_batch_general(
1245 &mut self,
1246 desc: ColumnDescPtr,
1247 encoding: Encoding,
1248 num_pages: usize,
1249 num_levels: usize,
1250 batch_size: usize,
1251 min: T::T,
1252 max: T::T,
1253 use_v2: bool,
1254 ) {
1255 self.test_read_batch(
1256 desc, encoding, num_pages, num_levels, batch_size, min, max, use_v2,
1257 );
1258 }
1259
1260 #[allow(clippy::too_many_arguments)]
1263 fn test_read_batch(
1264 &mut self,
1265 desc: ColumnDescPtr,
1266 encoding: Encoding,
1267 num_pages: usize,
1268 num_levels: usize,
1269 batch_size: usize,
1270 min: T::T,
1271 max: T::T,
1272 use_v2: bool,
1273 ) {
1274 let mut pages = VecDeque::new();
1275 make_pages::<T>(
1276 desc.clone(),
1277 encoding,
1278 num_pages,
1279 num_levels,
1280 min,
1281 max,
1282 &mut self.def_levels,
1283 &mut self.rep_levels,
1284 &mut self.values,
1285 &mut pages,
1286 use_v2,
1287 );
1288 let max_def_level = desc.max_def_level();
1289 let max_rep_level = desc.max_rep_level();
1290 let page_reader = InMemoryPageReader::new(pages);
1291 let column_reader: ColumnReader = get_column_reader(desc, Box::new(page_reader));
1292 let mut typed_column_reader = get_typed_column_reader::<T>(column_reader);
1293
1294 let mut values = Vec::new();
1295 let mut def_levels = Vec::new();
1296 let mut rep_levels = Vec::new();
1297
1298 let mut curr_values_read = 0;
1299 let mut curr_levels_read = 0;
1300 loop {
1301 let (_, values_read, levels_read) = typed_column_reader
1302 .read_records(
1303 batch_size,
1304 Some(&mut def_levels),
1305 Some(&mut rep_levels),
1306 &mut values,
1307 )
1308 .expect("read_batch() should be OK");
1309
1310 curr_values_read += values_read;
1311 curr_levels_read += levels_read;
1312
1313 if values_read == 0 && levels_read == 0 {
1314 break;
1315 }
1316 }
1317
1318 assert_eq!(values, self.values, "values content doesn't match");
1319
1320 if max_def_level > 0 {
1321 assert_eq!(
1322 def_levels, self.def_levels,
1323 "definition levels content doesn't match"
1324 );
1325 }
1326
1327 if max_rep_level > 0 {
1328 assert_eq!(
1329 rep_levels, self.rep_levels,
1330 "repetition levels content doesn't match"
1331 );
1332 }
1333
1334 assert!(
1335 curr_levels_read >= curr_values_read,
1336 "expected levels read to be greater than values read"
1337 );
1338 }
1339 }
1340}