Skip to main content

parquet/arrow/record_reader/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use arrow_buffer::Buffer;
19
20use crate::arrow::record_reader::{
21    buffer::ValuesBuffer,
22    definition_levels::{DefinitionLevelBuffer, DefinitionLevelBufferDecoder},
23};
24use crate::column::reader::decoder::RepetitionLevelDecoderImpl;
25use crate::column::{
26    page::PageReader,
27    reader::{
28        GenericColumnReader,
29        decoder::{ColumnValueDecoder, ColumnValueDecoderImpl},
30    },
31};
32use crate::data_type::DataType;
33use crate::errors::{ParquetError, Result};
34use crate::schema::types::ColumnDescPtr;
35
36pub(crate) mod buffer;
37mod definition_levels;
38
/// A `RecordReader` is a stateful column reader that delimits semantic records.
///
/// This is [`GenericRecordReader`] specialised to buffer decoded values in a
/// `Vec<T::T>` and to decode them with [`ColumnValueDecoderImpl`].
pub type RecordReader<T> = GenericRecordReader<Vec<<T as DataType>::T>, ColumnValueDecoderImpl<T>>;

/// The column reader driven by [`GenericRecordReader`]: a [`GenericColumnReader`]
/// with the repetition and definition level decoders fixed, generic only over
/// the value decoder `CV`.
pub(crate) type ColumnReader<CV> =
    GenericColumnReader<RepetitionLevelDecoderImpl, DefinitionLevelBufferDecoder, CV>;
44
/// A generic stateful column reader that delimits semantic records
///
/// This type is hidden from the docs, and relies on private traits with no
/// public implementations. As such this type signature may be changed without
/// breaking downstream users as it can only be constructed through type aliases
pub struct GenericRecordReader<V, CV> {
    /// Descriptor of the column this reader decodes
    column_desc: ColumnDescPtr,

    /// Values buffer, lazily initialized on first read to avoid
    /// allocating a buffer that may never be used (e.g., after the last batch)
    values: Option<V>,
    /// Definition level buffer, only present when the column's maximum
    /// definition level is greater than zero
    def_levels: Option<DefinitionLevelBuffer>,
    /// Repetition level buffer, only present when the column's maximum
    /// repetition level is greater than zero
    rep_levels: Option<Vec<i16>>,
    /// Underlying column reader, `None` until `set_page_reader` is called
    column_reader: Option<ColumnReader<CV>>,
    /// Number of buffered levels / null-padded values
    num_values: usize,
    /// Number of buffered records
    num_records: usize,
    /// Capacity hint for pre-allocating buffers based on batch size
    capacity_hint: usize,
}
66
67impl<V, CV> GenericRecordReader<V, CV>
68where
69    V: ValuesBuffer,
70    CV: ColumnValueDecoder<Buffer = V>,
71{
72    /// Create a new [`GenericRecordReader`]
73    ///
74    /// The capacity is used to pre-allocate internal buffers, avoiding reallocations
75    /// when reading the first batch of data. For optimal performance, set this to
76    /// the expected batch size.
77    pub fn new(desc: ColumnDescPtr, capacity: usize) -> Self {
78        let def_levels = (desc.max_def_level() > 0)
79            .then(|| DefinitionLevelBuffer::new(&desc, packed_null_mask(&desc)));
80
81        let rep_levels = (desc.max_rep_level() > 0).then(Vec::new);
82
83        Self {
84            values: None, // Lazily initialized on first read
85            def_levels,
86            rep_levels,
87            column_reader: None,
88            column_desc: desc,
89            num_values: 0,
90            num_records: 0,
91            capacity_hint: capacity,
92        }
93    }
94
95    /// Set the current page reader.
96    pub fn set_page_reader(&mut self, page_reader: Box<dyn PageReader>) -> Result<()> {
97        let descr = &self.column_desc;
98        let values_decoder = CV::new(descr);
99
100        let def_level_decoder = (descr.max_def_level() != 0).then(|| {
101            DefinitionLevelBufferDecoder::new(descr.max_def_level(), packed_null_mask(descr))
102        });
103
104        let rep_level_decoder = (descr.max_rep_level() != 0)
105            .then(|| RepetitionLevelDecoderImpl::new(descr.max_rep_level()));
106
107        self.column_reader = Some(GenericColumnReader::new_with_decoders(
108            self.column_desc.clone(),
109            page_reader,
110            values_decoder,
111            def_level_decoder,
112            rep_level_decoder,
113        ));
114        Ok(())
115    }
116
117    /// Try to read `num_records` of column data into internal buffer.
118    ///
119    /// # Returns
120    ///
121    /// Number of actual records read.
122    pub fn read_records(&mut self, num_records: usize) -> Result<usize> {
123        if self.column_reader.is_none() {
124            return Ok(0);
125        }
126
127        let mut records_read = 0;
128
129        loop {
130            let records_to_read = num_records - records_read;
131            records_read += self.read_one_batch(records_to_read)?;
132            if records_read == num_records || !self.column_reader.as_mut().unwrap().has_next()? {
133                break;
134            }
135        }
136        Ok(records_read)
137    }
138
139    /// Try to skip the next `num_records` rows
140    ///
141    /// # Returns
142    ///
143    /// Number of records skipped
144    pub fn skip_records(&mut self, num_records: usize) -> Result<usize> {
145        match self.column_reader.as_mut() {
146            Some(reader) => reader.skip_records(num_records),
147            None => Ok(0),
148        }
149    }
150
    /// Returns the number of records currently stored in the buffer, i.e. the
    /// records read since construction or the last call to `reset`.
    #[allow(unused)]
    pub fn num_records(&self) -> usize {
        self.num_records
    }
156
    /// Returns the number of values (levels, including null-padded slots)
    /// stored in the buffer.
    /// If the parquet column is not repeated, it should be equal to `num_records`,
    /// otherwise it should be larger than or equal to `num_records`.
    pub fn num_values(&self) -> usize {
        self.num_values
    }
163
164    /// Returns definition level data.
165    /// The implementation has side effects. It will create a new buffer to hold those
166    /// definition level values that have already been read into memory but not counted
167    /// as record values, e.g. those from `self.num_values` to `self.values_written`.
168    pub fn consume_def_levels(&mut self) -> Option<Vec<i16>> {
169        self.def_levels.as_mut().and_then(|x| x.consume_levels())
170    }
171
172    /// Return repetition level data.
173    /// The side effect is similar to `consume_def_levels`.
174    pub fn consume_rep_levels(&mut self) -> Option<Vec<i16>> {
175        self.rep_levels.as_mut().map(std::mem::take)
176    }
177
178    /// Returns currently stored buffer data.
179    /// The side effect is similar to `consume_def_levels`.
180    pub fn consume_record_data(&mut self) -> V {
181        // Take the buffer, leaving None. The next read will lazily allocate a new buffer.
182        // This avoids allocating a buffer that may never be used (e.g., after the last batch).
183        self.values.take().unwrap_or_else(|| V::with_capacity(0))
184    }
185
    /// Returns currently stored null bitmap data for nullable columns.
    /// For non-nullable columns, the bitmap is discarded.
    ///
    /// This simply delegates to `consume_bitmap` and therefore has the same
    /// side effect of consuming the buffered bitmask.
    pub fn consume_bitmap_buffer(&mut self) -> Option<Buffer> {
        self.consume_bitmap()
    }
192
193    /// Reset state of record reader.
194    /// Should be called after consuming data, e.g. `consume_rep_levels`,
195    /// `consume_rep_levels`, `consume_record_data` and `consume_bitmap_buffer`.
196    pub fn reset(&mut self) {
197        self.num_values = 0;
198        self.num_records = 0;
199    }
200
201    /// Returns bitmap data for nullable columns.
202    /// For non-nullable columns, the bitmap is discarded.
203    pub fn consume_bitmap(&mut self) -> Option<Buffer> {
204        let mask = self
205            .def_levels
206            .as_mut()
207            .and_then(|levels| levels.consume_bitmask());
208
209        // While we always consume the bitmask here, we only want to return
210        // the bitmask for nullable arrays. (Marking nulls on a non-nullable
211        // array may fail validations, even if those nulls are masked off at
212        // a higher level.)
213        if self.column_desc.self_type().is_optional() {
214            mask
215        } else {
216            None
217        }
218    }
219
220    /// Try to read one batch of data returning the number of records read
221    fn read_one_batch(&mut self, batch_size: usize) -> Result<usize> {
222        if batch_size == 0 {
223            return Ok(0);
224        }
225        // Update capacity hint to the largest batch size seen
226        if batch_size > self.capacity_hint {
227            self.capacity_hint = batch_size;
228        }
229
230        // Lazily initialize buffer on first read
231        let capacity_hint = self.capacity_hint;
232        let values = self
233            .values
234            .get_or_insert_with(|| V::with_capacity(capacity_hint));
235
236        let (records_read, values_read, levels_read) =
237            self.column_reader.as_mut().unwrap().read_records(
238                batch_size,
239                self.def_levels.as_mut(),
240                self.rep_levels.as_mut(),
241                values,
242            )?;
243
244        if values_read < levels_read {
245            let def_levels = self.def_levels.as_ref().ok_or_else(|| {
246                general_err!("Definition levels should exist when data is less than levels!")
247            })?;
248
249            values.pad_nulls(
250                self.num_values,
251                values_read,
252                levels_read,
253                def_levels.nulls().as_slice(),
254            );
255        }
256
257        self.num_records += records_read;
258        self.num_values += levels_read;
259        Ok(records_read)
260    }
261}
262
263/// Returns true if we do not need to unpack the nullability for this column, this is
264/// only possible if the max definition level is 1, and corresponds to nulls at the
265/// leaf level, as opposed to a nullable parent nested type
266fn packed_null_mask(descr: &ColumnDescPtr) -> bool {
267    descr.max_def_level() == 1 && descr.max_rep_level() == 0 && descr.self_type().is_optional()
268}
269
270#[cfg(test)]
271mod tests {
272    use std::sync::Arc;
273
274    use arrow::buffer::Buffer;
275
276    use crate::arrow::arrow_reader::DEFAULT_BATCH_SIZE;
277    use crate::basic::Encoding;
278    use crate::data_type::Int32Type;
279    use crate::schema::parser::parse_message_type;
280    use crate::schema::types::SchemaDescriptor;
281    use crate::util::test_common::page_util::{
282        DataPageBuilder, DataPageBuilderImpl, InMemoryPageReader,
283    };
284
285    use super::RecordReader;
286
287    #[test]
288    fn test_read_required_records() {
289        // Construct column schema
290        let message_type = "
291        message test_schema {
292          REQUIRED INT32 leaf;
293        }
294        ";
295        let desc = parse_message_type(message_type)
296            .map(|t| SchemaDescriptor::new(Arc::new(t)))
297            .map(|s| s.column(0))
298            .unwrap();
299
300        // Construct record reader
301        let mut record_reader = RecordReader::<Int32Type>::new(desc.clone(), DEFAULT_BATCH_SIZE);
302
303        // First page
304
305        // Records data:
306        // test_schema
307        //   leaf: 4
308        // test_schema
309        //   leaf: 7
310        // test_schema
311        //   leaf: 6
312        // test_schema
313        //   left: 3
314        // test_schema
315        //   left: 2
316        {
317            let values = [4, 7, 6, 3, 2];
318            let mut pb = DataPageBuilderImpl::new(desc.clone(), 5, true);
319            pb.add_values::<Int32Type>(Encoding::PLAIN, &values);
320            let page = pb.consume();
321
322            let page_reader = Box::new(InMemoryPageReader::new(vec![page]));
323            record_reader.set_page_reader(page_reader).unwrap();
324            assert_eq!(2, record_reader.read_records(2).unwrap());
325            assert_eq!(2, record_reader.num_records());
326            assert_eq!(2, record_reader.num_values());
327            assert_eq!(3, record_reader.read_records(3).unwrap());
328            assert_eq!(5, record_reader.num_records());
329            assert_eq!(5, record_reader.num_values());
330        }
331
332        // Second page
333
334        // Records data:
335        // test_schema
336        //   leaf: 8
337        // test_schema
338        //   leaf: 9
339        {
340            let values = [8, 9];
341            let mut pb = DataPageBuilderImpl::new(desc, 2, true);
342            pb.add_values::<Int32Type>(Encoding::PLAIN, &values);
343            let page = pb.consume();
344
345            let page_reader = Box::new(InMemoryPageReader::new(vec![page]));
346            record_reader.set_page_reader(page_reader).unwrap();
347            assert_eq!(2, record_reader.read_records(10).unwrap());
348            assert_eq!(7, record_reader.num_records());
349            assert_eq!(7, record_reader.num_values());
350        }
351
352        assert_eq!(record_reader.consume_record_data(), &[4, 7, 6, 3, 2, 8, 9]);
353        assert_eq!(None, record_reader.consume_def_levels());
354        assert_eq!(None, record_reader.consume_bitmap());
355    }
356
357    #[test]
358    fn test_read_optional_records() {
359        // Construct column schema
360        let message_type = "
361        message test_schema {
362          OPTIONAL Group test_struct {
363            OPTIONAL INT32 leaf;
364          }
365        }
366        ";
367
368        let desc = parse_message_type(message_type)
369            .map(|t| SchemaDescriptor::new(Arc::new(t)))
370            .map(|s| s.column(0))
371            .unwrap();
372
373        // Construct record reader
374        let mut record_reader = RecordReader::<Int32Type>::new(desc.clone(), DEFAULT_BATCH_SIZE);
375
376        // First page
377
378        // Records data:
379        // test_schema
380        //   test_struct
381        // test_schema
382        //   test_struct
383        //     left: 7
384        // test_schema
385        // test_schema
386        //   test_struct
387        //     leaf: 6
388        // test_schema
389        //   test_struct
390        //     leaf: 6
391        {
392            let values = [7, 6, 3];
393            //empty, non-empty, empty, non-empty, non-empty
394            let def_levels = [1i16, 2i16, 0i16, 2i16, 2i16];
395            let mut pb = DataPageBuilderImpl::new(desc.clone(), 5, true);
396            pb.add_def_levels(2, &def_levels);
397            pb.add_values::<Int32Type>(Encoding::PLAIN, &values);
398            let page = pb.consume();
399
400            let page_reader = Box::new(InMemoryPageReader::new(vec![page]));
401            record_reader.set_page_reader(page_reader).unwrap();
402            assert_eq!(2, record_reader.read_records(2).unwrap());
403            assert_eq!(2, record_reader.num_records());
404            assert_eq!(2, record_reader.num_values());
405            assert_eq!(3, record_reader.read_records(3).unwrap());
406            assert_eq!(5, record_reader.num_records());
407            assert_eq!(5, record_reader.num_values());
408        }
409
410        // Second page
411
412        // Records data:
413        // test_schema
414        // test_schema
415        //   test_struct
416        //     left: 8
417        {
418            let values = [8];
419            //empty, non-empty
420            let def_levels = [0i16, 2i16];
421            let mut pb = DataPageBuilderImpl::new(desc, 2, true);
422            pb.add_def_levels(2, &def_levels);
423            pb.add_values::<Int32Type>(Encoding::PLAIN, &values);
424            let page = pb.consume();
425
426            let page_reader = Box::new(InMemoryPageReader::new(vec![page]));
427            record_reader.set_page_reader(page_reader).unwrap();
428            assert_eq!(2, record_reader.read_records(10).unwrap());
429            assert_eq!(7, record_reader.num_records());
430            assert_eq!(7, record_reader.num_values());
431        }
432
433        // Verify result def levels
434        assert_eq!(
435            Some(vec![1i16, 2i16, 0i16, 2i16, 2i16, 0i16, 2i16]),
436            record_reader.consume_def_levels()
437        );
438
439        // Verify bitmap
440        let expected_valid = &[false, true, false, true, true, false, true];
441        let expected_buffer = Buffer::from_iter(expected_valid.iter().cloned());
442        assert_eq!(Some(expected_buffer), record_reader.consume_bitmap());
443
444        // Verify result record data
445        let actual = record_reader.consume_record_data();
446
447        let expected = &[0, 7, 0, 6, 3, 0, 8];
448        assert_eq!(actual.len(), expected.len());
449
450        // Only validate valid values are equal
451        let iter = expected_valid.iter().zip(&actual).zip(expected);
452        for ((valid, actual), expected) in iter {
453            if *valid {
454                assert_eq!(actual, expected)
455            }
456        }
457    }
458
459    #[test]
460    fn test_read_repeated_records() {
461        // Construct column schema
462        let message_type = "
463        message test_schema {
464          REPEATED Group test_struct {
465            REPEATED  INT32 leaf;
466          }
467        }
468        ";
469
470        let desc = parse_message_type(message_type)
471            .map(|t| SchemaDescriptor::new(Arc::new(t)))
472            .map(|s| s.column(0))
473            .unwrap();
474
475        // Construct record reader
476        let mut record_reader = RecordReader::<Int32Type>::new(desc.clone(), DEFAULT_BATCH_SIZE);
477
478        // First page
479
480        // Records data:
481        // test_schema
482        //   test_struct
483        //     leaf: 4
484        // test_schema
485        // test_schema
486        //   test_struct
487        //   test_struct
488        //     leaf: 7
489        //     leaf: 6
490        //     leaf: 3
491        //   test_struct
492        //     leaf: 2
493        {
494            let values = [4, 7, 6, 3, 2];
495            let def_levels = [2i16, 0i16, 1i16, 2i16, 2i16, 2i16, 2i16];
496            let rep_levels = [0i16, 0i16, 0i16, 1i16, 2i16, 2i16, 1i16];
497            let mut pb = DataPageBuilderImpl::new(desc.clone(), 7, true);
498            pb.add_rep_levels(2, &rep_levels);
499            pb.add_def_levels(2, &def_levels);
500            pb.add_values::<Int32Type>(Encoding::PLAIN, &values);
501            let page = pb.consume();
502
503            let page_reader = Box::new(InMemoryPageReader::new(vec![page]));
504            record_reader.set_page_reader(page_reader).unwrap();
505
506            assert_eq!(1, record_reader.read_records(1).unwrap());
507            assert_eq!(1, record_reader.num_records());
508            assert_eq!(1, record_reader.num_values());
509            assert_eq!(2, record_reader.read_records(3).unwrap());
510            assert_eq!(3, record_reader.num_records());
511            assert_eq!(7, record_reader.num_values());
512        }
513
514        // Second page
515
516        // Records data:
517        // test_schema
518        //   test_struct
519        //     leaf: 8
520        //     leaf: 9
521        {
522            let values = [8, 9];
523            let def_levels = [2i16, 2i16];
524            let rep_levels = [0i16, 2i16];
525            let mut pb = DataPageBuilderImpl::new(desc, 2, true);
526            pb.add_rep_levels(2, &rep_levels);
527            pb.add_def_levels(2, &def_levels);
528            pb.add_values::<Int32Type>(Encoding::PLAIN, &values);
529            let page = pb.consume();
530
531            let page_reader = Box::new(InMemoryPageReader::new(vec![page]));
532            record_reader.set_page_reader(page_reader).unwrap();
533
534            assert_eq!(1, record_reader.read_records(10).unwrap());
535            assert_eq!(4, record_reader.num_records());
536            assert_eq!(9, record_reader.num_values());
537        }
538
539        // Verify result def levels
540        assert_eq!(
541            Some(vec![2i16, 0i16, 1i16, 2i16, 2i16, 2i16, 2i16, 2i16, 2i16]),
542            record_reader.consume_def_levels()
543        );
544
545        // Verify bitmap
546        let expected_valid = &[true, false, false, true, true, true, true, true, true];
547        let expected_buffer = Buffer::from_iter(expected_valid.iter().cloned());
548        assert_eq!(Some(expected_buffer), record_reader.consume_bitmap());
549
550        // Verify result record data
551        let actual = record_reader.consume_record_data();
552        let expected = &[4, 0, 0, 7, 6, 3, 2, 8, 9];
553        assert_eq!(actual.len(), expected.len());
554
555        // Only validate valid values are equal
556        let iter = expected_valid.iter().zip(&actual).zip(expected);
557        for ((valid, actual), expected) in iter {
558            if *valid {
559                assert_eq!(actual, expected)
560            }
561        }
562    }
563
564    #[test]
565    fn test_read_more_than_one_batch() {
566        // Construct column schema
567        let message_type = "
568        message test_schema {
569          REPEATED  INT32 leaf;
570        }
571        ";
572
573        let desc = parse_message_type(message_type)
574            .map(|t| SchemaDescriptor::new(Arc::new(t)))
575            .map(|s| s.column(0))
576            .unwrap();
577
578        // Construct record reader
579        let mut record_reader = RecordReader::<Int32Type>::new(desc.clone(), DEFAULT_BATCH_SIZE);
580
581        {
582            let values = [100; 5000];
583            let def_levels = [1i16; 5000];
584            let mut rep_levels = [1i16; 5000];
585            for idx in 0..1000 {
586                rep_levels[idx * 5] = 0i16;
587            }
588
589            let mut pb = DataPageBuilderImpl::new(desc, 5000, true);
590            pb.add_rep_levels(1, &rep_levels);
591            pb.add_def_levels(1, &def_levels);
592            pb.add_values::<Int32Type>(Encoding::PLAIN, &values);
593            let page = pb.consume();
594
595            let page_reader = Box::new(InMemoryPageReader::new(vec![page]));
596            record_reader.set_page_reader(page_reader).unwrap();
597
598            assert_eq!(1000, record_reader.read_records(1000).unwrap());
599            assert_eq!(1000, record_reader.num_records());
600            assert_eq!(5000, record_reader.num_values());
601        }
602    }
603
604    #[test]
605    fn test_row_group_boundary() {
606        // Construct column schema
607        let message_type = "
608        message test_schema {
609          REPEATED Group test_struct {
610            REPEATED  INT32 leaf;
611          }
612        }
613        ";
614
615        let desc = parse_message_type(message_type)
616            .map(|t| SchemaDescriptor::new(Arc::new(t)))
617            .map(|s| s.column(0))
618            .unwrap();
619
620        let values = [1, 2, 3];
621        let def_levels = [1i16, 0i16, 1i16, 2i16, 2i16, 1i16, 2i16];
622        let rep_levels = [0i16, 0i16, 0i16, 1i16, 2i16, 0i16, 1i16];
623        let mut pb = DataPageBuilderImpl::new(desc.clone(), 7, true);
624        pb.add_rep_levels(2, &rep_levels);
625        pb.add_def_levels(2, &def_levels);
626        pb.add_values::<Int32Type>(Encoding::PLAIN, &values);
627        let page = pb.consume();
628
629        let mut record_reader = RecordReader::<Int32Type>::new(desc, DEFAULT_BATCH_SIZE);
630        let page_reader = Box::new(InMemoryPageReader::new(vec![page.clone()]));
631        record_reader.set_page_reader(page_reader).unwrap();
632        assert_eq!(record_reader.read_records(4).unwrap(), 4);
633        assert_eq!(record_reader.num_records(), 4);
634        assert_eq!(record_reader.num_values(), 7);
635
636        assert_eq!(record_reader.read_records(4).unwrap(), 0);
637        assert_eq!(record_reader.num_records(), 4);
638        assert_eq!(record_reader.num_values(), 7);
639
640        record_reader.read_records(4).unwrap();
641
642        let page_reader = Box::new(InMemoryPageReader::new(vec![page]));
643        record_reader.set_page_reader(page_reader).unwrap();
644
645        assert_eq!(record_reader.read_records(4).unwrap(), 4);
646        assert_eq!(record_reader.num_records(), 8);
647        assert_eq!(record_reader.num_values(), 14);
648
649        assert_eq!(record_reader.read_records(4).unwrap(), 0);
650        assert_eq!(record_reader.num_records(), 8);
651        assert_eq!(record_reader.num_values(), 14);
652    }
653
654    #[test]
655    fn test_skip_required_records() {
656        // Construct column schema
657        let message_type = "
658        message test_schema {
659          REQUIRED INT32 leaf;
660        }
661        ";
662        let desc = parse_message_type(message_type)
663            .map(|t| SchemaDescriptor::new(Arc::new(t)))
664            .map(|s| s.column(0))
665            .unwrap();
666
667        // Construct record reader
668        let mut record_reader = RecordReader::<Int32Type>::new(desc.clone(), DEFAULT_BATCH_SIZE);
669
670        // First page
671
672        // Records data:
673        // test_schema
674        //   leaf: 4
675        // test_schema
676        //   leaf: 7
677        // test_schema
678        //   leaf: 6
679        // test_schema
680        //   left: 3
681        // test_schema
682        //   left: 2
683        {
684            let values = [4, 7, 6, 3, 2];
685            let mut pb = DataPageBuilderImpl::new(desc.clone(), 5, true);
686            pb.add_values::<Int32Type>(Encoding::PLAIN, &values);
687            let page = pb.consume();
688
689            let page_reader = Box::new(InMemoryPageReader::new(vec![page]));
690            record_reader.set_page_reader(page_reader).unwrap();
691            assert_eq!(2, record_reader.skip_records(2).unwrap());
692            assert_eq!(0, record_reader.num_records());
693            assert_eq!(0, record_reader.num_values());
694            assert_eq!(3, record_reader.read_records(3).unwrap());
695            assert_eq!(3, record_reader.num_records());
696            assert_eq!(3, record_reader.num_values());
697        }
698
699        // Second page
700
701        // Records data:
702        // test_schema
703        //   leaf: 8
704        // test_schema
705        //   leaf: 9
706        {
707            let values = [8, 9];
708            let mut pb = DataPageBuilderImpl::new(desc, 2, true);
709            pb.add_values::<Int32Type>(Encoding::PLAIN, &values);
710            let page = pb.consume();
711
712            let page_reader = Box::new(InMemoryPageReader::new(vec![page]));
713            record_reader.set_page_reader(page_reader).unwrap();
714            assert_eq!(2, record_reader.skip_records(10).unwrap());
715            assert_eq!(3, record_reader.num_records());
716            assert_eq!(3, record_reader.num_values());
717            assert_eq!(0, record_reader.read_records(10).unwrap());
718        }
719
720        assert_eq!(record_reader.consume_record_data(), &[6, 3, 2]);
721        assert_eq!(None, record_reader.consume_def_levels());
722        assert_eq!(None, record_reader.consume_bitmap());
723    }
724
725    #[test]
726    fn test_skip_optional_records() {
727        // Construct column schema
728        let message_type = "
729        message test_schema {
730          OPTIONAL Group test_struct {
731            OPTIONAL INT32 leaf;
732          }
733        }
734        ";
735
736        let desc = parse_message_type(message_type)
737            .map(|t| SchemaDescriptor::new(Arc::new(t)))
738            .map(|s| s.column(0))
739            .unwrap();
740
741        // Construct record reader
742        let mut record_reader = RecordReader::<Int32Type>::new(desc.clone(), DEFAULT_BATCH_SIZE);
743
744        // First page
745
746        // Records data:
747        // test_schema
748        //   test_struct
749        // test_schema
750        //   test_struct
751        //     leaf: 7
752        // test_schema
753        // test_schema
754        //   test_struct
755        //     leaf: 6
756        // test_schema
757        //   test_struct
758        //     leaf: 6
759        {
760            let values = [7, 6, 3];
761            //empty, non-empty, empty, non-empty, non-empty
762            let def_levels = [1i16, 2i16, 0i16, 2i16, 2i16];
763            let mut pb = DataPageBuilderImpl::new(desc.clone(), 5, true);
764            pb.add_def_levels(2, &def_levels);
765            pb.add_values::<Int32Type>(Encoding::PLAIN, &values);
766            let page = pb.consume();
767
768            let page_reader = Box::new(InMemoryPageReader::new(vec![page]));
769            record_reader.set_page_reader(page_reader).unwrap();
770            assert_eq!(2, record_reader.skip_records(2).unwrap());
771            assert_eq!(0, record_reader.num_records());
772            assert_eq!(0, record_reader.num_values());
773            assert_eq!(3, record_reader.read_records(3).unwrap());
774            assert_eq!(3, record_reader.num_records());
775            assert_eq!(3, record_reader.num_values());
776        }
777
778        // Second page
779
780        // Records data:
781        // test_schema
782        // test_schema
783        //   test_struct
784        //     left: 8
785        {
786            let values = [8];
787            //empty, non-empty
788            let def_levels = [0i16, 2i16];
789            let mut pb = DataPageBuilderImpl::new(desc, 2, true);
790            pb.add_def_levels(2, &def_levels);
791            pb.add_values::<Int32Type>(Encoding::PLAIN, &values);
792            let page = pb.consume();
793
794            let page_reader = Box::new(InMemoryPageReader::new(vec![page]));
795            record_reader.set_page_reader(page_reader).unwrap();
796            assert_eq!(2, record_reader.skip_records(10).unwrap());
797            assert_eq!(3, record_reader.num_records());
798            assert_eq!(3, record_reader.num_values());
799            assert_eq!(0, record_reader.read_records(10).unwrap());
800        }
801
802        // Verify result def levels
803        assert_eq!(
804            Some(vec![0i16, 2i16, 2i16]),
805            record_reader.consume_def_levels()
806        );
807
808        // Verify bitmap
809        let expected_valid = &[false, true, true];
810        let expected_buffer = Buffer::from_iter(expected_valid.iter().cloned());
811        assert_eq!(Some(expected_buffer), record_reader.consume_bitmap());
812
813        // Verify result record data
814        let actual = record_reader.consume_record_data();
815
816        let expected = &[0, 6, 3];
817        assert_eq!(actual.len(), expected.len());
818
819        // Only validate valid values are equal
820        let iter = expected_valid.iter().zip(&actual).zip(expected);
821        for ((valid, actual), expected) in iter {
822            if *valid {
823                assert_eq!(actual, expected)
824            }
825        }
826    }
827}