parquet/column/reader/
decoder.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
17
18use bytes::Bytes;
19
20use crate::basic::{Encoding, EncodingMask};
21use crate::data_type::DataType;
22use crate::encodings::{
23    decoding::{Decoder, DictDecoder, PlainDecoder, get_decoder},
24    rle::RleDecoder,
25};
26use crate::errors::{ParquetError, Result};
27use crate::schema::types::ColumnDescPtr;
28use crate::util::bit_util::{BitReader, num_required_bits};
29
30/// Decodes level data
31pub trait ColumnLevelDecoder {
32    type Buffer;
33
34    /// Set data for this [`ColumnLevelDecoder`]
35    fn set_data(&mut self, encoding: Encoding, data: Bytes);
36}
37
38pub trait RepetitionLevelDecoder: ColumnLevelDecoder {
39    /// Read up to `max_records` of repetition level data into `out` returning the number
40    /// of complete records and levels read
41    ///
42    /// A record only ends when the data contains a subsequent repetition level of 0,
43    /// it is therefore left to the caller to delimit the final record in a column
44    ///
45    /// # Panics
46    ///
47    /// Implementations may panic if `range` overlaps with already written data
48    fn read_rep_levels(
49        &mut self,
50        out: &mut Self::Buffer,
51        num_records: usize,
52        num_levels: usize,
53    ) -> Result<(usize, usize)>;
54
55    /// Skips over up to `num_levels` repetition levels corresponding to `num_records` records,
56    /// where a record is delimited by a repetition level of 0
57    ///
58    /// Returns the number of records skipped, and the number of levels skipped
59    ///
60    /// A record only ends when the data contains a subsequent repetition level of 0,
61    /// it is therefore left to the caller to delimit the final record in a column
62    fn skip_rep_levels(&mut self, num_records: usize, num_levels: usize) -> Result<(usize, usize)>;
63
64    /// Flush any partially read or skipped record
65    fn flush_partial(&mut self) -> bool;
66}
67
68pub trait DefinitionLevelDecoder: ColumnLevelDecoder {
69    /// Read up to `num_levels` definition levels into `out`.
70    ///
71    /// Returns the number of values read, and the number of levels read.
72    ///
73    /// # Panics
74    ///
75    /// Implementations may panic if `range` overlaps with already written data
76    fn read_def_levels(
77        &mut self,
78        out: &mut Self::Buffer,
79        num_levels: usize,
80    ) -> Result<(usize, usize)>;
81
82    /// Skips over `num_levels` definition levels.
83    ///
84    /// Returns the number of values skipped, and the number of levels skipped.
85    fn skip_def_levels(&mut self, num_levels: usize) -> Result<(usize, usize)>;
86}
87
88/// Decodes value data
89pub trait ColumnValueDecoder {
90    type Buffer;
91
92    /// Create a new [`ColumnValueDecoder`]
93    fn new(col: &ColumnDescPtr) -> Self;
94
95    /// Set the current dictionary page
96    fn set_dict(
97        &mut self,
98        buf: Bytes,
99        num_values: u32,
100        encoding: Encoding,
101        is_sorted: bool,
102    ) -> Result<()>;
103
104    /// Set the current data page
105    ///
106    /// - `encoding` - the encoding of the page
107    /// - `data` - a point to the page's uncompressed value data
108    /// - `num_levels` - the number of levels contained within the page, i.e. values including nulls
109    /// - `num_values` - the number of non-null values contained within the page (V2 page only)
110    ///
111    /// Note: data encoded with [`Encoding::RLE`] may not know its exact length, as the final
112    /// run may be zero-padded. As such if `num_values` is not provided (i.e. `None`),
113    /// subsequent calls to `ColumnValueDecoder::read` may yield more values than
114    /// non-null definition levels within the page
115    fn set_data(
116        &mut self,
117        encoding: Encoding,
118        data: Bytes,
119        num_levels: usize,
120        num_values: Option<usize>,
121    ) -> Result<()>;
122
123    /// Read up to `num_values` values into `out`
124    ///
125    /// # Panics
126    ///
127    /// Implementations may panic if `range` overlaps with already written data
128    ///
129    fn read(&mut self, out: &mut Self::Buffer, num_values: usize) -> Result<usize>;
130
131    /// Skips over `num_values` values
132    ///
133    /// Returns the number of values skipped
134    fn skip_values(&mut self, num_values: usize) -> Result<usize>;
135}
136
137/// Bucket-based storage for decoder instances keyed by `Encoding`.
138///
139/// This replaces `HashMap` lookups with direct indexing to avoid hashing overhead in the
140/// hot decoding paths.
141const ENCODING_SLOTS: usize = Encoding::MAX_DISCRIMINANT as usize + 1;
142
143/// An implementation of [`ColumnValueDecoder`] for `[T::T]`
144pub struct ColumnValueDecoderImpl<T: DataType> {
145    descr: ColumnDescPtr,
146
147    current_encoding: Option<Encoding>,
148
149    /// Cache of decoders for existing encodings.
150    /// Uses `EncodingMask` and dense storage keyed by encoding discriminant.
151    decoder_mask: EncodingMask,
152    decoders: [Option<Box<dyn Decoder<T>>>; ENCODING_SLOTS],
153}
154
155impl<T: DataType> ColumnValueDecoder for ColumnValueDecoderImpl<T> {
156    type Buffer = Vec<T::T>;
157
158    fn new(descr: &ColumnDescPtr) -> Self {
159        Self {
160            descr: descr.clone(),
161            current_encoding: None,
162            decoder_mask: EncodingMask::default(),
163            decoders: std::array::from_fn(|_| None),
164        }
165    }
166
167    fn set_dict(
168        &mut self,
169        buf: Bytes,
170        num_values: u32,
171        mut encoding: Encoding,
172        _is_sorted: bool,
173    ) -> Result<()> {
174        if encoding == Encoding::PLAIN || encoding == Encoding::PLAIN_DICTIONARY {
175            encoding = Encoding::RLE_DICTIONARY
176        }
177
178        if self.decoder_mask.is_set(encoding) {
179            return Err(general_err!("Column cannot have more than one dictionary"));
180        }
181
182        if encoding == Encoding::RLE_DICTIONARY {
183            let mut dictionary = PlainDecoder::<T>::new(self.descr.type_length());
184            dictionary.set_data(buf, num_values as usize)?;
185
186            let mut decoder = DictDecoder::new();
187            decoder.set_dict(Box::new(dictionary))?;
188            self.decoders[encoding as usize] = Some(Box::new(decoder));
189            self.decoder_mask.insert(encoding);
190            Ok(())
191        } else {
192            Err(nyi_err!(
193                "Invalid/Unsupported encoding type for dictionary: {}",
194                encoding
195            ))
196        }
197    }
198
199    fn set_data(
200        &mut self,
201        mut encoding: Encoding,
202        data: Bytes,
203        num_levels: usize,
204        num_values: Option<usize>,
205    ) -> Result<()> {
206        if encoding == Encoding::PLAIN_DICTIONARY {
207            encoding = Encoding::RLE_DICTIONARY;
208        }
209
210        let decoder = if encoding == Encoding::RLE_DICTIONARY {
211            self.decoders[encoding as usize]
212                .as_mut()
213                .expect("Decoder for dict should have been set")
214        } else {
215            let slot = encoding as usize;
216            if self.decoders[slot].is_none() {
217                let data_decoder = get_decoder::<T>(self.descr.clone(), encoding)?;
218                self.decoders[slot] = Some(data_decoder);
219                self.decoder_mask.insert(encoding);
220            }
221            self.decoders[slot]
222                .as_mut()
223                .expect("decoder should have been inserted")
224        };
225
226        decoder.set_data(data, num_values.unwrap_or(num_levels))?;
227        self.current_encoding = Some(encoding);
228        Ok(())
229    }
230
231    fn read(&mut self, out: &mut Self::Buffer, num_values: usize) -> Result<usize> {
232        let encoding = self
233            .current_encoding
234            .expect("current_encoding should be set");
235
236        let current_decoder = self.decoders[encoding as usize]
237            .as_mut()
238            .unwrap_or_else(|| panic!("decoder for encoding {encoding} should be set"));
239
240        // TODO: Push vec into decoder (#5177)
241        let start = out.len();
242        out.resize(start + num_values, T::T::default());
243        let read = current_decoder.get(&mut out[start..])?;
244        out.truncate(start + read);
245        Ok(read)
246    }
247
248    fn skip_values(&mut self, num_values: usize) -> Result<usize> {
249        let encoding = self
250            .current_encoding
251            .expect("current_encoding should be set");
252
253        let current_decoder = self.decoders[encoding as usize]
254            .as_mut()
255            .unwrap_or_else(|| panic!("decoder for encoding {encoding} should be set"));
256
257        current_decoder.skip(num_values)
258    }
259}
260
/// Maximum number of definition levels decoded per batch while skipping levels
const SKIP_BUFFER_SIZE: usize = 1 << 10;
262
263enum LevelDecoder {
264    Packed(BitReader, u8),
265    Rle(RleDecoder),
266}
267
268impl LevelDecoder {
269    fn new(encoding: Encoding, data: Bytes, bit_width: u8) -> Self {
270        match encoding {
271            Encoding::RLE => {
272                let mut decoder = RleDecoder::new(bit_width);
273                decoder.set_data(data);
274                Self::Rle(decoder)
275            }
276            #[allow(deprecated)]
277            Encoding::BIT_PACKED => Self::Packed(BitReader::new(data), bit_width),
278            _ => unreachable!("invalid level encoding: {}", encoding),
279        }
280    }
281
282    fn read(&mut self, out: &mut [i16]) -> Result<usize> {
283        match self {
284            Self::Packed(reader, bit_width) => {
285                Ok(reader.get_batch::<i16>(out, *bit_width as usize))
286            }
287            Self::Rle(reader) => Ok(reader.get_batch(out)?),
288        }
289    }
290}
291
292/// An implementation of [`DefinitionLevelDecoder`] for `[i16]`
293pub struct DefinitionLevelDecoderImpl {
294    decoder: Option<LevelDecoder>,
295    bit_width: u8,
296    max_level: i16,
297}
298
299impl DefinitionLevelDecoderImpl {
300    pub fn new(max_level: i16) -> Self {
301        let bit_width = num_required_bits(max_level as u64);
302        Self {
303            decoder: None,
304            bit_width,
305            max_level,
306        }
307    }
308}
309
310impl ColumnLevelDecoder for DefinitionLevelDecoderImpl {
311    type Buffer = Vec<i16>;
312
313    fn set_data(&mut self, encoding: Encoding, data: Bytes) {
314        self.decoder = Some(LevelDecoder::new(encoding, data, self.bit_width))
315    }
316}
317
318impl DefinitionLevelDecoder for DefinitionLevelDecoderImpl {
319    fn read_def_levels(
320        &mut self,
321        out: &mut Self::Buffer,
322        num_levels: usize,
323    ) -> Result<(usize, usize)> {
324        // TODO: Push vec into decoder (#5177)
325        let start = out.len();
326        out.resize(start + num_levels, 0);
327        let levels_read = self.decoder.as_mut().unwrap().read(&mut out[start..])?;
328        out.truncate(start + levels_read);
329
330        let iter = out.iter().skip(start);
331        let values_read = iter.filter(|x| **x == self.max_level).count();
332        Ok((values_read, levels_read))
333    }
334
335    fn skip_def_levels(&mut self, num_levels: usize) -> Result<(usize, usize)> {
336        let mut level_skip = 0;
337        let mut value_skip = 0;
338        let mut buf: Vec<i16> = vec![];
339        while level_skip < num_levels {
340            let remaining_levels = num_levels - level_skip;
341
342            let to_read = remaining_levels.min(SKIP_BUFFER_SIZE);
343            buf.resize(to_read, 0);
344            let (values_read, levels_read) = self.read_def_levels(&mut buf, to_read)?;
345            if levels_read == 0 {
346                // Reached end of page
347                break;
348            }
349
350            level_skip += levels_read;
351            value_skip += values_read;
352        }
353
354        Ok((value_skip, level_skip))
355    }
356}
357
/// Number of repetition levels decoded into the internal buffer of
/// [`RepetitionLevelDecoderImpl`] at a time
pub(crate) const REPETITION_LEVELS_BATCH_SIZE: usize = 1 << 10;
359
360/// An implementation of [`RepetitionLevelDecoder`] for `[i16]`
361pub struct RepetitionLevelDecoderImpl {
362    decoder: Option<LevelDecoder>,
363    bit_width: u8,
364    buffer: Box<[i16; REPETITION_LEVELS_BATCH_SIZE]>,
365    buffer_len: usize,
366    buffer_offset: usize,
367    has_partial: bool,
368}
369
370impl RepetitionLevelDecoderImpl {
371    pub fn new(max_level: i16) -> Self {
372        let bit_width = num_required_bits(max_level as u64);
373        Self {
374            decoder: None,
375            bit_width,
376            buffer: Box::new([0; REPETITION_LEVELS_BATCH_SIZE]),
377            buffer_offset: 0,
378            buffer_len: 0,
379            has_partial: false,
380        }
381    }
382
383    fn fill_buf(&mut self) -> Result<()> {
384        let read = self.decoder.as_mut().unwrap().read(self.buffer.as_mut())?;
385        self.buffer_offset = 0;
386        self.buffer_len = read;
387        Ok(())
388    }
389
390    /// Inspects the buffered repetition levels in the range `self.buffer_offset..self.buffer_len`
391    /// and returns the number of "complete" records along with the corresponding number of values
392    ///
393    /// A "complete" record is one where the buffer contains a subsequent repetition level of 0
394    fn count_records(&mut self, records_to_read: usize, num_levels: usize) -> (bool, usize, usize) {
395        let mut records_read = 0;
396
397        let levels = num_levels.min(self.buffer_len - self.buffer_offset);
398        let buf = self.buffer.iter().skip(self.buffer_offset);
399        for (idx, item) in buf.take(levels).enumerate() {
400            if *item == 0 && (idx != 0 || self.has_partial) {
401                records_read += 1;
402
403                if records_read == records_to_read {
404                    return (false, records_read, idx);
405                }
406            }
407        }
408        // Either ran out of space in `num_levels` or data in `self.buffer`
409        (true, records_read, levels)
410    }
411}
412
413impl ColumnLevelDecoder for RepetitionLevelDecoderImpl {
414    type Buffer = Vec<i16>;
415
416    fn set_data(&mut self, encoding: Encoding, data: Bytes) {
417        self.decoder = Some(LevelDecoder::new(encoding, data, self.bit_width));
418        self.buffer_len = 0;
419        self.buffer_offset = 0;
420    }
421}
422
423impl RepetitionLevelDecoder for RepetitionLevelDecoderImpl {
424    fn read_rep_levels(
425        &mut self,
426        out: &mut Self::Buffer,
427        num_records: usize,
428        num_levels: usize,
429    ) -> Result<(usize, usize)> {
430        let mut total_records_read = 0;
431        let mut total_levels_read = 0;
432
433        while total_records_read < num_records && total_levels_read < num_levels {
434            if self.buffer_len == self.buffer_offset {
435                self.fill_buf()?;
436                if self.buffer_len == 0 {
437                    break;
438                }
439            }
440
441            let (partial, records_read, levels_read) = self.count_records(
442                num_records - total_records_read,
443                num_levels - total_levels_read,
444            );
445
446            out.extend_from_slice(
447                &self.buffer[self.buffer_offset..self.buffer_offset + levels_read],
448            );
449
450            total_levels_read += levels_read;
451            total_records_read += records_read;
452            self.buffer_offset += levels_read;
453            self.has_partial = partial;
454        }
455        Ok((total_records_read, total_levels_read))
456    }
457
458    fn skip_rep_levels(&mut self, num_records: usize, num_levels: usize) -> Result<(usize, usize)> {
459        let mut total_records_read = 0;
460        let mut total_levels_read = 0;
461
462        while total_records_read < num_records && total_levels_read < num_levels {
463            if self.buffer_len == self.buffer_offset {
464                self.fill_buf()?;
465                if self.buffer_len == 0 {
466                    break;
467                }
468            }
469
470            let (partial, records_read, levels_read) = self.count_records(
471                num_records - total_records_read,
472                num_levels - total_levels_read,
473            );
474
475            total_levels_read += levels_read;
476            total_records_read += records_read;
477            self.buffer_offset += levels_read;
478            self.has_partial = partial;
479        }
480        Ok((total_records_read, total_levels_read))
481    }
482
483    fn flush_partial(&mut self) -> bool {
484        std::mem::take(&mut self.has_partial)
485    }
486}
487
#[cfg(test)]
mod tests {
    use super::*;
    use crate::encodings::rle::RleEncoder;
    use rand::{prelude::*, rng};

    /// Skipping must honour the requested level limit, even though the final
    /// bit-packed run of the RLE data may be zero-padded past the real level count
    #[test]
    fn test_skip_padding() {
        let mut encoder = RleEncoder::new(1, 1024);
        encoder.put(0);
        for _ in 0..3 {
            encoder.put(1);
        }
        let data = Bytes::from(encoder.consume());

        // With the exact level count, exactly the 4 encoded levels are skipped
        let mut exact = RepetitionLevelDecoderImpl::new(1);
        exact.set_data(Encoding::RLE, data.clone());
        let (_, levels) = exact.skip_rep_levels(100, 4).unwrap();
        assert_eq!(levels, 4);

        // The length of the final bit packed run is ambiguous, so without the correct
        // levels limit, it will decode zero padding
        let mut padded = RepetitionLevelDecoderImpl::new(1);
        padded.set_data(Encoding::RLE, data);
        let (_, levels) = padded.skip_rep_levels(100, 6).unwrap();
        assert_eq!(levels, 6);
    }

    /// Randomly interleaves reads and skips over RLE-encoded repetition levels, checking
    /// record/level counts and decoded values against the source levels
    #[test]
    fn test_skip_rep_levels() {
        for _ in 0..10 {
            let mut rng = rng();
            let total_len = 10000_usize;
            let mut encoded: Vec<i16> = (0..total_len).map(|_| rng.random_range(0..5)).collect();
            // The first level must begin a record
            encoded[0] = 0;

            let mut encoder = RleEncoder::new(3, 1024);
            encoded.iter().for_each(|v| encoder.put(*v as _));
            let data = Bytes::from(encoder.consume());

            let mut decoder = RepetitionLevelDecoderImpl::new(5);
            decoder.set_data(Encoding::RLE, data);

            let mut remaining_records = encoded.iter().filter(|&&x| x == 0).count();
            let mut remaining_levels = encoded.len();
            loop {
                let skip = rng.random_bool(0.5);
                let records = rng.random_range(1..=remaining_records.min(5));
                let (records_read, levels_read) = if skip {
                    decoder.skip_rep_levels(records, remaining_levels).unwrap()
                } else {
                    let mut decoded = Vec::new();
                    let counts = decoder
                        .read_rep_levels(&mut decoded, records, remaining_levels)
                        .unwrap();

                    let offset = encoded.len() - remaining_levels;
                    assert_eq!(decoded, encoded[offset..][..counts.1]);
                    counts
                };

                remaining_levels = remaining_levels.checked_sub(levels_read).unwrap();
                if remaining_levels == 0 {
                    // The final record is never followed by a 0, so it is not counted
                    assert_eq!(records_read + 1, records);
                    assert_eq!(records, remaining_records);
                    break;
                }
                assert_eq!(records_read, records);
                remaining_records -= records;
                assert_ne!(remaining_records, 0);
            }
        }
    }
}