parquet/column/
reader.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Contains column reader API.
19
20use bytes::Bytes;
21
22use super::page::{Page, PageReader};
23use crate::basic::*;
24use crate::column::reader::decoder::{
25    ColumnValueDecoder, ColumnValueDecoderImpl, DefinitionLevelDecoder, DefinitionLevelDecoderImpl,
26    RepetitionLevelDecoder, RepetitionLevelDecoderImpl,
27};
28use crate::data_type::*;
29use crate::errors::{ParquetError, Result};
30use crate::schema::types::ColumnDescPtr;
31use crate::util::bit_util::{ceil, num_required_bits, read_num_bytes};
32
33pub(crate) mod decoder;
34
/// Column reader for a Parquet type.
///
/// Every variant wraps a [`ColumnReaderImpl`] for a single physical type. Use
/// [`get_column_reader`] to build the variant matching a column descriptor and
/// [`get_typed_column_reader`] to recover the typed reader again.
pub enum ColumnReader {
    /// Column reader for boolean type
    BoolColumnReader(ColumnReaderImpl<BoolType>),
    /// Column reader for int32 type
    Int32ColumnReader(ColumnReaderImpl<Int32Type>),
    /// Column reader for int64 type
    Int64ColumnReader(ColumnReaderImpl<Int64Type>),
    /// Column reader for int96 type
    Int96ColumnReader(ColumnReaderImpl<Int96Type>),
    /// Column reader for float type
    FloatColumnReader(ColumnReaderImpl<FloatType>),
    /// Column reader for double type
    DoubleColumnReader(ColumnReaderImpl<DoubleType>),
    /// Column reader for byte array type
    ByteArrayColumnReader(ColumnReaderImpl<ByteArrayType>),
    /// Column reader for fixed length byte array type
    FixedLenByteArrayColumnReader(ColumnReaderImpl<FixedLenByteArrayType>),
}
54
55/// Gets a specific column reader corresponding to column descriptor `col_descr`. The
56/// column reader will read from pages in `col_page_reader`.
57pub fn get_column_reader(
58    col_descr: ColumnDescPtr,
59    col_page_reader: Box<dyn PageReader>,
60) -> ColumnReader {
61    match col_descr.physical_type() {
62        Type::BOOLEAN => {
63            ColumnReader::BoolColumnReader(ColumnReaderImpl::new(col_descr, col_page_reader))
64        }
65        Type::INT32 => {
66            ColumnReader::Int32ColumnReader(ColumnReaderImpl::new(col_descr, col_page_reader))
67        }
68        Type::INT64 => {
69            ColumnReader::Int64ColumnReader(ColumnReaderImpl::new(col_descr, col_page_reader))
70        }
71        Type::INT96 => {
72            ColumnReader::Int96ColumnReader(ColumnReaderImpl::new(col_descr, col_page_reader))
73        }
74        Type::FLOAT => {
75            ColumnReader::FloatColumnReader(ColumnReaderImpl::new(col_descr, col_page_reader))
76        }
77        Type::DOUBLE => {
78            ColumnReader::DoubleColumnReader(ColumnReaderImpl::new(col_descr, col_page_reader))
79        }
80        Type::BYTE_ARRAY => {
81            ColumnReader::ByteArrayColumnReader(ColumnReaderImpl::new(col_descr, col_page_reader))
82        }
83        Type::FIXED_LEN_BYTE_ARRAY => ColumnReader::FixedLenByteArrayColumnReader(
84            ColumnReaderImpl::new(col_descr, col_page_reader),
85        ),
86    }
87}
88
89/// Gets a typed column reader for the specific type `T`, by "up-casting" `col_reader` of
90/// non-generic type to a generic column reader type `ColumnReaderImpl`.
91///
92/// Panics if actual enum value for `col_reader` does not match the type `T`.
93pub fn get_typed_column_reader<T: DataType>(col_reader: ColumnReader) -> ColumnReaderImpl<T> {
94    T::get_column_reader(col_reader).unwrap_or_else(|| {
95        panic!(
96            "Failed to convert column reader into a typed column reader for `{}` type",
97            T::get_physical_type()
98        )
99    })
100}
101
/// Typed value reader for a particular primitive column.
///
/// This is a [`GenericColumnReader`] wired up with the default level decoders
/// ([`RepetitionLevelDecoderImpl`], [`DefinitionLevelDecoderImpl`]) and the default
/// value decoder for `T` ([`ColumnValueDecoderImpl`]).
pub type ColumnReaderImpl<T> = GenericColumnReader<
    RepetitionLevelDecoderImpl,
    DefinitionLevelDecoderImpl,
    ColumnValueDecoderImpl<T>,
>;
108
/// Reads data for a given column chunk, using the provided decoders:
///
/// - R: [`RepetitionLevelDecoder`] used to decode repetition levels
/// - D: [`DefinitionLevelDecoder`] used to decode definition levels
/// - V: [`ColumnValueDecoder`] used to decode value data
pub struct GenericColumnReader<R, D, V> {
    /// Descriptor of the column being read; supplies the maximum definition and
    /// repetition levels.
    descr: ColumnDescPtr,

    /// Source of the pages for this column.
    page_reader: Box<dyn PageReader>,

    /// The total number of values stored in the current data page.
    num_buffered_values: usize,

    /// The number of values from the current data page that have been decoded into
    /// memory (or skipped) so far.
    num_decoded_values: usize,

    /// True if the end of the current data page denotes the end of a record
    has_record_delimiter: bool,

    /// The decoder for the definition levels if any
    def_level_decoder: Option<D>,

    /// The decoder for the repetition levels if any
    rep_level_decoder: Option<R>,

    /// The decoder for the values
    values_decoder: V,
}
138
139impl<V> GenericColumnReader<RepetitionLevelDecoderImpl, DefinitionLevelDecoderImpl, V>
140where
141    V: ColumnValueDecoder,
142{
143    /// Creates new column reader based on column descriptor and page reader.
144    pub fn new(descr: ColumnDescPtr, page_reader: Box<dyn PageReader>) -> Self {
145        let values_decoder = V::new(&descr);
146
147        let def_level_decoder = (descr.max_def_level() != 0)
148            .then(|| DefinitionLevelDecoderImpl::new(descr.max_def_level()));
149
150        let rep_level_decoder = (descr.max_rep_level() != 0)
151            .then(|| RepetitionLevelDecoderImpl::new(descr.max_rep_level()));
152
153        Self::new_with_decoders(
154            descr,
155            page_reader,
156            values_decoder,
157            def_level_decoder,
158            rep_level_decoder,
159        )
160    }
161}
162
163impl<R, D, V> GenericColumnReader<R, D, V>
164where
165    R: RepetitionLevelDecoder,
166    D: DefinitionLevelDecoder,
167    V: ColumnValueDecoder,
168{
169    pub(crate) fn new_with_decoders(
170        descr: ColumnDescPtr,
171        page_reader: Box<dyn PageReader>,
172        values_decoder: V,
173        def_level_decoder: Option<D>,
174        rep_level_decoder: Option<R>,
175    ) -> Self {
176        Self {
177            descr,
178            def_level_decoder,
179            rep_level_decoder,
180            page_reader,
181            num_buffered_values: 0,
182            num_decoded_values: 0,
183            values_decoder,
184            has_record_delimiter: false,
185        }
186    }
187
188    /// Read up to `max_records` whole records, returning the number of complete
189    /// records, non-null values and levels decoded. All levels for a given record
190    /// will be read, i.e. the next repetition level, if any, will be 0
191    ///
192    /// If the max definition level is 0, `def_levels` will be ignored and the number of records,
193    /// non-null values and levels decoded will all be equal, otherwise `def_levels` will be
194    /// populated with the number of levels read, with an error returned if it is `None`.
195    ///
196    /// If the max repetition level is 0, `rep_levels` will be ignored and the number of records
197    /// and levels decoded will both be equal, otherwise `rep_levels` will be populated with
198    /// the number of levels read, with an error returned if it is `None`.
199    ///
200    /// `values` will be contiguously populated with the non-null values. Note that if the column
201    /// is not required, this may be less than either `max_records` or the number of levels read
202    pub fn read_records(
203        &mut self,
204        max_records: usize,
205        mut def_levels: Option<&mut D::Buffer>,
206        mut rep_levels: Option<&mut R::Buffer>,
207        values: &mut V::Buffer,
208    ) -> Result<(usize, usize, usize)> {
209        let mut total_records_read = 0;
210        let mut total_levels_read = 0;
211        let mut total_values_read = 0;
212
213        while total_records_read < max_records && self.has_next()? {
214            let remaining_records = max_records - total_records_read;
215            let remaining_levels = self.num_buffered_values - self.num_decoded_values;
216
217            let (records_read, levels_to_read) = match self.rep_level_decoder.as_mut() {
218                Some(reader) => {
219                    let out = rep_levels
220                        .as_mut()
221                        .ok_or_else(|| general_err!("must specify repetition levels"))?;
222
223                    let (mut records_read, levels_read) =
224                        reader.read_rep_levels(out, remaining_records, remaining_levels)?;
225
226                    if records_read == 0 && levels_read == 0 {
227                        // The fact that we're still looping implies there must be some levels to read.
228                        return Err(general_err!(
229                            "Insufficient repetition levels read from column"
230                        ));
231                    }
232                    if levels_read == remaining_levels && self.has_record_delimiter {
233                        // Reached end of page, which implies records_read < remaining_records
234                        // as otherwise would have stopped reading before reaching the end
235                        assert!(records_read < remaining_records); // Sanity check
236                        records_read += reader.flush_partial() as usize;
237                    }
238                    (records_read, levels_read)
239                }
240                None => {
241                    let min = remaining_records.min(remaining_levels);
242                    (min, min)
243                }
244            };
245
246            let values_to_read = match self.def_level_decoder.as_mut() {
247                Some(reader) => {
248                    let out = def_levels
249                        .as_mut()
250                        .ok_or_else(|| general_err!("must specify definition levels"))?;
251
252                    let (values_read, levels_read) = reader.read_def_levels(out, levels_to_read)?;
253
254                    if levels_read != levels_to_read {
255                        return Err(general_err!(
256                            "insufficient definition levels read from column - expected {levels_to_read}, got {levels_read}"
257                        ));
258                    }
259
260                    values_read
261                }
262                None => levels_to_read,
263            };
264
265            let values_read = self.values_decoder.read(values, values_to_read)?;
266
267            if values_read != values_to_read {
268                return Err(general_err!(
269                    "insufficient values read from column - expected: {values_to_read}, got: {values_read}",
270                ));
271            }
272
273            self.num_decoded_values += levels_to_read;
274            total_records_read += records_read;
275            total_levels_read += levels_to_read;
276            total_values_read += values_read;
277        }
278
279        Ok((total_records_read, total_values_read, total_levels_read))
280    }
281
282    /// Skips over `num_records` records, where records are delimited by repetition levels of 0
283    ///
284    /// # Returns
285    ///
286    /// Returns the number of records skipped
287    pub fn skip_records(&mut self, num_records: usize) -> Result<usize> {
288        let mut remaining_records = num_records;
289        while remaining_records != 0 {
290            if self.num_buffered_values == self.num_decoded_values {
291                let metadata = match self.page_reader.peek_next_page()? {
292                    None => return Ok(num_records - remaining_records),
293                    Some(metadata) => metadata,
294                };
295
296                // If dictionary, we must read it
297                if metadata.is_dict {
298                    self.read_dictionary_page()?;
299                    continue;
300                }
301
302                // If page has less rows than the remaining records to
303                // be skipped, skip entire page
304                let rows = metadata.num_rows.or_else(|| {
305                    // If no repetition levels, num_levels == num_rows
306                    self.rep_level_decoder
307                        .is_none()
308                        .then_some(metadata.num_levels)?
309                });
310
311                if let Some(rows) = rows {
312                    if rows <= remaining_records {
313                        self.page_reader.skip_next_page()?;
314                        remaining_records -= rows;
315                        continue;
316                    }
317                }
318                // because self.num_buffered_values == self.num_decoded_values means
319                // we need reads a new page and set up the decoders for levels
320                if !self.read_new_page()? {
321                    return Ok(num_records - remaining_records);
322                }
323            }
324
325            // start skip values in page level
326
327            // The number of levels in the current data page
328            let remaining_levels = self.num_buffered_values - self.num_decoded_values;
329
330            let (records_read, rep_levels_read) = match self.rep_level_decoder.as_mut() {
331                Some(decoder) => {
332                    let (mut records_read, levels_read) =
333                        decoder.skip_rep_levels(remaining_records, remaining_levels)?;
334
335                    if levels_read == remaining_levels && self.has_record_delimiter {
336                        // Reached end of page, which implies records_read < remaining_records
337                        // as otherwise would have stopped reading before reaching the end
338                        assert!(records_read < remaining_records); // Sanity check
339                        records_read += decoder.flush_partial() as usize;
340                    }
341
342                    (records_read, levels_read)
343                }
344                None => {
345                    // No repetition levels, so each level corresponds to a row
346                    let levels = remaining_levels.min(remaining_records);
347                    (levels, levels)
348                }
349            };
350
351            self.num_decoded_values += rep_levels_read;
352            remaining_records -= records_read;
353
354            if self.num_buffered_values == self.num_decoded_values {
355                // Exhausted buffered page - no need to advance other decoders
356                continue;
357            }
358
359            let (values_read, def_levels_read) = match self.def_level_decoder.as_mut() {
360                Some(decoder) => decoder.skip_def_levels(rep_levels_read)?,
361                None => (rep_levels_read, rep_levels_read),
362            };
363
364            if rep_levels_read != def_levels_read {
365                return Err(general_err!(
366                    "levels mismatch, read {} repetition levels and {} definition levels",
367                    rep_levels_read,
368                    def_levels_read
369                ));
370            }
371
372            let values = self.values_decoder.skip_values(values_read)?;
373            if values != values_read {
374                return Err(general_err!(
375                    "skipped {} values, expected {}",
376                    values,
377                    values_read
378                ));
379            }
380        }
381        Ok(num_records - remaining_records)
382    }
383
384    /// Read the next page as a dictionary page. If the next page is not a dictionary page,
385    /// this will return an error.
386    fn read_dictionary_page(&mut self) -> Result<()> {
387        match self.page_reader.get_next_page()? {
388            Some(Page::DictionaryPage {
389                buf,
390                num_values,
391                encoding,
392                is_sorted,
393            }) => self
394                .values_decoder
395                .set_dict(buf, num_values, encoding, is_sorted),
396            _ => Err(ParquetError::General(
397                "Invalid page. Expecting dictionary page".to_string(),
398            )),
399        }
400    }
401
402    /// Reads a new page and set up the decoders for levels, values or dictionary.
403    /// Returns false if there's no page left.
404    fn read_new_page(&mut self) -> Result<bool> {
405        loop {
406            match self.page_reader.get_next_page()? {
407                // No more page to read
408                None => return Ok(false),
409                Some(current_page) => {
410                    match current_page {
411                        // 1. Dictionary page: configure dictionary for this page.
412                        Page::DictionaryPage {
413                            buf,
414                            num_values,
415                            encoding,
416                            is_sorted,
417                        } => {
418                            self.values_decoder
419                                .set_dict(buf, num_values, encoding, is_sorted)?;
420                            continue;
421                        }
422                        // 2. Data page v1
423                        Page::DataPage {
424                            buf,
425                            num_values,
426                            encoding,
427                            def_level_encoding,
428                            rep_level_encoding,
429                            statistics: _,
430                        } => {
431                            self.num_buffered_values = num_values as _;
432                            self.num_decoded_values = 0;
433
434                            let max_rep_level = self.descr.max_rep_level();
435                            let max_def_level = self.descr.max_def_level();
436
437                            let mut offset = 0;
438
439                            if max_rep_level > 0 {
440                                let (bytes_read, level_data) = parse_v1_level(
441                                    max_rep_level,
442                                    num_values,
443                                    rep_level_encoding,
444                                    buf.slice(offset..),
445                                )?;
446                                offset += bytes_read;
447
448                                self.has_record_delimiter =
449                                    self.page_reader.at_record_boundary()?;
450
451                                self.rep_level_decoder
452                                    .as_mut()
453                                    .unwrap()
454                                    .set_data(rep_level_encoding, level_data)?;
455                            }
456
457                            if max_def_level > 0 {
458                                let (bytes_read, level_data) = parse_v1_level(
459                                    max_def_level,
460                                    num_values,
461                                    def_level_encoding,
462                                    buf.slice(offset..),
463                                )?;
464                                offset += bytes_read;
465
466                                self.def_level_decoder
467                                    .as_mut()
468                                    .unwrap()
469                                    .set_data(def_level_encoding, level_data)?;
470                            }
471
472                            self.values_decoder.set_data(
473                                encoding,
474                                buf.slice(offset..),
475                                num_values as usize,
476                                None,
477                            )?;
478                            return Ok(true);
479                        }
480                        // 3. Data page v2
481                        Page::DataPageV2 {
482                            buf,
483                            num_values,
484                            encoding,
485                            num_nulls,
486                            num_rows: _,
487                            def_levels_byte_len,
488                            rep_levels_byte_len,
489                            is_compressed: _,
490                            statistics: _,
491                        } => {
492                            if num_nulls > num_values {
493                                return Err(general_err!(
494                                    "more nulls than values in page, contained {} values and {} nulls",
495                                    num_values,
496                                    num_nulls
497                                ));
498                            }
499
500                            self.num_buffered_values = num_values as _;
501                            self.num_decoded_values = 0;
502
503                            // DataPage v2 only supports RLE encoding for repetition
504                            // levels
505                            if self.descr.max_rep_level() > 0 {
506                                // Technically a DataPage v2 should not write a record
507                                // across multiple pages, however, the parquet writer
508                                // used to do this so we preserve backwards compatibility
509                                self.has_record_delimiter =
510                                    self.page_reader.at_record_boundary()?;
511
512                                self.rep_level_decoder.as_mut().unwrap().set_data(
513                                    Encoding::RLE,
514                                    buf.slice(..rep_levels_byte_len as usize),
515                                )?;
516                            }
517
518                            // DataPage v2 only supports RLE encoding for definition
519                            // levels
520                            if self.descr.max_def_level() > 0 {
521                                self.def_level_decoder.as_mut().unwrap().set_data(
522                                    Encoding::RLE,
523                                    buf.slice(
524                                        rep_levels_byte_len as usize
525                                            ..(rep_levels_byte_len + def_levels_byte_len) as usize,
526                                    ),
527                                )?;
528                            }
529
530                            self.values_decoder.set_data(
531                                encoding,
532                                buf.slice((rep_levels_byte_len + def_levels_byte_len) as usize..),
533                                num_values as usize,
534                                Some((num_values - num_nulls) as usize),
535                            )?;
536                            return Ok(true);
537                        }
538                    };
539                }
540            }
541        }
542    }
543
544    /// Check whether there is more data to read from this column,
545    /// If the current page is fully decoded, this will load the next page
546    /// (if it exists) into the buffer
547    #[inline]
548    pub(crate) fn has_next(&mut self) -> Result<bool> {
549        if self.num_buffered_values == 0 || self.num_buffered_values == self.num_decoded_values {
550            // TODO: should we return false if read_new_page() = true and
551            // num_buffered_values = 0?
552            if !self.read_new_page()? {
553                Ok(false)
554            } else {
555                Ok(self.num_buffered_values != 0)
556            }
557        } else {
558            Ok(true)
559        }
560    }
561}
562
563fn parse_v1_level(
564    max_level: i16,
565    num_buffered_values: u32,
566    encoding: Encoding,
567    buf: Bytes,
568) -> Result<(usize, Bytes)> {
569    match encoding {
570        Encoding::RLE => {
571            let i32_size = std::mem::size_of::<i32>();
572            if i32_size <= buf.len() {
573                let data_size = read_num_bytes::<i32>(i32_size, buf.as_ref()) as usize;
574                let end = i32_size
575                    .checked_add(data_size)
576                    .ok_or(general_err!("invalid level length"))?;
577                if end <= buf.len() {
578                    return Ok((end, buf.slice(i32_size..end)));
579                }
580            }
581            Err(general_err!("not enough data to read levels"))
582        }
583        #[allow(deprecated)]
584        Encoding::BIT_PACKED => {
585            let bit_width = num_required_bits(max_level as u64);
586            let num_bytes = ceil(num_buffered_values as usize * bit_width as usize, 8);
587            Ok((num_bytes, buf.slice(..num_bytes)))
588        }
589        _ => Err(general_err!("invalid level encoding: {}", encoding)),
590    }
591}
592
593#[cfg(test)]
594mod tests {
595    use super::*;
596
597    use rand::distr::uniform::SampleUniform;
598    use std::{collections::VecDeque, sync::Arc};
599
600    use crate::basic::Type as PhysicalType;
601    use crate::schema::types::{ColumnDescriptor, ColumnPath, Type as SchemaType};
602    use crate::util::test_common::page_util::InMemoryPageReader;
603    use crate::util::test_common::rand_gen::make_pages;
604
605    #[test]
606    fn test_parse_v1_level_invalid_length() {
607        // Say length is 10, but buffer is only 4
608        let buf = Bytes::from(vec![10, 0, 0, 0]);
609        let err = parse_v1_level(1, 100, Encoding::RLE, buf).unwrap_err();
610        assert_eq!(
611            err.to_string(),
612            "Parquet error: not enough data to read levels"
613        );
614
615        // Say length is 4, but buffer is only 3
616        let buf = Bytes::from(vec![4, 0, 0]);
617        let err = parse_v1_level(1, 100, Encoding::RLE, buf).unwrap_err();
618        assert_eq!(
619            err.to_string(),
620            "Parquet error: not enough data to read levels"
621        );
622    }
623
    // Defaults shared by the generated test cases below; they are passed to the
    // `test!` macro as `$num_levels`, `$num_pages`, `$def_level` and `$rep_level`.
    const NUM_LEVELS: usize = 128;
    const NUM_PAGES: usize = 2;
    const MAX_DEF_LEVEL: i16 = 5;
    const MAX_REP_LEVEL: i16 = 5;
628
    // Macro to generate test cases.
    //
    // The second argument (`i32` or `i64`) selects the data type; every other argument
    // is forwarded unchanged to `test_internal!` together with the matching data type
    // (`Int32Type` / `Int64Type`) and schema type constructor
    // (`get_test_int32_type` / `get_test_int64_type`).
    macro_rules! test {
        // branch for generating i32 cases
        ($test_func:ident, i32, $func:ident, $def_level:expr, $rep_level:expr,
     $num_pages:expr, $num_levels:expr, $batch_size:expr, $min:expr, $max:expr) => {
            test_internal!(
                $test_func,
                Int32Type,
                get_test_int32_type,
                $func,
                $def_level,
                $rep_level,
                $num_pages,
                $num_levels,
                $batch_size,
                $min,
                $max
            );
        };
        // branch for generating i64 cases
        ($test_func:ident, i64, $func:ident, $def_level:expr, $rep_level:expr,
     $num_pages:expr, $num_levels:expr, $batch_size:expr, $min:expr, $max:expr) => {
            test_internal!(
                $test_func,
                Int64Type,
                get_test_int64_type,
                $func,
                $def_level,
                $rep_level,
                $num_pages,
                $num_levels,
                $batch_size,
                $min,
                $max
            );
        };
    }
666
    // Expands to a `#[test]` function named `$test_func`.
    //
    // The generated test builds a column descriptor from the schema type returned by
    // `$pty()` with the given max definition / repetition levels and an empty column
    // path, then calls the `$func` method of a `ColumnReaderTester::<$ty>` with the
    // remaining arguments.
    macro_rules! test_internal {
        ($test_func:ident, $ty:ident, $pty:ident, $func:ident, $def_level:expr,
     $rep_level:expr, $num_pages:expr, $num_levels:expr, $batch_size:expr,
     $min:expr, $max:expr) => {
            #[test]
            fn $test_func() {
                let desc = Arc::new(ColumnDescriptor::new(
                    Arc::new($pty()),
                    $def_level,
                    $rep_level,
                    ColumnPath::new(Vec::new()),
                ));
                let mut tester = ColumnReaderTester::<$ty>::new();
                tester.$func(desc, $num_pages, $num_levels, $batch_size, $min, $max);
            }
        };
    }
684
    // i32 cases using the `plain_v1` / `plain_v2` tester methods.

    // Batch size 16, which divides NUM_LEVELS (128) evenly
    test!(
        test_read_plain_v1_int32,
        i32,
        plain_v1,
        MAX_DEF_LEVEL,
        MAX_REP_LEVEL,
        NUM_PAGES,
        NUM_LEVELS,
        16,
        i32::MIN,
        i32::MAX
    );
    test!(
        test_read_plain_v2_int32,
        i32,
        plain_v2,
        MAX_DEF_LEVEL,
        MAX_REP_LEVEL,
        NUM_PAGES,
        NUM_LEVELS,
        16,
        i32::MIN,
        i32::MAX
    );

    // Batch size 17, which does not divide NUM_LEVELS (128) evenly
    test!(
        test_read_plain_v1_int32_uneven,
        i32,
        plain_v1,
        MAX_DEF_LEVEL,
        MAX_REP_LEVEL,
        NUM_PAGES,
        NUM_LEVELS,
        17,
        i32::MIN,
        i32::MAX
    );
    test!(
        test_read_plain_v2_int32_uneven,
        i32,
        plain_v2,
        MAX_DEF_LEVEL,
        MAX_REP_LEVEL,
        NUM_PAGES,
        NUM_LEVELS,
        17,
        i32::MIN,
        i32::MAX
    );

    // Batch size 512, which is larger than NUM_LEVELS (128)
    test!(
        test_read_plain_v1_int32_multi_page,
        i32,
        plain_v1,
        MAX_DEF_LEVEL,
        MAX_REP_LEVEL,
        NUM_PAGES,
        NUM_LEVELS,
        512,
        i32::MIN,
        i32::MAX
    );
    test!(
        test_read_plain_v2_int32_multi_page,
        i32,
        plain_v2,
        MAX_DEF_LEVEL,
        MAX_REP_LEVEL,
        NUM_PAGES,
        NUM_LEVELS,
        512,
        i32::MIN,
        i32::MAX
    );

    // Test cases where the column descriptor has a max definition level of 0 and a
    // max repetition level of 0
    test!(
        test_read_plain_v1_int32_required_non_repeated,
        i32,
        plain_v1,
        0,
        0,
        NUM_PAGES,
        NUM_LEVELS,
        16,
        i32::MIN,
        i32::MAX
    );
    test!(
        test_read_plain_v2_int32_required_non_repeated,
        i32,
        plain_v2,
        0,
        0,
        NUM_PAGES,
        NUM_LEVELS,
        16,
        i32::MIN,
        i32::MAX
    );
785
786    test!(
787        test_read_plain_v1_int64,
788        i64,
789        plain_v1,
790        1,
791        1,
792        NUM_PAGES,
793        NUM_LEVELS,
794        16,
795        i64::MIN,
796        i64::MAX
797    );
798    test!(
799        test_read_plain_v2_int64,
800        i64,
801        plain_v2,
802        1,
803        1,
804        NUM_PAGES,
805        NUM_LEVELS,
806        16,
807        i64::MIN,
808        i64::MAX
809    );
810
    // INT64 `plain_v1` / `plain_v2` tests that pass 17 where the sibling cases pass 16.
    // NOTE(review): 17 is presumably a batch size that does not evenly divide the number
    // of levels per page (hence "uneven"); `test!` and `NUM_LEVELS` are defined outside
    // this excerpt — confirm.
    test!(
        test_read_plain_v1_int64_uneven,
        i64,
        plain_v1,
        1,
        1,
        NUM_PAGES,
        NUM_LEVELS,
        17,
        i64::MIN,
        i64::MAX
    );
    test!(
        test_read_plain_v2_int64_uneven,
        i64,
        plain_v2,
        1,
        1,
        NUM_PAGES,
        NUM_LEVELS,
        17,
        i64::MIN,
        i64::MAX
    );
835
    // INT64 `plain_v1` / `plain_v2` tests that pass 512 where the sibling cases pass 16.
    // NOTE(review): 512 is presumably a batch size large enough for one read to span
    // several pages (hence "multi_page"); `test!` and `NUM_LEVELS` are defined outside
    // this excerpt — confirm.
    test!(
        test_read_plain_v1_int64_multi_page,
        i64,
        plain_v1,
        1,
        1,
        NUM_PAGES,
        NUM_LEVELS,
        512,
        i64::MIN,
        i64::MAX
    );
    test!(
        test_read_plain_v2_int64_multi_page,
        i64,
        plain_v2,
        1,
        1,
        NUM_PAGES,
        NUM_LEVELS,
        512,
        i64::MIN,
        i64::MAX
    );
860
    // Test cases when column descriptor has MAX_DEF_LEVEL = 0 and MAX_REP_LEVEL = 0:
    // both level arguments are passed as literal zeros. INT64, `plain_v1` and `plain_v2`
    // tester methods.
    // NOTE(review): argument positions are presumed from the tester method signatures;
    // the `test!` macro itself is defined outside this excerpt — confirm.
    test!(
        test_read_plain_v1_int64_required_non_repeated,
        i64,
        plain_v1,
        0,
        0,
        NUM_PAGES,
        NUM_LEVELS,
        16,
        i64::MIN,
        i64::MAX
    );
    test!(
        test_read_plain_v2_int64_required_non_repeated,
        i64,
        plain_v2,
        0,
        0,
        NUM_PAGES,
        NUM_LEVELS,
        16,
        i64::MIN,
        i64::MAX
    );
886
    // INT32 tests routed through the `dict_v1` / `dict_v2` tester methods (which use
    // `Encoding::RLE_DICTIONARY`). Literal 2s stand where other cases pass `NUM_PAGES`
    // and `NUM_LEVELS`, and the last two arguments are 0 and 3 instead of the full
    // `i32` range.
    // NOTE(review): presumably 2 pages of 2 levels each with values drawn from 0..3;
    // `test!` is defined outside this excerpt — confirm.
    test!(
        test_read_dict_v1_int32_small,
        i32,
        dict_v1,
        MAX_DEF_LEVEL,
        MAX_REP_LEVEL,
        2,
        2,
        16,
        0,
        3
    );
    test!(
        test_read_dict_v2_int32_small,
        i32,
        dict_v2,
        MAX_DEF_LEVEL,
        MAX_REP_LEVEL,
        2,
        2,
        16,
        0,
        3
    );
911
    // INT32 tests routed through the `dict_v1` / `dict_v2` tester methods (which use
    // `Encoding::RLE_DICTIONARY`), with `NUM_PAGES` / `NUM_LEVELS` and 0 and 3 as the
    // last two arguments.
    // NOTE(review): 16 is presumably the batch size and 0 / 3 the min / max of generated
    // values; `test!` is defined outside this excerpt — confirm.
    test!(
        test_read_dict_v1_int32,
        i32,
        dict_v1,
        MAX_DEF_LEVEL,
        MAX_REP_LEVEL,
        NUM_PAGES,
        NUM_LEVELS,
        16,
        0,
        3
    );
    test!(
        test_read_dict_v2_int32,
        i32,
        dict_v2,
        MAX_DEF_LEVEL,
        MAX_REP_LEVEL,
        NUM_PAGES,
        NUM_LEVELS,
        16,
        0,
        3
    );
936
    // INT32 `dict_v1` / `dict_v2` tests that pass 17 where the sibling cases pass 16.
    // NOTE(review): 17 is presumably a batch size that does not evenly divide the number
    // of levels per page (hence "uneven"); `test!` and `NUM_LEVELS` are defined outside
    // this excerpt — confirm.
    test!(
        test_read_dict_v1_int32_uneven,
        i32,
        dict_v1,
        MAX_DEF_LEVEL,
        MAX_REP_LEVEL,
        NUM_PAGES,
        NUM_LEVELS,
        17,
        0,
        3
    );
    test!(
        test_read_dict_v2_int32_uneven,
        i32,
        dict_v2,
        MAX_DEF_LEVEL,
        MAX_REP_LEVEL,
        NUM_PAGES,
        NUM_LEVELS,
        17,
        0,
        3
    );
961
    // INT32 `dict_v1` / `dict_v2` tests that pass 512 where the sibling cases pass 16.
    // NOTE(review): 512 is presumably a batch size large enough for one read to span
    // several pages (hence "multi_page"); `test!` and `NUM_LEVELS` are defined outside
    // this excerpt — confirm.
    test!(
        test_read_dict_v1_int32_multi_page,
        i32,
        dict_v1,
        MAX_DEF_LEVEL,
        MAX_REP_LEVEL,
        NUM_PAGES,
        NUM_LEVELS,
        512,
        0,
        3
    );
    test!(
        test_read_dict_v2_int32_multi_page,
        i32,
        dict_v2,
        MAX_DEF_LEVEL,
        MAX_REP_LEVEL,
        NUM_PAGES,
        NUM_LEVELS,
        512,
        0,
        3
    );
986
    // INT64 tests routed through the `dict_v1` / `dict_v2` tester methods (which use
    // `Encoding::RLE_DICTIONARY`), with 0 and 3 as the last two arguments.
    // NOTE(review): 16 is presumably the batch size and 0 / 3 the min / max of generated
    // values; `test!` is defined outside this excerpt — confirm.
    test!(
        test_read_dict_v1_int64,
        i64,
        dict_v1,
        MAX_DEF_LEVEL,
        MAX_REP_LEVEL,
        NUM_PAGES,
        NUM_LEVELS,
        16,
        0,
        3
    );
    test!(
        test_read_dict_v2_int64,
        i64,
        dict_v2,
        MAX_DEF_LEVEL,
        MAX_REP_LEVEL,
        NUM_PAGES,
        NUM_LEVELS,
        16,
        0,
        3
    );
1011
1012    #[test]
1013    fn test_read_batch_values_only() {
1014        test_read_batch_int32(16, 0, 0);
1015    }
1016
1017    #[test]
1018    fn test_read_batch_values_def_levels() {
1019        test_read_batch_int32(16, MAX_DEF_LEVEL, 0);
1020    }
1021
1022    #[test]
1023    fn test_read_batch_values_rep_levels() {
1024        test_read_batch_int32(16, 0, MAX_REP_LEVEL);
1025    }
1026
1027    #[test]
1028    fn test_read_batch_values_def_rep_levels() {
1029        test_read_batch_int32(128, MAX_DEF_LEVEL, MAX_REP_LEVEL);
1030    }
1031
1032    #[test]
1033    fn test_read_batch_adjust_after_buffering_page() {
1034        // This test covers scenario when buffering new page results in setting number
1035        // of decoded values to 0, resulting on reading `batch_size` of values, but it is
1036        // larger than we can insert into slice (affects values and levels).
1037        //
1038        // Note: values are chosen to reproduce the issue.
1039        //
1040        let primitive_type = get_test_int32_type();
1041        let desc = Arc::new(ColumnDescriptor::new(
1042            Arc::new(primitive_type),
1043            1,
1044            1,
1045            ColumnPath::new(Vec::new()),
1046        ));
1047
1048        let num_pages = 2;
1049        let num_levels = 4;
1050        let batch_size = 5;
1051
1052        let mut tester = ColumnReaderTester::<Int32Type>::new();
1053        tester.test_read_batch(
1054            desc,
1055            Encoding::RLE_DICTIONARY,
1056            num_pages,
1057            num_levels,
1058            batch_size,
1059            i32::MIN,
1060            i32::MAX,
1061            false,
1062        );
1063    }
1064
1065    // ----------------------------------------------------------------------
1066    // Helper methods to make pages and test
1067    //
1068    // # Overview
1069    //
1070    // Most of the test functionality is implemented in `ColumnReaderTester`, which
1071    // provides some general data page test methods:
1072    // - `test_read_batch_general`
1073    // - `test_read_batch`
1074    //
1075    // There are also some high level wrappers that are part of `ColumnReaderTester`:
1076    // - `plain_v1` -> call `test_read_batch_general` with data page v1 and plain encoding
1077    // - `plain_v2` -> call `test_read_batch_general` with data page v2 and plain encoding
1078    // - `dict_v1` -> call `test_read_batch_general` with data page v1 + dictionary page
1079    // - `dict_v2` -> call `test_read_batch_general` with data page v2 + dictionary page
1080    //
1081    // And even higher level wrappers that simplify testing of almost the same test cases:
1082    // - `get_test_int32_type`, provides dummy schema type
1083    // - `get_test_int64_type`, provides dummy schema type
1084    // - `test_read_batch_int32`, wrapper for `read_batch` tests, since they are basically
1085    //   the same, just different def/rep levels and batch size.
1086    //
1087    // # Page assembly
1088    //
1089    // Page construction and generation of values, definition and repetition levels
1090    // happens in `make_pages` function.
1091    // All values are randomly generated based on provided min/max, levels are calculated
1092    // based on provided max level for column descriptor (which is basically either int32
1093    // or int64 type in tests) and `levels_per_page` variable.
1094    //
1095    // We use `DataPageBuilder` and its implementation `DataPageBuilderImpl` to actually
1096    // turn values, definition and repetition levels into data pages (either v1 or v2).
1097    //
    // Those data pages are then stored as part of `InMemoryPageReader` (we just pass the
    // queue of generated pages directly), which implements `PageReader` interface.
1100    //
1101    // # Comparison
1102    //
    // This allows us to pass test page reader into column reader, so we can test
    // functionality of column reader - see `test_read_batch`, where we create column
    // reader -> typed column reader, read values via the `read_records` method and
    // compare output with generated data.
1107
1108    // Returns dummy Parquet `Type` for primitive field, because most of our tests use
1109    // INT32 physical type.
1110    fn get_test_int32_type() -> SchemaType {
1111        SchemaType::primitive_type_builder("a", PhysicalType::INT32)
1112            .with_repetition(Repetition::REQUIRED)
1113            .with_converted_type(ConvertedType::INT_32)
1114            .with_length(-1)
1115            .build()
1116            .expect("build() should be OK")
1117    }
1118
1119    // Returns dummy Parquet `Type` for INT64 physical type.
1120    fn get_test_int64_type() -> SchemaType {
1121        SchemaType::primitive_type_builder("a", PhysicalType::INT64)
1122            .with_repetition(Repetition::REQUIRED)
1123            .with_converted_type(ConvertedType::INT_64)
1124            .with_length(-1)
1125            .build()
1126            .expect("build() should be OK")
1127    }
1128
1129    // Tests `read_batch()` functionality for INT32.
1130    //
1131    // This is a high level wrapper on `ColumnReaderTester` that allows us to specify some
1132    // boilerplate code for setting up definition/repetition levels and column descriptor.
1133    fn test_read_batch_int32(batch_size: usize, max_def_level: i16, max_rep_level: i16) {
1134        let primitive_type = get_test_int32_type();
1135
1136        let desc = Arc::new(ColumnDescriptor::new(
1137            Arc::new(primitive_type),
1138            max_def_level,
1139            max_rep_level,
1140            ColumnPath::new(Vec::new()),
1141        ));
1142
1143        let mut tester = ColumnReaderTester::<Int32Type>::new();
1144        tester.test_read_batch(
1145            desc,
1146            Encoding::RLE_DICTIONARY,
1147            NUM_PAGES,
1148            NUM_LEVELS,
1149            batch_size,
1150            i32::MIN,
1151            i32::MAX,
1152            false,
1153        );
1154    }
1155
    // Test driver for column readers of data type `T`.
    //
    // Holds the expected repetition levels, definition levels and values; its
    // `test_read_batch` method hands these buffers to `make_pages` and afterwards
    // compares them with what the column reader returned.
    struct ColumnReaderTester<T: DataType>
    where
        T::T: PartialOrd + SampleUniform + Copy,
    {
        // Expected repetition levels (only compared when the max repetition level is > 0).
        rep_levels: Vec<i16>,
        // Expected definition levels (only compared when the max definition level is > 0).
        def_levels: Vec<i16>,
        // Expected values.
        values: Vec<T::T>,
    }
1164
1165    impl<T: DataType> ColumnReaderTester<T>
1166    where
1167        T::T: PartialOrd + SampleUniform + Copy,
1168    {
1169        pub fn new() -> Self {
1170            Self {
1171                rep_levels: Vec::new(),
1172                def_levels: Vec::new(),
1173                values: Vec::new(),
1174            }
1175        }
1176
1177        // Method to generate and test data pages v1
1178        fn plain_v1(
1179            &mut self,
1180            desc: ColumnDescPtr,
1181            num_pages: usize,
1182            num_levels: usize,
1183            batch_size: usize,
1184            min: T::T,
1185            max: T::T,
1186        ) {
1187            self.test_read_batch_general(
1188                desc,
1189                Encoding::PLAIN,
1190                num_pages,
1191                num_levels,
1192                batch_size,
1193                min,
1194                max,
1195                false,
1196            );
1197        }
1198
1199        // Method to generate and test data pages v2
1200        fn plain_v2(
1201            &mut self,
1202            desc: ColumnDescPtr,
1203            num_pages: usize,
1204            num_levels: usize,
1205            batch_size: usize,
1206            min: T::T,
1207            max: T::T,
1208        ) {
1209            self.test_read_batch_general(
1210                desc,
1211                Encoding::PLAIN,
1212                num_pages,
1213                num_levels,
1214                batch_size,
1215                min,
1216                max,
1217                true,
1218            );
1219        }
1220
1221        // Method to generate and test dictionary page + data pages v1
1222        fn dict_v1(
1223            &mut self,
1224            desc: ColumnDescPtr,
1225            num_pages: usize,
1226            num_levels: usize,
1227            batch_size: usize,
1228            min: T::T,
1229            max: T::T,
1230        ) {
1231            self.test_read_batch_general(
1232                desc,
1233                Encoding::RLE_DICTIONARY,
1234                num_pages,
1235                num_levels,
1236                batch_size,
1237                min,
1238                max,
1239                false,
1240            );
1241        }
1242
1243        // Method to generate and test dictionary page + data pages v2
1244        fn dict_v2(
1245            &mut self,
1246            desc: ColumnDescPtr,
1247            num_pages: usize,
1248            num_levels: usize,
1249            batch_size: usize,
1250            min: T::T,
1251            max: T::T,
1252        ) {
1253            self.test_read_batch_general(
1254                desc,
1255                Encoding::RLE_DICTIONARY,
1256                num_pages,
1257                num_levels,
1258                batch_size,
1259                min,
1260                max,
1261                true,
1262            );
1263        }
1264
1265        // Helper function for the general case of `read_batch()` where `values`,
1266        // `def_levels` and `rep_levels` are always provided with enough space.
1267        #[allow(clippy::too_many_arguments)]
1268        fn test_read_batch_general(
1269            &mut self,
1270            desc: ColumnDescPtr,
1271            encoding: Encoding,
1272            num_pages: usize,
1273            num_levels: usize,
1274            batch_size: usize,
1275            min: T::T,
1276            max: T::T,
1277            use_v2: bool,
1278        ) {
1279            self.test_read_batch(
1280                desc, encoding, num_pages, num_levels, batch_size, min, max, use_v2,
1281            );
1282        }
1283
1284        // Helper function to test `read_batch()` method with custom buffers for values,
1285        // definition and repetition levels.
1286        #[allow(clippy::too_many_arguments)]
1287        fn test_read_batch(
1288            &mut self,
1289            desc: ColumnDescPtr,
1290            encoding: Encoding,
1291            num_pages: usize,
1292            num_levels: usize,
1293            batch_size: usize,
1294            min: T::T,
1295            max: T::T,
1296            use_v2: bool,
1297        ) {
1298            let mut pages = VecDeque::new();
1299            make_pages::<T>(
1300                desc.clone(),
1301                encoding,
1302                num_pages,
1303                num_levels,
1304                min,
1305                max,
1306                &mut self.def_levels,
1307                &mut self.rep_levels,
1308                &mut self.values,
1309                &mut pages,
1310                use_v2,
1311            );
1312            let max_def_level = desc.max_def_level();
1313            let max_rep_level = desc.max_rep_level();
1314            let page_reader = InMemoryPageReader::new(pages);
1315            let column_reader: ColumnReader = get_column_reader(desc, Box::new(page_reader));
1316            let mut typed_column_reader = get_typed_column_reader::<T>(column_reader);
1317
1318            let mut values = Vec::new();
1319            let mut def_levels = Vec::new();
1320            let mut rep_levels = Vec::new();
1321
1322            let mut curr_values_read = 0;
1323            let mut curr_levels_read = 0;
1324            loop {
1325                let (_, values_read, levels_read) = typed_column_reader
1326                    .read_records(
1327                        batch_size,
1328                        Some(&mut def_levels),
1329                        Some(&mut rep_levels),
1330                        &mut values,
1331                    )
1332                    .expect("read_batch() should be OK");
1333
1334                curr_values_read += values_read;
1335                curr_levels_read += levels_read;
1336
1337                if values_read == 0 && levels_read == 0 {
1338                    break;
1339                }
1340            }
1341
1342            assert_eq!(values, self.values, "values content doesn't match");
1343
1344            if max_def_level > 0 {
1345                assert_eq!(
1346                    def_levels, self.def_levels,
1347                    "definition levels content doesn't match"
1348                );
1349            }
1350
1351            if max_rep_level > 0 {
1352                assert_eq!(
1353                    rep_levels, self.rep_levels,
1354                    "repetition levels content doesn't match"
1355                );
1356            }
1357
1358            assert!(
1359                curr_levels_read >= curr_values_read,
1360                "expected levels read to be greater than values read"
1361            );
1362        }
1363    }
1364}