1use bytes::Bytes;
19
20use crate::basic::{Encoding, EncodingMask};
21use crate::data_type::DataType;
22use crate::encodings::{
23 decoding::{Decoder, DictDecoder, PlainDecoder, get_decoder},
24 rle::RleDecoder,
25};
26use crate::errors::{ParquetError, Result};
27use crate::schema::types::ColumnDescPtr;
28use crate::util::bit_util::{BitReader, num_required_bits};
29
/// Common interface shared by the repetition and definition level decoders.
pub trait ColumnLevelDecoder {
    /// Container that decoded levels are written into by the reading methods
    /// of the sub-traits.
    type Buffer;

    /// Hands the decoder a new chunk of encoded level `data` that was written
    /// using `encoding`.
    ///
    /// Returns an error if the decoder cannot be set up for the given input.
    fn set_data(&mut self, encoding: Encoding, data: Bytes) -> Result<()>;
}
37
/// A [`ColumnLevelDecoder`] for repetition levels that can also delimit records.
pub trait RepetitionLevelDecoder: ColumnLevelDecoder {
    /// Reads repetition levels into `out`, stopping once `num_records` records
    /// or `num_levels` levels have been consumed, or the data is exhausted.
    ///
    /// Returns `(records_read, levels_read)`.
    ///
    /// NOTE(review): the tuple order and stopping conditions are taken from
    /// `RepetitionLevelDecoderImpl` in this file; confirm other implementors
    /// follow the same contract.
    fn read_rep_levels(
        &mut self,
        out: &mut Self::Buffer,
        num_records: usize,
        num_levels: usize,
    ) -> Result<(usize, usize)>;

    /// Same as [`Self::read_rep_levels`] but discards the levels instead of
    /// storing them.
    ///
    /// Returns `(records_skipped, levels_skipped)`.
    fn skip_rep_levels(&mut self, num_records: usize, num_levels: usize) -> Result<(usize, usize)>;

    /// Reports whether a partially consumed record is pending.
    ///
    /// The implementation in this file also resets its pending flag as part of
    /// this call.
    fn flush_partial(&mut self) -> bool;
}
67
/// A [`ColumnLevelDecoder`] for definition levels.
pub trait DefinitionLevelDecoder: ColumnLevelDecoder {
    /// Reads up to `num_levels` definition levels into `out`.
    ///
    /// Returns `(values_read, levels_read)`. In `DefinitionLevelDecoderImpl`
    /// (this file) `values_read` is the number of decoded levels equal to the
    /// maximum definition level.
    fn read_def_levels(
        &mut self,
        out: &mut Self::Buffer,
        num_levels: usize,
    ) -> Result<(usize, usize)>;

    /// Skips up to `num_levels` definition levels without returning them.
    ///
    /// Returns `(values_skipped, levels_skipped)`.
    fn skip_def_levels(&mut self, num_levels: usize) -> Result<(usize, usize)>;
}
87
/// Decodes the value data of a column.
pub trait ColumnValueDecoder {
    /// Container that decoded values are written into by [`Self::read`].
    type Buffer;

    /// Creates a decoder for the column described by `col`.
    fn new(col: &ColumnDescPtr) -> Self;

    /// Registers the dictionary stored in `buf`, which holds `num_values`
    /// entries written with `encoding`.
    ///
    /// NOTE(review): `is_sorted` presumably states whether the dictionary is
    /// sorted; the implementation in this file ignores it.
    fn set_dict(
        &mut self,
        buf: Bytes,
        num_values: u32,
        encoding: Encoding,
        is_sorted: bool,
    ) -> Result<()>;

    /// Supplies a new chunk of encoded value `data` written with `encoding`.
    ///
    /// `num_levels` is the number of levels that accompany the data and
    /// `num_values` the number of values, when known. The implementation in
    /// this file falls back to `num_levels` when `num_values` is `None`.
    fn set_data(
        &mut self,
        encoding: Encoding,
        data: Bytes,
        num_levels: usize,
        num_values: Option<usize>,
    ) -> Result<()>;

    /// Reads up to `num_values` values into `out`, returning how many were
    /// actually read.
    fn read(&mut self, out: &mut Self::Buffer, num_values: usize) -> Result<usize>;

    /// Skips up to `num_values` values, returning how many were actually
    /// skipped.
    fn skip_values(&mut self, num_values: usize) -> Result<usize>;
}
136
/// Length of the per-encoding decoder table: one slot for every discriminant
/// value from `0` through `Encoding::MAX_DISCRIMINANT`, so the table can be
/// indexed with `encoding as usize`.
const ENCODING_SLOTS: usize = Encoding::MAX_DISCRIMINANT as usize + 1;
142
/// A [`ColumnValueDecoder`] that keeps one boxed [`Decoder`] per encoding.
pub struct ColumnValueDecoderImpl<T: DataType> {
    /// Descriptor of the column being decoded; used to construct decoders.
    descr: ColumnDescPtr,

    /// Encoding selected by the most recent successful `set_data` call.
    current_encoding: Option<Encoding>,

    /// Records which encodings have had a decoder registered.
    decoder_mask: EncodingMask,
    /// Decoder table indexed by `encoding as usize`; `None` until an encoding
    /// is first used.
    decoders: [Option<Box<dyn Decoder<T>>>; ENCODING_SLOTS],
}
154
155impl<T: DataType> ColumnValueDecoder for ColumnValueDecoderImpl<T> {
156 type Buffer = Vec<T::T>;
157
158 fn new(descr: &ColumnDescPtr) -> Self {
159 Self {
160 descr: descr.clone(),
161 current_encoding: None,
162 decoder_mask: EncodingMask::default(),
163 decoders: std::array::from_fn(|_| None),
164 }
165 }
166
167 fn set_dict(
168 &mut self,
169 buf: Bytes,
170 num_values: u32,
171 mut encoding: Encoding,
172 _is_sorted: bool,
173 ) -> Result<()> {
174 if encoding == Encoding::PLAIN || encoding == Encoding::PLAIN_DICTIONARY {
175 encoding = Encoding::RLE_DICTIONARY
176 }
177
178 if self.decoder_mask.is_set(encoding) {
179 return Err(general_err!("Column cannot have more than one dictionary"));
180 }
181
182 if encoding == Encoding::RLE_DICTIONARY {
183 let mut dictionary = PlainDecoder::<T>::new(self.descr.type_length());
184 dictionary.set_data(buf, num_values as usize)?;
185
186 let mut decoder = DictDecoder::new();
187 decoder.set_dict(Box::new(dictionary))?;
188 self.decoders[encoding as usize] = Some(Box::new(decoder));
189 self.decoder_mask.insert(encoding);
190 Ok(())
191 } else {
192 Err(nyi_err!(
193 "Invalid/Unsupported encoding type for dictionary: {}",
194 encoding
195 ))
196 }
197 }
198
199 fn set_data(
200 &mut self,
201 mut encoding: Encoding,
202 data: Bytes,
203 num_levels: usize,
204 num_values: Option<usize>,
205 ) -> Result<()> {
206 if encoding == Encoding::PLAIN_DICTIONARY {
207 encoding = Encoding::RLE_DICTIONARY;
208 }
209
210 let decoder = if encoding == Encoding::RLE_DICTIONARY {
211 self.decoders[encoding as usize]
212 .as_mut()
213 .expect("Decoder for dict should have been set")
214 } else {
215 let slot = encoding as usize;
216 if self.decoders[slot].is_none() {
217 let data_decoder = get_decoder::<T>(self.descr.clone(), encoding)?;
218 self.decoders[slot] = Some(data_decoder);
219 self.decoder_mask.insert(encoding);
220 }
221 self.decoders[slot]
222 .as_mut()
223 .expect("decoder should have been inserted")
224 };
225
226 decoder.set_data(data, num_values.unwrap_or(num_levels))?;
227 self.current_encoding = Some(encoding);
228 Ok(())
229 }
230
231 fn read(&mut self, out: &mut Self::Buffer, num_values: usize) -> Result<usize> {
232 let encoding = self
233 .current_encoding
234 .expect("current_encoding should be set");
235
236 let current_decoder = self.decoders[encoding as usize]
237 .as_mut()
238 .unwrap_or_else(|| panic!("decoder for encoding {encoding} should be set"));
239
240 let start = out.len();
242 out.resize(start + num_values, T::T::default());
243 let read = current_decoder.get(&mut out[start..])?;
244 out.truncate(start + read);
245 Ok(read)
246 }
247
248 fn skip_values(&mut self, num_values: usize) -> Result<usize> {
249 let encoding = self
250 .current_encoding
251 .expect("current_encoding should be set");
252
253 let current_decoder = self.decoders[encoding as usize]
254 .as_mut()
255 .unwrap_or_else(|| panic!("decoder for encoding {encoding} should be set"));
256
257 current_decoder.skip(num_values)
258 }
259}
260
/// Maximum number of definition levels decoded per iteration while skipping.
const SKIP_BUFFER_SIZE: usize = 1024;
262
/// Decoder for level data, one variant per supported level encoding.
enum LevelDecoder {
    /// Bit-packed levels: the reader over the data and the bit width of each
    /// level.
    Packed(BitReader, u8),
    /// RLE-encoded levels.
    Rle(RleDecoder),
}
267
268impl LevelDecoder {
269 fn new(encoding: Encoding, data: Bytes, bit_width: u8) -> Result<Self> {
270 match encoding {
271 Encoding::RLE => {
272 let mut decoder = RleDecoder::new(bit_width);
273 decoder.set_data(data)?;
274 Ok(Self::Rle(decoder))
275 }
276 #[allow(deprecated)]
277 Encoding::BIT_PACKED => Ok(Self::Packed(BitReader::new(data), bit_width)),
278 _ => unreachable!("invalid level encoding: {}", encoding),
279 }
280 }
281
282 fn read(&mut self, out: &mut [i16]) -> Result<usize> {
283 match self {
284 Self::Packed(reader, bit_width) => {
285 Ok(reader.get_batch::<i16>(out, *bit_width as usize))
286 }
287 Self::Rle(reader) => Ok(reader.get_batch(out)?),
288 }
289 }
290}
291
/// A [`DefinitionLevelDecoder`] that decodes levels into a `Vec<i16>`.
pub struct DefinitionLevelDecoderImpl {
    /// Decoder over the current level data; `None` until `set_data` is called.
    decoder: Option<LevelDecoder>,
    /// Bit width of a level, derived from `max_level` in `new`.
    bit_width: u8,
    /// Maximum definition level; levels equal to it are counted as values.
    max_level: i16,
}
298
299impl DefinitionLevelDecoderImpl {
300 pub fn new(max_level: i16) -> Self {
301 let bit_width = num_required_bits(max_level as u64);
302 Self {
303 decoder: None,
304 bit_width,
305 max_level,
306 }
307 }
308}
309
310impl ColumnLevelDecoder for DefinitionLevelDecoderImpl {
311 type Buffer = Vec<i16>;
312
313 fn set_data(&mut self, encoding: Encoding, data: Bytes) -> Result<()> {
314 self.decoder = Some(LevelDecoder::new(encoding, data, self.bit_width)?);
315 Ok(())
316 }
317}
318
319impl DefinitionLevelDecoder for DefinitionLevelDecoderImpl {
320 fn read_def_levels(
321 &mut self,
322 out: &mut Self::Buffer,
323 num_levels: usize,
324 ) -> Result<(usize, usize)> {
325 let start = out.len();
327 out.resize(start + num_levels, 0);
328 let levels_read = self.decoder.as_mut().unwrap().read(&mut out[start..])?;
329 out.truncate(start + levels_read);
330
331 let iter = out.iter().skip(start);
332 let values_read = iter.filter(|x| **x == self.max_level).count();
333 Ok((values_read, levels_read))
334 }
335
336 fn skip_def_levels(&mut self, num_levels: usize) -> Result<(usize, usize)> {
337 let mut level_skip = 0;
338 let mut value_skip = 0;
339 let mut buf: Vec<i16> = vec![];
340 while level_skip < num_levels {
341 let remaining_levels = num_levels - level_skip;
342
343 let to_read = remaining_levels.min(SKIP_BUFFER_SIZE);
344 buf.resize(to_read, 0);
345 let (values_read, levels_read) = self.read_def_levels(&mut buf, to_read)?;
346 if levels_read == 0 {
347 break;
349 }
350
351 level_skip += levels_read;
352 value_skip += values_read;
353 }
354
355 Ok((value_skip, level_skip))
356 }
357}
358
/// Number of repetition levels held by the staging buffer of
/// `RepetitionLevelDecoderImpl`.
pub(crate) const REPETITION_LEVELS_BATCH_SIZE: usize = 1024;
360
/// A [`RepetitionLevelDecoder`] that decodes levels in batches through an
/// internal staging buffer.
pub struct RepetitionLevelDecoderImpl {
    /// Decoder over the current level data; `None` until `set_data` is called.
    decoder: Option<LevelDecoder>,
    /// Bit width of a level, derived from the maximum level in `new`.
    bit_width: u8,
    /// Staging buffer that decoded levels are read into.
    buffer: Box<[i16; REPETITION_LEVELS_BATCH_SIZE]>,
    /// Number of decoded levels currently held in `buffer`.
    buffer_len: usize,
    /// Index in `buffer` of the next level that has not been consumed yet.
    buffer_offset: usize,
    /// Whether the last counting pass stopped in the middle of a record.
    has_partial: bool,
}
370
371impl RepetitionLevelDecoderImpl {
372 pub fn new(max_level: i16) -> Self {
373 let bit_width = num_required_bits(max_level as u64);
374 Self {
375 decoder: None,
376 bit_width,
377 buffer: Box::new([0; REPETITION_LEVELS_BATCH_SIZE]),
378 buffer_offset: 0,
379 buffer_len: 0,
380 has_partial: false,
381 }
382 }
383
384 fn fill_buf(&mut self) -> Result<()> {
385 let read = self.decoder.as_mut().unwrap().read(self.buffer.as_mut())?;
386 self.buffer_offset = 0;
387 self.buffer_len = read;
388 Ok(())
389 }
390
391 fn count_records(&mut self, records_to_read: usize, num_levels: usize) -> (bool, usize, usize) {
396 let mut records_read = 0;
397
398 let levels = num_levels.min(self.buffer_len - self.buffer_offset);
399 let buf = self.buffer.iter().skip(self.buffer_offset);
400 for (idx, item) in buf.take(levels).enumerate() {
401 if *item == 0 && (idx != 0 || self.has_partial) {
402 records_read += 1;
403
404 if records_read == records_to_read {
405 return (false, records_read, idx);
406 }
407 }
408 }
409 (true, records_read, levels)
411 }
412}
413
414impl ColumnLevelDecoder for RepetitionLevelDecoderImpl {
415 type Buffer = Vec<i16>;
416
417 fn set_data(&mut self, encoding: Encoding, data: Bytes) -> Result<()> {
418 self.decoder = Some(LevelDecoder::new(encoding, data, self.bit_width)?);
419 self.buffer_len = 0;
420 self.buffer_offset = 0;
421 Ok(())
422 }
423}
424
425impl RepetitionLevelDecoder for RepetitionLevelDecoderImpl {
426 fn read_rep_levels(
427 &mut self,
428 out: &mut Self::Buffer,
429 num_records: usize,
430 num_levels: usize,
431 ) -> Result<(usize, usize)> {
432 let mut total_records_read = 0;
433 let mut total_levels_read = 0;
434
435 while total_records_read < num_records && total_levels_read < num_levels {
436 if self.buffer_len == self.buffer_offset {
437 self.fill_buf()?;
438 if self.buffer_len == 0 {
439 break;
440 }
441 }
442
443 let (partial, records_read, levels_read) = self.count_records(
444 num_records - total_records_read,
445 num_levels - total_levels_read,
446 );
447
448 out.extend_from_slice(
449 &self.buffer[self.buffer_offset..self.buffer_offset + levels_read],
450 );
451
452 total_levels_read += levels_read;
453 total_records_read += records_read;
454 self.buffer_offset += levels_read;
455 self.has_partial = partial;
456 }
457 Ok((total_records_read, total_levels_read))
458 }
459
460 fn skip_rep_levels(&mut self, num_records: usize, num_levels: usize) -> Result<(usize, usize)> {
461 let mut total_records_read = 0;
462 let mut total_levels_read = 0;
463
464 while total_records_read < num_records && total_levels_read < num_levels {
465 if self.buffer_len == self.buffer_offset {
466 self.fill_buf()?;
467 if self.buffer_len == 0 {
468 break;
469 }
470 }
471
472 let (partial, records_read, levels_read) = self.count_records(
473 num_records - total_records_read,
474 num_levels - total_levels_read,
475 );
476
477 total_levels_read += levels_read;
478 total_records_read += records_read;
479 self.buffer_offset += levels_read;
480 self.has_partial = partial;
481 }
482 Ok((total_records_read, total_levels_read))
483 }
484
485 fn flush_partial(&mut self) -> bool {
486 std::mem::take(&mut self.has_partial)
487 }
488}
489
#[cfg(test)]
mod tests {
    use super::*;
    use crate::encodings::rle::RleEncoder;
    use rand::{prelude::*, rng};

    /// Skipping must honour the requested level limit even when it exceeds
    /// the number of levels that were actually encoded.
    #[test]
    fn test_skip_padding() {
        // Encode four levels: one record start followed by three repeats.
        let mut encoder = RleEncoder::new(1, 1024);
        encoder.put(0);
        (0..3).for_each(|_| encoder.put(1));
        let data = Bytes::from(encoder.consume());

        // With the exact level count as the limit, exactly four are skipped.
        let mut decoder = RepetitionLevelDecoderImpl::new(1);
        decoder.set_data(Encoding::RLE, data.clone()).unwrap();
        let (_, levels) = decoder.skip_rep_levels(100, 4).unwrap();
        assert_eq!(levels, 4);

        // With a larger limit the decoder reports six levels although only
        // four were written — presumably the extra two are padding of the
        // final run (TODO confirm against the RLE encoder/decoder).
        let mut decoder = RepetitionLevelDecoderImpl::new(1);
        decoder.set_data(Encoding::RLE, data).unwrap();
        let (_, levels) = decoder.skip_rep_levels(100, 6).unwrap();
        assert_eq!(levels, 6);
    }

    /// Randomly interleaves reading and skipping records and checks that the
    /// record and level accounting stays consistent with the encoded input.
    #[test]
    fn test_skip_rep_levels() {
        for _ in 0..10 {
            // Random levels in 0..5; the first level is forced to 0 so the
            // data begins at a record start.
            let mut rng = rng();
            let total_len = 10000_usize;
            let mut encoded: Vec<i16> = (0..total_len).map(|_| rng.random_range(0..5)).collect();
            encoded[0] = 0;
            let mut encoder = RleEncoder::new(3, 1024);
            for v in &encoded {
                encoder.put(*v as _)
            }
            let data = Bytes::from(encoder.consume());

            let mut decoder = RepetitionLevelDecoderImpl::new(5);
            decoder.set_data(Encoding::RLE, data).unwrap();

            // Every level of 0 starts a record.
            let total_records = encoded.iter().filter(|x| **x == 0).count();
            let mut remaining_records = total_records;
            let mut remaining_levels = encoded.len();
            loop {
                // Pick between skipping and reading, for one to five records.
                let skip = rng.random_bool(0.5);
                let records = rng.random_range(1..=remaining_records.min(5));
                let (records_read, levels_read) = if skip {
                    decoder.skip_rep_levels(records, remaining_levels).unwrap()
                } else {
                    let mut decoded = Vec::new();
                    let (records_read, levels_read) = decoder
                        .read_rep_levels(&mut decoded, records, remaining_levels)
                        .unwrap();

                    // Levels read must match the corresponding input window.
                    assert_eq!(
                        decoded,
                        encoded[encoded.len() - remaining_levels..][..levels_read]
                    );
                    (records_read, levels_read)
                };

                remaining_levels = remaining_levels.checked_sub(levels_read).unwrap();
                if remaining_levels == 0 {
                    // The final record has no following 0 to close it, so it
                    // is not included in the returned record count.
                    assert_eq!(records_read + 1, records);
                    assert_eq!(records, remaining_records);
                    break;
                }
                assert_eq!(records_read, records);
                remaining_records -= records;
                assert_ne!(remaining_records, 0);
            }
        }
    }
}