parquet/record/
triplet.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::basic::Type as PhysicalType;
19use crate::column::reader::{get_typed_column_reader, ColumnReader, ColumnReaderImpl};
20use crate::data_type::*;
21use crate::errors::{ParquetError, Result};
22use crate::record::api::Field;
23use crate::schema::types::ColumnDescPtr;
24
/// Dispatches a zero-argument method call to the typed iterator wrapped by
/// whichever `TripletIter` variant is currently held.
///
/// * `$iter` - identifier of a reference to a `TripletIter` (typically `self`).
/// * `$method` - name of the `TypedTripletIter` method to invoke.
/// * `$binding` - comma-separated binding-mode tokens placed before the bound
///   name in each match pattern: either `ref` (shared access) or `ref, mut`
///   (exclusive access).
macro_rules! triplet_enum_func {
    ($iter:ident, $method:ident, $( $binding:tt ),*) => {{
        match *$iter {
            TripletIter::BoolTripletIter($($binding)* inner) => inner.$method(),
            TripletIter::Int32TripletIter($($binding)* inner) => inner.$method(),
            TripletIter::Int64TripletIter($($binding)* inner) => inner.$method(),
            TripletIter::Int96TripletIter($($binding)* inner) => inner.$method(),
            TripletIter::FloatTripletIter($($binding)* inner) => inner.$method(),
            TripletIter::DoubleTripletIter($($binding)* inner) => inner.$method(),
            TripletIter::ByteArrayTripletIter($($binding)* inner) => inner.$method(),
            TripletIter::FixedLenByteArrayTripletIter($($binding)* inner) => inner.$method(),
        }
    }};
}
42
43/// High level API wrapper on column reader.
44/// Provides per-element access for each primitive column.
45#[allow(clippy::enum_variant_names)]
46pub enum TripletIter {
47    BoolTripletIter(TypedTripletIter<BoolType>),
48    Int32TripletIter(TypedTripletIter<Int32Type>),
49    Int64TripletIter(TypedTripletIter<Int64Type>),
50    Int96TripletIter(TypedTripletIter<Int96Type>),
51    FloatTripletIter(TypedTripletIter<FloatType>),
52    DoubleTripletIter(TypedTripletIter<DoubleType>),
53    ByteArrayTripletIter(TypedTripletIter<ByteArrayType>),
54    FixedLenByteArrayTripletIter(TypedTripletIter<FixedLenByteArrayType>),
55}
56
57impl TripletIter {
58    /// Creates new triplet for column reader
59    pub fn new(descr: ColumnDescPtr, reader: ColumnReader, batch_size: usize) -> Self {
60        match descr.physical_type() {
61            PhysicalType::BOOLEAN => {
62                TripletIter::BoolTripletIter(TypedTripletIter::new(descr, batch_size, reader))
63            }
64            PhysicalType::INT32 => {
65                TripletIter::Int32TripletIter(TypedTripletIter::new(descr, batch_size, reader))
66            }
67            PhysicalType::INT64 => {
68                TripletIter::Int64TripletIter(TypedTripletIter::new(descr, batch_size, reader))
69            }
70            PhysicalType::INT96 => {
71                TripletIter::Int96TripletIter(TypedTripletIter::new(descr, batch_size, reader))
72            }
73            PhysicalType::FLOAT => {
74                TripletIter::FloatTripletIter(TypedTripletIter::new(descr, batch_size, reader))
75            }
76            PhysicalType::DOUBLE => {
77                TripletIter::DoubleTripletIter(TypedTripletIter::new(descr, batch_size, reader))
78            }
79            PhysicalType::BYTE_ARRAY => {
80                TripletIter::ByteArrayTripletIter(TypedTripletIter::new(descr, batch_size, reader))
81            }
82            PhysicalType::FIXED_LEN_BYTE_ARRAY => TripletIter::FixedLenByteArrayTripletIter(
83                TypedTripletIter::new(descr, batch_size, reader),
84            ),
85        }
86    }
87
88    /// Invokes underlying typed triplet iterator to buffer current value.
89    /// Should be called once - either before `is_null` or `current_value`.
90    #[inline]
91    pub fn read_next(&mut self) -> Result<bool> {
92        triplet_enum_func!(self, read_next, ref, mut)
93    }
94
95    /// Provides check on values/levels left without invoking the underlying typed triplet
96    /// iterator.
97    /// Returns true if more values/levels exist, false otherwise.
98    /// It is always in sync with `read_next` method.
99    #[inline]
100    pub fn has_next(&self) -> bool {
101        triplet_enum_func!(self, has_next, ref)
102    }
103
104    /// Returns current definition level for a leaf triplet iterator
105    #[inline]
106    pub fn current_def_level(&self) -> i16 {
107        triplet_enum_func!(self, current_def_level, ref)
108    }
109
110    /// Returns max definition level for a leaf triplet iterator
111    #[inline]
112    pub fn max_def_level(&self) -> i16 {
113        triplet_enum_func!(self, max_def_level, ref)
114    }
115
116    /// Returns current repetition level for a leaf triplet iterator
117    #[inline]
118    pub fn current_rep_level(&self) -> i16 {
119        triplet_enum_func!(self, current_rep_level, ref)
120    }
121
122    /// Returns max repetition level for a leaf triplet iterator
123    #[inline]
124    pub fn max_rep_level(&self) -> i16 {
125        triplet_enum_func!(self, max_rep_level, ref)
126    }
127
128    /// Returns true, if current value is null.
129    /// Based on the fact that for non-null value current definition level
130    /// equals to max definition level.
131    #[inline]
132    pub fn is_null(&self) -> bool {
133        self.current_def_level() < self.max_def_level()
134    }
135
136    /// Updates non-null value for current row.
137    pub fn current_value(&self) -> Result<Field> {
138        if self.is_null() {
139            return Ok(Field::Null);
140        }
141        let field = match *self {
142            TripletIter::BoolTripletIter(ref typed) => {
143                Field::convert_bool(typed.column_descr(), *typed.current_value())
144            }
145            TripletIter::Int32TripletIter(ref typed) => {
146                Field::convert_int32(typed.column_descr(), *typed.current_value())
147            }
148            TripletIter::Int64TripletIter(ref typed) => {
149                Field::convert_int64(typed.column_descr(), *typed.current_value())
150            }
151            TripletIter::Int96TripletIter(ref typed) => {
152                Field::convert_int96(typed.column_descr(), *typed.current_value())
153            }
154            TripletIter::FloatTripletIter(ref typed) => {
155                Field::convert_float(typed.column_descr(), *typed.current_value())
156            }
157            TripletIter::DoubleTripletIter(ref typed) => {
158                Field::convert_double(typed.column_descr(), *typed.current_value())
159            }
160            TripletIter::ByteArrayTripletIter(ref typed) => {
161                Field::convert_byte_array(typed.column_descr(), typed.current_value().clone())?
162            }
163            TripletIter::FixedLenByteArrayTripletIter(ref typed) => Field::convert_byte_array(
164                typed.column_descr(),
165                typed.current_value().clone().into(),
166            )?,
167        };
168        Ok(field)
169    }
170}
171
172/// Internal typed triplet iterator as a wrapper for column reader
173/// (primitive leaf column), provides per-element access.
174pub struct TypedTripletIter<T: DataType> {
175    reader: ColumnReaderImpl<T>,
176    column_descr: ColumnDescPtr,
177    batch_size: usize,
178    // type properties
179    max_def_level: i16,
180    max_rep_level: i16,
181    // values and levels
182    values: Vec<T::T>,
183    def_levels: Option<Vec<i16>>,
184    rep_levels: Option<Vec<i16>>,
185    // current index for the triplet (value, def, rep)
186    curr_triplet_index: usize,
187    // how many triplets are left before we need to buffer
188    triplets_left: usize,
189    // helper flag to quickly check if we have more values/levels to read
190    has_next: bool,
191}
192
193impl<T: DataType> TypedTripletIter<T> {
194    /// Creates new typed triplet iterator based on provided column reader.
195    /// Use batch size to specify the amount of values to buffer from column reader.
196    fn new(descr: ColumnDescPtr, batch_size: usize, column_reader: ColumnReader) -> Self {
197        assert!(
198            batch_size > 0,
199            "Expected positive batch size, found: {batch_size}"
200        );
201
202        let max_def_level = descr.max_def_level();
203        let max_rep_level = descr.max_rep_level();
204
205        let def_levels = if max_def_level == 0 {
206            None
207        } else {
208            Some(vec![0; batch_size])
209        };
210        let rep_levels = if max_rep_level == 0 {
211            None
212        } else {
213            Some(vec![0; batch_size])
214        };
215
216        Self {
217            reader: get_typed_column_reader(column_reader),
218            column_descr: descr,
219            batch_size,
220            max_def_level,
221            max_rep_level,
222            values: vec![T::T::default(); batch_size],
223            def_levels,
224            rep_levels,
225            curr_triplet_index: 0,
226            triplets_left: 0,
227            has_next: false,
228        }
229    }
230
231    /// Returns column descriptor reference for the current typed triplet iterator.
232    #[inline]
233    pub fn column_descr(&self) -> &ColumnDescPtr {
234        &self.column_descr
235    }
236
237    /// Returns maximum definition level for the triplet iterator (leaf column).
238    #[inline]
239    fn max_def_level(&self) -> i16 {
240        self.max_def_level
241    }
242
243    /// Returns maximum repetition level for the triplet iterator (leaf column).
244    #[inline]
245    fn max_rep_level(&self) -> i16 {
246        self.max_rep_level
247    }
248
249    /// Returns current value.
250    /// Method does not advance the iterator, therefore can be called multiple times.
251    #[inline]
252    fn current_value(&self) -> &T::T {
253        assert!(
254            self.current_def_level() == self.max_def_level(),
255            "Cannot extract value, max definition level: {}, current level: {}",
256            self.max_def_level(),
257            self.current_def_level()
258        );
259        &self.values[self.curr_triplet_index]
260    }
261
262    /// Returns current definition level.
263    /// If field is required, then maximum definition level is returned.
264    #[inline]
265    fn current_def_level(&self) -> i16 {
266        match self.def_levels {
267            Some(ref vec) => vec[self.curr_triplet_index],
268            None => self.max_def_level,
269        }
270    }
271
272    /// Returns current repetition level.
273    /// If field is required, then maximum repetition level is returned.
274    #[inline]
275    fn current_rep_level(&self) -> i16 {
276        match self.rep_levels {
277            Some(ref vec) => vec[self.curr_triplet_index],
278            None => self.max_rep_level,
279        }
280    }
281
282    /// Quick check if iterator has more values/levels to read.
283    /// It is updated as a result of `read_next` method, so they are synchronized.
284    #[inline]
285    fn has_next(&self) -> bool {
286        self.has_next
287    }
288
289    /// Advances to the next triplet.
290    /// Returns true, if there are more records to read, false there are no records left.
291    fn read_next(&mut self) -> Result<bool> {
292        self.curr_triplet_index += 1;
293
294        // A loop is required to handle the case of a batch size of 1, as in such a case
295        // on reaching the end of a record, read_records will return `Ok((1, 0, 0))`
296        // and therefore not advance `self.triplets_left`
297        while self.curr_triplet_index >= self.triplets_left {
298            let (records_read, values_read, levels_read) = {
299                self.values.clear();
300                if let Some(x) = &mut self.def_levels {
301                    x.clear()
302                }
303                if let Some(x) = &mut self.rep_levels {
304                    x.clear()
305                }
306
307                // Buffer triplets
308                self.reader.read_records(
309                    self.batch_size,
310                    self.def_levels.as_mut(),
311                    self.rep_levels.as_mut(),
312                    &mut self.values,
313                )?
314            };
315
316            // No more values or levels to read
317            if records_read == 0 && values_read == 0 && levels_read == 0 {
318                self.has_next = false;
319                return Ok(false);
320            }
321
322            // We never read values more than levels
323            if levels_read == 0 || values_read == levels_read {
324                // There are no definition levels to read, column is required
325                // or definition levels match values, so it does not require spacing
326                self.curr_triplet_index = 0;
327                self.triplets_left = values_read;
328            } else if values_read < levels_read {
329                // Add spacing for triplets.
330                // The idea is setting values for positions in def_levels when current
331                // definition level equals to maximum definition level.
332                // Values and levels are guaranteed to line up, because of
333                // the column reader method.
334
335                // Note: if values_read == 0, then spacing will not be triggered
336                let mut idx = values_read;
337                let def_levels = self.def_levels.as_ref().unwrap();
338                self.values.resize(levels_read, T::T::default());
339                for i in 0..levels_read {
340                    if def_levels[levels_read - i - 1] == self.max_def_level {
341                        idx -= 1; // This is done to avoid usize becoming a negative value
342                        self.values.swap(levels_read - i - 1, idx);
343                    }
344                }
345                self.curr_triplet_index = 0;
346                self.triplets_left = levels_read;
347            } else {
348                return Err(general_err!(
349                    "Spacing of values/levels is wrong, values_read: {}, levels_read: {}",
350                    values_read,
351                    levels_read
352                ));
353            }
354        }
355
356        self.has_next = true;
357        Ok(true)
358    }
359}
360
#[cfg(test)]
mod tests {
    use super::*;

    use crate::file::reader::{FileReader, SerializedFileReader};
    use crate::schema::types::ColumnPath;
    use crate::util::test_common::file_util::get_test_file;

    /// A batch size of zero must be rejected when the iterator is constructed.
    #[test]
    #[should_panic(expected = "Expected positive batch size, found: 0")]
    fn test_triplet_zero_batch_size() {
        let column_path = ColumnPath::from(vec![String::from("b_struct"), String::from("b_c_int")]);
        test_column_in_file("nulls.snappy.parquet", 0, &column_path, &[], &[], &[]);
    }

    /// Column where every entry is null: no values, only levels.
    #[test]
    fn test_triplet_null_column() {
        test_triplet_iter(
            "nulls.snappy.parquet",
            vec!["b_struct", "b_c_int"],
            &[],
            &[1; 8],
            &[0; 8],
        );
    }

    /// Required column: levels are reported as the column maxima.
    #[test]
    fn test_triplet_required_column() {
        test_triplet_iter(
            "nonnullable.impala.parquet",
            vec!["ID"],
            &[Field::Long(8)],
            &[0],
            &[0],
        );
    }

    /// Optional column mixing present and null entries.
    #[test]
    fn test_triplet_optional_column() {
        test_triplet_iter(
            "nullable.impala.parquet",
            vec!["nested_struct", "A"],
            &[Field::Int(1), Field::Int(7)],
            &[2, 1, 1, 1, 1, 0, 2],
            &[0; 7],
        );
    }

    /// Leaf column nested inside three levels of optional lists.
    #[test]
    fn test_triplet_optional_list_column() {
        let expected_values: Vec<Field> = [
            "a", "b", "c", "d", "a", "b", "c", "d", "e", "a", "b", "c", "d", "e", "f",
        ]
        .iter()
        .map(|text| Field::Str(text.to_string()))
        .collect();

        test_triplet_iter(
            "nested_lists.snappy.parquet",
            vec!["a", "list", "element", "list", "element", "list", "element"],
            &expected_values,
            &[7, 7, 7, 4, 7, 7, 7, 7, 7, 4, 7, 7, 7, 7, 7, 7, 4, 7],
            &[0, 3, 2, 1, 2, 0, 3, 2, 3, 1, 2, 0, 3, 2, 3, 2, 1, 2],
        );
    }

    /// Key column of a map nested inside another map's value.
    #[test]
    fn test_triplet_optional_map_column() {
        let expected_values: Vec<Field> = [1, 2, 1, 1, 3, 4, 5]
            .iter()
            .map(|&key| Field::Int(key))
            .collect();

        test_triplet_iter(
            "nested_maps.snappy.parquet",
            vec!["a", "key_value", "value", "key_value", "key"],
            &expected_values,
            &[4, 4, 4, 2, 3, 4, 4, 4, 4],
            &[0, 2, 0, 0, 0, 0, 0, 2, 2],
        );
    }

    /// Runs the column check for a range of batch sizes, from single-element
    /// batches up to batches larger than the test data.
    fn test_triplet_iter(
        file_name: &str,
        column_path: Vec<&str>,
        expected_values: &[Field],
        expected_def_levels: &[i16],
        expected_rep_levels: &[i16],
    ) {
        // Build an owned column path out of the borrowed segments.
        let segments = column_path
            .iter()
            .map(|segment| segment.to_string())
            .collect::<Vec<String>>();
        let column_path = ColumnPath::from(segments);

        for batch_size in [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 128, 256] {
            test_column_in_file(
                file_name,
                batch_size,
                &column_path,
                expected_values,
                expected_def_levels,
                expected_rep_levels,
            );
        }
    }

    /// Checks the triplets of every column in the file's first row group whose
    /// path equals `column_path`.
    fn test_column_in_file(
        file_name: &str,
        batch_size: usize,
        column_path: &ColumnPath,
        expected_values: &[Field],
        expected_def_levels: &[i16],
        expected_rep_levels: &[i16],
    ) {
        let file_reader = SerializedFileReader::new(get_test_file(file_name)).unwrap();
        let metadata = file_reader.metadata();
        // Schema descriptor lists the leaf columns of the file.
        let schema = metadata.file_metadata().schema_descr();
        // Only the first row group is inspected.
        let row_group_reader = file_reader.get_row_group(0).unwrap();

        for column_index in 0..schema.num_columns() {
            let descr = schema.column(column_index);
            if descr.path() != column_path {
                continue;
            }

            let reader = row_group_reader.get_column_reader(column_index).unwrap();
            test_triplet_column(
                descr,
                reader,
                batch_size,
                expected_values,
                expected_def_levels,
                expected_rep_levels,
            );
        }
    }

    /// Drains a single triplet iterator and compares everything it produced
    /// against the expected values and levels.
    fn test_triplet_column(
        descr: ColumnDescPtr,
        reader: ColumnReader,
        batch_size: usize,
        expected_values: &[Field],
        expected_def_levels: &[i16],
        expected_rep_levels: &[i16],
    ) {
        let mut iter = TripletIter::new(descr.clone(), reader, batch_size);

        assert_eq!(iter.max_def_level(), descr.max_def_level());
        assert_eq!(iter.max_rep_level(), descr.max_rep_level());

        let mut values: Vec<Field> = Vec::new();
        let mut def_levels: Vec<i16> = Vec::new();
        let mut rep_levels: Vec<i16> = Vec::new();

        // Stop on the first `Ok(false)` (or error), exactly like a consumer
        // polling the iterator would.
        while matches!(iter.read_next(), Ok(true)) {
            assert!(iter.has_next());
            if !iter.is_null() {
                values.push(iter.current_value().unwrap());
            }
            def_levels.push(iter.current_def_level());
            rep_levels.push(iter.current_rep_level());
        }

        assert_eq!(values, expected_values);
        assert_eq!(def_levels, expected_def_levels);
        assert_eq!(rep_levels, expected_rep_levels);
    }
}