1use arrow_buffer::Buffer;
19
20use crate::arrow::record_reader::{
21 buffer::ValuesBuffer,
22 definition_levels::{DefinitionLevelBuffer, DefinitionLevelBufferDecoder},
23};
24use crate::column::reader::decoder::RepetitionLevelDecoderImpl;
25use crate::column::{
26 page::PageReader,
27 reader::{
28 decoder::{ColumnValueDecoder, ColumnValueDecoderImpl},
29 GenericColumnReader,
30 },
31};
32use crate::data_type::DataType;
33use crate::errors::{ParquetError, Result};
34use crate::schema::types::ColumnDescPtr;
35
// Submodule that the `ValuesBuffer` import above is resolved from; visible crate-wide.
pub(crate) mod buffer;
// Submodule that the `DefinitionLevelBuffer` and `DefinitionLevelBufferDecoder`
// imports above are resolved from; private to this module.
mod definition_levels;
38
/// A [`GenericRecordReader`] for the parquet data type `T` that buffers decoded
/// values in a `Vec<T::T>` and decodes them with [`ColumnValueDecoderImpl`].
pub type RecordReader<T> = GenericRecordReader<Vec<<T as DataType>::T>, ColumnValueDecoderImpl<T>>;
41
/// The [`GenericColumnReader`] specialisation used by [`GenericRecordReader`]:
/// repetition levels are decoded by [`RepetitionLevelDecoderImpl`], definition
/// levels by [`DefinitionLevelBufferDecoder`], and values by the caller-chosen
/// decoder `CV`.
pub(crate) type ColumnReader<CV> =
    GenericColumnReader<RepetitionLevelDecoderImpl, DefinitionLevelBufferDecoder, CV>;
44
/// Stateful reader that pulls records for a single column out of a page reader
/// and accumulates the decoded values and levels until they are consumed.
///
/// `V` is the buffer type decoded values are written into and `CV` is the value
/// decoder; the `impl` block requires `V: ValuesBuffer` and
/// `CV: ColumnValueDecoder<Buffer = V>`.
pub struct GenericRecordReader<V, CV> {
    /// Descriptor of the column being read; used to build the decoders and to
    /// decide whether a null bitmap is returned.
    column_desc: ColumnDescPtr,

    /// Buffer of decoded values, handed out by `consume_record_data`.
    values: V,
    /// Definition level buffer; `Some` only when the column's maximum
    /// definition level is greater than zero.
    def_levels: Option<DefinitionLevelBuffer>,
    /// Buffered repetition levels; `Some` only when the column's maximum
    /// repetition level is greater than zero.
    rep_levels: Option<Vec<i16>>,
    /// Underlying column reader; `None` until `set_page_reader` is called.
    column_reader: Option<ColumnReader<CV>>,
    /// Running total of levels read, advanced after every batch and zeroed by
    /// `reset`. Also used as the starting offset when padding nulls.
    num_values: usize,
    /// Running total of records read, zeroed by `reset`.
    num_records: usize,
}
62
63impl<V, CV> GenericRecordReader<V, CV>
64where
65 V: ValuesBuffer,
66 CV: ColumnValueDecoder<Buffer = V>,
67{
68 pub fn new(desc: ColumnDescPtr) -> Self {
70 let def_levels = (desc.max_def_level() > 0)
71 .then(|| DefinitionLevelBuffer::new(&desc, packed_null_mask(&desc)));
72
73 let rep_levels = (desc.max_rep_level() > 0).then(Vec::new);
74
75 Self {
76 values: V::default(),
77 def_levels,
78 rep_levels,
79 column_reader: None,
80 column_desc: desc,
81 num_values: 0,
82 num_records: 0,
83 }
84 }
85
86 pub fn set_page_reader(&mut self, page_reader: Box<dyn PageReader>) -> Result<()> {
88 let descr = &self.column_desc;
89 let values_decoder = CV::new(descr);
90
91 let def_level_decoder = (descr.max_def_level() != 0).then(|| {
92 DefinitionLevelBufferDecoder::new(descr.max_def_level(), packed_null_mask(descr))
93 });
94
95 let rep_level_decoder = (descr.max_rep_level() != 0)
96 .then(|| RepetitionLevelDecoderImpl::new(descr.max_rep_level()));
97
98 self.column_reader = Some(GenericColumnReader::new_with_decoders(
99 self.column_desc.clone(),
100 page_reader,
101 values_decoder,
102 def_level_decoder,
103 rep_level_decoder,
104 ));
105 Ok(())
106 }
107
108 pub fn read_records(&mut self, num_records: usize) -> Result<usize> {
114 if self.column_reader.is_none() {
115 return Ok(0);
116 }
117
118 let mut records_read = 0;
119
120 loop {
121 let records_to_read = num_records - records_read;
122 records_read += self.read_one_batch(records_to_read)?;
123 if records_read == num_records || !self.column_reader.as_mut().unwrap().has_next()? {
124 break;
125 }
126 }
127 Ok(records_read)
128 }
129
130 pub fn skip_records(&mut self, num_records: usize) -> Result<usize> {
136 match self.column_reader.as_mut() {
137 Some(reader) => reader.skip_records(num_records),
138 None => Ok(0),
139 }
140 }
141
142 #[allow(unused)]
144 pub fn num_records(&self) -> usize {
145 self.num_records
146 }
147
148 pub fn num_values(&self) -> usize {
152 self.num_values
153 }
154
155 pub fn consume_def_levels(&mut self) -> Option<Vec<i16>> {
160 self.def_levels.as_mut().and_then(|x| x.consume_levels())
161 }
162
163 pub fn consume_rep_levels(&mut self) -> Option<Vec<i16>> {
166 self.rep_levels.as_mut().map(std::mem::take)
167 }
168
169 pub fn consume_record_data(&mut self) -> V {
172 std::mem::take(&mut self.values)
173 }
174
175 pub fn consume_bitmap_buffer(&mut self) -> Option<Buffer> {
179 self.consume_bitmap()
180 }
181
182 pub fn reset(&mut self) {
186 self.num_values = 0;
187 self.num_records = 0;
188 }
189
190 pub fn consume_bitmap(&mut self) -> Option<Buffer> {
193 let mask = self
194 .def_levels
195 .as_mut()
196 .map(|levels| levels.consume_bitmask());
197
198 if self.column_desc.self_type().is_optional() {
203 mask
204 } else {
205 None
206 }
207 }
208
209 fn read_one_batch(&mut self, batch_size: usize) -> Result<usize> {
211 let (records_read, values_read, levels_read) =
212 self.column_reader.as_mut().unwrap().read_records(
213 batch_size,
214 self.def_levels.as_mut(),
215 self.rep_levels.as_mut(),
216 &mut self.values,
217 )?;
218
219 if values_read < levels_read {
220 let def_levels = self.def_levels.as_ref().ok_or_else(|| {
221 general_err!("Definition levels should exist when data is less than levels!")
222 })?;
223
224 self.values.pad_nulls(
225 self.num_values,
226 values_read,
227 levels_read,
228 def_levels.nulls().as_slice(),
229 );
230 }
231
232 self.num_records += records_read;
233 self.num_values += levels_read;
234 Ok(records_read)
235 }
236}
237
238fn packed_null_mask(descr: &ColumnDescPtr) -> bool {
242 descr.max_def_level() == 1 && descr.max_rep_level() == 0 && descr.self_type().is_optional()
243}
244
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow::buffer::Buffer;

    use crate::basic::Encoding;
    use crate::data_type::Int32Type;
    use crate::schema::parser::parse_message_type;
    use crate::schema::types::SchemaDescriptor;
    use crate::util::test_common::page_util::{
        DataPageBuilder, DataPageBuilderImpl, InMemoryPageReader,
    };

    use super::RecordReader;

    /// Parses `message_type` and returns the descriptor of its first column.
    fn first_column(message_type: &str) -> crate::schema::types::ColumnDescPtr {
        let schema = parse_message_type(message_type).unwrap();
        SchemaDescriptor::new(Arc::new(schema)).column(0)
    }

    /// Asserts the record and value counters currently reported by `reader`.
    fn assert_counts(reader: &RecordReader<Int32Type>, num_records: usize, num_values: usize) {
        assert_eq!(num_records, reader.num_records());
        assert_eq!(num_values, reader.num_values());
    }

    /// Asserts that `actual` and `expected` have the same length and agree at
    /// every position flagged in `valid`; other positions are not compared.
    fn assert_valid_values_eq<T>(valid: &[bool], actual: &[T], expected: &[T])
    where
        T: PartialEq + std::fmt::Debug,
    {
        assert_eq!(actual.len(), expected.len());

        valid
            .iter()
            .zip(actual.iter().zip(expected))
            .filter(|(is_valid, _)| **is_valid)
            .for_each(|(_, (got, want))| assert_eq!(got, want));
    }

    #[test]
    fn test_read_required_records() {
        let message_type = "
        message test_schema {
          REQUIRED INT32 leaf;
        }
        ";
        let desc = first_column(message_type);
        let mut reader = RecordReader::<Int32Type>::new(desc.clone());

        // First page: five required values, read in two calls.
        let mut builder = DataPageBuilderImpl::new(desc.clone(), 5, true);
        builder.add_values::<Int32Type>(Encoding::PLAIN, &[4, 7, 6, 3, 2]);
        let pages = vec![builder.consume()];
        reader
            .set_page_reader(Box::new(InMemoryPageReader::new(pages)))
            .unwrap();

        assert_eq!(2, reader.read_records(2).unwrap());
        assert_counts(&reader, 2, 2);
        assert_eq!(3, reader.read_records(3).unwrap());
        assert_counts(&reader, 5, 5);

        // Second page: two values, with more records requested than exist.
        let mut builder = DataPageBuilderImpl::new(desc, 2, true);
        builder.add_values::<Int32Type>(Encoding::PLAIN, &[8, 9]);
        let pages = vec![builder.consume()];
        reader
            .set_page_reader(Box::new(InMemoryPageReader::new(pages)))
            .unwrap();

        assert_eq!(2, reader.read_records(10).unwrap());
        assert_counts(&reader, 7, 7);

        assert_eq!(reader.consume_record_data(), &[4, 7, 6, 3, 2, 8, 9]);
        assert_eq!(None, reader.consume_def_levels());
        assert_eq!(None, reader.consume_bitmap());
    }

    #[test]
    fn test_read_optional_records() {
        let message_type = "
        message test_schema {
          OPTIONAL Group test_struct {
            OPTIONAL INT32 leaf;
          }
        }
        ";
        let desc = first_column(message_type);
        let mut reader = RecordReader::<Int32Type>::new(desc.clone());

        // First page: five levels, three of which carry a value.
        let def_levels: [i16; 5] = [1, 2, 0, 2, 2];
        let mut builder = DataPageBuilderImpl::new(desc.clone(), 5, true);
        builder.add_def_levels(2, &def_levels);
        builder.add_values::<Int32Type>(Encoding::PLAIN, &[7, 6, 3]);
        let pages = vec![builder.consume()];
        reader
            .set_page_reader(Box::new(InMemoryPageReader::new(pages)))
            .unwrap();

        assert_eq!(2, reader.read_records(2).unwrap());
        assert_counts(&reader, 2, 2);
        assert_eq!(3, reader.read_records(3).unwrap());
        assert_counts(&reader, 5, 5);

        // Second page: two levels, one of which carries a value.
        let def_levels: [i16; 2] = [0, 2];
        let mut builder = DataPageBuilderImpl::new(desc, 2, true);
        builder.add_def_levels(2, &def_levels);
        builder.add_values::<Int32Type>(Encoding::PLAIN, &[8]);
        let pages = vec![builder.consume()];
        reader
            .set_page_reader(Box::new(InMemoryPageReader::new(pages)))
            .unwrap();

        assert_eq!(2, reader.read_records(10).unwrap());
        assert_counts(&reader, 7, 7);

        assert_eq!(
            Some(vec![1i16, 2, 0, 2, 2, 0, 2]),
            reader.consume_def_levels()
        );

        let expected_valid = &[false, true, false, true, true, false, true];
        let expected_bitmap = Buffer::from_iter(expected_valid.iter().copied());
        assert_eq!(Some(expected_bitmap), reader.consume_bitmap());

        let actual = reader.consume_record_data();
        assert_valid_values_eq(expected_valid, &actual, &[0, 7, 0, 6, 3, 0, 8]);
    }

    #[test]
    fn test_read_repeated_records() {
        let message_type = "
        message test_schema {
          REPEATED Group test_struct {
            REPEATED INT32 leaf;
          }
        }
        ";
        let desc = first_column(message_type);
        let mut reader = RecordReader::<Int32Type>::new(desc.clone());

        // First page: seven levels forming three records.
        let def_levels: [i16; 7] = [2, 0, 1, 2, 2, 2, 2];
        let rep_levels: [i16; 7] = [0, 0, 0, 1, 2, 2, 1];
        let mut builder = DataPageBuilderImpl::new(desc.clone(), 7, true);
        builder.add_rep_levels(2, &rep_levels);
        builder.add_def_levels(2, &def_levels);
        builder.add_values::<Int32Type>(Encoding::PLAIN, &[4, 7, 6, 3, 2]);
        let pages = vec![builder.consume()];
        reader
            .set_page_reader(Box::new(InMemoryPageReader::new(pages)))
            .unwrap();

        assert_eq!(1, reader.read_records(1).unwrap());
        assert_counts(&reader, 1, 1);
        assert_eq!(2, reader.read_records(3).unwrap());
        assert_counts(&reader, 3, 7);

        // Second page: two levels forming a single record.
        let def_levels: [i16; 2] = [2, 2];
        let rep_levels: [i16; 2] = [0, 2];
        let mut builder = DataPageBuilderImpl::new(desc, 2, true);
        builder.add_rep_levels(2, &rep_levels);
        builder.add_def_levels(2, &def_levels);
        builder.add_values::<Int32Type>(Encoding::PLAIN, &[8, 9]);
        let pages = vec![builder.consume()];
        reader
            .set_page_reader(Box::new(InMemoryPageReader::new(pages)))
            .unwrap();

        assert_eq!(1, reader.read_records(10).unwrap());
        assert_counts(&reader, 4, 9);

        assert_eq!(
            Some(vec![2i16, 0, 1, 2, 2, 2, 2, 2, 2]),
            reader.consume_def_levels()
        );

        let expected_valid = &[true, false, false, true, true, true, true, true, true];
        let expected_bitmap = Buffer::from_iter(expected_valid.iter().copied());
        assert_eq!(Some(expected_bitmap), reader.consume_bitmap());

        let actual = reader.consume_record_data();
        assert_valid_values_eq(expected_valid, &actual, &[4, 0, 0, 7, 6, 3, 2, 8, 9]);
    }

    #[test]
    fn test_read_more_than_one_batch() {
        let message_type = "
        message test_schema {
          REPEATED INT32 leaf;
        }
        ";
        let desc = first_column(message_type);
        let mut reader = RecordReader::<Int32Type>::new(desc.clone());

        let values = [100; 5000];
        let def_levels = [1i16; 5000];
        // Every fifth level starts a new record: 1000 records of 5 values each.
        let mut rep_levels = [1i16; 5000];
        rep_levels
            .iter_mut()
            .step_by(5)
            .for_each(|level| *level = 0i16);

        let mut builder = DataPageBuilderImpl::new(desc, 5000, true);
        builder.add_rep_levels(1, &rep_levels);
        builder.add_def_levels(1, &def_levels);
        builder.add_values::<Int32Type>(Encoding::PLAIN, &values);
        let pages = vec![builder.consume()];
        reader
            .set_page_reader(Box::new(InMemoryPageReader::new(pages)))
            .unwrap();

        assert_eq!(1000, reader.read_records(1000).unwrap());
        assert_counts(&reader, 1000, 5000);
    }

    #[test]
    fn test_row_group_boundary() {
        let message_type = "
        message test_schema {
          REPEATED Group test_struct {
            REPEATED INT32 leaf;
          }
        }
        ";
        let desc = first_column(message_type);

        // One page of seven levels forming four records; it is served twice.
        let def_levels: [i16; 7] = [1, 0, 1, 2, 2, 1, 2];
        let rep_levels: [i16; 7] = [0, 0, 0, 1, 2, 0, 1];
        let mut builder = DataPageBuilderImpl::new(desc.clone(), 7, true);
        builder.add_rep_levels(2, &rep_levels);
        builder.add_def_levels(2, &def_levels);
        builder.add_values::<Int32Type>(Encoding::PLAIN, &[1, 2, 3]);
        let page = builder.consume();

        let mut reader = RecordReader::<Int32Type>::new(desc);
        let first_pass = vec![page.clone()];
        reader
            .set_page_reader(Box::new(InMemoryPageReader::new(first_pass)))
            .unwrap();

        assert_eq!(reader.read_records(4).unwrap(), 4);
        assert_eq!(reader.num_records(), 4);
        assert_eq!(reader.num_values(), 7);

        // The page is exhausted, so further reads yield nothing.
        assert_eq!(reader.read_records(4).unwrap(), 0);
        assert_eq!(reader.num_records(), 4);
        assert_eq!(reader.num_values(), 7);

        reader.read_records(4).unwrap();

        // Installing a new page reader continues the running totals.
        let second_pass = vec![page];
        reader
            .set_page_reader(Box::new(InMemoryPageReader::new(second_pass)))
            .unwrap();

        assert_eq!(reader.read_records(4).unwrap(), 4);
        assert_eq!(reader.num_records(), 8);
        assert_eq!(reader.num_values(), 14);

        assert_eq!(reader.read_records(4).unwrap(), 0);
        assert_eq!(reader.num_records(), 8);
        assert_eq!(reader.num_values(), 14);
    }

    #[test]
    fn test_skip_required_records() {
        let message_type = "
        message test_schema {
          REQUIRED INT32 leaf;
        }
        ";
        let desc = first_column(message_type);
        let mut reader = RecordReader::<Int32Type>::new(desc.clone());

        // First page: skip two records, then read the remaining three.
        let mut builder = DataPageBuilderImpl::new(desc.clone(), 5, true);
        builder.add_values::<Int32Type>(Encoding::PLAIN, &[4, 7, 6, 3, 2]);
        let pages = vec![builder.consume()];
        reader
            .set_page_reader(Box::new(InMemoryPageReader::new(pages)))
            .unwrap();

        assert_eq!(2, reader.skip_records(2).unwrap());
        assert_counts(&reader, 0, 0);
        assert_eq!(3, reader.read_records(3).unwrap());
        assert_counts(&reader, 3, 3);

        // Second page: skip everything, leaving nothing to read.
        let mut builder = DataPageBuilderImpl::new(desc, 2, true);
        builder.add_values::<Int32Type>(Encoding::PLAIN, &[8, 9]);
        let pages = vec![builder.consume()];
        reader
            .set_page_reader(Box::new(InMemoryPageReader::new(pages)))
            .unwrap();

        assert_eq!(2, reader.skip_records(10).unwrap());
        assert_counts(&reader, 3, 3);
        assert_eq!(0, reader.read_records(10).unwrap());

        assert_eq!(reader.consume_record_data(), &[6, 3, 2]);
        assert_eq!(None, reader.consume_def_levels());
        assert_eq!(None, reader.consume_bitmap());
    }

    #[test]
    fn test_skip_optional_records() {
        let message_type = "
        message test_schema {
          OPTIONAL Group test_struct {
            OPTIONAL INT32 leaf;
          }
        }
        ";
        let desc = first_column(message_type);
        let mut reader = RecordReader::<Int32Type>::new(desc.clone());

        // First page: skip two records, then read the remaining three.
        let def_levels: [i16; 5] = [1, 2, 0, 2, 2];
        let mut builder = DataPageBuilderImpl::new(desc.clone(), 5, true);
        builder.add_def_levels(2, &def_levels);
        builder.add_values::<Int32Type>(Encoding::PLAIN, &[7, 6, 3]);
        let pages = vec![builder.consume()];
        reader
            .set_page_reader(Box::new(InMemoryPageReader::new(pages)))
            .unwrap();

        assert_eq!(2, reader.skip_records(2).unwrap());
        assert_counts(&reader, 0, 0);
        assert_eq!(3, reader.read_records(3).unwrap());
        assert_counts(&reader, 3, 3);

        // Second page: skip everything, leaving nothing to read.
        let def_levels: [i16; 2] = [0, 2];
        let mut builder = DataPageBuilderImpl::new(desc, 2, true);
        builder.add_def_levels(2, &def_levels);
        builder.add_values::<Int32Type>(Encoding::PLAIN, &[8]);
        let pages = vec![builder.consume()];
        reader
            .set_page_reader(Box::new(InMemoryPageReader::new(pages)))
            .unwrap();

        assert_eq!(2, reader.skip_records(10).unwrap());
        assert_counts(&reader, 3, 3);
        assert_eq!(0, reader.read_records(10).unwrap());

        assert_eq!(Some(vec![0i16, 2, 2]), reader.consume_def_levels());

        let expected_valid = &[false, true, true];
        let expected_bitmap = Buffer::from_iter(expected_valid.iter().copied());
        assert_eq!(Some(expected_bitmap), reader.consume_bitmap());

        let actual = reader.consume_record_data();
        assert_valid_values_eq(expected_valid, &actual, &[0, 6, 3]);
    }
}