1use bytes::Bytes;
21
22use super::page::{Page, PageReader};
23use crate::basic::*;
24use crate::column::reader::decoder::{
25 ColumnValueDecoder, ColumnValueDecoderImpl, DefinitionLevelDecoder, DefinitionLevelDecoderImpl,
26 RepetitionLevelDecoder, RepetitionLevelDecoderImpl,
27};
28use crate::data_type::*;
29use crate::errors::{ParquetError, Result};
30use crate::schema::types::ColumnDescPtr;
31use crate::util::bit_util::{ceil, num_required_bits, read_num_bytes};
32
33pub(crate) mod decoder;
34
/// Column reader for a Parquet column, with one variant per physical type.
///
/// Each variant wraps a [`ColumnReaderImpl`] specialised for the matching data type.
pub enum ColumnReader {
    /// Reader for `BoolType` columns.
    BoolColumnReader(ColumnReaderImpl<BoolType>),
    /// Reader for `Int32Type` columns.
    Int32ColumnReader(ColumnReaderImpl<Int32Type>),
    /// Reader for `Int64Type` columns.
    Int64ColumnReader(ColumnReaderImpl<Int64Type>),
    /// Reader for `Int96Type` columns.
    Int96ColumnReader(ColumnReaderImpl<Int96Type>),
    /// Reader for `FloatType` columns.
    FloatColumnReader(ColumnReaderImpl<FloatType>),
    /// Reader for `DoubleType` columns.
    DoubleColumnReader(ColumnReaderImpl<DoubleType>),
    /// Reader for `ByteArrayType` columns.
    ByteArrayColumnReader(ColumnReaderImpl<ByteArrayType>),
    /// Reader for `FixedLenByteArrayType` columns.
    FixedLenByteArrayColumnReader(ColumnReaderImpl<FixedLenByteArrayType>),
}
54
55pub fn get_column_reader(
58 col_descr: ColumnDescPtr,
59 col_page_reader: Box<dyn PageReader>,
60) -> ColumnReader {
61 match col_descr.physical_type() {
62 Type::BOOLEAN => {
63 ColumnReader::BoolColumnReader(ColumnReaderImpl::new(col_descr, col_page_reader))
64 }
65 Type::INT32 => {
66 ColumnReader::Int32ColumnReader(ColumnReaderImpl::new(col_descr, col_page_reader))
67 }
68 Type::INT64 => {
69 ColumnReader::Int64ColumnReader(ColumnReaderImpl::new(col_descr, col_page_reader))
70 }
71 Type::INT96 => {
72 ColumnReader::Int96ColumnReader(ColumnReaderImpl::new(col_descr, col_page_reader))
73 }
74 Type::FLOAT => {
75 ColumnReader::FloatColumnReader(ColumnReaderImpl::new(col_descr, col_page_reader))
76 }
77 Type::DOUBLE => {
78 ColumnReader::DoubleColumnReader(ColumnReaderImpl::new(col_descr, col_page_reader))
79 }
80 Type::BYTE_ARRAY => {
81 ColumnReader::ByteArrayColumnReader(ColumnReaderImpl::new(col_descr, col_page_reader))
82 }
83 Type::FIXED_LEN_BYTE_ARRAY => ColumnReader::FixedLenByteArrayColumnReader(
84 ColumnReaderImpl::new(col_descr, col_page_reader),
85 ),
86 }
87}
88
89pub fn get_typed_column_reader<T: DataType>(col_reader: ColumnReader) -> ColumnReaderImpl<T> {
94 T::get_column_reader(col_reader).unwrap_or_else(|| {
95 panic!(
96 "Failed to convert column reader into a typed column reader for `{}` type",
97 T::get_physical_type()
98 )
99 })
100}
101
/// [`GenericColumnReader`] using the default level decoders
/// ([`RepetitionLevelDecoderImpl`], [`DefinitionLevelDecoderImpl`]) and the default
/// value decoder ([`ColumnValueDecoderImpl`]) for data type `T`.
pub type ColumnReaderImpl<T> = GenericColumnReader<
    RepetitionLevelDecoderImpl,
    DefinitionLevelDecoderImpl,
    ColumnValueDecoderImpl<T>,
>;
108
/// Reads levels and values for a single column from a [`PageReader`].
///
/// Generic over the repetition level decoder `R`, the definition level decoder `D`
/// and the value decoder `V`.
pub struct GenericColumnReader<R, D, V> {
    /// Descriptor of the column being read; consulted for the maximum definition and
    /// repetition levels.
    descr: ColumnDescPtr,

    /// Source of the pages for this column.
    page_reader: Box<dyn PageReader>,

    /// Number of values (`num_values`) in the currently loaded data page.
    num_buffered_values: usize,

    /// Number of levels already consumed from the currently loaded data page.
    /// Reset to 0 whenever a new data page is loaded.
    num_decoded_values: usize,

    /// Result of `PageReader::at_record_boundary` captured when the current page was
    /// loaded (only updated for columns with repetition levels).
    has_record_delimiter: bool,

    /// Decoder for definition levels, `None` if the column has none.
    def_level_decoder: Option<D>,

    /// Decoder for repetition levels, `None` if the column has none.
    rep_level_decoder: Option<R>,

    /// Decoder for the column values.
    values_decoder: V,
}
138
139impl<V> GenericColumnReader<RepetitionLevelDecoderImpl, DefinitionLevelDecoderImpl, V>
140where
141 V: ColumnValueDecoder,
142{
143 pub fn new(descr: ColumnDescPtr, page_reader: Box<dyn PageReader>) -> Self {
145 let values_decoder = V::new(&descr);
146
147 let def_level_decoder = (descr.max_def_level() != 0)
148 .then(|| DefinitionLevelDecoderImpl::new(descr.max_def_level()));
149
150 let rep_level_decoder = (descr.max_rep_level() != 0)
151 .then(|| RepetitionLevelDecoderImpl::new(descr.max_rep_level()));
152
153 Self::new_with_decoders(
154 descr,
155 page_reader,
156 values_decoder,
157 def_level_decoder,
158 rep_level_decoder,
159 )
160 }
161}
162
163impl<R, D, V> GenericColumnReader<R, D, V>
164where
165 R: RepetitionLevelDecoder,
166 D: DefinitionLevelDecoder,
167 V: ColumnValueDecoder,
168{
169 pub(crate) fn new_with_decoders(
170 descr: ColumnDescPtr,
171 page_reader: Box<dyn PageReader>,
172 values_decoder: V,
173 def_level_decoder: Option<D>,
174 rep_level_decoder: Option<R>,
175 ) -> Self {
176 Self {
177 descr,
178 def_level_decoder,
179 rep_level_decoder,
180 page_reader,
181 num_buffered_values: 0,
182 num_decoded_values: 0,
183 values_decoder,
184 has_record_delimiter: false,
185 }
186 }
187
    /// Reads up to `max_records` records, appending decoded levels and values to the
    /// supplied buffers.
    ///
    /// `def_levels` / `rep_levels` must be `Some` when the reader has the corresponding
    /// level decoder; otherwise an error is returned. They are not touched when the
    /// reader has no such decoder.
    ///
    /// Returns `(records_read, values_read, levels_read)` — note that the value count
    /// comes before the level count.
    ///
    /// # Errors
    ///
    /// Returns an error if a required level buffer is missing, if a decoder produces
    /// fewer levels or values than requested, or if reading a page fails.
    pub fn read_records(
        &mut self,
        max_records: usize,
        mut def_levels: Option<&mut D::Buffer>,
        mut rep_levels: Option<&mut R::Buffer>,
        values: &mut V::Buffer,
    ) -> Result<(usize, usize, usize)> {
        let mut total_records_read = 0;
        let mut total_levels_read = 0;
        let mut total_values_read = 0;

        // `has_next` loads a new page when the current one is exhausted.
        while total_records_read < max_records && self.has_next()? {
            let remaining_records = max_records - total_records_read;
            // Levels left in the currently loaded page.
            let remaining_levels = self.num_buffered_values - self.num_decoded_values;

            let (records_read, levels_to_read) = match self.rep_level_decoder.as_mut() {
                Some(reader) => {
                    let out = rep_levels
                        .as_mut()
                        .ok_or_else(|| general_err!("must specify repetition levels"))?;

                    let (mut records_read, levels_read) =
                        reader.read_rep_levels(out, remaining_records, remaining_levels)?;

                    // `has_next` returned true, so the page still has levels; making no
                    // progress at all would loop forever.
                    if records_read == 0 && levels_read == 0 {
                        return Err(general_err!(
                            "Insufficient repetition levels read from column"
                        ));
                    }
                    if levels_read == remaining_levels && self.has_record_delimiter {
                        // The whole rest of the page was consumed and the page reader
                        // reported a record boundary when it was loaded.
                        // Sanity check before counting the flushed record.
                        assert!(records_read < remaining_records);
                        // NOTE(review): `flush_partial` presumably reports whether a
                        // trailing partial record was completed — confirm in the decoder.
                        records_read += reader.flush_partial() as usize;
                    }
                    (records_read, levels_read)
                }
                None => {
                    // Without a repetition level decoder one level is counted as one record.
                    let min = remaining_records.min(remaining_levels);
                    (min, min)
                }
            };

            let values_to_read = match self.def_level_decoder.as_mut() {
                Some(reader) => {
                    let out = def_levels
                        .as_mut()
                        .ok_or_else(|| general_err!("must specify definition levels"))?;

                    let (values_read, levels_read) = reader.read_def_levels(out, levels_to_read)?;

                    if levels_read != levels_to_read {
                        return Err(general_err!(
                            "insufficient definition levels read from column - expected {levels_to_read}, got {levels_read}"
                        ));
                    }

                    // The definition level decoder reports how many values to decode.
                    values_read
                }
                // Without a definition level decoder every level has a value.
                None => levels_to_read,
            };

            let values_read = self.values_decoder.read(values, values_to_read)?;

            if values_read != values_to_read {
                return Err(general_err!(
                    "insufficient values read from column - expected: {values_to_read}, got: {values_read}",
                ));
            }

            // Page progress is tracked in levels, not values.
            self.num_decoded_values += levels_to_read;
            total_records_read += records_read;
            total_levels_read += levels_to_read;
            total_values_read += values_read;
        }

        Ok((total_records_read, total_values_read, total_levels_read))
    }
281
    /// Skips up to `num_records` records and returns the number actually skipped.
    ///
    /// Fewer records than requested are skipped when the page reader runs out of pages.
    ///
    /// # Errors
    ///
    /// Returns an error if reading or skipping a page fails, if the repetition and
    /// definition level decoders skip a different number of levels, or if the value
    /// decoder skips a different number of values than expected.
    pub fn skip_records(&mut self, num_records: usize) -> Result<usize> {
        let mut remaining_records = num_records;
        while remaining_records != 0 {
            // Current page fully consumed (or none loaded yet): look at the next page.
            if self.num_buffered_values == self.num_decoded_values {
                let metadata = match self.page_reader.peek_next_page()? {
                    None => return Ok(num_records - remaining_records),
                    Some(metadata) => metadata,
                };

                // A dictionary page cannot be skipped; it is read into the value decoder.
                if metadata.is_dict {
                    self.read_dictionary_page()?;
                    continue;
                }

                // Row count of the next page: taken from the metadata when present,
                // otherwise from `num_levels` but only when there is no repetition
                // level decoder.
                let rows = metadata.num_rows.or_else(|| {
                    self.rep_level_decoder
                        .is_none()
                        .then_some(metadata.num_levels)?
                });

                // Skip the page wholesale when all of its rows are to be skipped.
                if let Some(rows) = rows {
                    if rows <= remaining_records {
                        self.page_reader.skip_next_page()?;
                        remaining_records -= rows;
                        continue;
                    }
                }
                // Otherwise load the page so that the decoders can skip within it.
                if !self.read_new_page()? {
                    return Ok(num_records - remaining_records);
                }
            }

            // Levels left in the currently loaded page.
            let remaining_levels = self.num_buffered_values - self.num_decoded_values;

            let (records_read, rep_levels_read) = match self.rep_level_decoder.as_mut() {
                Some(decoder) => {
                    let (mut records_read, levels_read) =
                        decoder.skip_rep_levels(remaining_records, remaining_levels)?;

                    if levels_read == remaining_levels && self.has_record_delimiter {
                        // The rest of the page was consumed and the page reader reported
                        // a record boundary when it was loaded.
                        // Sanity check before counting the flushed record.
                        assert!(records_read < remaining_records);
                        // NOTE(review): `flush_partial` presumably reports whether a
                        // trailing partial record was completed — confirm in the decoder.
                        records_read += decoder.flush_partial() as usize;
                    }

                    (records_read, levels_read)
                }
                None => {
                    // Without a repetition level decoder one level is counted as one record.
                    let levels = remaining_levels.min(remaining_records);
                    (levels, levels)
                }
            };

            self.num_decoded_values += rep_levels_read;
            remaining_records -= records_read;

            // Page exhausted: the remaining decoders are not advanced, the next
            // iteration moves on to the following page.
            if self.num_buffered_values == self.num_decoded_values {
                continue;
            }

            let (values_read, def_levels_read) = match self.def_level_decoder.as_mut() {
                Some(decoder) => decoder.skip_def_levels(rep_levels_read)?,
                // Without a definition level decoder every level has a value.
                None => (rep_levels_read, rep_levels_read),
            };

            if rep_levels_read != def_levels_read {
                return Err(general_err!(
                    "levels mismatch, read {} repetition levels and {} definition levels",
                    rep_levels_read,
                    def_levels_read
                ));
            }

            let values = self.values_decoder.skip_values(values_read)?;
            if values != values_read {
                return Err(general_err!(
                    "skipped {} values, expected {}",
                    values,
                    values_read
                ));
            }
        }
        Ok(num_records - remaining_records)
    }
383
384 fn read_dictionary_page(&mut self) -> Result<()> {
387 match self.page_reader.get_next_page()? {
388 Some(Page::DictionaryPage {
389 buf,
390 num_values,
391 encoding,
392 is_sorted,
393 }) => self
394 .values_decoder
395 .set_dict(buf, num_values, encoding, is_sorted),
396 _ => Err(ParquetError::General(
397 "Invalid page. Expecting dictionary page".to_string(),
398 )),
399 }
400 }
401
402 fn read_new_page(&mut self) -> Result<bool> {
405 loop {
406 match self.page_reader.get_next_page()? {
407 None => return Ok(false),
409 Some(current_page) => {
410 match current_page {
411 Page::DictionaryPage {
413 buf,
414 num_values,
415 encoding,
416 is_sorted,
417 } => {
418 self.values_decoder
419 .set_dict(buf, num_values, encoding, is_sorted)?;
420 continue;
421 }
422 Page::DataPage {
424 buf,
425 num_values,
426 encoding,
427 def_level_encoding,
428 rep_level_encoding,
429 statistics: _,
430 } => {
431 self.num_buffered_values = num_values as _;
432 self.num_decoded_values = 0;
433
434 let max_rep_level = self.descr.max_rep_level();
435 let max_def_level = self.descr.max_def_level();
436
437 let mut offset = 0;
438
439 if max_rep_level > 0 {
440 let (bytes_read, level_data) = parse_v1_level(
441 max_rep_level,
442 num_values,
443 rep_level_encoding,
444 buf.slice(offset..),
445 )?;
446 offset += bytes_read;
447
448 self.has_record_delimiter =
449 self.page_reader.at_record_boundary()?;
450
451 self.rep_level_decoder
452 .as_mut()
453 .unwrap()
454 .set_data(rep_level_encoding, level_data)?;
455 }
456
457 if max_def_level > 0 {
458 let (bytes_read, level_data) = parse_v1_level(
459 max_def_level,
460 num_values,
461 def_level_encoding,
462 buf.slice(offset..),
463 )?;
464 offset += bytes_read;
465
466 self.def_level_decoder
467 .as_mut()
468 .unwrap()
469 .set_data(def_level_encoding, level_data)?;
470 }
471
472 self.values_decoder.set_data(
473 encoding,
474 buf.slice(offset..),
475 num_values as usize,
476 None,
477 )?;
478 return Ok(true);
479 }
480 Page::DataPageV2 {
482 buf,
483 num_values,
484 encoding,
485 num_nulls,
486 num_rows: _,
487 def_levels_byte_len,
488 rep_levels_byte_len,
489 is_compressed: _,
490 statistics: _,
491 } => {
492 if num_nulls > num_values {
493 return Err(general_err!(
494 "more nulls than values in page, contained {} values and {} nulls",
495 num_values,
496 num_nulls
497 ));
498 }
499
500 self.num_buffered_values = num_values as _;
501 self.num_decoded_values = 0;
502
503 if self.descr.max_rep_level() > 0 {
506 self.has_record_delimiter =
510 self.page_reader.at_record_boundary()?;
511
512 self.rep_level_decoder.as_mut().unwrap().set_data(
513 Encoding::RLE,
514 buf.slice(..rep_levels_byte_len as usize),
515 )?;
516 }
517
518 if self.descr.max_def_level() > 0 {
521 self.def_level_decoder.as_mut().unwrap().set_data(
522 Encoding::RLE,
523 buf.slice(
524 rep_levels_byte_len as usize
525 ..(rep_levels_byte_len + def_levels_byte_len) as usize,
526 ),
527 )?;
528 }
529
530 self.values_decoder.set_data(
531 encoding,
532 buf.slice((rep_levels_byte_len + def_levels_byte_len) as usize..),
533 num_values as usize,
534 Some((num_values - num_nulls) as usize),
535 )?;
536 return Ok(true);
537 }
538 };
539 }
540 }
541 }
542 }
543
544 #[inline]
548 pub(crate) fn has_next(&mut self) -> Result<bool> {
549 if self.num_buffered_values == 0 || self.num_buffered_values == self.num_decoded_values {
550 if !self.read_new_page()? {
553 Ok(false)
554 } else {
555 Ok(self.num_buffered_values != 0)
556 }
557 } else {
558 Ok(true)
559 }
560 }
561}
562
563fn parse_v1_level(
564 max_level: i16,
565 num_buffered_values: u32,
566 encoding: Encoding,
567 buf: Bytes,
568) -> Result<(usize, Bytes)> {
569 match encoding {
570 Encoding::RLE => {
571 let i32_size = std::mem::size_of::<i32>();
572 if i32_size <= buf.len() {
573 let data_size = read_num_bytes::<i32>(i32_size, buf.as_ref()) as usize;
574 let end = i32_size
575 .checked_add(data_size)
576 .ok_or(general_err!("invalid level length"))?;
577 if end <= buf.len() {
578 return Ok((end, buf.slice(i32_size..end)));
579 }
580 }
581 Err(general_err!("not enough data to read levels"))
582 }
583 #[allow(deprecated)]
584 Encoding::BIT_PACKED => {
585 let bit_width = num_required_bits(max_level as u64);
586 let num_bytes = ceil(num_buffered_values as usize * bit_width as usize, 8);
587 Ok((num_bytes, buf.slice(..num_bytes)))
588 }
589 _ => Err(general_err!("invalid level encoding: {}", encoding)),
590 }
591}
592
593#[cfg(test)]
594mod tests {
595 use super::*;
596
597 use rand::distr::uniform::SampleUniform;
598 use std::{collections::VecDeque, sync::Arc};
599
600 use crate::basic::Type as PhysicalType;
601 use crate::schema::types::{ColumnDescriptor, ColumnPath, Type as SchemaType};
602 use crate::util::test_common::page_util::InMemoryPageReader;
603 use crate::util::test_common::rand_gen::make_pages;
604
605 #[test]
606 fn test_parse_v1_level_invalid_length() {
607 let buf = Bytes::from(vec![10, 0, 0, 0]);
609 let err = parse_v1_level(1, 100, Encoding::RLE, buf).unwrap_err();
610 assert_eq!(
611 err.to_string(),
612 "Parquet error: not enough data to read levels"
613 );
614
615 let buf = Bytes::from(vec![4, 0, 0]);
617 let err = parse_v1_level(1, 100, Encoding::RLE, buf).unwrap_err();
618 assert_eq!(
619 err.to_string(),
620 "Parquet error: not enough data to read levels"
621 );
622 }
623
    // Default parameters shared by the read tests in this module.
    /// Value passed as `num_levels` to the test helpers.
    const NUM_LEVELS: usize = 128;
    /// Value passed as `num_pages` to the test helpers.
    const NUM_PAGES: usize = 2;
    /// Maximum definition level used for columns with definition levels.
    const MAX_DEF_LEVEL: i16 = 5;
    /// Maximum repetition level used for columns with repetition levels.
    const MAX_REP_LEVEL: i16 = 5;
628
    // Generates a read test for the given primitive type token (`i32` or `i64`).
    //
    // Maps the token to the matching data type and schema-type helper and forwards
    // everything to `test_internal!`.
    macro_rules! test {
        // `i32` columns: `Int32Type` with `get_test_int32_type`.
        ($test_func:ident, i32, $func:ident, $def_level:expr, $rep_level:expr,
                 $num_pages:expr, $num_levels:expr, $batch_size:expr, $min:expr, $max:expr) => {
            test_internal!(
                $test_func,
                Int32Type,
                get_test_int32_type,
                $func,
                $def_level,
                $rep_level,
                $num_pages,
                $num_levels,
                $batch_size,
                $min,
                $max
            );
        };
        // `i64` columns: `Int64Type` with `get_test_int64_type`.
        ($test_func:ident, i64, $func:ident, $def_level:expr, $rep_level:expr,
                 $num_pages:expr, $num_levels:expr, $batch_size:expr, $min:expr, $max:expr) => {
            test_internal!(
                $test_func,
                Int64Type,
                get_test_int64_type,
                $func,
                $def_level,
                $rep_level,
                $num_pages,
                $num_levels,
                $batch_size,
                $min,
                $max
            );
        };
    }
666
    // Expands to a `#[test]` function named `$test_func`.
    //
    // The generated test builds a column descriptor from the schema type returned by
    // `$pty()` with the given max definition/repetition levels, then calls the
    // `ColumnReaderTester::<$ty>` method `$func` with the remaining arguments.
    macro_rules! test_internal {
        ($test_func:ident, $ty:ident, $pty:ident, $func:ident, $def_level:expr,
                 $rep_level:expr, $num_pages:expr, $num_levels:expr, $batch_size:expr,
                 $min:expr, $max:expr) => {
            #[test]
            fn $test_func() {
                let desc = Arc::new(ColumnDescriptor::new(
                    Arc::new($pty()),
                    $def_level,
                    $rep_level,
                    ColumnPath::new(Vec::new()),
                ));
                let mut tester = ColumnReaderTester::<$ty>::new();
                tester.$func(desc, $num_pages, $num_levels, $batch_size, $min, $max);
            }
        };
    }
684
    // PLAIN-encoded i32 columns over the full i32 value range, v1 and v2 pages.
    // Batch size 16.
    test!(
        test_read_plain_v1_int32,
        i32,
        plain_v1,
        MAX_DEF_LEVEL,
        MAX_REP_LEVEL,
        NUM_PAGES,
        NUM_LEVELS,
        16,
        i32::MIN,
        i32::MAX
    );
    test!(
        test_read_plain_v2_int32,
        i32,
        plain_v2,
        MAX_DEF_LEVEL,
        MAX_REP_LEVEL,
        NUM_PAGES,
        NUM_LEVELS,
        16,
        i32::MIN,
        i32::MAX
    );

    // Batch size 17, which does not divide NUM_LEVELS (128) evenly.
    test!(
        test_read_plain_v1_int32_uneven,
        i32,
        plain_v1,
        MAX_DEF_LEVEL,
        MAX_REP_LEVEL,
        NUM_PAGES,
        NUM_LEVELS,
        17,
        i32::MIN,
        i32::MAX
    );
    test!(
        test_read_plain_v2_int32_uneven,
        i32,
        plain_v2,
        MAX_DEF_LEVEL,
        MAX_REP_LEVEL,
        NUM_PAGES,
        NUM_LEVELS,
        17,
        i32::MIN,
        i32::MAX
    );

    // Batch size 512, larger than NUM_LEVELS (128).
    test!(
        test_read_plain_v1_int32_multi_page,
        i32,
        plain_v1,
        MAX_DEF_LEVEL,
        MAX_REP_LEVEL,
        NUM_PAGES,
        NUM_LEVELS,
        512,
        i32::MIN,
        i32::MAX
    );
    test!(
        test_read_plain_v2_int32_multi_page,
        i32,
        plain_v2,
        MAX_DEF_LEVEL,
        MAX_REP_LEVEL,
        NUM_PAGES,
        NUM_LEVELS,
        512,
        i32::MIN,
        i32::MAX
    );

    // Max definition and repetition level of 0, i.e. no level decoders are created.
    test!(
        test_read_plain_v1_int32_required_non_repeated,
        i32,
        plain_v1,
        0,
        0,
        NUM_PAGES,
        NUM_LEVELS,
        16,
        i32::MIN,
        i32::MAX
    );
    test!(
        test_read_plain_v2_int32_required_non_repeated,
        i32,
        plain_v2,
        0,
        0,
        NUM_PAGES,
        NUM_LEVELS,
        16,
        i32::MIN,
        i32::MAX
    );
785
    // PLAIN-encoded i64 columns over the full i64 value range, v1 and v2 pages.
    // Max definition and repetition level of 1, batch size 16.
    test!(
        test_read_plain_v1_int64,
        i64,
        plain_v1,
        1,
        1,
        NUM_PAGES,
        NUM_LEVELS,
        16,
        i64::MIN,
        i64::MAX
    );
    test!(
        test_read_plain_v2_int64,
        i64,
        plain_v2,
        1,
        1,
        NUM_PAGES,
        NUM_LEVELS,
        16,
        i64::MIN,
        i64::MAX
    );

    // Batch size 17, which does not divide NUM_LEVELS (128) evenly.
    test!(
        test_read_plain_v1_int64_uneven,
        i64,
        plain_v1,
        1,
        1,
        NUM_PAGES,
        NUM_LEVELS,
        17,
        i64::MIN,
        i64::MAX
    );
    test!(
        test_read_plain_v2_int64_uneven,
        i64,
        plain_v2,
        1,
        1,
        NUM_PAGES,
        NUM_LEVELS,
        17,
        i64::MIN,
        i64::MAX
    );

    // Batch size 512, larger than NUM_LEVELS (128).
    test!(
        test_read_plain_v1_int64_multi_page,
        i64,
        plain_v1,
        1,
        1,
        NUM_PAGES,
        NUM_LEVELS,
        512,
        i64::MIN,
        i64::MAX
    );
    test!(
        test_read_plain_v2_int64_multi_page,
        i64,
        plain_v2,
        1,
        1,
        NUM_PAGES,
        NUM_LEVELS,
        512,
        i64::MIN,
        i64::MAX
    );

    // Max definition and repetition level of 0, i.e. no level decoders are created.
    test!(
        test_read_plain_v1_int64_required_non_repeated,
        i64,
        plain_v1,
        0,
        0,
        NUM_PAGES,
        NUM_LEVELS,
        16,
        i64::MIN,
        i64::MAX
    );
    test!(
        test_read_plain_v2_int64_required_non_repeated,
        i64,
        plain_v2,
        0,
        0,
        NUM_PAGES,
        NUM_LEVELS,
        16,
        i64::MIN,
        i64::MAX
    );
886
    // Dictionary-encoded i32 columns with `min` 0 and `max` 3, v1 and v2 pages.
    // Small input: 2 pages and `num_levels` of 2.
    test!(
        test_read_dict_v1_int32_small,
        i32,
        dict_v1,
        MAX_DEF_LEVEL,
        MAX_REP_LEVEL,
        2,
        2,
        16,
        0,
        3
    );
    test!(
        test_read_dict_v2_int32_small,
        i32,
        dict_v2,
        MAX_DEF_LEVEL,
        MAX_REP_LEVEL,
        2,
        2,
        16,
        0,
        3
    );

    // Default page and level counts, batch size 16.
    test!(
        test_read_dict_v1_int32,
        i32,
        dict_v1,
        MAX_DEF_LEVEL,
        MAX_REP_LEVEL,
        NUM_PAGES,
        NUM_LEVELS,
        16,
        0,
        3
    );
    test!(
        test_read_dict_v2_int32,
        i32,
        dict_v2,
        MAX_DEF_LEVEL,
        MAX_REP_LEVEL,
        NUM_PAGES,
        NUM_LEVELS,
        16,
        0,
        3
    );

    // Batch size 17, which does not divide NUM_LEVELS (128) evenly.
    test!(
        test_read_dict_v1_int32_uneven,
        i32,
        dict_v1,
        MAX_DEF_LEVEL,
        MAX_REP_LEVEL,
        NUM_PAGES,
        NUM_LEVELS,
        17,
        0,
        3
    );
    test!(
        test_read_dict_v2_int32_uneven,
        i32,
        dict_v2,
        MAX_DEF_LEVEL,
        MAX_REP_LEVEL,
        NUM_PAGES,
        NUM_LEVELS,
        17,
        0,
        3
    );

    // Batch size 512, larger than NUM_LEVELS (128).
    test!(
        test_read_dict_v1_int32_multi_page,
        i32,
        dict_v1,
        MAX_DEF_LEVEL,
        MAX_REP_LEVEL,
        NUM_PAGES,
        NUM_LEVELS,
        512,
        0,
        3
    );
    test!(
        test_read_dict_v2_int32_multi_page,
        i32,
        dict_v2,
        MAX_DEF_LEVEL,
        MAX_REP_LEVEL,
        NUM_PAGES,
        NUM_LEVELS,
        512,
        0,
        3
    );
986
    // Dictionary-encoded i64 columns with `min` 0 and `max` 3, v1 and v2 pages.
    // Default page and level counts, batch size 16.
    test!(
        test_read_dict_v1_int64,
        i64,
        dict_v1,
        MAX_DEF_LEVEL,
        MAX_REP_LEVEL,
        NUM_PAGES,
        NUM_LEVELS,
        16,
        0,
        3
    );
    test!(
        test_read_dict_v2_int64,
        i64,
        dict_v2,
        MAX_DEF_LEVEL,
        MAX_REP_LEVEL,
        NUM_PAGES,
        NUM_LEVELS,
        16,
        0,
        3
    );
1011
    // Column without definition or repetition levels, batch size 16.
    #[test]
    fn test_read_batch_values_only() {
        test_read_batch_int32(16, 0, 0);
    }

    // Column with definition levels only, batch size 16.
    #[test]
    fn test_read_batch_values_def_levels() {
        test_read_batch_int32(16, MAX_DEF_LEVEL, 0);
    }

    // Column with repetition levels only, batch size 16.
    #[test]
    fn test_read_batch_values_rep_levels() {
        test_read_batch_int32(16, 0, MAX_REP_LEVEL);
    }

    // Column with both definition and repetition levels, batch size 128.
    #[test]
    fn test_read_batch_values_def_rep_levels() {
        test_read_batch_int32(128, MAX_DEF_LEVEL, MAX_REP_LEVEL);
    }
1031
1032 #[test]
1033 fn test_read_batch_adjust_after_buffering_page() {
1034 let primitive_type = get_test_int32_type();
1041 let desc = Arc::new(ColumnDescriptor::new(
1042 Arc::new(primitive_type),
1043 1,
1044 1,
1045 ColumnPath::new(Vec::new()),
1046 ));
1047
1048 let num_pages = 2;
1049 let num_levels = 4;
1050 let batch_size = 5;
1051
1052 let mut tester = ColumnReaderTester::<Int32Type>::new();
1053 tester.test_read_batch(
1054 desc,
1055 Encoding::RLE_DICTIONARY,
1056 num_pages,
1057 num_levels,
1058 batch_size,
1059 i32::MIN,
1060 i32::MAX,
1061 false,
1062 );
1063 }
1064
1065 fn get_test_int32_type() -> SchemaType {
1111 SchemaType::primitive_type_builder("a", PhysicalType::INT32)
1112 .with_repetition(Repetition::REQUIRED)
1113 .with_converted_type(ConvertedType::INT_32)
1114 .with_length(-1)
1115 .build()
1116 .expect("build() should be OK")
1117 }
1118
1119 fn get_test_int64_type() -> SchemaType {
1121 SchemaType::primitive_type_builder("a", PhysicalType::INT64)
1122 .with_repetition(Repetition::REQUIRED)
1123 .with_converted_type(ConvertedType::INT_64)
1124 .with_length(-1)
1125 .build()
1126 .expect("build() should be OK")
1127 }
1128
1129 fn test_read_batch_int32(batch_size: usize, max_def_level: i16, max_rep_level: i16) {
1134 let primitive_type = get_test_int32_type();
1135
1136 let desc = Arc::new(ColumnDescriptor::new(
1137 Arc::new(primitive_type),
1138 max_def_level,
1139 max_rep_level,
1140 ColumnPath::new(Vec::new()),
1141 ));
1142
1143 let mut tester = ColumnReaderTester::<Int32Type>::new();
1144 tester.test_read_batch(
1145 desc,
1146 Encoding::RLE_DICTIONARY,
1147 NUM_PAGES,
1148 NUM_LEVELS,
1149 batch_size,
1150 i32::MIN,
1151 i32::MAX,
1152 false,
1153 );
1154 }
1155
    /// Test harness holding the expected levels and values for a generated column.
    ///
    /// The vectors are filled by `make_pages` and compared against what the column
    /// reader returns.
    struct ColumnReaderTester<T: DataType>
    where
        T::T: PartialOrd + SampleUniform + Copy,
    {
        /// Expected repetition levels.
        rep_levels: Vec<i16>,
        /// Expected definition levels.
        def_levels: Vec<i16>,
        /// Expected values.
        values: Vec<T::T>,
    }
1164
1165 impl<T: DataType> ColumnReaderTester<T>
1166 where
1167 T::T: PartialOrd + SampleUniform + Copy,
1168 {
1169 pub fn new() -> Self {
1170 Self {
1171 rep_levels: Vec::new(),
1172 def_levels: Vec::new(),
1173 values: Vec::new(),
1174 }
1175 }
1176
1177 fn plain_v1(
1179 &mut self,
1180 desc: ColumnDescPtr,
1181 num_pages: usize,
1182 num_levels: usize,
1183 batch_size: usize,
1184 min: T::T,
1185 max: T::T,
1186 ) {
1187 self.test_read_batch_general(
1188 desc,
1189 Encoding::PLAIN,
1190 num_pages,
1191 num_levels,
1192 batch_size,
1193 min,
1194 max,
1195 false,
1196 );
1197 }
1198
1199 fn plain_v2(
1201 &mut self,
1202 desc: ColumnDescPtr,
1203 num_pages: usize,
1204 num_levels: usize,
1205 batch_size: usize,
1206 min: T::T,
1207 max: T::T,
1208 ) {
1209 self.test_read_batch_general(
1210 desc,
1211 Encoding::PLAIN,
1212 num_pages,
1213 num_levels,
1214 batch_size,
1215 min,
1216 max,
1217 true,
1218 );
1219 }
1220
1221 fn dict_v1(
1223 &mut self,
1224 desc: ColumnDescPtr,
1225 num_pages: usize,
1226 num_levels: usize,
1227 batch_size: usize,
1228 min: T::T,
1229 max: T::T,
1230 ) {
1231 self.test_read_batch_general(
1232 desc,
1233 Encoding::RLE_DICTIONARY,
1234 num_pages,
1235 num_levels,
1236 batch_size,
1237 min,
1238 max,
1239 false,
1240 );
1241 }
1242
1243 fn dict_v2(
1245 &mut self,
1246 desc: ColumnDescPtr,
1247 num_pages: usize,
1248 num_levels: usize,
1249 batch_size: usize,
1250 min: T::T,
1251 max: T::T,
1252 ) {
1253 self.test_read_batch_general(
1254 desc,
1255 Encoding::RLE_DICTIONARY,
1256 num_pages,
1257 num_levels,
1258 batch_size,
1259 min,
1260 max,
1261 true,
1262 );
1263 }
1264
1265 #[allow(clippy::too_many_arguments)]
1268 fn test_read_batch_general(
1269 &mut self,
1270 desc: ColumnDescPtr,
1271 encoding: Encoding,
1272 num_pages: usize,
1273 num_levels: usize,
1274 batch_size: usize,
1275 min: T::T,
1276 max: T::T,
1277 use_v2: bool,
1278 ) {
1279 self.test_read_batch(
1280 desc, encoding, num_pages, num_levels, batch_size, min, max, use_v2,
1281 );
1282 }
1283
1284 #[allow(clippy::too_many_arguments)]
1287 fn test_read_batch(
1288 &mut self,
1289 desc: ColumnDescPtr,
1290 encoding: Encoding,
1291 num_pages: usize,
1292 num_levels: usize,
1293 batch_size: usize,
1294 min: T::T,
1295 max: T::T,
1296 use_v2: bool,
1297 ) {
1298 let mut pages = VecDeque::new();
1299 make_pages::<T>(
1300 desc.clone(),
1301 encoding,
1302 num_pages,
1303 num_levels,
1304 min,
1305 max,
1306 &mut self.def_levels,
1307 &mut self.rep_levels,
1308 &mut self.values,
1309 &mut pages,
1310 use_v2,
1311 );
1312 let max_def_level = desc.max_def_level();
1313 let max_rep_level = desc.max_rep_level();
1314 let page_reader = InMemoryPageReader::new(pages);
1315 let column_reader: ColumnReader = get_column_reader(desc, Box::new(page_reader));
1316 let mut typed_column_reader = get_typed_column_reader::<T>(column_reader);
1317
1318 let mut values = Vec::new();
1319 let mut def_levels = Vec::new();
1320 let mut rep_levels = Vec::new();
1321
1322 let mut curr_values_read = 0;
1323 let mut curr_levels_read = 0;
1324 loop {
1325 let (_, values_read, levels_read) = typed_column_reader
1326 .read_records(
1327 batch_size,
1328 Some(&mut def_levels),
1329 Some(&mut rep_levels),
1330 &mut values,
1331 )
1332 .expect("read_batch() should be OK");
1333
1334 curr_values_read += values_read;
1335 curr_levels_read += levels_read;
1336
1337 if values_read == 0 && levels_read == 0 {
1338 break;
1339 }
1340 }
1341
1342 assert_eq!(values, self.values, "values content doesn't match");
1343
1344 if max_def_level > 0 {
1345 assert_eq!(
1346 def_levels, self.def_levels,
1347 "definition levels content doesn't match"
1348 );
1349 }
1350
1351 if max_rep_level > 0 {
1352 assert_eq!(
1353 rep_levels, self.rep_levels,
1354 "repetition levels content doesn't match"
1355 );
1356 }
1357
1358 assert!(
1359 curr_levels_read >= curr_values_read,
1360 "expected levels read to be greater than values read"
1361 );
1362 }
1363 }
1364}