parquet/arrow/record_reader/mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use arrow_buffer::Buffer;
19
20use crate::arrow::record_reader::{
21    buffer::ValuesBuffer,
22    definition_levels::{DefinitionLevelBuffer, DefinitionLevelBufferDecoder},
23};
24use crate::column::reader::decoder::RepetitionLevelDecoderImpl;
25use crate::column::{
26    page::PageReader,
27    reader::{
28        decoder::{ColumnValueDecoder, ColumnValueDecoderImpl},
29        GenericColumnReader,
30    },
31};
32use crate::data_type::DataType;
33use crate::errors::{ParquetError, Result};
34use crate::schema::types::ColumnDescPtr;
35
36pub(crate) mod buffer;
37mod definition_levels;
38
39/// A `RecordReader` is a stateful column reader that delimits semantic records.
40pub type RecordReader<T> = GenericRecordReader<Vec<<T as DataType>::T>, ColumnValueDecoderImpl<T>>;
41
42pub(crate) type ColumnReader<CV> =
43    GenericColumnReader<RepetitionLevelDecoderImpl, DefinitionLevelBufferDecoder, CV>;
44
45/// A generic stateful column reader that delimits semantic records
46///
47/// This type is hidden from the docs, and relies on private traits with no
48/// public implementations. As such this type signature may be changed without
49/// breaking downstream users as it can only be constructed through type aliases
50pub struct GenericRecordReader<V, CV> {
51    column_desc: ColumnDescPtr,
52
53    values: V,
54    def_levels: Option<DefinitionLevelBuffer>,
55    rep_levels: Option<Vec<i16>>,
56    column_reader: Option<ColumnReader<CV>>,
57    /// Number of buffered levels / null-padded values
58    num_values: usize,
59    /// Number of buffered records
60    num_records: usize,
61}
62
63impl<V, CV> GenericRecordReader<V, CV>
64where
65    V: ValuesBuffer,
66    CV: ColumnValueDecoder<Buffer = V>,
67{
68    /// Create a new [`GenericRecordReader`]
69    pub fn new(desc: ColumnDescPtr) -> Self {
70        let def_levels = (desc.max_def_level() > 0)
71            .then(|| DefinitionLevelBuffer::new(&desc, packed_null_mask(&desc)));
72
73        let rep_levels = (desc.max_rep_level() > 0).then(Vec::new);
74
75        Self {
76            values: V::default(),
77            def_levels,
78            rep_levels,
79            column_reader: None,
80            column_desc: desc,
81            num_values: 0,
82            num_records: 0,
83        }
84    }
85
86    /// Set the current page reader.
87    pub fn set_page_reader(&mut self, page_reader: Box<dyn PageReader>) -> Result<()> {
88        let descr = &self.column_desc;
89        let values_decoder = CV::new(descr);
90
91        let def_level_decoder = (descr.max_def_level() != 0).then(|| {
92            DefinitionLevelBufferDecoder::new(descr.max_def_level(), packed_null_mask(descr))
93        });
94
95        let rep_level_decoder = (descr.max_rep_level() != 0)
96            .then(|| RepetitionLevelDecoderImpl::new(descr.max_rep_level()));
97
98        self.column_reader = Some(GenericColumnReader::new_with_decoders(
99            self.column_desc.clone(),
100            page_reader,
101            values_decoder,
102            def_level_decoder,
103            rep_level_decoder,
104        ));
105        Ok(())
106    }
107
108    /// Try to read `num_records` of column data into internal buffer.
109    ///
110    /// # Returns
111    ///
112    /// Number of actual records read.
113    pub fn read_records(&mut self, num_records: usize) -> Result<usize> {
114        if self.column_reader.is_none() {
115            return Ok(0);
116        }
117
118        let mut records_read = 0;
119
120        loop {
121            let records_to_read = num_records - records_read;
122            records_read += self.read_one_batch(records_to_read)?;
123            if records_read == num_records || !self.column_reader.as_mut().unwrap().has_next()? {
124                break;
125            }
126        }
127        Ok(records_read)
128    }
129
130    /// Try to skip the next `num_records` rows
131    ///
132    /// # Returns
133    ///
134    /// Number of records skipped
135    pub fn skip_records(&mut self, num_records: usize) -> Result<usize> {
136        match self.column_reader.as_mut() {
137            Some(reader) => reader.skip_records(num_records),
138            None => Ok(0),
139        }
140    }
141
142    /// Returns number of records stored in buffer.
143    #[allow(unused)]
144    pub fn num_records(&self) -> usize {
145        self.num_records
146    }
147
148    /// Return number of values stored in buffer.
149    /// If the parquet column is not repeated, it should be equals to `num_records`,
150    /// otherwise it should be larger than or equal to `num_records`.
151    pub fn num_values(&self) -> usize {
152        self.num_values
153    }
154
155    /// Returns definition level data.
156    /// The implementation has side effects. It will create a new buffer to hold those
157    /// definition level values that have already been read into memory but not counted
158    /// as record values, e.g. those from `self.num_values` to `self.values_written`.
159    pub fn consume_def_levels(&mut self) -> Option<Vec<i16>> {
160        self.def_levels.as_mut().and_then(|x| x.consume_levels())
161    }
162
163    /// Return repetition level data.
164    /// The side effect is similar to `consume_def_levels`.
165    pub fn consume_rep_levels(&mut self) -> Option<Vec<i16>> {
166        self.rep_levels.as_mut().map(std::mem::take)
167    }
168
169    /// Returns currently stored buffer data.
170    /// The side effect is similar to `consume_def_levels`.
171    pub fn consume_record_data(&mut self) -> V {
172        std::mem::take(&mut self.values)
173    }
174
175    /// Returns currently stored null bitmap data for nullable columns.
176    /// For non-nullable columns, the bitmap is discarded.
177    /// The side effect is similar to `consume_def_levels`.
178    pub fn consume_bitmap_buffer(&mut self) -> Option<Buffer> {
179        self.consume_bitmap()
180    }
181
182    /// Reset state of record reader.
183    /// Should be called after consuming data, e.g. `consume_rep_levels`,
184    /// `consume_rep_levels`, `consume_record_data` and `consume_bitmap_buffer`.
185    pub fn reset(&mut self) {
186        self.num_values = 0;
187        self.num_records = 0;
188    }
189
190    /// Returns bitmap data for nullable columns.
191    /// For non-nullable columns, the bitmap is discarded.
192    pub fn consume_bitmap(&mut self) -> Option<Buffer> {
193        let mask = self
194            .def_levels
195            .as_mut()
196            .map(|levels| levels.consume_bitmask());
197
198        // While we always consume the bitmask here, we only want to return
199        // the bitmask for nullable arrays. (Marking nulls on a non-nullable
200        // array may fail validations, even if those nulls are masked off at
201        // a higher level.)
202        if self.column_desc.self_type().is_optional() {
203            mask
204        } else {
205            None
206        }
207    }
208
209    /// Try to read one batch of data returning the number of records read
210    fn read_one_batch(&mut self, batch_size: usize) -> Result<usize> {
211        let (records_read, values_read, levels_read) =
212            self.column_reader.as_mut().unwrap().read_records(
213                batch_size,
214                self.def_levels.as_mut(),
215                self.rep_levels.as_mut(),
216                &mut self.values,
217            )?;
218
219        if values_read < levels_read {
220            let def_levels = self.def_levels.as_ref().ok_or_else(|| {
221                general_err!("Definition levels should exist when data is less than levels!")
222            })?;
223
224            self.values.pad_nulls(
225                self.num_values,
226                values_read,
227                levels_read,
228                def_levels.nulls().as_slice(),
229            );
230        }
231
232        self.num_records += records_read;
233        self.num_values += levels_read;
234        Ok(records_read)
235    }
236}
237
238/// Returns true if we do not need to unpack the nullability for this column, this is
239/// only possible if the max definition level is 1, and corresponds to nulls at the
240/// leaf level, as opposed to a nullable parent nested type
241fn packed_null_mask(descr: &ColumnDescPtr) -> bool {
242    descr.max_def_level() == 1 && descr.max_rep_level() == 0 && descr.self_type().is_optional()
243}
244
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow::buffer::Buffer;

    use crate::basic::Encoding;
    use crate::data_type::Int32Type;
    use crate::schema::parser::parse_message_type;
    use crate::schema::types::SchemaDescriptor;
    use crate::util::test_common::page_util::{
        DataPageBuilder, DataPageBuilderImpl, InMemoryPageReader,
    };

    use super::RecordReader;

    #[test]
    fn test_read_required_records() {
        // Construct column schema
        let message_type = "
        message test_schema {
          REQUIRED INT32 leaf;
        }
        ";
        let desc = parse_message_type(message_type)
            .map(|t| SchemaDescriptor::new(Arc::new(t)))
            .map(|s| s.column(0))
            .unwrap();

        // Construct record reader
        let mut record_reader = RecordReader::<Int32Type>::new(desc.clone());

        // First page

        // Records data:
        // test_schema
        //   leaf: 4
        // test_schema
        //   leaf: 7
        // test_schema
        //   leaf: 6
        // test_schema
        //   leaf: 3
        // test_schema
        //   leaf: 2
        {
            let values = [4, 7, 6, 3, 2];
            let mut pb = DataPageBuilderImpl::new(desc.clone(), 5, true);
            pb.add_values::<Int32Type>(Encoding::PLAIN, &values);
            let page = pb.consume();

            let page_reader = Box::new(InMemoryPageReader::new(vec![page]));
            record_reader.set_page_reader(page_reader).unwrap();
            assert_eq!(2, record_reader.read_records(2).unwrap());
            assert_eq!(2, record_reader.num_records());
            assert_eq!(2, record_reader.num_values());
            assert_eq!(3, record_reader.read_records(3).unwrap());
            assert_eq!(5, record_reader.num_records());
            assert_eq!(5, record_reader.num_values());
        }

        // Second page

        // Records data:
        // test_schema
        //   leaf: 8
        // test_schema
        //   leaf: 9
        {
            let values = [8, 9];
            let mut pb = DataPageBuilderImpl::new(desc, 2, true);
            pb.add_values::<Int32Type>(Encoding::PLAIN, &values);
            let page = pb.consume();

            let page_reader = Box::new(InMemoryPageReader::new(vec![page]));
            record_reader.set_page_reader(page_reader).unwrap();
            assert_eq!(2, record_reader.read_records(10).unwrap());
            assert_eq!(7, record_reader.num_records());
            assert_eq!(7, record_reader.num_values());
        }

        assert_eq!(record_reader.consume_record_data(), &[4, 7, 6, 3, 2, 8, 9]);
        assert_eq!(None, record_reader.consume_def_levels());
        assert_eq!(None, record_reader.consume_bitmap());
    }

    #[test]
    fn test_read_optional_records() {
        // Construct column schema
        let message_type = "
        message test_schema {
          OPTIONAL Group test_struct {
            OPTIONAL INT32 leaf;
          }
        }
        ";

        let desc = parse_message_type(message_type)
            .map(|t| SchemaDescriptor::new(Arc::new(t)))
            .map(|s| s.column(0))
            .unwrap();

        // Construct record reader
        let mut record_reader = RecordReader::<Int32Type>::new(desc.clone());

        // First page

        // Records data:
        // test_schema
        //   test_struct
        // test_schema
        //   test_struct
        //     leaf: 7
        // test_schema
        // test_schema
        //   test_struct
        //     leaf: 6
        // test_schema
        //   test_struct
        //     leaf: 3
        {
            let values = [7, 6, 3];
            // Def levels: 0 = null struct, 1 = struct with null leaf, 2 = leaf present
            // empty, non-empty, empty, non-empty, non-empty
            let def_levels = [1i16, 2i16, 0i16, 2i16, 2i16];
            let mut pb = DataPageBuilderImpl::new(desc.clone(), 5, true);
            pb.add_def_levels(2, &def_levels);
            pb.add_values::<Int32Type>(Encoding::PLAIN, &values);
            let page = pb.consume();

            let page_reader = Box::new(InMemoryPageReader::new(vec![page]));
            record_reader.set_page_reader(page_reader).unwrap();
            assert_eq!(2, record_reader.read_records(2).unwrap());
            assert_eq!(2, record_reader.num_records());
            assert_eq!(2, record_reader.num_values());
            assert_eq!(3, record_reader.read_records(3).unwrap());
            assert_eq!(5, record_reader.num_records());
            assert_eq!(5, record_reader.num_values());
        }

        // Second page

        // Records data:
        // test_schema
        // test_schema
        //   test_struct
        //     leaf: 8
        {
            let values = [8];
            // empty, non-empty
            let def_levels = [0i16, 2i16];
            let mut pb = DataPageBuilderImpl::new(desc, 2, true);
            pb.add_def_levels(2, &def_levels);
            pb.add_values::<Int32Type>(Encoding::PLAIN, &values);
            let page = pb.consume();

            let page_reader = Box::new(InMemoryPageReader::new(vec![page]));
            record_reader.set_page_reader(page_reader).unwrap();
            assert_eq!(2, record_reader.read_records(10).unwrap());
            assert_eq!(7, record_reader.num_records());
            assert_eq!(7, record_reader.num_values());
        }

        // Verify result def levels
        assert_eq!(
            Some(vec![1i16, 2i16, 0i16, 2i16, 2i16, 0i16, 2i16]),
            record_reader.consume_def_levels()
        );

        // Verify bitmap
        let expected_valid = &[false, true, false, true, true, false, true];
        let expected_buffer = Buffer::from_iter(expected_valid.iter().copied());
        assert_eq!(Some(expected_buffer), record_reader.consume_bitmap());

        // Verify result record data
        let actual = record_reader.consume_record_data();

        let expected = &[0, 7, 0, 6, 3, 0, 8];
        assert_eq!(actual.len(), expected.len());

        // Only validate valid values are equal
        let iter = expected_valid.iter().zip(&actual).zip(expected);
        for ((valid, actual), expected) in iter {
            if *valid {
                assert_eq!(actual, expected)
            }
        }
    }

    #[test]
    fn test_read_repeated_records() {
        // Construct column schema
        let message_type = "
        message test_schema {
          REPEATED Group test_struct {
            REPEATED  INT32 leaf;
          }
        }
        ";

        let desc = parse_message_type(message_type)
            .map(|t| SchemaDescriptor::new(Arc::new(t)))
            .map(|s| s.column(0))
            .unwrap();

        // Construct record reader
        let mut record_reader = RecordReader::<Int32Type>::new(desc.clone());

        // First page

        // Records data:
        // test_schema
        //   test_struct
        //     leaf: 4
        // test_schema
        // test_schema
        //   test_struct
        //   test_struct
        //     leaf: 7
        //     leaf: 6
        //     leaf: 3
        //   test_struct
        //     leaf: 2
        {
            let values = [4, 7, 6, 3, 2];
            let def_levels = [2i16, 0i16, 1i16, 2i16, 2i16, 2i16, 2i16];
            let rep_levels = [0i16, 0i16, 0i16, 1i16, 2i16, 2i16, 1i16];
            let mut pb = DataPageBuilderImpl::new(desc.clone(), 7, true);
            pb.add_rep_levels(2, &rep_levels);
            pb.add_def_levels(2, &def_levels);
            pb.add_values::<Int32Type>(Encoding::PLAIN, &values);
            let page = pb.consume();

            let page_reader = Box::new(InMemoryPageReader::new(vec![page]));
            record_reader.set_page_reader(page_reader).unwrap();

            assert_eq!(1, record_reader.read_records(1).unwrap());
            assert_eq!(1, record_reader.num_records());
            assert_eq!(1, record_reader.num_values());
            assert_eq!(2, record_reader.read_records(3).unwrap());
            assert_eq!(3, record_reader.num_records());
            assert_eq!(7, record_reader.num_values());
        }

        // Second page

        // Records data:
        // test_schema
        //   test_struct
        //     leaf: 8
        //     leaf: 9
        {
            let values = [8, 9];
            let def_levels = [2i16, 2i16];
            let rep_levels = [0i16, 2i16];
            let mut pb = DataPageBuilderImpl::new(desc, 2, true);
            pb.add_rep_levels(2, &rep_levels);
            pb.add_def_levels(2, &def_levels);
            pb.add_values::<Int32Type>(Encoding::PLAIN, &values);
            let page = pb.consume();

            let page_reader = Box::new(InMemoryPageReader::new(vec![page]));
            record_reader.set_page_reader(page_reader).unwrap();

            assert_eq!(1, record_reader.read_records(10).unwrap());
            assert_eq!(4, record_reader.num_records());
            assert_eq!(9, record_reader.num_values());
        }

        // Verify result def levels
        assert_eq!(
            Some(vec![2i16, 0i16, 1i16, 2i16, 2i16, 2i16, 2i16, 2i16, 2i16]),
            record_reader.consume_def_levels()
        );

        // Verify bitmap
        let expected_valid = &[true, false, false, true, true, true, true, true, true];
        let expected_buffer = Buffer::from_iter(expected_valid.iter().copied());
        assert_eq!(Some(expected_buffer), record_reader.consume_bitmap());

        // Verify result record data
        let actual = record_reader.consume_record_data();
        let expected = &[4, 0, 0, 7, 6, 3, 2, 8, 9];
        assert_eq!(actual.len(), expected.len());

        // Only validate valid values are equal
        let iter = expected_valid.iter().zip(&actual).zip(expected);
        for ((valid, actual), expected) in iter {
            if *valid {
                assert_eq!(actual, expected)
            }
        }
    }

    #[test]
    fn test_read_more_than_one_batch() {
        // Construct column schema
        let message_type = "
        message test_schema {
          REPEATED  INT32 leaf;
        }
        ";

        let desc = parse_message_type(message_type)
            .map(|t| SchemaDescriptor::new(Arc::new(t)))
            .map(|s| s.column(0))
            .unwrap();

        // Construct record reader
        let mut record_reader = RecordReader::<Int32Type>::new(desc.clone());

        {
            let values = [100; 5000];
            let def_levels = [1i16; 5000];
            // Every 5th level starts a new record: 1000 records of 5 values each
            let mut rep_levels = [1i16; 5000];
            for idx in 0..1000 {
                rep_levels[idx * 5] = 0i16;
            }

            let mut pb = DataPageBuilderImpl::new(desc, 5000, true);
            pb.add_rep_levels(1, &rep_levels);
            pb.add_def_levels(1, &def_levels);
            pb.add_values::<Int32Type>(Encoding::PLAIN, &values);
            let page = pb.consume();

            let page_reader = Box::new(InMemoryPageReader::new(vec![page]));
            record_reader.set_page_reader(page_reader).unwrap();

            assert_eq!(1000, record_reader.read_records(1000).unwrap());
            assert_eq!(1000, record_reader.num_records());
            assert_eq!(5000, record_reader.num_values());
        }
    }

    #[test]
    fn test_row_group_boundary() {
        // Construct column schema
        let message_type = "
        message test_schema {
          REPEATED Group test_struct {
            REPEATED  INT32 leaf;
          }
        }
        ";

        let desc = parse_message_type(message_type)
            .map(|t| SchemaDescriptor::new(Arc::new(t)))
            .map(|s| s.column(0))
            .unwrap();

        let values = [1, 2, 3];
        let def_levels = [1i16, 0i16, 1i16, 2i16, 2i16, 1i16, 2i16];
        let rep_levels = [0i16, 0i16, 0i16, 1i16, 2i16, 0i16, 1i16];
        let mut pb = DataPageBuilderImpl::new(desc.clone(), 7, true);
        pb.add_rep_levels(2, &rep_levels);
        pb.add_def_levels(2, &def_levels);
        pb.add_values::<Int32Type>(Encoding::PLAIN, &values);
        let page = pb.consume();

        let mut record_reader = RecordReader::<Int32Type>::new(desc);
        let page_reader = Box::new(InMemoryPageReader::new(vec![page.clone()]));
        record_reader.set_page_reader(page_reader).unwrap();
        assert_eq!(record_reader.read_records(4).unwrap(), 4);
        assert_eq!(record_reader.num_records(), 4);
        assert_eq!(record_reader.num_values(), 7);

        assert_eq!(record_reader.read_records(4).unwrap(), 0);
        assert_eq!(record_reader.num_records(), 4);
        assert_eq!(record_reader.num_values(), 7);

        // Repeated reads past the end of the page reader keep returning 0
        assert_eq!(record_reader.read_records(4).unwrap(), 0);

        let page_reader = Box::new(InMemoryPageReader::new(vec![page]));
        record_reader.set_page_reader(page_reader).unwrap();

        assert_eq!(record_reader.read_records(4).unwrap(), 4);
        assert_eq!(record_reader.num_records(), 8);
        assert_eq!(record_reader.num_values(), 14);

        assert_eq!(record_reader.read_records(4).unwrap(), 0);
        assert_eq!(record_reader.num_records(), 8);
        assert_eq!(record_reader.num_values(), 14);
    }

    #[test]
    fn test_skip_required_records() {
        // Construct column schema
        let message_type = "
        message test_schema {
          REQUIRED INT32 leaf;
        }
        ";
        let desc = parse_message_type(message_type)
            .map(|t| SchemaDescriptor::new(Arc::new(t)))
            .map(|s| s.column(0))
            .unwrap();

        // Construct record reader
        let mut record_reader = RecordReader::<Int32Type>::new(desc.clone());

        // First page

        // Records data:
        // test_schema
        //   leaf: 4
        // test_schema
        //   leaf: 7
        // test_schema
        //   leaf: 6
        // test_schema
        //   leaf: 3
        // test_schema
        //   leaf: 2
        {
            let values = [4, 7, 6, 3, 2];
            let mut pb = DataPageBuilderImpl::new(desc.clone(), 5, true);
            pb.add_values::<Int32Type>(Encoding::PLAIN, &values);
            let page = pb.consume();

            let page_reader = Box::new(InMemoryPageReader::new(vec![page]));
            record_reader.set_page_reader(page_reader).unwrap();
            assert_eq!(2, record_reader.skip_records(2).unwrap());
            assert_eq!(0, record_reader.num_records());
            assert_eq!(0, record_reader.num_values());
            assert_eq!(3, record_reader.read_records(3).unwrap());
            assert_eq!(3, record_reader.num_records());
            assert_eq!(3, record_reader.num_values());
        }

        // Second page

        // Records data:
        // test_schema
        //   leaf: 8
        // test_schema
        //   leaf: 9
        {
            let values = [8, 9];
            let mut pb = DataPageBuilderImpl::new(desc, 2, true);
            pb.add_values::<Int32Type>(Encoding::PLAIN, &values);
            let page = pb.consume();

            let page_reader = Box::new(InMemoryPageReader::new(vec![page]));
            record_reader.set_page_reader(page_reader).unwrap();
            assert_eq!(2, record_reader.skip_records(10).unwrap());
            assert_eq!(3, record_reader.num_records());
            assert_eq!(3, record_reader.num_values());
            assert_eq!(0, record_reader.read_records(10).unwrap());
        }

        assert_eq!(record_reader.consume_record_data(), &[6, 3, 2]);
        assert_eq!(None, record_reader.consume_def_levels());
        assert_eq!(None, record_reader.consume_bitmap());
    }

    #[test]
    fn test_skip_optional_records() {
        // Construct column schema
        let message_type = "
        message test_schema {
          OPTIONAL Group test_struct {
            OPTIONAL INT32 leaf;
          }
        }
        ";

        let desc = parse_message_type(message_type)
            .map(|t| SchemaDescriptor::new(Arc::new(t)))
            .map(|s| s.column(0))
            .unwrap();

        // Construct record reader
        let mut record_reader = RecordReader::<Int32Type>::new(desc.clone());

        // First page

        // Records data:
        // test_schema
        //   test_struct
        // test_schema
        //   test_struct
        //     leaf: 7
        // test_schema
        // test_schema
        //   test_struct
        //     leaf: 6
        // test_schema
        //   test_struct
        //     leaf: 3
        {
            let values = [7, 6, 3];
            // Def levels: 0 = null struct, 1 = struct with null leaf, 2 = leaf present
            // empty, non-empty, empty, non-empty, non-empty
            let def_levels = [1i16, 2i16, 0i16, 2i16, 2i16];
            let mut pb = DataPageBuilderImpl::new(desc.clone(), 5, true);
            pb.add_def_levels(2, &def_levels);
            pb.add_values::<Int32Type>(Encoding::PLAIN, &values);
            let page = pb.consume();

            let page_reader = Box::new(InMemoryPageReader::new(vec![page]));
            record_reader.set_page_reader(page_reader).unwrap();
            assert_eq!(2, record_reader.skip_records(2).unwrap());
            assert_eq!(0, record_reader.num_records());
            assert_eq!(0, record_reader.num_values());
            assert_eq!(3, record_reader.read_records(3).unwrap());
            assert_eq!(3, record_reader.num_records());
            assert_eq!(3, record_reader.num_values());
        }

        // Second page

        // Records data:
        // test_schema
        // test_schema
        //   test_struct
        //     leaf: 8
        {
            let values = [8];
            // empty, non-empty
            let def_levels = [0i16, 2i16];
            let mut pb = DataPageBuilderImpl::new(desc, 2, true);
            pb.add_def_levels(2, &def_levels);
            pb.add_values::<Int32Type>(Encoding::PLAIN, &values);
            let page = pb.consume();

            let page_reader = Box::new(InMemoryPageReader::new(vec![page]));
            record_reader.set_page_reader(page_reader).unwrap();
            assert_eq!(2, record_reader.skip_records(10).unwrap());
            assert_eq!(3, record_reader.num_records());
            assert_eq!(3, record_reader.num_values());
            assert_eq!(0, record_reader.read_records(10).unwrap());
        }

        // Verify result def levels
        assert_eq!(
            Some(vec![0i16, 2i16, 2i16]),
            record_reader.consume_def_levels()
        );

        // Verify bitmap
        let expected_valid = &[false, true, true];
        let expected_buffer = Buffer::from_iter(expected_valid.iter().copied());
        assert_eq!(Some(expected_buffer), record_reader.consume_bitmap());

        // Verify result record data
        let actual = record_reader.consume_record_data();

        let expected = &[0, 6, 3];
        assert_eq!(actual.len(), expected.len());

        // Only validate valid values are equal
        let iter = expected_valid.iter().zip(&actual).zip(expected);
        for ((valid, actual), expected) in iter {
            if *valid {
                assert_eq!(actual, expected)
            }
        }
    }
}