1use bytes::Bytes;
21use num_traits::{FromPrimitive, WrappingAdd};
22use std::{cmp, marker::PhantomData, mem};
23
24use super::rle::RleDecoder;
25
26use crate::basic::*;
27use crate::data_type::private::ParquetValueType;
28use crate::data_type::*;
29use crate::encodings::decoding::byte_stream_split_decoder::{
30 ByteStreamSplitDecoder, VariableWidthByteStreamSplitDecoder,
31};
32use crate::errors::{ParquetError, Result};
33use crate::schema::types::ColumnDescPtr;
34use crate::util::bit_util::{self, BitReader, FromBitpacked};
35
36mod byte_stream_split_decoder;
37
38pub(crate) mod private {
39 use super::*;
40
41 pub trait GetDecoder {
46 fn get_decoder<T: DataType<T = Self>>(
47 descr: ColumnDescPtr,
48 encoding: Encoding,
49 ) -> Result<Box<dyn Decoder<T>>> {
50 get_decoder_default(descr, encoding)
51 }
52 }
53
54 fn get_decoder_default<T: DataType>(
55 descr: ColumnDescPtr,
56 encoding: Encoding,
57 ) -> Result<Box<dyn Decoder<T>>> {
58 match encoding {
59 Encoding::PLAIN => Ok(Box::new(PlainDecoder::new(descr.type_length()))),
60 Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => Err(general_err!(
61 "Cannot initialize this encoding through this function"
62 )),
63 Encoding::RLE
64 | Encoding::DELTA_BINARY_PACKED
65 | Encoding::DELTA_BYTE_ARRAY
66 | Encoding::DELTA_LENGTH_BYTE_ARRAY => Err(general_err!(
67 "Encoding {} is not supported for type",
68 encoding
69 )),
70 e => Err(nyi_err!("Encoding {} is not supported", e)),
71 }
72 }
73
74 impl GetDecoder for bool {
75 fn get_decoder<T: DataType<T = Self>>(
76 descr: ColumnDescPtr,
77 encoding: Encoding,
78 ) -> Result<Box<dyn Decoder<T>>> {
79 match encoding {
80 Encoding::RLE => Ok(Box::new(RleValueDecoder::new())),
81 _ => get_decoder_default(descr, encoding),
82 }
83 }
84 }
85
86 impl GetDecoder for i32 {
87 fn get_decoder<T: DataType<T = Self>>(
88 descr: ColumnDescPtr,
89 encoding: Encoding,
90 ) -> Result<Box<dyn Decoder<T>>> {
91 match encoding {
92 Encoding::BYTE_STREAM_SPLIT => Ok(Box::new(ByteStreamSplitDecoder::new())),
93 Encoding::DELTA_BINARY_PACKED => Ok(Box::new(DeltaBitPackDecoder::new())),
94 _ => get_decoder_default(descr, encoding),
95 }
96 }
97 }
98
99 impl GetDecoder for i64 {
100 fn get_decoder<T: DataType<T = Self>>(
101 descr: ColumnDescPtr,
102 encoding: Encoding,
103 ) -> Result<Box<dyn Decoder<T>>> {
104 match encoding {
105 Encoding::BYTE_STREAM_SPLIT => Ok(Box::new(ByteStreamSplitDecoder::new())),
106 Encoding::DELTA_BINARY_PACKED => Ok(Box::new(DeltaBitPackDecoder::new())),
107 _ => get_decoder_default(descr, encoding),
108 }
109 }
110 }
111
112 impl GetDecoder for f32 {
113 fn get_decoder<T: DataType<T = Self>>(
114 descr: ColumnDescPtr,
115 encoding: Encoding,
116 ) -> Result<Box<dyn Decoder<T>>> {
117 match encoding {
118 Encoding::BYTE_STREAM_SPLIT => Ok(Box::new(ByteStreamSplitDecoder::new())),
119 _ => get_decoder_default(descr, encoding),
120 }
121 }
122 }
123 impl GetDecoder for f64 {
124 fn get_decoder<T: DataType<T = Self>>(
125 descr: ColumnDescPtr,
126 encoding: Encoding,
127 ) -> Result<Box<dyn Decoder<T>>> {
128 match encoding {
129 Encoding::BYTE_STREAM_SPLIT => Ok(Box::new(ByteStreamSplitDecoder::new())),
130 _ => get_decoder_default(descr, encoding),
131 }
132 }
133 }
134
135 impl GetDecoder for ByteArray {
136 fn get_decoder<T: DataType<T = Self>>(
137 descr: ColumnDescPtr,
138 encoding: Encoding,
139 ) -> Result<Box<dyn Decoder<T>>> {
140 match encoding {
141 Encoding::DELTA_BYTE_ARRAY => Ok(Box::new(DeltaByteArrayDecoder::new())),
142 Encoding::DELTA_LENGTH_BYTE_ARRAY => {
143 Ok(Box::new(DeltaLengthByteArrayDecoder::new()))
144 }
145 _ => get_decoder_default(descr, encoding),
146 }
147 }
148 }
149
150 impl GetDecoder for FixedLenByteArray {
151 fn get_decoder<T: DataType<T = Self>>(
152 descr: ColumnDescPtr,
153 encoding: Encoding,
154 ) -> Result<Box<dyn Decoder<T>>> {
155 match encoding {
156 Encoding::BYTE_STREAM_SPLIT => Ok(Box::new(
157 VariableWidthByteStreamSplitDecoder::new(descr.type_length()),
158 )),
159 Encoding::DELTA_BYTE_ARRAY => Ok(Box::new(DeltaByteArrayDecoder::new())),
160 _ => get_decoder_default(descr, encoding),
161 }
162 }
163 }
164
165 impl GetDecoder for Int96 {}
166}
167
168pub trait Decoder<T: DataType>: Send {
173 fn set_data(&mut self, data: Bytes, num_values: usize) -> Result<()>;
176
177 fn get(&mut self, buffer: &mut [T::T]) -> Result<usize>;
184
185 fn get_spaced(
198 &mut self,
199 buffer: &mut [T::T],
200 null_count: usize,
201 valid_bits: &[u8],
202 ) -> Result<usize> {
203 assert!(buffer.len() >= null_count);
204
205 if null_count == 0 {
207 return self.get(buffer);
208 }
209
210 let num_values = buffer.len();
211 let values_to_read = num_values - null_count;
212 let values_read = self.get(buffer)?;
213 if values_read != values_to_read {
214 return Err(general_err!(
215 "Number of values read: {}, doesn't match expected: {}",
216 values_read,
217 values_to_read
218 ));
219 }
220 let mut values_to_move = values_read;
221 for i in (0..num_values).rev() {
222 if bit_util::get_bit(valid_bits, i) {
223 values_to_move -= 1;
224 buffer.swap(i, values_to_move);
225 }
226 }
227
228 Ok(num_values)
229 }
230
231 fn values_left(&self) -> usize;
233
234 fn encoding(&self) -> Encoding;
236
237 fn skip(&mut self, num_values: usize) -> Result<usize>;
239}
240
241pub fn get_decoder<T: DataType>(
246 descr: ColumnDescPtr,
247 encoding: Encoding,
248) -> Result<Box<dyn Decoder<T>>> {
249 use self::private::GetDecoder;
250 T::T::get_decoder(descr, encoding)
251}
252
/// State of a [`PlainDecoder`], kept in a struct without a type parameter.
///
/// The fields are passed to the `set_data`, `decode` and `skip` functions of the value type
/// (`T::T`), which are defined outside this file.
#[derive(Default)]
pub struct PlainDecoderDetails {
    /// Value count; `PlainDecoder::values_left` reports this field.
    pub(crate) num_values: usize,

    // NOTE(review): presumably the read position inside `data` — confirm against the
    // value-type `decode` implementations.
    pub(crate) start: usize,

    /// Type length handed to `PlainDecoder::new`.
    pub(crate) type_length: i32,

    // NOTE(review): presumably the encoded page bytes installed by `set_data` — confirm.
    pub(crate) data: Option<Bytes>,

    // NOTE(review): presumably only used by bit-level value types — confirm.
    pub(crate) bit_reader: Option<BitReader>,
}
273
274pub struct PlainDecoder<T: DataType> {
280 inner: PlainDecoderDetails,
282
283 _phantom: PhantomData<T>,
286}
287
288impl<T: DataType> PlainDecoder<T> {
289 pub fn new(type_length: i32) -> Self {
291 PlainDecoder {
292 inner: PlainDecoderDetails {
293 type_length,
294 num_values: 0,
295 start: 0,
296 data: None,
297 bit_reader: None,
298 },
299 _phantom: PhantomData,
300 }
301 }
302}
303
304impl<T: DataType> Decoder<T> for PlainDecoder<T> {
305 #[inline]
306 fn set_data(&mut self, data: Bytes, num_values: usize) -> Result<()> {
307 T::T::set_data(&mut self.inner, data, num_values);
308 Ok(())
309 }
310
311 #[inline]
312 fn values_left(&self) -> usize {
313 self.inner.num_values
314 }
315
316 #[inline]
317 fn encoding(&self) -> Encoding {
318 Encoding::PLAIN
319 }
320
321 #[inline]
322 fn get(&mut self, buffer: &mut [T::T]) -> Result<usize> {
323 T::T::decode(buffer, &mut self.inner)
324 }
325
326 #[inline]
327 fn skip(&mut self, num_values: usize) -> Result<usize> {
328 T::T::skip(&mut self.inner, num_values)
329 }
330}
331
332pub struct DictDecoder<T: DataType> {
341 dictionary: Vec<T::T>,
343
344 has_dictionary: bool,
346
347 rle_decoder: Option<RleDecoder>,
349
350 num_values: usize,
352}
353
354impl<T: DataType> Default for DictDecoder<T> {
355 fn default() -> Self {
356 Self::new()
357 }
358}
359
360impl<T: DataType> DictDecoder<T> {
361 pub fn new() -> Self {
363 Self {
364 dictionary: vec![],
365 has_dictionary: false,
366 rle_decoder: None,
367 num_values: 0,
368 }
369 }
370
371 pub fn set_dict(&mut self, mut decoder: Box<dyn Decoder<T>>) -> Result<()> {
373 let num_values = decoder.values_left();
374 self.dictionary.resize(num_values, T::T::default());
375 let _ = decoder.get(&mut self.dictionary)?;
376 self.has_dictionary = true;
377 Ok(())
378 }
379}
380
381impl<T: DataType> Decoder<T> for DictDecoder<T> {
382 fn set_data(&mut self, data: Bytes, num_values: usize) -> Result<()> {
383 if data.is_empty() {
385 return Err(eof_err!("Not enough bytes to decode bit_width"));
386 }
387
388 let bit_width = data.as_ref()[0];
389 if bit_width > 32 {
390 return Err(general_err!(
391 "Invalid or corrupted RLE bit width {}. Max allowed is 32",
392 bit_width
393 ));
394 }
395 let mut rle_decoder = RleDecoder::new(bit_width);
396 rle_decoder.set_data(data.slice(1..))?;
397 self.num_values = num_values;
398 self.rle_decoder = Some(rle_decoder);
399 Ok(())
400 }
401
402 fn get(&mut self, buffer: &mut [T::T]) -> Result<usize> {
403 assert!(self.rle_decoder.is_some());
404 assert!(self.has_dictionary, "Must call set_dict() first!");
405
406 let rle = self.rle_decoder.as_mut().unwrap();
407 let num_values = cmp::min(buffer.len(), self.num_values);
408 rle.get_batch_with_dict(&self.dictionary[..], buffer, num_values)
409 }
410
411 fn values_left(&self) -> usize {
413 self.num_values
414 }
415
416 fn encoding(&self) -> Encoding {
417 Encoding::RLE_DICTIONARY
418 }
419
420 fn skip(&mut self, num_values: usize) -> Result<usize> {
421 assert!(self.rle_decoder.is_some());
422 assert!(self.has_dictionary, "Must call set_dict() first!");
423
424 let rle = self.rle_decoder.as_mut().unwrap();
425 let num_values = cmp::min(num_values, self.num_values);
426 rle.skip(num_values)
427 }
428}
429
430pub struct RleValueDecoder<T: DataType> {
437 values_left: usize,
438 decoder: RleDecoder,
439 _phantom: PhantomData<T>,
440}
441
442impl<T: DataType> Default for RleValueDecoder<T> {
443 fn default() -> Self {
444 Self::new()
445 }
446}
447
448impl<T: DataType> RleValueDecoder<T> {
449 pub fn new() -> Self {
450 Self {
451 values_left: 0,
452 decoder: RleDecoder::new(1),
453 _phantom: PhantomData,
454 }
455 }
456}
457
458impl<T: DataType> Decoder<T> for RleValueDecoder<T>
459where
460 T::T: FromBitpacked,
461{
462 #[inline]
463 fn set_data(&mut self, data: Bytes, num_values: usize) -> Result<()> {
464 ensure_phys_ty!(Type::BOOLEAN, "RleValueDecoder only supports BoolType");
466
467 const I32_SIZE: usize = mem::size_of::<i32>();
469 if data.len() < I32_SIZE {
470 return Err(eof_err!("Not enough bytes to decode"));
471 }
472 let data_size = bit_util::read_num_bytes::<i32>(I32_SIZE, data.as_ref()) as usize;
473 if data.len() - I32_SIZE < data_size {
474 return Err(eof_err!("Not enough bytes to decode"));
475 }
476
477 self.decoder = RleDecoder::new(1);
478 self.decoder
479 .set_data(data.slice(I32_SIZE..I32_SIZE + data_size))?;
480 self.values_left = num_values;
481 Ok(())
482 }
483
484 #[inline]
485 fn values_left(&self) -> usize {
486 self.values_left
487 }
488
489 #[inline]
490 fn encoding(&self) -> Encoding {
491 Encoding::RLE
492 }
493
494 #[inline]
495 fn get(&mut self, buffer: &mut [T::T]) -> Result<usize> {
496 let num_values = cmp::min(buffer.len(), self.values_left);
497 let values_read = self.decoder.get_batch(&mut buffer[..num_values])?;
498 self.values_left -= values_read;
499 Ok(values_read)
500 }
501
502 #[inline]
503 fn skip(&mut self, num_values: usize) -> Result<usize> {
504 let num_values = cmp::min(num_values, self.values_left);
505 let values_skipped = self.decoder.skip(num_values)?;
506 self.values_left -= values_skipped;
507 Ok(values_skipped)
508 }
509}
510
/// Decoder for `DELTA_BINARY_PACKED` data: a header followed by blocks, each split into mini
/// blocks of bit-packed deltas.
pub struct DeltaBitPackDecoder<T: DataType> {
    /// Reader over the encoded bytes; replaced by `set_data`.
    bit_reader: BitReader,
    /// Set by `set_data`; `get` and `get_offset` assert on it.
    initialized: bool,

    // Header information, read in `set_data`.
    /// Number of values per block.
    block_size: usize,
    /// Values not yet decoded or skipped; initialised from the header.
    values_left: usize,
    /// Number of mini blocks per block.
    mini_blocks_per_block: usize,
    /// `block_size / mini_blocks_per_block`.
    values_per_mini_block: usize,

    // Per-block state, refreshed by `next_block`.
    /// Minimum delta of the current block, added to every unpacked delta.
    min_delta: T::T,
    /// Byte offset at which the current block ends, derived from its bit widths.
    block_end_offset: usize,
    /// Index of the current mini block within the block.
    mini_block_idx: usize,
    /// Bit width of each mini block in the current block.
    mini_block_bit_widths: Vec<u8>,
    /// Values left in the current mini block.
    mini_block_remaining: usize,

    /// First value from the header; taken by the first `get` or `skip`.
    first_value: Option<T::T>,
    /// Most recently reconstructed value, the base for the next delta.
    last_value: T::T,
}

impl<T: DataType> Default for DeltaBitPackDecoder<T>
where
    T::T: Default + FromPrimitive + WrappingAdd + Copy,
{
    /// Same as [`DeltaBitPackDecoder::new`].
    fn default() -> Self {
        Self::new()
    }
}
558
559impl<T: DataType> DeltaBitPackDecoder<T>
560where
561 T::T: Default + FromPrimitive + WrappingAdd + Copy,
562{
563 pub fn new() -> Self {
565 Self {
566 bit_reader: BitReader::from(vec![]),
567 initialized: false,
568 block_size: 0,
569 values_left: 0,
570 mini_blocks_per_block: 0,
571 values_per_mini_block: 0,
572 min_delta: Default::default(),
573 mini_block_idx: 0,
574 mini_block_bit_widths: vec![],
575 mini_block_remaining: 0,
576 block_end_offset: 0,
577 first_value: None,
578 last_value: Default::default(),
579 }
580 }
581
582 pub fn get_offset(&self) -> usize {
584 assert!(self.initialized, "Bit reader is not initialized");
585 match self.values_left {
586 0 => self.bit_reader.get_byte_offset().max(self.block_end_offset),
592 _ => self.bit_reader.get_byte_offset(),
593 }
594 }
595
596 #[inline]
598 fn next_block(&mut self) -> Result<()> {
599 let min_delta = self
600 .bit_reader
601 .get_zigzag_vlq_int()
602 .ok_or_else(|| eof_err!("Not enough data to decode 'min_delta'"))?;
603
604 self.min_delta =
605 T::T::from_i64(min_delta).ok_or_else(|| general_err!("'min_delta' too large"))?;
606
607 self.mini_block_bit_widths.clear();
608 self.bit_reader
609 .get_aligned_bytes(&mut self.mini_block_bit_widths, self.mini_blocks_per_block);
610
611 let mut offset = self.bit_reader.get_byte_offset();
612 let mut remaining = self.values_left;
613
614 for b in &mut self.mini_block_bit_widths {
616 if remaining == 0 {
617 *b = 0;
620 }
621 remaining = remaining.saturating_sub(self.values_per_mini_block);
622 offset += *b as usize * self.values_per_mini_block / 8;
623 }
624 self.block_end_offset = offset;
625
626 if self.mini_block_bit_widths.len() != self.mini_blocks_per_block {
627 return Err(eof_err!("insufficient mini block bit widths"));
628 }
629
630 self.mini_block_remaining = self.values_per_mini_block;
631 self.mini_block_idx = 0;
632
633 Ok(())
634 }
635
636 #[inline]
638 fn next_mini_block(&mut self) -> Result<()> {
639 if self.mini_block_idx + 1 < self.mini_block_bit_widths.len() {
640 self.mini_block_idx += 1;
641 self.mini_block_remaining = self.values_per_mini_block;
642 Ok(())
643 } else {
644 self.next_block()
645 }
646 }
647
648 #[inline]
650 fn check_bit_width(&self, bit_width: usize) -> Result<()> {
651 if bit_width > std::mem::size_of::<T::T>() * 8 {
652 return Err(general_err!(
653 "Invalid delta bit width {} which is larger than expected {} ",
654 bit_width,
655 std::mem::size_of::<T::T>() * 8
656 ));
657 }
658 Ok(())
659 }
660}
661
662impl<T: DataType> Decoder<T> for DeltaBitPackDecoder<T>
663where
664 T::T: Default + FromPrimitive + FromBitpacked + WrappingAdd + Copy,
665{
666 #[inline]
668 fn set_data(&mut self, data: Bytes, _index: usize) -> Result<()> {
669 self.bit_reader = BitReader::new(data);
670 self.initialized = true;
671
672 self.block_size = self
674 .bit_reader
675 .get_vlq_int()
676 .ok_or_else(|| eof_err!("Not enough data to decode 'block_size'"))?
677 .try_into()
678 .map_err(|_| general_err!("invalid 'block_size'"))?;
679
680 self.mini_blocks_per_block = self
681 .bit_reader
682 .get_vlq_int()
683 .ok_or_else(|| eof_err!("Not enough data to decode 'mini_blocks_per_block'"))?
684 .try_into()
685 .map_err(|_| general_err!("invalid 'mini_blocks_per_block'"))?;
686
687 if self.mini_blocks_per_block == 0 {
688 return Err(general_err!("cannot have zero miniblocks per block"));
689 }
690
691 self.values_left = self
692 .bit_reader
693 .get_vlq_int()
694 .ok_or_else(|| eof_err!("Not enough data to decode 'values_left'"))?
695 .try_into()
696 .map_err(|_| general_err!("invalid 'values_left'"))?;
697
698 let first_value = self
699 .bit_reader
700 .get_zigzag_vlq_int()
701 .ok_or_else(|| eof_err!("Not enough data to decode 'first_value'"))?;
702
703 self.first_value =
704 Some(T::T::from_i64(first_value).ok_or_else(|| general_err!("first value too large"))?);
705
706 if self.block_size % 128 != 0 {
707 return Err(general_err!(
708 "'block_size' must be a multiple of 128, got {}",
709 self.block_size
710 ));
711 }
712
713 if self.block_size % self.mini_blocks_per_block != 0 {
714 return Err(general_err!(
715 "'block_size' must be a multiple of 'mini_blocks_per_block' got {} and {}",
716 self.block_size,
717 self.mini_blocks_per_block
718 ));
719 }
720
721 self.mini_block_idx = 0;
723 self.values_per_mini_block = self.block_size / self.mini_blocks_per_block;
724 self.mini_block_remaining = 0;
725 self.mini_block_bit_widths.clear();
726
727 if self.values_per_mini_block % 32 != 0 {
728 return Err(general_err!(
729 "'values_per_mini_block' must be a multiple of 32 got {}",
730 self.values_per_mini_block
731 ));
732 }
733
734 Ok(())
735 }
736
737 fn get(&mut self, buffer: &mut [T::T]) -> Result<usize> {
738 assert!(self.initialized, "Bit reader is not initialized");
739 if buffer.is_empty() {
740 return Ok(0);
741 }
742
743 let mut read = 0;
744 let to_read = buffer.len().min(self.values_left);
745
746 if let Some(value) = self.first_value.take() {
747 self.last_value = value;
748 buffer[0] = value;
749 read += 1;
750 self.values_left -= 1;
751 }
752
753 while read != to_read {
754 if self.mini_block_remaining == 0 {
755 self.next_mini_block()?;
756 }
757
758 let bit_width = self.mini_block_bit_widths[self.mini_block_idx] as usize;
759 self.check_bit_width(bit_width)?;
760 let batch_to_read = self.mini_block_remaining.min(to_read - read);
761
762 let batch_read = self
763 .bit_reader
764 .get_batch(&mut buffer[read..read + batch_to_read], bit_width);
765
766 if batch_read != batch_to_read {
767 return Err(general_err!(
768 "Expected to read {} values from miniblock got {}",
769 batch_to_read,
770 batch_read
771 ));
772 }
773
774 let min_delta = self.min_delta.as_i64()?;
786 if bit_width == 0 {
787 if min_delta == 0 {
788 buffer[read..read + batch_read].fill(self.last_value);
789 } else {
790 let mut delta = self.min_delta;
794 for v in &mut buffer[read..read + batch_read] {
795 *v = self.last_value.wrapping_add(&delta);
796 delta = delta.wrapping_add(&self.min_delta);
797 }
798
799 self.last_value = buffer[read + batch_read - 1];
800 }
801 } else {
802 if min_delta == 0 {
806 for v in &mut buffer[read..read + batch_read] {
807 *v = v.wrapping_add(&self.last_value);
808 self.last_value = *v;
809 }
810 } else {
811 for v in &mut buffer[read..read + batch_read] {
812 *v = v
813 .wrapping_add(&self.min_delta)
814 .wrapping_add(&self.last_value);
815 self.last_value = *v;
816 }
817 }
818 }
819
820 read += batch_read;
821 self.mini_block_remaining -= batch_read;
822 self.values_left -= batch_read;
823 }
824
825 Ok(to_read)
826 }
827
828 fn values_left(&self) -> usize {
829 self.values_left
830 }
831
832 fn encoding(&self) -> Encoding {
833 Encoding::DELTA_BINARY_PACKED
834 }
835
836 fn skip(&mut self, num_values: usize) -> Result<usize> {
837 let mut skip = 0;
838 let to_skip = num_values.min(self.values_left);
839 if to_skip == 0 {
840 return Ok(0);
841 }
842
843 if let Some(value) = self.first_value.take() {
845 self.last_value = value;
846 skip += 1;
847 self.values_left -= 1;
848 }
849
850 let mini_block_batch_size = match T::T::PHYSICAL_TYPE {
851 Type::INT32 => 32,
852 Type::INT64 => 64,
853 _ => unreachable!(),
854 };
855
856 let mut skip_buffer = vec![T::T::default(); mini_block_batch_size];
857 while skip < to_skip {
858 if self.mini_block_remaining == 0 {
859 self.next_mini_block()?;
860 }
861
862 let bit_width = self.mini_block_bit_widths[self.mini_block_idx] as usize;
863 self.check_bit_width(bit_width)?;
864 let mini_block_to_skip = self.mini_block_remaining.min(to_skip - skip);
865
866 let min_delta = self.min_delta.as_i64()?;
867 if bit_width == 0 {
868 if min_delta != 0 {
871 let total = min_delta.wrapping_mul(mini_block_to_skip as i64);
872 let step = T::T::from_i64(total)
873 .ok_or_else(|| general_err!("delta*n overflow in skip"))?;
874 self.last_value = self.last_value.wrapping_add(&step);
875 }
876 } else {
878 let skip_count = self
880 .bit_reader
881 .get_batch(&mut skip_buffer[0..mini_block_to_skip], bit_width);
882
883 if skip_count != mini_block_to_skip {
884 return Err(general_err!(
885 "Expected to skip {} values from mini block got {}.",
886 mini_block_to_skip,
887 skip_count
888 ));
889 }
890
891 if min_delta == 0 {
892 for v in &mut skip_buffer[0..skip_count] {
893 *v = v.wrapping_add(&self.last_value);
894 self.last_value = *v;
895 }
896 } else {
897 for v in &mut skip_buffer[0..skip_count] {
898 *v = v
899 .wrapping_add(&self.min_delta)
900 .wrapping_add(&self.last_value);
901 self.last_value = *v;
902 }
903 }
904 }
905
906 skip += mini_block_to_skip;
907 self.mini_block_remaining -= mini_block_to_skip;
908 self.values_left -= mini_block_to_skip;
909 }
910
911 Ok(to_skip)
912 }
913}
914
915pub struct DeltaLengthByteArrayDecoder<T: DataType> {
925 lengths: Vec<i32>,
928
929 current_idx: usize,
931
932 data: Option<Bytes>,
934
935 offset: usize,
937
938 num_values: usize,
940
941 _phantom: PhantomData<T>,
943}
944
945impl<T: DataType> Default for DeltaLengthByteArrayDecoder<T> {
946 fn default() -> Self {
947 Self::new()
948 }
949}
950
951impl<T: DataType> DeltaLengthByteArrayDecoder<T> {
952 pub fn new() -> Self {
954 Self {
955 lengths: vec![],
956 current_idx: 0,
957 data: None,
958 offset: 0,
959 num_values: 0,
960 _phantom: PhantomData,
961 }
962 }
963}
964
965impl<T: DataType> Decoder<T> for DeltaLengthByteArrayDecoder<T> {
966 fn set_data(&mut self, data: Bytes, num_values: usize) -> Result<()> {
967 match T::get_physical_type() {
968 Type::BYTE_ARRAY => {
969 let mut len_decoder = DeltaBitPackDecoder::<Int32Type>::new();
970 len_decoder.set_data(data.clone(), num_values)?;
971 let num_lengths = len_decoder.values_left();
972 self.lengths.resize(num_lengths, 0);
973 len_decoder.get(&mut self.lengths[..])?;
974
975 self.data = Some(data.slice(len_decoder.get_offset()..));
976 self.offset = 0;
977 self.current_idx = 0;
978 self.num_values = num_lengths;
979 Ok(())
980 }
981 _ => Err(general_err!(
982 "DeltaLengthByteArrayDecoder only support ByteArrayType"
983 )),
984 }
985 }
986
987 fn get(&mut self, buffer: &mut [T::T]) -> Result<usize> {
988 match T::get_physical_type() {
989 Type::BYTE_ARRAY => {
990 assert!(self.data.is_some());
991
992 let data = self.data.as_ref().unwrap();
993 let num_values = cmp::min(buffer.len(), self.num_values);
994
995 for item in buffer.iter_mut().take(num_values) {
996 let len = self.lengths[self.current_idx] as usize;
997 item.set_from_bytes(data.slice(self.offset..self.offset + len));
998
999 self.offset += len;
1000 self.current_idx += 1;
1001 }
1002
1003 self.num_values -= num_values;
1004 Ok(num_values)
1005 }
1006 _ => Err(general_err!(
1007 "DeltaLengthByteArrayDecoder only support ByteArrayType"
1008 )),
1009 }
1010 }
1011
1012 fn values_left(&self) -> usize {
1013 self.num_values
1014 }
1015
1016 fn encoding(&self) -> Encoding {
1017 Encoding::DELTA_LENGTH_BYTE_ARRAY
1018 }
1019
1020 fn skip(&mut self, num_values: usize) -> Result<usize> {
1021 match T::get_physical_type() {
1022 Type::BYTE_ARRAY => {
1023 let num_values = cmp::min(num_values, self.num_values);
1024
1025 let next_offset: i32 = self.lengths
1026 [self.current_idx..self.current_idx + num_values]
1027 .iter()
1028 .sum();
1029
1030 self.current_idx += num_values;
1031 self.offset += next_offset as usize;
1032
1033 self.num_values -= num_values;
1034 Ok(num_values)
1035 }
1036 other_type => Err(general_err!(
1037 "DeltaLengthByteArrayDecoder not support {}, only support byte array",
1038 other_type
1039 )),
1040 }
1041 }
1042}
1043
1044pub struct DeltaByteArrayDecoder<T: DataType> {
1054 prefix_lengths: Vec<i32>,
1057
1058 current_idx: usize,
1060
1061 suffix_decoder: Option<DeltaLengthByteArrayDecoder<ByteArrayType>>,
1064
1065 previous_value: Bytes,
1068
1069 num_values: usize,
1071
1072 _phantom: PhantomData<T>,
1074}
1075
1076impl<T: DataType> Default for DeltaByteArrayDecoder<T> {
1077 fn default() -> Self {
1078 Self::new()
1079 }
1080}
1081
1082impl<T: DataType> DeltaByteArrayDecoder<T> {
1083 pub fn new() -> Self {
1085 Self {
1086 prefix_lengths: vec![],
1087 current_idx: 0,
1088 suffix_decoder: None,
1089 previous_value: Bytes::new(),
1090 num_values: 0,
1091 _phantom: PhantomData,
1092 }
1093 }
1094}
1095
1096impl<T: DataType> Decoder<T> for DeltaByteArrayDecoder<T> {
1097 fn set_data(&mut self, data: Bytes, num_values: usize) -> Result<()> {
1098 match T::get_physical_type() {
1099 Type::BYTE_ARRAY | Type::FIXED_LEN_BYTE_ARRAY => {
1100 let mut prefix_len_decoder = DeltaBitPackDecoder::<Int32Type>::new();
1101 prefix_len_decoder.set_data(data.clone(), num_values)?;
1102 let num_prefixes = prefix_len_decoder.values_left();
1103 self.prefix_lengths.resize(num_prefixes, 0);
1104 prefix_len_decoder.get(&mut self.prefix_lengths[..])?;
1105
1106 let mut suffix_decoder = DeltaLengthByteArrayDecoder::new();
1107 suffix_decoder
1108 .set_data(data.slice(prefix_len_decoder.get_offset()..), num_values)?;
1109 self.suffix_decoder = Some(suffix_decoder);
1110 self.num_values = num_prefixes;
1111 self.current_idx = 0;
1112 self.previous_value = Bytes::new();
1113 Ok(())
1114 }
1115 _ => Err(general_err!(
1116 "DeltaByteArrayDecoder only supports ByteArrayType and FixedLenByteArrayType"
1117 )),
1118 }
1119 }
1120
1121 fn get(&mut self, buffer: &mut [T::T]) -> Result<usize> {
1122 match T::get_physical_type() {
1123 Type::BYTE_ARRAY | Type::FIXED_LEN_BYTE_ARRAY => {
1124 let num_values = cmp::min(buffer.len(), self.num_values);
1125 let mut v: [ByteArray; 1] = [ByteArray::new(); 1];
1126 for item in buffer.iter_mut().take(num_values) {
1127 let suffix_decoder = self
1130 .suffix_decoder
1131 .as_mut()
1132 .expect("decoder not initialized");
1133 suffix_decoder.get(&mut v[..])?;
1134 let suffix = v[0].data();
1135
1136 let prefix_len = usize::try_from(self.prefix_lengths[self.current_idx])
1138 .map_err(|_| {
1139 general_err!(
1140 "Invalid DELTA_BYTE_ARRAY prefix length {}",
1141 self.prefix_lengths[self.current_idx]
1142 )
1143 })?;
1144
1145 if prefix_len > self.previous_value.len() {
1146 return Err(general_err!(
1147 "Invalid DELTA_BYTE_ARRAY prefix length {} exceeds previous value length {}",
1148 prefix_len,
1149 self.previous_value.len()
1150 ));
1151 }
1152
1153 let mut result = Vec::with_capacity(prefix_len + suffix.len());
1155 result.extend_from_slice(&self.previous_value[0..prefix_len]);
1156 result.extend_from_slice(suffix);
1157
1158 let data = Bytes::from(result);
1159 item.set_from_bytes(data.clone());
1160
1161 self.previous_value = data;
1162 self.current_idx += 1;
1163 }
1164
1165 self.num_values -= num_values;
1166 Ok(num_values)
1167 }
1168 _ => Err(general_err!(
1169 "DeltaByteArrayDecoder only supports ByteArrayType and FixedLenByteArrayType"
1170 )),
1171 }
1172 }
1173
1174 fn values_left(&self) -> usize {
1175 self.num_values
1176 }
1177
1178 fn encoding(&self) -> Encoding {
1179 Encoding::DELTA_BYTE_ARRAY
1180 }
1181
1182 fn skip(&mut self, num_values: usize) -> Result<usize> {
1183 let mut buffer = vec![T::T::default(); num_values];
1184 self.get(&mut buffer)
1185 }
1186}
1187
1188#[cfg(test)]
1189mod tests {
1190 use super::{super::encoding::*, *};
1191
1192 use std::f32::consts::PI as PI_f32;
1193 use std::f64::consts::PI as PI_f64;
1194 use std::sync::Arc;
1195
1196 use crate::schema::types::{ColumnDescPtr, ColumnDescriptor, ColumnPath, Type as SchemaType};
1197 use crate::util::test_common::rand_gen::RandGen;
1198
1199 #[test]
1200 fn test_delta_byte_array_invalid_prefix_len_returns_error() {
1201 let col_descr = create_test_col_desc_ptr(-1, Type::BYTE_ARRAY);
1202
1203 let mut encoder =
1204 get_encoder::<ByteArrayType>(Encoding::DELTA_BYTE_ARRAY, &col_descr).unwrap();
1205 let input = vec![ByteArray::from("a"), ByteArray::from("ab")];
1206 encoder.put(&input).unwrap();
1207 let encoded = encoder.flush_buffer().unwrap();
1208
1209 let mut prefix_len_decoder = DeltaBitPackDecoder::<Int32Type>::new();
1211 prefix_len_decoder
1212 .set_data(encoded.clone(), input.len())
1213 .unwrap();
1214 let num_prefixes = prefix_len_decoder.values_left();
1215 let mut prefix_lengths = vec![0; num_prefixes];
1216 prefix_len_decoder.get(&mut prefix_lengths).unwrap();
1217
1218 assert_eq!(prefix_lengths, vec![0, 1]);
1220
1221 let prefix_stream_end = prefix_len_decoder.get_offset();
1222
1223 let mut prefix_encoder = get_encoder::<Int32Type>(
1227 Encoding::DELTA_BINARY_PACKED,
1228 &create_test_col_desc_ptr(-1, Type::INT32),
1229 )
1230 .unwrap();
1231 prefix_encoder.put(&[1i32, 1i32]).unwrap();
1232 let corrupted_prefix = prefix_encoder.flush_buffer().unwrap();
1233
1234 let mut corrupted = Vec::new();
1235 corrupted.extend_from_slice(corrupted_prefix.as_ref());
1236 corrupted.extend_from_slice(&encoded[prefix_stream_end..]);
1237
1238 let mut decoder = DeltaByteArrayDecoder::<ByteArrayType>::new();
1239 decoder
1240 .set_data(Bytes::from(corrupted), input.len())
1241 .unwrap();
1242
1243 let mut out = vec![ByteArray::new(); input.len()];
1244
1245 let err = decoder.get(&mut out).unwrap_err();
1246 assert!(
1247 err.to_string()
1248 .contains("Invalid DELTA_BYTE_ARRAY prefix length"),
1249 "{}",
1250 err
1251 );
1252 }
1253
1254 #[test]
1255 fn test_delta_byte_array_negative_prefix_len_returns_error() {
1256 let col_descr = create_test_col_desc_ptr(-1, Type::BYTE_ARRAY);
1257
1258 let mut encoder =
1259 get_encoder::<ByteArrayType>(Encoding::DELTA_BYTE_ARRAY, &col_descr).unwrap();
1260 let input = vec![ByteArray::from("a"), ByteArray::from("ab")];
1261 encoder.put(&input).unwrap();
1262 let encoded = encoder.flush_buffer().unwrap();
1263
1264 let mut decoder = DeltaByteArrayDecoder::<ByteArrayType>::new();
1265 decoder.set_data(encoded, input.len()).unwrap();
1266
1267 decoder.prefix_lengths[0] = -1;
1269 let mut out = vec![ByteArray::new(); input.len()];
1270
1271 let err = decoder.get(&mut out).unwrap_err();
1272 assert!(
1273 err.to_string()
1274 .contains("Invalid DELTA_BYTE_ARRAY prefix length"),
1275 "{}",
1276 err
1277 );
1278 }
1279
1280 #[test]
1281 fn test_get_decoders() {
1282 create_and_check_decoder::<Int32Type>(Encoding::PLAIN, None);
1284 create_and_check_decoder::<Int32Type>(Encoding::DELTA_BINARY_PACKED, None);
1285 create_and_check_decoder::<ByteArrayType>(Encoding::DELTA_LENGTH_BYTE_ARRAY, None);
1286 create_and_check_decoder::<ByteArrayType>(Encoding::DELTA_BYTE_ARRAY, None);
1287 create_and_check_decoder::<BoolType>(Encoding::RLE, None);
1288
1289 create_and_check_decoder::<Int32Type>(
1291 Encoding::RLE_DICTIONARY,
1292 Some(general_err!(
1293 "Cannot initialize this encoding through this function"
1294 )),
1295 );
1296 create_and_check_decoder::<Int32Type>(
1297 Encoding::PLAIN_DICTIONARY,
1298 Some(general_err!(
1299 "Cannot initialize this encoding through this function"
1300 )),
1301 );
1302 create_and_check_decoder::<Int32Type>(
1303 Encoding::DELTA_LENGTH_BYTE_ARRAY,
1304 Some(general_err!(
1305 "Encoding DELTA_LENGTH_BYTE_ARRAY is not supported for type"
1306 )),
1307 );
1308 create_and_check_decoder::<Int32Type>(
1309 Encoding::DELTA_BYTE_ARRAY,
1310 Some(general_err!(
1311 "Encoding DELTA_BYTE_ARRAY is not supported for type"
1312 )),
1313 );
1314
1315 #[allow(deprecated)]
1317 create_and_check_decoder::<Int32Type>(
1318 Encoding::BIT_PACKED,
1319 Some(nyi_err!("Encoding BIT_PACKED is not supported")),
1320 );
1321 }
1322
1323 #[test]
1324 fn test_plain_decode_int32() {
1325 let data = [42, 18, 52];
1326 let data_bytes = Int32Type::to_byte_array(&data[..]);
1327 let mut buffer = [0; 3];
1328 test_plain_decode::<Int32Type>(Bytes::from(data_bytes), 3, -1, &mut buffer[..], &data[..]);
1329 }
1330
1331 #[test]
1332 fn test_plain_skip_int32() {
1333 let data = [42, 18, 52];
1334 let data_bytes = Int32Type::to_byte_array(&data[..]);
1335 test_plain_skip::<Int32Type>(Bytes::from(data_bytes), 3, 1, -1, &data[1..]);
1336 }
1337
1338 #[test]
1339 fn test_plain_skip_all_int32() {
1340 let data = [42, 18, 52];
1341 let data_bytes = Int32Type::to_byte_array(&data[..]);
1342 test_plain_skip::<Int32Type>(Bytes::from(data_bytes), 3, 5, -1, &[]);
1343 }
1344
1345 #[test]
1346 fn test_plain_decode_int32_spaced() {
1347 let data = [42, 18, 52];
1348 let expected_data = [0, 42, 0, 18, 0, 0, 52, 0];
1349 let data_bytes = Int32Type::to_byte_array(&data[..]);
1350 let mut buffer = [0; 8];
1351 let num_nulls = 5;
1352 let valid_bits = [0b01001010];
1353 test_plain_decode_spaced::<Int32Type>(
1354 Bytes::from(data_bytes),
1355 3,
1356 -1,
1357 &mut buffer[..],
1358 num_nulls,
1359 &valid_bits,
1360 &expected_data[..],
1361 );
1362 }
1363
1364 #[test]
1365 fn test_plain_decode_int64() {
1366 let data = [42, 18, 52];
1367 let data_bytes = Int64Type::to_byte_array(&data[..]);
1368 let mut buffer = [0; 3];
1369 test_plain_decode::<Int64Type>(Bytes::from(data_bytes), 3, -1, &mut buffer[..], &data[..]);
1370 }
1371
1372 #[test]
1373 fn test_plain_skip_int64() {
1374 let data = [42, 18, 52];
1375 let data_bytes = Int64Type::to_byte_array(&data[..]);
1376 test_plain_skip::<Int64Type>(Bytes::from(data_bytes), 3, 2, -1, &data[2..]);
1377 }
1378
1379 #[test]
1380 fn test_plain_skip_all_int64() {
1381 let data = [42, 18, 52];
1382 let data_bytes = Int64Type::to_byte_array(&data[..]);
1383 test_plain_skip::<Int64Type>(Bytes::from(data_bytes), 3, 3, -1, &[]);
1384 }
1385
1386 #[test]
1387 fn test_plain_decode_float() {
1388 let data = [PI_f32, 2.414, 12.51];
1389 let data_bytes = FloatType::to_byte_array(&data[..]);
1390 let mut buffer = [0.0; 3];
1391 test_plain_decode::<FloatType>(Bytes::from(data_bytes), 3, -1, &mut buffer[..], &data[..]);
1392 }
1393
1394 #[test]
1395 fn test_plain_skip_float() {
1396 let data = [PI_f32, 2.414, 12.51];
1397 let data_bytes = FloatType::to_byte_array(&data[..]);
1398 test_plain_skip::<FloatType>(Bytes::from(data_bytes), 3, 1, -1, &data[1..]);
1399 }
1400
1401 #[test]
1402 fn test_plain_skip_all_float() {
1403 let data = [PI_f32, 2.414, 12.51];
1404 let data_bytes = FloatType::to_byte_array(&data[..]);
1405 test_plain_skip::<FloatType>(Bytes::from(data_bytes), 3, 4, -1, &[]);
1406 }
1407
1408 #[test]
1409 fn test_plain_skip_double() {
1410 let data = [PI_f64, 2.414f64, 12.51f64];
1411 let data_bytes = DoubleType::to_byte_array(&data[..]);
1412 test_plain_skip::<DoubleType>(Bytes::from(data_bytes), 3, 1, -1, &data[1..]);
1413 }
1414
1415 #[test]
1416 fn test_plain_skip_all_double() {
1417 let data = [PI_f64, 2.414f64, 12.51f64];
1418 let data_bytes = DoubleType::to_byte_array(&data[..]);
1419 test_plain_skip::<DoubleType>(Bytes::from(data_bytes), 3, 5, -1, &[]);
1420 }
1421
1422 #[test]
1423 fn test_plain_decode_double() {
1424 let data = [PI_f64, 2.414f64, 12.51f64];
1425 let data_bytes = DoubleType::to_byte_array(&data[..]);
1426 let mut buffer = [0.0f64; 3];
1427 test_plain_decode::<DoubleType>(Bytes::from(data_bytes), 3, -1, &mut buffer[..], &data[..]);
1428 }
1429
1430 #[test]
1431 fn test_plain_decode_int96() {
1432 let mut data = [Int96::new(); 4];
1433 data[0].set_data(11, 22, 33);
1434 data[1].set_data(44, 55, 66);
1435 data[2].set_data(10, 20, 30);
1436 data[3].set_data(40, 50, 60);
1437 let data_bytes = Int96Type::to_byte_array(&data[..]);
1438 let mut buffer = [Int96::new(); 4];
1439 test_plain_decode::<Int96Type>(Bytes::from(data_bytes), 4, -1, &mut buffer[..], &data[..]);
1440 }
1441
1442 #[test]
1443 fn test_plain_skip_int96() {
1444 let mut data = [Int96::new(); 4];
1445 data[0].set_data(11, 22, 33);
1446 data[1].set_data(44, 55, 66);
1447 data[2].set_data(10, 20, 30);
1448 data[3].set_data(40, 50, 60);
1449 let data_bytes = Int96Type::to_byte_array(&data[..]);
1450 test_plain_skip::<Int96Type>(Bytes::from(data_bytes), 4, 2, -1, &data[2..]);
1451 }
1452
1453 #[test]
1454 fn test_plain_skip_all_int96() {
1455 let mut data = [Int96::new(); 4];
1456 data[0].set_data(11, 22, 33);
1457 data[1].set_data(44, 55, 66);
1458 data[2].set_data(10, 20, 30);
1459 data[3].set_data(40, 50, 60);
1460 let data_bytes = Int96Type::to_byte_array(&data[..]);
1461 test_plain_skip::<Int96Type>(Bytes::from(data_bytes), 4, 8, -1, &[]);
1462 }
1463
1464 #[test]
1465 fn test_plain_decode_bool() {
1466 let data = [
1467 false, true, false, false, true, false, true, true, false, true,
1468 ];
1469 let data_bytes = BoolType::to_byte_array(&data[..]);
1470 let mut buffer = [false; 10];
1471 test_plain_decode::<BoolType>(Bytes::from(data_bytes), 10, -1, &mut buffer[..], &data[..]);
1472 }
1473
1474 #[test]
1475 fn test_plain_skip_bool() {
1476 let data = [
1477 false, true, false, false, true, false, true, true, false, true,
1478 ];
1479 let data_bytes = BoolType::to_byte_array(&data[..]);
1480 test_plain_skip::<BoolType>(Bytes::from(data_bytes), 10, 5, -1, &data[5..]);
1481 }
1482
1483 #[test]
1484 fn test_plain_skip_all_bool() {
1485 let data = [
1486 false, true, false, false, true, false, true, true, false, true,
1487 ];
1488 let data_bytes = BoolType::to_byte_array(&data[..]);
1489 test_plain_skip::<BoolType>(Bytes::from(data_bytes), 10, 20, -1, &[]);
1490 }
1491
1492 #[test]
1493 fn test_plain_decode_byte_array() {
1494 let mut data = vec![ByteArray::new(); 2];
1495 data[0].set_data(Bytes::from(String::from("hello")));
1496 data[1].set_data(Bytes::from(String::from("parquet")));
1497 let data_bytes = ByteArrayType::to_byte_array(&data[..]);
1498 let mut buffer = vec![ByteArray::new(); 2];
1499 test_plain_decode::<ByteArrayType>(
1500 Bytes::from(data_bytes),
1501 2,
1502 -1,
1503 &mut buffer[..],
1504 &data[..],
1505 );
1506 }
1507
1508 #[test]
1509 fn test_plain_skip_byte_array() {
1510 let mut data = vec![ByteArray::new(); 2];
1511 data[0].set_data(Bytes::from(String::from("hello")));
1512 data[1].set_data(Bytes::from(String::from("parquet")));
1513 let data_bytes = ByteArrayType::to_byte_array(&data[..]);
1514 test_plain_skip::<ByteArrayType>(Bytes::from(data_bytes), 2, 1, -1, &data[1..]);
1515 }
1516
1517 #[test]
1518 fn test_plain_skip_all_byte_array() {
1519 let mut data = vec![ByteArray::new(); 2];
1520 data[0].set_data(Bytes::from(String::from("hello")));
1521 data[1].set_data(Bytes::from(String::from("parquet")));
1522 let data_bytes = ByteArrayType::to_byte_array(&data[..]);
1523 test_plain_skip::<ByteArrayType>(Bytes::from(data_bytes), 2, 2, -1, &[]);
1524 }
1525
1526 #[test]
1527 fn test_plain_decode_fixed_len_byte_array() {
1528 let mut data = vec![FixedLenByteArray::default(); 3];
1529 data[0].set_data(Bytes::from(String::from("bird")));
1530 data[1].set_data(Bytes::from(String::from("come")));
1531 data[2].set_data(Bytes::from(String::from("flow")));
1532 let data_bytes = FixedLenByteArrayType::to_byte_array(&data[..]);
1533 let mut buffer = vec![FixedLenByteArray::default(); 3];
1534 test_plain_decode::<FixedLenByteArrayType>(
1535 Bytes::from(data_bytes),
1536 3,
1537 4,
1538 &mut buffer[..],
1539 &data[..],
1540 );
1541 }
1542
1543 #[test]
1544 fn test_plain_skip_fixed_len_byte_array() {
1545 let mut data = vec![FixedLenByteArray::default(); 3];
1546 data[0].set_data(Bytes::from(String::from("bird")));
1547 data[1].set_data(Bytes::from(String::from("come")));
1548 data[2].set_data(Bytes::from(String::from("flow")));
1549 let data_bytes = FixedLenByteArrayType::to_byte_array(&data[..]);
1550 test_plain_skip::<FixedLenByteArrayType>(Bytes::from(data_bytes), 3, 1, 4, &data[1..]);
1551 }
1552
1553 #[test]
1554 fn test_plain_skip_all_fixed_len_byte_array() {
1555 let mut data = vec![FixedLenByteArray::default(); 3];
1556 data[0].set_data(Bytes::from(String::from("bird")));
1557 data[1].set_data(Bytes::from(String::from("come")));
1558 data[2].set_data(Bytes::from(String::from("flow")));
1559 let data_bytes = FixedLenByteArrayType::to_byte_array(&data[..]);
1560 test_plain_skip::<FixedLenByteArrayType>(Bytes::from(data_bytes), 3, 6, 4, &[]);
1561 }
1562
1563 #[test]
1564 fn test_dict_decoder_empty_data() {
1565 let mut decoder = DictDecoder::<Int32Type>::new();
1566 let err = decoder.set_data(Bytes::new(), 10).unwrap_err();
1567 assert_eq!(err.to_string(), "EOF: Not enough bytes to decode bit_width");
1568 }
1569
1570 fn test_plain_decode<T: DataType>(
1571 data: Bytes,
1572 num_values: usize,
1573 type_length: i32,
1574 buffer: &mut [T::T],
1575 expected: &[T::T],
1576 ) {
1577 let mut decoder: PlainDecoder<T> = PlainDecoder::new(type_length);
1578 let result = decoder.set_data(data, num_values);
1579 assert!(result.is_ok());
1580 let result = decoder.get(buffer);
1581 assert!(result.is_ok());
1582 assert_eq!(decoder.values_left(), 0);
1583 assert_eq!(buffer, expected);
1584 }
1585
1586 fn test_plain_skip<T: DataType>(
1587 data: Bytes,
1588 num_values: usize,
1589 skip: usize,
1590 type_length: i32,
1591 expected: &[T::T],
1592 ) {
1593 let mut decoder: PlainDecoder<T> = PlainDecoder::new(type_length);
1594 let result = decoder.set_data(data, num_values);
1595 assert!(result.is_ok());
1596 let skipped = decoder.skip(skip).expect("skipping values");
1597
1598 if skip >= num_values {
1599 assert_eq!(skipped, num_values);
1600
1601 let mut buffer = vec![T::T::default(); 1];
1602 let remaining = decoder.get(&mut buffer).expect("getting remaining values");
1603 assert_eq!(remaining, 0);
1604 } else {
1605 assert_eq!(skipped, skip);
1606 let mut buffer = vec![T::T::default(); num_values - skip];
1607 let remaining = decoder.get(&mut buffer).expect("getting remaining values");
1608 assert_eq!(remaining, num_values - skip);
1609 assert_eq!(decoder.values_left(), 0);
1610 assert_eq!(buffer, expected);
1611 }
1612 }
1613
1614 fn test_plain_decode_spaced<T: DataType>(
1615 data: Bytes,
1616 num_values: usize,
1617 type_length: i32,
1618 buffer: &mut [T::T],
1619 num_nulls: usize,
1620 valid_bits: &[u8],
1621 expected: &[T::T],
1622 ) {
1623 let mut decoder: PlainDecoder<T> = PlainDecoder::new(type_length);
1624 let result = decoder.set_data(data, num_values);
1625 assert!(result.is_ok());
1626 let result = decoder.get_spaced(buffer, num_nulls, valid_bits);
1627 assert!(result.is_ok());
1628 assert_eq!(num_values + num_nulls, result.unwrap());
1629 assert_eq!(decoder.values_left(), 0);
1630 assert_eq!(buffer, expected);
1631 }
1632
1633 #[test]
1634 #[should_panic(expected = "RleValueEncoder only supports BoolType")]
1635 fn test_rle_value_encode_int32_not_supported() {
1636 let mut encoder = RleValueEncoder::<Int32Type>::new();
1637 encoder.put(&[1, 2, 3, 4]).unwrap();
1638 }
1639
1640 #[test]
1641 #[should_panic(expected = "RleValueDecoder only supports BoolType")]
1642 fn test_rle_value_decode_int32_not_supported() {
1643 let mut decoder = RleValueDecoder::<Int32Type>::new();
1644 decoder.set_data(Bytes::from(vec![5, 0, 0, 0]), 1).unwrap();
1645 }
1646
1647 #[test]
1648 fn test_rle_value_decode_missing_size() {
1649 let mut decoder = RleValueDecoder::<BoolType>::new();
1650 assert!(decoder.set_data(Bytes::from(vec![0]), 1).is_err());
1651 }
1652
1653 #[test]
1654 fn test_rle_value_decode_missing_data() {
1655 let mut decoder = RleValueDecoder::<BoolType>::new();
1656 assert!(decoder.set_data(Bytes::from(vec![5, 0, 0, 0]), 1).is_err());
1657 }
1658
1659 #[test]
1660 fn test_rle_value_decode_bool_decode() {
1661 let data = vec![
1663 BoolType::gen_vec(-1, 256),
1664 BoolType::gen_vec(-1, 257),
1665 BoolType::gen_vec(-1, 126),
1666 ];
1667 test_rle_value_decode::<BoolType>(data);
1668 }
1669
1670 #[test]
1671 #[should_panic(expected = "Bit reader is not initialized")]
1672 fn test_delta_bit_packed_not_initialized_offset() {
1673 let decoder = DeltaBitPackDecoder::<Int32Type>::new();
1675 decoder.get_offset();
1676 }
1677
1678 #[test]
1679 #[should_panic(expected = "Bit reader is not initialized")]
1680 fn test_delta_bit_packed_not_initialized_get() {
1681 let mut decoder = DeltaBitPackDecoder::<Int32Type>::new();
1683 let mut buffer = vec![];
1684 decoder.get(&mut buffer).unwrap();
1685 }
1686
1687 #[test]
1688 fn test_delta_bit_packed_int32_empty() {
1689 let data = vec![vec![0; 0]];
1690 test_delta_bit_packed_decode::<Int32Type>(data);
1691 }
1692
1693 #[test]
1694 fn test_delta_bit_packed_int32_repeat() {
1695 let block_data = vec![
1696 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5,
1697 6, 7, 8,
1698 ];
1699 test_delta_bit_packed_decode::<Int32Type>(vec![block_data]);
1700 }
1701
1702 #[test]
1703 fn test_skip_delta_bit_packed_int32_repeat() {
1704 let block_data = vec![
1705 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5,
1706 6, 7, 8,
1707 ];
1708 test_skip::<Int32Type>(block_data.clone(), Encoding::DELTA_BINARY_PACKED, 10);
1709 test_skip::<Int32Type>(block_data, Encoding::DELTA_BINARY_PACKED, 100);
1710 }
1711
1712 #[test]
1713 fn test_delta_bit_packed_int32_uneven() {
1714 let block_data = vec![1, -2, 3, -4, 5, 6, 7, 8, 9, 10, 11];
1715 test_delta_bit_packed_decode::<Int32Type>(vec![block_data]);
1716 }
1717
1718 #[test]
1719 fn test_skip_delta_bit_packed_int32_uneven() {
1720 let block_data = vec![1, -2, 3, -4, 5, 6, 7, 8, 9, 10, 11];
1721 test_skip::<Int32Type>(block_data.clone(), Encoding::DELTA_BINARY_PACKED, 5);
1722 test_skip::<Int32Type>(block_data, Encoding::DELTA_BINARY_PACKED, 100);
1723 }
1724
1725 #[test]
1726 fn test_delta_bit_packed_int32_same_values() {
1727 let block_data = vec![
1728 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127,
1729 ];
1730 test_delta_bit_packed_decode::<Int32Type>(vec![block_data]);
1731
1732 let block_data = vec![
1733 -127, -127, -127, -127, -127, -127, -127, -127, -127, -127, -127, -127, -127, -127,
1734 -127, -127,
1735 ];
1736 test_delta_bit_packed_decode::<Int32Type>(vec![block_data]);
1737 }
1738
1739 #[test]
1740 fn test_skip_delta_bit_packed_int32_same_values() {
1741 let block_data = vec![
1742 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127,
1743 ];
1744 test_skip::<Int32Type>(block_data.clone(), Encoding::DELTA_BINARY_PACKED, 5);
1745 test_skip::<Int32Type>(block_data, Encoding::DELTA_BINARY_PACKED, 100);
1746
1747 let block_data = vec![
1748 -127, -127, -127, -127, -127, -127, -127, -127, -127, -127, -127, -127, -127, -127,
1749 -127, -127,
1750 ];
1751 test_skip::<Int32Type>(block_data.clone(), Encoding::DELTA_BINARY_PACKED, 5);
1752 test_skip::<Int32Type>(block_data, Encoding::DELTA_BINARY_PACKED, 100);
1753 }
1754
1755 #[test]
1756 fn test_delta_bit_packed_int32_min_max() {
1757 let block_data = vec![
1758 i32::MIN,
1759 i32::MIN,
1760 i32::MIN,
1761 i32::MAX,
1762 i32::MIN,
1763 i32::MAX,
1764 i32::MIN,
1765 i32::MAX,
1766 ];
1767 test_delta_bit_packed_decode::<Int32Type>(vec![block_data]);
1768 }
1769
1770 #[test]
1771 fn test_skip_delta_bit_packed_int32_min_max() {
1772 let block_data = vec![
1773 i32::MIN,
1774 i32::MIN,
1775 i32::MIN,
1776 i32::MAX,
1777 i32::MIN,
1778 i32::MAX,
1779 i32::MIN,
1780 i32::MAX,
1781 ];
1782 test_skip::<Int32Type>(block_data.clone(), Encoding::DELTA_BINARY_PACKED, 5);
1783 test_skip::<Int32Type>(block_data, Encoding::DELTA_BINARY_PACKED, 100);
1784 }
1785
1786 #[test]
1787 fn test_skip_delta_bit_packed_bw0_uniform_step_i32() {
1788 let data: Vec<i32> = (0..128).map(|i| i * 7).collect();
1791 test_skip::<Int32Type>(data.clone(), Encoding::DELTA_BINARY_PACKED, 50);
1792 test_skip::<Int32Type>(data, Encoding::DELTA_BINARY_PACKED, 200);
1793 }
1794
1795 #[test]
1796 fn test_skip_delta_bit_packed_bw0_uniform_step_i64() {
1797 let data: Vec<i64> = (0..128).map(|i| i * 100).collect();
1799 test_skip::<Int64Type>(data.clone(), Encoding::DELTA_BINARY_PACKED, 50);
1800 test_skip::<Int64Type>(data, Encoding::DELTA_BINARY_PACKED, 200);
1801 }
1802
1803 #[test]
1804 fn test_delta_bit_packed_int32_multiple_blocks() {
1805 let data = vec![
1807 Int32Type::gen_vec(-1, 64),
1808 Int32Type::gen_vec(-1, 128),
1809 Int32Type::gen_vec(-1, 64),
1810 ];
1811 test_delta_bit_packed_decode::<Int32Type>(data);
1812 }
1813
1814 #[test]
1815 fn test_delta_bit_packed_int32_data_across_blocks() {
1816 let data = vec![Int32Type::gen_vec(-1, 256), Int32Type::gen_vec(-1, 257)];
1818 test_delta_bit_packed_decode::<Int32Type>(data);
1819 }
1820
1821 #[test]
1822 fn test_delta_bit_packed_int32_with_empty_blocks() {
1823 let data = vec![
1824 Int32Type::gen_vec(-1, 128),
1825 vec![0; 0],
1826 Int32Type::gen_vec(-1, 64),
1827 ];
1828 test_delta_bit_packed_decode::<Int32Type>(data);
1829 }
1830
1831 #[test]
1832 fn test_delta_bit_packed_int64_empty() {
1833 let data = vec![vec![0; 0]];
1834 test_delta_bit_packed_decode::<Int64Type>(data);
1835 }
1836
1837 #[test]
1838 fn test_delta_bit_packed_int64_min_max() {
1839 let block_data = vec![
1840 i64::MIN,
1841 i64::MAX,
1842 i64::MIN,
1843 i64::MAX,
1844 i64::MIN,
1845 i64::MAX,
1846 i64::MIN,
1847 i64::MAX,
1848 ];
1849 test_delta_bit_packed_decode::<Int64Type>(vec![block_data]);
1850 }
1851
1852 #[test]
1853 fn test_delta_bit_packed_int64_multiple_blocks() {
1854 let data = vec![
1856 Int64Type::gen_vec(-1, 64),
1857 Int64Type::gen_vec(-1, 128),
1858 Int64Type::gen_vec(-1, 64),
1859 ];
1860 test_delta_bit_packed_decode::<Int64Type>(data);
1861 }
1862
1863 #[test]
1864 fn test_delta_bit_packed_zero_miniblocks() {
1865 let data = vec![
1867 128, 1, 0, ];
1870 let mut decoder = DeltaBitPackDecoder::<Int32Type>::new();
1871 let err = decoder.set_data(data.into(), 0).unwrap_err();
1872 assert_eq!(
1873 err.to_string(),
1874 "Parquet error: cannot have zero miniblocks per block"
1875 );
1876 }
1877
1878 #[test]
1879 fn test_delta_bit_packed_decoder_sample() {
1880 let data_bytes = vec![
1881 128, 1, 4, 3, 58, 28, 6, 0, 0, 0, 0, 8, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
1882 0, 0, 0, 0, 0, 0,
1883 ];
1884 let mut decoder: DeltaBitPackDecoder<Int32Type> = DeltaBitPackDecoder::new();
1885 decoder.set_data(data_bytes.into(), 3).unwrap();
1886 assert_eq!(decoder.get_offset(), 5);
1889 let mut result = vec![0, 0, 0];
1890 decoder.get(&mut result).unwrap();
1891 assert_eq!(decoder.get_offset(), 34);
1892 assert_eq!(result, vec![29, 43, 89]);
1893 }
1894
1895 #[test]
1896 fn test_delta_bit_packed_padding() {
1897 let header = vec![
1899 128,
1903 2,
1904 4,
1906 128 + 35,
1908 3,
1909 7,
1911 ];
1912
1913 let block1_header = vec![
1915 0, 0, 1, 0, 0, ];
1918
1919 let block1 = vec![0xFF; 8];
1924
1925 let block2_header = vec![
1927 0, 0, 1, 2, 0xFF, ];
1930
1931 let block2 = vec![0xFF; 24];
1936
1937 let data: Vec<u8> = header
1938 .into_iter()
1939 .chain(block1_header)
1940 .chain(block1)
1941 .chain(block2_header)
1942 .chain(block2)
1943 .collect();
1944
1945 let length = data.len();
1946
1947 let ptr = Bytes::from(data);
1948 let mut reader = BitReader::new(ptr.clone());
1949 assert_eq!(reader.get_vlq_int().unwrap(), 256);
1950 assert_eq!(reader.get_vlq_int().unwrap(), 4);
1951 assert_eq!(reader.get_vlq_int().unwrap(), 419);
1952 assert_eq!(reader.get_vlq_int().unwrap(), 7);
1953
1954 let mut output = vec![0_i32; 420];
1956
1957 let mut decoder = DeltaBitPackDecoder::<Int32Type>::new();
1958 decoder.set_data(ptr.clone(), 0).unwrap();
1959 assert_eq!(decoder.get(&mut output).unwrap(), 419);
1960 assert_eq!(decoder.get_offset(), length);
1961
1962 decoder.set_data(ptr.slice(..12), 0).unwrap();
1964 let err = decoder.get(&mut output).unwrap_err().to_string();
1965 assert!(
1966 err.contains("Expected to read 64 values from miniblock got 8"),
1967 "{}",
1968 err
1969 );
1970 }
1971
1972 #[test]
1973 fn test_delta_bit_packed_int32_single_value_large() {
1974 let block_data = vec![3; 10240];
1975 test_delta_bit_packed_decode::<Int32Type>(vec![block_data]);
1976 }
1977
1978 #[test]
1979 fn test_delta_bit_packed_int32_single_value_skip_large() {
1980 let block_data = vec![3; 10240];
1981 test_skip::<Int32Type>(block_data.clone(), Encoding::DELTA_BINARY_PACKED, 50);
1982 test_skip::<Int32Type>(block_data, Encoding::DELTA_BINARY_PACKED, 5000);
1983 }
1984
1985 #[test]
1986 fn test_delta_bit_packed_int32_increasing_value_large() {
1987 let block_data = (0i32..10240).collect();
1988 test_delta_bit_packed_decode::<Int32Type>(vec![block_data]);
1989 }
1990
1991 #[test]
1992 fn test_delta_bit_packed_int32_increasing_value_skip_large() {
1993 let block_data = (0i32..10240).collect::<Vec<i32>>();
1994 test_skip::<Int32Type>(block_data.clone(), Encoding::DELTA_BINARY_PACKED, 50);
1995 test_skip::<Int32Type>(block_data, Encoding::DELTA_BINARY_PACKED, 5000);
1996 }
1997
1998 #[test]
1999 fn test_delta_bit_packed_int32_stepped_value_large() {
2000 let block_data = (0i32..10240).map(|i| i / 2).collect();
2001 test_delta_bit_packed_decode::<Int32Type>(vec![block_data]);
2002 }
2003
2004 #[test]
2005 fn test_delta_bit_packed_int32_stepped_value_skip_large() {
2006 let block_data = (0i32..10240).map(|i| i / 2).collect::<Vec<i32>>();
2007 test_skip::<Int32Type>(block_data.clone(), Encoding::DELTA_BINARY_PACKED, 50);
2008 test_skip::<Int32Type>(block_data, Encoding::DELTA_BINARY_PACKED, 5000);
2009 }
2010
2011 #[test]
2012 fn test_delta_bit_packed_int32_mixed_large() {
2013 const BLOCK_SIZE: i32 = 133;
2016 let block1_data = (0..BLOCK_SIZE).map(|i| (i * 7) % 11).collect();
2017 let block2_data = vec![3; BLOCK_SIZE as usize];
2018 let block3_data = (0..BLOCK_SIZE).map(|i| (i * 5) % 13).collect();
2019 let block4_data = (0..BLOCK_SIZE).collect();
2020 let block5_data = (0..BLOCK_SIZE).map(|i| (i * 3) % 17).collect();
2021 test_delta_bit_packed_decode::<Int32Type>(vec![
2022 block1_data,
2023 block2_data,
2024 block3_data,
2025 block4_data,
2026 block5_data,
2027 ]);
2028 }
2029
2030 #[test]
2031 fn test_delta_bit_packed_int64_single_value_large() {
2032 let block_data = vec![5; 10240];
2033 test_delta_bit_packed_decode::<Int64Type>(vec![block_data]);
2034 }
2035
2036 #[test]
2037 fn test_delta_bit_packed_int64_increasing_value_large() {
2038 let block_data = (0i64..10240).collect();
2039 test_delta_bit_packed_decode::<Int64Type>(vec![block_data]);
2040 }
2041
2042 #[test]
2043 fn test_delta_byte_array_same_arrays() {
2044 let data = vec![
2045 vec![ByteArray::from(vec![1, 2, 3, 4, 5, 6])],
2046 vec![
2047 ByteArray::from(vec![1, 2, 3, 4, 5, 6]),
2048 ByteArray::from(vec![1, 2, 3, 4, 5, 6]),
2049 ],
2050 vec![
2051 ByteArray::from(vec![1, 2, 3, 4, 5, 6]),
2052 ByteArray::from(vec![1, 2, 3, 4, 5, 6]),
2053 ],
2054 ];
2055 test_delta_byte_array_decode(data);
2056 }
2057
2058 #[test]
2059 fn test_delta_byte_array_unique_arrays() {
2060 let data = vec![
2061 vec![ByteArray::from(vec![1])],
2062 vec![ByteArray::from(vec![2, 3]), ByteArray::from(vec![4, 5, 6])],
2063 vec![
2064 ByteArray::from(vec![7, 8]),
2065 ByteArray::from(vec![9, 0, 1, 2]),
2066 ],
2067 ];
2068 test_delta_byte_array_decode(data);
2069 }
2070
2071 #[test]
2072 fn test_delta_byte_array_single_array() {
2073 let data = vec![vec![ByteArray::from(vec![1, 2, 3, 4, 5, 6])]];
2074 test_delta_byte_array_decode(data);
2075 }
2076
2077 #[test]
2078 fn test_byte_stream_split_multiple_f32() {
2079 let data = vec![
2080 vec![
2081 f32::from_le_bytes([0xAA, 0xBB, 0xCC, 0xDD]),
2082 f32::from_le_bytes([0x00, 0x11, 0x22, 0x33]),
2083 ],
2084 vec![f32::from_le_bytes([0xA3, 0xB4, 0xC5, 0xD6])],
2085 ];
2086 test_byte_stream_split_decode::<FloatType>(data, -1);
2087 }
2088
2089 #[test]
2090 fn test_byte_stream_split_f64() {
2091 let data = vec![vec![
2092 f64::from_le_bytes([0, 1, 2, 3, 4, 5, 6, 7]),
2093 f64::from_le_bytes([8, 9, 10, 11, 12, 13, 14, 15]),
2094 ]];
2095 test_byte_stream_split_decode::<DoubleType>(data, -1);
2096 }
2097
2098 #[test]
2099 fn test_byte_stream_split_multiple_i32() {
2100 let data = vec![
2101 vec![
2102 i32::from_le_bytes([0xAA, 0xBB, 0xCC, 0xDD]),
2103 i32::from_le_bytes([0x00, 0x11, 0x22, 0x33]),
2104 ],
2105 vec![i32::from_le_bytes([0xA3, 0xB4, 0xC5, 0xD6])],
2106 ];
2107 test_byte_stream_split_decode::<Int32Type>(data, -1);
2108 }
2109
2110 #[test]
2111 fn test_byte_stream_split_i64() {
2112 let data = vec![vec![
2113 i64::from_le_bytes([0, 1, 2, 3, 4, 5, 6, 7]),
2114 i64::from_le_bytes([8, 9, 10, 11, 12, 13, 14, 15]),
2115 ]];
2116 test_byte_stream_split_decode::<Int64Type>(data, -1);
2117 }
2118
2119 fn test_byte_stream_split_flba(type_width: usize) {
2120 let data = vec![
2121 vec![
2122 FixedLenByteArrayType::r#gen(type_width as i32),
2123 FixedLenByteArrayType::r#gen(type_width as i32),
2124 ],
2125 vec![FixedLenByteArrayType::r#gen(type_width as i32)],
2126 ];
2127 test_byte_stream_split_decode::<FixedLenByteArrayType>(data, type_width as i32);
2128 }
2129
2130 #[test]
2131 fn test_byte_stream_split_flba5() {
2132 test_byte_stream_split_flba(5);
2133 }
2134
2135 #[test]
2136 fn test_byte_stream_split_flba16() {
2137 test_byte_stream_split_flba(16);
2138 }
2139
2140 #[test]
2141 fn test_byte_stream_split_flba19() {
2142 test_byte_stream_split_flba(19);
2143 }
2144
2145 #[test]
2146 #[should_panic(expected = "Mismatched FixedLenByteArray sizes: 4 != 5")]
2147 fn test_byte_stream_split_flba_mismatch() {
2148 let data = vec![
2149 vec![
2150 FixedLenByteArray::from(vec![0xAA, 0xAB, 0xAC, 0xAD, 0xAE]),
2151 FixedLenByteArray::from(vec![0xBA, 0xBB, 0xBC, 0xBD, 0xBE]),
2152 ],
2153 vec![FixedLenByteArray::from(vec![0xCA, 0xCB, 0xCC, 0xCD])],
2154 ];
2155 test_byte_stream_split_decode::<FixedLenByteArrayType>(data, 5);
2156 }
2157
2158 #[test]
2159 #[should_panic(expected = "Input data length is not a multiple of type width 4")]
2160 fn test_byte_stream_split_flba_bad_input() {
2161 let mut decoder = VariableWidthByteStreamSplitDecoder::<FixedLenByteArrayType>::new(4);
2162 decoder
2163 .set_data(Bytes::from(vec![1, 2, 3, 4, 5]), 1)
2164 .unwrap();
2165 }
2166
2167 #[test]
2168 fn test_skip_byte_stream_split() {
2169 let block_data = vec![0.3, 0.4, 0.1, 4.10];
2170 test_skip::<FloatType>(block_data.clone(), Encoding::BYTE_STREAM_SPLIT, 2);
2171 test_skip::<DoubleType>(
2172 block_data.into_iter().map(|x| x as f64).collect(),
2173 Encoding::BYTE_STREAM_SPLIT,
2174 100,
2175 );
2176 }
2177
2178 #[test]
2179 fn test_skip_byte_stream_split_ints() {
2180 let block_data = vec![3, 4, 1, 5];
2181 test_skip::<Int32Type>(block_data.clone(), Encoding::BYTE_STREAM_SPLIT, 2);
2182 test_skip::<Int64Type>(
2183 block_data.into_iter().map(|x| x as i64).collect(),
2184 Encoding::BYTE_STREAM_SPLIT,
2185 100,
2186 );
2187 }
2188
2189 fn test_rle_value_decode<T: DataType>(data: Vec<Vec<T::T>>) {
2190 test_encode_decode::<T>(data, Encoding::RLE, -1);
2191 }
2192
2193 fn test_delta_bit_packed_decode<T: DataType>(data: Vec<Vec<T::T>>) {
2194 test_encode_decode::<T>(data, Encoding::DELTA_BINARY_PACKED, -1);
2195 }
2196
2197 fn test_byte_stream_split_decode<T: DataType>(data: Vec<Vec<T::T>>, type_width: i32) {
2198 test_encode_decode::<T>(data, Encoding::BYTE_STREAM_SPLIT, type_width);
2199 }
2200
2201 fn test_delta_byte_array_decode(data: Vec<Vec<ByteArray>>) {
2202 test_encode_decode::<ByteArrayType>(data, Encoding::DELTA_BYTE_ARRAY, -1);
2203 }
2204
2205 fn test_encode_decode<T: DataType>(data: Vec<Vec<T::T>>, encoding: Encoding, type_width: i32) {
2210 let col_descr = create_test_col_desc_ptr(type_width, T::get_physical_type());
2211
2212 let mut encoder = get_encoder::<T>(encoding, &col_descr).expect("get encoder");
2214
2215 for v in &data[..] {
2216 encoder.put(&v[..]).expect("ok to encode");
2217 }
2218 let bytes = encoder.flush_buffer().expect("ok to flush buffer");
2219
2220 let expected: Vec<T::T> = data.iter().flat_map(|s| s.clone()).collect();
2222
2223 let mut decoder = get_decoder::<T>(col_descr, encoding).expect("get decoder");
2225
2226 let mut result = vec![T::T::default(); expected.len()];
2227 decoder
2228 .set_data(bytes, expected.len())
2229 .expect("ok to set data");
2230 let mut result_num_values = 0;
2231 while decoder.values_left() > 0 {
2232 result_num_values += decoder
2233 .get(&mut result[result_num_values..])
2234 .expect("ok to decode");
2235 }
2236 assert_eq!(result_num_values, expected.len());
2237 assert_eq!(result, expected);
2238 }
2239
2240 fn test_skip<T: DataType>(data: Vec<T::T>, encoding: Encoding, skip: usize) {
2241 let col_descr = create_test_col_desc_ptr(-1, T::get_physical_type());
2244
2245 let mut encoder = get_encoder::<T>(encoding, &col_descr).expect("get encoder");
2247
2248 encoder.put(&data).expect("ok to encode");
2249
2250 let bytes = encoder.flush_buffer().expect("ok to flush buffer");
2251
2252 let mut decoder = get_decoder::<T>(col_descr, encoding).expect("get decoder");
2253 decoder.set_data(bytes, data.len()).expect("ok to set data");
2254
2255 if skip >= data.len() {
2256 let skipped = decoder.skip(skip).expect("ok to skip");
2257 assert_eq!(skipped, data.len());
2258
2259 let skipped_again = decoder.skip(skip).expect("ok to skip again");
2260 assert_eq!(skipped_again, 0);
2261 } else {
2262 let skipped = decoder.skip(skip).expect("ok to skip");
2263 assert_eq!(skipped, skip);
2264
2265 let remaining = data.len() - skip;
2266
2267 let expected = &data[skip..];
2268 let mut buffer = vec![T::T::default(); remaining];
2269 let fetched = decoder.get(&mut buffer).expect("ok to decode");
2270 assert_eq!(remaining, fetched);
2271 assert_eq!(&buffer, expected);
2272 }
2273 }
2274
2275 fn create_and_check_decoder<T: DataType>(encoding: Encoding, err: Option<ParquetError>) {
2276 let descr = create_test_col_desc_ptr(-1, T::get_physical_type());
2277 let decoder = get_decoder::<T>(descr, encoding);
2278 match err {
2279 Some(parquet_error) => {
2280 assert_eq!(
2281 decoder.err().unwrap().to_string(),
2282 parquet_error.to_string()
2283 );
2284 }
2285 None => {
2286 assert_eq!(decoder.unwrap().encoding(), encoding);
2287 }
2288 }
2289 }
2290
2291 fn create_test_col_desc_ptr(type_len: i32, t: Type) -> ColumnDescPtr {
2293 let ty = SchemaType::primitive_type_builder("t", t)
2294 .with_length(type_len)
2295 .build()
2296 .unwrap();
2297 Arc::new(ColumnDescriptor::new(
2298 Arc::new(ty),
2299 0,
2300 0,
2301 ColumnPath::new(vec![]),
2302 ))
2303 }
2304
2305 fn usize_to_bytes(v: usize) -> [u8; 4] {
2306 (v as u32).to_ne_bytes()
2307 }
2308
    /// Test-only helper: turns a slice of a data type's native values into the
    /// byte vector that the tests in this module hand to decoders.
    trait ToByteArray<T: DataType> {
        /// Serialises `data` into a freshly allocated byte vector.
        // This is an associated function without `self`, which trips the
        // `to_*` naming lint; the lint is silenced on purpose.
        #[allow(clippy::wrong_self_convention)]
        fn to_byte_array(data: &[T::T]) -> Vec<u8>;
    }
2314
    // Implements `ToByteArray` for a data type by copying the bytes that
    // `slice_as_bytes` returns for the value slice into a new `Vec<u8>`.
    macro_rules! to_byte_array_impl {
        ($ty: ty) => {
            impl ToByteArray<$ty> for $ty {
                #[allow(clippy::wrong_self_convention)]
                fn to_byte_array(data: &[<$ty as DataType>::T]) -> Vec<u8> {
                    <$ty as DataType>::T::slice_as_bytes(data).to_vec()
                }
            }
        };
    }

    // The four numeric types share the macro-generated implementation; the
    // remaining types have hand-written implementations below.
    to_byte_array_impl!(Int32Type);
    to_byte_array_impl!(Int64Type);
    to_byte_array_impl!(FloatType);
    to_byte_array_impl!(DoubleType);
2330
2331 impl ToByteArray<BoolType> for BoolType {
2332 #[allow(clippy::wrong_self_convention)]
2333 fn to_byte_array(data: &[bool]) -> Vec<u8> {
2334 let mut v = vec![];
2335 for (i, item) in data.iter().enumerate() {
2336 if i % 8 == 0 {
2337 v.push(0);
2338 }
2339 if *item {
2340 v[i / 8] |= 1 << (i % 8);
2341 }
2342 }
2343 v
2344 }
2345 }
2346
2347 impl ToByteArray<Int96Type> for Int96Type {
2348 #[allow(clippy::wrong_self_convention)]
2349 fn to_byte_array(data: &[Int96]) -> Vec<u8> {
2350 let mut v = vec![];
2351 for d in data {
2352 v.extend_from_slice(d.as_bytes());
2353 }
2354 v
2355 }
2356 }
2357
2358 impl ToByteArray<ByteArrayType> for ByteArrayType {
2359 #[allow(clippy::wrong_self_convention)]
2360 fn to_byte_array(data: &[ByteArray]) -> Vec<u8> {
2361 let mut v = vec![];
2362 for d in data {
2363 let buf = d.data();
2364 let len = &usize_to_bytes(buf.len());
2365 v.extend_from_slice(len);
2366 v.extend(buf);
2367 }
2368 v
2369 }
2370 }
2371
2372 impl ToByteArray<FixedLenByteArrayType> for FixedLenByteArrayType {
2373 #[allow(clippy::wrong_self_convention)]
2374 fn to_byte_array(data: &[FixedLenByteArray]) -> Vec<u8> {
2375 let mut v = vec![];
2376 for d in data {
2377 let buf = d.data();
2378 v.extend(buf);
2379 }
2380 v
2381 }
2382 }
2383
2384 #[test]
2385 #[allow(clippy::vec_init_then_push)]
2387 fn test_delta_bit_packed_invalid_bit_width() {
2388 let mut buffer = vec![];
2390 buffer.push(128);
2392 buffer.push(1);
2393 buffer.push(4);
2395 buffer.push(32);
2397 buffer.push(0);
2399 buffer.push(0);
2401 buffer.push(33); buffer.push(0);
2404 buffer.push(0);
2405 buffer.push(0);
2406
2407 let corrupted_buffer = Bytes::from(buffer);
2408
2409 let mut decoder = DeltaBitPackDecoder::<Int32Type>::new();
2410 decoder.set_data(corrupted_buffer.clone(), 32).unwrap();
2411 let mut read_buffer = vec![0; 32];
2412 let err = decoder.get(&mut read_buffer).unwrap_err();
2413 assert!(
2414 err.to_string()
2415 .contains("Invalid delta bit width 33 which is larger than expected 32"),
2416 "{}",
2417 err
2418 );
2419
2420 let mut decoder = DeltaBitPackDecoder::<Int32Type>::new();
2421 decoder.set_data(corrupted_buffer, 32).unwrap();
2422 let err = decoder.skip(32).unwrap_err();
2423 assert!(
2424 err.to_string()
2425 .contains("Invalid delta bit width 33 which is larger than expected 32"),
2426 "{}",
2427 err
2428 );
2429 }
2430}