parquet/column/
reader.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Contains column reader API.
19
20use bytes::Bytes;
21
22use super::page::{Page, PageReader};
23use crate::basic::*;
24use crate::column::reader::decoder::{
25    ColumnValueDecoder, ColumnValueDecoderImpl, DefinitionLevelDecoder, DefinitionLevelDecoderImpl,
26    RepetitionLevelDecoder, RepetitionLevelDecoderImpl,
27};
28use crate::data_type::*;
29use crate::errors::{ParquetError, Result};
30use crate::schema::types::ColumnDescPtr;
31use crate::util::bit_util::{ceil, num_required_bits, read_num_bytes};
32
33pub(crate) mod decoder;
34
/// Column reader for a Parquet type.
///
/// Each variant wraps a [`ColumnReaderImpl`] specialised for one physical type.
/// [`get_column_reader`] builds the variant that matches a column descriptor and
/// [`get_typed_column_reader`] converts a variant back into its typed reader.
pub enum ColumnReader {
    /// Column reader for boolean type
    BoolColumnReader(ColumnReaderImpl<BoolType>),
    /// Column reader for int32 type
    Int32ColumnReader(ColumnReaderImpl<Int32Type>),
    /// Column reader for int64 type
    Int64ColumnReader(ColumnReaderImpl<Int64Type>),
    /// Column reader for int96 type
    Int96ColumnReader(ColumnReaderImpl<Int96Type>),
    /// Column reader for float type
    FloatColumnReader(ColumnReaderImpl<FloatType>),
    /// Column reader for double type
    DoubleColumnReader(ColumnReaderImpl<DoubleType>),
    /// Column reader for byte array type
    ByteArrayColumnReader(ColumnReaderImpl<ByteArrayType>),
    /// Column reader for fixed length byte array type
    FixedLenByteArrayColumnReader(ColumnReaderImpl<FixedLenByteArrayType>),
}
54
55/// Gets a specific column reader corresponding to column descriptor `col_descr`. The
56/// column reader will read from pages in `col_page_reader`.
57pub fn get_column_reader(
58    col_descr: ColumnDescPtr,
59    col_page_reader: Box<dyn PageReader>,
60) -> ColumnReader {
61    match col_descr.physical_type() {
62        Type::BOOLEAN => {
63            ColumnReader::BoolColumnReader(ColumnReaderImpl::new(col_descr, col_page_reader))
64        }
65        Type::INT32 => {
66            ColumnReader::Int32ColumnReader(ColumnReaderImpl::new(col_descr, col_page_reader))
67        }
68        Type::INT64 => {
69            ColumnReader::Int64ColumnReader(ColumnReaderImpl::new(col_descr, col_page_reader))
70        }
71        Type::INT96 => {
72            ColumnReader::Int96ColumnReader(ColumnReaderImpl::new(col_descr, col_page_reader))
73        }
74        Type::FLOAT => {
75            ColumnReader::FloatColumnReader(ColumnReaderImpl::new(col_descr, col_page_reader))
76        }
77        Type::DOUBLE => {
78            ColumnReader::DoubleColumnReader(ColumnReaderImpl::new(col_descr, col_page_reader))
79        }
80        Type::BYTE_ARRAY => {
81            ColumnReader::ByteArrayColumnReader(ColumnReaderImpl::new(col_descr, col_page_reader))
82        }
83        Type::FIXED_LEN_BYTE_ARRAY => ColumnReader::FixedLenByteArrayColumnReader(
84            ColumnReaderImpl::new(col_descr, col_page_reader),
85        ),
86    }
87}
88
89/// Gets a typed column reader for the specific type `T`, by "up-casting" `col_reader` of
90/// non-generic type to a generic column reader type `ColumnReaderImpl`.
91///
92/// Panics if actual enum value for `col_reader` does not match the type `T`.
93pub fn get_typed_column_reader<T: DataType>(col_reader: ColumnReader) -> ColumnReaderImpl<T> {
94    T::get_column_reader(col_reader).unwrap_or_else(|| {
95        panic!(
96            "Failed to convert column reader into a typed column reader for `{}` type",
97            T::get_physical_type()
98        )
99    })
100}
101
/// Typed value reader for a particular primitive column.
///
/// This is [`GenericColumnReader`] instantiated with the default repetition level,
/// definition level and value decoder implementations for the data type `T`.
pub type ColumnReaderImpl<T> = GenericColumnReader<
    RepetitionLevelDecoderImpl,
    DefinitionLevelDecoderImpl,
    ColumnValueDecoderImpl<T>,
>;
108
/// Reads data for a given column chunk, using the provided decoders:
///
/// - R: `RepetitionLevelDecoder` used to decode repetition levels
/// - D: `DefinitionLevelDecoder` used to decode definition levels
/// - V: `ColumnValueDecoder` used to decode value data
pub struct GenericColumnReader<R, D, V> {
    /// Descriptor of the column being read; supplies the max definition and
    /// repetition levels.
    descr: ColumnDescPtr,

    /// Source of the pages for this column chunk.
    page_reader: Box<dyn PageReader>,

    /// The total number of values stored in the data page.
    num_buffered_values: usize,

    /// The number of values from the current data page that has been decoded into memory
    /// so far.
    num_decoded_values: usize,

    /// True if the end of the current data page denotes the end of a record
    has_record_delimiter: bool,

    /// The decoder for the definition levels if any
    def_level_decoder: Option<D>,

    /// The decoder for the repetition levels if any
    rep_level_decoder: Option<R>,

    /// The decoder for the values
    values_decoder: V,
}
138
139impl<V> GenericColumnReader<RepetitionLevelDecoderImpl, DefinitionLevelDecoderImpl, V>
140where
141    V: ColumnValueDecoder,
142{
143    /// Creates new column reader based on column descriptor and page reader.
144    pub fn new(descr: ColumnDescPtr, page_reader: Box<dyn PageReader>) -> Self {
145        let values_decoder = V::new(&descr);
146
147        let def_level_decoder = (descr.max_def_level() != 0)
148            .then(|| DefinitionLevelDecoderImpl::new(descr.max_def_level()));
149
150        let rep_level_decoder = (descr.max_rep_level() != 0)
151            .then(|| RepetitionLevelDecoderImpl::new(descr.max_rep_level()));
152
153        Self::new_with_decoders(
154            descr,
155            page_reader,
156            values_decoder,
157            def_level_decoder,
158            rep_level_decoder,
159        )
160    }
161}
162
163impl<R, D, V> GenericColumnReader<R, D, V>
164where
165    R: RepetitionLevelDecoder,
166    D: DefinitionLevelDecoder,
167    V: ColumnValueDecoder,
168{
169    pub(crate) fn new_with_decoders(
170        descr: ColumnDescPtr,
171        page_reader: Box<dyn PageReader>,
172        values_decoder: V,
173        def_level_decoder: Option<D>,
174        rep_level_decoder: Option<R>,
175    ) -> Self {
176        Self {
177            descr,
178            def_level_decoder,
179            rep_level_decoder,
180            page_reader,
181            num_buffered_values: 0,
182            num_decoded_values: 0,
183            values_decoder,
184            has_record_delimiter: false,
185        }
186    }
187
188    /// Read up to `max_records` whole records, returning the number of complete
189    /// records, non-null values and levels decoded. All levels for a given record
190    /// will be read, i.e. the next repetition level, if any, will be 0
191    ///
192    /// If the max definition level is 0, `def_levels` will be ignored and the number of records,
193    /// non-null values and levels decoded will all be equal, otherwise `def_levels` will be
194    /// populated with the number of levels read, with an error returned if it is `None`.
195    ///
196    /// If the max repetition level is 0, `rep_levels` will be ignored and the number of records
197    /// and levels decoded will both be equal, otherwise `rep_levels` will be populated with
198    /// the number of levels read, with an error returned if it is `None`.
199    ///
200    /// `values` will be contiguously populated with the non-null values. Note that if the column
201    /// is not required, this may be less than either `max_records` or the number of levels read
202    pub fn read_records(
203        &mut self,
204        max_records: usize,
205        mut def_levels: Option<&mut D::Buffer>,
206        mut rep_levels: Option<&mut R::Buffer>,
207        values: &mut V::Buffer,
208    ) -> Result<(usize, usize, usize)> {
209        let mut total_records_read = 0;
210        let mut total_levels_read = 0;
211        let mut total_values_read = 0;
212
213        while total_records_read < max_records && self.has_next()? {
214            let remaining_records = max_records - total_records_read;
215            let remaining_levels = self.num_buffered_values - self.num_decoded_values;
216
217            let (records_read, levels_to_read) = match self.rep_level_decoder.as_mut() {
218                Some(reader) => {
219                    let out = rep_levels
220                        .as_mut()
221                        .ok_or_else(|| general_err!("must specify repetition levels"))?;
222
223                    let (mut records_read, levels_read) =
224                        reader.read_rep_levels(out, remaining_records, remaining_levels)?;
225
226                    if records_read == 0 && levels_read == 0 {
227                        // The fact that we're still looping implies there must be some levels to read.
228                        return Err(general_err!(
229                            "Insufficient repetition levels read from column"
230                        ));
231                    }
232                    if levels_read == remaining_levels && self.has_record_delimiter {
233                        // Reached end of page, which implies records_read < remaining_records
234                        // as otherwise would have stopped reading before reaching the end
235                        assert!(records_read < remaining_records); // Sanity check
236                        records_read += reader.flush_partial() as usize;
237                    }
238                    (records_read, levels_read)
239                }
240                None => {
241                    let min = remaining_records.min(remaining_levels);
242                    (min, min)
243                }
244            };
245
246            let values_to_read = match self.def_level_decoder.as_mut() {
247                Some(reader) => {
248                    let out = def_levels
249                        .as_mut()
250                        .ok_or_else(|| general_err!("must specify definition levels"))?;
251
252                    let (values_read, levels_read) = reader.read_def_levels(out, levels_to_read)?;
253
254                    if levels_read != levels_to_read {
255                        return Err(general_err!("insufficient definition levels read from column - expected {levels_to_read}, got {levels_read}"));
256                    }
257
258                    values_read
259                }
260                None => levels_to_read,
261            };
262
263            let values_read = self.values_decoder.read(values, values_to_read)?;
264
265            if values_read != values_to_read {
266                return Err(general_err!(
267                    "insufficient values read from column - expected: {values_to_read}, got: {values_read}",
268                ));
269            }
270
271            self.num_decoded_values += levels_to_read;
272            total_records_read += records_read;
273            total_levels_read += levels_to_read;
274            total_values_read += values_read;
275        }
276
277        Ok((total_records_read, total_values_read, total_levels_read))
278    }
279
280    /// Skips over `num_records` records, where records are delimited by repetition levels of 0
281    ///
282    /// # Returns
283    ///
284    /// Returns the number of records skipped
285    pub fn skip_records(&mut self, num_records: usize) -> Result<usize> {
286        let mut remaining_records = num_records;
287        while remaining_records != 0 {
288            if self.num_buffered_values == self.num_decoded_values {
289                let metadata = match self.page_reader.peek_next_page()? {
290                    None => return Ok(num_records - remaining_records),
291                    Some(metadata) => metadata,
292                };
293
294                // If dictionary, we must read it
295                if metadata.is_dict {
296                    self.read_dictionary_page()?;
297                    continue;
298                }
299
300                // If page has less rows than the remaining records to
301                // be skipped, skip entire page
302                let rows = metadata.num_rows.or_else(|| {
303                    // If no repetition levels, num_levels == num_rows
304                    self.rep_level_decoder
305                        .is_none()
306                        .then_some(metadata.num_levels)?
307                });
308
309                if let Some(rows) = rows {
310                    if rows <= remaining_records {
311                        self.page_reader.skip_next_page()?;
312                        remaining_records -= rows;
313                        continue;
314                    }
315                }
316                // because self.num_buffered_values == self.num_decoded_values means
317                // we need reads a new page and set up the decoders for levels
318                if !self.read_new_page()? {
319                    return Ok(num_records - remaining_records);
320                }
321            }
322
323            // start skip values in page level
324
325            // The number of levels in the current data page
326            let remaining_levels = self.num_buffered_values - self.num_decoded_values;
327
328            let (records_read, rep_levels_read) = match self.rep_level_decoder.as_mut() {
329                Some(decoder) => {
330                    let (mut records_read, levels_read) =
331                        decoder.skip_rep_levels(remaining_records, remaining_levels)?;
332
333                    if levels_read == remaining_levels && self.has_record_delimiter {
334                        // Reached end of page, which implies records_read < remaining_records
335                        // as otherwise would have stopped reading before reaching the end
336                        assert!(records_read < remaining_records); // Sanity check
337                        records_read += decoder.flush_partial() as usize;
338                    }
339
340                    (records_read, levels_read)
341                }
342                None => {
343                    // No repetition levels, so each level corresponds to a row
344                    let levels = remaining_levels.min(remaining_records);
345                    (levels, levels)
346                }
347            };
348
349            self.num_decoded_values += rep_levels_read;
350            remaining_records -= records_read;
351
352            if self.num_buffered_values == self.num_decoded_values {
353                // Exhausted buffered page - no need to advance other decoders
354                continue;
355            }
356
357            let (values_read, def_levels_read) = match self.def_level_decoder.as_mut() {
358                Some(decoder) => decoder.skip_def_levels(rep_levels_read)?,
359                None => (rep_levels_read, rep_levels_read),
360            };
361
362            if rep_levels_read != def_levels_read {
363                return Err(general_err!(
364                    "levels mismatch, read {} repetition levels and {} definition levels",
365                    rep_levels_read,
366                    def_levels_read
367                ));
368            }
369
370            let values = self.values_decoder.skip_values(values_read)?;
371            if values != values_read {
372                return Err(general_err!(
373                    "skipped {} values, expected {}",
374                    values,
375                    values_read
376                ));
377            }
378        }
379        Ok(num_records - remaining_records)
380    }
381
382    /// Read the next page as a dictionary page. If the next page is not a dictionary page,
383    /// this will return an error.
384    fn read_dictionary_page(&mut self) -> Result<()> {
385        match self.page_reader.get_next_page()? {
386            Some(Page::DictionaryPage {
387                buf,
388                num_values,
389                encoding,
390                is_sorted,
391            }) => self
392                .values_decoder
393                .set_dict(buf, num_values, encoding, is_sorted),
394            _ => Err(ParquetError::General(
395                "Invalid page. Expecting dictionary page".to_string(),
396            )),
397        }
398    }
399
400    /// Reads a new page and set up the decoders for levels, values or dictionary.
401    /// Returns false if there's no page left.
402    fn read_new_page(&mut self) -> Result<bool> {
403        loop {
404            match self.page_reader.get_next_page()? {
405                // No more page to read
406                None => return Ok(false),
407                Some(current_page) => {
408                    match current_page {
409                        // 1. Dictionary page: configure dictionary for this page.
410                        Page::DictionaryPage {
411                            buf,
412                            num_values,
413                            encoding,
414                            is_sorted,
415                        } => {
416                            self.values_decoder
417                                .set_dict(buf, num_values, encoding, is_sorted)?;
418                            continue;
419                        }
420                        // 2. Data page v1
421                        Page::DataPage {
422                            buf,
423                            num_values,
424                            encoding,
425                            def_level_encoding,
426                            rep_level_encoding,
427                            statistics: _,
428                        } => {
429                            self.num_buffered_values = num_values as _;
430                            self.num_decoded_values = 0;
431
432                            let max_rep_level = self.descr.max_rep_level();
433                            let max_def_level = self.descr.max_def_level();
434
435                            let mut offset = 0;
436
437                            if max_rep_level > 0 {
438                                let (bytes_read, level_data) = parse_v1_level(
439                                    max_rep_level,
440                                    num_values,
441                                    rep_level_encoding,
442                                    buf.slice(offset..),
443                                )?;
444                                offset += bytes_read;
445
446                                self.has_record_delimiter =
447                                    self.page_reader.at_record_boundary()?;
448
449                                self.rep_level_decoder
450                                    .as_mut()
451                                    .unwrap()
452                                    .set_data(rep_level_encoding, level_data);
453                            }
454
455                            if max_def_level > 0 {
456                                let (bytes_read, level_data) = parse_v1_level(
457                                    max_def_level,
458                                    num_values,
459                                    def_level_encoding,
460                                    buf.slice(offset..),
461                                )?;
462                                offset += bytes_read;
463
464                                self.def_level_decoder
465                                    .as_mut()
466                                    .unwrap()
467                                    .set_data(def_level_encoding, level_data);
468                            }
469
470                            self.values_decoder.set_data(
471                                encoding,
472                                buf.slice(offset..),
473                                num_values as usize,
474                                None,
475                            )?;
476                            return Ok(true);
477                        }
478                        // 3. Data page v2
479                        Page::DataPageV2 {
480                            buf,
481                            num_values,
482                            encoding,
483                            num_nulls,
484                            num_rows: _,
485                            def_levels_byte_len,
486                            rep_levels_byte_len,
487                            is_compressed: _,
488                            statistics: _,
489                        } => {
490                            if num_nulls > num_values {
491                                return Err(general_err!("more nulls than values in page, contained {} values and {} nulls", num_values, num_nulls));
492                            }
493
494                            self.num_buffered_values = num_values as _;
495                            self.num_decoded_values = 0;
496
497                            // DataPage v2 only supports RLE encoding for repetition
498                            // levels
499                            if self.descr.max_rep_level() > 0 {
500                                // Technically a DataPage v2 should not write a record
501                                // across multiple pages, however, the parquet writer
502                                // used to do this so we preserve backwards compatibility
503                                self.has_record_delimiter =
504                                    self.page_reader.at_record_boundary()?;
505
506                                self.rep_level_decoder.as_mut().unwrap().set_data(
507                                    Encoding::RLE,
508                                    buf.slice(..rep_levels_byte_len as usize),
509                                );
510                            }
511
512                            // DataPage v2 only supports RLE encoding for definition
513                            // levels
514                            if self.descr.max_def_level() > 0 {
515                                self.def_level_decoder.as_mut().unwrap().set_data(
516                                    Encoding::RLE,
517                                    buf.slice(
518                                        rep_levels_byte_len as usize
519                                            ..(rep_levels_byte_len + def_levels_byte_len) as usize,
520                                    ),
521                                );
522                            }
523
524                            self.values_decoder.set_data(
525                                encoding,
526                                buf.slice((rep_levels_byte_len + def_levels_byte_len) as usize..),
527                                num_values as usize,
528                                Some((num_values - num_nulls) as usize),
529                            )?;
530                            return Ok(true);
531                        }
532                    };
533                }
534            }
535        }
536    }
537
538    /// Check whether there is more data to read from this column,
539    /// If the current page is fully decoded, this will load the next page
540    /// (if it exists) into the buffer
541    #[inline]
542    pub(crate) fn has_next(&mut self) -> Result<bool> {
543        if self.num_buffered_values == 0 || self.num_buffered_values == self.num_decoded_values {
544            // TODO: should we return false if read_new_page() = true and
545            // num_buffered_values = 0?
546            if !self.read_new_page()? {
547                Ok(false)
548            } else {
549                Ok(self.num_buffered_values != 0)
550            }
551        } else {
552            Ok(true)
553        }
554    }
555}
556
557fn parse_v1_level(
558    max_level: i16,
559    num_buffered_values: u32,
560    encoding: Encoding,
561    buf: Bytes,
562) -> Result<(usize, Bytes)> {
563    match encoding {
564        Encoding::RLE => {
565            let i32_size = std::mem::size_of::<i32>();
566            let data_size = read_num_bytes::<i32>(i32_size, buf.as_ref()) as usize;
567            Ok((
568                i32_size + data_size,
569                buf.slice(i32_size..i32_size + data_size),
570            ))
571        }
572        #[allow(deprecated)]
573        Encoding::BIT_PACKED => {
574            let bit_width = num_required_bits(max_level as u64);
575            let num_bytes = ceil(num_buffered_values as usize * bit_width as usize, 8);
576            Ok((num_bytes, buf.slice(..num_bytes)))
577        }
578        _ => Err(general_err!("invalid level encoding: {}", encoding)),
579    }
580}
581
582#[cfg(test)]
583mod tests {
584    use super::*;
585
586    use rand::distributions::uniform::SampleUniform;
587    use std::{collections::VecDeque, sync::Arc};
588
589    use crate::basic::Type as PhysicalType;
590    use crate::schema::types::{ColumnDescriptor, ColumnPath, Type as SchemaType};
591    use crate::util::test_common::page_util::InMemoryPageReader;
592    use crate::util::test_common::rand_gen::make_pages;
593
594    const NUM_LEVELS: usize = 128;
595    const NUM_PAGES: usize = 2;
596    const MAX_DEF_LEVEL: i16 = 5;
597    const MAX_REP_LEVEL: i16 = 5;
598
    // Macro to generate test cases.
    //
    // The second argument is the literal token `i32` or `i64`; it selects the data
    // type and the schema helper that are forwarded to `test_internal!`. All other
    // arguments are passed through unchanged.
    macro_rules! test {
        // branch for generating i32 cases
        ($test_func:ident, i32, $func:ident, $def_level:expr, $rep_level:expr,
     $num_pages:expr, $num_levels:expr, $batch_size:expr, $min:expr, $max:expr) => {
            test_internal!(
                $test_func,
                Int32Type,
                get_test_int32_type,
                $func,
                $def_level,
                $rep_level,
                $num_pages,
                $num_levels,
                $batch_size,
                $min,
                $max
            );
        };
        // branch for generating i64 cases
        ($test_func:ident, i64, $func:ident, $def_level:expr, $rep_level:expr,
     $num_pages:expr, $num_levels:expr, $batch_size:expr, $min:expr, $max:expr) => {
            test_internal!(
                $test_func,
                Int64Type,
                get_test_int64_type,
                $func,
                $def_level,
                $rep_level,
                $num_pages,
                $num_levels,
                $batch_size,
                $min,
                $max
            );
        };
    }
636
    // Expands to a `#[test]` function named `$test_func`.
    //
    // The generated test builds a `ColumnDescriptor` from the schema type returned by
    // `$pty()`, the given max definition/repetition levels and an empty column path,
    // then calls the `ColumnReaderTester::<$ty>` method `$func` with the remaining
    // arguments.
    macro_rules! test_internal {
        ($test_func:ident, $ty:ident, $pty:ident, $func:ident, $def_level:expr,
     $rep_level:expr, $num_pages:expr, $num_levels:expr, $batch_size:expr,
     $min:expr, $max:expr) => {
            #[test]
            fn $test_func() {
                let desc = Arc::new(ColumnDescriptor::new(
                    Arc::new($pty()),
                    $def_level,
                    $rep_level,
                    ColumnPath::new(Vec::new()),
                ));
                let mut tester = ColumnReaderTester::<$ty>::new();
                tester.$func(desc, $num_pages, $num_levels, $batch_size, $min, $max);
            }
        };
    }
654
655    test!(
656        test_read_plain_v1_int32,
657        i32,
658        plain_v1,
659        MAX_DEF_LEVEL,
660        MAX_REP_LEVEL,
661        NUM_PAGES,
662        NUM_LEVELS,
663        16,
664        i32::MIN,
665        i32::MAX
666    );
667    test!(
668        test_read_plain_v2_int32,
669        i32,
670        plain_v2,
671        MAX_DEF_LEVEL,
672        MAX_REP_LEVEL,
673        NUM_PAGES,
674        NUM_LEVELS,
675        16,
676        i32::MIN,
677        i32::MAX
678    );
679
680    test!(
681        test_read_plain_v1_int32_uneven,
682        i32,
683        plain_v1,
684        MAX_DEF_LEVEL,
685        MAX_REP_LEVEL,
686        NUM_PAGES,
687        NUM_LEVELS,
688        17,
689        i32::MIN,
690        i32::MAX
691    );
692    test!(
693        test_read_plain_v2_int32_uneven,
694        i32,
695        plain_v2,
696        MAX_DEF_LEVEL,
697        MAX_REP_LEVEL,
698        NUM_PAGES,
699        NUM_LEVELS,
700        17,
701        i32::MIN,
702        i32::MAX
703    );
704
705    test!(
706        test_read_plain_v1_int32_multi_page,
707        i32,
708        plain_v1,
709        MAX_DEF_LEVEL,
710        MAX_REP_LEVEL,
711        NUM_PAGES,
712        NUM_LEVELS,
713        512,
714        i32::MIN,
715        i32::MAX
716    );
717    test!(
718        test_read_plain_v2_int32_multi_page,
719        i32,
720        plain_v2,
721        MAX_DEF_LEVEL,
722        MAX_REP_LEVEL,
723        NUM_PAGES,
724        NUM_LEVELS,
725        512,
726        i32::MIN,
727        i32::MAX
728    );
729
730    // test cases when column descriptor has MAX_DEF_LEVEL = 0 and MAX_REP_LEVEL = 0
731    test!(
732        test_read_plain_v1_int32_required_non_repeated,
733        i32,
734        plain_v1,
735        0,
736        0,
737        NUM_PAGES,
738        NUM_LEVELS,
739        16,
740        i32::MIN,
741        i32::MAX
742    );
743    test!(
744        test_read_plain_v2_int32_required_non_repeated,
745        i32,
746        plain_v2,
747        0,
748        0,
749        NUM_PAGES,
750        NUM_LEVELS,
751        16,
752        i32::MIN,
753        i32::MAX
754    );
755
756    test!(
757        test_read_plain_v1_int64,
758        i64,
759        plain_v1,
760        1,
761        1,
762        NUM_PAGES,
763        NUM_LEVELS,
764        16,
765        i64::MIN,
766        i64::MAX
767    );
768    test!(
769        test_read_plain_v2_int64,
770        i64,
771        plain_v2,
772        1,
773        1,
774        NUM_PAGES,
775        NUM_LEVELS,
776        16,
777        i64::MIN,
778        i64::MAX
779    );
780
781    test!(
782        test_read_plain_v1_int64_uneven,
783        i64,
784        plain_v1,
785        1,
786        1,
787        NUM_PAGES,
788        NUM_LEVELS,
789        17,
790        i64::MIN,
791        i64::MAX
792    );
793    test!(
794        test_read_plain_v2_int64_uneven,
795        i64,
796        plain_v2,
797        1,
798        1,
799        NUM_PAGES,
800        NUM_LEVELS,
801        17,
802        i64::MIN,
803        i64::MAX
804    );
805
806    test!(
807        test_read_plain_v1_int64_multi_page,
808        i64,
809        plain_v1,
810        1,
811        1,
812        NUM_PAGES,
813        NUM_LEVELS,
814        512,
815        i64::MIN,
816        i64::MAX
817    );
818    test!(
819        test_read_plain_v2_int64_multi_page,
820        i64,
821        plain_v2,
822        1,
823        1,
824        NUM_PAGES,
825        NUM_LEVELS,
826        512,
827        i64::MIN,
828        i64::MAX
829    );
830
831    // test cases when column descriptor has MAX_DEF_LEVEL = 0 and MAX_REP_LEVEL = 0
    // Here that means the literal `0, 0` arguments passed below, not the
    // `MAX_DEF_LEVEL` / `MAX_REP_LEVEL` constants used by other tests in this module.
    // NOTE(review): the fourth and fifth `test!` arguments are presumably the max
    // definition and repetition levels of the column descriptor — confirm against the macro.
832    test!(
833        test_read_plain_v1_int64_required_non_repeated,
834        i64,
835        plain_v1,
836        0,
837        0,
838        NUM_PAGES,
839        NUM_LEVELS,
840        16,
841        i64::MIN,
842        i64::MAX
843    );
844    test!(
845        test_read_plain_v2_int64_required_non_repeated,
846        i64,
847        plain_v2,
848        0,
849        0,
850        NUM_PAGES,
851        NUM_LEVELS,
852        16,
853        i64::MIN,
854        i64::MAX
855    );
856
    // "Small" dictionary tests for `i32`, driven through `dict_v1` / `dict_v2`: the
    // literals `2, 2` appear where the other dictionary tests pass `NUM_PAGES, NUM_LEVELS`.
    // NOTE(review): `test!` is defined outside this chunk; presumably these are two pages
    // of two levels each, and the trailing `0, 3` is the min/max of generated values —
    // confirm against the macro.
857    test!(
858        test_read_dict_v1_int32_small,
859        i32,
860        dict_v1,
861        MAX_DEF_LEVEL,
862        MAX_REP_LEVEL,
863        2,
864        2,
865        16,
866        0,
867        3
868    );
869    test!(
870        test_read_dict_v2_int32_small,
871        i32,
872        dict_v2,
873        MAX_DEF_LEVEL,
874        MAX_REP_LEVEL,
875        2,
876        2,
877        16,
878        0,
879        3
880    );
881
    // Dictionary tests for `i32`, driven through `dict_v1` / `dict_v2`, using the
    // `MAX_DEF_LEVEL` / `MAX_REP_LEVEL` and `NUM_PAGES` / `NUM_LEVELS` constants.
    // NOTE(review): the trailing `16, 0, 3` is presumably (batch size, min value,
    // max value); a narrow value range would keep the dictionary small — confirm.
882    test!(
883        test_read_dict_v1_int32,
884        i32,
885        dict_v1,
886        MAX_DEF_LEVEL,
887        MAX_REP_LEVEL,
888        NUM_PAGES,
889        NUM_LEVELS,
890        16,
891        0,
892        3
893    );
894    test!(
895        test_read_dict_v2_int32,
896        i32,
897        dict_v2,
898        MAX_DEF_LEVEL,
899        MAX_REP_LEVEL,
900        NUM_PAGES,
901        NUM_LEVELS,
902        16,
903        0,
904        3
905    );
906
    // "Uneven" dictionary tests for `i32`: the eighth argument is 17 rather than 16.
    // NOTE(review): presumably this is the batch size, chosen so that batch boundaries
    // do not line up with `NUM_LEVELS` (defined outside this chunk) — confirm.
907    test!(
908        test_read_dict_v1_int32_uneven,
909        i32,
910        dict_v1,
911        MAX_DEF_LEVEL,
912        MAX_REP_LEVEL,
913        NUM_PAGES,
914        NUM_LEVELS,
915        17,
916        0,
917        3
918    );
919    test!(
920        test_read_dict_v2_int32_uneven,
921        i32,
922        dict_v2,
923        MAX_DEF_LEVEL,
924        MAX_REP_LEVEL,
925        NUM_PAGES,
926        NUM_LEVELS,
927        17,
928        0,
929        3
930    );
931
    // "Multi page" dictionary tests for `i32`: the eighth argument is 512.
    // NOTE(review): presumably this is a batch size large enough for a single read to
    // span several pages (`NUM_LEVELS` is defined outside this chunk) — confirm.
932    test!(
933        test_read_dict_v1_int32_multi_page,
934        i32,
935        dict_v1,
936        MAX_DEF_LEVEL,
937        MAX_REP_LEVEL,
938        NUM_PAGES,
939        NUM_LEVELS,
940        512,
941        0,
942        3
943    );
944    test!(
945        test_read_dict_v2_int32_multi_page,
946        i32,
947        dict_v2,
948        MAX_DEF_LEVEL,
949        MAX_REP_LEVEL,
950        NUM_PAGES,
951        NUM_LEVELS,
952        512,
953        0,
954        3
955    );
956
    // Dictionary tests for `i64`, driven through `dict_v1` / `dict_v2`, with the same
    // trailing arguments (`16, 0, 3`) as the `i32` dictionary tests.
    // NOTE(review): `test!` is defined outside this chunk; how the `i64` argument selects
    // the schema type / `DataType` is not visible here — confirm against the macro.
957    test!(
958        test_read_dict_v1_int64,
959        i64,
960        dict_v1,
961        MAX_DEF_LEVEL,
962        MAX_REP_LEVEL,
963        NUM_PAGES,
964        NUM_LEVELS,
965        16,
966        0,
967        3
968    );
969    test!(
970        test_read_dict_v2_int64,
971        i64,
972        dict_v2,
973        MAX_DEF_LEVEL,
974        MAX_REP_LEVEL,
975        NUM_PAGES,
976        NUM_LEVELS,
977        16,
978        0,
979        3
980    );
981
982    #[test]
983    fn test_read_batch_values_only() {
984        test_read_batch_int32(16, 0, 0);
985    }
986
987    #[test]
988    fn test_read_batch_values_def_levels() {
989        test_read_batch_int32(16, MAX_DEF_LEVEL, 0);
990    }
991
992    #[test]
993    fn test_read_batch_values_rep_levels() {
994        test_read_batch_int32(16, 0, MAX_REP_LEVEL);
995    }
996
997    #[test]
998    fn test_read_batch_values_def_rep_levels() {
999        test_read_batch_int32(128, MAX_DEF_LEVEL, MAX_REP_LEVEL);
1000    }
1001
1002    #[test]
1003    fn test_read_batch_adjust_after_buffering_page() {
1004        // This test covers scenario when buffering new page results in setting number
1005        // of decoded values to 0, resulting on reading `batch_size` of values, but it is
1006        // larger than we can insert into slice (affects values and levels).
1007        //
1008        // Note: values are chosen to reproduce the issue.
1009        //
1010        let primitive_type = get_test_int32_type();
1011        let desc = Arc::new(ColumnDescriptor::new(
1012            Arc::new(primitive_type),
1013            1,
1014            1,
1015            ColumnPath::new(Vec::new()),
1016        ));
1017
1018        let num_pages = 2;
1019        let num_levels = 4;
1020        let batch_size = 5;
1021
1022        let mut tester = ColumnReaderTester::<Int32Type>::new();
1023        tester.test_read_batch(
1024            desc,
1025            Encoding::RLE_DICTIONARY,
1026            num_pages,
1027            num_levels,
1028            batch_size,
1029            i32::MIN,
1030            i32::MAX,
1031            false,
1032        );
1033    }
1034
1035    // ----------------------------------------------------------------------
1036    // Helper methods to make pages and test
1037    //
1038    // # Overview
1039    //
1040    // Most of the test functionality is implemented in `ColumnReaderTester`, which
1041    // provides some general data page test methods:
1042    // - `test_read_batch_general`
1043    // - `test_read_batch`
1044    //
1045    // There are also some high level wrappers that are part of `ColumnReaderTester`:
1046    // - `plain_v1` -> call `test_read_batch_general` with data page v1 and plain encoding
1047    // - `plain_v2` -> call `test_read_batch_general` with data page v2 and plain encoding
1048    // - `dict_v1` -> call `test_read_batch_general` with data page v1 + dictionary page
1049    // - `dict_v2` -> call `test_read_batch_general` with data page v2 + dictionary page
1050    //
1051    // And even higher level wrappers that simplify testing of almost the same test cases:
1052    // - `get_test_int32_type`, provides dummy schema type
1053    // - `get_test_int64_type`, provides dummy schema type
1054    // - `test_read_batch_int32`, wrapper for `read_batch` tests, since they are basically
1055    //   the same, just different def/rep levels and batch size.
1056    //
1057    // # Page assembly
1058    //
1059    // Page construction and generation of values, definition and repetition levels
1060    // happens in `make_pages` function.
1061    // All values are randomly generated based on provided min/max, levels are calculated
1062    // based on provided max level for column descriptor (which is basically either int32
1063    // or int64 type in tests) and `levels_per_page` variable.
1064    //
1065    // We use `DataPageBuilder` and its implementation `DataPageBuilderImpl` to actually
1066    // turn values, definition and repetition levels into data pages (either v1 or v2).
1067    //
1068    // Those data pages are then stored in an `InMemoryPageReader` (we just pass the queue
1069    // of generated pages directly), which implements the `PageReader` interface.
1070    //
1071    // # Comparison
1072    //
1073    // This allows us to pass the test page reader into a column reader, so we can test
1074    // the functionality of the column reader - see `test_read_batch`, where we create a
1075    // column reader -> typed column reader, buffer values via `read_records` and compare
1076    // the output with the generated data.
1077
1078    // Returns dummy Parquet `Type` for primitive field, because most of our tests use
1079    // INT32 physical type.
1080    fn get_test_int32_type() -> SchemaType {
1081        SchemaType::primitive_type_builder("a", PhysicalType::INT32)
1082            .with_repetition(Repetition::REQUIRED)
1083            .with_converted_type(ConvertedType::INT_32)
1084            .with_length(-1)
1085            .build()
1086            .expect("build() should be OK")
1087    }
1088
1089    // Returns dummy Parquet `Type` for INT64 physical type.
1090    fn get_test_int64_type() -> SchemaType {
1091        SchemaType::primitive_type_builder("a", PhysicalType::INT64)
1092            .with_repetition(Repetition::REQUIRED)
1093            .with_converted_type(ConvertedType::INT_64)
1094            .with_length(-1)
1095            .build()
1096            .expect("build() should be OK")
1097    }
1098
1099    // Tests `read_batch()` functionality for INT32.
1100    //
1101    // This is a high level wrapper on `ColumnReaderTester` that allows us to specify some
1102    // boilerplate code for setting up definition/repetition levels and column descriptor.
1103    fn test_read_batch_int32(batch_size: usize, max_def_level: i16, max_rep_level: i16) {
1104        let primitive_type = get_test_int32_type();
1105
1106        let desc = Arc::new(ColumnDescriptor::new(
1107            Arc::new(primitive_type),
1108            max_def_level,
1109            max_rep_level,
1110            ColumnPath::new(Vec::new()),
1111        ));
1112
1113        let mut tester = ColumnReaderTester::<Int32Type>::new();
1114        tester.test_read_batch(
1115            desc,
1116            Encoding::RLE_DICTIONARY,
1117            NUM_PAGES,
1118            NUM_LEVELS,
1119            batch_size,
1120            i32::MIN,
1121            i32::MAX,
1122            false,
1123        );
1124    }
1125
1126    struct ColumnReaderTester<T: DataType>
1127    where
1128        T::T: PartialOrd + SampleUniform + Copy,
1129    {
1130        rep_levels: Vec<i16>,
1131        def_levels: Vec<i16>,
1132        values: Vec<T::T>,
1133    }
1134
1135    impl<T: DataType> ColumnReaderTester<T>
1136    where
1137        T::T: PartialOrd + SampleUniform + Copy,
1138    {
1139        pub fn new() -> Self {
1140            Self {
1141                rep_levels: Vec::new(),
1142                def_levels: Vec::new(),
1143                values: Vec::new(),
1144            }
1145        }
1146
1147        // Method to generate and test data pages v1
1148        fn plain_v1(
1149            &mut self,
1150            desc: ColumnDescPtr,
1151            num_pages: usize,
1152            num_levels: usize,
1153            batch_size: usize,
1154            min: T::T,
1155            max: T::T,
1156        ) {
1157            self.test_read_batch_general(
1158                desc,
1159                Encoding::PLAIN,
1160                num_pages,
1161                num_levels,
1162                batch_size,
1163                min,
1164                max,
1165                false,
1166            );
1167        }
1168
1169        // Method to generate and test data pages v2
1170        fn plain_v2(
1171            &mut self,
1172            desc: ColumnDescPtr,
1173            num_pages: usize,
1174            num_levels: usize,
1175            batch_size: usize,
1176            min: T::T,
1177            max: T::T,
1178        ) {
1179            self.test_read_batch_general(
1180                desc,
1181                Encoding::PLAIN,
1182                num_pages,
1183                num_levels,
1184                batch_size,
1185                min,
1186                max,
1187                true,
1188            );
1189        }
1190
1191        // Method to generate and test dictionary page + data pages v1
1192        fn dict_v1(
1193            &mut self,
1194            desc: ColumnDescPtr,
1195            num_pages: usize,
1196            num_levels: usize,
1197            batch_size: usize,
1198            min: T::T,
1199            max: T::T,
1200        ) {
1201            self.test_read_batch_general(
1202                desc,
1203                Encoding::RLE_DICTIONARY,
1204                num_pages,
1205                num_levels,
1206                batch_size,
1207                min,
1208                max,
1209                false,
1210            );
1211        }
1212
1213        // Method to generate and test dictionary page + data pages v2
1214        fn dict_v2(
1215            &mut self,
1216            desc: ColumnDescPtr,
1217            num_pages: usize,
1218            num_levels: usize,
1219            batch_size: usize,
1220            min: T::T,
1221            max: T::T,
1222        ) {
1223            self.test_read_batch_general(
1224                desc,
1225                Encoding::RLE_DICTIONARY,
1226                num_pages,
1227                num_levels,
1228                batch_size,
1229                min,
1230                max,
1231                true,
1232            );
1233        }
1234
1235        // Helper function for the general case of `read_batch()` where `values`,
1236        // `def_levels` and `rep_levels` are always provided with enough space.
1237        #[allow(clippy::too_many_arguments)]
1238        fn test_read_batch_general(
1239            &mut self,
1240            desc: ColumnDescPtr,
1241            encoding: Encoding,
1242            num_pages: usize,
1243            num_levels: usize,
1244            batch_size: usize,
1245            min: T::T,
1246            max: T::T,
1247            use_v2: bool,
1248        ) {
1249            self.test_read_batch(
1250                desc, encoding, num_pages, num_levels, batch_size, min, max, use_v2,
1251            );
1252        }
1253
1254        // Helper function to test `read_batch()` method with custom buffers for values,
1255        // definition and repetition levels.
1256        #[allow(clippy::too_many_arguments)]
1257        fn test_read_batch(
1258            &mut self,
1259            desc: ColumnDescPtr,
1260            encoding: Encoding,
1261            num_pages: usize,
1262            num_levels: usize,
1263            batch_size: usize,
1264            min: T::T,
1265            max: T::T,
1266            use_v2: bool,
1267        ) {
1268            let mut pages = VecDeque::new();
1269            make_pages::<T>(
1270                desc.clone(),
1271                encoding,
1272                num_pages,
1273                num_levels,
1274                min,
1275                max,
1276                &mut self.def_levels,
1277                &mut self.rep_levels,
1278                &mut self.values,
1279                &mut pages,
1280                use_v2,
1281            );
1282            let max_def_level = desc.max_def_level();
1283            let max_rep_level = desc.max_rep_level();
1284            let page_reader = InMemoryPageReader::new(pages);
1285            let column_reader: ColumnReader = get_column_reader(desc, Box::new(page_reader));
1286            let mut typed_column_reader = get_typed_column_reader::<T>(column_reader);
1287
1288            let mut values = Vec::new();
1289            let mut def_levels = Vec::new();
1290            let mut rep_levels = Vec::new();
1291
1292            let mut curr_values_read = 0;
1293            let mut curr_levels_read = 0;
1294            loop {
1295                let (_, values_read, levels_read) = typed_column_reader
1296                    .read_records(
1297                        batch_size,
1298                        Some(&mut def_levels),
1299                        Some(&mut rep_levels),
1300                        &mut values,
1301                    )
1302                    .expect("read_batch() should be OK");
1303
1304                curr_values_read += values_read;
1305                curr_levels_read += levels_read;
1306
1307                if values_read == 0 && levels_read == 0 {
1308                    break;
1309                }
1310            }
1311
1312            assert_eq!(values, self.values, "values content doesn't match");
1313
1314            if max_def_level > 0 {
1315                assert_eq!(
1316                    def_levels, self.def_levels,
1317                    "definition levels content doesn't match"
1318                );
1319            }
1320
1321            if max_rep_level > 0 {
1322                assert_eq!(
1323                    rep_levels, self.rep_levels,
1324                    "repetition levels content doesn't match"
1325                );
1326            }
1327
1328            assert!(
1329                curr_levels_read >= curr_values_read,
1330                "expected levels read to be greater than values read"
1331            );
1332        }
1333    }
1334}