1use bytes::Bytes;
21
22use super::page::{Page, PageReader};
23use crate::basic::*;
24use crate::column::reader::decoder::{
25 ColumnValueDecoder, ColumnValueDecoderImpl, DefinitionLevelDecoder, DefinitionLevelDecoderImpl,
26 RepetitionLevelDecoder, RepetitionLevelDecoderImpl,
27};
28use crate::data_type::*;
29use crate::errors::{ParquetError, Result};
30use crate::schema::types::ColumnDescPtr;
31use crate::util::bit_util::{ceil, num_required_bits, read_num_bytes};
32
33pub(crate) mod decoder;
34
/// Column reader for a single column, with one variant per Parquet physical type.
///
/// Created by `get_column_reader`, which selects the variant from the column descriptor's
/// physical type; use `get_typed_column_reader` to unwrap it into a typed reader.
pub enum ColumnReader {
    /// Reader for a `BOOLEAN` column.
    BoolColumnReader(ColumnReaderImpl<BoolType>),
    /// Reader for an `INT32` column.
    Int32ColumnReader(ColumnReaderImpl<Int32Type>),
    /// Reader for an `INT64` column.
    Int64ColumnReader(ColumnReaderImpl<Int64Type>),
    /// Reader for an `INT96` column.
    Int96ColumnReader(ColumnReaderImpl<Int96Type>),
    /// Reader for a `FLOAT` column.
    FloatColumnReader(ColumnReaderImpl<FloatType>),
    /// Reader for a `DOUBLE` column.
    DoubleColumnReader(ColumnReaderImpl<DoubleType>),
    /// Reader for a `BYTE_ARRAY` column.
    ByteArrayColumnReader(ColumnReaderImpl<ByteArrayType>),
    /// Reader for a `FIXED_LEN_BYTE_ARRAY` column.
    FixedLenByteArrayColumnReader(ColumnReaderImpl<FixedLenByteArrayType>),
}
54
55pub fn get_column_reader(
58 col_descr: ColumnDescPtr,
59 col_page_reader: Box<dyn PageReader>,
60) -> ColumnReader {
61 match col_descr.physical_type() {
62 Type::BOOLEAN => {
63 ColumnReader::BoolColumnReader(ColumnReaderImpl::new(col_descr, col_page_reader))
64 }
65 Type::INT32 => {
66 ColumnReader::Int32ColumnReader(ColumnReaderImpl::new(col_descr, col_page_reader))
67 }
68 Type::INT64 => {
69 ColumnReader::Int64ColumnReader(ColumnReaderImpl::new(col_descr, col_page_reader))
70 }
71 Type::INT96 => {
72 ColumnReader::Int96ColumnReader(ColumnReaderImpl::new(col_descr, col_page_reader))
73 }
74 Type::FLOAT => {
75 ColumnReader::FloatColumnReader(ColumnReaderImpl::new(col_descr, col_page_reader))
76 }
77 Type::DOUBLE => {
78 ColumnReader::DoubleColumnReader(ColumnReaderImpl::new(col_descr, col_page_reader))
79 }
80 Type::BYTE_ARRAY => {
81 ColumnReader::ByteArrayColumnReader(ColumnReaderImpl::new(col_descr, col_page_reader))
82 }
83 Type::FIXED_LEN_BYTE_ARRAY => ColumnReader::FixedLenByteArrayColumnReader(
84 ColumnReaderImpl::new(col_descr, col_page_reader),
85 ),
86 }
87}
88
89pub fn get_typed_column_reader<T: DataType>(col_reader: ColumnReader) -> ColumnReaderImpl<T> {
94 T::get_column_reader(col_reader).unwrap_or_else(|| {
95 panic!(
96 "Failed to convert column reader into a typed column reader for `{}` type",
97 T::get_physical_type()
98 )
99 })
100}
101
/// Column reader for data type `T` that uses the crate's default repetition level,
/// definition level and value decoders.
pub type ColumnReaderImpl<T> = GenericColumnReader<
    RepetitionLevelDecoderImpl,
    DefinitionLevelDecoderImpl,
    ColumnValueDecoderImpl<T>,
>;
108
/// Reads the pages of a single column, decoding repetition levels with `R`, definition
/// levels with `D` and values with `V`.
pub struct GenericColumnReader<R, D, V> {
    /// Descriptor of the column being read; supplies the max definition/repetition levels.
    descr: ColumnDescPtr,

    /// Source of the column's dictionary and data pages.
    page_reader: Box<dyn PageReader>,

    /// Number of levels (the page's `num_values`) in the current data page; 0 until the
    /// first data page is loaded.
    num_buffered_values: usize,

    /// Number of levels of the current data page consumed so far; the page is exhausted
    /// once this equals `num_buffered_values`.
    num_decoded_values: usize,

    /// Result of `PageReader::at_record_boundary` for the current page (only refreshed for
    /// columns with repetition levels). When true, reaching the end of the page flushes the
    /// partially-read record as a complete one.
    has_record_delimiter: bool,

    /// Definition level decoder; `new` leaves this `None` when the max definition level is 0.
    def_level_decoder: Option<D>,

    /// Repetition level decoder; `new` leaves this `None` when the max repetition level is 0.
    rep_level_decoder: Option<R>,

    /// Decoder for the values stored in the data pages.
    values_decoder: V,
}
138
139impl<V> GenericColumnReader<RepetitionLevelDecoderImpl, DefinitionLevelDecoderImpl, V>
140where
141 V: ColumnValueDecoder,
142{
143 pub fn new(descr: ColumnDescPtr, page_reader: Box<dyn PageReader>) -> Self {
145 let values_decoder = V::new(&descr);
146
147 let def_level_decoder = (descr.max_def_level() != 0)
148 .then(|| DefinitionLevelDecoderImpl::new(descr.max_def_level()));
149
150 let rep_level_decoder = (descr.max_rep_level() != 0)
151 .then(|| RepetitionLevelDecoderImpl::new(descr.max_rep_level()));
152
153 Self::new_with_decoders(
154 descr,
155 page_reader,
156 values_decoder,
157 def_level_decoder,
158 rep_level_decoder,
159 )
160 }
161}
162
163impl<R, D, V> GenericColumnReader<R, D, V>
164where
165 R: RepetitionLevelDecoder,
166 D: DefinitionLevelDecoder,
167 V: ColumnValueDecoder,
168{
169 pub(crate) fn new_with_decoders(
170 descr: ColumnDescPtr,
171 page_reader: Box<dyn PageReader>,
172 values_decoder: V,
173 def_level_decoder: Option<D>,
174 rep_level_decoder: Option<R>,
175 ) -> Self {
176 Self {
177 descr,
178 def_level_decoder,
179 rep_level_decoder,
180 page_reader,
181 num_buffered_values: 0,
182 num_decoded_values: 0,
183 values_decoder,
184 has_record_delimiter: false,
185 }
186 }
187
188 pub fn read_records(
203 &mut self,
204 max_records: usize,
205 mut def_levels: Option<&mut D::Buffer>,
206 mut rep_levels: Option<&mut R::Buffer>,
207 values: &mut V::Buffer,
208 ) -> Result<(usize, usize, usize)> {
209 let mut total_records_read = 0;
210 let mut total_levels_read = 0;
211 let mut total_values_read = 0;
212
213 while total_records_read < max_records && self.has_next()? {
214 let remaining_records = max_records - total_records_read;
215 let remaining_levels = self.num_buffered_values - self.num_decoded_values;
216
217 let (records_read, levels_to_read) = match self.rep_level_decoder.as_mut() {
218 Some(reader) => {
219 let out = rep_levels
220 .as_mut()
221 .ok_or_else(|| general_err!("must specify repetition levels"))?;
222
223 let (mut records_read, levels_read) =
224 reader.read_rep_levels(out, remaining_records, remaining_levels)?;
225
226 if records_read == 0 && levels_read == 0 {
227 return Err(general_err!(
229 "Insufficient repetition levels read from column"
230 ));
231 }
232 if levels_read == remaining_levels && self.has_record_delimiter {
233 assert!(records_read < remaining_records); records_read += reader.flush_partial() as usize;
237 }
238 (records_read, levels_read)
239 }
240 None => {
241 let min = remaining_records.min(remaining_levels);
242 (min, min)
243 }
244 };
245
246 let values_to_read = match self.def_level_decoder.as_mut() {
247 Some(reader) => {
248 let out = def_levels
249 .as_mut()
250 .ok_or_else(|| general_err!("must specify definition levels"))?;
251
252 let (values_read, levels_read) = reader.read_def_levels(out, levels_to_read)?;
253
254 if levels_read != levels_to_read {
255 return Err(general_err!("insufficient definition levels read from column - expected {levels_to_read}, got {levels_read}"));
256 }
257
258 values_read
259 }
260 None => levels_to_read,
261 };
262
263 let values_read = self.values_decoder.read(values, values_to_read)?;
264
265 if values_read != values_to_read {
266 return Err(general_err!(
267 "insufficient values read from column - expected: {values_to_read}, got: {values_read}",
268 ));
269 }
270
271 self.num_decoded_values += levels_to_read;
272 total_records_read += records_read;
273 total_levels_read += levels_to_read;
274 total_values_read += values_read;
275 }
276
277 Ok((total_records_read, total_values_read, total_levels_read))
278 }
279
280 pub fn skip_records(&mut self, num_records: usize) -> Result<usize> {
286 let mut remaining_records = num_records;
287 while remaining_records != 0 {
288 if self.num_buffered_values == self.num_decoded_values {
289 let metadata = match self.page_reader.peek_next_page()? {
290 None => return Ok(num_records - remaining_records),
291 Some(metadata) => metadata,
292 };
293
294 if metadata.is_dict {
296 self.read_dictionary_page()?;
297 continue;
298 }
299
300 let rows = metadata.num_rows.or_else(|| {
303 self.rep_level_decoder
305 .is_none()
306 .then_some(metadata.num_levels)?
307 });
308
309 if let Some(rows) = rows {
310 if rows <= remaining_records {
311 self.page_reader.skip_next_page()?;
312 remaining_records -= rows;
313 continue;
314 }
315 }
316 if !self.read_new_page()? {
319 return Ok(num_records - remaining_records);
320 }
321 }
322
323 let remaining_levels = self.num_buffered_values - self.num_decoded_values;
327
328 let (records_read, rep_levels_read) = match self.rep_level_decoder.as_mut() {
329 Some(decoder) => {
330 let (mut records_read, levels_read) =
331 decoder.skip_rep_levels(remaining_records, remaining_levels)?;
332
333 if levels_read == remaining_levels && self.has_record_delimiter {
334 assert!(records_read < remaining_records); records_read += decoder.flush_partial() as usize;
338 }
339
340 (records_read, levels_read)
341 }
342 None => {
343 let levels = remaining_levels.min(remaining_records);
345 (levels, levels)
346 }
347 };
348
349 self.num_decoded_values += rep_levels_read;
350 remaining_records -= records_read;
351
352 if self.num_buffered_values == self.num_decoded_values {
353 continue;
355 }
356
357 let (values_read, def_levels_read) = match self.def_level_decoder.as_mut() {
358 Some(decoder) => decoder.skip_def_levels(rep_levels_read)?,
359 None => (rep_levels_read, rep_levels_read),
360 };
361
362 if rep_levels_read != def_levels_read {
363 return Err(general_err!(
364 "levels mismatch, read {} repetition levels and {} definition levels",
365 rep_levels_read,
366 def_levels_read
367 ));
368 }
369
370 let values = self.values_decoder.skip_values(values_read)?;
371 if values != values_read {
372 return Err(general_err!(
373 "skipped {} values, expected {}",
374 values,
375 values_read
376 ));
377 }
378 }
379 Ok(num_records - remaining_records)
380 }
381
382 fn read_dictionary_page(&mut self) -> Result<()> {
385 match self.page_reader.get_next_page()? {
386 Some(Page::DictionaryPage {
387 buf,
388 num_values,
389 encoding,
390 is_sorted,
391 }) => self
392 .values_decoder
393 .set_dict(buf, num_values, encoding, is_sorted),
394 _ => Err(ParquetError::General(
395 "Invalid page. Expecting dictionary page".to_string(),
396 )),
397 }
398 }
399
400 fn read_new_page(&mut self) -> Result<bool> {
403 loop {
404 match self.page_reader.get_next_page()? {
405 None => return Ok(false),
407 Some(current_page) => {
408 match current_page {
409 Page::DictionaryPage {
411 buf,
412 num_values,
413 encoding,
414 is_sorted,
415 } => {
416 self.values_decoder
417 .set_dict(buf, num_values, encoding, is_sorted)?;
418 continue;
419 }
420 Page::DataPage {
422 buf,
423 num_values,
424 encoding,
425 def_level_encoding,
426 rep_level_encoding,
427 statistics: _,
428 } => {
429 self.num_buffered_values = num_values as _;
430 self.num_decoded_values = 0;
431
432 let max_rep_level = self.descr.max_rep_level();
433 let max_def_level = self.descr.max_def_level();
434
435 let mut offset = 0;
436
437 if max_rep_level > 0 {
438 let (bytes_read, level_data) = parse_v1_level(
439 max_rep_level,
440 num_values,
441 rep_level_encoding,
442 buf.slice(offset..),
443 )?;
444 offset += bytes_read;
445
446 self.has_record_delimiter =
447 self.page_reader.at_record_boundary()?;
448
449 self.rep_level_decoder
450 .as_mut()
451 .unwrap()
452 .set_data(rep_level_encoding, level_data);
453 }
454
455 if max_def_level > 0 {
456 let (bytes_read, level_data) = parse_v1_level(
457 max_def_level,
458 num_values,
459 def_level_encoding,
460 buf.slice(offset..),
461 )?;
462 offset += bytes_read;
463
464 self.def_level_decoder
465 .as_mut()
466 .unwrap()
467 .set_data(def_level_encoding, level_data);
468 }
469
470 self.values_decoder.set_data(
471 encoding,
472 buf.slice(offset..),
473 num_values as usize,
474 None,
475 )?;
476 return Ok(true);
477 }
478 Page::DataPageV2 {
480 buf,
481 num_values,
482 encoding,
483 num_nulls,
484 num_rows: _,
485 def_levels_byte_len,
486 rep_levels_byte_len,
487 is_compressed: _,
488 statistics: _,
489 } => {
490 if num_nulls > num_values {
491 return Err(general_err!("more nulls than values in page, contained {} values and {} nulls", num_values, num_nulls));
492 }
493
494 self.num_buffered_values = num_values as _;
495 self.num_decoded_values = 0;
496
497 if self.descr.max_rep_level() > 0 {
500 self.has_record_delimiter =
504 self.page_reader.at_record_boundary()?;
505
506 self.rep_level_decoder.as_mut().unwrap().set_data(
507 Encoding::RLE,
508 buf.slice(..rep_levels_byte_len as usize),
509 );
510 }
511
512 if self.descr.max_def_level() > 0 {
515 self.def_level_decoder.as_mut().unwrap().set_data(
516 Encoding::RLE,
517 buf.slice(
518 rep_levels_byte_len as usize
519 ..(rep_levels_byte_len + def_levels_byte_len) as usize,
520 ),
521 );
522 }
523
524 self.values_decoder.set_data(
525 encoding,
526 buf.slice((rep_levels_byte_len + def_levels_byte_len) as usize..),
527 num_values as usize,
528 Some((num_values - num_nulls) as usize),
529 )?;
530 return Ok(true);
531 }
532 };
533 }
534 }
535 }
536 }
537
538 #[inline]
542 pub(crate) fn has_next(&mut self) -> Result<bool> {
543 if self.num_buffered_values == 0 || self.num_buffered_values == self.num_decoded_values {
544 if !self.read_new_page()? {
547 Ok(false)
548 } else {
549 Ok(self.num_buffered_values != 0)
550 }
551 } else {
552 Ok(true)
553 }
554 }
555}
556
557fn parse_v1_level(
558 max_level: i16,
559 num_buffered_values: u32,
560 encoding: Encoding,
561 buf: Bytes,
562) -> Result<(usize, Bytes)> {
563 match encoding {
564 Encoding::RLE => {
565 let i32_size = std::mem::size_of::<i32>();
566 let data_size = read_num_bytes::<i32>(i32_size, buf.as_ref()) as usize;
567 Ok((
568 i32_size + data_size,
569 buf.slice(i32_size..i32_size + data_size),
570 ))
571 }
572 #[allow(deprecated)]
573 Encoding::BIT_PACKED => {
574 let bit_width = num_required_bits(max_level as u64);
575 let num_bytes = ceil(num_buffered_values as usize * bit_width as usize, 8);
576 Ok((num_bytes, buf.slice(..num_bytes)))
577 }
578 _ => Err(general_err!("invalid level encoding: {}", encoding)),
579 }
580}
581
#[cfg(test)]
mod tests {
    use super::*;

    use rand::distributions::uniform::SampleUniform;
    use std::{collections::VecDeque, sync::Arc};

    use crate::basic::Type as PhysicalType;
    use crate::schema::types::{ColumnDescriptor, ColumnPath, Type as SchemaType};
    use crate::util::test_common::page_util::InMemoryPageReader;
    use crate::util::test_common::rand_gen::make_pages;

    // Default shape of the generated test data: levels per page, number of pages, and the
    // max definition/repetition levels used by most tests.
    const NUM_LEVELS: usize = 128;
    const NUM_PAGES: usize = 2;
    const MAX_DEF_LEVEL: i16 = 5;
    const MAX_REP_LEVEL: i16 = 5;

    // Generates a `#[test]` named `$test_func` that runs `ColumnReaderTester::$func` for the
    // given primitive type token (`i32` or `i64`). The remaining arguments are forwarded:
    // max def/rep levels, number of pages, levels per page, batch size and value range.
    macro_rules! test {
        // Int32 column
        ($test_func:ident, i32, $func:ident, $def_level:expr, $rep_level:expr,
         $num_pages:expr, $num_levels:expr, $batch_size:expr, $min:expr, $max:expr) => {
            test_internal!(
                $test_func,
                Int32Type,
                get_test_int32_type,
                $func,
                $def_level,
                $rep_level,
                $num_pages,
                $num_levels,
                $batch_size,
                $min,
                $max
            );
        };
        // Int64 column
        ($test_func:ident, i64, $func:ident, $def_level:expr, $rep_level:expr,
         $num_pages:expr, $num_levels:expr, $batch_size:expr, $min:expr, $max:expr) => {
            test_internal!(
                $test_func,
                Int64Type,
                get_test_int64_type,
                $func,
                $def_level,
                $rep_level,
                $num_pages,
                $num_levels,
                $batch_size,
                $min,
                $max
            );
        };
    }

    // Expands to the actual `#[test]` fn: builds a column descriptor from the schema type
    // returned by `$pty()` with the given levels, then calls `$func` on a fresh tester.
    macro_rules! test_internal {
        ($test_func:ident, $ty:ident, $pty:ident, $func:ident, $def_level:expr,
         $rep_level:expr, $num_pages:expr, $num_levels:expr, $batch_size:expr,
         $min:expr, $max:expr) => {
            #[test]
            fn $test_func() {
                let desc = Arc::new(ColumnDescriptor::new(
                    Arc::new($pty()),
                    $def_level,
                    $rep_level,
                    ColumnPath::new(Vec::new()),
                ));
                let mut tester = ColumnReaderTester::<$ty>::new();
                tester.$func(desc, $num_pages, $num_levels, $batch_size, $min, $max);
            }
        };
    }

    // Plain-encoded Int32, V1 and V2 pages; batch sizes 16 (even), 17 (uneven) and 512
    // (larger than all pages combined).
    test!(
        test_read_plain_v1_int32,
        i32,
        plain_v1,
        MAX_DEF_LEVEL,
        MAX_REP_LEVEL,
        NUM_PAGES,
        NUM_LEVELS,
        16,
        i32::MIN,
        i32::MAX
    );
    test!(
        test_read_plain_v2_int32,
        i32,
        plain_v2,
        MAX_DEF_LEVEL,
        MAX_REP_LEVEL,
        NUM_PAGES,
        NUM_LEVELS,
        16,
        i32::MIN,
        i32::MAX
    );

    test!(
        test_read_plain_v1_int32_uneven,
        i32,
        plain_v1,
        MAX_DEF_LEVEL,
        MAX_REP_LEVEL,
        NUM_PAGES,
        NUM_LEVELS,
        17,
        i32::MIN,
        i32::MAX
    );
    test!(
        test_read_plain_v2_int32_uneven,
        i32,
        plain_v2,
        MAX_DEF_LEVEL,
        MAX_REP_LEVEL,
        NUM_PAGES,
        NUM_LEVELS,
        17,
        i32::MIN,
        i32::MAX
    );

    test!(
        test_read_plain_v1_int32_multi_page,
        i32,
        plain_v1,
        MAX_DEF_LEVEL,
        MAX_REP_LEVEL,
        NUM_PAGES,
        NUM_LEVELS,
        512,
        i32::MIN,
        i32::MAX
    );
    test!(
        test_read_plain_v2_int32_multi_page,
        i32,
        plain_v2,
        MAX_DEF_LEVEL,
        MAX_REP_LEVEL,
        NUM_PAGES,
        NUM_LEVELS,
        512,
        i32::MIN,
        i32::MAX
    );

    // Required, non-repeated Int32 column: no definition or repetition levels.
    test!(
        test_read_plain_v1_int32_required_non_repeated,
        i32,
        plain_v1,
        0,
        0,
        NUM_PAGES,
        NUM_LEVELS,
        16,
        i32::MIN,
        i32::MAX
    );
    test!(
        test_read_plain_v2_int32_required_non_repeated,
        i32,
        plain_v2,
        0,
        0,
        NUM_PAGES,
        NUM_LEVELS,
        16,
        i32::MIN,
        i32::MAX
    );

    // Plain-encoded Int64 with max def/rep level 1.
    test!(
        test_read_plain_v1_int64,
        i64,
        plain_v1,
        1,
        1,
        NUM_PAGES,
        NUM_LEVELS,
        16,
        i64::MIN,
        i64::MAX
    );
    test!(
        test_read_plain_v2_int64,
        i64,
        plain_v2,
        1,
        1,
        NUM_PAGES,
        NUM_LEVELS,
        16,
        i64::MIN,
        i64::MAX
    );

    test!(
        test_read_plain_v1_int64_uneven,
        i64,
        plain_v1,
        1,
        1,
        NUM_PAGES,
        NUM_LEVELS,
        17,
        i64::MIN,
        i64::MAX
    );
    test!(
        test_read_plain_v2_int64_uneven,
        i64,
        plain_v2,
        1,
        1,
        NUM_PAGES,
        NUM_LEVELS,
        17,
        i64::MIN,
        i64::MAX
    );

    test!(
        test_read_plain_v1_int64_multi_page,
        i64,
        plain_v1,
        1,
        1,
        NUM_PAGES,
        NUM_LEVELS,
        512,
        i64::MIN,
        i64::MAX
    );
    test!(
        test_read_plain_v2_int64_multi_page,
        i64,
        plain_v2,
        1,
        1,
        NUM_PAGES,
        NUM_LEVELS,
        512,
        i64::MIN,
        i64::MAX
    );

    // Required, non-repeated Int64 column: no definition or repetition levels.
    test!(
        test_read_plain_v1_int64_required_non_repeated,
        i64,
        plain_v1,
        0,
        0,
        NUM_PAGES,
        NUM_LEVELS,
        16,
        i64::MIN,
        i64::MAX
    );
    test!(
        test_read_plain_v2_int64_required_non_repeated,
        i64,
        plain_v2,
        0,
        0,
        NUM_PAGES,
        NUM_LEVELS,
        16,
        i64::MIN,
        i64::MAX
    );

    // Dictionary-encoded columns; values drawn from the small range 0..=3.
    test!(
        test_read_dict_v1_int32_small,
        i32,
        dict_v1,
        MAX_DEF_LEVEL,
        MAX_REP_LEVEL,
        2,
        2,
        16,
        0,
        3
    );
    test!(
        test_read_dict_v2_int32_small,
        i32,
        dict_v2,
        MAX_DEF_LEVEL,
        MAX_REP_LEVEL,
        2,
        2,
        16,
        0,
        3
    );

    test!(
        test_read_dict_v1_int32,
        i32,
        dict_v1,
        MAX_DEF_LEVEL,
        MAX_REP_LEVEL,
        NUM_PAGES,
        NUM_LEVELS,
        16,
        0,
        3
    );
    test!(
        test_read_dict_v2_int32,
        i32,
        dict_v2,
        MAX_DEF_LEVEL,
        MAX_REP_LEVEL,
        NUM_PAGES,
        NUM_LEVELS,
        16,
        0,
        3
    );

    test!(
        test_read_dict_v1_int32_uneven,
        i32,
        dict_v1,
        MAX_DEF_LEVEL,
        MAX_REP_LEVEL,
        NUM_PAGES,
        NUM_LEVELS,
        17,
        0,
        3
    );
    test!(
        test_read_dict_v2_int32_uneven,
        i32,
        dict_v2,
        MAX_DEF_LEVEL,
        MAX_REP_LEVEL,
        NUM_PAGES,
        NUM_LEVELS,
        17,
        0,
        3
    );

    test!(
        test_read_dict_v1_int32_multi_page,
        i32,
        dict_v1,
        MAX_DEF_LEVEL,
        MAX_REP_LEVEL,
        NUM_PAGES,
        NUM_LEVELS,
        512,
        0,
        3
    );
    test!(
        test_read_dict_v2_int32_multi_page,
        i32,
        dict_v2,
        MAX_DEF_LEVEL,
        MAX_REP_LEVEL,
        NUM_PAGES,
        NUM_LEVELS,
        512,
        0,
        3
    );

    test!(
        test_read_dict_v1_int64,
        i64,
        dict_v1,
        MAX_DEF_LEVEL,
        MAX_REP_LEVEL,
        NUM_PAGES,
        NUM_LEVELS,
        16,
        0,
        3
    );
    test!(
        test_read_dict_v2_int64,
        i64,
        dict_v2,
        MAX_DEF_LEVEL,
        MAX_REP_LEVEL,
        NUM_PAGES,
        NUM_LEVELS,
        16,
        0,
        3
    );

    // `read_records` on dictionary-encoded Int32 with the various level combinations.
    #[test]
    fn test_read_batch_values_only() {
        test_read_batch_int32(16, 0, 0);
    }

    #[test]
    fn test_read_batch_values_def_levels() {
        test_read_batch_int32(16, MAX_DEF_LEVEL, 0);
    }

    #[test]
    fn test_read_batch_values_rep_levels() {
        test_read_batch_int32(16, 0, MAX_REP_LEVEL);
    }

    #[test]
    fn test_read_batch_values_def_rep_levels() {
        test_read_batch_int32(128, MAX_DEF_LEVEL, MAX_REP_LEVEL);
    }

    // Batch size (5) larger than a page (4 levels): a batch must span a page boundary, and
    // loading the new page resets the decoded-level counter mid-batch.
    #[test]
    fn test_read_batch_adjust_after_buffering_page() {
        let primitive_type = get_test_int32_type();
        let desc = Arc::new(ColumnDescriptor::new(
            Arc::new(primitive_type),
            1,
            1,
            ColumnPath::new(Vec::new()),
        ));

        let num_pages = 2;
        let num_levels = 4;
        let batch_size = 5;

        let mut tester = ColumnReaderTester::<Int32Type>::new();
        tester.test_read_batch(
            desc,
            Encoding::RLE_DICTIONARY,
            num_pages,
            num_levels,
            batch_size,
            i32::MIN,
            i32::MAX,
            false,
        );
    }

    // Schema type for a required INT32 column named "a".
    fn get_test_int32_type() -> SchemaType {
        SchemaType::primitive_type_builder("a", PhysicalType::INT32)
            .with_repetition(Repetition::REQUIRED)
            .with_converted_type(ConvertedType::INT_32)
            .with_length(-1)
            .build()
            .expect("build() should be OK")
    }

    // Schema type for a required INT64 column named "a".
    fn get_test_int64_type() -> SchemaType {
        SchemaType::primitive_type_builder("a", PhysicalType::INT64)
            .with_repetition(Repetition::REQUIRED)
            .with_converted_type(ConvertedType::INT_64)
            .with_length(-1)
            .build()
            .expect("build() should be OK")
    }

    // Runs a dictionary-encoded V1 Int32 round-trip with the given batch size and levels,
    // using the default page count and levels per page.
    fn test_read_batch_int32(batch_size: usize, max_def_level: i16, max_rep_level: i16) {
        let primitive_type = get_test_int32_type();

        let desc = Arc::new(ColumnDescriptor::new(
            Arc::new(primitive_type),
            max_def_level,
            max_rep_level,
            ColumnPath::new(Vec::new()),
        ));

        let mut tester = ColumnReaderTester::<Int32Type>::new();
        tester.test_read_batch(
            desc,
            Encoding::RLE_DICTIONARY,
            NUM_PAGES,
            NUM_LEVELS,
            batch_size,
            i32::MIN,
            i32::MAX,
            false,
        );
    }

    // Generates pages with `make_pages` and checks they read back to the same levels and
    // values. The expected levels/values are recorded in these vectors by `make_pages`.
    struct ColumnReaderTester<T: DataType>
    where
        T::T: PartialOrd + SampleUniform + Copy,
    {
        rep_levels: Vec<i16>,
        def_levels: Vec<i16>,
        values: Vec<T::T>,
    }

    impl<T: DataType> ColumnReaderTester<T>
    where
        T::T: PartialOrd + SampleUniform + Copy,
    {
        pub fn new() -> Self {
            Self {
                rep_levels: Vec::new(),
                def_levels: Vec::new(),
                values: Vec::new(),
            }
        }

        // PLAIN encoding, data page V1.
        fn plain_v1(
            &mut self,
            desc: ColumnDescPtr,
            num_pages: usize,
            num_levels: usize,
            batch_size: usize,
            min: T::T,
            max: T::T,
        ) {
            self.test_read_batch_general(
                desc,
                Encoding::PLAIN,
                num_pages,
                num_levels,
                batch_size,
                min,
                max,
                false,
            );
        }

        // PLAIN encoding, data page V2.
        fn plain_v2(
            &mut self,
            desc: ColumnDescPtr,
            num_pages: usize,
            num_levels: usize,
            batch_size: usize,
            min: T::T,
            max: T::T,
        ) {
            self.test_read_batch_general(
                desc,
                Encoding::PLAIN,
                num_pages,
                num_levels,
                batch_size,
                min,
                max,
                true,
            );
        }

        // RLE_DICTIONARY encoding, data page V1.
        fn dict_v1(
            &mut self,
            desc: ColumnDescPtr,
            num_pages: usize,
            num_levels: usize,
            batch_size: usize,
            min: T::T,
            max: T::T,
        ) {
            self.test_read_batch_general(
                desc,
                Encoding::RLE_DICTIONARY,
                num_pages,
                num_levels,
                batch_size,
                min,
                max,
                false,
            );
        }

        // RLE_DICTIONARY encoding, data page V2.
        fn dict_v2(
            &mut self,
            desc: ColumnDescPtr,
            num_pages: usize,
            num_levels: usize,
            batch_size: usize,
            min: T::T,
            max: T::T,
        ) {
            self.test_read_batch_general(
                desc,
                Encoding::RLE_DICTIONARY,
                num_pages,
                num_levels,
                batch_size,
                min,
                max,
                true,
            );
        }

        // Thin wrapper that forwards to `test_read_batch` unchanged.
        #[allow(clippy::too_many_arguments)]
        fn test_read_batch_general(
            &mut self,
            desc: ColumnDescPtr,
            encoding: Encoding,
            num_pages: usize,
            num_levels: usize,
            batch_size: usize,
            min: T::T,
            max: T::T,
            use_v2: bool,
        ) {
            self.test_read_batch(
                desc, encoding, num_pages, num_levels, batch_size, min, max, use_v2,
            );
        }

        // Generates `num_pages` pages and reads them back with `read_records` in batches of
        // `batch_size` until a call returns no values and no levels. Then it compares values,
        // and levels when the column has them, against what `make_pages` recorded.
        #[allow(clippy::too_many_arguments)]
        fn test_read_batch(
            &mut self,
            desc: ColumnDescPtr,
            encoding: Encoding,
            num_pages: usize,
            num_levels: usize,
            batch_size: usize,
            min: T::T,
            max: T::T,
            use_v2: bool,
        ) {
            let mut pages = VecDeque::new();
            make_pages::<T>(
                desc.clone(),
                encoding,
                num_pages,
                num_levels,
                min,
                max,
                &mut self.def_levels,
                &mut self.rep_levels,
                &mut self.values,
                &mut pages,
                use_v2,
            );
            let max_def_level = desc.max_def_level();
            let max_rep_level = desc.max_rep_level();
            let page_reader = InMemoryPageReader::new(pages);
            let column_reader: ColumnReader = get_column_reader(desc, Box::new(page_reader));
            let mut typed_column_reader = get_typed_column_reader::<T>(column_reader);

            let mut values = Vec::new();
            let mut def_levels = Vec::new();
            let mut rep_levels = Vec::new();

            let mut curr_values_read = 0;
            let mut curr_levels_read = 0;
            loop {
                let (_, values_read, levels_read) = typed_column_reader
                    .read_records(
                        batch_size,
                        Some(&mut def_levels),
                        Some(&mut rep_levels),
                        &mut values,
                    )
                    .expect("read_batch() should be OK");

                curr_values_read += values_read;
                curr_levels_read += levels_read;

                if values_read == 0 && levels_read == 0 {
                    break;
                }
            }

            assert_eq!(values, self.values, "values content doesn't match");

            if max_def_level > 0 {
                assert_eq!(
                    def_levels, self.def_levels,
                    "definition levels content doesn't match"
                );
            }

            if max_rep_level > 0 {
                assert_eq!(
                    rep_levels, self.rep_levels,
                    "repetition levels content doesn't match"
                );
            }

            // Nulls produce levels without values, so levels can only exceed values.
            assert!(
                curr_levels_read >= curr_values_read,
                "expected levels read to be greater than values read"
            );
        }
    }
}