parquet/column/reader/
decoder.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::HashMap;
19
20use bytes::Bytes;
21
22use crate::basic::Encoding;
23use crate::data_type::DataType;
24use crate::encodings::{
25    decoding::{get_decoder, Decoder, DictDecoder, PlainDecoder},
26    rle::RleDecoder,
27};
28use crate::errors::{ParquetError, Result};
29use crate::schema::types::ColumnDescPtr;
30use crate::util::bit_util::{num_required_bits, BitReader};
31
/// Decodes level data
///
/// This is the common base of [`RepetitionLevelDecoder`] and [`DefinitionLevelDecoder`],
/// which add the methods that actually read or skip levels.
pub trait ColumnLevelDecoder {
    /// The buffer type that decoded levels are written into
    type Buffer;

    /// Set data for this [`ColumnLevelDecoder`]
    ///
    /// - `encoding` - the encoding of the level data
    /// - `data` - the encoded level data
    fn set_data(&mut self, encoding: Encoding, data: Bytes);
}
39
/// Decodes repetition level data, counting the records it delimits
pub trait RepetitionLevelDecoder: ColumnLevelDecoder {
    /// Read up to `num_records` records, and at most `num_levels` levels, of repetition
    /// level data into `out`
    ///
    /// Returns the number of complete records read, and the number of levels read
    ///
    /// A record only ends when the data contains a subsequent repetition level of 0,
    /// it is therefore left to the caller to delimit the final record in a column
    fn read_rep_levels(
        &mut self,
        out: &mut Self::Buffer,
        num_records: usize,
        num_levels: usize,
    ) -> Result<(usize, usize)>;

    /// Skips over up to `num_levels` repetition levels corresponding to `num_records` records,
    /// where a record is delimited by a repetition level of 0
    ///
    /// Returns the number of records skipped, and the number of levels skipped
    ///
    /// A record only ends when the data contains a subsequent repetition level of 0,
    /// it is therefore left to the caller to delimit the final record in a column
    fn skip_rep_levels(&mut self, num_records: usize, num_levels: usize) -> Result<(usize, usize)>;

    /// Flush any partially read or skipped record
    ///
    /// Returns `true` if there was a partial record to flush
    fn flush_partial(&mut self) -> bool;
}
69
/// Decodes definition level data, counting the values it describes
pub trait DefinitionLevelDecoder: ColumnLevelDecoder {
    /// Read up to `num_levels` definition levels into `out`
    ///
    /// Returns the number of values read, and the number of levels read
    ///
    /// NOTE(review): in the implementation in this file a "value" is a level equal to
    /// the maximum definition level; confirm other implementations agree
    fn read_def_levels(
        &mut self,
        out: &mut Self::Buffer,
        num_levels: usize,
    ) -> Result<(usize, usize)>;

    /// Skips over up to `num_levels` definition levels
    ///
    /// Returns the number of values skipped, and the number of levels skipped
    fn skip_def_levels(&mut self, num_levels: usize) -> Result<(usize, usize)>;
}
89
/// Decodes value data
pub trait ColumnValueDecoder {
    /// The buffer type that decoded values are written into
    type Buffer;

    /// Create a new [`ColumnValueDecoder`]
    fn new(col: &ColumnDescPtr) -> Self;

    /// Set the current dictionary page
    ///
    /// - `buf` - the dictionary page's value data
    /// - `num_values` - the number of values contained within the dictionary
    /// - `encoding` - the encoding of the dictionary page
    /// - `is_sorted` - whether the dictionary page is flagged as sorted
    fn set_dict(
        &mut self,
        buf: Bytes,
        num_values: u32,
        encoding: Encoding,
        is_sorted: bool,
    ) -> Result<()>;

    /// Set the current data page
    ///
    /// - `encoding` - the encoding of the page
    /// - `data` - a pointer to the page's uncompressed value data
    /// - `num_levels` - the number of levels contained within the page, i.e. values including nulls
    /// - `num_values` - the number of non-null values contained within the page (V2 page only)
    ///
    /// Note: data encoded with [`Encoding::RLE`] may not know its exact length, as the final
    /// run may be zero-padded. As such if `num_values` is not provided (i.e. `None`),
    /// subsequent calls to `ColumnValueDecoder::read` may yield more values than
    /// non-null definition levels within the page
    fn set_data(
        &mut self,
        encoding: Encoding,
        data: Bytes,
        num_levels: usize,
        num_values: Option<usize>,
    ) -> Result<()>;

    /// Read up to `num_values` values into `out`
    ///
    /// Returns the number of values read
    fn read(&mut self, out: &mut Self::Buffer, num_values: usize) -> Result<usize>;

    /// Skips over `num_values` values
    ///
    /// Returns the number of values skipped
    fn skip_values(&mut self, num_values: usize) -> Result<usize>;
}
138
139/// An implementation of [`ColumnValueDecoder`] for `[T::T]`
140pub struct ColumnValueDecoderImpl<T: DataType> {
141    descr: ColumnDescPtr,
142
143    current_encoding: Option<Encoding>,
144
145    // Cache of decoders for existing encodings
146    decoders: HashMap<Encoding, Box<dyn Decoder<T>>>,
147}
148
149impl<T: DataType> ColumnValueDecoder for ColumnValueDecoderImpl<T> {
150    type Buffer = Vec<T::T>;
151
152    fn new(descr: &ColumnDescPtr) -> Self {
153        Self {
154            descr: descr.clone(),
155            current_encoding: None,
156            decoders: Default::default(),
157        }
158    }
159
160    fn set_dict(
161        &mut self,
162        buf: Bytes,
163        num_values: u32,
164        mut encoding: Encoding,
165        _is_sorted: bool,
166    ) -> Result<()> {
167        if encoding == Encoding::PLAIN || encoding == Encoding::PLAIN_DICTIONARY {
168            encoding = Encoding::RLE_DICTIONARY
169        }
170
171        if self.decoders.contains_key(&encoding) {
172            return Err(general_err!("Column cannot have more than one dictionary"));
173        }
174
175        if encoding == Encoding::RLE_DICTIONARY {
176            let mut dictionary = PlainDecoder::<T>::new(self.descr.type_length());
177            dictionary.set_data(buf, num_values as usize)?;
178
179            let mut decoder = DictDecoder::new();
180            decoder.set_dict(Box::new(dictionary))?;
181            self.decoders.insert(encoding, Box::new(decoder));
182            Ok(())
183        } else {
184            Err(nyi_err!(
185                "Invalid/Unsupported encoding type for dictionary: {}",
186                encoding
187            ))
188        }
189    }
190
191    fn set_data(
192        &mut self,
193        mut encoding: Encoding,
194        data: Bytes,
195        num_levels: usize,
196        num_values: Option<usize>,
197    ) -> Result<()> {
198        use std::collections::hash_map::Entry;
199
200        if encoding == Encoding::PLAIN_DICTIONARY {
201            encoding = Encoding::RLE_DICTIONARY;
202        }
203
204        let decoder = if encoding == Encoding::RLE_DICTIONARY {
205            self.decoders
206                .get_mut(&encoding)
207                .expect("Decoder for dict should have been set")
208        } else {
209            // Search cache for data page decoder
210            match self.decoders.entry(encoding) {
211                Entry::Occupied(e) => e.into_mut(),
212                Entry::Vacant(v) => {
213                    let data_decoder = get_decoder::<T>(self.descr.clone(), encoding)?;
214                    v.insert(data_decoder)
215                }
216            }
217        };
218
219        decoder.set_data(data, num_values.unwrap_or(num_levels))?;
220        self.current_encoding = Some(encoding);
221        Ok(())
222    }
223
224    fn read(&mut self, out: &mut Self::Buffer, num_values: usize) -> Result<usize> {
225        let encoding = self
226            .current_encoding
227            .expect("current_encoding should be set");
228
229        let current_decoder = self
230            .decoders
231            .get_mut(&encoding)
232            .unwrap_or_else(|| panic!("decoder for encoding {encoding} should be set"));
233
234        // TODO: Push vec into decoder (#5177)
235        let start = out.len();
236        out.resize(start + num_values, T::T::default());
237        let read = current_decoder.get(&mut out[start..])?;
238        out.truncate(start + read);
239        Ok(read)
240    }
241
242    fn skip_values(&mut self, num_values: usize) -> Result<usize> {
243        let encoding = self
244            .current_encoding
245            .expect("current_encoding should be set");
246
247        let current_decoder = self
248            .decoders
249            .get_mut(&encoding)
250            .unwrap_or_else(|| panic!("decoder for encoding {encoding} should be set"));
251
252        current_decoder.skip(num_values)
253    }
254}
255
/// Maximum number of definition levels decoded per iteration when skipping levels
const SKIP_BUFFER_SIZE: usize = 1024;
257
258enum LevelDecoder {
259    Packed(BitReader, u8),
260    Rle(RleDecoder),
261}
262
263impl LevelDecoder {
264    fn new(encoding: Encoding, data: Bytes, bit_width: u8) -> Self {
265        match encoding {
266            Encoding::RLE => {
267                let mut decoder = RleDecoder::new(bit_width);
268                decoder.set_data(data);
269                Self::Rle(decoder)
270            }
271            #[allow(deprecated)]
272            Encoding::BIT_PACKED => Self::Packed(BitReader::new(data), bit_width),
273            _ => unreachable!("invalid level encoding: {}", encoding),
274        }
275    }
276
277    fn read(&mut self, out: &mut [i16]) -> Result<usize> {
278        match self {
279            Self::Packed(reader, bit_width) => {
280                Ok(reader.get_batch::<i16>(out, *bit_width as usize))
281            }
282            Self::Rle(reader) => Ok(reader.get_batch(out)?),
283        }
284    }
285}
286
287/// An implementation of [`DefinitionLevelDecoder`] for `[i16]`
288pub struct DefinitionLevelDecoderImpl {
289    decoder: Option<LevelDecoder>,
290    bit_width: u8,
291    max_level: i16,
292}
293
294impl DefinitionLevelDecoderImpl {
295    pub fn new(max_level: i16) -> Self {
296        let bit_width = num_required_bits(max_level as u64);
297        Self {
298            decoder: None,
299            bit_width,
300            max_level,
301        }
302    }
303}
304
305impl ColumnLevelDecoder for DefinitionLevelDecoderImpl {
306    type Buffer = Vec<i16>;
307
308    fn set_data(&mut self, encoding: Encoding, data: Bytes) {
309        self.decoder = Some(LevelDecoder::new(encoding, data, self.bit_width))
310    }
311}
312
313impl DefinitionLevelDecoder for DefinitionLevelDecoderImpl {
314    fn read_def_levels(
315        &mut self,
316        out: &mut Self::Buffer,
317        num_levels: usize,
318    ) -> Result<(usize, usize)> {
319        // TODO: Push vec into decoder (#5177)
320        let start = out.len();
321        out.resize(start + num_levels, 0);
322        let levels_read = self.decoder.as_mut().unwrap().read(&mut out[start..])?;
323        out.truncate(start + levels_read);
324
325        let iter = out.iter().skip(start);
326        let values_read = iter.filter(|x| **x == self.max_level).count();
327        Ok((values_read, levels_read))
328    }
329
330    fn skip_def_levels(&mut self, num_levels: usize) -> Result<(usize, usize)> {
331        let mut level_skip = 0;
332        let mut value_skip = 0;
333        let mut buf: Vec<i16> = vec![];
334        while level_skip < num_levels {
335            let remaining_levels = num_levels - level_skip;
336
337            let to_read = remaining_levels.min(SKIP_BUFFER_SIZE);
338            buf.resize(to_read, 0);
339            let (values_read, levels_read) = self.read_def_levels(&mut buf, to_read)?;
340            if levels_read == 0 {
341                // Reached end of page
342                break;
343            }
344
345            level_skip += levels_read;
346            value_skip += values_read;
347        }
348
349        Ok((value_skip, level_skip))
350    }
351}
352
/// Number of repetition levels held by the internal buffer of [`RepetitionLevelDecoderImpl`]
pub(crate) const REPETITION_LEVELS_BATCH_SIZE: usize = 1024;
354
355/// An implementation of [`RepetitionLevelDecoder`] for `[i16]`
356pub struct RepetitionLevelDecoderImpl {
357    decoder: Option<LevelDecoder>,
358    bit_width: u8,
359    buffer: Box<[i16; REPETITION_LEVELS_BATCH_SIZE]>,
360    buffer_len: usize,
361    buffer_offset: usize,
362    has_partial: bool,
363}
364
365impl RepetitionLevelDecoderImpl {
366    pub fn new(max_level: i16) -> Self {
367        let bit_width = num_required_bits(max_level as u64);
368        Self {
369            decoder: None,
370            bit_width,
371            buffer: Box::new([0; REPETITION_LEVELS_BATCH_SIZE]),
372            buffer_offset: 0,
373            buffer_len: 0,
374            has_partial: false,
375        }
376    }
377
378    fn fill_buf(&mut self) -> Result<()> {
379        let read = self.decoder.as_mut().unwrap().read(self.buffer.as_mut())?;
380        self.buffer_offset = 0;
381        self.buffer_len = read;
382        Ok(())
383    }
384
385    /// Inspects the buffered repetition levels in the range `self.buffer_offset..self.buffer_len`
386    /// and returns the number of "complete" records along with the corresponding number of values
387    ///
388    /// A "complete" record is one where the buffer contains a subsequent repetition level of 0
389    fn count_records(&mut self, records_to_read: usize, num_levels: usize) -> (bool, usize, usize) {
390        let mut records_read = 0;
391
392        let levels = num_levels.min(self.buffer_len - self.buffer_offset);
393        let buf = self.buffer.iter().skip(self.buffer_offset);
394        for (idx, item) in buf.take(levels).enumerate() {
395            if *item == 0 && (idx != 0 || self.has_partial) {
396                records_read += 1;
397
398                if records_read == records_to_read {
399                    return (false, records_read, idx);
400                }
401            }
402        }
403        // Either ran out of space in `num_levels` or data in `self.buffer`
404        (true, records_read, levels)
405    }
406}
407
408impl ColumnLevelDecoder for RepetitionLevelDecoderImpl {
409    type Buffer = Vec<i16>;
410
411    fn set_data(&mut self, encoding: Encoding, data: Bytes) {
412        self.decoder = Some(LevelDecoder::new(encoding, data, self.bit_width));
413        self.buffer_len = 0;
414        self.buffer_offset = 0;
415    }
416}
417
418impl RepetitionLevelDecoder for RepetitionLevelDecoderImpl {
419    fn read_rep_levels(
420        &mut self,
421        out: &mut Self::Buffer,
422        num_records: usize,
423        num_levels: usize,
424    ) -> Result<(usize, usize)> {
425        let mut total_records_read = 0;
426        let mut total_levels_read = 0;
427
428        while total_records_read < num_records && total_levels_read < num_levels {
429            if self.buffer_len == self.buffer_offset {
430                self.fill_buf()?;
431                if self.buffer_len == 0 {
432                    break;
433                }
434            }
435
436            let (partial, records_read, levels_read) = self.count_records(
437                num_records - total_records_read,
438                num_levels - total_levels_read,
439            );
440
441            out.extend_from_slice(
442                &self.buffer[self.buffer_offset..self.buffer_offset + levels_read],
443            );
444
445            total_levels_read += levels_read;
446            total_records_read += records_read;
447            self.buffer_offset += levels_read;
448            self.has_partial = partial;
449        }
450        Ok((total_records_read, total_levels_read))
451    }
452
453    fn skip_rep_levels(&mut self, num_records: usize, num_levels: usize) -> Result<(usize, usize)> {
454        let mut total_records_read = 0;
455        let mut total_levels_read = 0;
456
457        while total_records_read < num_records && total_levels_read < num_levels {
458            if self.buffer_len == self.buffer_offset {
459                self.fill_buf()?;
460                if self.buffer_len == 0 {
461                    break;
462                }
463            }
464
465            let (partial, records_read, levels_read) = self.count_records(
466                num_records - total_records_read,
467                num_levels - total_levels_read,
468            );
469
470            total_levels_read += levels_read;
471            total_records_read += records_read;
472            self.buffer_offset += levels_read;
473            self.has_partial = partial;
474        }
475        Ok((total_records_read, total_levels_read))
476    }
477
478    fn flush_partial(&mut self) -> bool {
479        std::mem::take(&mut self.has_partial)
480    }
481}
482
#[cfg(test)]
mod tests {
    use super::*;
    use crate::encodings::rle::RleEncoder;
    use rand::prelude::*;

    /// Skipping is bounded by the requested number of levels, not by the encoded data
    #[test]
    fn test_skip_padding() {
        let mut encoder = RleEncoder::new(1, 1024);
        encoder.put(0);
        for _ in 0..3 {
            encoder.put(1);
        }
        let data = Bytes::from(encoder.consume());

        let mut exact = RepetitionLevelDecoderImpl::new(1);
        exact.set_data(Encoding::RLE, data.clone());
        let (_, levels) = exact.skip_rep_levels(100, 4).unwrap();
        assert_eq!(levels, 4);

        // The length of the final bit packed run is ambiguous, so without the correct
        // levels limit, it will decode zero padding
        let mut padded = RepetitionLevelDecoderImpl::new(1);
        padded.set_data(Encoding::RLE, data);
        let (_, levels) = padded.skip_rep_levels(100, 6).unwrap();
        assert_eq!(levels, 6);
    }

    /// Randomly interleaves reads and skips, checking the record and level counts
    /// and the decoded levels against the encoded input
    #[test]
    fn test_skip_rep_levels() {
        for _ in 0..10 {
            let mut rng = thread_rng();
            let total_len = 10000_usize;

            // Random levels; the first one is forced to 0
            let mut encoded: Vec<i16> = (0..total_len).map(|_| rng.gen_range(0..5)).collect();
            encoded[0] = 0;

            let mut encoder = RleEncoder::new(3, 1024);
            encoded.iter().for_each(|level| encoder.put(*level as _));
            let data = Bytes::from(encoder.consume());

            let mut decoder = RepetitionLevelDecoderImpl::new(5);
            decoder.set_data(Encoding::RLE, data);

            let mut remaining_records = encoded.iter().filter(|level| **level == 0).count();
            let mut remaining_levels = encoded.len();
            loop {
                let skip = rng.gen_bool(0.5);
                let records = rng.gen_range(1..=remaining_records.min(5));

                let (records_read, levels_read) = if skip {
                    decoder.skip_rep_levels(records, remaining_levels).unwrap()
                } else {
                    let mut decoded = Vec::new();
                    let counts = decoder
                        .read_rep_levels(&mut decoded, records, remaining_levels)
                        .unwrap();

                    let consumed = encoded.len() - remaining_levels;
                    assert_eq!(decoded, encoded[consumed..consumed + counts.1]);
                    counts
                };

                remaining_levels = remaining_levels.checked_sub(levels_read).unwrap();
                if remaining_levels == 0 {
                    // The final record is never reported as complete by the decoder
                    assert_eq!(records_read + 1, records);
                    assert_eq!(records, remaining_records);
                    break;
                }

                assert_eq!(records_read, records);
                remaining_records -= records;
                assert_ne!(remaining_records, 0);
            }
        }
    }
}