parquet/column/
reader.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Contains column reader API.
19
20use bytes::Bytes;
21
22use super::page::{Page, PageReader};
23use crate::basic::*;
24use crate::column::reader::decoder::{
25    ColumnValueDecoder, ColumnValueDecoderImpl, DefinitionLevelDecoder, DefinitionLevelDecoderImpl,
26    RepetitionLevelDecoder, RepetitionLevelDecoderImpl,
27};
28use crate::data_type::*;
29use crate::errors::{ParquetError, Result};
30use crate::schema::types::ColumnDescPtr;
31use crate::util::bit_util::{ceil, num_required_bits, read_num_bytes};
32
33pub(crate) mod decoder;
34
/// Column reader for a Parquet type.
///
/// Each variant wraps a [`ColumnReaderImpl`] specialised for one physical type.
/// Use [`get_column_reader`] to build the variant matching a column descriptor and
/// [`get_typed_column_reader`] to recover the typed reader from a variant.
pub enum ColumnReader {
    /// Column reader for boolean type
    BoolColumnReader(ColumnReaderImpl<BoolType>),
    /// Column reader for int32 type
    Int32ColumnReader(ColumnReaderImpl<Int32Type>),
    /// Column reader for int64 type
    Int64ColumnReader(ColumnReaderImpl<Int64Type>),
    /// Column reader for int96 type
    Int96ColumnReader(ColumnReaderImpl<Int96Type>),
    /// Column reader for float type
    FloatColumnReader(ColumnReaderImpl<FloatType>),
    /// Column reader for double type
    DoubleColumnReader(ColumnReaderImpl<DoubleType>),
    /// Column reader for byte array type
    ByteArrayColumnReader(ColumnReaderImpl<ByteArrayType>),
    /// Column reader for fixed length byte array type
    FixedLenByteArrayColumnReader(ColumnReaderImpl<FixedLenByteArrayType>),
}
54
55/// Gets a specific column reader corresponding to column descriptor `col_descr`. The
56/// column reader will read from pages in `col_page_reader`.
57pub fn get_column_reader(
58    col_descr: ColumnDescPtr,
59    col_page_reader: Box<dyn PageReader>,
60) -> ColumnReader {
61    match col_descr.physical_type() {
62        Type::BOOLEAN => {
63            ColumnReader::BoolColumnReader(ColumnReaderImpl::new(col_descr, col_page_reader))
64        }
65        Type::INT32 => {
66            ColumnReader::Int32ColumnReader(ColumnReaderImpl::new(col_descr, col_page_reader))
67        }
68        Type::INT64 => {
69            ColumnReader::Int64ColumnReader(ColumnReaderImpl::new(col_descr, col_page_reader))
70        }
71        Type::INT96 => {
72            ColumnReader::Int96ColumnReader(ColumnReaderImpl::new(col_descr, col_page_reader))
73        }
74        Type::FLOAT => {
75            ColumnReader::FloatColumnReader(ColumnReaderImpl::new(col_descr, col_page_reader))
76        }
77        Type::DOUBLE => {
78            ColumnReader::DoubleColumnReader(ColumnReaderImpl::new(col_descr, col_page_reader))
79        }
80        Type::BYTE_ARRAY => {
81            ColumnReader::ByteArrayColumnReader(ColumnReaderImpl::new(col_descr, col_page_reader))
82        }
83        Type::FIXED_LEN_BYTE_ARRAY => ColumnReader::FixedLenByteArrayColumnReader(
84            ColumnReaderImpl::new(col_descr, col_page_reader),
85        ),
86    }
87}
88
89/// Gets a typed column reader for the specific type `T`, by "up-casting" `col_reader` of
90/// non-generic type to a generic column reader type `ColumnReaderImpl`.
91///
92/// Panics if actual enum value for `col_reader` does not match the type `T`.
93pub fn get_typed_column_reader<T: DataType>(col_reader: ColumnReader) -> ColumnReaderImpl<T> {
94    T::get_column_reader(col_reader).unwrap_or_else(|| {
95        panic!(
96            "Failed to convert column reader into a typed column reader for `{}` type",
97            T::get_physical_type()
98        )
99    })
100}
101
/// Typed value reader for a particular primitive column.
///
/// This is a [`GenericColumnReader`] wired up with the default level decoders
/// ([`RepetitionLevelDecoderImpl`], [`DefinitionLevelDecoderImpl`]) and the default
/// value decoder ([`ColumnValueDecoderImpl`]) for the data type `T`.
pub type ColumnReaderImpl<T> = GenericColumnReader<
    RepetitionLevelDecoderImpl,
    DefinitionLevelDecoderImpl,
    ColumnValueDecoderImpl<T>,
>;
108
/// Reads data for a given column chunk, using the provided decoders:
///
/// - R: [`RepetitionLevelDecoder`] used to decode repetition levels
/// - D: [`DefinitionLevelDecoder`] used to decode definition levels
/// - V: [`ColumnValueDecoder`] used to decode value data
pub struct GenericColumnReader<R, D, V> {
    /// Descriptor of the column being read; supplies the max definition/repetition levels.
    descr: ColumnDescPtr,

    /// Source of the dictionary and data pages for this column chunk.
    page_reader: Box<dyn PageReader>,

    /// The total number of values stored in the data page.
    num_buffered_values: usize,

    /// The number of values from the current data page that has been decoded into memory
    /// so far.
    num_decoded_values: usize,

    /// True if the end of the current data page denotes the end of a record
    has_record_delimiter: bool,

    /// The decoder for the definition levels if any
    def_level_decoder: Option<D>,

    /// The decoder for the repetition levels if any
    rep_level_decoder: Option<R>,

    /// The decoder for the values
    values_decoder: V,
}
138
139impl<V> GenericColumnReader<RepetitionLevelDecoderImpl, DefinitionLevelDecoderImpl, V>
140where
141    V: ColumnValueDecoder,
142{
143    /// Creates new column reader based on column descriptor and page reader.
144    pub fn new(descr: ColumnDescPtr, page_reader: Box<dyn PageReader>) -> Self {
145        let values_decoder = V::new(&descr);
146
147        let def_level_decoder = (descr.max_def_level() != 0)
148            .then(|| DefinitionLevelDecoderImpl::new(descr.max_def_level()));
149
150        let rep_level_decoder = (descr.max_rep_level() != 0)
151            .then(|| RepetitionLevelDecoderImpl::new(descr.max_rep_level()));
152
153        Self::new_with_decoders(
154            descr,
155            page_reader,
156            values_decoder,
157            def_level_decoder,
158            rep_level_decoder,
159        )
160    }
161}
162
163impl<R, D, V> GenericColumnReader<R, D, V>
164where
165    R: RepetitionLevelDecoder,
166    D: DefinitionLevelDecoder,
167    V: ColumnValueDecoder,
168{
169    pub(crate) fn new_with_decoders(
170        descr: ColumnDescPtr,
171        page_reader: Box<dyn PageReader>,
172        values_decoder: V,
173        def_level_decoder: Option<D>,
174        rep_level_decoder: Option<R>,
175    ) -> Self {
176        Self {
177            descr,
178            def_level_decoder,
179            rep_level_decoder,
180            page_reader,
181            num_buffered_values: 0,
182            num_decoded_values: 0,
183            values_decoder,
184            has_record_delimiter: false,
185        }
186    }
187
    /// Read up to `max_records` whole records, returning the number of complete
    /// records, non-null values and levels decoded. All levels for a given record
    /// will be read, i.e. the next repetition level, if any, will be 0
    ///
    /// If the max definition level is 0, `def_levels` will be ignored and the number of records,
    /// non-null values and levels decoded will all be equal, otherwise `def_levels` will be
    /// populated with the number of levels read, with an error returned if it is `None`.
    ///
    /// If the max repetition level is 0, `rep_levels` will be ignored and the number of records
    /// and levels decoded will both be equal, otherwise `rep_levels` will be populated with
    /// the number of levels read, with an error returned if it is `None`.
    ///
    /// `values` will be contiguously populated with the non-null values. Note that if the column
    /// is not required, this may be less than either `max_records` or the number of levels read
    ///
    /// # Returns
    ///
    /// A tuple of `(records read, values read, levels read)`, in that order.
    ///
    /// # Errors
    ///
    /// Returns an error if loading the next page fails, if a required level buffer is
    /// `None`, or if a decoder yields fewer levels or values than expected.
    ///
    /// # Panics
    ///
    /// Panics if the repetition level decoder reports reaching the end of a page that
    /// ends on a record boundary while also reporting that all requested records were
    /// read (internal sanity check).
    pub fn read_records(
        &mut self,
        max_records: usize,
        mut def_levels: Option<&mut D::Buffer>,
        mut rep_levels: Option<&mut R::Buffer>,
        values: &mut V::Buffer,
    ) -> Result<(usize, usize, usize)> {
        let mut total_records_read = 0;
        let mut total_levels_read = 0;
        let mut total_values_read = 0;

        // `has_next` loads the next data page once the current one is exhausted.
        while total_records_read < max_records && self.has_next()? {
            let remaining_records = max_records - total_records_read;
            // Levels of the current page that have not been decoded yet
            let remaining_levels = self.num_buffered_values - self.num_decoded_values;

            // Step 1: determine how many records / levels this iteration covers
            let (records_read, levels_to_read) = match self.rep_level_decoder.as_mut() {
                Some(reader) => {
                    let out = rep_levels
                        .as_mut()
                        .ok_or_else(|| general_err!("must specify repetition levels"))?;

                    let (mut records_read, levels_read) =
                        reader.read_rep_levels(out, remaining_records, remaining_levels)?;

                    if records_read == 0 && levels_read == 0 {
                        // The fact that we're still looping implies there must be some levels to read.
                        return Err(general_err!(
                            "Insufficient repetition levels read from column"
                        ));
                    }
                    if levels_read == remaining_levels && self.has_record_delimiter {
                        // Reached end of page, which implies records_read < remaining_records
                        // as otherwise would have stopped reading before reaching the end
                        assert!(records_read < remaining_records); // Sanity check
                        // The page ends on a record boundary, so count the trailing
                        // partial record (if any) as complete.
                        records_read += reader.flush_partial() as usize;
                    }
                    (records_read, levels_read)
                }
                None => {
                    // No repetition levels: each level is exactly one record
                    let min = remaining_records.min(remaining_levels);
                    (min, min)
                }
            };

            // Step 2: decode the same number of definition levels to learn how many
            // values accompany them
            let values_to_read = match self.def_level_decoder.as_mut() {
                Some(reader) => {
                    let out = def_levels
                        .as_mut()
                        .ok_or_else(|| general_err!("must specify definition levels"))?;

                    let (values_read, levels_read) = reader.read_def_levels(out, levels_to_read)?;

                    if levels_read != levels_to_read {
                        return Err(general_err!(
                            "insufficient definition levels read from column - expected {levels_to_read}, got {levels_read}"
                        ));
                    }

                    values_read
                }
                // No definition levels: every level carries a value
                None => levels_to_read,
            };

            // Step 3: decode the values themselves
            let values_read = self.values_decoder.read(values, values_to_read)?;

            if values_read != values_to_read {
                return Err(general_err!(
                    "insufficient values read from column - expected: {values_to_read}, got: {values_read}",
                ));
            }

            // Counters are only advanced once all three decoders succeeded
            self.num_decoded_values += levels_to_read;
            total_records_read += records_read;
            total_levels_read += levels_to_read;
            total_values_read += values_read;
        }

        Ok((total_records_read, total_values_read, total_levels_read))
    }
281
    /// Skips over `num_records` records, where records are delimited by repetition levels of 0
    ///
    /// Whole pages are skipped through the page reader, without decoding, when the page
    /// metadata shows that the page holds no more rows than remain to be skipped.
    /// Otherwise the page is loaded and the level and value decoders are advanced.
    ///
    /// # Returns
    ///
    /// Returns the number of records skipped
    ///
    /// # Errors
    ///
    /// Returns an error if the page reader or a decoder fails, or if the number of
    /// definition levels or values skipped does not match what was expected.
    ///
    /// # Panics
    ///
    /// Panics if the repetition level decoder reports reaching the end of a page that
    /// ends on a record boundary while also reporting that all requested records were
    /// skipped (internal sanity check).
    pub fn skip_records(&mut self, num_records: usize) -> Result<usize> {
        let mut remaining_records = num_records;
        while remaining_records != 0 {
            // Current page exhausted (or none loaded yet): decide what to do with the next one
            if self.num_buffered_values == self.num_decoded_values {
                let metadata = match self.page_reader.peek_next_page()? {
                    None => return Ok(num_records - remaining_records),
                    Some(metadata) => metadata,
                };

                // If dictionary, we must read it
                if metadata.is_dict {
                    self.read_dictionary_page()?;
                    continue;
                }

                // If the page has no more rows than the remaining records to
                // be skipped, skip the entire page
                let rows = metadata.num_rows.or_else(|| {
                    // If no repetition levels, num_levels == num_rows
                    self.rep_level_decoder
                        .is_none()
                        .then_some(metadata.num_levels)?
                });

                if let Some(rows) = rows {
                    if rows <= remaining_records {
                        self.page_reader.skip_next_page()?;
                        remaining_records -= rows;
                        continue;
                    }
                }
                // Because self.num_buffered_values == self.num_decoded_values, we need
                // to read a new page and set up the decoders for levels
                if !self.read_new_page()? {
                    return Ok(num_records - remaining_records);
                }
            }

            // Start skipping values within the current page

            // The number of levels remaining in the current data page
            let remaining_levels = self.num_buffered_values - self.num_decoded_values;

            let (records_read, rep_levels_read) = match self.rep_level_decoder.as_mut() {
                Some(decoder) => {
                    let (mut records_read, levels_read) =
                        decoder.skip_rep_levels(remaining_records, remaining_levels)?;

                    if levels_read == remaining_levels && self.has_record_delimiter {
                        // Reached end of page, which implies records_read < remaining_records
                        // as otherwise would have stopped reading before reaching the end
                        assert!(records_read < remaining_records); // Sanity check
                        records_read += decoder.flush_partial() as usize;
                    }

                    (records_read, levels_read)
                }
                None => {
                    // No repetition levels, so each level corresponds to a row
                    let levels = remaining_levels.min(remaining_records);
                    (levels, levels)
                }
            };

            self.num_decoded_values += rep_levels_read;
            remaining_records -= records_read;

            if self.num_buffered_values == self.num_decoded_values {
                // Exhausted buffered page - no need to advance other decoders
                continue;
            }

            // Advance the definition level decoder by the same number of levels; it
            // reports how many values those levels correspond to
            let (values_read, def_levels_read) = match self.def_level_decoder.as_mut() {
                Some(decoder) => decoder.skip_def_levels(rep_levels_read)?,
                None => (rep_levels_read, rep_levels_read),
            };

            if rep_levels_read != def_levels_read {
                return Err(general_err!(
                    "levels mismatch, read {} repetition levels and {} definition levels",
                    rep_levels_read,
                    def_levels_read
                ));
            }

            // Finally advance the value decoder past the skipped values
            let values = self.values_decoder.skip_values(values_read)?;
            if values != values_read {
                return Err(general_err!(
                    "skipped {} values, expected {}",
                    values,
                    values_read
                ));
            }
        }
        Ok(num_records - remaining_records)
    }
383
384    /// Read the next page as a dictionary page. If the next page is not a dictionary page,
385    /// this will return an error.
386    fn read_dictionary_page(&mut self) -> Result<()> {
387        match self.page_reader.get_next_page()? {
388            Some(Page::DictionaryPage {
389                buf,
390                num_values,
391                encoding,
392                is_sorted,
393            }) => self
394                .values_decoder
395                .set_dict(buf, num_values, encoding, is_sorted),
396            _ => Err(ParquetError::General(
397                "Invalid page. Expecting dictionary page".to_string(),
398            )),
399        }
400    }
401
402    /// Reads a new page and set up the decoders for levels, values or dictionary.
403    /// Returns false if there's no page left.
404    fn read_new_page(&mut self) -> Result<bool> {
405        loop {
406            match self.page_reader.get_next_page()? {
407                // No more page to read
408                None => return Ok(false),
409                Some(current_page) => {
410                    match current_page {
411                        // 1. Dictionary page: configure dictionary for this page.
412                        Page::DictionaryPage {
413                            buf,
414                            num_values,
415                            encoding,
416                            is_sorted,
417                        } => {
418                            self.values_decoder
419                                .set_dict(buf, num_values, encoding, is_sorted)?;
420                            continue;
421                        }
422                        // 2. Data page v1
423                        Page::DataPage {
424                            buf,
425                            num_values,
426                            encoding,
427                            def_level_encoding,
428                            rep_level_encoding,
429                            statistics: _,
430                        } => {
431                            self.num_buffered_values = num_values as _;
432                            self.num_decoded_values = 0;
433
434                            let max_rep_level = self.descr.max_rep_level();
435                            let max_def_level = self.descr.max_def_level();
436
437                            let mut offset = 0;
438
439                            if max_rep_level > 0 {
440                                let (bytes_read, level_data) = parse_v1_level(
441                                    max_rep_level,
442                                    num_values,
443                                    rep_level_encoding,
444                                    buf.slice(offset..),
445                                )?;
446                                offset += bytes_read;
447
448                                self.has_record_delimiter =
449                                    self.page_reader.at_record_boundary()?;
450
451                                self.rep_level_decoder
452                                    .as_mut()
453                                    .unwrap()
454                                    .set_data(rep_level_encoding, level_data);
455                            }
456
457                            if max_def_level > 0 {
458                                let (bytes_read, level_data) = parse_v1_level(
459                                    max_def_level,
460                                    num_values,
461                                    def_level_encoding,
462                                    buf.slice(offset..),
463                                )?;
464                                offset += bytes_read;
465
466                                self.def_level_decoder
467                                    .as_mut()
468                                    .unwrap()
469                                    .set_data(def_level_encoding, level_data);
470                            }
471
472                            self.values_decoder.set_data(
473                                encoding,
474                                buf.slice(offset..),
475                                num_values as usize,
476                                None,
477                            )?;
478                            return Ok(true);
479                        }
480                        // 3. Data page v2
481                        Page::DataPageV2 {
482                            buf,
483                            num_values,
484                            encoding,
485                            num_nulls,
486                            num_rows: _,
487                            def_levels_byte_len,
488                            rep_levels_byte_len,
489                            is_compressed: _,
490                            statistics: _,
491                        } => {
492                            if num_nulls > num_values {
493                                return Err(general_err!(
494                                    "more nulls than values in page, contained {} values and {} nulls",
495                                    num_values,
496                                    num_nulls
497                                ));
498                            }
499
500                            self.num_buffered_values = num_values as _;
501                            self.num_decoded_values = 0;
502
503                            // DataPage v2 only supports RLE encoding for repetition
504                            // levels
505                            if self.descr.max_rep_level() > 0 {
506                                // Technically a DataPage v2 should not write a record
507                                // across multiple pages, however, the parquet writer
508                                // used to do this so we preserve backwards compatibility
509                                self.has_record_delimiter =
510                                    self.page_reader.at_record_boundary()?;
511
512                                self.rep_level_decoder.as_mut().unwrap().set_data(
513                                    Encoding::RLE,
514                                    buf.slice(..rep_levels_byte_len as usize),
515                                );
516                            }
517
518                            // DataPage v2 only supports RLE encoding for definition
519                            // levels
520                            if self.descr.max_def_level() > 0 {
521                                self.def_level_decoder.as_mut().unwrap().set_data(
522                                    Encoding::RLE,
523                                    buf.slice(
524                                        rep_levels_byte_len as usize
525                                            ..(rep_levels_byte_len + def_levels_byte_len) as usize,
526                                    ),
527                                );
528                            }
529
530                            self.values_decoder.set_data(
531                                encoding,
532                                buf.slice((rep_levels_byte_len + def_levels_byte_len) as usize..),
533                                num_values as usize,
534                                Some((num_values - num_nulls) as usize),
535                            )?;
536                            return Ok(true);
537                        }
538                    };
539                }
540            }
541        }
542    }
543
544    /// Check whether there is more data to read from this column,
545    /// If the current page is fully decoded, this will load the next page
546    /// (if it exists) into the buffer
547    #[inline]
548    pub(crate) fn has_next(&mut self) -> Result<bool> {
549        if self.num_buffered_values == 0 || self.num_buffered_values == self.num_decoded_values {
550            // TODO: should we return false if read_new_page() = true and
551            // num_buffered_values = 0?
552            if !self.read_new_page()? {
553                Ok(false)
554            } else {
555                Ok(self.num_buffered_values != 0)
556            }
557        } else {
558            Ok(true)
559        }
560    }
561}
562
563fn parse_v1_level(
564    max_level: i16,
565    num_buffered_values: u32,
566    encoding: Encoding,
567    buf: Bytes,
568) -> Result<(usize, Bytes)> {
569    match encoding {
570        Encoding::RLE => {
571            let i32_size = std::mem::size_of::<i32>();
572            let data_size = read_num_bytes::<i32>(i32_size, buf.as_ref()) as usize;
573            Ok((
574                i32_size + data_size,
575                buf.slice(i32_size..i32_size + data_size),
576            ))
577        }
578        #[allow(deprecated)]
579        Encoding::BIT_PACKED => {
580            let bit_width = num_required_bits(max_level as u64);
581            let num_bytes = ceil(num_buffered_values as usize * bit_width as usize, 8);
582            Ok((num_bytes, buf.slice(..num_bytes)))
583        }
584        _ => Err(general_err!("invalid level encoding: {}", encoding)),
585    }
586}
587
588#[cfg(test)]
589mod tests {
590    use super::*;
591
592    use rand::distr::uniform::SampleUniform;
593    use std::{collections::VecDeque, sync::Arc};
594
595    use crate::basic::Type as PhysicalType;
596    use crate::schema::types::{ColumnDescriptor, ColumnPath, Type as SchemaType};
597    use crate::util::test_common::page_util::InMemoryPageReader;
598    use crate::util::test_common::rand_gen::make_pages;
599
    // Default parameters shared by the generated test cases below.
    // Passed as the `$num_levels` argument of the test macros
    const NUM_LEVELS: usize = 128;
    // Passed as the `$num_pages` argument of the test macros
    const NUM_PAGES: usize = 2;
    // Max definition level used for the column descriptor in most tests
    const MAX_DEF_LEVEL: i16 = 5;
    // Max repetition level used for the column descriptor in most tests
    const MAX_REP_LEVEL: i16 = 5;
604
    // Macro to generate test cases.
    //
    // Arguments: test function name, value type (`i32` or `i64`), name of the
    // `ColumnReaderTester` method to run, max definition level, max repetition level,
    // number of pages, number of levels, batch size, and the min/max bounds forwarded
    // to the tester. The value type selects the parquet data type and the schema type
    // constructor, then everything is forwarded to `test_internal!`.
    macro_rules! test {
        // branch for generating i32 cases
        ($test_func:ident, i32, $func:ident, $def_level:expr, $rep_level:expr,
     $num_pages:expr, $num_levels:expr, $batch_size:expr, $min:expr, $max:expr) => {
            test_internal!(
                $test_func,
                Int32Type,
                get_test_int32_type,
                $func,
                $def_level,
                $rep_level,
                $num_pages,
                $num_levels,
                $batch_size,
                $min,
                $max
            );
        };
        // branch for generating i64 cases
        ($test_func:ident, i64, $func:ident, $def_level:expr, $rep_level:expr,
     $num_pages:expr, $num_levels:expr, $batch_size:expr, $min:expr, $max:expr) => {
            test_internal!(
                $test_func,
                Int64Type,
                get_test_int64_type,
                $func,
                $def_level,
                $rep_level,
                $num_pages,
                $num_levels,
                $batch_size,
                $min,
                $max
            );
        };
    }
642
    // Expands to a single `#[test]` function named `$test_func`.
    //
    // The generated test builds a `ColumnDescriptor` from the schema type returned by
    // `$pty()` with the given max definition/repetition levels and an empty column
    // path, then calls the `ColumnReaderTester::<$ty>` method `$func` with the
    // remaining arguments.
    macro_rules! test_internal {
        ($test_func:ident, $ty:ident, $pty:ident, $func:ident, $def_level:expr,
     $rep_level:expr, $num_pages:expr, $num_levels:expr, $batch_size:expr,
     $min:expr, $max:expr) => {
            #[test]
            fn $test_func() {
                let desc = Arc::new(ColumnDescriptor::new(
                    Arc::new($pty()),
                    $def_level,
                    $rep_level,
                    ColumnPath::new(Vec::new()),
                ));
                let mut tester = ColumnReaderTester::<$ty>::new();
                tester.$func(desc, $num_pages, $num_levels, $batch_size, $min, $max);
            }
        };
    }
660
    // int32 cases using the `plain_v1` / `plain_v2` tester methods over the full i32
    // range, with max definition and repetition levels of 5.

    // Batch size 16
    test!(
        test_read_plain_v1_int32,
        i32,
        plain_v1,
        MAX_DEF_LEVEL,
        MAX_REP_LEVEL,
        NUM_PAGES,
        NUM_LEVELS,
        16,
        i32::MIN,
        i32::MAX
    );
    test!(
        test_read_plain_v2_int32,
        i32,
        plain_v2,
        MAX_DEF_LEVEL,
        MAX_REP_LEVEL,
        NUM_PAGES,
        NUM_LEVELS,
        16,
        i32::MIN,
        i32::MAX
    );

    // Batch size 17, which does not evenly divide NUM_LEVELS (128)
    test!(
        test_read_plain_v1_int32_uneven,
        i32,
        plain_v1,
        MAX_DEF_LEVEL,
        MAX_REP_LEVEL,
        NUM_PAGES,
        NUM_LEVELS,
        17,
        i32::MIN,
        i32::MAX
    );
    test!(
        test_read_plain_v2_int32_uneven,
        i32,
        plain_v2,
        MAX_DEF_LEVEL,
        MAX_REP_LEVEL,
        NUM_PAGES,
        NUM_LEVELS,
        17,
        i32::MIN,
        i32::MAX
    );

    // Batch size 512, larger than NUM_LEVELS (128)
    test!(
        test_read_plain_v1_int32_multi_page,
        i32,
        plain_v1,
        MAX_DEF_LEVEL,
        MAX_REP_LEVEL,
        NUM_PAGES,
        NUM_LEVELS,
        512,
        i32::MIN,
        i32::MAX
    );
    test!(
        test_read_plain_v2_int32_multi_page,
        i32,
        plain_v2,
        MAX_DEF_LEVEL,
        MAX_REP_LEVEL,
        NUM_PAGES,
        NUM_LEVELS,
        512,
        i32::MIN,
        i32::MAX
    );
735
    // Test cases where the column descriptor has a max definition level of 0 and a
    // max repetition level of 0 (int32, batch size 16)
    test!(
        test_read_plain_v1_int32_required_non_repeated,
        i32,
        plain_v1,
        0,
        0,
        NUM_PAGES,
        NUM_LEVELS,
        16,
        i32::MIN,
        i32::MAX
    );
    test!(
        test_read_plain_v2_int32_required_non_repeated,
        i32,
        plain_v2,
        0,
        0,
        NUM_PAGES,
        NUM_LEVELS,
        16,
        i32::MIN,
        i32::MAX
    );
761
    // int64 cases using the `plain_v1` / `plain_v2` tester methods over the full i64
    // range, with max definition and repetition levels of 1.

    // Batch size 16
    test!(
        test_read_plain_v1_int64,
        i64,
        plain_v1,
        1,
        1,
        NUM_PAGES,
        NUM_LEVELS,
        16,
        i64::MIN,
        i64::MAX
    );
    test!(
        test_read_plain_v2_int64,
        i64,
        plain_v2,
        1,
        1,
        NUM_PAGES,
        NUM_LEVELS,
        16,
        i64::MIN,
        i64::MAX
    );

    // Batch size 17, which does not evenly divide NUM_LEVELS (128)
    test!(
        test_read_plain_v1_int64_uneven,
        i64,
        plain_v1,
        1,
        1,
        NUM_PAGES,
        NUM_LEVELS,
        17,
        i64::MIN,
        i64::MAX
    );
    test!(
        test_read_plain_v2_int64_uneven,
        i64,
        plain_v2,
        1,
        1,
        NUM_PAGES,
        NUM_LEVELS,
        17,
        i64::MIN,
        i64::MAX
    );

    // Batch size 512, larger than NUM_LEVELS (128)
    test!(
        test_read_plain_v1_int64_multi_page,
        i64,
        plain_v1,
        1,
        1,
        NUM_PAGES,
        NUM_LEVELS,
        512,
        i64::MIN,
        i64::MAX
    );
    test!(
        test_read_plain_v2_int64_multi_page,
        i64,
        plain_v2,
        1,
        1,
        NUM_PAGES,
        NUM_LEVELS,
        512,
        i64::MIN,
        i64::MAX
    );
836
    // Test cases where the column descriptor has a max definition level of 0 and a
    // max repetition level of 0 (int64, batch size 16)
    test!(
        test_read_plain_v1_int64_required_non_repeated,
        i64,
        plain_v1,
        0,
        0,
        NUM_PAGES,
        NUM_LEVELS,
        16,
        i64::MIN,
        i64::MAX
    );
    test!(
        test_read_plain_v2_int64_required_non_repeated,
        i64,
        plain_v2,
        0,
        0,
        NUM_PAGES,
        NUM_LEVELS,
        16,
        i64::MIN,
        i64::MAX
    );
862
    // `i32` cases driven through `dict_v1` and `dict_v2` with `MAX_DEF_LEVEL` /
    // `MAX_REP_LEVEL`. The sixth and seventh arguments are the literal 2 rather than
    // `NUM_PAGES` / `NUM_LEVELS`, and the last two arguments are 0 and 3.
    // NOTE(review): `test!` is defined outside this excerpt; presumably this is a small
    // input (2 pages, 2 levels per page) with values drawn between 0 and 3 — confirm.
    test!(
        test_read_dict_v1_int32_small,
        i32,
        dict_v1,
        MAX_DEF_LEVEL,
        MAX_REP_LEVEL,
        2,
        2,
        16,
        0,
        3
    );
    test!(
        test_read_dict_v2_int32_small,
        i32,
        dict_v2,
        MAX_DEF_LEVEL,
        MAX_REP_LEVEL,
        2,
        2,
        16,
        0,
        3
    );
887
    // `i32` cases driven through `dict_v1` and `dict_v2` with `MAX_DEF_LEVEL` /
    // `MAX_REP_LEVEL`, `NUM_PAGES` / `NUM_LEVELS`, a batch-size argument of 16 and the
    // last two arguments 0 and 3.
    // NOTE(review): `test!` is defined outside this excerpt; presumably 0 and 3 bound the
    // generated values — confirm against the macro.
    test!(
        test_read_dict_v1_int32,
        i32,
        dict_v1,
        MAX_DEF_LEVEL,
        MAX_REP_LEVEL,
        NUM_PAGES,
        NUM_LEVELS,
        16,
        0,
        3
    );
    test!(
        test_read_dict_v2_int32,
        i32,
        dict_v2,
        MAX_DEF_LEVEL,
        MAX_REP_LEVEL,
        NUM_PAGES,
        NUM_LEVELS,
        16,
        0,
        3
    );
912
    // Same arguments as `test_read_dict_v1_int32` / `test_read_dict_v2_int32` except that
    // the eighth argument is 17 instead of 16.
    // NOTE(review): presumably 17 is a batch size that does not evenly divide the number
    // of levels per page ("uneven"); `test!` and `NUM_LEVELS` are defined outside this
    // excerpt — confirm.
    test!(
        test_read_dict_v1_int32_uneven,
        i32,
        dict_v1,
        MAX_DEF_LEVEL,
        MAX_REP_LEVEL,
        NUM_PAGES,
        NUM_LEVELS,
        17,
        0,
        3
    );
    test!(
        test_read_dict_v2_int32_uneven,
        i32,
        dict_v2,
        MAX_DEF_LEVEL,
        MAX_REP_LEVEL,
        NUM_PAGES,
        NUM_LEVELS,
        17,
        0,
        3
    );
937
    // Same arguments as `test_read_dict_v1_int32` / `test_read_dict_v2_int32` except that
    // the eighth argument is 512 instead of 16.
    // NOTE(review): presumably 512 is a batch size large enough for one read to span
    // several pages ("multi_page"); `test!` and `NUM_LEVELS` are defined outside this
    // excerpt — confirm.
    test!(
        test_read_dict_v1_int32_multi_page,
        i32,
        dict_v1,
        MAX_DEF_LEVEL,
        MAX_REP_LEVEL,
        NUM_PAGES,
        NUM_LEVELS,
        512,
        0,
        3
    );
    test!(
        test_read_dict_v2_int32_multi_page,
        i32,
        dict_v2,
        MAX_DEF_LEVEL,
        MAX_REP_LEVEL,
        NUM_PAGES,
        NUM_LEVELS,
        512,
        0,
        3
    );
962
    // `i64` counterparts of `test_read_dict_v1_int32` / `test_read_dict_v2_int32`: only
    // the test name and the type argument differ.
    // NOTE(review): `test!` is defined outside this excerpt; presumably the `i64` type
    // argument selects the INT64 column type and schema — confirm against the macro.
    test!(
        test_read_dict_v1_int64,
        i64,
        dict_v1,
        MAX_DEF_LEVEL,
        MAX_REP_LEVEL,
        NUM_PAGES,
        NUM_LEVELS,
        16,
        0,
        3
    );
    test!(
        test_read_dict_v2_int64,
        i64,
        dict_v2,
        MAX_DEF_LEVEL,
        MAX_REP_LEVEL,
        NUM_PAGES,
        NUM_LEVELS,
        16,
        0,
        3
    );
987
988    #[test]
989    fn test_read_batch_values_only() {
990        test_read_batch_int32(16, 0, 0);
991    }
992
993    #[test]
994    fn test_read_batch_values_def_levels() {
995        test_read_batch_int32(16, MAX_DEF_LEVEL, 0);
996    }
997
998    #[test]
999    fn test_read_batch_values_rep_levels() {
1000        test_read_batch_int32(16, 0, MAX_REP_LEVEL);
1001    }
1002
1003    #[test]
1004    fn test_read_batch_values_def_rep_levels() {
1005        test_read_batch_int32(128, MAX_DEF_LEVEL, MAX_REP_LEVEL);
1006    }
1007
1008    #[test]
1009    fn test_read_batch_adjust_after_buffering_page() {
1010        // This test covers scenario when buffering new page results in setting number
1011        // of decoded values to 0, resulting on reading `batch_size` of values, but it is
1012        // larger than we can insert into slice (affects values and levels).
1013        //
1014        // Note: values are chosen to reproduce the issue.
1015        //
1016        let primitive_type = get_test_int32_type();
1017        let desc = Arc::new(ColumnDescriptor::new(
1018            Arc::new(primitive_type),
1019            1,
1020            1,
1021            ColumnPath::new(Vec::new()),
1022        ));
1023
1024        let num_pages = 2;
1025        let num_levels = 4;
1026        let batch_size = 5;
1027
1028        let mut tester = ColumnReaderTester::<Int32Type>::new();
1029        tester.test_read_batch(
1030            desc,
1031            Encoding::RLE_DICTIONARY,
1032            num_pages,
1033            num_levels,
1034            batch_size,
1035            i32::MIN,
1036            i32::MAX,
1037            false,
1038        );
1039    }
1040
1041    // ----------------------------------------------------------------------
1042    // Helper methods to make pages and test
1043    //
1044    // # Overview
1045    //
1046    // Most of the test functionality is implemented in `ColumnReaderTester`, which
1047    // provides some general data page test methods:
1048    // - `test_read_batch_general`
1049    // - `test_read_batch`
1050    //
1051    // There are also some high level wrappers that are part of `ColumnReaderTester`:
1052    // - `plain_v1` -> call `test_read_batch_general` with data page v1 and plain encoding
1053    // - `plain_v2` -> call `test_read_batch_general` with data page v2 and plain encoding
1054    // - `dict_v1` -> call `test_read_batch_general` with data page v1 + dictionary page
1055    // - `dict_v2` -> call `test_read_batch_general` with data page v2 + dictionary page
1056    //
1057    // And even higher level wrappers that simplify testing of almost the same test cases:
1058    // - `get_test_int32_type`, provides dummy schema type
1059    // - `get_test_int64_type`, provides dummy schema type
1060    // - `test_read_batch_int32`, wrapper for `read_batch` tests, since they are basically
1061    //   the same, just different def/rep levels and batch size.
1062    //
1063    // # Page assembly
1064    //
1065    // Page construction and generation of values, definition and repetition levels
1066    // happens in `make_pages` function.
1067    // All values are randomly generated based on provided min/max, levels are calculated
1068    // based on provided max level for column descriptor (which is basically either int32
1069    // or int64 type in tests) and `levels_per_page` variable.
1070    //
1071    // We use `DataPageBuilder` and its implementation `DataPageBuilderImpl` to actually
1072    // turn values, definition and repetition levels into data pages (either v1 or v2).
1073    //
    // Those data pages are then stored as part of `InMemoryPageReader` (we just pass the
    // queue of generated pages directly), which implements the `PageReader` interface.
1076    //
1077    // # Comparison
1078    //
    // This allows us to pass the test page reader into the column reader, so we can test
    // the functionality of the column reader - see `test_read_batch`, where we create a
    // column reader -> typed column reader, read values with the `read_records` method
    // and compare the output with the generated data.
1083
1084    // Returns dummy Parquet `Type` for primitive field, because most of our tests use
1085    // INT32 physical type.
1086    fn get_test_int32_type() -> SchemaType {
1087        SchemaType::primitive_type_builder("a", PhysicalType::INT32)
1088            .with_repetition(Repetition::REQUIRED)
1089            .with_converted_type(ConvertedType::INT_32)
1090            .with_length(-1)
1091            .build()
1092            .expect("build() should be OK")
1093    }
1094
1095    // Returns dummy Parquet `Type` for INT64 physical type.
1096    fn get_test_int64_type() -> SchemaType {
1097        SchemaType::primitive_type_builder("a", PhysicalType::INT64)
1098            .with_repetition(Repetition::REQUIRED)
1099            .with_converted_type(ConvertedType::INT_64)
1100            .with_length(-1)
1101            .build()
1102            .expect("build() should be OK")
1103    }
1104
1105    // Tests `read_batch()` functionality for INT32.
1106    //
1107    // This is a high level wrapper on `ColumnReaderTester` that allows us to specify some
1108    // boilerplate code for setting up definition/repetition levels and column descriptor.
1109    fn test_read_batch_int32(batch_size: usize, max_def_level: i16, max_rep_level: i16) {
1110        let primitive_type = get_test_int32_type();
1111
1112        let desc = Arc::new(ColumnDescriptor::new(
1113            Arc::new(primitive_type),
1114            max_def_level,
1115            max_rep_level,
1116            ColumnPath::new(Vec::new()),
1117        ));
1118
1119        let mut tester = ColumnReaderTester::<Int32Type>::new();
1120        tester.test_read_batch(
1121            desc,
1122            Encoding::RLE_DICTIONARY,
1123            NUM_PAGES,
1124            NUM_LEVELS,
1125            batch_size,
1126            i32::MIN,
1127            i32::MAX,
1128            false,
1129        );
1130    }
1131
1132    struct ColumnReaderTester<T: DataType>
1133    where
1134        T::T: PartialOrd + SampleUniform + Copy,
1135    {
1136        rep_levels: Vec<i16>,
1137        def_levels: Vec<i16>,
1138        values: Vec<T::T>,
1139    }
1140
1141    impl<T: DataType> ColumnReaderTester<T>
1142    where
1143        T::T: PartialOrd + SampleUniform + Copy,
1144    {
1145        pub fn new() -> Self {
1146            Self {
1147                rep_levels: Vec::new(),
1148                def_levels: Vec::new(),
1149                values: Vec::new(),
1150            }
1151        }
1152
1153        // Method to generate and test data pages v1
1154        fn plain_v1(
1155            &mut self,
1156            desc: ColumnDescPtr,
1157            num_pages: usize,
1158            num_levels: usize,
1159            batch_size: usize,
1160            min: T::T,
1161            max: T::T,
1162        ) {
1163            self.test_read_batch_general(
1164                desc,
1165                Encoding::PLAIN,
1166                num_pages,
1167                num_levels,
1168                batch_size,
1169                min,
1170                max,
1171                false,
1172            );
1173        }
1174
1175        // Method to generate and test data pages v2
1176        fn plain_v2(
1177            &mut self,
1178            desc: ColumnDescPtr,
1179            num_pages: usize,
1180            num_levels: usize,
1181            batch_size: usize,
1182            min: T::T,
1183            max: T::T,
1184        ) {
1185            self.test_read_batch_general(
1186                desc,
1187                Encoding::PLAIN,
1188                num_pages,
1189                num_levels,
1190                batch_size,
1191                min,
1192                max,
1193                true,
1194            );
1195        }
1196
1197        // Method to generate and test dictionary page + data pages v1
1198        fn dict_v1(
1199            &mut self,
1200            desc: ColumnDescPtr,
1201            num_pages: usize,
1202            num_levels: usize,
1203            batch_size: usize,
1204            min: T::T,
1205            max: T::T,
1206        ) {
1207            self.test_read_batch_general(
1208                desc,
1209                Encoding::RLE_DICTIONARY,
1210                num_pages,
1211                num_levels,
1212                batch_size,
1213                min,
1214                max,
1215                false,
1216            );
1217        }
1218
1219        // Method to generate and test dictionary page + data pages v2
1220        fn dict_v2(
1221            &mut self,
1222            desc: ColumnDescPtr,
1223            num_pages: usize,
1224            num_levels: usize,
1225            batch_size: usize,
1226            min: T::T,
1227            max: T::T,
1228        ) {
1229            self.test_read_batch_general(
1230                desc,
1231                Encoding::RLE_DICTIONARY,
1232                num_pages,
1233                num_levels,
1234                batch_size,
1235                min,
1236                max,
1237                true,
1238            );
1239        }
1240
1241        // Helper function for the general case of `read_batch()` where `values`,
1242        // `def_levels` and `rep_levels` are always provided with enough space.
1243        #[allow(clippy::too_many_arguments)]
1244        fn test_read_batch_general(
1245            &mut self,
1246            desc: ColumnDescPtr,
1247            encoding: Encoding,
1248            num_pages: usize,
1249            num_levels: usize,
1250            batch_size: usize,
1251            min: T::T,
1252            max: T::T,
1253            use_v2: bool,
1254        ) {
1255            self.test_read_batch(
1256                desc, encoding, num_pages, num_levels, batch_size, min, max, use_v2,
1257            );
1258        }
1259
1260        // Helper function to test `read_batch()` method with custom buffers for values,
1261        // definition and repetition levels.
1262        #[allow(clippy::too_many_arguments)]
1263        fn test_read_batch(
1264            &mut self,
1265            desc: ColumnDescPtr,
1266            encoding: Encoding,
1267            num_pages: usize,
1268            num_levels: usize,
1269            batch_size: usize,
1270            min: T::T,
1271            max: T::T,
1272            use_v2: bool,
1273        ) {
1274            let mut pages = VecDeque::new();
1275            make_pages::<T>(
1276                desc.clone(),
1277                encoding,
1278                num_pages,
1279                num_levels,
1280                min,
1281                max,
1282                &mut self.def_levels,
1283                &mut self.rep_levels,
1284                &mut self.values,
1285                &mut pages,
1286                use_v2,
1287            );
1288            let max_def_level = desc.max_def_level();
1289            let max_rep_level = desc.max_rep_level();
1290            let page_reader = InMemoryPageReader::new(pages);
1291            let column_reader: ColumnReader = get_column_reader(desc, Box::new(page_reader));
1292            let mut typed_column_reader = get_typed_column_reader::<T>(column_reader);
1293
1294            let mut values = Vec::new();
1295            let mut def_levels = Vec::new();
1296            let mut rep_levels = Vec::new();
1297
1298            let mut curr_values_read = 0;
1299            let mut curr_levels_read = 0;
1300            loop {
1301                let (_, values_read, levels_read) = typed_column_reader
1302                    .read_records(
1303                        batch_size,
1304                        Some(&mut def_levels),
1305                        Some(&mut rep_levels),
1306                        &mut values,
1307                    )
1308                    .expect("read_batch() should be OK");
1309
1310                curr_values_read += values_read;
1311                curr_levels_read += levels_read;
1312
1313                if values_read == 0 && levels_read == 0 {
1314                    break;
1315                }
1316            }
1317
1318            assert_eq!(values, self.values, "values content doesn't match");
1319
1320            if max_def_level > 0 {
1321                assert_eq!(
1322                    def_levels, self.def_levels,
1323                    "definition levels content doesn't match"
1324                );
1325            }
1326
1327            if max_rep_level > 0 {
1328                assert_eq!(
1329                    rep_levels, self.rep_levels,
1330                    "repetition levels content doesn't match"
1331                );
1332            }
1333
1334            assert!(
1335                curr_levels_read >= curr_values_read,
1336                "expected levels read to be greater than values read"
1337            );
1338        }
1339    }
1340}