1use bytes::Bytes;
19
20use crate::basic::{Encoding, EncodingMask};
21use crate::data_type::DataType;
22use crate::encodings::{
23    decoding::{Decoder, DictDecoder, PlainDecoder, get_decoder},
24    rle::RleDecoder,
25};
26use crate::errors::{ParquetError, Result};
27use crate::schema::types::ColumnDescPtr;
28use crate::util::bit_util::{BitReader, num_required_bits};
29
/// Behaviour common to the repetition and definition level decoders.
pub trait ColumnLevelDecoder {
    /// Container that decoded levels are written into.
    type Buffer;

    /// Hands the decoder the encoded level bytes `data`, together with the
    /// `encoding` they were written with, for subsequent decode calls.
    fn set_data(&mut self, encoding: Encoding, data: Bytes);
}
37
/// Decodes repetition levels while keeping track of record boundaries.
pub trait RepetitionLevelDecoder: ColumnLevelDecoder {
    /// Decodes repetition levels into `out`, stopping once `num_records`
    /// records or `num_levels` levels have been consumed, whichever comes
    /// first.
    ///
    /// Returns `(records_read, levels_read)`.
    ///
    /// The implementation in this file appends to `out` and counts a record
    /// as read when the `0` level starting the *next* record is observed; a
    /// record still open when the call returns can be accounted for with
    /// [`Self::flush_partial`].
    fn read_rep_levels(
        &mut self,
        out: &mut Self::Buffer,
        num_records: usize,
        num_levels: usize,
    ) -> Result<(usize, usize)>;

    /// Same as [`Self::read_rep_levels`] but discards the decoded levels
    /// instead of storing them.
    ///
    /// Returns `(records_skipped, levels_skipped)`.
    fn skip_rep_levels(&mut self, num_records: usize, num_levels: usize) -> Result<(usize, usize)>;

    /// Reports whether a partially decoded record is pending.
    ///
    /// The implementation in this file also clears the pending flag, so a
    /// second call without further decoding returns `false`.
    fn flush_partial(&mut self) -> bool;
}
67
/// Decodes definition levels.
pub trait DefinitionLevelDecoder: ColumnLevelDecoder {
    /// Decodes up to `num_levels` definition levels into `out`.
    ///
    /// Returns `(values_read, levels_read)`. In the implementation in this
    /// file the levels are appended to `out`, and `values_read` is the number
    /// of decoded levels equal to the column's maximum definition level.
    fn read_def_levels(
        &mut self,
        out: &mut Self::Buffer,
        num_levels: usize,
    ) -> Result<(usize, usize)>;

    /// Skips up to `num_levels` definition levels without returning them.
    ///
    /// Returns `(values_skipped, levels_skipped)`.
    fn skip_def_levels(&mut self, num_levels: usize) -> Result<(usize, usize)>;
}
87
/// Decodes the value data of a column chunk.
pub trait ColumnValueDecoder {
    /// Container that decoded values are written into.
    type Buffer;

    /// Creates a decoder for the column described by `col`.
    fn new(col: &ColumnDescPtr) -> Self;

    /// Installs the dictionary page for this column.
    ///
    /// * `buf` - the encoded dictionary values
    /// * `num_values` - number of entries in the dictionary
    /// * `encoding` - encoding of the dictionary page
    /// * `is_sorted` - whether the dictionary is sorted (ignored by the
    ///   implementation in this file)
    fn set_dict(
        &mut self,
        buf: Bytes,
        num_values: u32,
        encoding: Encoding,
        is_sorted: bool,
    ) -> Result<()>;

    /// Supplies the value bytes of a new data page.
    ///
    /// * `encoding` - encoding of the page's values
    /// * `data` - the encoded values
    /// * `num_levels` - number of levels in the page
    /// * `num_values` - number of values in the page, if known; the
    ///   implementation in this file falls back to `num_levels` when `None`
    fn set_data(
        &mut self,
        encoding: Encoding,
        data: Bytes,
        num_levels: usize,
        num_values: Option<usize>,
    ) -> Result<()>;

    /// Decodes up to `num_values` values into `out` and returns how many
    /// were decoded. The implementation in this file appends to `out`.
    fn read(&mut self, out: &mut Self::Buffer, num_values: usize) -> Result<usize>;

    /// Skips up to `num_values` values; the returned count is presumably the
    /// number actually skipped (it is whatever the underlying decoder
    /// reports).
    fn skip_values(&mut self, num_values: usize) -> Result<usize>;
}
136
/// Length of the per-encoding decoder table: one slot for every discriminant
/// from `0` through `Encoding::MAX_DISCRIMINANT`, so that `encoding as usize`
/// can be used directly as an index.
const ENCODING_SLOTS: usize = Encoding::MAX_DISCRIMINANT as usize + 1;
142
/// A [`ColumnValueDecoder`] that decodes into a `Vec<T::T>`, keeping one
/// boxed [`Decoder`] per encoding it has encountered.
pub struct ColumnValueDecoderImpl<T: DataType> {
    /// Descriptor of the column being decoded.
    descr: ColumnDescPtr,

    /// Encoding of the most recent data page; `None` until `set_data` succeeds.
    current_encoding: Option<Encoding>,

    /// Records which encodings have a decoder stored in `decoders`.
    decoder_mask: EncodingMask,
    /// Decoder table indexed by `encoding as usize`.
    decoders: [Option<Box<dyn Decoder<T>>>; ENCODING_SLOTS],
}
154
155impl<T: DataType> ColumnValueDecoder for ColumnValueDecoderImpl<T> {
156    type Buffer = Vec<T::T>;
157
158    fn new(descr: &ColumnDescPtr) -> Self {
159        Self {
160            descr: descr.clone(),
161            current_encoding: None,
162            decoder_mask: EncodingMask::default(),
163            decoders: std::array::from_fn(|_| None),
164        }
165    }
166
167    fn set_dict(
168        &mut self,
169        buf: Bytes,
170        num_values: u32,
171        mut encoding: Encoding,
172        _is_sorted: bool,
173    ) -> Result<()> {
174        if encoding == Encoding::PLAIN || encoding == Encoding::PLAIN_DICTIONARY {
175            encoding = Encoding::RLE_DICTIONARY
176        }
177
178        if self.decoder_mask.is_set(encoding) {
179            return Err(general_err!("Column cannot have more than one dictionary"));
180        }
181
182        if encoding == Encoding::RLE_DICTIONARY {
183            let mut dictionary = PlainDecoder::<T>::new(self.descr.type_length());
184            dictionary.set_data(buf, num_values as usize)?;
185
186            let mut decoder = DictDecoder::new();
187            decoder.set_dict(Box::new(dictionary))?;
188            self.decoders[encoding as usize] = Some(Box::new(decoder));
189            self.decoder_mask.insert(encoding);
190            Ok(())
191        } else {
192            Err(nyi_err!(
193                "Invalid/Unsupported encoding type for dictionary: {}",
194                encoding
195            ))
196        }
197    }
198
199    fn set_data(
200        &mut self,
201        mut encoding: Encoding,
202        data: Bytes,
203        num_levels: usize,
204        num_values: Option<usize>,
205    ) -> Result<()> {
206        if encoding == Encoding::PLAIN_DICTIONARY {
207            encoding = Encoding::RLE_DICTIONARY;
208        }
209
210        let decoder = if encoding == Encoding::RLE_DICTIONARY {
211            self.decoders[encoding as usize]
212                .as_mut()
213                .expect("Decoder for dict should have been set")
214        } else {
215            let slot = encoding as usize;
216            if self.decoders[slot].is_none() {
217                let data_decoder = get_decoder::<T>(self.descr.clone(), encoding)?;
218                self.decoders[slot] = Some(data_decoder);
219                self.decoder_mask.insert(encoding);
220            }
221            self.decoders[slot]
222                .as_mut()
223                .expect("decoder should have been inserted")
224        };
225
226        decoder.set_data(data, num_values.unwrap_or(num_levels))?;
227        self.current_encoding = Some(encoding);
228        Ok(())
229    }
230
231    fn read(&mut self, out: &mut Self::Buffer, num_values: usize) -> Result<usize> {
232        let encoding = self
233            .current_encoding
234            .expect("current_encoding should be set");
235
236        let current_decoder = self.decoders[encoding as usize]
237            .as_mut()
238            .unwrap_or_else(|| panic!("decoder for encoding {encoding} should be set"));
239
240        let start = out.len();
242        out.resize(start + num_values, T::T::default());
243        let read = current_decoder.get(&mut out[start..])?;
244        out.truncate(start + read);
245        Ok(read)
246    }
247
248    fn skip_values(&mut self, num_values: usize) -> Result<usize> {
249        let encoding = self
250            .current_encoding
251            .expect("current_encoding should be set");
252
253        let current_decoder = self.decoders[encoding as usize]
254            .as_mut()
255            .unwrap_or_else(|| panic!("decoder for encoding {encoding} should be set"));
256
257        current_decoder.skip(num_values)
258    }
259}
260
/// Maximum number of definition levels decoded per batch while skipping.
const SKIP_BUFFER_SIZE: usize = 1024;
262
/// Decoder for a run of repetition or definition levels, one variant per
/// supported level encoding.
enum LevelDecoder {
    /// Bit-packed levels: the reader over the data and the bit width of each level.
    Packed(BitReader, u8),
    /// RLE encoded levels.
    Rle(RleDecoder),
}
267
268impl LevelDecoder {
269    fn new(encoding: Encoding, data: Bytes, bit_width: u8) -> Self {
270        match encoding {
271            Encoding::RLE => {
272                let mut decoder = RleDecoder::new(bit_width);
273                decoder.set_data(data);
274                Self::Rle(decoder)
275            }
276            #[allow(deprecated)]
277            Encoding::BIT_PACKED => Self::Packed(BitReader::new(data), bit_width),
278            _ => unreachable!("invalid level encoding: {}", encoding),
279        }
280    }
281
282    fn read(&mut self, out: &mut [i16]) -> Result<usize> {
283        match self {
284            Self::Packed(reader, bit_width) => {
285                Ok(reader.get_batch::<i16>(out, *bit_width as usize))
286            }
287            Self::Rle(reader) => Ok(reader.get_batch(out)?),
288        }
289    }
290}
291
/// A [`DefinitionLevelDecoder`] that decodes into a `Vec<i16>`.
pub struct DefinitionLevelDecoderImpl {
    /// Decoder over the current page's levels; `None` until `set_data` is called.
    decoder: Option<LevelDecoder>,
    /// Number of bits per level, derived from `max_level`.
    bit_width: u8,
    /// Maximum definition level; levels equal to it are counted as values.
    max_level: i16,
}
298
299impl DefinitionLevelDecoderImpl {
300    pub fn new(max_level: i16) -> Self {
301        let bit_width = num_required_bits(max_level as u64);
302        Self {
303            decoder: None,
304            bit_width,
305            max_level,
306        }
307    }
308}
309
310impl ColumnLevelDecoder for DefinitionLevelDecoderImpl {
311    type Buffer = Vec<i16>;
312
313    fn set_data(&mut self, encoding: Encoding, data: Bytes) {
314        self.decoder = Some(LevelDecoder::new(encoding, data, self.bit_width))
315    }
316}
317
318impl DefinitionLevelDecoder for DefinitionLevelDecoderImpl {
319    fn read_def_levels(
320        &mut self,
321        out: &mut Self::Buffer,
322        num_levels: usize,
323    ) -> Result<(usize, usize)> {
324        let start = out.len();
326        out.resize(start + num_levels, 0);
327        let levels_read = self.decoder.as_mut().unwrap().read(&mut out[start..])?;
328        out.truncate(start + levels_read);
329
330        let iter = out.iter().skip(start);
331        let values_read = iter.filter(|x| **x == self.max_level).count();
332        Ok((values_read, levels_read))
333    }
334
335    fn skip_def_levels(&mut self, num_levels: usize) -> Result<(usize, usize)> {
336        let mut level_skip = 0;
337        let mut value_skip = 0;
338        let mut buf: Vec<i16> = vec![];
339        while level_skip < num_levels {
340            let remaining_levels = num_levels - level_skip;
341
342            let to_read = remaining_levels.min(SKIP_BUFFER_SIZE);
343            buf.resize(to_read, 0);
344            let (values_read, levels_read) = self.read_def_levels(&mut buf, to_read)?;
345            if levels_read == 0 {
346                break;
348            }
349
350            level_skip += levels_read;
351            value_skip += values_read;
352        }
353
354        Ok((value_skip, level_skip))
355    }
356}
357
/// Number of repetition levels held by the internal buffer of
/// [`RepetitionLevelDecoderImpl`].
pub(crate) const REPETITION_LEVELS_BATCH_SIZE: usize = 1024;
359
/// A [`RepetitionLevelDecoder`] that decodes into a `Vec<i16>`, staging
/// levels in a fixed-size internal buffer so record boundaries can be found
/// before levels are handed out.
pub struct RepetitionLevelDecoderImpl {
    /// Decoder over the current page's levels; `None` until `set_data` is called.
    decoder: Option<LevelDecoder>,
    /// Number of bits per level, derived from the maximum repetition level.
    bit_width: u8,
    /// Staging area for decoded levels.
    buffer: Box<[i16; REPETITION_LEVELS_BATCH_SIZE]>,
    /// Number of valid levels currently in `buffer`.
    buffer_len: usize,
    /// Index of the next unconsumed level in `buffer`.
    buffer_offset: usize,
    /// Whether levels of a record that has not yet been completed have been
    /// consumed.
    has_partial: bool,
}
369
370impl RepetitionLevelDecoderImpl {
371    pub fn new(max_level: i16) -> Self {
372        let bit_width = num_required_bits(max_level as u64);
373        Self {
374            decoder: None,
375            bit_width,
376            buffer: Box::new([0; REPETITION_LEVELS_BATCH_SIZE]),
377            buffer_offset: 0,
378            buffer_len: 0,
379            has_partial: false,
380        }
381    }
382
383    fn fill_buf(&mut self) -> Result<()> {
384        let read = self.decoder.as_mut().unwrap().read(self.buffer.as_mut())?;
385        self.buffer_offset = 0;
386        self.buffer_len = read;
387        Ok(())
388    }
389
390    fn count_records(&mut self, records_to_read: usize, num_levels: usize) -> (bool, usize, usize) {
395        let mut records_read = 0;
396
397        let levels = num_levels.min(self.buffer_len - self.buffer_offset);
398        let buf = self.buffer.iter().skip(self.buffer_offset);
399        for (idx, item) in buf.take(levels).enumerate() {
400            if *item == 0 && (idx != 0 || self.has_partial) {
401                records_read += 1;
402
403                if records_read == records_to_read {
404                    return (false, records_read, idx);
405                }
406            }
407        }
408        (true, records_read, levels)
410    }
411}
412
413impl ColumnLevelDecoder for RepetitionLevelDecoderImpl {
414    type Buffer = Vec<i16>;
415
416    fn set_data(&mut self, encoding: Encoding, data: Bytes) {
417        self.decoder = Some(LevelDecoder::new(encoding, data, self.bit_width));
418        self.buffer_len = 0;
419        self.buffer_offset = 0;
420    }
421}
422
423impl RepetitionLevelDecoder for RepetitionLevelDecoderImpl {
424    fn read_rep_levels(
425        &mut self,
426        out: &mut Self::Buffer,
427        num_records: usize,
428        num_levels: usize,
429    ) -> Result<(usize, usize)> {
430        let mut total_records_read = 0;
431        let mut total_levels_read = 0;
432
433        while total_records_read < num_records && total_levels_read < num_levels {
434            if self.buffer_len == self.buffer_offset {
435                self.fill_buf()?;
436                if self.buffer_len == 0 {
437                    break;
438                }
439            }
440
441            let (partial, records_read, levels_read) = self.count_records(
442                num_records - total_records_read,
443                num_levels - total_levels_read,
444            );
445
446            out.extend_from_slice(
447                &self.buffer[self.buffer_offset..self.buffer_offset + levels_read],
448            );
449
450            total_levels_read += levels_read;
451            total_records_read += records_read;
452            self.buffer_offset += levels_read;
453            self.has_partial = partial;
454        }
455        Ok((total_records_read, total_levels_read))
456    }
457
458    fn skip_rep_levels(&mut self, num_records: usize, num_levels: usize) -> Result<(usize, usize)> {
459        let mut total_records_read = 0;
460        let mut total_levels_read = 0;
461
462        while total_records_read < num_records && total_levels_read < num_levels {
463            if self.buffer_len == self.buffer_offset {
464                self.fill_buf()?;
465                if self.buffer_len == 0 {
466                    break;
467                }
468            }
469
470            let (partial, records_read, levels_read) = self.count_records(
471                num_records - total_records_read,
472                num_levels - total_levels_read,
473            );
474
475            total_levels_read += levels_read;
476            total_records_read += records_read;
477            self.buffer_offset += levels_read;
478            self.has_partial = partial;
479        }
480        Ok((total_records_read, total_levels_read))
481    }
482
483    fn flush_partial(&mut self) -> bool {
484        std::mem::take(&mut self.has_partial)
485    }
486}
487
#[cfg(test)]
mod tests {
    use super::*;
    use crate::encodings::rle::RleEncoder;
    use rand::{prelude::*, rng};

    /// Skipping is bounded by the level limit passed in, not by the number of
    /// levels that were actually encoded.
    #[test]
    fn test_skip_padding() {
        // Encode four 1-bit levels: 0, 1, 1, 1.
        let mut encoder = RleEncoder::new(1, 1024);
        encoder.put(0);
        (0..3).for_each(|_| encoder.put(1));
        let data = Bytes::from(encoder.consume());

        // With the exact level count, exactly four levels are skipped.
        let mut decoder = RepetitionLevelDecoderImpl::new(1);
        decoder.set_data(Encoding::RLE, data.clone());
        let (_, levels) = decoder.skip_rep_levels(100, 4).unwrap();
        assert_eq!(levels, 4);

        // Asking for more levels than were encoded still yields that many;
        // presumably the final run is padded, so the caller must supply the
        // correct level limit.
        let mut decoder = RepetitionLevelDecoderImpl::new(1);
        decoder.set_data(Encoding::RLE, data);
        let (_, levels) = decoder.skip_rep_levels(100, 6).unwrap();
        assert_eq!(levels, 6);
    }

    /// Randomly interleaves reads and skips over random repetition levels and
    /// checks the record and level counts reported by each call.
    #[test]
    fn test_skip_rep_levels() {
        for _ in 0..10 {
            let mut rng = rng();
            let total_len = 10000_usize;
            let mut encoded: Vec<i16> = (0..total_len).map(|_| rng.random_range(0..5)).collect();
            // The data must begin at a record boundary.
            encoded[0] = 0;
            let mut encoder = RleEncoder::new(3, 1024);
            for v in &encoded {
                encoder.put(*v as _)
            }
            let data = Bytes::from(encoder.consume());

            let mut decoder = RepetitionLevelDecoderImpl::new(5);
            decoder.set_data(Encoding::RLE, data);

            // Every 0 level starts a record.
            let total_records = encoded.iter().filter(|x| **x == 0).count();
            let mut remaining_records = total_records;
            let mut remaining_levels = encoded.len();
            loop {
                let skip = rng.random_bool(0.5);
                let records = rng.random_range(1..=remaining_records.min(5));
                let (records_read, levels_read) = if skip {
                    decoder.skip_rep_levels(records, remaining_levels).unwrap()
                } else {
                    let mut decoded = Vec::new();
                    let (records_read, levels_read) = decoder
                        .read_rep_levels(&mut decoded, records, remaining_levels)
                        .unwrap();

                    // Levels read must match the next unread slice of the input.
                    assert_eq!(
                        decoded,
                        encoded[encoded.len() - remaining_levels..][..levels_read]
                    );
                    (records_read, levels_read)
                };

                remaining_levels = remaining_levels.checked_sub(levels_read).unwrap();
                if remaining_levels == 0 {
                    // The last record is never followed by another 0 level,
                    // so it is not counted as read.
                    assert_eq!(records_read + 1, records);
                    assert_eq!(records, remaining_records);
                    break;
                }
                assert_eq!(records_read, records);
                remaining_records -= records;
                assert_ne!(remaining_records, 0);
            }
        }
    }
}