Skip to main content

parquet/record/
triplet.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::basic::Type as PhysicalType;
19use crate::column::reader::{ColumnReader, ColumnReaderImpl, get_typed_column_reader};
20use crate::data_type::*;
21use crate::errors::{ParquetError, Result};
22use crate::record::api::Field;
23use crate::schema::types::ColumnDescPtr;
24
/// Forwards a method call to the `TypedTripletIter` held by whichever `TripletIter`
/// variant `$this` currently is.
///
/// `$method` names a method of `TypedTripletIter`. The trailing tokens spell the binding
/// mode used in every match arm: pass `ref` for `&self` methods and `ref, mut` for
/// `&mut self` methods.
macro_rules! triplet_enum_func {
    ($this:ident, $method:ident, $( $binding:tt ),*) => {{
        match *$this {
            TripletIter::BoolTripletIter($($binding)* inner) => inner.$method(),
            TripletIter::Int32TripletIter($($binding)* inner) => inner.$method(),
            TripletIter::Int64TripletIter($($binding)* inner) => inner.$method(),
            TripletIter::Int96TripletIter($($binding)* inner) => inner.$method(),
            TripletIter::FloatTripletIter($($binding)* inner) => inner.$method(),
            TripletIter::DoubleTripletIter($($binding)* inner) => inner.$method(),
            TripletIter::ByteArrayTripletIter($($binding)* inner) => inner.$method(),
            TripletIter::FixedLenByteArrayTripletIter($($binding)* inner) => inner.$method(),
        }
    }};
}
42
43/// High level API wrapper on column reader.
44/// Provides per-element access for each primitive column.
45#[allow(clippy::enum_variant_names)]
46pub enum TripletIter {
47    BoolTripletIter(TypedTripletIter<BoolType>),
48    Int32TripletIter(TypedTripletIter<Int32Type>),
49    Int64TripletIter(TypedTripletIter<Int64Type>),
50    Int96TripletIter(TypedTripletIter<Int96Type>),
51    FloatTripletIter(TypedTripletIter<FloatType>),
52    DoubleTripletIter(TypedTripletIter<DoubleType>),
53    ByteArrayTripletIter(TypedTripletIter<ByteArrayType>),
54    FixedLenByteArrayTripletIter(TypedTripletIter<FixedLenByteArrayType>),
55}
56
57impl TripletIter {
58    /// Creates new triplet for column reader
59    pub fn new(descr: ColumnDescPtr, reader: ColumnReader, batch_size: usize) -> Self {
60        match descr.physical_type() {
61            PhysicalType::BOOLEAN => {
62                TripletIter::BoolTripletIter(TypedTripletIter::new(descr, batch_size, reader))
63            }
64            PhysicalType::INT32 => {
65                TripletIter::Int32TripletIter(TypedTripletIter::new(descr, batch_size, reader))
66            }
67            PhysicalType::INT64 => {
68                TripletIter::Int64TripletIter(TypedTripletIter::new(descr, batch_size, reader))
69            }
70            PhysicalType::INT96 => {
71                TripletIter::Int96TripletIter(TypedTripletIter::new(descr, batch_size, reader))
72            }
73            PhysicalType::FLOAT => {
74                TripletIter::FloatTripletIter(TypedTripletIter::new(descr, batch_size, reader))
75            }
76            PhysicalType::DOUBLE => {
77                TripletIter::DoubleTripletIter(TypedTripletIter::new(descr, batch_size, reader))
78            }
79            PhysicalType::BYTE_ARRAY => {
80                TripletIter::ByteArrayTripletIter(TypedTripletIter::new(descr, batch_size, reader))
81            }
82            PhysicalType::FIXED_LEN_BYTE_ARRAY => TripletIter::FixedLenByteArrayTripletIter(
83                TypedTripletIter::new(descr, batch_size, reader),
84            ),
85        }
86    }
87
88    /// Invokes underlying typed triplet iterator to buffer current value.
89    /// Should be called once - either before `is_null` or `current_value`.
90    #[inline]
91    pub fn read_next(&mut self) -> Result<bool> {
92        triplet_enum_func!(self, read_next, ref, mut)
93    }
94
95    /// Provides check on values/levels left without invoking the underlying typed triplet
96    /// iterator.
97    /// Returns true if more values/levels exist, false otherwise.
98    /// It is always in sync with `read_next` method.
99    #[inline]
100    pub fn has_next(&self) -> bool {
101        triplet_enum_func!(self, has_next, ref)
102    }
103
104    /// Returns current definition level for a leaf triplet iterator
105    #[inline]
106    pub fn current_def_level(&self) -> i16 {
107        triplet_enum_func!(self, current_def_level, ref)
108    }
109
110    /// Returns max definition level for a leaf triplet iterator
111    #[inline]
112    pub fn max_def_level(&self) -> i16 {
113        triplet_enum_func!(self, max_def_level, ref)
114    }
115
116    /// Returns current repetition level for a leaf triplet iterator
117    #[inline]
118    pub fn current_rep_level(&self) -> i16 {
119        triplet_enum_func!(self, current_rep_level, ref)
120    }
121
122    /// Returns max repetition level for a leaf triplet iterator
123    #[inline]
124    pub fn max_rep_level(&self) -> i16 {
125        triplet_enum_func!(self, max_rep_level, ref)
126    }
127
128    /// Returns true, if current value is null.
129    /// Based on the fact that for non-null value current definition level
130    /// equals to max definition level.
131    #[inline]
132    pub fn is_null(&self) -> bool {
133        self.current_def_level() < self.max_def_level()
134    }
135
136    /// Updates non-null value for current row.
137    pub fn current_value(&self) -> Result<Field> {
138        if self.is_null() {
139            return Ok(Field::Null);
140        }
141        let field = match *self {
142            TripletIter::BoolTripletIter(ref typed) => {
143                Field::convert_bool(typed.column_descr(), *typed.current_value())
144            }
145            TripletIter::Int32TripletIter(ref typed) => {
146                Field::convert_int32(typed.column_descr(), *typed.current_value())
147            }
148            TripletIter::Int64TripletIter(ref typed) => {
149                Field::convert_int64(typed.column_descr(), *typed.current_value())
150            }
151            TripletIter::Int96TripletIter(ref typed) => {
152                Field::convert_int96(typed.column_descr(), *typed.current_value())
153            }
154            TripletIter::FloatTripletIter(ref typed) => {
155                Field::convert_float(typed.column_descr(), *typed.current_value())
156            }
157            TripletIter::DoubleTripletIter(ref typed) => {
158                Field::convert_double(typed.column_descr(), *typed.current_value())
159            }
160            TripletIter::ByteArrayTripletIter(ref typed) => {
161                Field::convert_byte_array(typed.column_descr(), typed.current_value().clone())?
162            }
163            TripletIter::FixedLenByteArrayTripletIter(ref typed) => Field::convert_byte_array(
164                typed.column_descr(),
165                typed.current_value().clone().into(),
166            )?,
167        };
168        Ok(field)
169    }
170}
171
172/// Internal typed triplet iterator as a wrapper for column reader
173/// (primitive leaf column), provides per-element access.
174pub struct TypedTripletIter<T: DataType> {
175    reader: ColumnReaderImpl<T>,
176    column_descr: ColumnDescPtr,
177    batch_size: usize,
178    // type properties
179    max_def_level: i16,
180    max_rep_level: i16,
181    // values and levels
182    values: Vec<T::T>,
183    def_levels: Option<Vec<i16>>,
184    rep_levels: Option<Vec<i16>>,
185    // current index for the triplet (value, def, rep)
186    curr_triplet_index: usize,
187    // how many triplets are left before we need to buffer
188    triplets_left: usize,
189    // helper flag to quickly check if we have more values/levels to read
190    has_next: bool,
191}
192
193impl<T: DataType> TypedTripletIter<T> {
194    /// Creates new typed triplet iterator based on provided column reader.
195    /// Use batch size to specify the amount of values to buffer from column reader.
196    fn new(descr: ColumnDescPtr, batch_size: usize, column_reader: ColumnReader) -> Self {
197        assert!(
198            batch_size > 0,
199            "Expected positive batch size, found: {batch_size}"
200        );
201
202        let max_def_level = descr.max_def_level();
203        let max_rep_level = descr.max_rep_level();
204
205        let def_levels = if max_def_level == 0 {
206            None
207        } else {
208            Some(vec![0; batch_size])
209        };
210        let rep_levels = if max_rep_level == 0 {
211            None
212        } else {
213            Some(vec![0; batch_size])
214        };
215
216        Self {
217            reader: get_typed_column_reader(column_reader),
218            column_descr: descr,
219            batch_size,
220            max_def_level,
221            max_rep_level,
222            values: vec![T::T::default(); batch_size],
223            def_levels,
224            rep_levels,
225            curr_triplet_index: 0,
226            triplets_left: 0,
227            has_next: false,
228        }
229    }
230
231    /// Returns column descriptor reference for the current typed triplet iterator.
232    #[inline]
233    pub fn column_descr(&self) -> &ColumnDescPtr {
234        &self.column_descr
235    }
236
237    /// Returns maximum definition level for the triplet iterator (leaf column).
238    #[inline]
239    fn max_def_level(&self) -> i16 {
240        self.max_def_level
241    }
242
243    /// Returns maximum repetition level for the triplet iterator (leaf column).
244    #[inline]
245    fn max_rep_level(&self) -> i16 {
246        self.max_rep_level
247    }
248
249    /// Returns current value.
250    /// Method does not advance the iterator, therefore can be called multiple times.
251    #[inline]
252    fn current_value(&self) -> &T::T {
253        assert!(
254            self.current_def_level() == self.max_def_level(),
255            "Cannot extract value, max definition level: {}, current level: {}",
256            self.max_def_level(),
257            self.current_def_level()
258        );
259        &self.values[self.curr_triplet_index]
260    }
261
262    /// Returns current definition level.
263    /// If field is required, then maximum definition level is returned.
264    #[inline]
265    fn current_def_level(&self) -> i16 {
266        if !self.has_next {
267            return 0;
268        }
269        match self.def_levels {
270            Some(ref vec) => vec[self.curr_triplet_index],
271            None => self.max_def_level,
272        }
273    }
274
275    /// Returns current repetition level.
276    /// If field is required, then maximum repetition level is returned.
277    #[inline]
278    fn current_rep_level(&self) -> i16 {
279        if !self.has_next {
280            return 0;
281        }
282        match self.rep_levels {
283            Some(ref vec) => vec[self.curr_triplet_index],
284            None => self.max_rep_level,
285        }
286    }
287
288    /// Quick check if iterator has more values/levels to read.
289    /// It is updated as a result of `read_next` method, so they are synchronized.
290    #[inline]
291    fn has_next(&self) -> bool {
292        self.has_next
293    }
294
295    /// Advances to the next triplet.
296    /// Returns true, if there are more records to read, false there are no records left.
297    fn read_next(&mut self) -> Result<bool> {
298        self.curr_triplet_index += 1;
299
300        // A loop is required to handle the case of a batch size of 1, as in such a case
301        // on reaching the end of a record, read_records will return `Ok((1, 0, 0))`
302        // and therefore not advance `self.triplets_left`
303        while self.curr_triplet_index >= self.triplets_left {
304            let (records_read, values_read, levels_read) = {
305                self.values.clear();
306                if let Some(x) = &mut self.def_levels {
307                    x.clear()
308                }
309                if let Some(x) = &mut self.rep_levels {
310                    x.clear()
311                }
312
313                // Buffer triplets
314                self.reader.read_records(
315                    self.batch_size,
316                    self.def_levels.as_mut(),
317                    self.rep_levels.as_mut(),
318                    &mut self.values,
319                )?
320            };
321
322            // No more values or levels to read
323            if records_read == 0 && values_read == 0 && levels_read == 0 {
324                self.curr_triplet_index = 0;
325                self.has_next = false;
326                return Ok(false);
327            }
328
329            // We never read values more than levels
330            if levels_read == 0 || values_read == levels_read {
331                // There are no definition levels to read, column is required
332                // or definition levels match values, so it does not require spacing
333                self.curr_triplet_index = 0;
334                self.triplets_left = values_read;
335            } else if values_read < levels_read {
336                // Add spacing for triplets.
337                // The idea is setting values for positions in def_levels when current
338                // definition level equals to maximum definition level.
339                // Values and levels are guaranteed to line up, because of
340                // the column reader method.
341
342                // Note: if values_read == 0, then spacing will not be triggered
343                let mut idx = values_read;
344                let def_levels = self.def_levels.as_ref().unwrap();
345                self.values.resize(levels_read, T::T::default());
346                for i in 0..levels_read {
347                    if def_levels[levels_read - i - 1] == self.max_def_level {
348                        idx -= 1; // This is done to avoid usize becoming a negative value
349                        self.values.swap(levels_read - i - 1, idx);
350                    }
351                }
352                self.curr_triplet_index = 0;
353                self.triplets_left = levels_read;
354            } else {
355                return Err(general_err!(
356                    "Spacing of values/levels is wrong, values_read: {}, levels_read: {}",
357                    values_read,
358                    levels_read
359                ));
360            }
361        }
362
363        self.has_next = true;
364        Ok(true)
365    }
366}
367
#[cfg(test)]
mod tests {
    use super::*;

    use crate::file::reader::{FileReader, SerializedFileReader};
    use crate::schema::types::ColumnPath;
    use crate::util::test_common::file_util::get_test_file;

    /// Batch sizes every column check is repeated with; the small ones force many refills.
    const BATCH_SIZES: [usize; 12] = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 128, 256];

    /// Builds a `ColumnPath` from its individual components.
    fn to_column_path(parts: &[&str]) -> ColumnPath {
        ColumnPath::from(parts.iter().map(|part| part.to_string()).collect::<Vec<String>>())
    }

    #[test]
    #[should_panic(expected = "Expected positive batch size, found: 0")]
    fn test_triplet_zero_batch_size() {
        let path = to_column_path(&["b_struct", "b_c_int"]);
        test_column_in_file("nulls.snappy.parquet", 0, &path, &[], &[], &[]);
    }

    #[test]
    fn test_triplet_null_column() {
        test_triplet_iter(
            "nulls.snappy.parquet",
            &["b_struct", "b_c_int"],
            &[],
            &[1; 8],
            &[0; 8],
        );
    }

    #[test]
    fn test_triplet_required_column() {
        test_triplet_iter(
            "nonnullable.impala.parquet",
            &["ID"],
            &[Field::Long(8)],
            &[0],
            &[0],
        );
    }

    #[test]
    fn test_triplet_optional_column() {
        test_triplet_iter(
            "nullable.impala.parquet",
            &["nested_struct", "A"],
            &[Field::Int(1), Field::Int(7)],
            &[2, 1, 1, 1, 1, 0, 2],
            &[0; 7],
        );
    }

    #[test]
    fn test_triplet_optional_list_column() {
        let expected: Vec<Field> = [
            "a", "b", "c", "d", "a", "b", "c", "d", "e", "a", "b", "c", "d", "e", "f",
        ]
        .iter()
        .map(|s| Field::Str(s.to_string()))
        .collect();
        test_triplet_iter(
            "nested_lists.snappy.parquet",
            &["a", "list", "element", "list", "element", "list", "element"],
            &expected,
            &[7, 7, 7, 4, 7, 7, 7, 7, 7, 4, 7, 7, 7, 7, 7, 7, 4, 7],
            &[0, 3, 2, 1, 2, 0, 3, 2, 3, 1, 2, 0, 3, 2, 3, 2, 1, 2],
        );
    }

    #[test]
    fn test_triplet_optional_map_column() {
        let expected = [1, 2, 1, 1, 3, 4, 5].map(Field::Int);
        test_triplet_iter(
            "nested_maps.snappy.parquet",
            &["a", "key_value", "value", "key_value", "key"],
            &expected,
            &[4, 4, 4, 2, 3, 4, 4, 4, 4],
            &[0, 2, 0, 0, 0, 0, 0, 2, 2],
        );
    }

    /// Runs the column check once for every entry of `BATCH_SIZES`.
    fn test_triplet_iter(
        file_name: &str,
        column_path: &[&str],
        expected_values: &[Field],
        expected_def_levels: &[i16],
        expected_rep_levels: &[i16],
    ) {
        let path = to_column_path(column_path);
        for &batch_size in &BATCH_SIZES {
            test_column_in_file(
                file_name,
                batch_size,
                &path,
                expected_values,
                expected_def_levels,
                expected_rep_levels,
            );
        }
    }

    /// Checks every column of the file's first row group whose path equals `column_path`.
    fn test_column_in_file(
        file_name: &str,
        batch_size: usize,
        column_path: &ColumnPath,
        expected_values: &[Field],
        expected_def_levels: &[i16],
        expected_rep_levels: &[i16],
    ) {
        let file_reader = SerializedFileReader::new(get_test_file(file_name)).unwrap();
        let schema = file_reader.metadata().file_metadata().schema_descr();
        let row_group_reader = file_reader.get_row_group(0).unwrap();

        let matching = (0..schema.num_columns()).filter(|&i| schema.column(i).path() == column_path);
        for i in matching {
            let reader = row_group_reader.get_column_reader(i).unwrap();
            test_triplet_column(
                schema.column(i),
                reader,
                batch_size,
                expected_values,
                expected_def_levels,
                expected_rep_levels,
            );
        }
    }

    /// Drains a single triplet iterator and compares what it produced with the expectations.
    fn test_triplet_column(
        descr: ColumnDescPtr,
        reader: ColumnReader,
        batch_size: usize,
        expected_values: &[Field],
        expected_def_levels: &[i16],
        expected_rep_levels: &[i16],
    ) {
        let (max_def, max_rep) = (descr.max_def_level(), descr.max_rep_level());
        let mut iter = TripletIter::new(descr, reader, batch_size);
        assert_eq!(iter.max_def_level(), max_def);
        assert_eq!(iter.max_rep_level(), max_rep);

        let mut values: Vec<Field> = Vec::new();
        let mut def_levels: Vec<i16> = Vec::new();
        let mut rep_levels: Vec<i16> = Vec::new();

        while matches!(iter.read_next(), Ok(true)) {
            assert!(iter.has_next());
            if !iter.is_null() {
                values.push(iter.current_value().unwrap());
            }
            def_levels.push(iter.current_def_level());
            rep_levels.push(iter.current_rep_level());
        }

        assert_eq!(values, expected_values);
        assert_eq!(def_levels, expected_def_levels);
        assert_eq!(rep_levels, expected_rep_levels);
    }

    /// Opens a triplet iterator over the first column at `path` in row group 0 of the file.
    fn open_triplet_iter(file_name: &str, path: &[&str], batch_size: usize) -> TripletIter {
        let column_path = to_column_path(path);
        let file_reader = SerializedFileReader::new(get_test_file(file_name)).unwrap();
        let schema = file_reader.metadata().file_metadata().schema_descr();
        let row_group_reader = file_reader.get_row_group(0).unwrap();

        let idx = (0..schema.num_columns())
            .find(|&i| schema.column(i).path() == &column_path)
            .unwrap_or_else(|| panic!("Column {column_path:?} not found in {file_name}"));
        let reader = row_group_reader.get_column_reader(idx).unwrap();
        TripletIter::new(schema.column(idx), reader, batch_size)
    }

    #[test]
    fn test_current_def_level_safe_after_exhaustion() {
        let mut iter = open_triplet_iter("nulls.snappy.parquet", &["b_struct", "b_c_int"], 256);
        while matches!(iter.read_next(), Ok(true)) {}
        assert!(!iter.has_next());
        assert_eq!(iter.current_def_level(), 0);
    }

    #[test]
    fn test_current_rep_level_safe_after_exhaustion() {
        let mut iter = open_triplet_iter(
            "nested_lists.snappy.parquet",
            &["a", "list", "element", "list", "element", "list", "element"],
            256,
        );
        while matches!(iter.read_next(), Ok(true)) {}
        assert!(!iter.has_next());
        assert_eq!(iter.current_rep_level(), 0);
    }
}
608}