1use arrow_buffer::Buffer;
19
20use crate::arrow::record_reader::{
21 buffer::ValuesBuffer,
22 definition_levels::{DefinitionLevelBuffer, DefinitionLevelBufferDecoder},
23};
24use crate::column::reader::decoder::RepetitionLevelDecoderImpl;
25use crate::column::{
26 page::PageReader,
27 reader::{
28 GenericColumnReader,
29 decoder::{ColumnValueDecoder, ColumnValueDecoderImpl},
30 },
31};
32use crate::data_type::DataType;
33use crate::errors::{ParquetError, Result};
34use crate::schema::types::ColumnDescPtr;
35
// Submodule providing `ValuesBuffer`, the trait bound placed on the value buffer `V` below.
pub(crate) mod buffer;
// Submodule providing `DefinitionLevelBuffer` and `DefinitionLevelBufferDecoder`.
mod definition_levels;

/// A [`GenericRecordReader`] whose values are collected into a `Vec` of the native
/// type of `T` and decoded by [`ColumnValueDecoderImpl`].
pub type RecordReader<T> = GenericRecordReader<Vec<<T as DataType>::T>, ColumnValueDecoderImpl<T>>;

/// The [`GenericColumnReader`] flavour used by this module: repetition levels are decoded
/// by [`RepetitionLevelDecoderImpl`], definition levels by [`DefinitionLevelBufferDecoder`],
/// and values by the caller-chosen decoder `CV`.
pub(crate) type ColumnReader<CV> =
    GenericColumnReader<RepetitionLevelDecoderImpl, DefinitionLevelBufferDecoder, CV>;
44
/// Reads records of a single column and accumulates the decoded values and levels in
/// memory until the caller takes them with one of the `consume_*` methods.
///
/// `V` is the buffer the values are collected into and `CV` is the decoder that fills it.
pub struct GenericRecordReader<V, CV> {
    /// Descriptor of the column being read.
    column_desc: ColumnDescPtr,

    /// Buffered values. `None` until the first batch is read, and `None` again once the
    /// buffer has been handed out by `consume_record_data`.
    values: Option<V>,
    /// Buffered definition levels; only present when the column's maximum definition
    /// level is greater than zero.
    def_levels: Option<DefinitionLevelBuffer>,
    /// Buffered repetition levels; only present when the column's maximum repetition
    /// level is greater than zero.
    rep_levels: Option<Vec<i16>>,
    /// Reader over the current pages; `None` until `set_page_reader` is called.
    column_reader: Option<ColumnReader<CV>>,
    /// Number of level slots read since construction or the last `reset`.
    num_values: usize,
    /// Number of records read since construction or the last `reset`.
    num_records: usize,
    /// Capacity used when the values buffer has to be (re)allocated. Starts at the
    /// capacity passed to `new` and grows to the largest batch size requested so far.
    capacity_hint: usize,
}
66
67impl<V, CV> GenericRecordReader<V, CV>
68where
69 V: ValuesBuffer,
70 CV: ColumnValueDecoder<Buffer = V>,
71{
72 pub fn new(desc: ColumnDescPtr, capacity: usize) -> Self {
78 let def_levels = (desc.max_def_level() > 0)
79 .then(|| DefinitionLevelBuffer::new(&desc, packed_null_mask(&desc)));
80
81 let rep_levels = (desc.max_rep_level() > 0).then(Vec::new);
82
83 Self {
84 values: None, def_levels,
86 rep_levels,
87 column_reader: None,
88 column_desc: desc,
89 num_values: 0,
90 num_records: 0,
91 capacity_hint: capacity,
92 }
93 }
94
95 pub fn set_page_reader(&mut self, page_reader: Box<dyn PageReader>) -> Result<()> {
97 let descr = &self.column_desc;
98 let values_decoder = CV::new(descr);
99
100 let def_level_decoder = (descr.max_def_level() != 0).then(|| {
101 DefinitionLevelBufferDecoder::new(descr.max_def_level(), packed_null_mask(descr))
102 });
103
104 let rep_level_decoder = (descr.max_rep_level() != 0)
105 .then(|| RepetitionLevelDecoderImpl::new(descr.max_rep_level()));
106
107 self.column_reader = Some(GenericColumnReader::new_with_decoders(
108 self.column_desc.clone(),
109 page_reader,
110 values_decoder,
111 def_level_decoder,
112 rep_level_decoder,
113 ));
114 Ok(())
115 }
116
117 pub fn read_records(&mut self, num_records: usize) -> Result<usize> {
123 if self.column_reader.is_none() {
124 return Ok(0);
125 }
126
127 let mut records_read = 0;
128
129 loop {
130 let records_to_read = num_records - records_read;
131 records_read += self.read_one_batch(records_to_read)?;
132 if records_read == num_records || !self.column_reader.as_mut().unwrap().has_next()? {
133 break;
134 }
135 }
136 Ok(records_read)
137 }
138
139 pub fn skip_records(&mut self, num_records: usize) -> Result<usize> {
145 match self.column_reader.as_mut() {
146 Some(reader) => reader.skip_records(num_records),
147 None => Ok(0),
148 }
149 }
150
151 #[allow(unused)]
153 pub fn num_records(&self) -> usize {
154 self.num_records
155 }
156
157 pub fn num_values(&self) -> usize {
161 self.num_values
162 }
163
164 pub fn consume_def_levels(&mut self) -> Option<Vec<i16>> {
169 self.def_levels.as_mut().and_then(|x| x.consume_levels())
170 }
171
172 pub fn consume_rep_levels(&mut self) -> Option<Vec<i16>> {
175 self.rep_levels.as_mut().map(std::mem::take)
176 }
177
178 pub fn consume_record_data(&mut self) -> V {
181 self.values.take().unwrap_or_else(|| V::with_capacity(0))
184 }
185
186 pub fn consume_bitmap_buffer(&mut self) -> Option<Buffer> {
190 self.consume_bitmap()
191 }
192
193 pub fn reset(&mut self) {
197 self.num_values = 0;
198 self.num_records = 0;
199 }
200
201 pub fn consume_bitmap(&mut self) -> Option<Buffer> {
204 let mask = self
205 .def_levels
206 .as_mut()
207 .and_then(|levels| levels.consume_bitmask());
208
209 if self.column_desc.self_type().is_optional() {
214 mask
215 } else {
216 None
217 }
218 }
219
220 fn read_one_batch(&mut self, batch_size: usize) -> Result<usize> {
222 if batch_size == 0 {
223 return Ok(0);
224 }
225 if batch_size > self.capacity_hint {
227 self.capacity_hint = batch_size;
228 }
229
230 let capacity_hint = self.capacity_hint;
232 let values = self
233 .values
234 .get_or_insert_with(|| V::with_capacity(capacity_hint));
235
236 let (records_read, values_read, levels_read) =
237 self.column_reader.as_mut().unwrap().read_records(
238 batch_size,
239 self.def_levels.as_mut(),
240 self.rep_levels.as_mut(),
241 values,
242 )?;
243
244 if values_read < levels_read {
245 let def_levels = self.def_levels.as_ref().ok_or_else(|| {
246 general_err!("Definition levels should exist when data is less than levels!")
247 })?;
248
249 values.pad_nulls(
250 self.num_values,
251 values_read,
252 levels_read,
253 def_levels.nulls().as_slice(),
254 );
255 }
256
257 self.num_records += records_read;
258 self.num_values += levels_read;
259 Ok(records_read)
260 }
261}
262
263fn packed_null_mask(descr: &ColumnDescPtr) -> bool {
267 descr.max_def_level() == 1 && descr.max_rep_level() == 0 && descr.self_type().is_optional()
268}
269
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow::buffer::Buffer;

    use crate::arrow::arrow_reader::DEFAULT_BATCH_SIZE;
    use crate::basic::Encoding;
    use crate::data_type::Int32Type;
    use crate::schema::parser::parse_message_type;
    use crate::schema::types::{ColumnDescPtr, SchemaDescriptor};
    use crate::util::test_common::page_util::{
        DataPageBuilder, DataPageBuilderImpl, InMemoryPageReader,
    };

    use super::RecordReader;

    /// Parses `message_type` and returns the descriptor of its first leaf column.
    fn first_column(message_type: &str) -> ColumnDescPtr {
        let schema = parse_message_type(message_type).unwrap();
        SchemaDescriptor::new(Arc::new(schema)).column(0)
    }

    /// Finishes `builder` and installs the resulting page as the reader's only page.
    fn set_single_page(reader: &mut RecordReader<Int32Type>, builder: DataPageBuilderImpl) {
        let page_reader = Box::new(InMemoryPageReader::new(vec![builder.consume()]));
        reader.set_page_reader(page_reader).unwrap();
    }

    /// Checks both running counters of `reader`.
    fn assert_counts(reader: &RecordReader<Int32Type>, records: usize, values: usize) {
        assert_eq!(records, reader.num_records());
        assert_eq!(values, reader.num_values());
    }

    /// Builds the bitmap buffer expected for the given validity flags.
    fn bitmap_of(valid: &[bool]) -> Buffer {
        Buffer::from_iter(valid.iter().copied())
    }

    /// Checks that `actual` has the same length as `expected` and agrees with it at
    /// every position flagged valid; the content of null slots is not compared.
    fn assert_valid_values(actual: &[i32], expected: &[i32], valid: &[bool]) {
        assert_eq!(actual.len(), expected.len());

        let pairs = actual.iter().zip(expected);
        for (is_valid, (got, want)) in valid.iter().zip(pairs) {
            if *is_valid {
                assert_eq!(got, want);
            }
        }
    }

    #[test]
    fn test_read_required_records() {
        let desc = first_column(
            "
            message test_schema {
              REQUIRED INT32 leaf;
            }
            ",
        );
        let mut record_reader = RecordReader::<Int32Type>::new(desc.clone(), DEFAULT_BATCH_SIZE);

        // First page: five values, read with two calls.
        let values = [4, 7, 6, 3, 2];
        let mut pb = DataPageBuilderImpl::new(desc.clone(), 5, true);
        pb.add_values::<Int32Type>(Encoding::PLAIN, &values);
        set_single_page(&mut record_reader, pb);

        assert_eq!(2, record_reader.read_records(2).unwrap());
        assert_counts(&record_reader, 2, 2);
        assert_eq!(3, record_reader.read_records(3).unwrap());
        assert_counts(&record_reader, 5, 5);

        // Second page: two values, with more records requested than are available.
        let values = [8, 9];
        let mut pb = DataPageBuilderImpl::new(desc, 2, true);
        pb.add_values::<Int32Type>(Encoding::PLAIN, &values);
        set_single_page(&mut record_reader, pb);

        assert_eq!(2, record_reader.read_records(10).unwrap());
        assert_counts(&record_reader, 7, 7);

        assert_eq!(record_reader.consume_record_data(), &[4, 7, 6, 3, 2, 8, 9]);
        assert_eq!(None, record_reader.consume_def_levels());
        assert_eq!(None, record_reader.consume_bitmap());
    }

    #[test]
    fn test_read_optional_records() {
        let desc = first_column(
            "
            message test_schema {
              OPTIONAL Group test_struct {
                OPTIONAL INT32 leaf;
              }
            }
            ",
        );
        let mut record_reader = RecordReader::<Int32Type>::new(desc.clone(), DEFAULT_BATCH_SIZE);

        // First page: five level slots, three of which carry a value.
        let values = [7, 6, 3];
        let def_levels = [1i16, 2, 0, 2, 2];
        let mut pb = DataPageBuilderImpl::new(desc.clone(), 5, true);
        pb.add_def_levels(2, &def_levels);
        pb.add_values::<Int32Type>(Encoding::PLAIN, &values);
        set_single_page(&mut record_reader, pb);

        assert_eq!(2, record_reader.read_records(2).unwrap());
        assert_counts(&record_reader, 2, 2);
        assert_eq!(3, record_reader.read_records(3).unwrap());
        assert_counts(&record_reader, 5, 5);

        // Second page: two level slots, one value.
        let values = [8];
        let def_levels = [0i16, 2];
        let mut pb = DataPageBuilderImpl::new(desc, 2, true);
        pb.add_def_levels(2, &def_levels);
        pb.add_values::<Int32Type>(Encoding::PLAIN, &values);
        set_single_page(&mut record_reader, pb);

        assert_eq!(2, record_reader.read_records(10).unwrap());
        assert_counts(&record_reader, 7, 7);

        assert_eq!(
            Some(vec![1i16, 2, 0, 2, 2, 0, 2]),
            record_reader.consume_def_levels()
        );

        let expected_valid = [false, true, false, true, true, false, true];
        assert_eq!(
            Some(bitmap_of(&expected_valid)),
            record_reader.consume_bitmap()
        );

        let actual = record_reader.consume_record_data();
        assert_valid_values(&actual, &[0, 7, 0, 6, 3, 0, 8], &expected_valid);
    }

    #[test]
    fn test_read_repeated_records() {
        let desc = first_column(
            "
            message test_schema {
              REPEATED Group test_struct {
                REPEATED INT32 leaf;
              }
            }
            ",
        );
        let mut record_reader = RecordReader::<Int32Type>::new(desc.clone(), DEFAULT_BATCH_SIZE);

        // First page: seven level slots forming three records.
        let values = [4, 7, 6, 3, 2];
        let def_levels = [2i16, 0, 1, 2, 2, 2, 2];
        let rep_levels = [0i16, 0, 0, 1, 2, 2, 1];
        let mut pb = DataPageBuilderImpl::new(desc.clone(), 7, true);
        pb.add_rep_levels(2, &rep_levels);
        pb.add_def_levels(2, &def_levels);
        pb.add_values::<Int32Type>(Encoding::PLAIN, &values);
        set_single_page(&mut record_reader, pb);

        assert_eq!(1, record_reader.read_records(1).unwrap());
        assert_counts(&record_reader, 1, 1);
        assert_eq!(2, record_reader.read_records(3).unwrap());
        assert_counts(&record_reader, 3, 7);

        // Second page: two level slots forming a single record.
        let values = [8, 9];
        let def_levels = [2i16, 2];
        let rep_levels = [0i16, 2];
        let mut pb = DataPageBuilderImpl::new(desc, 2, true);
        pb.add_rep_levels(2, &rep_levels);
        pb.add_def_levels(2, &def_levels);
        pb.add_values::<Int32Type>(Encoding::PLAIN, &values);
        set_single_page(&mut record_reader, pb);

        assert_eq!(1, record_reader.read_records(10).unwrap());
        assert_counts(&record_reader, 4, 9);

        assert_eq!(
            Some(vec![2i16, 0, 1, 2, 2, 2, 2, 2, 2]),
            record_reader.consume_def_levels()
        );

        let expected_valid = [true, false, false, true, true, true, true, true, true];
        assert_eq!(
            Some(bitmap_of(&expected_valid)),
            record_reader.consume_bitmap()
        );

        let actual = record_reader.consume_record_data();
        assert_valid_values(&actual, &[4, 0, 0, 7, 6, 3, 2, 8, 9], &expected_valid);
    }

    #[test]
    fn test_read_more_than_one_batch() {
        let desc = first_column(
            "
            message test_schema {
              REPEATED INT32 leaf;
            }
            ",
        );
        let mut record_reader = RecordReader::<Int32Type>::new(desc.clone(), DEFAULT_BATCH_SIZE);

        // 5000 values grouped into 1000 records of five values each: every fifth
        // repetition level starts a new record.
        let values = [100; 5000];
        let def_levels = [1i16; 5000];
        let mut rep_levels = [1i16; 5000];
        for level in rep_levels.iter_mut().step_by(5) {
            *level = 0;
        }

        let mut pb = DataPageBuilderImpl::new(desc, 5000, true);
        pb.add_rep_levels(1, &rep_levels);
        pb.add_def_levels(1, &def_levels);
        pb.add_values::<Int32Type>(Encoding::PLAIN, &values);
        set_single_page(&mut record_reader, pb);

        assert_eq!(1000, record_reader.read_records(1000).unwrap());
        assert_counts(&record_reader, 1000, 5000);
    }

    #[test]
    fn test_row_group_boundary() {
        let desc = first_column(
            "
            message test_schema {
              REPEATED Group test_struct {
                REPEATED INT32 leaf;
              }
            }
            ",
        );

        // One page holding four records spread over seven level slots; it is fed to the
        // reader twice to mimic two consecutive row groups.
        let values = [1, 2, 3];
        let def_levels = [1i16, 0, 1, 2, 2, 1, 2];
        let rep_levels = [0i16, 0, 0, 1, 2, 0, 1];
        let mut pb = DataPageBuilderImpl::new(desc.clone(), 7, true);
        pb.add_rep_levels(2, &rep_levels);
        pb.add_def_levels(2, &def_levels);
        pb.add_values::<Int32Type>(Encoding::PLAIN, &values);
        let page = pb.consume();

        let mut record_reader = RecordReader::<Int32Type>::new(desc, DEFAULT_BATCH_SIZE);

        let first_pass = Box::new(InMemoryPageReader::new(vec![page.clone()]));
        record_reader.set_page_reader(first_pass).unwrap();
        assert_eq!(record_reader.read_records(4).unwrap(), 4);
        assert_counts(&record_reader, 4, 7);

        // The page is exhausted, so further reads yield nothing and change nothing.
        assert_eq!(record_reader.read_records(4).unwrap(), 0);
        assert_counts(&record_reader, 4, 7);

        record_reader.read_records(4).unwrap();

        let second_pass = Box::new(InMemoryPageReader::new(vec![page]));
        record_reader.set_page_reader(second_pass).unwrap();

        assert_eq!(record_reader.read_records(4).unwrap(), 4);
        assert_counts(&record_reader, 8, 14);

        assert_eq!(record_reader.read_records(4).unwrap(), 0);
        assert_counts(&record_reader, 8, 14);
    }

    #[test]
    fn test_skip_required_records() {
        let desc = first_column(
            "
            message test_schema {
              REQUIRED INT32 leaf;
            }
            ",
        );
        let mut record_reader = RecordReader::<Int32Type>::new(desc.clone(), DEFAULT_BATCH_SIZE);

        // First page: skip two records, then read the remaining three.
        let values = [4, 7, 6, 3, 2];
        let mut pb = DataPageBuilderImpl::new(desc.clone(), 5, true);
        pb.add_values::<Int32Type>(Encoding::PLAIN, &values);
        set_single_page(&mut record_reader, pb);

        assert_eq!(2, record_reader.skip_records(2).unwrap());
        assert_counts(&record_reader, 0, 0);
        assert_eq!(3, record_reader.read_records(3).unwrap());
        assert_counts(&record_reader, 3, 3);

        // Second page: skip everything; nothing is left to read afterwards.
        let values = [8, 9];
        let mut pb = DataPageBuilderImpl::new(desc, 2, true);
        pb.add_values::<Int32Type>(Encoding::PLAIN, &values);
        set_single_page(&mut record_reader, pb);

        assert_eq!(2, record_reader.skip_records(10).unwrap());
        assert_counts(&record_reader, 3, 3);
        assert_eq!(0, record_reader.read_records(10).unwrap());

        assert_eq!(record_reader.consume_record_data(), &[6, 3, 2]);
        assert_eq!(None, record_reader.consume_def_levels());
        assert_eq!(None, record_reader.consume_bitmap());
    }

    #[test]
    fn test_skip_optional_records() {
        let desc = first_column(
            "
            message test_schema {
              OPTIONAL Group test_struct {
                OPTIONAL INT32 leaf;
              }
            }
            ",
        );
        let mut record_reader = RecordReader::<Int32Type>::new(desc.clone(), DEFAULT_BATCH_SIZE);

        // First page: skip two records, then read the remaining three.
        let values = [7, 6, 3];
        let def_levels = [1i16, 2, 0, 2, 2];
        let mut pb = DataPageBuilderImpl::new(desc.clone(), 5, true);
        pb.add_def_levels(2, &def_levels);
        pb.add_values::<Int32Type>(Encoding::PLAIN, &values);
        set_single_page(&mut record_reader, pb);

        assert_eq!(2, record_reader.skip_records(2).unwrap());
        assert_counts(&record_reader, 0, 0);
        assert_eq!(3, record_reader.read_records(3).unwrap());
        assert_counts(&record_reader, 3, 3);

        // Second page: skip everything; nothing is left to read afterwards.
        let values = [8];
        let def_levels = [0i16, 2];
        let mut pb = DataPageBuilderImpl::new(desc, 2, true);
        pb.add_def_levels(2, &def_levels);
        pb.add_values::<Int32Type>(Encoding::PLAIN, &values);
        set_single_page(&mut record_reader, pb);

        assert_eq!(2, record_reader.skip_records(10).unwrap());
        assert_counts(&record_reader, 3, 3);
        assert_eq!(0, record_reader.read_records(10).unwrap());

        assert_eq!(
            Some(vec![0i16, 2, 2]),
            record_reader.consume_def_levels()
        );

        let expected_valid = [false, true, true];
        assert_eq!(
            Some(bitmap_of(&expected_valid)),
            record_reader.consume_bitmap()
        );

        let actual = record_reader.consume_record_data();
        assert_valid_values(&actual, &[0, 6, 3], &expected_valid);
    }
}