1use bytes::Bytes;
21use num_traits::{FromPrimitive, WrappingAdd};
22use std::{cmp, marker::PhantomData, mem};
23
24use super::rle::RleDecoder;
25
26use crate::basic::*;
27use crate::data_type::private::ParquetValueType;
28use crate::data_type::*;
29use crate::encodings::decoding::byte_stream_split_decoder::{
30 ByteStreamSplitDecoder, VariableWidthByteStreamSplitDecoder,
31};
32use crate::errors::{ParquetError, Result};
33use crate::schema::types::ColumnDescPtr;
34use crate::util::bit_util::{self, BitReader, FromBitpacked};
35
36mod byte_stream_split_decoder;
37
38pub(crate) mod private {
39 use super::*;
40
41 pub trait GetDecoder {
46 fn get_decoder<T: DataType<T = Self>>(
47 descr: ColumnDescPtr,
48 encoding: Encoding,
49 ) -> Result<Box<dyn Decoder<T>>> {
50 get_decoder_default(descr, encoding)
51 }
52 }
53
54 fn get_decoder_default<T: DataType>(
55 descr: ColumnDescPtr,
56 encoding: Encoding,
57 ) -> Result<Box<dyn Decoder<T>>> {
58 match encoding {
59 Encoding::PLAIN => Ok(Box::new(PlainDecoder::new(descr.type_length()))),
60 Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => Err(general_err!(
61 "Cannot initialize this encoding through this function"
62 )),
63 Encoding::RLE
64 | Encoding::DELTA_BINARY_PACKED
65 | Encoding::DELTA_BYTE_ARRAY
66 | Encoding::DELTA_LENGTH_BYTE_ARRAY => Err(general_err!(
67 "Encoding {} is not supported for type",
68 encoding
69 )),
70 e => Err(nyi_err!("Encoding {} is not supported", e)),
71 }
72 }
73
74 impl GetDecoder for bool {
75 fn get_decoder<T: DataType<T = Self>>(
76 descr: ColumnDescPtr,
77 encoding: Encoding,
78 ) -> Result<Box<dyn Decoder<T>>> {
79 match encoding {
80 Encoding::RLE => Ok(Box::new(RleValueDecoder::new())),
81 _ => get_decoder_default(descr, encoding),
82 }
83 }
84 }
85
86 impl GetDecoder for i32 {
87 fn get_decoder<T: DataType<T = Self>>(
88 descr: ColumnDescPtr,
89 encoding: Encoding,
90 ) -> Result<Box<dyn Decoder<T>>> {
91 match encoding {
92 Encoding::BYTE_STREAM_SPLIT => Ok(Box::new(ByteStreamSplitDecoder::new())),
93 Encoding::DELTA_BINARY_PACKED => Ok(Box::new(DeltaBitPackDecoder::new())),
94 _ => get_decoder_default(descr, encoding),
95 }
96 }
97 }
98
99 impl GetDecoder for i64 {
100 fn get_decoder<T: DataType<T = Self>>(
101 descr: ColumnDescPtr,
102 encoding: Encoding,
103 ) -> Result<Box<dyn Decoder<T>>> {
104 match encoding {
105 Encoding::BYTE_STREAM_SPLIT => Ok(Box::new(ByteStreamSplitDecoder::new())),
106 Encoding::DELTA_BINARY_PACKED => Ok(Box::new(DeltaBitPackDecoder::new())),
107 _ => get_decoder_default(descr, encoding),
108 }
109 }
110 }
111
112 impl GetDecoder for f32 {
113 fn get_decoder<T: DataType<T = Self>>(
114 descr: ColumnDescPtr,
115 encoding: Encoding,
116 ) -> Result<Box<dyn Decoder<T>>> {
117 match encoding {
118 Encoding::BYTE_STREAM_SPLIT => Ok(Box::new(ByteStreamSplitDecoder::new())),
119 _ => get_decoder_default(descr, encoding),
120 }
121 }
122 }
123 impl GetDecoder for f64 {
124 fn get_decoder<T: DataType<T = Self>>(
125 descr: ColumnDescPtr,
126 encoding: Encoding,
127 ) -> Result<Box<dyn Decoder<T>>> {
128 match encoding {
129 Encoding::BYTE_STREAM_SPLIT => Ok(Box::new(ByteStreamSplitDecoder::new())),
130 _ => get_decoder_default(descr, encoding),
131 }
132 }
133 }
134
135 impl GetDecoder for ByteArray {
136 fn get_decoder<T: DataType<T = Self>>(
137 descr: ColumnDescPtr,
138 encoding: Encoding,
139 ) -> Result<Box<dyn Decoder<T>>> {
140 match encoding {
141 Encoding::DELTA_BYTE_ARRAY => Ok(Box::new(DeltaByteArrayDecoder::new())),
142 Encoding::DELTA_LENGTH_BYTE_ARRAY => {
143 Ok(Box::new(DeltaLengthByteArrayDecoder::new()))
144 }
145 _ => get_decoder_default(descr, encoding),
146 }
147 }
148 }
149
150 impl GetDecoder for FixedLenByteArray {
151 fn get_decoder<T: DataType<T = Self>>(
152 descr: ColumnDescPtr,
153 encoding: Encoding,
154 ) -> Result<Box<dyn Decoder<T>>> {
155 match encoding {
156 Encoding::BYTE_STREAM_SPLIT => Ok(Box::new(
157 VariableWidthByteStreamSplitDecoder::new(descr.type_length()),
158 )),
159 Encoding::DELTA_BYTE_ARRAY => Ok(Box::new(DeltaByteArrayDecoder::new())),
160 _ => get_decoder_default(descr, encoding),
161 }
162 }
163 }
164
165 impl GetDecoder for Int96 {}
166}
167
168pub trait Decoder<T: DataType>: Send {
173 fn set_data(&mut self, data: Bytes, num_values: usize) -> Result<()>;
176
177 fn get(&mut self, buffer: &mut [T::T]) -> Result<usize>;
184
185 fn get_spaced(
198 &mut self,
199 buffer: &mut [T::T],
200 null_count: usize,
201 valid_bits: &[u8],
202 ) -> Result<usize> {
203 assert!(buffer.len() >= null_count);
204
205 if null_count == 0 {
207 return self.get(buffer);
208 }
209
210 let num_values = buffer.len();
211 let values_to_read = num_values - null_count;
212 let values_read = self.get(buffer)?;
213 if values_read != values_to_read {
214 return Err(general_err!(
215 "Number of values read: {}, doesn't match expected: {}",
216 values_read,
217 values_to_read
218 ));
219 }
220 let mut values_to_move = values_read;
221 for i in (0..num_values).rev() {
222 if bit_util::get_bit(valid_bits, i) {
223 values_to_move -= 1;
224 buffer.swap(i, values_to_move);
225 }
226 }
227
228 Ok(num_values)
229 }
230
231 fn values_left(&self) -> usize;
233
234 fn encoding(&self) -> Encoding;
236
237 fn skip(&mut self, num_values: usize) -> Result<usize>;
239}
240
241pub fn get_decoder<T: DataType>(
246 descr: ColumnDescPtr,
247 encoding: Encoding,
248) -> Result<Box<dyn Decoder<T>>> {
249 use self::private::GetDecoder;
250 T::T::get_decoder(descr, encoding)
251}
252
253#[derive(Default)]
257pub struct PlainDecoderDetails {
258 pub(crate) num_values: usize,
260
261 pub(crate) start: usize,
263
264 pub(crate) type_length: i32,
266
267 pub(crate) data: Option<Bytes>,
269
270 pub(crate) bit_reader: Option<BitReader>,
272}
273
274pub struct PlainDecoder<T: DataType> {
280 inner: PlainDecoderDetails,
282
283 _phantom: PhantomData<T>,
286}
287
288impl<T: DataType> PlainDecoder<T> {
289 pub fn new(type_length: i32) -> Self {
291 PlainDecoder {
292 inner: PlainDecoderDetails {
293 type_length,
294 num_values: 0,
295 start: 0,
296 data: None,
297 bit_reader: None,
298 },
299 _phantom: PhantomData,
300 }
301 }
302}
303
304impl<T: DataType> Decoder<T> for PlainDecoder<T> {
305 #[inline]
306 fn set_data(&mut self, data: Bytes, num_values: usize) -> Result<()> {
307 T::T::set_data(&mut self.inner, data, num_values);
308 Ok(())
309 }
310
311 #[inline]
312 fn values_left(&self) -> usize {
313 self.inner.num_values
314 }
315
316 #[inline]
317 fn encoding(&self) -> Encoding {
318 Encoding::PLAIN
319 }
320
321 #[inline]
322 fn get(&mut self, buffer: &mut [T::T]) -> Result<usize> {
323 T::T::decode(buffer, &mut self.inner)
324 }
325
326 #[inline]
327 fn skip(&mut self, num_values: usize) -> Result<usize> {
328 T::T::skip(&mut self.inner, num_values)
329 }
330}
331
332pub struct DictDecoder<T: DataType> {
341 dictionary: Vec<T::T>,
343
344 has_dictionary: bool,
346
347 rle_decoder: Option<RleDecoder>,
349
350 num_values: usize,
352}
353
354impl<T: DataType> Default for DictDecoder<T> {
355 fn default() -> Self {
356 Self::new()
357 }
358}
359
360impl<T: DataType> DictDecoder<T> {
361 pub fn new() -> Self {
363 Self {
364 dictionary: vec![],
365 has_dictionary: false,
366 rle_decoder: None,
367 num_values: 0,
368 }
369 }
370
371 pub fn set_dict(&mut self, mut decoder: Box<dyn Decoder<T>>) -> Result<()> {
373 let num_values = decoder.values_left();
374 self.dictionary.resize(num_values, T::T::default());
375 let _ = decoder.get(&mut self.dictionary)?;
376 self.has_dictionary = true;
377 Ok(())
378 }
379}
380
381impl<T: DataType> Decoder<T> for DictDecoder<T> {
382 fn set_data(&mut self, data: Bytes, num_values: usize) -> Result<()> {
383 if data.is_empty() {
385 return Err(eof_err!("Not enough bytes to decode bit_width"));
386 }
387
388 let bit_width = data.as_ref()[0];
389 if bit_width > 32 {
390 return Err(general_err!(
391 "Invalid or corrupted RLE bit width {}. Max allowed is 32",
392 bit_width
393 ));
394 }
395 let mut rle_decoder = RleDecoder::new(bit_width);
396 rle_decoder.set_data(data.slice(1..))?;
397 self.num_values = num_values;
398 self.rle_decoder = Some(rle_decoder);
399 Ok(())
400 }
401
402 fn get(&mut self, buffer: &mut [T::T]) -> Result<usize> {
403 assert!(self.rle_decoder.is_some());
404 assert!(self.has_dictionary, "Must call set_dict() first!");
405
406 let rle = self.rle_decoder.as_mut().unwrap();
407 let num_values = cmp::min(buffer.len(), self.num_values);
408 rle.get_batch_with_dict(&self.dictionary[..], buffer, num_values)
409 }
410
411 fn values_left(&self) -> usize {
413 self.num_values
414 }
415
416 fn encoding(&self) -> Encoding {
417 Encoding::RLE_DICTIONARY
418 }
419
420 fn skip(&mut self, num_values: usize) -> Result<usize> {
421 assert!(self.rle_decoder.is_some());
422 assert!(self.has_dictionary, "Must call set_dict() first!");
423
424 let rle = self.rle_decoder.as_mut().unwrap();
425 let num_values = cmp::min(num_values, self.num_values);
426 rle.skip(num_values)
427 }
428}
429
430pub struct RleValueDecoder<T: DataType> {
437 values_left: usize,
438 decoder: RleDecoder,
439 _phantom: PhantomData<T>,
440}
441
442impl<T: DataType> Default for RleValueDecoder<T> {
443 fn default() -> Self {
444 Self::new()
445 }
446}
447
448impl<T: DataType> RleValueDecoder<T> {
449 pub fn new() -> Self {
450 Self {
451 values_left: 0,
452 decoder: RleDecoder::new(1),
453 _phantom: PhantomData,
454 }
455 }
456}
457
458impl<T: DataType> Decoder<T> for RleValueDecoder<T>
459where
460 T::T: FromBitpacked,
461{
462 #[inline]
463 fn set_data(&mut self, data: Bytes, num_values: usize) -> Result<()> {
464 ensure_phys_ty!(Type::BOOLEAN, "RleValueDecoder only supports BoolType");
466
467 const I32_SIZE: usize = mem::size_of::<i32>();
469 if data.len() < I32_SIZE {
470 return Err(eof_err!("Not enough bytes to decode"));
471 }
472 let data_size = bit_util::read_num_bytes::<i32>(I32_SIZE, data.as_ref()) as usize;
473 if data.len() - I32_SIZE < data_size {
474 return Err(eof_err!("Not enough bytes to decode"));
475 }
476
477 self.decoder = RleDecoder::new(1);
478 self.decoder
479 .set_data(data.slice(I32_SIZE..I32_SIZE + data_size))?;
480 self.values_left = num_values;
481 Ok(())
482 }
483
484 #[inline]
485 fn values_left(&self) -> usize {
486 self.values_left
487 }
488
489 #[inline]
490 fn encoding(&self) -> Encoding {
491 Encoding::RLE
492 }
493
494 #[inline]
495 fn get(&mut self, buffer: &mut [T::T]) -> Result<usize> {
496 let num_values = cmp::min(buffer.len(), self.values_left);
497 let values_read = self.decoder.get_batch(&mut buffer[..num_values])?;
498 self.values_left -= values_read;
499 Ok(values_read)
500 }
501
502 #[inline]
503 fn skip(&mut self, num_values: usize) -> Result<usize> {
504 let num_values = cmp::min(num_values, self.values_left);
505 let values_skipped = self.decoder.skip(num_values)?;
506 self.values_left -= values_skipped;
507 Ok(values_skipped)
508 }
509}
510
511pub struct DeltaBitPackDecoder<T: DataType> {
519 bit_reader: BitReader,
520 initialized: bool,
521
522 block_size: usize,
525 values_left: usize,
527 mini_blocks_per_block: usize,
529 values_per_mini_block: usize,
531
532 min_delta: T::T,
535 block_end_offset: usize,
537 mini_block_idx: usize,
539 mini_block_bit_widths: Vec<u8>,
541 mini_block_remaining: usize,
543
544 first_value: Option<T::T>,
546 last_value: T::T,
548}
549
550impl<T: DataType> Default for DeltaBitPackDecoder<T>
551where
552 T::T: Default + FromPrimitive + WrappingAdd + Copy,
553{
554 fn default() -> Self {
555 Self::new()
556 }
557}
558
559impl<T: DataType> DeltaBitPackDecoder<T>
560where
561 T::T: Default + FromPrimitive + WrappingAdd + Copy,
562{
563 pub fn new() -> Self {
565 Self {
566 bit_reader: BitReader::from(vec![]),
567 initialized: false,
568 block_size: 0,
569 values_left: 0,
570 mini_blocks_per_block: 0,
571 values_per_mini_block: 0,
572 min_delta: Default::default(),
573 mini_block_idx: 0,
574 mini_block_bit_widths: vec![],
575 mini_block_remaining: 0,
576 block_end_offset: 0,
577 first_value: None,
578 last_value: Default::default(),
579 }
580 }
581
582 pub fn get_offset(&self) -> usize {
584 assert!(self.initialized, "Bit reader is not initialized");
585 match self.values_left {
586 0 => self.bit_reader.get_byte_offset().max(self.block_end_offset),
592 _ => self.bit_reader.get_byte_offset(),
593 }
594 }
595
596 #[inline]
598 fn next_block(&mut self) -> Result<()> {
599 let min_delta = self
600 .bit_reader
601 .get_zigzag_vlq_int()
602 .ok_or_else(|| eof_err!("Not enough data to decode 'min_delta'"))?;
603
604 self.min_delta =
605 T::T::from_i64(min_delta).ok_or_else(|| general_err!("'min_delta' too large"))?;
606
607 self.mini_block_bit_widths.clear();
608 self.bit_reader
609 .get_aligned_bytes(&mut self.mini_block_bit_widths, self.mini_blocks_per_block);
610
611 let mut offset = self.bit_reader.get_byte_offset();
612 let mut remaining = self.values_left;
613
614 for b in &mut self.mini_block_bit_widths {
616 if remaining == 0 {
617 *b = 0;
620 }
621 remaining = remaining.saturating_sub(self.values_per_mini_block);
622 offset += *b as usize * self.values_per_mini_block / 8;
623 }
624 self.block_end_offset = offset;
625
626 if self.mini_block_bit_widths.len() != self.mini_blocks_per_block {
627 return Err(eof_err!("insufficient mini block bit widths"));
628 }
629
630 self.mini_block_remaining = self.values_per_mini_block;
631 self.mini_block_idx = 0;
632
633 Ok(())
634 }
635
636 #[inline]
638 fn next_mini_block(&mut self) -> Result<()> {
639 if self.mini_block_idx + 1 < self.mini_block_bit_widths.len() {
640 self.mini_block_idx += 1;
641 self.mini_block_remaining = self.values_per_mini_block;
642 Ok(())
643 } else {
644 self.next_block()
645 }
646 }
647
648 #[inline]
650 fn check_bit_width(&self, bit_width: usize) -> Result<()> {
651 if bit_width > std::mem::size_of::<T::T>() * 8 {
652 return Err(general_err!(
653 "Invalid delta bit width {} which is larger than expected {} ",
654 bit_width,
655 std::mem::size_of::<T::T>() * 8
656 ));
657 }
658 Ok(())
659 }
660}
661
662impl<T: DataType> Decoder<T> for DeltaBitPackDecoder<T>
663where
664 T::T: Default + FromPrimitive + FromBitpacked + WrappingAdd + Copy,
665{
666 #[inline]
668 fn set_data(&mut self, data: Bytes, _index: usize) -> Result<()> {
669 self.bit_reader = BitReader::new(data);
670 self.initialized = true;
671
672 self.block_size = self
674 .bit_reader
675 .get_vlq_int()
676 .ok_or_else(|| eof_err!("Not enough data to decode 'block_size'"))?
677 .try_into()
678 .map_err(|_| general_err!("invalid 'block_size'"))?;
679
680 self.mini_blocks_per_block = self
681 .bit_reader
682 .get_vlq_int()
683 .ok_or_else(|| eof_err!("Not enough data to decode 'mini_blocks_per_block'"))?
684 .try_into()
685 .map_err(|_| general_err!("invalid 'mini_blocks_per_block'"))?;
686
687 if self.mini_blocks_per_block == 0 {
688 return Err(general_err!("cannot have zero miniblocks per block"));
689 }
690
691 self.values_left = self
692 .bit_reader
693 .get_vlq_int()
694 .ok_or_else(|| eof_err!("Not enough data to decode 'values_left'"))?
695 .try_into()
696 .map_err(|_| general_err!("invalid 'values_left'"))?;
697
698 let first_value = self
699 .bit_reader
700 .get_zigzag_vlq_int()
701 .ok_or_else(|| eof_err!("Not enough data to decode 'first_value'"))?;
702
703 self.first_value =
704 Some(T::T::from_i64(first_value).ok_or_else(|| general_err!("first value too large"))?);
705
706 if self.block_size % 128 != 0 {
707 return Err(general_err!(
708 "'block_size' must be a multiple of 128, got {}",
709 self.block_size
710 ));
711 }
712
713 if self.block_size % self.mini_blocks_per_block != 0 {
714 return Err(general_err!(
715 "'block_size' must be a multiple of 'mini_blocks_per_block' got {} and {}",
716 self.block_size,
717 self.mini_blocks_per_block
718 ));
719 }
720
721 self.mini_block_idx = 0;
723 self.values_per_mini_block = self.block_size / self.mini_blocks_per_block;
724 self.mini_block_remaining = 0;
725 self.mini_block_bit_widths.clear();
726
727 if self.values_per_mini_block % 32 != 0 {
728 return Err(general_err!(
729 "'values_per_mini_block' must be a multiple of 32 got {}",
730 self.values_per_mini_block
731 ));
732 }
733
734 Ok(())
735 }
736
737 fn get(&mut self, buffer: &mut [T::T]) -> Result<usize> {
738 assert!(self.initialized, "Bit reader is not initialized");
739 if buffer.is_empty() {
740 return Ok(0);
741 }
742
743 let mut read = 0;
744 let to_read = buffer.len().min(self.values_left);
745
746 if let Some(value) = self.first_value.take() {
747 self.last_value = value;
748 buffer[0] = value;
749 read += 1;
750 self.values_left -= 1;
751 }
752
753 while read != to_read {
754 if self.mini_block_remaining == 0 {
755 self.next_mini_block()?;
756 }
757
758 let bit_width = self.mini_block_bit_widths[self.mini_block_idx] as usize;
759 self.check_bit_width(bit_width)?;
760 let batch_to_read = self.mini_block_remaining.min(to_read - read);
761
762 let batch_read = self
763 .bit_reader
764 .get_batch(&mut buffer[read..read + batch_to_read], bit_width);
765
766 if batch_read != batch_to_read {
767 return Err(general_err!(
768 "Expected to read {} values from miniblock got {}",
769 batch_to_read,
770 batch_read
771 ));
772 }
773
774 let min_delta = self.min_delta.as_i64()?;
786 if bit_width == 0 {
787 if min_delta == 0 {
788 buffer[read..read + batch_read].fill(self.last_value);
789 } else {
790 let mut delta = self.min_delta;
794 for v in &mut buffer[read..read + batch_read] {
795 *v = self.last_value.wrapping_add(&delta);
796 delta = delta.wrapping_add(&self.min_delta);
797 }
798
799 self.last_value = buffer[read + batch_read - 1];
800 }
801 } else {
802 if min_delta == 0 {
806 for v in &mut buffer[read..read + batch_read] {
807 *v = v.wrapping_add(&self.last_value);
808 self.last_value = *v;
809 }
810 } else {
811 for v in &mut buffer[read..read + batch_read] {
812 *v = v
813 .wrapping_add(&self.min_delta)
814 .wrapping_add(&self.last_value);
815 self.last_value = *v;
816 }
817 }
818 }
819
820 read += batch_read;
821 self.mini_block_remaining -= batch_read;
822 self.values_left -= batch_read;
823 }
824
825 Ok(to_read)
826 }
827
828 fn values_left(&self) -> usize {
829 self.values_left
830 }
831
832 fn encoding(&self) -> Encoding {
833 Encoding::DELTA_BINARY_PACKED
834 }
835
836 fn skip(&mut self, num_values: usize) -> Result<usize> {
837 let mut skip = 0;
838 let to_skip = num_values.min(self.values_left);
839 if to_skip == 0 {
840 return Ok(0);
841 }
842
843 if let Some(value) = self.first_value.take() {
845 self.last_value = value;
846 skip += 1;
847 self.values_left -= 1;
848 }
849
850 let mini_block_batch_size = match self.values_per_mini_block {
857 32 => 32,
858 64 => 64,
859 _ => {
860 return Err(general_err!(
861 "cannot skip miniblock of size {}",
862 self.values_per_mini_block
863 ));
864 }
865 };
866
867 let mut skip_buffer = vec![T::T::default(); mini_block_batch_size];
868 while skip < to_skip {
869 if self.mini_block_remaining == 0 {
870 self.next_mini_block()?;
871 }
872
873 let bit_width = self.mini_block_bit_widths[self.mini_block_idx] as usize;
874 self.check_bit_width(bit_width)?;
875 let mini_block_to_skip = self.mini_block_remaining.min(to_skip - skip);
876
877 let min_delta = self.min_delta.as_i64()?;
879 if bit_width == 0 {
880 if min_delta != 0 {
885 let total = min_delta.wrapping_mul(mini_block_to_skip as i64);
886 let step = T::T::from_i64(total)
887 .ok_or_else(|| general_err!("delta*n overflow in skip"))?;
888 self.last_value = self.last_value.wrapping_add(&step);
889 }
890 } else {
892 let skip_count = self
894 .bit_reader
895 .get_batch(&mut skip_buffer[0..mini_block_to_skip], bit_width);
896
897 if skip_count != mini_block_to_skip {
898 return Err(general_err!(
899 "Expected to skip {} values from mini block got {}.",
900 mini_block_to_skip,
901 skip_count
902 ));
903 }
904
905 if min_delta == 0 {
906 for v in &mut skip_buffer[0..skip_count] {
907 *v = v.wrapping_add(&self.last_value);
908 self.last_value = *v;
909 }
910 } else {
911 for v in &mut skip_buffer[0..skip_count] {
912 *v = v
913 .wrapping_add(&self.min_delta)
914 .wrapping_add(&self.last_value);
915 self.last_value = *v;
916 }
917 }
918 }
919
920 skip += mini_block_to_skip;
921 self.mini_block_remaining -= mini_block_to_skip;
922 self.values_left -= mini_block_to_skip;
923 }
924
925 Ok(to_skip)
926 }
927}
928
929pub struct DeltaLengthByteArrayDecoder<T: DataType> {
939 lengths: Vec<i32>,
942
943 current_idx: usize,
945
946 data: Option<Bytes>,
948
949 offset: usize,
951
952 num_values: usize,
954
955 _phantom: PhantomData<T>,
957}
958
959impl<T: DataType> Default for DeltaLengthByteArrayDecoder<T> {
960 fn default() -> Self {
961 Self::new()
962 }
963}
964
965impl<T: DataType> DeltaLengthByteArrayDecoder<T> {
966 pub fn new() -> Self {
968 Self {
969 lengths: vec![],
970 current_idx: 0,
971 data: None,
972 offset: 0,
973 num_values: 0,
974 _phantom: PhantomData,
975 }
976 }
977}
978
979impl<T: DataType> Decoder<T> for DeltaLengthByteArrayDecoder<T> {
980 fn set_data(&mut self, data: Bytes, num_values: usize) -> Result<()> {
981 match T::get_physical_type() {
982 Type::BYTE_ARRAY => {
983 let mut len_decoder = DeltaBitPackDecoder::<Int32Type>::new();
984 len_decoder.set_data(data.clone(), num_values)?;
985 let num_lengths = len_decoder.values_left();
986 self.lengths.resize(num_lengths, 0);
987 len_decoder.get(&mut self.lengths[..])?;
988
989 self.data = Some(data.slice(len_decoder.get_offset()..));
990 self.offset = 0;
991 self.current_idx = 0;
992 self.num_values = num_lengths;
993 Ok(())
994 }
995 _ => Err(general_err!(
996 "DeltaLengthByteArrayDecoder only support ByteArrayType"
997 )),
998 }
999 }
1000
1001 fn get(&mut self, buffer: &mut [T::T]) -> Result<usize> {
1002 match T::get_physical_type() {
1003 Type::BYTE_ARRAY => {
1004 assert!(self.data.is_some());
1005
1006 let data = self.data.as_ref().unwrap();
1007 let num_values = cmp::min(buffer.len(), self.num_values);
1008
1009 for item in buffer.iter_mut().take(num_values) {
1010 let len = self.lengths[self.current_idx] as usize;
1011 item.set_from_bytes(data.slice(self.offset..self.offset + len));
1012
1013 self.offset += len;
1014 self.current_idx += 1;
1015 }
1016
1017 self.num_values -= num_values;
1018 Ok(num_values)
1019 }
1020 _ => Err(general_err!(
1021 "DeltaLengthByteArrayDecoder only support ByteArrayType"
1022 )),
1023 }
1024 }
1025
1026 fn values_left(&self) -> usize {
1027 self.num_values
1028 }
1029
1030 fn encoding(&self) -> Encoding {
1031 Encoding::DELTA_LENGTH_BYTE_ARRAY
1032 }
1033
1034 fn skip(&mut self, num_values: usize) -> Result<usize> {
1035 match T::get_physical_type() {
1036 Type::BYTE_ARRAY => {
1037 let num_values = cmp::min(num_values, self.num_values);
1038
1039 let next_offset: i32 = self.lengths
1040 [self.current_idx..self.current_idx + num_values]
1041 .iter()
1042 .sum();
1043
1044 self.current_idx += num_values;
1045 self.offset += next_offset as usize;
1046
1047 self.num_values -= num_values;
1048 Ok(num_values)
1049 }
1050 other_type => Err(general_err!(
1051 "DeltaLengthByteArrayDecoder not support {}, only support byte array",
1052 other_type
1053 )),
1054 }
1055 }
1056}
1057
1058pub struct DeltaByteArrayDecoder<T: DataType> {
1068 prefix_lengths: Vec<i32>,
1071
1072 current_idx: usize,
1074
1075 suffix_decoder: Option<DeltaLengthByteArrayDecoder<ByteArrayType>>,
1078
1079 previous_value: Bytes,
1082
1083 num_values: usize,
1085
1086 _phantom: PhantomData<T>,
1088}
1089
1090impl<T: DataType> Default for DeltaByteArrayDecoder<T> {
1091 fn default() -> Self {
1092 Self::new()
1093 }
1094}
1095
1096impl<T: DataType> DeltaByteArrayDecoder<T> {
1097 pub fn new() -> Self {
1099 Self {
1100 prefix_lengths: vec![],
1101 current_idx: 0,
1102 suffix_decoder: None,
1103 previous_value: Bytes::new(),
1104 num_values: 0,
1105 _phantom: PhantomData,
1106 }
1107 }
1108}
1109
1110impl<T: DataType> Decoder<T> for DeltaByteArrayDecoder<T> {
1111 fn set_data(&mut self, data: Bytes, num_values: usize) -> Result<()> {
1112 match T::get_physical_type() {
1113 Type::BYTE_ARRAY | Type::FIXED_LEN_BYTE_ARRAY => {
1114 let mut prefix_len_decoder = DeltaBitPackDecoder::<Int32Type>::new();
1115 prefix_len_decoder.set_data(data.clone(), num_values)?;
1116 let num_prefixes = prefix_len_decoder.values_left();
1117 self.prefix_lengths.resize(num_prefixes, 0);
1118 prefix_len_decoder.get(&mut self.prefix_lengths[..])?;
1119
1120 let mut suffix_decoder = DeltaLengthByteArrayDecoder::new();
1121 suffix_decoder
1122 .set_data(data.slice(prefix_len_decoder.get_offset()..), num_values)?;
1123 self.suffix_decoder = Some(suffix_decoder);
1124 self.num_values = num_prefixes;
1125 self.current_idx = 0;
1126 self.previous_value = Bytes::new();
1127 Ok(())
1128 }
1129 _ => Err(general_err!(
1130 "DeltaByteArrayDecoder only supports ByteArrayType and FixedLenByteArrayType"
1131 )),
1132 }
1133 }
1134
1135 fn get(&mut self, buffer: &mut [T::T]) -> Result<usize> {
1136 match T::get_physical_type() {
1137 Type::BYTE_ARRAY | Type::FIXED_LEN_BYTE_ARRAY => {
1138 let num_values = cmp::min(buffer.len(), self.num_values);
1139 let mut v: [ByteArray; 1] = [ByteArray::new(); 1];
1140 for item in buffer.iter_mut().take(num_values) {
1141 let suffix_decoder = self
1144 .suffix_decoder
1145 .as_mut()
1146 .expect("decoder not initialized");
1147 suffix_decoder.get(&mut v[..])?;
1148 let suffix = v[0].data();
1149
1150 let prefix_len = usize::try_from(self.prefix_lengths[self.current_idx])
1152 .map_err(|_| {
1153 general_err!(
1154 "Invalid DELTA_BYTE_ARRAY prefix length {}",
1155 self.prefix_lengths[self.current_idx]
1156 )
1157 })?;
1158
1159 if prefix_len > self.previous_value.len() {
1160 return Err(general_err!(
1161 "Invalid DELTA_BYTE_ARRAY prefix length {} exceeds previous value length {}",
1162 prefix_len,
1163 self.previous_value.len()
1164 ));
1165 }
1166
1167 let mut result = Vec::with_capacity(prefix_len + suffix.len());
1169 result.extend_from_slice(&self.previous_value[0..prefix_len]);
1170 result.extend_from_slice(suffix);
1171
1172 let data = Bytes::from(result);
1173 item.set_from_bytes(data.clone());
1174
1175 self.previous_value = data;
1176 self.current_idx += 1;
1177 }
1178
1179 self.num_values -= num_values;
1180 Ok(num_values)
1181 }
1182 _ => Err(general_err!(
1183 "DeltaByteArrayDecoder only supports ByteArrayType and FixedLenByteArrayType"
1184 )),
1185 }
1186 }
1187
1188 fn values_left(&self) -> usize {
1189 self.num_values
1190 }
1191
1192 fn encoding(&self) -> Encoding {
1193 Encoding::DELTA_BYTE_ARRAY
1194 }
1195
1196 fn skip(&mut self, num_values: usize) -> Result<usize> {
1197 let mut buffer = vec![T::T::default(); num_values];
1198 self.get(&mut buffer)
1199 }
1200}
1201
1202#[cfg(test)]
1203mod tests {
1204 use super::{super::encoding::*, *};
1205
1206 use std::f32::consts::PI as PI_f32;
1207 use std::f64::consts::PI as PI_f64;
1208 use std::sync::Arc;
1209
1210 use crate::schema::types::{ColumnDescPtr, ColumnDescriptor, ColumnPath, Type as SchemaType};
1211 use crate::util::test_common::rand_gen::RandGen;
1212
1213 #[test]
1214 fn test_delta_byte_array_invalid_prefix_len_returns_error() {
1215 let col_descr = create_test_col_desc_ptr(-1, Type::BYTE_ARRAY);
1216
1217 let mut encoder =
1218 get_encoder::<ByteArrayType>(Encoding::DELTA_BYTE_ARRAY, &col_descr).unwrap();
1219 let input = vec![ByteArray::from("a"), ByteArray::from("ab")];
1220 encoder.put(&input).unwrap();
1221 let encoded = encoder.flush_buffer().unwrap();
1222
1223 let mut prefix_len_decoder = DeltaBitPackDecoder::<Int32Type>::new();
1225 prefix_len_decoder
1226 .set_data(encoded.clone(), input.len())
1227 .unwrap();
1228 let num_prefixes = prefix_len_decoder.values_left();
1229 let mut prefix_lengths = vec![0; num_prefixes];
1230 prefix_len_decoder.get(&mut prefix_lengths).unwrap();
1231
1232 assert_eq!(prefix_lengths, vec![0, 1]);
1234
1235 let prefix_stream_end = prefix_len_decoder.get_offset();
1236
1237 let mut prefix_encoder = get_encoder::<Int32Type>(
1241 Encoding::DELTA_BINARY_PACKED,
1242 &create_test_col_desc_ptr(-1, Type::INT32),
1243 )
1244 .unwrap();
1245 prefix_encoder.put(&[1i32, 1i32]).unwrap();
1246 let corrupted_prefix = prefix_encoder.flush_buffer().unwrap();
1247
1248 let mut corrupted = Vec::new();
1249 corrupted.extend_from_slice(corrupted_prefix.as_ref());
1250 corrupted.extend_from_slice(&encoded[prefix_stream_end..]);
1251
1252 let mut decoder = DeltaByteArrayDecoder::<ByteArrayType>::new();
1253 decoder
1254 .set_data(Bytes::from(corrupted), input.len())
1255 .unwrap();
1256
1257 let mut out = vec![ByteArray::new(); input.len()];
1258
1259 let err = decoder.get(&mut out).unwrap_err();
1260 assert!(
1261 err.to_string()
1262 .contains("Invalid DELTA_BYTE_ARRAY prefix length"),
1263 "{}",
1264 err
1265 );
1266 }
1267
1268 #[test]
1269 fn test_delta_byte_array_negative_prefix_len_returns_error() {
1270 let col_descr = create_test_col_desc_ptr(-1, Type::BYTE_ARRAY);
1271
1272 let mut encoder =
1273 get_encoder::<ByteArrayType>(Encoding::DELTA_BYTE_ARRAY, &col_descr).unwrap();
1274 let input = vec![ByteArray::from("a"), ByteArray::from("ab")];
1275 encoder.put(&input).unwrap();
1276 let encoded = encoder.flush_buffer().unwrap();
1277
1278 let mut decoder = DeltaByteArrayDecoder::<ByteArrayType>::new();
1279 decoder.set_data(encoded, input.len()).unwrap();
1280
1281 decoder.prefix_lengths[0] = -1;
1283 let mut out = vec![ByteArray::new(); input.len()];
1284
1285 let err = decoder.get(&mut out).unwrap_err();
1286 assert!(
1287 err.to_string()
1288 .contains("Invalid DELTA_BYTE_ARRAY prefix length"),
1289 "{}",
1290 err
1291 );
1292 }
1293
1294 #[test]
1295 fn test_get_decoders() {
1296 create_and_check_decoder::<Int32Type>(Encoding::PLAIN, None);
1298 create_and_check_decoder::<Int32Type>(Encoding::DELTA_BINARY_PACKED, None);
1299 create_and_check_decoder::<ByteArrayType>(Encoding::DELTA_LENGTH_BYTE_ARRAY, None);
1300 create_and_check_decoder::<ByteArrayType>(Encoding::DELTA_BYTE_ARRAY, None);
1301 create_and_check_decoder::<BoolType>(Encoding::RLE, None);
1302
1303 create_and_check_decoder::<Int32Type>(
1305 Encoding::RLE_DICTIONARY,
1306 Some(general_err!(
1307 "Cannot initialize this encoding through this function"
1308 )),
1309 );
1310 create_and_check_decoder::<Int32Type>(
1311 Encoding::PLAIN_DICTIONARY,
1312 Some(general_err!(
1313 "Cannot initialize this encoding through this function"
1314 )),
1315 );
1316 create_and_check_decoder::<Int32Type>(
1317 Encoding::DELTA_LENGTH_BYTE_ARRAY,
1318 Some(general_err!(
1319 "Encoding DELTA_LENGTH_BYTE_ARRAY is not supported for type"
1320 )),
1321 );
1322 create_and_check_decoder::<Int32Type>(
1323 Encoding::DELTA_BYTE_ARRAY,
1324 Some(general_err!(
1325 "Encoding DELTA_BYTE_ARRAY is not supported for type"
1326 )),
1327 );
1328
1329 #[allow(deprecated)]
1331 create_and_check_decoder::<Int32Type>(
1332 Encoding::BIT_PACKED,
1333 Some(nyi_err!("Encoding BIT_PACKED is not supported")),
1334 );
1335 }
1336
1337 #[test]
1338 fn test_plain_decode_int32() {
1339 let data = [42, 18, 52];
1340 let data_bytes = Int32Type::to_byte_array(&data[..]);
1341 let mut buffer = [0; 3];
1342 test_plain_decode::<Int32Type>(Bytes::from(data_bytes), 3, -1, &mut buffer[..], &data[..]);
1343 }
1344
1345 #[test]
1346 fn test_plain_skip_int32() {
1347 let data = [42, 18, 52];
1348 let data_bytes = Int32Type::to_byte_array(&data[..]);
1349 test_plain_skip::<Int32Type>(Bytes::from(data_bytes), 3, 1, -1, &data[1..]);
1350 }
1351
1352 #[test]
1353 fn test_plain_skip_all_int32() {
1354 let data = [42, 18, 52];
1355 let data_bytes = Int32Type::to_byte_array(&data[..]);
1356 test_plain_skip::<Int32Type>(Bytes::from(data_bytes), 3, 5, -1, &[]);
1357 }
1358
/// Decodes three `i32` values into an eight-slot buffer, leaving the five
/// null slots (those whose validity bit is clear) at zero.
#[test]
fn test_plain_decode_int32_spaced() {
    let values: [i32; 3] = [42, 18, 52];
    // Bits 1, 3 and 6 are set, so those are the slots that receive values.
    let valid_bits: [u8; 1] = [0b0100_1010];
    let expected: [i32; 8] = [0, 42, 0, 18, 0, 0, 52, 0];
    let num_nulls = 5;

    let encoded = Bytes::from(Int32Type::to_byte_array(values.as_slice()));
    let mut decoded = [0i32; 8];
    test_plain_decode_spaced::<Int32Type>(
        encoded,
        3,
        -1,
        decoded.as_mut_slice(),
        num_nulls,
        &valid_bits,
        expected.as_slice(),
    );
}
1377
/// PLAIN-decodes three `i64` values and expects them back unchanged.
#[test]
fn test_plain_decode_int64() {
    let values: [i64; 3] = [42, 18, 52];
    let encoded = Bytes::from(Int64Type::to_byte_array(values.as_slice()));
    let mut decoded = [0i64; 3];
    test_plain_decode::<Int64Type>(encoded, 3, -1, decoded.as_mut_slice(), values.as_slice());
}
1385
/// Skips two of three PLAIN `i64` values and expects only the last one to remain.
#[test]
fn test_plain_skip_int64() {
    let values: [i64; 3] = [42, 18, 52];
    let encoded = Bytes::from(Int64Type::to_byte_array(values.as_slice()));
    test_plain_skip::<Int64Type>(encoded, 3, 2, -1, &values[2..]);
}
1392
/// Skips exactly as many `i64` values as were stored; nothing must remain.
#[test]
fn test_plain_skip_all_int64() {
    let values: [i64; 3] = [42, 18, 52];
    let encoded = Bytes::from(Int64Type::to_byte_array(values.as_slice()));
    test_plain_skip::<Int64Type>(encoded, 3, 3, -1, &[]);
}
1399
/// PLAIN-decodes three `f32` values and expects them back unchanged.
#[test]
fn test_plain_decode_float() {
    let values: [f32; 3] = [PI_f32, 2.414, 12.51];
    let encoded = Bytes::from(FloatType::to_byte_array(values.as_slice()));
    let mut decoded = [0.0f32; 3];
    test_plain_decode::<FloatType>(encoded, 3, -1, decoded.as_mut_slice(), values.as_slice());
}
1407
/// Skips the first of three PLAIN `f32` values and expects the last two to remain.
#[test]
fn test_plain_skip_float() {
    let values: [f32; 3] = [PI_f32, 2.414, 12.51];
    let encoded = Bytes::from(FloatType::to_byte_array(values.as_slice()));
    test_plain_skip::<FloatType>(encoded, 3, 1, -1, &values[1..]);
}
1414
/// Requests a skip larger than the three stored `f32` values; nothing must remain.
#[test]
fn test_plain_skip_all_float() {
    let values: [f32; 3] = [PI_f32, 2.414, 12.51];
    let encoded = Bytes::from(FloatType::to_byte_array(values.as_slice()));
    test_plain_skip::<FloatType>(encoded, 3, 4, -1, &[]);
}
1421
/// Skips the first of three PLAIN `f64` values and expects the last two to remain.
#[test]
fn test_plain_skip_double() {
    let values: [f64; 3] = [PI_f64, 2.414, 12.51];
    let encoded = Bytes::from(DoubleType::to_byte_array(values.as_slice()));
    test_plain_skip::<DoubleType>(encoded, 3, 1, -1, &values[1..]);
}
1428
/// Requests a skip larger than the three stored `f64` values; nothing must remain.
#[test]
fn test_plain_skip_all_double() {
    let values: [f64; 3] = [PI_f64, 2.414, 12.51];
    let encoded = Bytes::from(DoubleType::to_byte_array(values.as_slice()));
    test_plain_skip::<DoubleType>(encoded, 3, 5, -1, &[]);
}
1435
/// PLAIN-decodes three `f64` values and expects them back unchanged.
#[test]
fn test_plain_decode_double() {
    let values: [f64; 3] = [PI_f64, 2.414, 12.51];
    let encoded = Bytes::from(DoubleType::to_byte_array(values.as_slice()));
    let mut decoded = [0.0f64; 3];
    test_plain_decode::<DoubleType>(encoded, 3, -1, decoded.as_mut_slice(), values.as_slice());
}
1443
/// PLAIN-decodes four `Int96` values and expects them back unchanged.
#[test]
fn test_plain_decode_int96() {
    let mut values = [Int96::new(); 4];
    let parts = [(11, 22, 33), (44, 55, 66), (10, 20, 30), (40, 50, 60)];
    for (slot, &(a, b, c)) in values.iter_mut().zip(parts.iter()) {
        slot.set_data(a, b, c);
    }

    let encoded = Bytes::from(Int96Type::to_byte_array(&values[..]));
    let mut decoded = [Int96::new(); 4];
    test_plain_decode::<Int96Type>(encoded, 4, -1, &mut decoded[..], &values[..]);
}
1455
/// Skips two of four PLAIN `Int96` values and expects the last two to remain.
#[test]
fn test_plain_skip_int96() {
    let mut values = [Int96::new(); 4];
    let parts = [(11, 22, 33), (44, 55, 66), (10, 20, 30), (40, 50, 60)];
    for (slot, &(a, b, c)) in values.iter_mut().zip(parts.iter()) {
        slot.set_data(a, b, c);
    }

    let encoded = Bytes::from(Int96Type::to_byte_array(&values[..]));
    test_plain_skip::<Int96Type>(encoded, 4, 2, -1, &values[2..]);
}
1466
/// Requests a skip larger than the four stored `Int96` values; nothing must remain.
#[test]
fn test_plain_skip_all_int96() {
    let mut values = [Int96::new(); 4];
    let parts = [(11, 22, 33), (44, 55, 66), (10, 20, 30), (40, 50, 60)];
    for (slot, &(a, b, c)) in values.iter_mut().zip(parts.iter()) {
        slot.set_data(a, b, c);
    }

    let encoded = Bytes::from(Int96Type::to_byte_array(&values[..]));
    test_plain_skip::<Int96Type>(encoded, 4, 8, -1, &[]);
}
1477
/// PLAIN-decodes ten bit-packed booleans and expects them back unchanged.
#[test]
fn test_plain_decode_bool() {
    let values: [bool; 10] = [
        false, true, false, false, true, false, true, true, false, true,
    ];
    let encoded = Bytes::from(BoolType::to_byte_array(values.as_slice()));
    let mut decoded = [false; 10];
    test_plain_decode::<BoolType>(encoded, 10, -1, decoded.as_mut_slice(), values.as_slice());
}
1487
/// Skips five of ten PLAIN booleans and expects the second half to remain.
#[test]
fn test_plain_skip_bool() {
    let values: [bool; 10] = [
        false, true, false, false, true, false, true, true, false, true,
    ];
    let encoded = Bytes::from(BoolType::to_byte_array(values.as_slice()));
    test_plain_skip::<BoolType>(encoded, 10, 5, -1, &values[5..]);
}
1496
/// Requests a skip larger than the ten stored booleans; nothing must remain.
#[test]
fn test_plain_skip_all_bool() {
    let values: [bool; 10] = [
        false, true, false, false, true, false, true, true, false, true,
    ];
    let encoded = Bytes::from(BoolType::to_byte_array(values.as_slice()));
    test_plain_skip::<BoolType>(encoded, 10, 20, -1, &[]);
}
1505
/// PLAIN-decodes two length-prefixed byte arrays and expects them back unchanged.
#[test]
fn test_plain_decode_byte_array() {
    let values: Vec<ByteArray> = ["hello", "parquet"]
        .iter()
        .map(|&text| {
            let mut value = ByteArray::new();
            value.set_data(Bytes::from(String::from(text)));
            value
        })
        .collect();

    let encoded = Bytes::from(ByteArrayType::to_byte_array(&values[..]));
    let mut decoded = vec![ByteArray::new(); 2];
    test_plain_decode::<ByteArrayType>(encoded, 2, -1, &mut decoded[..], &values[..]);
}
1521
/// Skips the first of two PLAIN byte arrays and expects the second to remain.
#[test]
fn test_plain_skip_byte_array() {
    let values: Vec<ByteArray> = ["hello", "parquet"]
        .iter()
        .map(|&text| {
            let mut value = ByteArray::new();
            value.set_data(Bytes::from(String::from(text)));
            value
        })
        .collect();

    let encoded = Bytes::from(ByteArrayType::to_byte_array(&values[..]));
    test_plain_skip::<ByteArrayType>(encoded, 2, 1, -1, &values[1..]);
}
1530
/// Skips both stored PLAIN byte arrays; nothing must remain.
#[test]
fn test_plain_skip_all_byte_array() {
    let values: Vec<ByteArray> = ["hello", "parquet"]
        .iter()
        .map(|&text| {
            let mut value = ByteArray::new();
            value.set_data(Bytes::from(String::from(text)));
            value
        })
        .collect();

    let encoded = Bytes::from(ByteArrayType::to_byte_array(&values[..]));
    test_plain_skip::<ByteArrayType>(encoded, 2, 2, -1, &[]);
}
1539
/// PLAIN-decodes three 4-byte fixed-length byte arrays (type length 4).
#[test]
fn test_plain_decode_fixed_len_byte_array() {
    let values: Vec<FixedLenByteArray> = ["bird", "come", "flow"]
        .iter()
        .map(|&text| {
            let mut value = FixedLenByteArray::default();
            value.set_data(Bytes::from(String::from(text)));
            value
        })
        .collect();

    let encoded = Bytes::from(FixedLenByteArrayType::to_byte_array(&values[..]));
    let mut decoded = vec![FixedLenByteArray::default(); 3];
    test_plain_decode::<FixedLenByteArrayType>(encoded, 3, 4, &mut decoded[..], &values[..]);
}
1556
/// Skips the first of three 4-byte fixed-length arrays and expects the last two.
#[test]
fn test_plain_skip_fixed_len_byte_array() {
    let values: Vec<FixedLenByteArray> = ["bird", "come", "flow"]
        .iter()
        .map(|&text| {
            let mut value = FixedLenByteArray::default();
            value.set_data(Bytes::from(String::from(text)));
            value
        })
        .collect();

    let encoded = Bytes::from(FixedLenByteArrayType::to_byte_array(&values[..]));
    test_plain_skip::<FixedLenByteArrayType>(encoded, 3, 1, 4, &values[1..]);
}
1566
/// Requests a skip larger than the three stored fixed-length arrays; nothing must remain.
#[test]
fn test_plain_skip_all_fixed_len_byte_array() {
    let values: Vec<FixedLenByteArray> = ["bird", "come", "flow"]
        .iter()
        .map(|&text| {
            let mut value = FixedLenByteArray::default();
            value.set_data(Bytes::from(String::from(text)));
            value
        })
        .collect();

    let encoded = Bytes::from(FixedLenByteArrayType::to_byte_array(&values[..]));
    test_plain_skip::<FixedLenByteArrayType>(encoded, 3, 6, 4, &[]);
}
1576
/// Feeding an empty buffer to the dictionary decoder must fail with an EOF
/// error about the missing bit width.
#[test]
fn test_dict_decoder_empty_data() {
    let mut decoder = DictDecoder::<Int32Type>::new();
    let message = decoder.set_data(Bytes::new(), 10).unwrap_err().to_string();
    assert_eq!(message, "EOF: Not enough bytes to decode bit_width");
}
1583
1584 fn test_plain_decode<T: DataType>(
1585 data: Bytes,
1586 num_values: usize,
1587 type_length: i32,
1588 buffer: &mut [T::T],
1589 expected: &[T::T],
1590 ) {
1591 let mut decoder: PlainDecoder<T> = PlainDecoder::new(type_length);
1592 let result = decoder.set_data(data, num_values);
1593 assert!(result.is_ok());
1594 let result = decoder.get(buffer);
1595 assert!(result.is_ok());
1596 assert_eq!(decoder.values_left(), 0);
1597 assert_eq!(buffer, expected);
1598 }
1599
1600 fn test_plain_skip<T: DataType>(
1601 data: Bytes,
1602 num_values: usize,
1603 skip: usize,
1604 type_length: i32,
1605 expected: &[T::T],
1606 ) {
1607 let mut decoder: PlainDecoder<T> = PlainDecoder::new(type_length);
1608 let result = decoder.set_data(data, num_values);
1609 assert!(result.is_ok());
1610 let skipped = decoder.skip(skip).expect("skipping values");
1611
1612 if skip >= num_values {
1613 assert_eq!(skipped, num_values);
1614
1615 let mut buffer = vec![T::T::default(); 1];
1616 let remaining = decoder.get(&mut buffer).expect("getting remaining values");
1617 assert_eq!(remaining, 0);
1618 } else {
1619 assert_eq!(skipped, skip);
1620 let mut buffer = vec![T::T::default(); num_values - skip];
1621 let remaining = decoder.get(&mut buffer).expect("getting remaining values");
1622 assert_eq!(remaining, num_values - skip);
1623 assert_eq!(decoder.values_left(), 0);
1624 assert_eq!(buffer, expected);
1625 }
1626 }
1627
1628 fn test_plain_decode_spaced<T: DataType>(
1629 data: Bytes,
1630 num_values: usize,
1631 type_length: i32,
1632 buffer: &mut [T::T],
1633 num_nulls: usize,
1634 valid_bits: &[u8],
1635 expected: &[T::T],
1636 ) {
1637 let mut decoder: PlainDecoder<T> = PlainDecoder::new(type_length);
1638 let result = decoder.set_data(data, num_values);
1639 assert!(result.is_ok());
1640 let result = decoder.get_spaced(buffer, num_nulls, valid_bits);
1641 assert!(result.is_ok());
1642 assert_eq!(num_values + num_nulls, result.unwrap());
1643 assert_eq!(decoder.values_left(), 0);
1644 assert_eq!(buffer, expected);
1645 }
1646
/// Putting `i32` values into the RLE value encoder must panic: it is bool-only.
#[test]
#[should_panic(expected = "RleValueEncoder only supports BoolType")]
fn test_rle_value_encode_int32_not_supported() {
    let values = [1, 2, 3, 4];
    let mut encoder = RleValueEncoder::<Int32Type>::new();
    encoder.put(&values).unwrap();
}
1653
/// Handing data to an `i32` RLE value decoder must panic: it is bool-only.
#[test]
#[should_panic(expected = "RleValueDecoder only supports BoolType")]
fn test_rle_value_decode_int32_not_supported() {
    let page = Bytes::from(vec![5, 0, 0, 0]);
    let mut decoder = RleValueDecoder::<Int32Type>::new();
    decoder.set_data(page, 1).unwrap();
}
1660
/// A single-byte page is rejected by the bool RLE decoder's `set_data`.
#[test]
fn test_rle_value_decode_missing_size() {
    let page = Bytes::from(vec![0]);
    let mut decoder = RleValueDecoder::<BoolType>::new();
    let outcome = decoder.set_data(page, 1);
    assert!(outcome.is_err());
}
1666
/// A four-byte page `[5, 0, 0, 0]` with nothing after it is rejected by the
/// bool RLE decoder's `set_data`.
#[test]
fn test_rle_value_decode_missing_data() {
    let page = Bytes::from(vec![5, 0, 0, 0]);
    let mut decoder = RleValueDecoder::<BoolType>::new();
    let outcome = decoder.set_data(page, 1);
    assert!(outcome.is_err());
}
1672
/// Round-trips three generated boolean batches (256, 257 and 126 values)
/// through the RLE value encoder and decoder.
#[test]
fn test_rle_value_decode_bool_decode() {
    let batches: Vec<Vec<bool>> = [256, 257, 126]
        .iter()
        .map(|&len| BoolType::gen_vec(-1, len))
        .collect();
    test_rle_value_decode::<BoolType>(batches);
}
1683
/// Asking for the offset before `set_data` must panic.
#[test]
#[should_panic(expected = "Bit reader is not initialized")]
fn test_delta_bit_packed_not_initialized_offset() {
    DeltaBitPackDecoder::<Int32Type>::new().get_offset();
}
1691
/// Reading values before `set_data` must panic.
#[test]
#[should_panic(expected = "Bit reader is not initialized")]
fn test_delta_bit_packed_not_initialized_get() {
    let mut sink: Vec<i32> = Vec::new();
    let mut decoder = DeltaBitPackDecoder::<Int32Type>::new();
    decoder.get(&mut sink).unwrap();
}
1700
/// Round-trips a single empty `i32` batch.
#[test]
fn test_delta_bit_packed_int32_empty() {
    let batches = vec![Vec::<i32>::new()];
    test_delta_bit_packed_decode::<Int32Type>(batches);
}
1706
/// Round-trips the sequence 1..=8 repeated four times (32 values).
#[test]
fn test_delta_bit_packed_int32_repeat() {
    let block_data: Vec<i32> = [1, 2, 3, 4, 5, 6, 7, 8].repeat(4);
    test_delta_bit_packed_decode::<Int32Type>(vec![block_data]);
}
1715
/// Skips part (10) and then more than all (100) of the sequence 1..=8
/// repeated four times.
#[test]
fn test_skip_delta_bit_packed_int32_repeat() {
    let block_data: Vec<i32> = [1, 2, 3, 4, 5, 6, 7, 8].repeat(4);
    for skip in [10, 100] {
        test_skip::<Int32Type>(block_data.clone(), Encoding::DELTA_BINARY_PACKED, skip);
    }
}
1725
/// Round-trips eleven values with mixed signs.
#[test]
fn test_delta_bit_packed_int32_uneven() {
    let block_data: Vec<i32> = [1, -2, 3, -4, 5, 6, 7, 8, 9, 10, 11].to_vec();
    test_delta_bit_packed_decode::<Int32Type>(vec![block_data]);
}
1731
/// Skips part (5) and then more than all (100) of eleven mixed-sign values.
#[test]
fn test_skip_delta_bit_packed_int32_uneven() {
    let block_data: Vec<i32> = [1, -2, 3, -4, 5, 6, 7, 8, 9, 10, 11].to_vec();
    for skip in [5, 100] {
        test_skip::<Int32Type>(block_data.clone(), Encoding::DELTA_BINARY_PACKED, skip);
    }
}
1738
/// Round-trips sixteen identical values, once positive and once negative.
#[test]
fn test_delta_bit_packed_int32_same_values() {
    for value in [127, -127] {
        let block_data: Vec<i32> = vec![value; 16];
        test_delta_bit_packed_decode::<Int32Type>(vec![block_data]);
    }
}
1752
/// Skips part (5) and then more than all (100) of sixteen identical values,
/// first for 127 and then for -127.
#[test]
fn test_skip_delta_bit_packed_int32_same_values() {
    for value in [127, -127] {
        let block_data: Vec<i32> = vec![value; 16];
        for skip in [5, 100] {
            test_skip::<Int32Type>(block_data.clone(), Encoding::DELTA_BINARY_PACKED, skip);
        }
    }
}
1768
/// Round-trips values that jump between `i32::MIN` and `i32::MAX`.
#[test]
fn test_delta_bit_packed_int32_min_max() {
    const LO: i32 = i32::MIN;
    const HI: i32 = i32::MAX;
    let block_data = vec![LO, LO, LO, HI, LO, HI, LO, HI];
    test_delta_bit_packed_decode::<Int32Type>(vec![block_data]);
}
1783
/// Skips part (5) and then more than all (100) of values that jump between
/// `i32::MIN` and `i32::MAX`.
#[test]
fn test_skip_delta_bit_packed_int32_min_max() {
    const LO: i32 = i32::MIN;
    const HI: i32 = i32::MAX;
    let block_data = vec![LO, LO, LO, HI, LO, HI, LO, HI];
    for skip in [5, 100] {
        test_skip::<Int32Type>(block_data.clone(), Encoding::DELTA_BINARY_PACKED, skip);
    }
}
1799
/// Skips through 128 `i32` values that grow by a constant step of 7
/// (0, 7, 14, ...), first partially (50) and then past the end (200).
#[test]
fn test_skip_delta_bit_packed_bw0_uniform_step_i32() {
    let data: Vec<i32> = (0..128 * 7).step_by(7).collect();
    for skip in [50, 200] {
        test_skip::<Int32Type>(data.clone(), Encoding::DELTA_BINARY_PACKED, skip);
    }
}
1808
/// Skips through 128 `i64` values that grow by a constant step of 100
/// (0, 100, 200, ...), first partially (50) and then past the end (200).
#[test]
fn test_skip_delta_bit_packed_bw0_uniform_step_i64() {
    let data: Vec<i64> = (0..128 * 100).step_by(100).collect();
    for skip in [50, 200] {
        test_skip::<Int64Type>(data.clone(), Encoding::DELTA_BINARY_PACKED, skip);
    }
}
1816
/// Round-trips three generated `i32` batches of 64, 128 and 64 values.
#[test]
fn test_delta_bit_packed_int32_multiple_blocks() {
    let batches: Vec<Vec<i32>> = [64, 128, 64]
        .iter()
        .map(|&len| Int32Type::gen_vec(-1, len))
        .collect();
    test_delta_bit_packed_decode::<Int32Type>(batches);
}
1827
/// Round-trips two generated `i32` batches of 256 and 257 values.
#[test]
fn test_delta_bit_packed_int32_data_across_blocks() {
    let first = Int32Type::gen_vec(-1, 256);
    let second = Int32Type::gen_vec(-1, 257);
    test_delta_bit_packed_decode::<Int32Type>(vec![first, second]);
}
1834
/// Round-trips generated `i32` batches with an empty batch in between.
#[test]
fn test_delta_bit_packed_int32_with_empty_blocks() {
    let head = Int32Type::gen_vec(-1, 128);
    let empty: Vec<i32> = Vec::new();
    let tail = Int32Type::gen_vec(-1, 64);
    test_delta_bit_packed_decode::<Int32Type>(vec![head, empty, tail]);
}
1844
/// Round-trips a single empty `i64` batch.
#[test]
fn test_delta_bit_packed_int64_empty() {
    let batches = vec![Vec::<i64>::new()];
    test_delta_bit_packed_decode::<Int64Type>(batches);
}
1850
/// Round-trips eight values alternating between `i64::MIN` and `i64::MAX`.
#[test]
fn test_delta_bit_packed_int64_min_max() {
    let block_data: Vec<i64> = [i64::MIN, i64::MAX].repeat(4);
    test_delta_bit_packed_decode::<Int64Type>(vec![block_data]);
}
1865
/// Round-trips three generated `i64` batches of 64, 128 and 64 values.
#[test]
fn test_delta_bit_packed_int64_multiple_blocks() {
    let batches: Vec<Vec<i64>> = [64, 128, 64]
        .iter()
        .map(|&len| Int64Type::gen_vec(-1, len))
        .collect();
    test_delta_bit_packed_decode::<Int64Type>(batches);
}
1876
/// A header that declares zero miniblocks per block must be rejected by
/// `set_data` with a descriptive error.
#[test]
fn test_delta_bit_packed_zero_miniblocks() {
    let header: Vec<u8> = vec![
        128, 1, // varint 128 (presumably the block size, per the DELTA_BINARY_PACKED layout)
        0, // miniblocks per block: the invalid zero under test
    ];

    let mut decoder = DeltaBitPackDecoder::<Int32Type>::new();
    let message = decoder
        .set_data(Bytes::from(header), 0)
        .unwrap_err()
        .to_string();
    assert_eq!(
        message,
        "Parquet error: cannot have zero miniblocks per block"
    );
}
1891
/// Decodes a hand-written 34-byte page and checks both the decoded values
/// and the byte offsets reported after the header (5) and after the data (34).
#[test]
fn test_delta_bit_packed_decoder_sample() {
    // Twelve meaningful leading bytes; the rest of the page is zero padding.
    let mut data_bytes: Vec<u8> = vec![128, 1, 4, 3, 58, 28, 6, 0, 0, 0, 0, 8];
    data_bytes.resize(34, 0);

    let mut decoder = DeltaBitPackDecoder::<Int32Type>::new();
    decoder.set_data(Bytes::from(data_bytes), 3).unwrap();
    // Only the five header bytes have been consumed so far.
    assert_eq!(decoder.get_offset(), 5);

    let mut result = vec![0i32; 3];
    decoder.get(&mut result).unwrap();
    assert_eq!(decoder.get_offset(), 34);
    assert_eq!(result, vec![29, 43, 89]);
}
1908
/// Decodes a hand-built two-block page whose last block is only partially
/// used, then checks that a truncated copy of the page produces a miniblock
/// length error.
#[test]
fn test_delta_bit_packed_padding() {
    // Four varints; the reader assertions below pin them to 256, 4, 419 and 7.
    let header: Vec<u8> = vec![
        128,
        2, // 256
        4, // 4
        128 + 35,
        3, // 419
        7, // 7
    ];

    // NOTE(review): per the DELTA_BINARY_PACKED layout each block starts with
    // a min delta followed by one bit width per miniblock - confirm against
    // the format specification.
    let block1_header: Vec<u8> = vec![0, 0, 1, 0, 0];
    let block1: Vec<u8> = vec![0xFF; 8];

    // The trailing 0xFF is presumably the bit width of a miniblock that holds
    // no values - TODO confirm.
    let block2_header: Vec<u8> = vec![0, 0, 1, 2, 0xFF];
    let block2: Vec<u8> = vec![0xFF; 24];

    let data: Vec<u8> = [header, block1_header, block1, block2_header, block2].concat();
    let length = data.len();
    let ptr = Bytes::from(data);

    // Sanity-check the header with a plain bit reader first.
    let mut reader = BitReader::new(ptr.clone());
    for expected in [256, 4, 419, 7] {
        assert_eq!(reader.get_vlq_int().unwrap(), expected);
    }

    // One slot more than the 419 values the page contains.
    let mut output = vec![0_i32; 420];

    let mut decoder = DeltaBitPackDecoder::<Int32Type>::new();
    decoder.set_data(ptr.clone(), 0).unwrap();
    let decoded = decoder.get(&mut output).unwrap();
    assert_eq!(decoded, 419);
    assert_eq!(decoder.get_offset(), length);

    // Keep only the first 12 bytes: the first block's data is cut short.
    decoder.set_data(ptr.slice(..12), 0).unwrap();
    let err = decoder.get(&mut output).unwrap_err().to_string();
    assert!(
        err.contains("Expected to read 64 values from miniblock got 8"),
        "{}",
        err
    );
}
1985
/// Round-trips 10240 copies of the same `i32` value.
#[test]
fn test_delta_bit_packed_int32_single_value_large() {
    let block_data: Vec<i32> = [3].repeat(10240);
    test_delta_bit_packed_decode::<Int32Type>(vec![block_data]);
}
1991
/// Skips 50 and then 5000 values out of 10240 copies of the same `i32` value.
#[test]
fn test_delta_bit_packed_int32_single_value_skip_large() {
    let block_data: Vec<i32> = [3].repeat(10240);
    for skip in [50, 5000] {
        test_skip::<Int32Type>(block_data.clone(), Encoding::DELTA_BINARY_PACKED, skip);
    }
}
1998
/// Round-trips the strictly increasing sequence 0..10240.
#[test]
fn test_delta_bit_packed_int32_increasing_value_large() {
    let block_data: Vec<i32> = (0..10240).collect();
    test_delta_bit_packed_decode::<Int32Type>(vec![block_data]);
}
2004
/// Skips 50 and then 5000 values of the increasing sequence 0..10240.
#[test]
fn test_delta_bit_packed_int32_increasing_value_skip_large() {
    let block_data: Vec<i32> = (0..10240).collect();
    for skip in [50, 5000] {
        test_skip::<Int32Type>(block_data.clone(), Encoding::DELTA_BINARY_PACKED, skip);
    }
}
2011
/// Round-trips 10240 values where each value appears twice (0, 0, 1, 1, ...).
#[test]
fn test_delta_bit_packed_int32_stepped_value_large() {
    let block_data: Vec<i32> = (0..10240).map(|i: i32| i / 2).collect();
    test_delta_bit_packed_decode::<Int32Type>(vec![block_data]);
}
2017
/// Skips 50 and then 5000 values of the stepped sequence 0, 0, 1, 1, ...
#[test]
fn test_delta_bit_packed_int32_stepped_value_skip_large() {
    let block_data: Vec<i32> = (0..10240).map(|i: i32| i / 2).collect();
    for skip in [50, 5000] {
        test_skip::<Int32Type>(block_data.clone(), Encoding::DELTA_BINARY_PACKED, skip);
    }
}
2024
/// Round-trips five batches of 133 values each, mixing periodic, constant
/// and increasing patterns.
#[test]
fn test_delta_bit_packed_int32_mixed_large() {
    const BLOCK_SIZE: i32 = 133;

    let batches: Vec<Vec<i32>> = vec![
        (0..BLOCK_SIZE).map(|i| (i * 7) % 11).collect(),
        vec![3; BLOCK_SIZE as usize],
        (0..BLOCK_SIZE).map(|i| (i * 5) % 13).collect(),
        (0..BLOCK_SIZE).collect(),
        (0..BLOCK_SIZE).map(|i| (i * 3) % 17).collect(),
    ];
    test_delta_bit_packed_decode::<Int32Type>(batches);
}
2043
/// Round-trips 10240 copies of the same `i64` value.
#[test]
fn test_delta_bit_packed_int64_single_value_large() {
    let block_data: Vec<i64> = [5].repeat(10240);
    test_delta_bit_packed_decode::<Int64Type>(vec![block_data]);
}
2049
/// Round-trips the strictly increasing `i64` sequence 0..10240.
#[test]
fn test_delta_bit_packed_int64_increasing_value_large() {
    let block_data: Vec<i64> = (0..10240).collect();
    test_delta_bit_packed_decode::<Int64Type>(vec![block_data]);
}
2055
/// Round-trips batches (sizes 1, 2 and 2) in which every byte array is identical.
#[test]
fn test_delta_byte_array_same_arrays() {
    let sample = || ByteArray::from(vec![1, 2, 3, 4, 5, 6]);
    let batches = vec![
        vec![sample()],
        vec![sample(), sample()],
        vec![sample(), sample()],
    ];
    test_delta_byte_array_decode(batches);
}
2071
/// Round-trips batches (sizes 1, 2 and 2) in which all byte arrays differ.
#[test]
fn test_delta_byte_array_unique_arrays() {
    let bytes = |raw: &[u8]| ByteArray::from(raw.to_vec());
    let batches = vec![
        vec![bytes(&[1])],
        vec![bytes(&[2, 3]), bytes(&[4, 5, 6])],
        vec![bytes(&[7, 8]), bytes(&[9, 0, 1, 2])],
    ];
    test_delta_byte_array_decode(batches);
}
2084
/// Round-trips a single batch holding a single byte array.
#[test]
fn test_delta_byte_array_single_array() {
    let only = ByteArray::from(vec![1, 2, 3, 4, 5, 6]);
    test_delta_byte_array_decode(vec![vec![only]]);
}
2090
/// Round-trips two `f32` batches built from recognisable byte patterns.
#[test]
fn test_byte_stream_split_multiple_f32() {
    let first = vec![
        f32::from_le_bytes([0xAA, 0xBB, 0xCC, 0xDD]),
        f32::from_le_bytes([0x00, 0x11, 0x22, 0x33]),
    ];
    let second = vec![f32::from_le_bytes([0xA3, 0xB4, 0xC5, 0xD6])];
    test_byte_stream_split_decode::<FloatType>(vec![first, second], -1);
}
2102
/// Round-trips one `f64` batch built from the byte patterns 0..=7 and 8..=15.
#[test]
fn test_byte_stream_split_f64() {
    let batch = vec![
        f64::from_le_bytes([0, 1, 2, 3, 4, 5, 6, 7]),
        f64::from_le_bytes([8, 9, 10, 11, 12, 13, 14, 15]),
    ];
    test_byte_stream_split_decode::<DoubleType>(vec![batch], -1);
}
2111
/// Round-trips two `i32` batches built from recognisable byte patterns.
#[test]
fn test_byte_stream_split_multiple_i32() {
    let first = vec![
        i32::from_le_bytes([0xAA, 0xBB, 0xCC, 0xDD]),
        i32::from_le_bytes([0x00, 0x11, 0x22, 0x33]),
    ];
    let second = vec![i32::from_le_bytes([0xA3, 0xB4, 0xC5, 0xD6])];
    test_byte_stream_split_decode::<Int32Type>(vec![first, second], -1);
}
2123
/// Round-trips one `i64` batch built from the byte patterns 0..=7 and 8..=15.
#[test]
fn test_byte_stream_split_i64() {
    let batch = vec![
        i64::from_le_bytes([0, 1, 2, 3, 4, 5, 6, 7]),
        i64::from_le_bytes([8, 9, 10, 11, 12, 13, 14, 15]),
    ];
    test_byte_stream_split_decode::<Int64Type>(vec![batch], -1);
}
2132
2133 fn test_byte_stream_split_flba(type_width: usize) {
2134 let data = vec![
2135 vec![
2136 FixedLenByteArrayType::r#gen(type_width as i32),
2137 FixedLenByteArrayType::r#gen(type_width as i32),
2138 ],
2139 vec![FixedLenByteArrayType::r#gen(type_width as i32)],
2140 ];
2141 test_byte_stream_split_decode::<FixedLenByteArrayType>(data, type_width as i32);
2142 }
2143
/// Byte-stream-split round trip for 5-byte fixed-length arrays.
#[test]
fn test_byte_stream_split_flba5() {
    const WIDTH: usize = 5;
    test_byte_stream_split_flba(WIDTH);
}
2148
/// Byte-stream-split round trip for 16-byte fixed-length arrays.
#[test]
fn test_byte_stream_split_flba16() {
    const WIDTH: usize = 16;
    test_byte_stream_split_flba(WIDTH);
}
2153
/// Byte-stream-split round trip for 19-byte fixed-length arrays.
#[test]
fn test_byte_stream_split_flba19() {
    const WIDTH: usize = 19;
    test_byte_stream_split_flba(WIDTH);
}
2158
/// Encoding a 4-byte value into a column declared with width 5 must panic.
#[test]
#[should_panic(expected = "Mismatched FixedLenByteArray sizes: 4 != 5")]
fn test_byte_stream_split_flba_mismatch() {
    let well_sized = vec![
        FixedLenByteArray::from(vec![0xAA, 0xAB, 0xAC, 0xAD, 0xAE]),
        FixedLenByteArray::from(vec![0xBA, 0xBB, 0xBC, 0xBD, 0xBE]),
    ];
    // One byte short of the declared width.
    let too_short = vec![FixedLenByteArray::from(vec![0xCA, 0xCB, 0xCC, 0xCD])];
    test_byte_stream_split_decode::<FixedLenByteArrayType>(vec![well_sized, too_short], 5);
}
2171
/// A 5-byte page cannot hold whole 4-byte values, so unwrapping `set_data` must panic.
#[test]
#[should_panic(expected = "Input data length is not a multiple of type width 4")]
fn test_byte_stream_split_flba_bad_input() {
    let page = Bytes::from(vec![1, 2, 3, 4, 5]);
    let mut decoder = VariableWidthByteStreamSplitDecoder::<FixedLenByteArrayType>::new(4);
    decoder.set_data(page, 1).unwrap();
}
2180
/// Skips within byte-stream-split data: partially for `f32` (2 of 4) and
/// past the end for the same values widened to `f64` (100 of 4).
#[test]
fn test_skip_byte_stream_split() {
    let floats: Vec<f32> = vec![0.3, 0.4, 0.1, 4.10];
    let doubles: Vec<f64> = floats.iter().map(|&x| f64::from(x)).collect();

    test_skip::<FloatType>(floats, Encoding::BYTE_STREAM_SPLIT, 2);
    test_skip::<DoubleType>(doubles, Encoding::BYTE_STREAM_SPLIT, 100);
}
2191
/// Skips within byte-stream-split data: partially for `i32` (2 of 4) and
/// past the end for the same values widened to `i64` (100 of 4).
#[test]
fn test_skip_byte_stream_split_ints() {
    let narrow: Vec<i32> = vec![3, 4, 1, 5];
    let wide: Vec<i64> = narrow.iter().map(|&x| i64::from(x)).collect();

    test_skip::<Int32Type>(narrow, Encoding::BYTE_STREAM_SPLIT, 2);
    test_skip::<Int64Type>(wide, Encoding::BYTE_STREAM_SPLIT, 100);
}
2202
2203 fn test_rle_value_decode<T: DataType>(data: Vec<Vec<T::T>>) {
2204 test_encode_decode::<T>(data, Encoding::RLE, -1);
2205 }
2206
2207 fn test_delta_bit_packed_decode<T: DataType>(data: Vec<Vec<T::T>>) {
2208 test_encode_decode::<T>(data, Encoding::DELTA_BINARY_PACKED, -1);
2209 }
2210
2211 fn test_byte_stream_split_decode<T: DataType>(data: Vec<Vec<T::T>>, type_width: i32) {
2212 test_encode_decode::<T>(data, Encoding::BYTE_STREAM_SPLIT, type_width);
2213 }
2214
2215 fn test_delta_byte_array_decode(data: Vec<Vec<ByteArray>>) {
2216 test_encode_decode::<ByteArrayType>(data, Encoding::DELTA_BYTE_ARRAY, -1);
2217 }
2218
2219 fn test_encode_decode<T: DataType>(data: Vec<Vec<T::T>>, encoding: Encoding, type_width: i32) {
2224 let col_descr = create_test_col_desc_ptr(type_width, T::get_physical_type());
2225
2226 let mut encoder = get_encoder::<T>(encoding, &col_descr).expect("get encoder");
2228
2229 for v in &data[..] {
2230 encoder.put(&v[..]).expect("ok to encode");
2231 }
2232 let bytes = encoder.flush_buffer().expect("ok to flush buffer");
2233
2234 let expected: Vec<T::T> = data.iter().flat_map(|s| s.clone()).collect();
2236
2237 let mut decoder = get_decoder::<T>(col_descr, encoding).expect("get decoder");
2239
2240 let mut result = vec![T::T::default(); expected.len()];
2241 decoder
2242 .set_data(bytes, expected.len())
2243 .expect("ok to set data");
2244 let mut result_num_values = 0;
2245 while decoder.values_left() > 0 {
2246 result_num_values += decoder
2247 .get(&mut result[result_num_values..])
2248 .expect("ok to decode");
2249 }
2250 assert_eq!(result_num_values, expected.len());
2251 assert_eq!(result, expected);
2252 }
2253
2254 fn test_skip<T: DataType>(data: Vec<T::T>, encoding: Encoding, skip: usize) {
2255 let col_descr = create_test_col_desc_ptr(-1, T::get_physical_type());
2258
2259 let mut encoder = get_encoder::<T>(encoding, &col_descr).expect("get encoder");
2261
2262 encoder.put(&data).expect("ok to encode");
2263
2264 let bytes = encoder.flush_buffer().expect("ok to flush buffer");
2265
2266 let mut decoder = get_decoder::<T>(col_descr, encoding).expect("get decoder");
2267 decoder.set_data(bytes, data.len()).expect("ok to set data");
2268
2269 if skip >= data.len() {
2270 let skipped = decoder.skip(skip).expect("ok to skip");
2271 assert_eq!(skipped, data.len());
2272
2273 let skipped_again = decoder.skip(skip).expect("ok to skip again");
2274 assert_eq!(skipped_again, 0);
2275 } else {
2276 let skipped = decoder.skip(skip).expect("ok to skip");
2277 assert_eq!(skipped, skip);
2278
2279 let remaining = data.len() - skip;
2280
2281 let expected = &data[skip..];
2282 let mut buffer = vec![T::T::default(); remaining];
2283 let fetched = decoder.get(&mut buffer).expect("ok to decode");
2284 assert_eq!(remaining, fetched);
2285 assert_eq!(&buffer, expected);
2286 }
2287 }
2288
2289 fn create_and_check_decoder<T: DataType>(encoding: Encoding, err: Option<ParquetError>) {
2290 let descr = create_test_col_desc_ptr(-1, T::get_physical_type());
2291 let decoder = get_decoder::<T>(descr, encoding);
2292 match err {
2293 Some(parquet_error) => {
2294 assert_eq!(
2295 decoder.err().unwrap().to_string(),
2296 parquet_error.to_string()
2297 );
2298 }
2299 None => {
2300 assert_eq!(decoder.unwrap().encoding(), encoding);
2301 }
2302 }
2303 }
2304
2305 fn create_test_col_desc_ptr(type_len: i32, t: Type) -> ColumnDescPtr {
2307 let ty = SchemaType::primitive_type_builder("t", t)
2308 .with_length(type_len)
2309 .build()
2310 .unwrap();
2311 Arc::new(ColumnDescriptor::new(
2312 Arc::new(ty),
2313 0,
2314 0,
2315 ColumnPath::new(vec![]),
2316 ))
2317 }
2318
/// Encodes `v` as the 4-byte length prefix placed in front of each value by
/// the `ByteArrayType` test encoder in this module.
///
/// The prefix is written in little-endian order, which is what the Parquet
/// PLAIN encoding specifies for byte-array lengths. Native byte order would
/// only coincide with that on little-endian hosts.
///
/// # Panics
///
/// Panics if `v` does not fit in 32 bits, rather than silently truncating it
/// into a prefix that no longer matches the payload.
fn usize_to_bytes(v: usize) -> [u8; 4] {
    assert!(
        v <= u32::MAX as usize,
        "length {} does not fit in a 4-byte prefix",
        v
    );
    // The cast is lossless thanks to the assertion above.
    (v as u32).to_le_bytes()
}
2322
/// Test-only helper trait that serialises a slice of values of the Parquet
/// data type `T` into the raw bytes handed to the decoders under test.
trait ToByteArray<T: DataType> {
    /// Returns the byte representation of `data`.
    ///
    /// This is an associated function without a `self` receiver, hence the
    /// lint allowance for the `to_*` name.
    #[allow(clippy::wrong_self_convention)]
    fn to_byte_array(data: &[T::T]) -> Vec<u8>;
}
2328
2329 macro_rules! to_byte_array_impl {
2330 ($ty: ty) => {
2331 impl ToByteArray<$ty> for $ty {
2332 #[allow(clippy::wrong_self_convention)]
2333 fn to_byte_array(data: &[<$ty as DataType>::T]) -> Vec<u8> {
2334 <$ty as DataType>::T::slice_as_bytes(data).to_vec()
2335 }
2336 }
2337 };
2338 }
2339
2340 to_byte_array_impl!(Int32Type);
2341 to_byte_array_impl!(Int64Type);
2342 to_byte_array_impl!(FloatType);
2343 to_byte_array_impl!(DoubleType);
2344
2345 impl ToByteArray<BoolType> for BoolType {
2346 #[allow(clippy::wrong_self_convention)]
2347 fn to_byte_array(data: &[bool]) -> Vec<u8> {
2348 let mut v = vec![];
2349 for (i, item) in data.iter().enumerate() {
2350 if i % 8 == 0 {
2351 v.push(0);
2352 }
2353 if *item {
2354 v[i / 8] |= 1 << (i % 8);
2355 }
2356 }
2357 v
2358 }
2359 }
2360
2361 impl ToByteArray<Int96Type> for Int96Type {
2362 #[allow(clippy::wrong_self_convention)]
2363 fn to_byte_array(data: &[Int96]) -> Vec<u8> {
2364 let mut v = vec![];
2365 for d in data {
2366 v.extend_from_slice(d.as_bytes());
2367 }
2368 v
2369 }
2370 }
2371
2372 impl ToByteArray<ByteArrayType> for ByteArrayType {
2373 #[allow(clippy::wrong_self_convention)]
2374 fn to_byte_array(data: &[ByteArray]) -> Vec<u8> {
2375 let mut v = vec![];
2376 for d in data {
2377 let buf = d.data();
2378 let len = &usize_to_bytes(buf.len());
2379 v.extend_from_slice(len);
2380 v.extend(buf);
2381 }
2382 v
2383 }
2384 }
2385
2386 impl ToByteArray<FixedLenByteArrayType> for FixedLenByteArrayType {
2387 #[allow(clippy::wrong_self_convention)]
2388 fn to_byte_array(data: &[FixedLenByteArray]) -> Vec<u8> {
2389 let mut v = vec![];
2390 for d in data {
2391 let buf = d.data();
2392 v.extend(buf);
2393 }
2394 v
2395 }
2396 }
2397
/// A page that declares a 33-bit miniblock width for `i32` data must be
/// rejected by both `get` and `skip`.
#[test]
fn test_delta_bit_packed_invalid_bit_width() {
    const EXPECTED: &str = "Invalid delta bit width 33 which is larger than expected 32";

    // NOTE(review): field meanings follow the DELTA_BINARY_PACKED layout
    // (header varints, then a min delta and per-miniblock bit widths) -
    // confirm against the format specification.
    let buffer: Vec<u8> = vec![
        128, 1, // block size
        4, // miniblocks per block
        32, // total value count
        0, // first value
        0, // min delta
        33, // bit width of the first miniblock: the invalid value under test
        0, 0, 0, // bit widths of the remaining miniblocks
    ];
    let corrupted_buffer = Bytes::from(buffer);

    // Reading values must surface the error.
    let mut decoder = DeltaBitPackDecoder::<Int32Type>::new();
    decoder.set_data(corrupted_buffer.clone(), 32).unwrap();
    let mut read_buffer = vec![0; 32];
    let err = decoder.get(&mut read_buffer).unwrap_err();
    assert!(err.to_string().contains(EXPECTED), "{}", err);

    // Skipping values must surface the same error.
    let mut decoder = DeltaBitPackDecoder::<Int32Type>::new();
    decoder.set_data(corrupted_buffer, 32).unwrap();
    let err = decoder.skip(32).unwrap_err();
    assert!(err.to_string().contains(EXPECTED), "{}", err);
}
2444}