1use bytes::Bytes;
21use num_traits::{FromPrimitive, WrappingAdd};
22use std::{cmp, marker::PhantomData, mem};
23
24use super::rle::RleDecoder;
25
26use crate::basic::*;
27use crate::data_type::private::ParquetValueType;
28use crate::data_type::*;
29use crate::encodings::decoding::byte_stream_split_decoder::{
30 ByteStreamSplitDecoder, VariableWidthByteStreamSplitDecoder,
31};
32use crate::errors::{ParquetError, Result};
33use crate::schema::types::ColumnDescPtr;
34use crate::util::bit_util::{self, BitReader};
35
36mod byte_stream_split_decoder;
37
38pub(crate) mod private {
39 use super::*;
40
41 pub trait GetDecoder {
46 fn get_decoder<T: DataType<T = Self>>(
47 descr: ColumnDescPtr,
48 encoding: Encoding,
49 ) -> Result<Box<dyn Decoder<T>>> {
50 get_decoder_default(descr, encoding)
51 }
52 }
53
54 fn get_decoder_default<T: DataType>(
55 descr: ColumnDescPtr,
56 encoding: Encoding,
57 ) -> Result<Box<dyn Decoder<T>>> {
58 match encoding {
59 Encoding::PLAIN => Ok(Box::new(PlainDecoder::new(descr.type_length()))),
60 Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => Err(general_err!(
61 "Cannot initialize this encoding through this function"
62 )),
63 Encoding::RLE
64 | Encoding::DELTA_BINARY_PACKED
65 | Encoding::DELTA_BYTE_ARRAY
66 | Encoding::DELTA_LENGTH_BYTE_ARRAY => Err(general_err!(
67 "Encoding {} is not supported for type",
68 encoding
69 )),
70 e => Err(nyi_err!("Encoding {} is not supported", e)),
71 }
72 }
73
74 impl GetDecoder for bool {
75 fn get_decoder<T: DataType<T = Self>>(
76 descr: ColumnDescPtr,
77 encoding: Encoding,
78 ) -> Result<Box<dyn Decoder<T>>> {
79 match encoding {
80 Encoding::RLE => Ok(Box::new(RleValueDecoder::new())),
81 _ => get_decoder_default(descr, encoding),
82 }
83 }
84 }
85
86 impl GetDecoder for i32 {
87 fn get_decoder<T: DataType<T = Self>>(
88 descr: ColumnDescPtr,
89 encoding: Encoding,
90 ) -> Result<Box<dyn Decoder<T>>> {
91 match encoding {
92 Encoding::BYTE_STREAM_SPLIT => Ok(Box::new(ByteStreamSplitDecoder::new())),
93 Encoding::DELTA_BINARY_PACKED => Ok(Box::new(DeltaBitPackDecoder::new())),
94 _ => get_decoder_default(descr, encoding),
95 }
96 }
97 }
98
99 impl GetDecoder for i64 {
100 fn get_decoder<T: DataType<T = Self>>(
101 descr: ColumnDescPtr,
102 encoding: Encoding,
103 ) -> Result<Box<dyn Decoder<T>>> {
104 match encoding {
105 Encoding::BYTE_STREAM_SPLIT => Ok(Box::new(ByteStreamSplitDecoder::new())),
106 Encoding::DELTA_BINARY_PACKED => Ok(Box::new(DeltaBitPackDecoder::new())),
107 _ => get_decoder_default(descr, encoding),
108 }
109 }
110 }
111
112 impl GetDecoder for f32 {
113 fn get_decoder<T: DataType<T = Self>>(
114 descr: ColumnDescPtr,
115 encoding: Encoding,
116 ) -> Result<Box<dyn Decoder<T>>> {
117 match encoding {
118 Encoding::BYTE_STREAM_SPLIT => Ok(Box::new(ByteStreamSplitDecoder::new())),
119 _ => get_decoder_default(descr, encoding),
120 }
121 }
122 }
123 impl GetDecoder for f64 {
124 fn get_decoder<T: DataType<T = Self>>(
125 descr: ColumnDescPtr,
126 encoding: Encoding,
127 ) -> Result<Box<dyn Decoder<T>>> {
128 match encoding {
129 Encoding::BYTE_STREAM_SPLIT => Ok(Box::new(ByteStreamSplitDecoder::new())),
130 _ => get_decoder_default(descr, encoding),
131 }
132 }
133 }
134
135 impl GetDecoder for ByteArray {
136 fn get_decoder<T: DataType<T = Self>>(
137 descr: ColumnDescPtr,
138 encoding: Encoding,
139 ) -> Result<Box<dyn Decoder<T>>> {
140 match encoding {
141 Encoding::DELTA_BYTE_ARRAY => Ok(Box::new(DeltaByteArrayDecoder::new())),
142 Encoding::DELTA_LENGTH_BYTE_ARRAY => {
143 Ok(Box::new(DeltaLengthByteArrayDecoder::new()))
144 }
145 _ => get_decoder_default(descr, encoding),
146 }
147 }
148 }
149
150 impl GetDecoder for FixedLenByteArray {
151 fn get_decoder<T: DataType<T = Self>>(
152 descr: ColumnDescPtr,
153 encoding: Encoding,
154 ) -> Result<Box<dyn Decoder<T>>> {
155 match encoding {
156 Encoding::BYTE_STREAM_SPLIT => Ok(Box::new(
157 VariableWidthByteStreamSplitDecoder::new(descr.type_length()),
158 )),
159 Encoding::DELTA_BYTE_ARRAY => Ok(Box::new(DeltaByteArrayDecoder::new())),
160 _ => get_decoder_default(descr, encoding),
161 }
162 }
163 }
164
165 impl GetDecoder for Int96 {}
166}
167
168pub trait Decoder<T: DataType>: Send {
173 fn set_data(&mut self, data: Bytes, num_values: usize) -> Result<()>;
176
177 fn get(&mut self, buffer: &mut [T::T]) -> Result<usize>;
184
185 fn get_spaced(
198 &mut self,
199 buffer: &mut [T::T],
200 null_count: usize,
201 valid_bits: &[u8],
202 ) -> Result<usize> {
203 assert!(buffer.len() >= null_count);
204
205 if null_count == 0 {
207 return self.get(buffer);
208 }
209
210 let num_values = buffer.len();
211 let values_to_read = num_values - null_count;
212 let values_read = self.get(buffer)?;
213 if values_read != values_to_read {
214 return Err(general_err!(
215 "Number of values read: {}, doesn't match expected: {}",
216 values_read,
217 values_to_read
218 ));
219 }
220 let mut values_to_move = values_read;
221 for i in (0..num_values).rev() {
222 if bit_util::get_bit(valid_bits, i) {
223 values_to_move -= 1;
224 buffer.swap(i, values_to_move);
225 }
226 }
227
228 Ok(num_values)
229 }
230
231 fn values_left(&self) -> usize;
233
234 fn encoding(&self) -> Encoding;
236
237 fn skip(&mut self, num_values: usize) -> Result<usize>;
239}
240
241pub fn get_decoder<T: DataType>(
246 descr: ColumnDescPtr,
247 encoding: Encoding,
248) -> Result<Box<dyn Decoder<T>>> {
249 use self::private::GetDecoder;
250 T::T::get_decoder(descr, encoding)
251}
252
253#[derive(Default)]
257pub struct PlainDecoderDetails {
258 pub(crate) num_values: usize,
260
261 pub(crate) start: usize,
263
264 pub(crate) type_length: i32,
266
267 pub(crate) data: Option<Bytes>,
269
270 pub(crate) bit_reader: Option<BitReader>,
272}
273
274pub struct PlainDecoder<T: DataType> {
280 inner: PlainDecoderDetails,
282
283 _phantom: PhantomData<T>,
286}
287
288impl<T: DataType> PlainDecoder<T> {
289 pub fn new(type_length: i32) -> Self {
291 PlainDecoder {
292 inner: PlainDecoderDetails {
293 type_length,
294 num_values: 0,
295 start: 0,
296 data: None,
297 bit_reader: None,
298 },
299 _phantom: PhantomData,
300 }
301 }
302}
303
304impl<T: DataType> Decoder<T> for PlainDecoder<T> {
305 #[inline]
306 fn set_data(&mut self, data: Bytes, num_values: usize) -> Result<()> {
307 T::T::set_data(&mut self.inner, data, num_values);
308 Ok(())
309 }
310
311 #[inline]
312 fn values_left(&self) -> usize {
313 self.inner.num_values
314 }
315
316 #[inline]
317 fn encoding(&self) -> Encoding {
318 Encoding::PLAIN
319 }
320
321 #[inline]
322 fn get(&mut self, buffer: &mut [T::T]) -> Result<usize> {
323 T::T::decode(buffer, &mut self.inner)
324 }
325
326 #[inline]
327 fn skip(&mut self, num_values: usize) -> Result<usize> {
328 T::T::skip(&mut self.inner, num_values)
329 }
330}
331
332pub struct DictDecoder<T: DataType> {
341 dictionary: Vec<T::T>,
343
344 has_dictionary: bool,
346
347 rle_decoder: Option<RleDecoder>,
349
350 num_values: usize,
352}
353
354impl<T: DataType> Default for DictDecoder<T> {
355 fn default() -> Self {
356 Self::new()
357 }
358}
359
360impl<T: DataType> DictDecoder<T> {
361 pub fn new() -> Self {
363 Self {
364 dictionary: vec![],
365 has_dictionary: false,
366 rle_decoder: None,
367 num_values: 0,
368 }
369 }
370
371 pub fn set_dict(&mut self, mut decoder: Box<dyn Decoder<T>>) -> Result<()> {
373 let num_values = decoder.values_left();
374 self.dictionary.resize(num_values, T::T::default());
375 let _ = decoder.get(&mut self.dictionary)?;
376 self.has_dictionary = true;
377 Ok(())
378 }
379}
380
381impl<T: DataType> Decoder<T> for DictDecoder<T> {
382 fn set_data(&mut self, data: Bytes, num_values: usize) -> Result<()> {
383 if data.is_empty() {
385 return Err(eof_err!("Not enough bytes to decode bit_width"));
386 }
387
388 let bit_width = data.as_ref()[0];
389 if bit_width > 32 {
390 return Err(general_err!(
391 "Invalid or corrupted RLE bit width {}. Max allowed is 32",
392 bit_width
393 ));
394 }
395 let mut rle_decoder = RleDecoder::new(bit_width);
396 rle_decoder.set_data(data.slice(1..))?;
397 self.num_values = num_values;
398 self.rle_decoder = Some(rle_decoder);
399 Ok(())
400 }
401
402 fn get(&mut self, buffer: &mut [T::T]) -> Result<usize> {
403 assert!(self.rle_decoder.is_some());
404 assert!(self.has_dictionary, "Must call set_dict() first!");
405
406 let rle = self.rle_decoder.as_mut().unwrap();
407 let num_values = cmp::min(buffer.len(), self.num_values);
408 rle.get_batch_with_dict(&self.dictionary[..], buffer, num_values)
409 }
410
411 fn values_left(&self) -> usize {
413 self.num_values
414 }
415
416 fn encoding(&self) -> Encoding {
417 Encoding::RLE_DICTIONARY
418 }
419
420 fn skip(&mut self, num_values: usize) -> Result<usize> {
421 assert!(self.rle_decoder.is_some());
422 assert!(self.has_dictionary, "Must call set_dict() first!");
423
424 let rle = self.rle_decoder.as_mut().unwrap();
425 let num_values = cmp::min(num_values, self.num_values);
426 rle.skip(num_values)
427 }
428}
429
430pub struct RleValueDecoder<T: DataType> {
437 values_left: usize,
438 decoder: RleDecoder,
439 _phantom: PhantomData<T>,
440}
441
442impl<T: DataType> Default for RleValueDecoder<T> {
443 fn default() -> Self {
444 Self::new()
445 }
446}
447
448impl<T: DataType> RleValueDecoder<T> {
449 pub fn new() -> Self {
450 Self {
451 values_left: 0,
452 decoder: RleDecoder::new(1),
453 _phantom: PhantomData,
454 }
455 }
456}
457
458impl<T: DataType> Decoder<T> for RleValueDecoder<T> {
459 #[inline]
460 fn set_data(&mut self, data: Bytes, num_values: usize) -> Result<()> {
461 ensure_phys_ty!(Type::BOOLEAN, "RleValueDecoder only supports BoolType");
463
464 const I32_SIZE: usize = mem::size_of::<i32>();
466 if data.len() < I32_SIZE {
467 return Err(eof_err!("Not enough bytes to decode"));
468 }
469 let data_size = bit_util::read_num_bytes::<i32>(I32_SIZE, data.as_ref()) as usize;
470 if data.len() - I32_SIZE < data_size {
471 return Err(eof_err!("Not enough bytes to decode"));
472 }
473
474 self.decoder = RleDecoder::new(1);
475 self.decoder
476 .set_data(data.slice(I32_SIZE..I32_SIZE + data_size))?;
477 self.values_left = num_values;
478 Ok(())
479 }
480
481 #[inline]
482 fn values_left(&self) -> usize {
483 self.values_left
484 }
485
486 #[inline]
487 fn encoding(&self) -> Encoding {
488 Encoding::RLE
489 }
490
491 #[inline]
492 fn get(&mut self, buffer: &mut [T::T]) -> Result<usize> {
493 let num_values = cmp::min(buffer.len(), self.values_left);
494 let values_read = self.decoder.get_batch(&mut buffer[..num_values])?;
495 self.values_left -= values_read;
496 Ok(values_read)
497 }
498
499 #[inline]
500 fn skip(&mut self, num_values: usize) -> Result<usize> {
501 let num_values = cmp::min(num_values, self.values_left);
502 let values_skipped = self.decoder.skip(num_values)?;
503 self.values_left -= values_skipped;
504 Ok(values_skipped)
505 }
506}
507
508pub struct DeltaBitPackDecoder<T: DataType> {
516 bit_reader: BitReader,
517 initialized: bool,
518
519 block_size: usize,
522 values_left: usize,
524 mini_blocks_per_block: usize,
526 values_per_mini_block: usize,
528
529 min_delta: T::T,
532 block_end_offset: usize,
534 mini_block_idx: usize,
536 mini_block_bit_widths: Vec<u8>,
538 mini_block_remaining: usize,
540
541 first_value: Option<T::T>,
543 last_value: T::T,
545}
546
547impl<T: DataType> Default for DeltaBitPackDecoder<T>
548where
549 T::T: Default + FromPrimitive + WrappingAdd + Copy,
550{
551 fn default() -> Self {
552 Self::new()
553 }
554}
555
556impl<T: DataType> DeltaBitPackDecoder<T>
557where
558 T::T: Default + FromPrimitive + WrappingAdd + Copy,
559{
560 pub fn new() -> Self {
562 Self {
563 bit_reader: BitReader::from(vec![]),
564 initialized: false,
565 block_size: 0,
566 values_left: 0,
567 mini_blocks_per_block: 0,
568 values_per_mini_block: 0,
569 min_delta: Default::default(),
570 mini_block_idx: 0,
571 mini_block_bit_widths: vec![],
572 mini_block_remaining: 0,
573 block_end_offset: 0,
574 first_value: None,
575 last_value: Default::default(),
576 }
577 }
578
579 pub fn get_offset(&self) -> usize {
581 assert!(self.initialized, "Bit reader is not initialized");
582 match self.values_left {
583 0 => self.bit_reader.get_byte_offset().max(self.block_end_offset),
589 _ => self.bit_reader.get_byte_offset(),
590 }
591 }
592
593 #[inline]
595 fn next_block(&mut self) -> Result<()> {
596 let min_delta = self
597 .bit_reader
598 .get_zigzag_vlq_int()
599 .ok_or_else(|| eof_err!("Not enough data to decode 'min_delta'"))?;
600
601 self.min_delta =
602 T::T::from_i64(min_delta).ok_or_else(|| general_err!("'min_delta' too large"))?;
603
604 self.mini_block_bit_widths.clear();
605 self.bit_reader
606 .get_aligned_bytes(&mut self.mini_block_bit_widths, self.mini_blocks_per_block);
607
608 let mut offset = self.bit_reader.get_byte_offset();
609 let mut remaining = self.values_left;
610
611 for b in &mut self.mini_block_bit_widths {
613 if remaining == 0 {
614 *b = 0;
617 }
618 remaining = remaining.saturating_sub(self.values_per_mini_block);
619 offset += *b as usize * self.values_per_mini_block / 8;
620 }
621 self.block_end_offset = offset;
622
623 if self.mini_block_bit_widths.len() != self.mini_blocks_per_block {
624 return Err(eof_err!("insufficient mini block bit widths"));
625 }
626
627 self.mini_block_remaining = self.values_per_mini_block;
628 self.mini_block_idx = 0;
629
630 Ok(())
631 }
632
633 #[inline]
635 fn next_mini_block(&mut self) -> Result<()> {
636 if self.mini_block_idx + 1 < self.mini_block_bit_widths.len() {
637 self.mini_block_idx += 1;
638 self.mini_block_remaining = self.values_per_mini_block;
639 Ok(())
640 } else {
641 self.next_block()
642 }
643 }
644
645 #[inline]
647 fn check_bit_width(&self, bit_width: usize) -> Result<()> {
648 if bit_width > std::mem::size_of::<T::T>() * 8 {
649 return Err(general_err!(
650 "Invalid delta bit width {} which is larger than expected {} ",
651 bit_width,
652 std::mem::size_of::<T::T>() * 8
653 ));
654 }
655 Ok(())
656 }
657}
658
659impl<T: DataType> Decoder<T> for DeltaBitPackDecoder<T>
660where
661 T::T: Default + FromPrimitive + WrappingAdd + Copy,
662{
663 #[inline]
665 fn set_data(&mut self, data: Bytes, _index: usize) -> Result<()> {
666 self.bit_reader = BitReader::new(data);
667 self.initialized = true;
668
669 self.block_size = self
671 .bit_reader
672 .get_vlq_int()
673 .ok_or_else(|| eof_err!("Not enough data to decode 'block_size'"))?
674 .try_into()
675 .map_err(|_| general_err!("invalid 'block_size'"))?;
676
677 self.mini_blocks_per_block = self
678 .bit_reader
679 .get_vlq_int()
680 .ok_or_else(|| eof_err!("Not enough data to decode 'mini_blocks_per_block'"))?
681 .try_into()
682 .map_err(|_| general_err!("invalid 'mini_blocks_per_block'"))?;
683
684 if self.mini_blocks_per_block == 0 {
685 return Err(general_err!("cannot have zero miniblocks per block"));
686 }
687
688 self.values_left = self
689 .bit_reader
690 .get_vlq_int()
691 .ok_or_else(|| eof_err!("Not enough data to decode 'values_left'"))?
692 .try_into()
693 .map_err(|_| general_err!("invalid 'values_left'"))?;
694
695 let first_value = self
696 .bit_reader
697 .get_zigzag_vlq_int()
698 .ok_or_else(|| eof_err!("Not enough data to decode 'first_value'"))?;
699
700 self.first_value =
701 Some(T::T::from_i64(first_value).ok_or_else(|| general_err!("first value too large"))?);
702
703 if self.block_size % 128 != 0 {
704 return Err(general_err!(
705 "'block_size' must be a multiple of 128, got {}",
706 self.block_size
707 ));
708 }
709
710 if self.block_size % self.mini_blocks_per_block != 0 {
711 return Err(general_err!(
712 "'block_size' must be a multiple of 'mini_blocks_per_block' got {} and {}",
713 self.block_size,
714 self.mini_blocks_per_block
715 ));
716 }
717
718 self.mini_block_idx = 0;
720 self.values_per_mini_block = self.block_size / self.mini_blocks_per_block;
721 self.mini_block_remaining = 0;
722 self.mini_block_bit_widths.clear();
723
724 if self.values_per_mini_block % 32 != 0 {
725 return Err(general_err!(
726 "'values_per_mini_block' must be a multiple of 32 got {}",
727 self.values_per_mini_block
728 ));
729 }
730
731 Ok(())
732 }
733
734 fn get(&mut self, buffer: &mut [T::T]) -> Result<usize> {
735 assert!(self.initialized, "Bit reader is not initialized");
736 if buffer.is_empty() {
737 return Ok(0);
738 }
739
740 let mut read = 0;
741 let to_read = buffer.len().min(self.values_left);
742
743 if let Some(value) = self.first_value.take() {
744 self.last_value = value;
745 buffer[0] = value;
746 read += 1;
747 self.values_left -= 1;
748 }
749
750 while read != to_read {
751 if self.mini_block_remaining == 0 {
752 self.next_mini_block()?;
753 }
754
755 let bit_width = self.mini_block_bit_widths[self.mini_block_idx] as usize;
756 self.check_bit_width(bit_width)?;
757 let batch_to_read = self.mini_block_remaining.min(to_read - read);
758
759 let batch_read = self
760 .bit_reader
761 .get_batch(&mut buffer[read..read + batch_to_read], bit_width);
762
763 if batch_read != batch_to_read {
764 return Err(general_err!(
765 "Expected to read {} values from miniblock got {}",
766 batch_to_read,
767 batch_read
768 ));
769 }
770
771 for v in &mut buffer[read..read + batch_read] {
774 *v = v
778 .wrapping_add(&self.min_delta)
779 .wrapping_add(&self.last_value);
780
781 self.last_value = *v;
782 }
783
784 read += batch_read;
785 self.mini_block_remaining -= batch_read;
786 self.values_left -= batch_read;
787 }
788
789 Ok(to_read)
790 }
791
792 fn values_left(&self) -> usize {
793 self.values_left
794 }
795
796 fn encoding(&self) -> Encoding {
797 Encoding::DELTA_BINARY_PACKED
798 }
799
800 fn skip(&mut self, num_values: usize) -> Result<usize> {
801 let mut skip = 0;
802 let to_skip = num_values.min(self.values_left);
803 if to_skip == 0 {
804 return Ok(0);
805 }
806
807 if let Some(value) = self.first_value.take() {
809 self.last_value = value;
810 skip += 1;
811 self.values_left -= 1;
812 }
813
814 let mini_block_batch_size = match T::T::PHYSICAL_TYPE {
815 Type::INT32 => 32,
816 Type::INT64 => 64,
817 _ => unreachable!(),
818 };
819
820 let mut skip_buffer = vec![T::T::default(); mini_block_batch_size];
821 while skip < to_skip {
822 if self.mini_block_remaining == 0 {
823 self.next_mini_block()?;
824 }
825
826 let bit_width = self.mini_block_bit_widths[self.mini_block_idx] as usize;
827 self.check_bit_width(bit_width)?;
828 let mini_block_to_skip = self.mini_block_remaining.min(to_skip - skip);
829 let mini_block_should_skip = mini_block_to_skip;
830
831 let skip_count = self
832 .bit_reader
833 .get_batch(&mut skip_buffer[0..mini_block_to_skip], bit_width);
834
835 if skip_count != mini_block_to_skip {
836 return Err(general_err!(
837 "Expected to skip {} values from mini block got {}.",
838 mini_block_batch_size,
839 skip_count
840 ));
841 }
842
843 for v in &mut skip_buffer[0..skip_count] {
844 *v = v
845 .wrapping_add(&self.min_delta)
846 .wrapping_add(&self.last_value);
847
848 self.last_value = *v;
849 }
850
851 skip += mini_block_should_skip;
852 self.mini_block_remaining -= mini_block_should_skip;
853 self.values_left -= mini_block_should_skip;
854 }
855
856 Ok(to_skip)
857 }
858}
859
860pub struct DeltaLengthByteArrayDecoder<T: DataType> {
870 lengths: Vec<i32>,
873
874 current_idx: usize,
876
877 data: Option<Bytes>,
879
880 offset: usize,
882
883 num_values: usize,
885
886 _phantom: PhantomData<T>,
888}
889
890impl<T: DataType> Default for DeltaLengthByteArrayDecoder<T> {
891 fn default() -> Self {
892 Self::new()
893 }
894}
895
896impl<T: DataType> DeltaLengthByteArrayDecoder<T> {
897 pub fn new() -> Self {
899 Self {
900 lengths: vec![],
901 current_idx: 0,
902 data: None,
903 offset: 0,
904 num_values: 0,
905 _phantom: PhantomData,
906 }
907 }
908}
909
910impl<T: DataType> Decoder<T> for DeltaLengthByteArrayDecoder<T> {
911 fn set_data(&mut self, data: Bytes, num_values: usize) -> Result<()> {
912 match T::get_physical_type() {
913 Type::BYTE_ARRAY => {
914 let mut len_decoder = DeltaBitPackDecoder::<Int32Type>::new();
915 len_decoder.set_data(data.clone(), num_values)?;
916 let num_lengths = len_decoder.values_left();
917 self.lengths.resize(num_lengths, 0);
918 len_decoder.get(&mut self.lengths[..])?;
919
920 self.data = Some(data.slice(len_decoder.get_offset()..));
921 self.offset = 0;
922 self.current_idx = 0;
923 self.num_values = num_lengths;
924 Ok(())
925 }
926 _ => Err(general_err!(
927 "DeltaLengthByteArrayDecoder only support ByteArrayType"
928 )),
929 }
930 }
931
932 fn get(&mut self, buffer: &mut [T::T]) -> Result<usize> {
933 match T::get_physical_type() {
934 Type::BYTE_ARRAY => {
935 assert!(self.data.is_some());
936
937 let data = self.data.as_ref().unwrap();
938 let num_values = cmp::min(buffer.len(), self.num_values);
939
940 for item in buffer.iter_mut().take(num_values) {
941 let len = self.lengths[self.current_idx] as usize;
942 item.set_from_bytes(data.slice(self.offset..self.offset + len));
943
944 self.offset += len;
945 self.current_idx += 1;
946 }
947
948 self.num_values -= num_values;
949 Ok(num_values)
950 }
951 _ => Err(general_err!(
952 "DeltaLengthByteArrayDecoder only support ByteArrayType"
953 )),
954 }
955 }
956
957 fn values_left(&self) -> usize {
958 self.num_values
959 }
960
961 fn encoding(&self) -> Encoding {
962 Encoding::DELTA_LENGTH_BYTE_ARRAY
963 }
964
965 fn skip(&mut self, num_values: usize) -> Result<usize> {
966 match T::get_physical_type() {
967 Type::BYTE_ARRAY => {
968 let num_values = cmp::min(num_values, self.num_values);
969
970 let next_offset: i32 = self.lengths
971 [self.current_idx..self.current_idx + num_values]
972 .iter()
973 .sum();
974
975 self.current_idx += num_values;
976 self.offset += next_offset as usize;
977
978 self.num_values -= num_values;
979 Ok(num_values)
980 }
981 other_type => Err(general_err!(
982 "DeltaLengthByteArrayDecoder not support {}, only support byte array",
983 other_type
984 )),
985 }
986 }
987}
988
989pub struct DeltaByteArrayDecoder<T: DataType> {
999 prefix_lengths: Vec<i32>,
1002
1003 current_idx: usize,
1005
1006 suffix_decoder: Option<DeltaLengthByteArrayDecoder<ByteArrayType>>,
1009
1010 previous_value: Bytes,
1013
1014 num_values: usize,
1016
1017 _phantom: PhantomData<T>,
1019}
1020
1021impl<T: DataType> Default for DeltaByteArrayDecoder<T> {
1022 fn default() -> Self {
1023 Self::new()
1024 }
1025}
1026
1027impl<T: DataType> DeltaByteArrayDecoder<T> {
1028 pub fn new() -> Self {
1030 Self {
1031 prefix_lengths: vec![],
1032 current_idx: 0,
1033 suffix_decoder: None,
1034 previous_value: Bytes::new(),
1035 num_values: 0,
1036 _phantom: PhantomData,
1037 }
1038 }
1039}
1040
1041impl<T: DataType> Decoder<T> for DeltaByteArrayDecoder<T> {
1042 fn set_data(&mut self, data: Bytes, num_values: usize) -> Result<()> {
1043 match T::get_physical_type() {
1044 Type::BYTE_ARRAY | Type::FIXED_LEN_BYTE_ARRAY => {
1045 let mut prefix_len_decoder = DeltaBitPackDecoder::<Int32Type>::new();
1046 prefix_len_decoder.set_data(data.clone(), num_values)?;
1047 let num_prefixes = prefix_len_decoder.values_left();
1048 self.prefix_lengths.resize(num_prefixes, 0);
1049 prefix_len_decoder.get(&mut self.prefix_lengths[..])?;
1050
1051 let mut suffix_decoder = DeltaLengthByteArrayDecoder::new();
1052 suffix_decoder
1053 .set_data(data.slice(prefix_len_decoder.get_offset()..), num_values)?;
1054 self.suffix_decoder = Some(suffix_decoder);
1055 self.num_values = num_prefixes;
1056 self.current_idx = 0;
1057 self.previous_value = Bytes::new();
1058 Ok(())
1059 }
1060 _ => Err(general_err!(
1061 "DeltaByteArrayDecoder only supports ByteArrayType and FixedLenByteArrayType"
1062 )),
1063 }
1064 }
1065
1066 fn get(&mut self, buffer: &mut [T::T]) -> Result<usize> {
1067 match T::get_physical_type() {
1068 Type::BYTE_ARRAY | Type::FIXED_LEN_BYTE_ARRAY => {
1069 let num_values = cmp::min(buffer.len(), self.num_values);
1070 let mut v: [ByteArray; 1] = [ByteArray::new(); 1];
1071 for item in buffer.iter_mut().take(num_values) {
1072 let suffix_decoder = self
1075 .suffix_decoder
1076 .as_mut()
1077 .expect("decoder not initialized");
1078 suffix_decoder.get(&mut v[..])?;
1079 let suffix = v[0].data();
1080
1081 let prefix_len = self.prefix_lengths[self.current_idx] as usize;
1083
1084 let mut result = Vec::with_capacity(prefix_len + suffix.len());
1086 result.extend_from_slice(&self.previous_value[0..prefix_len]);
1087 result.extend_from_slice(suffix);
1088
1089 let data = Bytes::from(result);
1090 item.set_from_bytes(data.clone());
1091
1092 self.previous_value = data;
1093 self.current_idx += 1;
1094 }
1095
1096 self.num_values -= num_values;
1097 Ok(num_values)
1098 }
1099 _ => Err(general_err!(
1100 "DeltaByteArrayDecoder only supports ByteArrayType and FixedLenByteArrayType"
1101 )),
1102 }
1103 }
1104
1105 fn values_left(&self) -> usize {
1106 self.num_values
1107 }
1108
1109 fn encoding(&self) -> Encoding {
1110 Encoding::DELTA_BYTE_ARRAY
1111 }
1112
1113 fn skip(&mut self, num_values: usize) -> Result<usize> {
1114 let mut buffer = vec![T::T::default(); num_values];
1115 self.get(&mut buffer)
1116 }
1117}
1118
1119#[cfg(test)]
1120mod tests {
1121 use super::{super::encoding::*, *};
1122
1123 use std::f32::consts::PI as PI_f32;
1124 use std::f64::consts::PI as PI_f64;
1125 use std::sync::Arc;
1126
1127 use crate::schema::types::{ColumnDescPtr, ColumnDescriptor, ColumnPath, Type as SchemaType};
1128 use crate::util::test_common::rand_gen::RandGen;
1129
1130 #[test]
1131 fn test_get_decoders() {
1132 create_and_check_decoder::<Int32Type>(Encoding::PLAIN, None);
1134 create_and_check_decoder::<Int32Type>(Encoding::DELTA_BINARY_PACKED, None);
1135 create_and_check_decoder::<ByteArrayType>(Encoding::DELTA_LENGTH_BYTE_ARRAY, None);
1136 create_and_check_decoder::<ByteArrayType>(Encoding::DELTA_BYTE_ARRAY, None);
1137 create_and_check_decoder::<BoolType>(Encoding::RLE, None);
1138
1139 create_and_check_decoder::<Int32Type>(
1141 Encoding::RLE_DICTIONARY,
1142 Some(general_err!(
1143 "Cannot initialize this encoding through this function"
1144 )),
1145 );
1146 create_and_check_decoder::<Int32Type>(
1147 Encoding::PLAIN_DICTIONARY,
1148 Some(general_err!(
1149 "Cannot initialize this encoding through this function"
1150 )),
1151 );
1152 create_and_check_decoder::<Int32Type>(
1153 Encoding::DELTA_LENGTH_BYTE_ARRAY,
1154 Some(general_err!(
1155 "Encoding DELTA_LENGTH_BYTE_ARRAY is not supported for type"
1156 )),
1157 );
1158 create_and_check_decoder::<Int32Type>(
1159 Encoding::DELTA_BYTE_ARRAY,
1160 Some(general_err!(
1161 "Encoding DELTA_BYTE_ARRAY is not supported for type"
1162 )),
1163 );
1164
1165 #[allow(deprecated)]
1167 create_and_check_decoder::<Int32Type>(
1168 Encoding::BIT_PACKED,
1169 Some(nyi_err!("Encoding BIT_PACKED is not supported")),
1170 );
1171 }
1172
1173 #[test]
1174 fn test_plain_decode_int32() {
1175 let data = [42, 18, 52];
1176 let data_bytes = Int32Type::to_byte_array(&data[..]);
1177 let mut buffer = [0; 3];
1178 test_plain_decode::<Int32Type>(Bytes::from(data_bytes), 3, -1, &mut buffer[..], &data[..]);
1179 }
1180
1181 #[test]
1182 fn test_plain_skip_int32() {
1183 let data = [42, 18, 52];
1184 let data_bytes = Int32Type::to_byte_array(&data[..]);
1185 test_plain_skip::<Int32Type>(Bytes::from(data_bytes), 3, 1, -1, &data[1..]);
1186 }
1187
1188 #[test]
1189 fn test_plain_skip_all_int32() {
1190 let data = [42, 18, 52];
1191 let data_bytes = Int32Type::to_byte_array(&data[..]);
1192 test_plain_skip::<Int32Type>(Bytes::from(data_bytes), 3, 5, -1, &[]);
1193 }
1194
1195 #[test]
1196 fn test_plain_decode_int32_spaced() {
1197 let data = [42, 18, 52];
1198 let expected_data = [0, 42, 0, 18, 0, 0, 52, 0];
1199 let data_bytes = Int32Type::to_byte_array(&data[..]);
1200 let mut buffer = [0; 8];
1201 let num_nulls = 5;
1202 let valid_bits = [0b01001010];
1203 test_plain_decode_spaced::<Int32Type>(
1204 Bytes::from(data_bytes),
1205 3,
1206 -1,
1207 &mut buffer[..],
1208 num_nulls,
1209 &valid_bits,
1210 &expected_data[..],
1211 );
1212 }
1213
1214 #[test]
1215 fn test_plain_decode_int64() {
1216 let data = [42, 18, 52];
1217 let data_bytes = Int64Type::to_byte_array(&data[..]);
1218 let mut buffer = [0; 3];
1219 test_plain_decode::<Int64Type>(Bytes::from(data_bytes), 3, -1, &mut buffer[..], &data[..]);
1220 }
1221
1222 #[test]
1223 fn test_plain_skip_int64() {
1224 let data = [42, 18, 52];
1225 let data_bytes = Int64Type::to_byte_array(&data[..]);
1226 test_plain_skip::<Int64Type>(Bytes::from(data_bytes), 3, 2, -1, &data[2..]);
1227 }
1228
1229 #[test]
1230 fn test_plain_skip_all_int64() {
1231 let data = [42, 18, 52];
1232 let data_bytes = Int64Type::to_byte_array(&data[..]);
1233 test_plain_skip::<Int64Type>(Bytes::from(data_bytes), 3, 3, -1, &[]);
1234 }
1235
1236 #[test]
1237 fn test_plain_decode_float() {
1238 let data = [PI_f32, 2.414, 12.51];
1239 let data_bytes = FloatType::to_byte_array(&data[..]);
1240 let mut buffer = [0.0; 3];
1241 test_plain_decode::<FloatType>(Bytes::from(data_bytes), 3, -1, &mut buffer[..], &data[..]);
1242 }
1243
1244 #[test]
1245 fn test_plain_skip_float() {
1246 let data = [PI_f32, 2.414, 12.51];
1247 let data_bytes = FloatType::to_byte_array(&data[..]);
1248 test_plain_skip::<FloatType>(Bytes::from(data_bytes), 3, 1, -1, &data[1..]);
1249 }
1250
1251 #[test]
1252 fn test_plain_skip_all_float() {
1253 let data = [PI_f32, 2.414, 12.51];
1254 let data_bytes = FloatType::to_byte_array(&data[..]);
1255 test_plain_skip::<FloatType>(Bytes::from(data_bytes), 3, 4, -1, &[]);
1256 }
1257
1258 #[test]
1259 fn test_plain_skip_double() {
1260 let data = [PI_f64, 2.414f64, 12.51f64];
1261 let data_bytes = DoubleType::to_byte_array(&data[..]);
1262 test_plain_skip::<DoubleType>(Bytes::from(data_bytes), 3, 1, -1, &data[1..]);
1263 }
1264
1265 #[test]
1266 fn test_plain_skip_all_double() {
1267 let data = [PI_f64, 2.414f64, 12.51f64];
1268 let data_bytes = DoubleType::to_byte_array(&data[..]);
1269 test_plain_skip::<DoubleType>(Bytes::from(data_bytes), 3, 5, -1, &[]);
1270 }
1271
1272 #[test]
1273 fn test_plain_decode_double() {
1274 let data = [PI_f64, 2.414f64, 12.51f64];
1275 let data_bytes = DoubleType::to_byte_array(&data[..]);
1276 let mut buffer = [0.0f64; 3];
1277 test_plain_decode::<DoubleType>(Bytes::from(data_bytes), 3, -1, &mut buffer[..], &data[..]);
1278 }
1279
1280 #[test]
1281 fn test_plain_decode_int96() {
1282 let mut data = [Int96::new(); 4];
1283 data[0].set_data(11, 22, 33);
1284 data[1].set_data(44, 55, 66);
1285 data[2].set_data(10, 20, 30);
1286 data[3].set_data(40, 50, 60);
1287 let data_bytes = Int96Type::to_byte_array(&data[..]);
1288 let mut buffer = [Int96::new(); 4];
1289 test_plain_decode::<Int96Type>(Bytes::from(data_bytes), 4, -1, &mut buffer[..], &data[..]);
1290 }
1291
1292 #[test]
1293 fn test_plain_skip_int96() {
1294 let mut data = [Int96::new(); 4];
1295 data[0].set_data(11, 22, 33);
1296 data[1].set_data(44, 55, 66);
1297 data[2].set_data(10, 20, 30);
1298 data[3].set_data(40, 50, 60);
1299 let data_bytes = Int96Type::to_byte_array(&data[..]);
1300 test_plain_skip::<Int96Type>(Bytes::from(data_bytes), 4, 2, -1, &data[2..]);
1301 }
1302
1303 #[test]
1304 fn test_plain_skip_all_int96() {
1305 let mut data = [Int96::new(); 4];
1306 data[0].set_data(11, 22, 33);
1307 data[1].set_data(44, 55, 66);
1308 data[2].set_data(10, 20, 30);
1309 data[3].set_data(40, 50, 60);
1310 let data_bytes = Int96Type::to_byte_array(&data[..]);
1311 test_plain_skip::<Int96Type>(Bytes::from(data_bytes), 4, 8, -1, &[]);
1312 }
1313
1314 #[test]
1315 fn test_plain_decode_bool() {
1316 let data = [
1317 false, true, false, false, true, false, true, true, false, true,
1318 ];
1319 let data_bytes = BoolType::to_byte_array(&data[..]);
1320 let mut buffer = [false; 10];
1321 test_plain_decode::<BoolType>(Bytes::from(data_bytes), 10, -1, &mut buffer[..], &data[..]);
1322 }
1323
1324 #[test]
1325 fn test_plain_skip_bool() {
1326 let data = [
1327 false, true, false, false, true, false, true, true, false, true,
1328 ];
1329 let data_bytes = BoolType::to_byte_array(&data[..]);
1330 test_plain_skip::<BoolType>(Bytes::from(data_bytes), 10, 5, -1, &data[5..]);
1331 }
1332
1333 #[test]
1334 fn test_plain_skip_all_bool() {
1335 let data = [
1336 false, true, false, false, true, false, true, true, false, true,
1337 ];
1338 let data_bytes = BoolType::to_byte_array(&data[..]);
1339 test_plain_skip::<BoolType>(Bytes::from(data_bytes), 10, 20, -1, &[]);
1340 }
1341
1342 #[test]
1343 fn test_plain_decode_byte_array() {
1344 let mut data = vec![ByteArray::new(); 2];
1345 data[0].set_data(Bytes::from(String::from("hello")));
1346 data[1].set_data(Bytes::from(String::from("parquet")));
1347 let data_bytes = ByteArrayType::to_byte_array(&data[..]);
1348 let mut buffer = vec![ByteArray::new(); 2];
1349 test_plain_decode::<ByteArrayType>(
1350 Bytes::from(data_bytes),
1351 2,
1352 -1,
1353 &mut buffer[..],
1354 &data[..],
1355 );
1356 }
1357
1358 #[test]
1359 fn test_plain_skip_byte_array() {
1360 let mut data = vec![ByteArray::new(); 2];
1361 data[0].set_data(Bytes::from(String::from("hello")));
1362 data[1].set_data(Bytes::from(String::from("parquet")));
1363 let data_bytes = ByteArrayType::to_byte_array(&data[..]);
1364 test_plain_skip::<ByteArrayType>(Bytes::from(data_bytes), 2, 1, -1, &data[1..]);
1365 }
1366
1367 #[test]
1368 fn test_plain_skip_all_byte_array() {
1369 let mut data = vec![ByteArray::new(); 2];
1370 data[0].set_data(Bytes::from(String::from("hello")));
1371 data[1].set_data(Bytes::from(String::from("parquet")));
1372 let data_bytes = ByteArrayType::to_byte_array(&data[..]);
1373 test_plain_skip::<ByteArrayType>(Bytes::from(data_bytes), 2, 2, -1, &[]);
1374 }
1375
1376 #[test]
1377 fn test_plain_decode_fixed_len_byte_array() {
1378 let mut data = vec![FixedLenByteArray::default(); 3];
1379 data[0].set_data(Bytes::from(String::from("bird")));
1380 data[1].set_data(Bytes::from(String::from("come")));
1381 data[2].set_data(Bytes::from(String::from("flow")));
1382 let data_bytes = FixedLenByteArrayType::to_byte_array(&data[..]);
1383 let mut buffer = vec![FixedLenByteArray::default(); 3];
1384 test_plain_decode::<FixedLenByteArrayType>(
1385 Bytes::from(data_bytes),
1386 3,
1387 4,
1388 &mut buffer[..],
1389 &data[..],
1390 );
1391 }
1392
1393 #[test]
1394 fn test_plain_skip_fixed_len_byte_array() {
1395 let mut data = vec![FixedLenByteArray::default(); 3];
1396 data[0].set_data(Bytes::from(String::from("bird")));
1397 data[1].set_data(Bytes::from(String::from("come")));
1398 data[2].set_data(Bytes::from(String::from("flow")));
1399 let data_bytes = FixedLenByteArrayType::to_byte_array(&data[..]);
1400 test_plain_skip::<FixedLenByteArrayType>(Bytes::from(data_bytes), 3, 1, 4, &data[1..]);
1401 }
1402
1403 #[test]
1404 fn test_plain_skip_all_fixed_len_byte_array() {
1405 let mut data = vec![FixedLenByteArray::default(); 3];
1406 data[0].set_data(Bytes::from(String::from("bird")));
1407 data[1].set_data(Bytes::from(String::from("come")));
1408 data[2].set_data(Bytes::from(String::from("flow")));
1409 let data_bytes = FixedLenByteArrayType::to_byte_array(&data[..]);
1410 test_plain_skip::<FixedLenByteArrayType>(Bytes::from(data_bytes), 3, 6, 4, &[]);
1411 }
1412
1413 #[test]
1414 fn test_dict_decoder_empty_data() {
1415 let mut decoder = DictDecoder::<Int32Type>::new();
1416 let err = decoder.set_data(Bytes::new(), 10).unwrap_err();
1417 assert_eq!(err.to_string(), "EOF: Not enough bytes to decode bit_width");
1418 }
1419
1420 fn test_plain_decode<T: DataType>(
1421 data: Bytes,
1422 num_values: usize,
1423 type_length: i32,
1424 buffer: &mut [T::T],
1425 expected: &[T::T],
1426 ) {
1427 let mut decoder: PlainDecoder<T> = PlainDecoder::new(type_length);
1428 let result = decoder.set_data(data, num_values);
1429 assert!(result.is_ok());
1430 let result = decoder.get(buffer);
1431 assert!(result.is_ok());
1432 assert_eq!(decoder.values_left(), 0);
1433 assert_eq!(buffer, expected);
1434 }
1435
1436 fn test_plain_skip<T: DataType>(
1437 data: Bytes,
1438 num_values: usize,
1439 skip: usize,
1440 type_length: i32,
1441 expected: &[T::T],
1442 ) {
1443 let mut decoder: PlainDecoder<T> = PlainDecoder::new(type_length);
1444 let result = decoder.set_data(data, num_values);
1445 assert!(result.is_ok());
1446 let skipped = decoder.skip(skip).expect("skipping values");
1447
1448 if skip >= num_values {
1449 assert_eq!(skipped, num_values);
1450
1451 let mut buffer = vec![T::T::default(); 1];
1452 let remaining = decoder.get(&mut buffer).expect("getting remaining values");
1453 assert_eq!(remaining, 0);
1454 } else {
1455 assert_eq!(skipped, skip);
1456 let mut buffer = vec![T::T::default(); num_values - skip];
1457 let remaining = decoder.get(&mut buffer).expect("getting remaining values");
1458 assert_eq!(remaining, num_values - skip);
1459 assert_eq!(decoder.values_left(), 0);
1460 assert_eq!(buffer, expected);
1461 }
1462 }
1463
1464 fn test_plain_decode_spaced<T: DataType>(
1465 data: Bytes,
1466 num_values: usize,
1467 type_length: i32,
1468 buffer: &mut [T::T],
1469 num_nulls: usize,
1470 valid_bits: &[u8],
1471 expected: &[T::T],
1472 ) {
1473 let mut decoder: PlainDecoder<T> = PlainDecoder::new(type_length);
1474 let result = decoder.set_data(data, num_values);
1475 assert!(result.is_ok());
1476 let result = decoder.get_spaced(buffer, num_nulls, valid_bits);
1477 assert!(result.is_ok());
1478 assert_eq!(num_values + num_nulls, result.unwrap());
1479 assert_eq!(decoder.values_left(), 0);
1480 assert_eq!(buffer, expected);
1481 }
1482
1483 #[test]
1484 #[should_panic(expected = "RleValueEncoder only supports BoolType")]
1485 fn test_rle_value_encode_int32_not_supported() {
1486 let mut encoder = RleValueEncoder::<Int32Type>::new();
1487 encoder.put(&[1, 2, 3, 4]).unwrap();
1488 }
1489
1490 #[test]
1491 #[should_panic(expected = "RleValueDecoder only supports BoolType")]
1492 fn test_rle_value_decode_int32_not_supported() {
1493 let mut decoder = RleValueDecoder::<Int32Type>::new();
1494 decoder.set_data(Bytes::from(vec![5, 0, 0, 0]), 1).unwrap();
1495 }
1496
1497 #[test]
1498 fn test_rle_value_decode_missing_size() {
1499 let mut decoder = RleValueDecoder::<BoolType>::new();
1500 assert!(decoder.set_data(Bytes::from(vec![0]), 1).is_err());
1501 }
1502
1503 #[test]
1504 fn test_rle_value_decode_missing_data() {
1505 let mut decoder = RleValueDecoder::<BoolType>::new();
1506 assert!(decoder.set_data(Bytes::from(vec![5, 0, 0, 0]), 1).is_err());
1507 }
1508
1509 #[test]
1510 fn test_rle_value_decode_bool_decode() {
1511 let data = vec![
1513 BoolType::gen_vec(-1, 256),
1514 BoolType::gen_vec(-1, 257),
1515 BoolType::gen_vec(-1, 126),
1516 ];
1517 test_rle_value_decode::<BoolType>(data);
1518 }
1519
1520 #[test]
1521 #[should_panic(expected = "Bit reader is not initialized")]
1522 fn test_delta_bit_packed_not_initialized_offset() {
1523 let decoder = DeltaBitPackDecoder::<Int32Type>::new();
1525 decoder.get_offset();
1526 }
1527
1528 #[test]
1529 #[should_panic(expected = "Bit reader is not initialized")]
1530 fn test_delta_bit_packed_not_initialized_get() {
1531 let mut decoder = DeltaBitPackDecoder::<Int32Type>::new();
1533 let mut buffer = vec![];
1534 decoder.get(&mut buffer).unwrap();
1535 }
1536
1537 #[test]
1538 fn test_delta_bit_packed_int32_empty() {
1539 let data = vec![vec![0; 0]];
1540 test_delta_bit_packed_decode::<Int32Type>(data);
1541 }
1542
1543 #[test]
1544 fn test_delta_bit_packed_int32_repeat() {
1545 let block_data = vec![
1546 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5,
1547 6, 7, 8,
1548 ];
1549 test_delta_bit_packed_decode::<Int32Type>(vec![block_data]);
1550 }
1551
1552 #[test]
1553 fn test_skip_delta_bit_packed_int32_repeat() {
1554 let block_data = vec![
1555 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5,
1556 6, 7, 8,
1557 ];
1558 test_skip::<Int32Type>(block_data.clone(), Encoding::DELTA_BINARY_PACKED, 10);
1559 test_skip::<Int32Type>(block_data, Encoding::DELTA_BINARY_PACKED, 100);
1560 }
1561
1562 #[test]
1563 fn test_delta_bit_packed_int32_uneven() {
1564 let block_data = vec![1, -2, 3, -4, 5, 6, 7, 8, 9, 10, 11];
1565 test_delta_bit_packed_decode::<Int32Type>(vec![block_data]);
1566 }
1567
1568 #[test]
1569 fn test_skip_delta_bit_packed_int32_uneven() {
1570 let block_data = vec![1, -2, 3, -4, 5, 6, 7, 8, 9, 10, 11];
1571 test_skip::<Int32Type>(block_data.clone(), Encoding::DELTA_BINARY_PACKED, 5);
1572 test_skip::<Int32Type>(block_data, Encoding::DELTA_BINARY_PACKED, 100);
1573 }
1574
1575 #[test]
1576 fn test_delta_bit_packed_int32_same_values() {
1577 let block_data = vec![
1578 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127,
1579 ];
1580 test_delta_bit_packed_decode::<Int32Type>(vec![block_data]);
1581
1582 let block_data = vec![
1583 -127, -127, -127, -127, -127, -127, -127, -127, -127, -127, -127, -127, -127, -127,
1584 -127, -127,
1585 ];
1586 test_delta_bit_packed_decode::<Int32Type>(vec![block_data]);
1587 }
1588
1589 #[test]
1590 fn test_skip_delta_bit_packed_int32_same_values() {
1591 let block_data = vec![
1592 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127,
1593 ];
1594 test_skip::<Int32Type>(block_data.clone(), Encoding::DELTA_BINARY_PACKED, 5);
1595 test_skip::<Int32Type>(block_data, Encoding::DELTA_BINARY_PACKED, 100);
1596
1597 let block_data = vec![
1598 -127, -127, -127, -127, -127, -127, -127, -127, -127, -127, -127, -127, -127, -127,
1599 -127, -127,
1600 ];
1601 test_skip::<Int32Type>(block_data.clone(), Encoding::DELTA_BINARY_PACKED, 5);
1602 test_skip::<Int32Type>(block_data, Encoding::DELTA_BINARY_PACKED, 100);
1603 }
1604
1605 #[test]
1606 fn test_delta_bit_packed_int32_min_max() {
1607 let block_data = vec![
1608 i32::MIN,
1609 i32::MIN,
1610 i32::MIN,
1611 i32::MAX,
1612 i32::MIN,
1613 i32::MAX,
1614 i32::MIN,
1615 i32::MAX,
1616 ];
1617 test_delta_bit_packed_decode::<Int32Type>(vec![block_data]);
1618 }
1619
1620 #[test]
1621 fn test_skip_delta_bit_packed_int32_min_max() {
1622 let block_data = vec![
1623 i32::MIN,
1624 i32::MIN,
1625 i32::MIN,
1626 i32::MAX,
1627 i32::MIN,
1628 i32::MAX,
1629 i32::MIN,
1630 i32::MAX,
1631 ];
1632 test_skip::<Int32Type>(block_data.clone(), Encoding::DELTA_BINARY_PACKED, 5);
1633 test_skip::<Int32Type>(block_data, Encoding::DELTA_BINARY_PACKED, 100);
1634 }
1635
1636 #[test]
1637 fn test_delta_bit_packed_int32_multiple_blocks() {
1638 let data = vec![
1640 Int32Type::gen_vec(-1, 64),
1641 Int32Type::gen_vec(-1, 128),
1642 Int32Type::gen_vec(-1, 64),
1643 ];
1644 test_delta_bit_packed_decode::<Int32Type>(data);
1645 }
1646
1647 #[test]
1648 fn test_delta_bit_packed_int32_data_across_blocks() {
1649 let data = vec![Int32Type::gen_vec(-1, 256), Int32Type::gen_vec(-1, 257)];
1651 test_delta_bit_packed_decode::<Int32Type>(data);
1652 }
1653
1654 #[test]
1655 fn test_delta_bit_packed_int32_with_empty_blocks() {
1656 let data = vec![
1657 Int32Type::gen_vec(-1, 128),
1658 vec![0; 0],
1659 Int32Type::gen_vec(-1, 64),
1660 ];
1661 test_delta_bit_packed_decode::<Int32Type>(data);
1662 }
1663
1664 #[test]
1665 fn test_delta_bit_packed_int64_empty() {
1666 let data = vec![vec![0; 0]];
1667 test_delta_bit_packed_decode::<Int64Type>(data);
1668 }
1669
1670 #[test]
1671 fn test_delta_bit_packed_int64_min_max() {
1672 let block_data = vec![
1673 i64::MIN,
1674 i64::MAX,
1675 i64::MIN,
1676 i64::MAX,
1677 i64::MIN,
1678 i64::MAX,
1679 i64::MIN,
1680 i64::MAX,
1681 ];
1682 test_delta_bit_packed_decode::<Int64Type>(vec![block_data]);
1683 }
1684
1685 #[test]
1686 fn test_delta_bit_packed_int64_multiple_blocks() {
1687 let data = vec![
1689 Int64Type::gen_vec(-1, 64),
1690 Int64Type::gen_vec(-1, 128),
1691 Int64Type::gen_vec(-1, 64),
1692 ];
1693 test_delta_bit_packed_decode::<Int64Type>(data);
1694 }
1695
1696 #[test]
1697 fn test_delta_bit_packed_zero_miniblocks() {
1698 let data = vec![
1700 128, 1, 0, ];
1703 let mut decoder = DeltaBitPackDecoder::<Int32Type>::new();
1704 let err = decoder.set_data(data.into(), 0).unwrap_err();
1705 assert_eq!(
1706 err.to_string(),
1707 "Parquet error: cannot have zero miniblocks per block"
1708 );
1709 }
1710
1711 #[test]
1712 fn test_delta_bit_packed_decoder_sample() {
1713 let data_bytes = vec![
1714 128, 1, 4, 3, 58, 28, 6, 0, 0, 0, 0, 8, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
1715 0, 0, 0, 0, 0, 0,
1716 ];
1717 let mut decoder: DeltaBitPackDecoder<Int32Type> = DeltaBitPackDecoder::new();
1718 decoder.set_data(data_bytes.into(), 3).unwrap();
1719 assert_eq!(decoder.get_offset(), 5);
1722 let mut result = vec![0, 0, 0];
1723 decoder.get(&mut result).unwrap();
1724 assert_eq!(decoder.get_offset(), 34);
1725 assert_eq!(result, vec![29, 43, 89]);
1726 }
1727
    /// Checks two things about a two-block DELTA_BINARY_PACKED page: the decoder consumes
    /// the padding of the last, partly filled miniblock, and it reports an error when a
    /// miniblock is truncated.
    #[test]
    fn test_delta_bit_packed_padding() {
        let header = vec![
            // Block size 256, as the VLQ `128, 2`.
            128,
            2,
            // Miniblocks per block: 4, i.e. 64 values per miniblock.
            4,
            // Total value count 419, as the VLQ `128 + 35, 3` (35 + 3 * 128).
            128 + 35,
            3,
            // First value, stored zigzag-encoded (raw VLQ 7).
            7,
        ];

        // 419 values = the first value + 418 deltas: 256 in block 1 and 162 in block 2.
        let block1_header = vec![
            0, // min delta (zigzag VLQ)
            0, 1, 0, 0, // bit widths of the four miniblocks
        ];

        // Only the second miniblock has data: 64 values * 1 bit = 8 bytes.
        let block1 = vec![0xFF; 8];

        let block2_header = vec![
            0, // min delta (zigzag VLQ)
            // Bit widths. The 162 deltas fill miniblocks of 64, 64 and 34 values. The
            // fourth miniblock holds no values, so its width byte (0xFF, otherwise invalid)
            // must not be used.
            0, 1, 2, 0xFF, ];

        // 0 bytes (width 0) + 8 bytes (64 * 1 bit) + 16 bytes (width 2; the third
        // miniblock is padded from 34 to 64 values, 64 * 2 bits).
        let block2 = vec![0xFF; 24];

        let data: Vec<u8> = header
            .into_iter()
            .chain(block1_header)
            .chain(block1)
            .chain(block2_header)
            .chain(block2)
            .collect();

        let length = data.len();

        let ptr = Bytes::from(data);
        // Sanity-check the header bytes by reading them back as raw VLQs.
        let mut reader = BitReader::new(ptr.clone());
        assert_eq!(reader.get_vlq_int().unwrap(), 256);
        assert_eq!(reader.get_vlq_int().unwrap(), 4);
        assert_eq!(reader.get_vlq_int().unwrap(), 419);
        assert_eq!(reader.get_vlq_int().unwrap(), 7);

        // One slot more than the page holds, so the count returned by `get` is bounded
        // by the page, not by the buffer.
        let mut output = vec![0_i32; 420];

        let mut decoder = DeltaBitPackDecoder::<Int32Type>::new();
        // The value count passed here is 0; the assertion below shows all 419 values are
        // still returned.
        decoder.set_data(ptr.clone(), 0).unwrap();
        assert_eq!(decoder.get(&mut output).unwrap(), 419);
        // The padded tail of block 2's third miniblock must have been consumed as well.
        assert_eq!(decoder.get_offset(), length);

        // Keep the 6 header bytes, the 5 block-1 header bytes and just 1 of block 1's 8
        // data bytes: the width-1 miniblock then yields only 8 of its 64 values.
        decoder.set_data(ptr.slice(..12), 0).unwrap();
        let err = decoder.get(&mut output).unwrap_err().to_string();
        assert!(
            err.contains("Expected to read 64 values from miniblock got 8"),
            "{}",
            err
        );
    }
1804
1805 #[test]
1806 fn test_delta_byte_array_same_arrays() {
1807 let data = vec![
1808 vec![ByteArray::from(vec![1, 2, 3, 4, 5, 6])],
1809 vec![
1810 ByteArray::from(vec![1, 2, 3, 4, 5, 6]),
1811 ByteArray::from(vec![1, 2, 3, 4, 5, 6]),
1812 ],
1813 vec![
1814 ByteArray::from(vec![1, 2, 3, 4, 5, 6]),
1815 ByteArray::from(vec![1, 2, 3, 4, 5, 6]),
1816 ],
1817 ];
1818 test_delta_byte_array_decode(data);
1819 }
1820
1821 #[test]
1822 fn test_delta_byte_array_unique_arrays() {
1823 let data = vec![
1824 vec![ByteArray::from(vec![1])],
1825 vec![ByteArray::from(vec![2, 3]), ByteArray::from(vec![4, 5, 6])],
1826 vec![
1827 ByteArray::from(vec![7, 8]),
1828 ByteArray::from(vec![9, 0, 1, 2]),
1829 ],
1830 ];
1831 test_delta_byte_array_decode(data);
1832 }
1833
1834 #[test]
1835 fn test_delta_byte_array_single_array() {
1836 let data = vec![vec![ByteArray::from(vec![1, 2, 3, 4, 5, 6])]];
1837 test_delta_byte_array_decode(data);
1838 }
1839
1840 #[test]
1841 fn test_byte_stream_split_multiple_f32() {
1842 let data = vec![
1843 vec![
1844 f32::from_le_bytes([0xAA, 0xBB, 0xCC, 0xDD]),
1845 f32::from_le_bytes([0x00, 0x11, 0x22, 0x33]),
1846 ],
1847 vec![f32::from_le_bytes([0xA3, 0xB4, 0xC5, 0xD6])],
1848 ];
1849 test_byte_stream_split_decode::<FloatType>(data, -1);
1850 }
1851
1852 #[test]
1853 fn test_byte_stream_split_f64() {
1854 let data = vec![vec![
1855 f64::from_le_bytes([0, 1, 2, 3, 4, 5, 6, 7]),
1856 f64::from_le_bytes([8, 9, 10, 11, 12, 13, 14, 15]),
1857 ]];
1858 test_byte_stream_split_decode::<DoubleType>(data, -1);
1859 }
1860
1861 #[test]
1862 fn test_byte_stream_split_multiple_i32() {
1863 let data = vec![
1864 vec![
1865 i32::from_le_bytes([0xAA, 0xBB, 0xCC, 0xDD]),
1866 i32::from_le_bytes([0x00, 0x11, 0x22, 0x33]),
1867 ],
1868 vec![i32::from_le_bytes([0xA3, 0xB4, 0xC5, 0xD6])],
1869 ];
1870 test_byte_stream_split_decode::<Int32Type>(data, -1);
1871 }
1872
1873 #[test]
1874 fn test_byte_stream_split_i64() {
1875 let data = vec![vec![
1876 i64::from_le_bytes([0, 1, 2, 3, 4, 5, 6, 7]),
1877 i64::from_le_bytes([8, 9, 10, 11, 12, 13, 14, 15]),
1878 ]];
1879 test_byte_stream_split_decode::<Int64Type>(data, -1);
1880 }
1881
1882 fn test_byte_stream_split_flba(type_width: usize) {
1883 let data = vec![
1884 vec![
1885 FixedLenByteArrayType::r#gen(type_width as i32),
1886 FixedLenByteArrayType::r#gen(type_width as i32),
1887 ],
1888 vec![FixedLenByteArrayType::r#gen(type_width as i32)],
1889 ];
1890 test_byte_stream_split_decode::<FixedLenByteArrayType>(data, type_width as i32);
1891 }
1892
1893 #[test]
1894 fn test_byte_stream_split_flba5() {
1895 test_byte_stream_split_flba(5);
1896 }
1897
1898 #[test]
1899 fn test_byte_stream_split_flba16() {
1900 test_byte_stream_split_flba(16);
1901 }
1902
1903 #[test]
1904 fn test_byte_stream_split_flba19() {
1905 test_byte_stream_split_flba(19);
1906 }
1907
1908 #[test]
1909 #[should_panic(expected = "Mismatched FixedLenByteArray sizes: 4 != 5")]
1910 fn test_byte_stream_split_flba_mismatch() {
1911 let data = vec![
1912 vec![
1913 FixedLenByteArray::from(vec![0xAA, 0xAB, 0xAC, 0xAD, 0xAE]),
1914 FixedLenByteArray::from(vec![0xBA, 0xBB, 0xBC, 0xBD, 0xBE]),
1915 ],
1916 vec![FixedLenByteArray::from(vec![0xCA, 0xCB, 0xCC, 0xCD])],
1917 ];
1918 test_byte_stream_split_decode::<FixedLenByteArrayType>(data, 5);
1919 }
1920
1921 #[test]
1922 #[should_panic(expected = "Input data length is not a multiple of type width 4")]
1923 fn test_byte_stream_split_flba_bad_input() {
1924 let mut decoder = VariableWidthByteStreamSplitDecoder::<FixedLenByteArrayType>::new(4);
1925 decoder
1926 .set_data(Bytes::from(vec![1, 2, 3, 4, 5]), 1)
1927 .unwrap();
1928 }
1929
1930 #[test]
1931 fn test_skip_byte_stream_split() {
1932 let block_data = vec![0.3, 0.4, 0.1, 4.10];
1933 test_skip::<FloatType>(block_data.clone(), Encoding::BYTE_STREAM_SPLIT, 2);
1934 test_skip::<DoubleType>(
1935 block_data.into_iter().map(|x| x as f64).collect(),
1936 Encoding::BYTE_STREAM_SPLIT,
1937 100,
1938 );
1939 }
1940
1941 #[test]
1942 fn test_skip_byte_stream_split_ints() {
1943 let block_data = vec![3, 4, 1, 5];
1944 test_skip::<Int32Type>(block_data.clone(), Encoding::BYTE_STREAM_SPLIT, 2);
1945 test_skip::<Int64Type>(
1946 block_data.into_iter().map(|x| x as i64).collect(),
1947 Encoding::BYTE_STREAM_SPLIT,
1948 100,
1949 );
1950 }
1951
1952 fn test_rle_value_decode<T: DataType>(data: Vec<Vec<T::T>>) {
1953 test_encode_decode::<T>(data, Encoding::RLE, -1);
1954 }
1955
1956 fn test_delta_bit_packed_decode<T: DataType>(data: Vec<Vec<T::T>>) {
1957 test_encode_decode::<T>(data, Encoding::DELTA_BINARY_PACKED, -1);
1958 }
1959
1960 fn test_byte_stream_split_decode<T: DataType>(data: Vec<Vec<T::T>>, type_width: i32) {
1961 test_encode_decode::<T>(data, Encoding::BYTE_STREAM_SPLIT, type_width);
1962 }
1963
1964 fn test_delta_byte_array_decode(data: Vec<Vec<ByteArray>>) {
1965 test_encode_decode::<ByteArrayType>(data, Encoding::DELTA_BYTE_ARRAY, -1);
1966 }
1967
1968 fn test_encode_decode<T: DataType>(data: Vec<Vec<T::T>>, encoding: Encoding, type_width: i32) {
1973 let col_descr = create_test_col_desc_ptr(type_width, T::get_physical_type());
1974
1975 let mut encoder = get_encoder::<T>(encoding, &col_descr).expect("get encoder");
1977
1978 for v in &data[..] {
1979 encoder.put(&v[..]).expect("ok to encode");
1980 }
1981 let bytes = encoder.flush_buffer().expect("ok to flush buffer");
1982
1983 let expected: Vec<T::T> = data.iter().flat_map(|s| s.clone()).collect();
1985
1986 let mut decoder = get_decoder::<T>(col_descr, encoding).expect("get decoder");
1988
1989 let mut result = vec![T::T::default(); expected.len()];
1990 decoder
1991 .set_data(bytes, expected.len())
1992 .expect("ok to set data");
1993 let mut result_num_values = 0;
1994 while decoder.values_left() > 0 {
1995 result_num_values += decoder
1996 .get(&mut result[result_num_values..])
1997 .expect("ok to decode");
1998 }
1999 assert_eq!(result_num_values, expected.len());
2000 assert_eq!(result, expected);
2001 }
2002
2003 fn test_skip<T: DataType>(data: Vec<T::T>, encoding: Encoding, skip: usize) {
2004 let col_descr = create_test_col_desc_ptr(-1, T::get_physical_type());
2007
2008 let mut encoder = get_encoder::<T>(encoding, &col_descr).expect("get encoder");
2010
2011 encoder.put(&data).expect("ok to encode");
2012
2013 let bytes = encoder.flush_buffer().expect("ok to flush buffer");
2014
2015 let mut decoder = get_decoder::<T>(col_descr, encoding).expect("get decoder");
2016 decoder.set_data(bytes, data.len()).expect("ok to set data");
2017
2018 if skip >= data.len() {
2019 let skipped = decoder.skip(skip).expect("ok to skip");
2020 assert_eq!(skipped, data.len());
2021
2022 let skipped_again = decoder.skip(skip).expect("ok to skip again");
2023 assert_eq!(skipped_again, 0);
2024 } else {
2025 let skipped = decoder.skip(skip).expect("ok to skip");
2026 assert_eq!(skipped, skip);
2027
2028 let remaining = data.len() - skip;
2029
2030 let expected = &data[skip..];
2031 let mut buffer = vec![T::T::default(); remaining];
2032 let fetched = decoder.get(&mut buffer).expect("ok to decode");
2033 assert_eq!(remaining, fetched);
2034 assert_eq!(&buffer, expected);
2035 }
2036 }
2037
2038 fn create_and_check_decoder<T: DataType>(encoding: Encoding, err: Option<ParquetError>) {
2039 let descr = create_test_col_desc_ptr(-1, T::get_physical_type());
2040 let decoder = get_decoder::<T>(descr, encoding);
2041 match err {
2042 Some(parquet_error) => {
2043 assert_eq!(
2044 decoder.err().unwrap().to_string(),
2045 parquet_error.to_string()
2046 );
2047 }
2048 None => {
2049 assert_eq!(decoder.unwrap().encoding(), encoding);
2050 }
2051 }
2052 }
2053
2054 fn create_test_col_desc_ptr(type_len: i32, t: Type) -> ColumnDescPtr {
2056 let ty = SchemaType::primitive_type_builder("t", t)
2057 .with_length(type_len)
2058 .build()
2059 .unwrap();
2060 Arc::new(ColumnDescriptor::new(
2061 Arc::new(ty),
2062 0,
2063 0,
2064 ColumnPath::new(vec![]),
2065 ))
2066 }
2067
2068 fn usize_to_bytes(v: usize) -> [u8; 4] {
2069 (v as u32).to_ne_bytes()
2070 }
2071
    /// Test helper that serializes a slice of `T`'s values into raw bytes, which the
    /// tests in this module then hand to decoders such as `PlainDecoder`.
    trait ToByteArray<T: DataType> {
        // This is an associated function, not a `&self` conversion, so clippy's
        // `to_*` naming lint does not apply.
        #[allow(clippy::wrong_self_convention)]
        fn to_byte_array(data: &[T::T]) -> Vec<u8>;
    }
2077
2078 macro_rules! to_byte_array_impl {
2079 ($ty: ty) => {
2080 impl ToByteArray<$ty> for $ty {
2081 #[allow(clippy::wrong_self_convention)]
2082 fn to_byte_array(data: &[<$ty as DataType>::T]) -> Vec<u8> {
2083 <$ty as DataType>::T::slice_as_bytes(data).to_vec()
2084 }
2085 }
2086 };
2087 }
2088
2089 to_byte_array_impl!(Int32Type);
2090 to_byte_array_impl!(Int64Type);
2091 to_byte_array_impl!(FloatType);
2092 to_byte_array_impl!(DoubleType);
2093
2094 impl ToByteArray<BoolType> for BoolType {
2095 #[allow(clippy::wrong_self_convention)]
2096 fn to_byte_array(data: &[bool]) -> Vec<u8> {
2097 let mut v = vec![];
2098 for (i, item) in data.iter().enumerate() {
2099 if i % 8 == 0 {
2100 v.push(0);
2101 }
2102 if *item {
2103 v[i / 8] |= 1 << (i % 8);
2104 }
2105 }
2106 v
2107 }
2108 }
2109
2110 impl ToByteArray<Int96Type> for Int96Type {
2111 #[allow(clippy::wrong_self_convention)]
2112 fn to_byte_array(data: &[Int96]) -> Vec<u8> {
2113 let mut v = vec![];
2114 for d in data {
2115 v.extend_from_slice(d.as_bytes());
2116 }
2117 v
2118 }
2119 }
2120
2121 impl ToByteArray<ByteArrayType> for ByteArrayType {
2122 #[allow(clippy::wrong_self_convention)]
2123 fn to_byte_array(data: &[ByteArray]) -> Vec<u8> {
2124 let mut v = vec![];
2125 for d in data {
2126 let buf = d.data();
2127 let len = &usize_to_bytes(buf.len());
2128 v.extend_from_slice(len);
2129 v.extend(buf);
2130 }
2131 v
2132 }
2133 }
2134
2135 impl ToByteArray<FixedLenByteArrayType> for FixedLenByteArrayType {
2136 #[allow(clippy::wrong_self_convention)]
2137 fn to_byte_array(data: &[FixedLenByteArray]) -> Vec<u8> {
2138 let mut v = vec![];
2139 for d in data {
2140 let buf = d.data();
2141 v.extend(buf);
2142 }
2143 v
2144 }
2145 }
2146
2147 #[test]
2148 #[allow(clippy::vec_init_then_push)]
2150 fn test_delta_bit_packed_invalid_bit_width() {
2151 let mut buffer = vec![];
2153 buffer.push(128);
2155 buffer.push(1);
2156 buffer.push(4);
2158 buffer.push(32);
2160 buffer.push(0);
2162 buffer.push(0);
2164 buffer.push(33); buffer.push(0);
2167 buffer.push(0);
2168 buffer.push(0);
2169
2170 let corrupted_buffer = Bytes::from(buffer);
2171
2172 let mut decoder = DeltaBitPackDecoder::<Int32Type>::new();
2173 decoder.set_data(corrupted_buffer.clone(), 32).unwrap();
2174 let mut read_buffer = vec![0; 32];
2175 let err = decoder.get(&mut read_buffer).unwrap_err();
2176 assert!(
2177 err.to_string()
2178 .contains("Invalid delta bit width 33 which is larger than expected 32"),
2179 "{}",
2180 err
2181 );
2182
2183 let mut decoder = DeltaBitPackDecoder::<Int32Type>::new();
2184 decoder.set_data(corrupted_buffer, 32).unwrap();
2185 let err = decoder.skip(32).unwrap_err();
2186 assert!(
2187 err.to_string()
2188 .contains("Invalid delta bit width 33 which is larger than expected 32"),
2189 "{}",
2190 err
2191 );
2192 }
2193}