1use std::collections::HashMap;
19
20use bytes::Bytes;
21
22use crate::basic::Encoding;
23use crate::data_type::DataType;
24use crate::encodings::{
25 decoding::{get_decoder, Decoder, DictDecoder, PlainDecoder},
26 rle::RleDecoder,
27};
28use crate::errors::{ParquetError, Result};
29use crate::schema::types::ColumnDescPtr;
30use crate::util::bit_util::{num_required_bits, BitReader};
31
/// Common interface for decoders of level data.
///
/// In this file it is extended by [`RepetitionLevelDecoder`] and
/// [`DefinitionLevelDecoder`].
pub trait ColumnLevelDecoder {
    /// Container that decoded levels are written into.
    type Buffer;

    /// Supplies the encoded level bytes in `data`, together with the `encoding`
    /// they were written with, for subsequent reads or skips to consume.
    fn set_data(&mut self, encoding: Encoding, data: Bytes);
}
39
/// A [`ColumnLevelDecoder`] for repetition levels.
///
/// NOTE(review): the implementation in this file treats a level of `0` as the
/// start of a new record; other implementations are presumably expected to
/// follow the same convention — confirm.
pub trait RepetitionLevelDecoder: ColumnLevelDecoder {
    /// Reads repetition levels into `out`, bounded by both `num_records` and
    /// `num_levels`.
    ///
    /// Returns a `(records, levels)` pair; the implementation in this file
    /// returns the number of records counted followed by the number of levels
    /// written to `out`.
    fn read_rep_levels(
        &mut self,
        out: &mut Self::Buffer,
        num_records: usize,
        num_levels: usize,
    ) -> Result<(usize, usize)>;

    /// Same as [`Self::read_rep_levels`] but discards the levels instead of
    /// storing them. Returns the same `(records, levels)` pair.
    fn skip_rep_levels(&mut self, num_records: usize, num_levels: usize) -> Result<(usize, usize)>;

    /// Flushes any partially read or skipped record.
    ///
    /// The implementation in this file returns whether such a record was
    /// pending and clears that state.
    fn flush_partial(&mut self) -> bool;
}
69
/// A [`ColumnLevelDecoder`] for definition levels.
pub trait DefinitionLevelDecoder: ColumnLevelDecoder {
    /// Reads up to `num_levels` definition levels into `out`.
    ///
    /// Returns a `(values, levels)` pair; the implementation in this file
    /// returns the number of decoded levels equal to the maximum definition
    /// level, followed by the number of levels decoded.
    fn read_def_levels(
        &mut self,
        out: &mut Self::Buffer,
        num_levels: usize,
    ) -> Result<(usize, usize)>;

    /// Skips up to `num_levels` definition levels without returning them.
    ///
    /// Returns the same `(values, levels)` pair as [`Self::read_def_levels`].
    fn skip_def_levels(&mut self, num_levels: usize) -> Result<(usize, usize)>;
}
89
/// Decodes the value data of a column into an output buffer.
pub trait ColumnValueDecoder {
    /// Container that decoded values are written into.
    type Buffer;

    /// Creates a new decoder for the column described by `col`.
    fn new(col: &ColumnDescPtr) -> Self;

    /// Sets the dictionary for this column.
    ///
    /// `buf` holds the encoded dictionary, `num_values` the number of entries it
    /// contains and `encoding` how it was encoded.
    /// NOTE(review): `is_sorted` is ignored by the implementation in this file;
    /// its intended meaning should be confirmed against the page header.
    fn set_dict(
        &mut self,
        buf: Bytes,
        num_values: u32,
        encoding: Encoding,
        is_sorted: bool,
    ) -> Result<()>;

    /// Sets the encoded value data that subsequent reads and skips consume.
    ///
    /// The implementation in this file passes `num_values` to the underlying
    /// decoder as the value count, falling back to `num_levels` when it is
    /// `None`.
    fn set_data(
        &mut self,
        encoding: Encoding,
        data: Bytes,
        num_levels: usize,
        num_values: Option<usize>,
    ) -> Result<()>;

    /// Reads up to `num_values` values into `out`, returning how many were read.
    fn read(&mut self, out: &mut Self::Buffer, num_values: usize) -> Result<usize>;

    /// Skips up to `num_values` values.
    ///
    /// NOTE(review): the returned count is presumably the number of values
    /// actually skipped — confirm against the underlying decoder.
    fn skip_values(&mut self, num_values: usize) -> Result<usize>;
}
138
/// A [`ColumnValueDecoder`] that writes values into a `Vec<T::T>` and dispatches
/// to a per-encoding [`Decoder`].
pub struct ColumnValueDecoderImpl<T: DataType> {
    /// Descriptor of the column being decoded.
    descr: ColumnDescPtr,

    /// Encoding selected by the most recent successful `set_data` call, if any.
    current_encoding: Option<Encoding>,

    /// Decoders created so far, keyed by encoding, so they can be reused.
    decoders: HashMap<Encoding, Box<dyn Decoder<T>>>,
}
148
149impl<T: DataType> ColumnValueDecoder for ColumnValueDecoderImpl<T> {
150 type Buffer = Vec<T::T>;
151
152 fn new(descr: &ColumnDescPtr) -> Self {
153 Self {
154 descr: descr.clone(),
155 current_encoding: None,
156 decoders: Default::default(),
157 }
158 }
159
160 fn set_dict(
161 &mut self,
162 buf: Bytes,
163 num_values: u32,
164 mut encoding: Encoding,
165 _is_sorted: bool,
166 ) -> Result<()> {
167 if encoding == Encoding::PLAIN || encoding == Encoding::PLAIN_DICTIONARY {
168 encoding = Encoding::RLE_DICTIONARY
169 }
170
171 if self.decoders.contains_key(&encoding) {
172 return Err(general_err!("Column cannot have more than one dictionary"));
173 }
174
175 if encoding == Encoding::RLE_DICTIONARY {
176 let mut dictionary = PlainDecoder::<T>::new(self.descr.type_length());
177 dictionary.set_data(buf, num_values as usize)?;
178
179 let mut decoder = DictDecoder::new();
180 decoder.set_dict(Box::new(dictionary))?;
181 self.decoders.insert(encoding, Box::new(decoder));
182 Ok(())
183 } else {
184 Err(nyi_err!(
185 "Invalid/Unsupported encoding type for dictionary: {}",
186 encoding
187 ))
188 }
189 }
190
191 fn set_data(
192 &mut self,
193 mut encoding: Encoding,
194 data: Bytes,
195 num_levels: usize,
196 num_values: Option<usize>,
197 ) -> Result<()> {
198 use std::collections::hash_map::Entry;
199
200 if encoding == Encoding::PLAIN_DICTIONARY {
201 encoding = Encoding::RLE_DICTIONARY;
202 }
203
204 let decoder = if encoding == Encoding::RLE_DICTIONARY {
205 self.decoders
206 .get_mut(&encoding)
207 .expect("Decoder for dict should have been set")
208 } else {
209 match self.decoders.entry(encoding) {
211 Entry::Occupied(e) => e.into_mut(),
212 Entry::Vacant(v) => {
213 let data_decoder = get_decoder::<T>(self.descr.clone(), encoding)?;
214 v.insert(data_decoder)
215 }
216 }
217 };
218
219 decoder.set_data(data, num_values.unwrap_or(num_levels))?;
220 self.current_encoding = Some(encoding);
221 Ok(())
222 }
223
224 fn read(&mut self, out: &mut Self::Buffer, num_values: usize) -> Result<usize> {
225 let encoding = self
226 .current_encoding
227 .expect("current_encoding should be set");
228
229 let current_decoder = self
230 .decoders
231 .get_mut(&encoding)
232 .unwrap_or_else(|| panic!("decoder for encoding {encoding} should be set"));
233
234 let start = out.len();
236 out.resize(start + num_values, T::T::default());
237 let read = current_decoder.get(&mut out[start..])?;
238 out.truncate(start + read);
239 Ok(read)
240 }
241
242 fn skip_values(&mut self, num_values: usize) -> Result<usize> {
243 let encoding = self
244 .current_encoding
245 .expect("current_encoding should be set");
246
247 let current_decoder = self
248 .decoders
249 .get_mut(&encoding)
250 .unwrap_or_else(|| panic!("decoder for encoding {encoding} should be set"));
251
252 current_decoder.skip(num_values)
253 }
254}
255
/// Maximum number of definition levels decoded per iteration when skipping.
const SKIP_BUFFER_SIZE: usize = 1024;
257
/// Decoder for level data in one of the two encodings handled by this file.
enum LevelDecoder {
    /// Bit-packed levels: the reader over the data and the bit width of each level.
    Packed(BitReader, u8),
    /// RLE-encoded levels.
    Rle(RleDecoder),
}
262
263impl LevelDecoder {
264 fn new(encoding: Encoding, data: Bytes, bit_width: u8) -> Self {
265 match encoding {
266 Encoding::RLE => {
267 let mut decoder = RleDecoder::new(bit_width);
268 decoder.set_data(data);
269 Self::Rle(decoder)
270 }
271 #[allow(deprecated)]
272 Encoding::BIT_PACKED => Self::Packed(BitReader::new(data), bit_width),
273 _ => unreachable!("invalid level encoding: {}", encoding),
274 }
275 }
276
277 fn read(&mut self, out: &mut [i16]) -> Result<usize> {
278 match self {
279 Self::Packed(reader, bit_width) => {
280 Ok(reader.get_batch::<i16>(out, *bit_width as usize))
281 }
282 Self::Rle(reader) => Ok(reader.get_batch(out)?),
283 }
284 }
285}
286
/// A [`DefinitionLevelDecoder`] that writes levels into a `Vec<i16>`.
pub struct DefinitionLevelDecoderImpl {
    /// Decoder over the current level data; `None` until `set_data` is called.
    decoder: Option<LevelDecoder>,
    /// Bit width derived from `max_level`, used when constructing the decoder.
    bit_width: u8,
    /// Maximum definition level; decoded levels equal to it are counted as values.
    max_level: i16,
}
293
294impl DefinitionLevelDecoderImpl {
295 pub fn new(max_level: i16) -> Self {
296 let bit_width = num_required_bits(max_level as u64);
297 Self {
298 decoder: None,
299 bit_width,
300 max_level,
301 }
302 }
303}
304
305impl ColumnLevelDecoder for DefinitionLevelDecoderImpl {
306 type Buffer = Vec<i16>;
307
308 fn set_data(&mut self, encoding: Encoding, data: Bytes) {
309 self.decoder = Some(LevelDecoder::new(encoding, data, self.bit_width))
310 }
311}
312
313impl DefinitionLevelDecoder for DefinitionLevelDecoderImpl {
314 fn read_def_levels(
315 &mut self,
316 out: &mut Self::Buffer,
317 num_levels: usize,
318 ) -> Result<(usize, usize)> {
319 let start = out.len();
321 out.resize(start + num_levels, 0);
322 let levels_read = self.decoder.as_mut().unwrap().read(&mut out[start..])?;
323 out.truncate(start + levels_read);
324
325 let iter = out.iter().skip(start);
326 let values_read = iter.filter(|x| **x == self.max_level).count();
327 Ok((values_read, levels_read))
328 }
329
330 fn skip_def_levels(&mut self, num_levels: usize) -> Result<(usize, usize)> {
331 let mut level_skip = 0;
332 let mut value_skip = 0;
333 let mut buf: Vec<i16> = vec![];
334 while level_skip < num_levels {
335 let remaining_levels = num_levels - level_skip;
336
337 let to_read = remaining_levels.min(SKIP_BUFFER_SIZE);
338 buf.resize(to_read, 0);
339 let (values_read, levels_read) = self.read_def_levels(&mut buf, to_read)?;
340 if levels_read == 0 {
341 break;
343 }
344
345 level_skip += levels_read;
346 value_skip += values_read;
347 }
348
349 Ok((value_skip, level_skip))
350 }
351}
352
/// Length of the internal buffer of [`RepetitionLevelDecoderImpl`], and thus the
/// maximum number of repetition levels decoded per buffer fill.
pub(crate) const REPETITION_LEVELS_BATCH_SIZE: usize = 1024;
354
/// A [`RepetitionLevelDecoder`] that writes levels into a `Vec<i16>`, decoding
/// them in batches through an internal buffer.
pub struct RepetitionLevelDecoderImpl {
    /// Decoder over the current level data; `None` until `set_data` is called.
    decoder: Option<LevelDecoder>,
    /// Bit width derived from the maximum level, used when constructing the decoder.
    bit_width: u8,
    /// Batch of decoded levels awaiting consumption.
    buffer: Box<[i16; REPETITION_LEVELS_BATCH_SIZE]>,
    /// Number of levels in `buffer` produced by the last fill.
    buffer_len: usize,
    /// Index in `buffer` of the next level to consume.
    buffer_offset: usize,
    /// Whether the most recent scan ended without completing the requested
    /// records; when set, a level of 0 at the start of the next scan is counted
    /// as completing a record.
    has_partial: bool,
}
364
365impl RepetitionLevelDecoderImpl {
366 pub fn new(max_level: i16) -> Self {
367 let bit_width = num_required_bits(max_level as u64);
368 Self {
369 decoder: None,
370 bit_width,
371 buffer: Box::new([0; REPETITION_LEVELS_BATCH_SIZE]),
372 buffer_offset: 0,
373 buffer_len: 0,
374 has_partial: false,
375 }
376 }
377
378 fn fill_buf(&mut self) -> Result<()> {
379 let read = self.decoder.as_mut().unwrap().read(self.buffer.as_mut())?;
380 self.buffer_offset = 0;
381 self.buffer_len = read;
382 Ok(())
383 }
384
385 fn count_records(&mut self, records_to_read: usize, num_levels: usize) -> (bool, usize, usize) {
390 let mut records_read = 0;
391
392 let levels = num_levels.min(self.buffer_len - self.buffer_offset);
393 let buf = self.buffer.iter().skip(self.buffer_offset);
394 for (idx, item) in buf.take(levels).enumerate() {
395 if *item == 0 && (idx != 0 || self.has_partial) {
396 records_read += 1;
397
398 if records_read == records_to_read {
399 return (false, records_read, idx);
400 }
401 }
402 }
403 (true, records_read, levels)
405 }
406}
407
408impl ColumnLevelDecoder for RepetitionLevelDecoderImpl {
409 type Buffer = Vec<i16>;
410
411 fn set_data(&mut self, encoding: Encoding, data: Bytes) {
412 self.decoder = Some(LevelDecoder::new(encoding, data, self.bit_width));
413 self.buffer_len = 0;
414 self.buffer_offset = 0;
415 }
416}
417
418impl RepetitionLevelDecoder for RepetitionLevelDecoderImpl {
419 fn read_rep_levels(
420 &mut self,
421 out: &mut Self::Buffer,
422 num_records: usize,
423 num_levels: usize,
424 ) -> Result<(usize, usize)> {
425 let mut total_records_read = 0;
426 let mut total_levels_read = 0;
427
428 while total_records_read < num_records && total_levels_read < num_levels {
429 if self.buffer_len == self.buffer_offset {
430 self.fill_buf()?;
431 if self.buffer_len == 0 {
432 break;
433 }
434 }
435
436 let (partial, records_read, levels_read) = self.count_records(
437 num_records - total_records_read,
438 num_levels - total_levels_read,
439 );
440
441 out.extend_from_slice(
442 &self.buffer[self.buffer_offset..self.buffer_offset + levels_read],
443 );
444
445 total_levels_read += levels_read;
446 total_records_read += records_read;
447 self.buffer_offset += levels_read;
448 self.has_partial = partial;
449 }
450 Ok((total_records_read, total_levels_read))
451 }
452
453 fn skip_rep_levels(&mut self, num_records: usize, num_levels: usize) -> Result<(usize, usize)> {
454 let mut total_records_read = 0;
455 let mut total_levels_read = 0;
456
457 while total_records_read < num_records && total_levels_read < num_levels {
458 if self.buffer_len == self.buffer_offset {
459 self.fill_buf()?;
460 if self.buffer_len == 0 {
461 break;
462 }
463 }
464
465 let (partial, records_read, levels_read) = self.count_records(
466 num_records - total_records_read,
467 num_levels - total_levels_read,
468 );
469
470 total_levels_read += levels_read;
471 total_records_read += records_read;
472 self.buffer_offset += levels_read;
473 self.has_partial = partial;
474 }
475 Ok((total_records_read, total_levels_read))
476 }
477
478 fn flush_partial(&mut self) -> bool {
479 std::mem::take(&mut self.has_partial)
480 }
481}
482
#[cfg(test)]
mod tests {
    use super::*;
    use crate::encodings::rle::RleEncoder;
    use rand::prelude::*;

    /// Skipping honours the `num_levels` limit even when it exceeds the number of
    /// levels that were encoded.
    #[test]
    fn test_skip_padding() {
        let mut encoder = RleEncoder::new(1, 1024);
        encoder.put(0);
        for _ in 0..3 {
            encoder.put(1);
        }
        let data = Bytes::from(encoder.consume());

        // Limit equal to the four levels that were encoded.
        let mut exact = RepetitionLevelDecoderImpl::new(1);
        exact.set_data(Encoding::RLE, data.clone());
        let (_, skipped) = exact.skip_rep_levels(100, 4).unwrap();
        assert_eq!(skipped, 4);

        // Limit larger than what was encoded: six levels are still reported.
        // NOTE(review): presumably the extra levels are padding in the final
        // encoded run — confirm against `RleEncoder`.
        let mut padded = RepetitionLevelDecoderImpl::new(1);
        padded.set_data(Encoding::RLE, data);
        let (_, skipped) = padded.skip_rep_levels(100, 6).unwrap();
        assert_eq!(skipped, 6);
    }

    /// Randomly interleaves reads and skips over random repetition levels and
    /// checks the reported record/level counts and the decoded contents.
    #[test]
    fn test_skip_rep_levels() {
        for _ in 0..10 {
            let mut rng = thread_rng();
            let total_len = 10000_usize;
            let mut encoded: Vec<i16> = (0..total_len).map(|_| rng.gen_range(0..5)).collect();
            // The first level always starts a record.
            encoded[0] = 0;

            let mut encoder = RleEncoder::new(3, 1024);
            encoded.iter().for_each(|level| encoder.put(*level as _));
            let data = Bytes::from(encoder.consume());

            let mut decoder = RepetitionLevelDecoderImpl::new(5);
            decoder.set_data(Encoding::RLE, data);

            let mut records_left = encoded.iter().filter(|level| **level == 0).count();
            let mut levels_left = encoded.len();
            loop {
                let skip = rng.gen_bool(0.5);
                let requested = rng.gen_range(1..=records_left.min(5));
                let consumed = encoded.len() - levels_left;

                let (records_read, levels_read) = if skip {
                    decoder.skip_rep_levels(requested, levels_left).unwrap()
                } else {
                    let mut decoded = Vec::new();
                    let counts = decoder
                        .read_rep_levels(&mut decoded, requested, levels_left)
                        .unwrap();

                    let expected = &encoded[consumed..consumed + counts.1];
                    assert_eq!(decoded, expected);
                    counts
                };

                levels_left = levels_left.checked_sub(levels_read).unwrap();
                if levels_left == 0 {
                    // The final record is never terminated by a following 0, so
                    // one fewer record is reported than was requested.
                    assert_eq!(records_read + 1, requested);
                    assert_eq!(requested, records_left);
                    break;
                }

                assert_eq!(records_read, requested);
                records_left -= requested;
                assert_ne!(records_left, 0);
            }
        }
    }
}