parquet/column/reader/
decoder.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use bytes::Bytes;
19
20use crate::basic::{Encoding, EncodingMask};
21use crate::data_type::DataType;
22use crate::encodings::{
23    decoding::{Decoder, DictDecoder, PlainDecoder, get_decoder},
24    rle::RleDecoder,
25};
26use crate::errors::{ParquetError, Result};
27use crate::schema::types::ColumnDescPtr;
28use crate::util::bit_util::{BitReader, num_required_bits};
29
30/// Decodes level data
31pub trait ColumnLevelDecoder {
32    type Buffer;
33
34    /// Set data for this [`ColumnLevelDecoder`]
35    fn set_data(&mut self, encoding: Encoding, data: Bytes) -> Result<()>;
36}
37
38pub trait RepetitionLevelDecoder: ColumnLevelDecoder {
39    /// Read up to `max_records` of repetition level data into `out` returning the number
40    /// of complete records and levels read
41    ///
42    /// A record only ends when the data contains a subsequent repetition level of 0,
43    /// it is therefore left to the caller to delimit the final record in a column
44    ///
45    /// # Panics
46    ///
47    /// Implementations may panic if `range` overlaps with already written data
48    fn read_rep_levels(
49        &mut self,
50        out: &mut Self::Buffer,
51        num_records: usize,
52        num_levels: usize,
53    ) -> Result<(usize, usize)>;
54
55    /// Skips over up to `num_levels` repetition levels corresponding to `num_records` records,
56    /// where a record is delimited by a repetition level of 0
57    ///
58    /// Returns the number of records skipped, and the number of levels skipped
59    ///
60    /// A record only ends when the data contains a subsequent repetition level of 0,
61    /// it is therefore left to the caller to delimit the final record in a column
62    fn skip_rep_levels(&mut self, num_records: usize, num_levels: usize) -> Result<(usize, usize)>;
63
64    /// Flush any partially read or skipped record
65    fn flush_partial(&mut self) -> bool;
66}
67
68pub trait DefinitionLevelDecoder: ColumnLevelDecoder {
69    /// Read up to `num_levels` definition levels into `out`.
70    ///
71    /// Returns the number of values read, and the number of levels read.
72    ///
73    /// # Panics
74    ///
75    /// Implementations may panic if `range` overlaps with already written data
76    fn read_def_levels(
77        &mut self,
78        out: &mut Self::Buffer,
79        num_levels: usize,
80    ) -> Result<(usize, usize)>;
81
82    /// Skips over `num_levels` definition levels.
83    ///
84    /// Returns the number of values skipped, and the number of levels skipped.
85    fn skip_def_levels(&mut self, num_levels: usize) -> Result<(usize, usize)>;
86}
87
88/// Decodes value data
89pub trait ColumnValueDecoder {
90    type Buffer;
91
92    /// Create a new [`ColumnValueDecoder`]
93    fn new(col: &ColumnDescPtr) -> Self;
94
95    /// Set the current dictionary page
96    fn set_dict(
97        &mut self,
98        buf: Bytes,
99        num_values: u32,
100        encoding: Encoding,
101        is_sorted: bool,
102    ) -> Result<()>;
103
104    /// Set the current data page
105    ///
106    /// - `encoding` - the encoding of the page
107    /// - `data` - a point to the page's uncompressed value data
108    /// - `num_levels` - the number of levels contained within the page, i.e. values including nulls
109    /// - `num_values` - the number of non-null values contained within the page (V2 page only)
110    ///
111    /// Note: data encoded with [`Encoding::RLE`] may not know its exact length, as the final
112    /// run may be zero-padded. As such if `num_values` is not provided (i.e. `None`),
113    /// subsequent calls to `ColumnValueDecoder::read` may yield more values than
114    /// non-null definition levels within the page
115    fn set_data(
116        &mut self,
117        encoding: Encoding,
118        data: Bytes,
119        num_levels: usize,
120        num_values: Option<usize>,
121    ) -> Result<()>;
122
123    /// Read up to `num_values` values into `out`
124    ///
125    /// # Panics
126    ///
127    /// Implementations may panic if `range` overlaps with already written data
128    ///
129    fn read(&mut self, out: &mut Self::Buffer, num_values: usize) -> Result<usize>;
130
131    /// Skips over `num_values` values
132    ///
133    /// Returns the number of values skipped
134    fn skip_values(&mut self, num_values: usize) -> Result<usize>;
135}
136
137/// Bucket-based storage for decoder instances keyed by `Encoding`.
138///
139/// This replaces `HashMap` lookups with direct indexing to avoid hashing overhead in the
140/// hot decoding paths.
141const ENCODING_SLOTS: usize = Encoding::MAX_DISCRIMINANT as usize + 1;
142
143/// An implementation of [`ColumnValueDecoder`] for `[T::T]`
144pub struct ColumnValueDecoderImpl<T: DataType> {
145    descr: ColumnDescPtr,
146
147    current_encoding: Option<Encoding>,
148
149    /// Cache of decoders for existing encodings.
150    /// Uses `EncodingMask` and dense storage keyed by encoding discriminant.
151    decoder_mask: EncodingMask,
152    decoders: [Option<Box<dyn Decoder<T>>>; ENCODING_SLOTS],
153}
154
155impl<T: DataType> ColumnValueDecoder for ColumnValueDecoderImpl<T> {
156    type Buffer = Vec<T::T>;
157
158    fn new(descr: &ColumnDescPtr) -> Self {
159        Self {
160            descr: descr.clone(),
161            current_encoding: None,
162            decoder_mask: EncodingMask::default(),
163            decoders: std::array::from_fn(|_| None),
164        }
165    }
166
167    fn set_dict(
168        &mut self,
169        buf: Bytes,
170        num_values: u32,
171        mut encoding: Encoding,
172        _is_sorted: bool,
173    ) -> Result<()> {
174        if encoding == Encoding::PLAIN || encoding == Encoding::PLAIN_DICTIONARY {
175            encoding = Encoding::RLE_DICTIONARY
176        }
177
178        if self.decoder_mask.is_set(encoding) {
179            return Err(general_err!("Column cannot have more than one dictionary"));
180        }
181
182        if encoding == Encoding::RLE_DICTIONARY {
183            let mut dictionary = PlainDecoder::<T>::new(self.descr.type_length());
184            dictionary.set_data(buf, num_values as usize)?;
185
186            let mut decoder = DictDecoder::new();
187            decoder.set_dict(Box::new(dictionary))?;
188            self.decoders[encoding as usize] = Some(Box::new(decoder));
189            self.decoder_mask.insert(encoding);
190            Ok(())
191        } else {
192            Err(nyi_err!(
193                "Invalid/Unsupported encoding type for dictionary: {}",
194                encoding
195            ))
196        }
197    }
198
199    fn set_data(
200        &mut self,
201        mut encoding: Encoding,
202        data: Bytes,
203        num_levels: usize,
204        num_values: Option<usize>,
205    ) -> Result<()> {
206        if encoding == Encoding::PLAIN_DICTIONARY {
207            encoding = Encoding::RLE_DICTIONARY;
208        }
209
210        let decoder = if encoding == Encoding::RLE_DICTIONARY {
211            self.decoders[encoding as usize]
212                .as_mut()
213                .expect("Decoder for dict should have been set")
214        } else {
215            let slot = encoding as usize;
216            if self.decoders[slot].is_none() {
217                let data_decoder = get_decoder::<T>(self.descr.clone(), encoding)?;
218                self.decoders[slot] = Some(data_decoder);
219                self.decoder_mask.insert(encoding);
220            }
221            self.decoders[slot]
222                .as_mut()
223                .expect("decoder should have been inserted")
224        };
225
226        decoder.set_data(data, num_values.unwrap_or(num_levels))?;
227        self.current_encoding = Some(encoding);
228        Ok(())
229    }
230
231    fn read(&mut self, out: &mut Self::Buffer, num_values: usize) -> Result<usize> {
232        let encoding = self
233            .current_encoding
234            .expect("current_encoding should be set");
235
236        let current_decoder = self.decoders[encoding as usize]
237            .as_mut()
238            .unwrap_or_else(|| panic!("decoder for encoding {encoding} should be set"));
239
240        // TODO: Push vec into decoder (#5177)
241        let start = out.len();
242        out.resize(start + num_values, T::T::default());
243        let read = current_decoder.get(&mut out[start..])?;
244        out.truncate(start + read);
245        Ok(read)
246    }
247
248    fn skip_values(&mut self, num_values: usize) -> Result<usize> {
249        let encoding = self
250            .current_encoding
251            .expect("current_encoding should be set");
252
253        let current_decoder = self.decoders[encoding as usize]
254            .as_mut()
255            .unwrap_or_else(|| panic!("decoder for encoding {encoding} should be set"));
256
257        current_decoder.skip(num_values)
258    }
259}
260
261const SKIP_BUFFER_SIZE: usize = 1024;
262
263enum LevelDecoder {
264    Packed(BitReader, u8),
265    Rle(RleDecoder),
266}
267
268impl LevelDecoder {
269    fn new(encoding: Encoding, data: Bytes, bit_width: u8) -> Result<Self> {
270        match encoding {
271            Encoding::RLE => {
272                let mut decoder = RleDecoder::new(bit_width);
273                decoder.set_data(data)?;
274                Ok(Self::Rle(decoder))
275            }
276            #[allow(deprecated)]
277            Encoding::BIT_PACKED => Ok(Self::Packed(BitReader::new(data), bit_width)),
278            _ => unreachable!("invalid level encoding: {}", encoding),
279        }
280    }
281
282    fn read(&mut self, out: &mut [i16]) -> Result<usize> {
283        match self {
284            Self::Packed(reader, bit_width) => {
285                Ok(reader.get_batch::<i16>(out, *bit_width as usize))
286            }
287            Self::Rle(reader) => Ok(reader.get_batch(out)?),
288        }
289    }
290}
291
292/// An implementation of [`DefinitionLevelDecoder`] for `[i16]`
293pub struct DefinitionLevelDecoderImpl {
294    decoder: Option<LevelDecoder>,
295    bit_width: u8,
296    max_level: i16,
297}
298
299impl DefinitionLevelDecoderImpl {
300    pub fn new(max_level: i16) -> Self {
301        let bit_width = num_required_bits(max_level as u64);
302        Self {
303            decoder: None,
304            bit_width,
305            max_level,
306        }
307    }
308}
309
310impl ColumnLevelDecoder for DefinitionLevelDecoderImpl {
311    type Buffer = Vec<i16>;
312
313    fn set_data(&mut self, encoding: Encoding, data: Bytes) -> Result<()> {
314        self.decoder = Some(LevelDecoder::new(encoding, data, self.bit_width)?);
315        Ok(())
316    }
317}
318
319impl DefinitionLevelDecoder for DefinitionLevelDecoderImpl {
320    fn read_def_levels(
321        &mut self,
322        out: &mut Self::Buffer,
323        num_levels: usize,
324    ) -> Result<(usize, usize)> {
325        // TODO: Push vec into decoder (#5177)
326        let start = out.len();
327        out.resize(start + num_levels, 0);
328        let levels_read = self.decoder.as_mut().unwrap().read(&mut out[start..])?;
329        out.truncate(start + levels_read);
330
331        let iter = out.iter().skip(start);
332        let values_read = iter.filter(|x| **x == self.max_level).count();
333        Ok((values_read, levels_read))
334    }
335
336    fn skip_def_levels(&mut self, num_levels: usize) -> Result<(usize, usize)> {
337        let mut level_skip = 0;
338        let mut value_skip = 0;
339        let mut buf: Vec<i16> = vec![];
340        while level_skip < num_levels {
341            let remaining_levels = num_levels - level_skip;
342
343            let to_read = remaining_levels.min(SKIP_BUFFER_SIZE);
344            buf.resize(to_read, 0);
345            let (values_read, levels_read) = self.read_def_levels(&mut buf, to_read)?;
346            if levels_read == 0 {
347                // Reached end of page
348                break;
349            }
350
351            level_skip += levels_read;
352            value_skip += values_read;
353        }
354
355        Ok((value_skip, level_skip))
356    }
357}
358
359pub(crate) const REPETITION_LEVELS_BATCH_SIZE: usize = 1024;
360
361/// An implementation of [`RepetitionLevelDecoder`] for `[i16]`
362pub struct RepetitionLevelDecoderImpl {
363    decoder: Option<LevelDecoder>,
364    bit_width: u8,
365    buffer: Box<[i16; REPETITION_LEVELS_BATCH_SIZE]>,
366    buffer_len: usize,
367    buffer_offset: usize,
368    has_partial: bool,
369}
370
371impl RepetitionLevelDecoderImpl {
372    pub fn new(max_level: i16) -> Self {
373        let bit_width = num_required_bits(max_level as u64);
374        Self {
375            decoder: None,
376            bit_width,
377            buffer: Box::new([0; REPETITION_LEVELS_BATCH_SIZE]),
378            buffer_offset: 0,
379            buffer_len: 0,
380            has_partial: false,
381        }
382    }
383
384    fn fill_buf(&mut self) -> Result<()> {
385        let read = self.decoder.as_mut().unwrap().read(self.buffer.as_mut())?;
386        self.buffer_offset = 0;
387        self.buffer_len = read;
388        Ok(())
389    }
390
391    /// Inspects the buffered repetition levels in the range `self.buffer_offset..self.buffer_len`
392    /// and returns the number of "complete" records along with the corresponding number of values
393    ///
394    /// A "complete" record is one where the buffer contains a subsequent repetition level of 0
395    fn count_records(&mut self, records_to_read: usize, num_levels: usize) -> (bool, usize, usize) {
396        let mut records_read = 0;
397
398        let levels = num_levels.min(self.buffer_len - self.buffer_offset);
399        let buf = self.buffer.iter().skip(self.buffer_offset);
400        for (idx, item) in buf.take(levels).enumerate() {
401            if *item == 0 && (idx != 0 || self.has_partial) {
402                records_read += 1;
403
404                if records_read == records_to_read {
405                    return (false, records_read, idx);
406                }
407            }
408        }
409        // Either ran out of space in `num_levels` or data in `self.buffer`
410        (true, records_read, levels)
411    }
412}
413
414impl ColumnLevelDecoder for RepetitionLevelDecoderImpl {
415    type Buffer = Vec<i16>;
416
417    fn set_data(&mut self, encoding: Encoding, data: Bytes) -> Result<()> {
418        self.decoder = Some(LevelDecoder::new(encoding, data, self.bit_width)?);
419        self.buffer_len = 0;
420        self.buffer_offset = 0;
421        Ok(())
422    }
423}
424
425impl RepetitionLevelDecoder for RepetitionLevelDecoderImpl {
426    fn read_rep_levels(
427        &mut self,
428        out: &mut Self::Buffer,
429        num_records: usize,
430        num_levels: usize,
431    ) -> Result<(usize, usize)> {
432        let mut total_records_read = 0;
433        let mut total_levels_read = 0;
434
435        while total_records_read < num_records && total_levels_read < num_levels {
436            if self.buffer_len == self.buffer_offset {
437                self.fill_buf()?;
438                if self.buffer_len == 0 {
439                    break;
440                }
441            }
442
443            let (partial, records_read, levels_read) = self.count_records(
444                num_records - total_records_read,
445                num_levels - total_levels_read,
446            );
447
448            out.extend_from_slice(
449                &self.buffer[self.buffer_offset..self.buffer_offset + levels_read],
450            );
451
452            total_levels_read += levels_read;
453            total_records_read += records_read;
454            self.buffer_offset += levels_read;
455            self.has_partial = partial;
456        }
457        Ok((total_records_read, total_levels_read))
458    }
459
460    fn skip_rep_levels(&mut self, num_records: usize, num_levels: usize) -> Result<(usize, usize)> {
461        let mut total_records_read = 0;
462        let mut total_levels_read = 0;
463
464        while total_records_read < num_records && total_levels_read < num_levels {
465            if self.buffer_len == self.buffer_offset {
466                self.fill_buf()?;
467                if self.buffer_len == 0 {
468                    break;
469                }
470            }
471
472            let (partial, records_read, levels_read) = self.count_records(
473                num_records - total_records_read,
474                num_levels - total_levels_read,
475            );
476
477            total_levels_read += levels_read;
478            total_records_read += records_read;
479            self.buffer_offset += levels_read;
480            self.has_partial = partial;
481        }
482        Ok((total_records_read, total_levels_read))
483    }
484
485    fn flush_partial(&mut self) -> bool {
486        std::mem::take(&mut self.has_partial)
487    }
488}
489
#[cfg(test)]
mod tests {
    use super::*;
    use crate::encodings::rle::RleEncoder;
    use rand::{prelude::*, rng};

    #[test]
    fn test_skip_padding() {
        let mut encoder = RleEncoder::new(1, 1024);
        encoder.put(0);
        (0..3).for_each(|_| encoder.put(1));
        let data = Bytes::from(encoder.consume());

        let mut decoder = RepetitionLevelDecoderImpl::new(1);
        decoder.set_data(Encoding::RLE, data.clone()).unwrap();
        let (_, levels) = decoder.skip_rep_levels(100, 4).unwrap();
        assert_eq!(levels, 4);

        // The length of the final bit packed run is ambiguous, so without the correct
        // levels limit, it will decode zero padding
        let mut decoder = RepetitionLevelDecoderImpl::new(1);
        decoder.set_data(Encoding::RLE, data).unwrap();
        let (_, levels) = decoder.skip_rep_levels(100, 6).unwrap();
        assert_eq!(levels, 6);
    }

    #[test]
    fn test_skip_rep_levels() {
        for _ in 0..10 {
            let mut rng = rng();
            let total_len = 10000_usize;
            let mut encoded: Vec<i16> = (0..total_len).map(|_| rng.random_range(0..5)).collect();
            encoded[0] = 0;
            let mut encoder = RleEncoder::new(3, 1024);
            for v in &encoded {
                encoder.put(*v as _)
            }
            let data = Bytes::from(encoder.consume());

            let mut decoder = RepetitionLevelDecoderImpl::new(5);
            decoder.set_data(Encoding::RLE, data).unwrap();

            let total_records = encoded.iter().filter(|x| **x == 0).count();
            let mut remaining_records = total_records;
            let mut remaining_levels = encoded.len();
            loop {
                let skip = rng.random_bool(0.5);
                let records = rng.random_range(1..=remaining_records.min(5));
                let (records_read, levels_read) = if skip {
                    decoder.skip_rep_levels(records, remaining_levels).unwrap()
                } else {
                    let mut decoded = Vec::new();
                    let (records_read, levels_read) = decoder
                        .read_rep_levels(&mut decoded, records, remaining_levels)
                        .unwrap();

                    assert_eq!(
                        decoded,
                        encoded[encoded.len() - remaining_levels..][..levels_read]
                    );
                    (records_read, levels_read)
                };

                remaining_levels = remaining_levels.checked_sub(levels_read).unwrap();
                if remaining_levels == 0 {
                    assert_eq!(records_read + 1, records);
                    assert_eq!(records, remaining_records);
                    break;
                }
                assert_eq!(records_read, records);
                remaining_records -= records;
                assert_ne!(remaining_records, 0);
            }
        }
    }

    /// Interleaves reads and skips of definition levels and checks both the level counts
    /// and the non-null value counts against the source data
    #[test]
    fn test_skip_def_levels() {
        let mut rng = rng();
        let max_level = 3_i16;
        let encoded: Vec<i16> = (0..5000)
            .map(|_| rng.random_range(0..=max_level))
            .collect();

        let mut encoder = RleEncoder::new(num_required_bits(max_level as u64), 1024);
        for v in &encoded {
            encoder.put(*v as _)
        }
        let data = Bytes::from(encoder.consume());

        let mut decoder = DefinitionLevelDecoderImpl::new(max_level);
        decoder.set_data(Encoding::RLE, data).unwrap();

        let mut offset = 0;
        while offset < encoded.len() {
            // Batches larger than SKIP_BUFFER_SIZE exercise the multi-batch skip path
            let batch = rng.random_range(1..=2500_usize).min(encoded.len() - offset);
            let expected = &encoded[offset..offset + batch];
            let expected_values = expected.iter().filter(|x| **x == max_level).count();

            if rng.random_bool(0.5) {
                let (values, levels) = decoder.skip_def_levels(batch).unwrap();
                assert_eq!(levels, batch);
                assert_eq!(values, expected_values);
            } else {
                let mut out = Vec::new();
                let (values, levels) = decoder.read_def_levels(&mut out, batch).unwrap();
                assert_eq!(levels, batch);
                assert_eq!(values, expected_values);
                assert_eq!(out, expected);
            }
            offset += batch;
        }
    }
}