1use bytes::Bytes;
21use num_traits::{FromPrimitive, WrappingAdd};
22use std::{cmp, marker::PhantomData, mem};
23
24use super::rle::RleDecoder;
25
26use crate::basic::*;
27use crate::data_type::private::ParquetValueType;
28use crate::data_type::*;
29use crate::encodings::decoding::byte_stream_split_decoder::{
30 ByteStreamSplitDecoder, VariableWidthByteStreamSplitDecoder,
31};
32use crate::errors::{ParquetError, Result};
33use crate::schema::types::ColumnDescPtr;
34use crate::util::bit_util::{self, BitReader};
35
36mod byte_stream_split_decoder;
37
38pub(crate) mod private {
39 use super::*;
40
41 pub trait GetDecoder {
46 fn get_decoder<T: DataType<T = Self>>(
47 descr: ColumnDescPtr,
48 encoding: Encoding,
49 ) -> Result<Box<dyn Decoder<T>>> {
50 get_decoder_default(descr, encoding)
51 }
52 }
53
54 fn get_decoder_default<T: DataType>(
55 descr: ColumnDescPtr,
56 encoding: Encoding,
57 ) -> Result<Box<dyn Decoder<T>>> {
58 match encoding {
59 Encoding::PLAIN => Ok(Box::new(PlainDecoder::new(descr.type_length()))),
60 Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => Err(general_err!(
61 "Cannot initialize this encoding through this function"
62 )),
63 Encoding::RLE
64 | Encoding::DELTA_BINARY_PACKED
65 | Encoding::DELTA_BYTE_ARRAY
66 | Encoding::DELTA_LENGTH_BYTE_ARRAY => Err(general_err!(
67 "Encoding {} is not supported for type",
68 encoding
69 )),
70 e => Err(nyi_err!("Encoding {} is not supported", e)),
71 }
72 }
73
74 impl GetDecoder for bool {
75 fn get_decoder<T: DataType<T = Self>>(
76 descr: ColumnDescPtr,
77 encoding: Encoding,
78 ) -> Result<Box<dyn Decoder<T>>> {
79 match encoding {
80 Encoding::RLE => Ok(Box::new(RleValueDecoder::new())),
81 _ => get_decoder_default(descr, encoding),
82 }
83 }
84 }
85
86 impl GetDecoder for i32 {
87 fn get_decoder<T: DataType<T = Self>>(
88 descr: ColumnDescPtr,
89 encoding: Encoding,
90 ) -> Result<Box<dyn Decoder<T>>> {
91 match encoding {
92 Encoding::BYTE_STREAM_SPLIT => Ok(Box::new(ByteStreamSplitDecoder::new())),
93 Encoding::DELTA_BINARY_PACKED => Ok(Box::new(DeltaBitPackDecoder::new())),
94 _ => get_decoder_default(descr, encoding),
95 }
96 }
97 }
98
99 impl GetDecoder for i64 {
100 fn get_decoder<T: DataType<T = Self>>(
101 descr: ColumnDescPtr,
102 encoding: Encoding,
103 ) -> Result<Box<dyn Decoder<T>>> {
104 match encoding {
105 Encoding::BYTE_STREAM_SPLIT => Ok(Box::new(ByteStreamSplitDecoder::new())),
106 Encoding::DELTA_BINARY_PACKED => Ok(Box::new(DeltaBitPackDecoder::new())),
107 _ => get_decoder_default(descr, encoding),
108 }
109 }
110 }
111
112 impl GetDecoder for f32 {
113 fn get_decoder<T: DataType<T = Self>>(
114 descr: ColumnDescPtr,
115 encoding: Encoding,
116 ) -> Result<Box<dyn Decoder<T>>> {
117 match encoding {
118 Encoding::BYTE_STREAM_SPLIT => Ok(Box::new(ByteStreamSplitDecoder::new())),
119 _ => get_decoder_default(descr, encoding),
120 }
121 }
122 }
123 impl GetDecoder for f64 {
124 fn get_decoder<T: DataType<T = Self>>(
125 descr: ColumnDescPtr,
126 encoding: Encoding,
127 ) -> Result<Box<dyn Decoder<T>>> {
128 match encoding {
129 Encoding::BYTE_STREAM_SPLIT => Ok(Box::new(ByteStreamSplitDecoder::new())),
130 _ => get_decoder_default(descr, encoding),
131 }
132 }
133 }
134
135 impl GetDecoder for ByteArray {
136 fn get_decoder<T: DataType<T = Self>>(
137 descr: ColumnDescPtr,
138 encoding: Encoding,
139 ) -> Result<Box<dyn Decoder<T>>> {
140 match encoding {
141 Encoding::DELTA_BYTE_ARRAY => Ok(Box::new(DeltaByteArrayDecoder::new())),
142 Encoding::DELTA_LENGTH_BYTE_ARRAY => {
143 Ok(Box::new(DeltaLengthByteArrayDecoder::new()))
144 }
145 _ => get_decoder_default(descr, encoding),
146 }
147 }
148 }
149
150 impl GetDecoder for FixedLenByteArray {
151 fn get_decoder<T: DataType<T = Self>>(
152 descr: ColumnDescPtr,
153 encoding: Encoding,
154 ) -> Result<Box<dyn Decoder<T>>> {
155 match encoding {
156 Encoding::BYTE_STREAM_SPLIT => Ok(Box::new(
157 VariableWidthByteStreamSplitDecoder::new(descr.type_length()),
158 )),
159 Encoding::DELTA_BYTE_ARRAY => Ok(Box::new(DeltaByteArrayDecoder::new())),
160 _ => get_decoder_default(descr, encoding),
161 }
162 }
163 }
164
165 impl GetDecoder for Int96 {}
166}
167
168pub trait Decoder<T: DataType>: Send {
173 fn set_data(&mut self, data: Bytes, num_values: usize) -> Result<()>;
176
177 fn get(&mut self, buffer: &mut [T::T]) -> Result<usize>;
184
185 fn get_spaced(
198 &mut self,
199 buffer: &mut [T::T],
200 null_count: usize,
201 valid_bits: &[u8],
202 ) -> Result<usize> {
203 assert!(buffer.len() >= null_count);
204
205 if null_count == 0 {
207 return self.get(buffer);
208 }
209
210 let num_values = buffer.len();
211 let values_to_read = num_values - null_count;
212 let values_read = self.get(buffer)?;
213 if values_read != values_to_read {
214 return Err(general_err!(
215 "Number of values read: {}, doesn't match expected: {}",
216 values_read,
217 values_to_read
218 ));
219 }
220 let mut values_to_move = values_read;
221 for i in (0..num_values).rev() {
222 if bit_util::get_bit(valid_bits, i) {
223 values_to_move -= 1;
224 buffer.swap(i, values_to_move);
225 }
226 }
227
228 Ok(num_values)
229 }
230
231 fn values_left(&self) -> usize;
233
234 fn encoding(&self) -> Encoding;
236
237 fn skip(&mut self, num_values: usize) -> Result<usize>;
239}
240
241pub fn get_decoder<T: DataType>(
246 descr: ColumnDescPtr,
247 encoding: Encoding,
248) -> Result<Box<dyn Decoder<T>>> {
249 use self::private::GetDecoder;
250 T::T::get_decoder(descr, encoding)
251}
252
/// Mutable state shared by the PLAIN decode routines of every value type.
///
/// [`PlainDecoder`] owns one of these and hands it to the value type's `set_data`,
/// `decode` and `skip` functions.
#[derive(Default)]
pub struct PlainDecoderDetails {
    /// Number of values still available; this is what `PlainDecoder::values_left` reports.
    pub(crate) num_values: usize,

    /// NOTE(review): presumably the byte offset in `data` of the next value to decode —
    /// confirm against the value types' `decode` implementations.
    pub(crate) start: usize,

    /// Byte width taken from the column descriptor's `type_length()`; presumably only
    /// meaningful for fixed-length byte arrays — verify.
    pub(crate) type_length: i32,

    /// Encoded page bytes; presumably installed by the value type's `set_data` — TODO confirm.
    pub(crate) data: Option<Bytes>,

    /// Optional bit-level reader; assumed to be used for bit-packed BOOLEAN values — confirm.
    pub(crate) bit_reader: Option<BitReader>,
}
273
/// Decoder for the PLAIN encoding.
///
/// All decoding work is delegated to the value type `T::T`, which operates on the shared
/// [`PlainDecoderDetails`] state.
pub struct PlainDecoder<T: DataType> {
    /// State handed to `T::T`'s plain set-up, decode and skip routines.
    inner: PlainDecoderDetails,

    /// Ties the decoder to `T` without storing a value of that type.
    _phantom: PhantomData<T>,
}
287
288impl<T: DataType> PlainDecoder<T> {
289 pub fn new(type_length: i32) -> Self {
291 PlainDecoder {
292 inner: PlainDecoderDetails {
293 type_length,
294 num_values: 0,
295 start: 0,
296 data: None,
297 bit_reader: None,
298 },
299 _phantom: PhantomData,
300 }
301 }
302}
303
304impl<T: DataType> Decoder<T> for PlainDecoder<T> {
305 #[inline]
306 fn set_data(&mut self, data: Bytes, num_values: usize) -> Result<()> {
307 T::T::set_data(&mut self.inner, data, num_values);
308 Ok(())
309 }
310
311 #[inline]
312 fn values_left(&self) -> usize {
313 self.inner.num_values
314 }
315
316 #[inline]
317 fn encoding(&self) -> Encoding {
318 Encoding::PLAIN
319 }
320
321 #[inline]
322 fn get(&mut self, buffer: &mut [T::T]) -> Result<usize> {
323 T::T::decode(buffer, &mut self.inner)
324 }
325
326 #[inline]
327 fn skip(&mut self, num_values: usize) -> Result<usize> {
328 T::T::skip(&mut self.inner, num_values)
329 }
330}
331
/// Decoder for dictionary-encoded values (it reports `RLE_DICTIONARY` as its encoding).
///
/// The dictionary itself is supplied separately through `set_dict`; page data only holds
/// RLE/bit-packed indices into it.
pub struct DictDecoder<T: DataType> {
    /// Dictionary values, looked up by the decoded indices.
    dictionary: Vec<T::T>,

    /// Set once `set_dict` has populated `dictionary`; `get`/`skip` assert on it.
    has_dictionary: bool,

    /// Decoder for the index stream, created by `set_data`.
    rle_decoder: Option<RleDecoder>,

    /// Value count from `set_data`; reported by `values_left` and used to cap reads.
    num_values: usize,
}
353
354impl<T: DataType> Default for DictDecoder<T> {
355 fn default() -> Self {
356 Self::new()
357 }
358}
359
360impl<T: DataType> DictDecoder<T> {
361 pub fn new() -> Self {
363 Self {
364 dictionary: vec![],
365 has_dictionary: false,
366 rle_decoder: None,
367 num_values: 0,
368 }
369 }
370
371 pub fn set_dict(&mut self, mut decoder: Box<dyn Decoder<T>>) -> Result<()> {
373 let num_values = decoder.values_left();
374 self.dictionary.resize(num_values, T::T::default());
375 let _ = decoder.get(&mut self.dictionary)?;
376 self.has_dictionary = true;
377 Ok(())
378 }
379}
380
381impl<T: DataType> Decoder<T> for DictDecoder<T> {
382 fn set_data(&mut self, data: Bytes, num_values: usize) -> Result<()> {
383 if data.is_empty() {
385 return Err(eof_err!("Not enough bytes to decode bit_width"));
386 }
387
388 let bit_width = data.as_ref()[0];
389 if bit_width > 32 {
390 return Err(general_err!(
391 "Invalid or corrupted RLE bit width {}. Max allowed is 32",
392 bit_width
393 ));
394 }
395 let mut rle_decoder = RleDecoder::new(bit_width);
396 rle_decoder.set_data(data.slice(1..))?;
397 self.num_values = num_values;
398 self.rle_decoder = Some(rle_decoder);
399 Ok(())
400 }
401
402 fn get(&mut self, buffer: &mut [T::T]) -> Result<usize> {
403 assert!(self.rle_decoder.is_some());
404 assert!(self.has_dictionary, "Must call set_dict() first!");
405
406 let rle = self.rle_decoder.as_mut().unwrap();
407 let num_values = cmp::min(buffer.len(), self.num_values);
408 rle.get_batch_with_dict(&self.dictionary[..], buffer, num_values)
409 }
410
411 fn values_left(&self) -> usize {
413 self.num_values
414 }
415
416 fn encoding(&self) -> Encoding {
417 Encoding::RLE_DICTIONARY
418 }
419
420 fn skip(&mut self, num_values: usize) -> Result<usize> {
421 assert!(self.rle_decoder.is_some());
422 assert!(self.has_dictionary, "Must call set_dict() first!");
423
424 let rle = self.rle_decoder.as_mut().unwrap();
425 let num_values = cmp::min(num_values, self.num_values);
426 rle.skip(num_values)
427 }
428}
429
/// Decoder for BOOLEAN values stored with the `RLE` encoding.
pub struct RleValueDecoder<T: DataType> {
    /// Values not yet returned or skipped; initialised from `set_data`'s `num_values`.
    values_left: usize,
    /// Run-length / bit-packed decoder with a bit width of 1.
    decoder: RleDecoder,
    /// Ties the decoder to `T` without storing a value of that type.
    _phantom: PhantomData<T>,
}
441
442impl<T: DataType> Default for RleValueDecoder<T> {
443 fn default() -> Self {
444 Self::new()
445 }
446}
447
448impl<T: DataType> RleValueDecoder<T> {
449 pub fn new() -> Self {
450 Self {
451 values_left: 0,
452 decoder: RleDecoder::new(1),
453 _phantom: PhantomData,
454 }
455 }
456}
457
458impl<T: DataType> Decoder<T> for RleValueDecoder<T> {
459 #[inline]
460 fn set_data(&mut self, data: Bytes, num_values: usize) -> Result<()> {
461 ensure_phys_ty!(Type::BOOLEAN, "RleValueDecoder only supports BoolType");
463
464 const I32_SIZE: usize = mem::size_of::<i32>();
466 if data.len() < I32_SIZE {
467 return Err(eof_err!("Not enough bytes to decode"));
468 }
469 let data_size = bit_util::read_num_bytes::<i32>(I32_SIZE, data.as_ref()) as usize;
470 if data.len() - I32_SIZE < data_size {
471 return Err(eof_err!("Not enough bytes to decode"));
472 }
473
474 self.decoder = RleDecoder::new(1);
475 self.decoder
476 .set_data(data.slice(I32_SIZE..I32_SIZE + data_size))?;
477 self.values_left = num_values;
478 Ok(())
479 }
480
481 #[inline]
482 fn values_left(&self) -> usize {
483 self.values_left
484 }
485
486 #[inline]
487 fn encoding(&self) -> Encoding {
488 Encoding::RLE
489 }
490
491 #[inline]
492 fn get(&mut self, buffer: &mut [T::T]) -> Result<usize> {
493 let num_values = cmp::min(buffer.len(), self.values_left);
494 let values_read = self.decoder.get_batch(&mut buffer[..num_values])?;
495 self.values_left -= values_read;
496 Ok(values_read)
497 }
498
499 #[inline]
500 fn skip(&mut self, num_values: usize) -> Result<usize> {
501 let num_values = cmp::min(num_values, self.values_left);
502 let values_skipped = self.decoder.skip(num_values)?;
503 self.values_left -= values_skipped;
504 Ok(values_skipped)
505 }
506}
507
/// Decoder for the DELTA_BINARY_PACKED encoding (used here for INT32 and INT64).
///
/// A page is a header followed by blocks. Each block holds a `min_delta`, one bit-width
/// byte per mini block, and then the bit-packed deltas of each mini block.
pub struct DeltaBitPackDecoder<T: DataType> {
    /// Reader over the page bytes passed to `set_data`.
    bit_reader: BitReader,
    /// Whether `set_data` has been called; `get` and `get_offset` assert on this.
    initialized: bool,

    // Header values
    /// Number of values per block, from the page header.
    block_size: usize,
    /// Values not yet returned by `get`/`skip` (initialised from the header's total count).
    values_left: usize,
    /// Number of mini blocks per block, from the page header.
    mini_blocks_per_block: usize,
    /// `block_size / mini_blocks_per_block`.
    values_per_mini_block: usize,

    // Per-block state
    /// Minimum delta of the current block, added to every unpacked delta.
    min_delta: T::T,
    /// Byte offset just past the current block's packed data, computed by `next_block`.
    block_end_offset: usize,
    /// Index of the mini block currently being read.
    mini_block_idx: usize,
    /// Bit width of each mini block in the current block.
    mini_block_bit_widths: Vec<u8>,
    /// Values still unread in the current mini block.
    mini_block_remaining: usize,

    /// First value from the header, held until it has been returned or skipped.
    first_value: Option<T::T>,
    /// Most recently decoded value; the next delta is applied to it.
    last_value: T::T,
}
546
547impl<T: DataType> Default for DeltaBitPackDecoder<T>
548where
549 T::T: Default + FromPrimitive + WrappingAdd + Copy,
550{
551 fn default() -> Self {
552 Self::new()
553 }
554}
555
impl<T: DataType> DeltaBitPackDecoder<T>
where
    T::T: Default + FromPrimitive + WrappingAdd + Copy,
{
    /// Creates a decoder with no data installed; `set_data` must be called before decoding.
    pub fn new() -> Self {
        Self {
            bit_reader: BitReader::from(vec![]),
            initialized: false,
            block_size: 0,
            values_left: 0,
            mini_blocks_per_block: 0,
            values_per_mini_block: 0,
            min_delta: Default::default(),
            mini_block_idx: 0,
            mini_block_bit_widths: vec![],
            mini_block_remaining: 0,
            block_end_offset: 0,
            first_value: None,
            last_value: Default::default(),
        }
    }

    /// Returns the byte offset, within the data given to `set_data`, of the first byte this
    /// decoder has not consumed.
    ///
    /// Once every value has been read, this is at least the end of the last block, even if
    /// trailing mini blocks were never read. Callers such as `DeltaLengthByteArrayDecoder`
    /// use it to find the bytes that follow the encoded values.
    ///
    /// # Panics
    ///
    /// Panics if `set_data` has not been called.
    pub fn get_offset(&self) -> usize {
        assert!(self.initialized, "Bit reader is not initialized");
        match self.values_left {
            // The reader may stop short of the block end when trailing mini blocks were
            // never read, so report whichever offset is further along.
            0 => self.bit_reader.get_byte_offset().max(self.block_end_offset),
            _ => self.bit_reader.get_byte_offset(),
        }
    }

    /// Reads the next block header (`min_delta` plus one bit width per mini block) and
    /// records where the block's packed data ends.
    #[inline]
    fn next_block(&mut self) -> Result<()> {
        let min_delta = self
            .bit_reader
            .get_zigzag_vlq_int()
            .ok_or_else(|| eof_err!("Not enough data to decode 'min_delta'"))?;

        self.min_delta =
            T::T::from_i64(min_delta).ok_or_else(|| general_err!("'min_delta' too large"))?;

        self.mini_block_bit_widths.clear();
        self.bit_reader
            .get_aligned_bytes(&mut self.mini_block_bit_widths, self.mini_blocks_per_block);

        let mut offset = self.bit_reader.get_byte_offset();
        let mut remaining = self.values_left;

        // Sum the packed size of each mini block to find the end of this block.
        for b in &mut self.mini_block_bit_widths {
            if remaining == 0 {
                // Mini blocks after the last value hold no values; their width is forced
                // to 0 so they add no bytes (presumably because writers may leave
                // arbitrary widths there — confirm against the spec).
                *b = 0;
            }
            remaining = remaining.saturating_sub(self.values_per_mini_block);
            offset += *b as usize * self.values_per_mini_block / 8;
        }
        self.block_end_offset = offset;

        if self.mini_block_bit_widths.len() != self.mini_blocks_per_block {
            return Err(eof_err!("insufficient mini block bit widths"));
        }

        self.mini_block_remaining = self.values_per_mini_block;
        self.mini_block_idx = 0;

        Ok(())
    }

    /// Moves to the next mini block of the current block, or loads the next block once the
    /// last mini block is exhausted.
    #[inline]
    fn next_mini_block(&mut self) -> Result<()> {
        if self.mini_block_idx + 1 < self.mini_block_bit_widths.len() {
            self.mini_block_idx += 1;
            self.mini_block_remaining = self.values_per_mini_block;
            Ok(())
        } else {
            self.next_block()
        }
    }

    /// Rejects a mini block bit width wider than the value type `T::T`.
    #[inline]
    fn check_bit_width(&self, bit_width: usize) -> Result<()> {
        if bit_width > std::mem::size_of::<T::T>() * 8 {
            return Err(general_err!(
                "Invalid delta bit width {} which is larger than expected {} ",
                bit_width,
                std::mem::size_of::<T::T>() * 8
            ));
        }
        Ok(())
    }
}
658
659impl<T: DataType> Decoder<T> for DeltaBitPackDecoder<T>
660where
661 T::T: Default + FromPrimitive + WrappingAdd + Copy,
662{
663 #[inline]
665 fn set_data(&mut self, data: Bytes, _index: usize) -> Result<()> {
666 self.bit_reader = BitReader::new(data);
667 self.initialized = true;
668
669 self.block_size = self
671 .bit_reader
672 .get_vlq_int()
673 .ok_or_else(|| eof_err!("Not enough data to decode 'block_size'"))?
674 .try_into()
675 .map_err(|_| general_err!("invalid 'block_size'"))?;
676
677 self.mini_blocks_per_block = self
678 .bit_reader
679 .get_vlq_int()
680 .ok_or_else(|| eof_err!("Not enough data to decode 'mini_blocks_per_block'"))?
681 .try_into()
682 .map_err(|_| general_err!("invalid 'mini_blocks_per_block'"))?;
683
684 if self.mini_blocks_per_block == 0 {
685 return Err(general_err!("cannot have zero miniblocks per block"));
686 }
687
688 self.values_left = self
689 .bit_reader
690 .get_vlq_int()
691 .ok_or_else(|| eof_err!("Not enough data to decode 'values_left'"))?
692 .try_into()
693 .map_err(|_| general_err!("invalid 'values_left'"))?;
694
695 let first_value = self
696 .bit_reader
697 .get_zigzag_vlq_int()
698 .ok_or_else(|| eof_err!("Not enough data to decode 'first_value'"))?;
699
700 self.first_value =
701 Some(T::T::from_i64(first_value).ok_or_else(|| general_err!("first value too large"))?);
702
703 if self.block_size % 128 != 0 {
704 return Err(general_err!(
705 "'block_size' must be a multiple of 128, got {}",
706 self.block_size
707 ));
708 }
709
710 if self.block_size % self.mini_blocks_per_block != 0 {
711 return Err(general_err!(
712 "'block_size' must be a multiple of 'mini_blocks_per_block' got {} and {}",
713 self.block_size,
714 self.mini_blocks_per_block
715 ));
716 }
717
718 self.mini_block_idx = 0;
720 self.values_per_mini_block = self.block_size / self.mini_blocks_per_block;
721 self.mini_block_remaining = 0;
722 self.mini_block_bit_widths.clear();
723
724 if self.values_per_mini_block % 32 != 0 {
725 return Err(general_err!(
726 "'values_per_mini_block' must be a multiple of 32 got {}",
727 self.values_per_mini_block
728 ));
729 }
730
731 Ok(())
732 }
733
734 fn get(&mut self, buffer: &mut [T::T]) -> Result<usize> {
735 assert!(self.initialized, "Bit reader is not initialized");
736 if buffer.is_empty() {
737 return Ok(0);
738 }
739
740 let mut read = 0;
741 let to_read = buffer.len().min(self.values_left);
742
743 if let Some(value) = self.first_value.take() {
744 self.last_value = value;
745 buffer[0] = value;
746 read += 1;
747 self.values_left -= 1;
748 }
749
750 while read != to_read {
751 if self.mini_block_remaining == 0 {
752 self.next_mini_block()?;
753 }
754
755 let bit_width = self.mini_block_bit_widths[self.mini_block_idx] as usize;
756 self.check_bit_width(bit_width)?;
757 let batch_to_read = self.mini_block_remaining.min(to_read - read);
758
759 let batch_read = self
760 .bit_reader
761 .get_batch(&mut buffer[read..read + batch_to_read], bit_width);
762
763 if batch_read != batch_to_read {
764 return Err(general_err!(
765 "Expected to read {} values from miniblock got {}",
766 batch_to_read,
767 batch_read
768 ));
769 }
770
771 let min_delta = self.min_delta.as_i64()?;
783 if bit_width == 0 {
784 if min_delta == 0 {
785 buffer[read..read + batch_read].fill(self.last_value);
786 } else {
787 let mut delta = self.min_delta;
791 for v in &mut buffer[read..read + batch_read] {
792 *v = self.last_value.wrapping_add(&delta);
793 delta = delta.wrapping_add(&self.min_delta);
794 }
795
796 self.last_value = buffer[read + batch_read - 1];
797 }
798 } else {
799 if min_delta == 0 {
803 for v in &mut buffer[read..read + batch_read] {
804 *v = v.wrapping_add(&self.last_value);
805 self.last_value = *v;
806 }
807 } else {
808 for v in &mut buffer[read..read + batch_read] {
809 *v = v
810 .wrapping_add(&self.min_delta)
811 .wrapping_add(&self.last_value);
812 self.last_value = *v;
813 }
814 }
815 }
816
817 read += batch_read;
818 self.mini_block_remaining -= batch_read;
819 self.values_left -= batch_read;
820 }
821
822 Ok(to_read)
823 }
824
825 fn values_left(&self) -> usize {
826 self.values_left
827 }
828
829 fn encoding(&self) -> Encoding {
830 Encoding::DELTA_BINARY_PACKED
831 }
832
833 fn skip(&mut self, num_values: usize) -> Result<usize> {
834 let mut skip = 0;
835 let to_skip = num_values.min(self.values_left);
836 if to_skip == 0 {
837 return Ok(0);
838 }
839
840 if let Some(value) = self.first_value.take() {
842 self.last_value = value;
843 skip += 1;
844 self.values_left -= 1;
845 }
846
847 let mini_block_batch_size = match T::T::PHYSICAL_TYPE {
848 Type::INT32 => 32,
849 Type::INT64 => 64,
850 _ => unreachable!(),
851 };
852
853 let mut skip_buffer = vec![T::T::default(); mini_block_batch_size];
854 while skip < to_skip {
855 if self.mini_block_remaining == 0 {
856 self.next_mini_block()?;
857 }
858
859 let bit_width = self.mini_block_bit_widths[self.mini_block_idx] as usize;
860 self.check_bit_width(bit_width)?;
861 let mini_block_to_skip = self.mini_block_remaining.min(to_skip - skip);
862 let mini_block_should_skip = mini_block_to_skip;
863
864 let skip_count = self
865 .bit_reader
866 .get_batch(&mut skip_buffer[0..mini_block_to_skip], bit_width);
867
868 if skip_count != mini_block_to_skip {
869 return Err(general_err!(
870 "Expected to skip {} values from mini block got {}.",
871 mini_block_batch_size,
872 skip_count
873 ));
874 }
875
876 let min_delta = self.min_delta.as_i64()?;
878 if bit_width == 0 {
879 if min_delta != 0 {
881 let mut delta = self.min_delta;
882 for v in &mut skip_buffer[0..skip_count] {
883 *v = self.last_value.wrapping_add(&delta);
884 delta = delta.wrapping_add(&self.min_delta);
885 }
886
887 self.last_value = skip_buffer[skip_count - 1];
888 }
889 } else if min_delta == 0 {
890 for v in &mut skip_buffer[0..skip_count] {
891 *v = v.wrapping_add(&self.last_value);
892
893 self.last_value = *v;
894 }
895 } else {
896 for v in &mut skip_buffer[0..skip_count] {
897 *v = v
898 .wrapping_add(&self.min_delta)
899 .wrapping_add(&self.last_value);
900
901 self.last_value = *v;
902 }
903 }
904
905 skip += mini_block_should_skip;
906 self.mini_block_remaining -= mini_block_should_skip;
907 self.values_left -= mini_block_should_skip;
908 }
909
910 Ok(to_skip)
911 }
912}
913
/// Decoder for the DELTA_LENGTH_BYTE_ARRAY encoding (BYTE_ARRAY columns only).
///
/// The lengths of all values come first, DELTA_BINARY_PACKED encoded, followed by the
/// value bytes concatenated back to back.
pub struct DeltaLengthByteArrayDecoder<T: DataType> {
    /// Length of every value in the current page, decoded up front by `set_data`.
    lengths: Vec<i32>,

    /// Index into `lengths` of the next value to return.
    current_idx: usize,

    /// The concatenated value bytes that follow the encoded lengths.
    data: Option<Bytes>,

    /// Byte offset into `data` of the next value.
    offset: usize,

    /// Number of values not yet returned or skipped.
    num_values: usize,

    /// Ties the decoder to `T` without storing a value of that type.
    _phantom: PhantomData<T>,
}
943
944impl<T: DataType> Default for DeltaLengthByteArrayDecoder<T> {
945 fn default() -> Self {
946 Self::new()
947 }
948}
949
950impl<T: DataType> DeltaLengthByteArrayDecoder<T> {
951 pub fn new() -> Self {
953 Self {
954 lengths: vec![],
955 current_idx: 0,
956 data: None,
957 offset: 0,
958 num_values: 0,
959 _phantom: PhantomData,
960 }
961 }
962}
963
964impl<T: DataType> Decoder<T> for DeltaLengthByteArrayDecoder<T> {
965 fn set_data(&mut self, data: Bytes, num_values: usize) -> Result<()> {
966 match T::get_physical_type() {
967 Type::BYTE_ARRAY => {
968 let mut len_decoder = DeltaBitPackDecoder::<Int32Type>::new();
969 len_decoder.set_data(data.clone(), num_values)?;
970 let num_lengths = len_decoder.values_left();
971 self.lengths.resize(num_lengths, 0);
972 len_decoder.get(&mut self.lengths[..])?;
973
974 self.data = Some(data.slice(len_decoder.get_offset()..));
975 self.offset = 0;
976 self.current_idx = 0;
977 self.num_values = num_lengths;
978 Ok(())
979 }
980 _ => Err(general_err!(
981 "DeltaLengthByteArrayDecoder only support ByteArrayType"
982 )),
983 }
984 }
985
986 fn get(&mut self, buffer: &mut [T::T]) -> Result<usize> {
987 match T::get_physical_type() {
988 Type::BYTE_ARRAY => {
989 assert!(self.data.is_some());
990
991 let data = self.data.as_ref().unwrap();
992 let num_values = cmp::min(buffer.len(), self.num_values);
993
994 for item in buffer.iter_mut().take(num_values) {
995 let len = self.lengths[self.current_idx] as usize;
996 item.set_from_bytes(data.slice(self.offset..self.offset + len));
997
998 self.offset += len;
999 self.current_idx += 1;
1000 }
1001
1002 self.num_values -= num_values;
1003 Ok(num_values)
1004 }
1005 _ => Err(general_err!(
1006 "DeltaLengthByteArrayDecoder only support ByteArrayType"
1007 )),
1008 }
1009 }
1010
1011 fn values_left(&self) -> usize {
1012 self.num_values
1013 }
1014
1015 fn encoding(&self) -> Encoding {
1016 Encoding::DELTA_LENGTH_BYTE_ARRAY
1017 }
1018
1019 fn skip(&mut self, num_values: usize) -> Result<usize> {
1020 match T::get_physical_type() {
1021 Type::BYTE_ARRAY => {
1022 let num_values = cmp::min(num_values, self.num_values);
1023
1024 let next_offset: i32 = self.lengths
1025 [self.current_idx..self.current_idx + num_values]
1026 .iter()
1027 .sum();
1028
1029 self.current_idx += num_values;
1030 self.offset += next_offset as usize;
1031
1032 self.num_values -= num_values;
1033 Ok(num_values)
1034 }
1035 other_type => Err(general_err!(
1036 "DeltaLengthByteArrayDecoder not support {}, only support byte array",
1037 other_type
1038 )),
1039 }
1040 }
1041}
1042
/// Decoder for the DELTA_BYTE_ARRAY encoding (BYTE_ARRAY and FIXED_LEN_BYTE_ARRAY).
///
/// Each value is stored as the length of the prefix it shares with the previous value plus
/// a suffix. The prefix lengths are DELTA_BINARY_PACKED and the suffixes are
/// DELTA_LENGTH_BYTE_ARRAY encoded.
pub struct DeltaByteArrayDecoder<T: DataType> {
    /// Shared-prefix length of every value in the page, decoded up front by `set_data`.
    prefix_lengths: Vec<i32>,

    /// Index into `prefix_lengths` of the next value to return.
    current_idx: usize,

    /// Decoder for the suffix of each value; `None` until `set_data` is called.
    suffix_decoder: Option<DeltaLengthByteArrayDecoder<ByteArrayType>>,

    /// The most recently decoded value, whose prefix the next value reuses.
    previous_value: Bytes,

    /// Number of values not yet returned or skipped.
    num_values: usize,

    /// Ties the decoder to `T` without storing a value of that type.
    _phantom: PhantomData<T>,
}
1074
1075impl<T: DataType> Default for DeltaByteArrayDecoder<T> {
1076 fn default() -> Self {
1077 Self::new()
1078 }
1079}
1080
1081impl<T: DataType> DeltaByteArrayDecoder<T> {
1082 pub fn new() -> Self {
1084 Self {
1085 prefix_lengths: vec![],
1086 current_idx: 0,
1087 suffix_decoder: None,
1088 previous_value: Bytes::new(),
1089 num_values: 0,
1090 _phantom: PhantomData,
1091 }
1092 }
1093}
1094
1095impl<T: DataType> Decoder<T> for DeltaByteArrayDecoder<T> {
1096 fn set_data(&mut self, data: Bytes, num_values: usize) -> Result<()> {
1097 match T::get_physical_type() {
1098 Type::BYTE_ARRAY | Type::FIXED_LEN_BYTE_ARRAY => {
1099 let mut prefix_len_decoder = DeltaBitPackDecoder::<Int32Type>::new();
1100 prefix_len_decoder.set_data(data.clone(), num_values)?;
1101 let num_prefixes = prefix_len_decoder.values_left();
1102 self.prefix_lengths.resize(num_prefixes, 0);
1103 prefix_len_decoder.get(&mut self.prefix_lengths[..])?;
1104
1105 let mut suffix_decoder = DeltaLengthByteArrayDecoder::new();
1106 suffix_decoder
1107 .set_data(data.slice(prefix_len_decoder.get_offset()..), num_values)?;
1108 self.suffix_decoder = Some(suffix_decoder);
1109 self.num_values = num_prefixes;
1110 self.current_idx = 0;
1111 self.previous_value = Bytes::new();
1112 Ok(())
1113 }
1114 _ => Err(general_err!(
1115 "DeltaByteArrayDecoder only supports ByteArrayType and FixedLenByteArrayType"
1116 )),
1117 }
1118 }
1119
1120 fn get(&mut self, buffer: &mut [T::T]) -> Result<usize> {
1121 match T::get_physical_type() {
1122 Type::BYTE_ARRAY | Type::FIXED_LEN_BYTE_ARRAY => {
1123 let num_values = cmp::min(buffer.len(), self.num_values);
1124 let mut v: [ByteArray; 1] = [ByteArray::new(); 1];
1125 for item in buffer.iter_mut().take(num_values) {
1126 let suffix_decoder = self
1129 .suffix_decoder
1130 .as_mut()
1131 .expect("decoder not initialized");
1132 suffix_decoder.get(&mut v[..])?;
1133 let suffix = v[0].data();
1134
1135 let prefix_len = self.prefix_lengths[self.current_idx] as usize;
1137
1138 let mut result = Vec::with_capacity(prefix_len + suffix.len());
1140 result.extend_from_slice(&self.previous_value[0..prefix_len]);
1141 result.extend_from_slice(suffix);
1142
1143 let data = Bytes::from(result);
1144 item.set_from_bytes(data.clone());
1145
1146 self.previous_value = data;
1147 self.current_idx += 1;
1148 }
1149
1150 self.num_values -= num_values;
1151 Ok(num_values)
1152 }
1153 _ => Err(general_err!(
1154 "DeltaByteArrayDecoder only supports ByteArrayType and FixedLenByteArrayType"
1155 )),
1156 }
1157 }
1158
1159 fn values_left(&self) -> usize {
1160 self.num_values
1161 }
1162
1163 fn encoding(&self) -> Encoding {
1164 Encoding::DELTA_BYTE_ARRAY
1165 }
1166
1167 fn skip(&mut self, num_values: usize) -> Result<usize> {
1168 let mut buffer = vec![T::T::default(); num_values];
1169 self.get(&mut buffer)
1170 }
1171}
1172
1173#[cfg(test)]
1174mod tests {
1175 use super::{super::encoding::*, *};
1176
1177 use std::f32::consts::PI as PI_f32;
1178 use std::f64::consts::PI as PI_f64;
1179 use std::sync::Arc;
1180
1181 use crate::schema::types::{ColumnDescPtr, ColumnDescriptor, ColumnPath, Type as SchemaType};
1182 use crate::util::test_common::rand_gen::RandGen;
1183
1184 #[test]
1185 fn test_get_decoders() {
1186 create_and_check_decoder::<Int32Type>(Encoding::PLAIN, None);
1188 create_and_check_decoder::<Int32Type>(Encoding::DELTA_BINARY_PACKED, None);
1189 create_and_check_decoder::<ByteArrayType>(Encoding::DELTA_LENGTH_BYTE_ARRAY, None);
1190 create_and_check_decoder::<ByteArrayType>(Encoding::DELTA_BYTE_ARRAY, None);
1191 create_and_check_decoder::<BoolType>(Encoding::RLE, None);
1192
1193 create_and_check_decoder::<Int32Type>(
1195 Encoding::RLE_DICTIONARY,
1196 Some(general_err!(
1197 "Cannot initialize this encoding through this function"
1198 )),
1199 );
1200 create_and_check_decoder::<Int32Type>(
1201 Encoding::PLAIN_DICTIONARY,
1202 Some(general_err!(
1203 "Cannot initialize this encoding through this function"
1204 )),
1205 );
1206 create_and_check_decoder::<Int32Type>(
1207 Encoding::DELTA_LENGTH_BYTE_ARRAY,
1208 Some(general_err!(
1209 "Encoding DELTA_LENGTH_BYTE_ARRAY is not supported for type"
1210 )),
1211 );
1212 create_and_check_decoder::<Int32Type>(
1213 Encoding::DELTA_BYTE_ARRAY,
1214 Some(general_err!(
1215 "Encoding DELTA_BYTE_ARRAY is not supported for type"
1216 )),
1217 );
1218
1219 #[allow(deprecated)]
1221 create_and_check_decoder::<Int32Type>(
1222 Encoding::BIT_PACKED,
1223 Some(nyi_err!("Encoding BIT_PACKED is not supported")),
1224 );
1225 }
1226
1227 #[test]
1228 fn test_plain_decode_int32() {
1229 let data = [42, 18, 52];
1230 let data_bytes = Int32Type::to_byte_array(&data[..]);
1231 let mut buffer = [0; 3];
1232 test_plain_decode::<Int32Type>(Bytes::from(data_bytes), 3, -1, &mut buffer[..], &data[..]);
1233 }
1234
1235 #[test]
1236 fn test_plain_skip_int32() {
1237 let data = [42, 18, 52];
1238 let data_bytes = Int32Type::to_byte_array(&data[..]);
1239 test_plain_skip::<Int32Type>(Bytes::from(data_bytes), 3, 1, -1, &data[1..]);
1240 }
1241
1242 #[test]
1243 fn test_plain_skip_all_int32() {
1244 let data = [42, 18, 52];
1245 let data_bytes = Int32Type::to_byte_array(&data[..]);
1246 test_plain_skip::<Int32Type>(Bytes::from(data_bytes), 3, 5, -1, &[]);
1247 }
1248
1249 #[test]
1250 fn test_plain_decode_int32_spaced() {
1251 let data = [42, 18, 52];
1252 let expected_data = [0, 42, 0, 18, 0, 0, 52, 0];
1253 let data_bytes = Int32Type::to_byte_array(&data[..]);
1254 let mut buffer = [0; 8];
1255 let num_nulls = 5;
1256 let valid_bits = [0b01001010];
1257 test_plain_decode_spaced::<Int32Type>(
1258 Bytes::from(data_bytes),
1259 3,
1260 -1,
1261 &mut buffer[..],
1262 num_nulls,
1263 &valid_bits,
1264 &expected_data[..],
1265 );
1266 }
1267
1268 #[test]
1269 fn test_plain_decode_int64() {
1270 let data = [42, 18, 52];
1271 let data_bytes = Int64Type::to_byte_array(&data[..]);
1272 let mut buffer = [0; 3];
1273 test_plain_decode::<Int64Type>(Bytes::from(data_bytes), 3, -1, &mut buffer[..], &data[..]);
1274 }
1275
1276 #[test]
1277 fn test_plain_skip_int64() {
1278 let data = [42, 18, 52];
1279 let data_bytes = Int64Type::to_byte_array(&data[..]);
1280 test_plain_skip::<Int64Type>(Bytes::from(data_bytes), 3, 2, -1, &data[2..]);
1281 }
1282
1283 #[test]
1284 fn test_plain_skip_all_int64() {
1285 let data = [42, 18, 52];
1286 let data_bytes = Int64Type::to_byte_array(&data[..]);
1287 test_plain_skip::<Int64Type>(Bytes::from(data_bytes), 3, 3, -1, &[]);
1288 }
1289
1290 #[test]
1291 fn test_plain_decode_float() {
1292 let data = [PI_f32, 2.414, 12.51];
1293 let data_bytes = FloatType::to_byte_array(&data[..]);
1294 let mut buffer = [0.0; 3];
1295 test_plain_decode::<FloatType>(Bytes::from(data_bytes), 3, -1, &mut buffer[..], &data[..]);
1296 }
1297
1298 #[test]
1299 fn test_plain_skip_float() {
1300 let data = [PI_f32, 2.414, 12.51];
1301 let data_bytes = FloatType::to_byte_array(&data[..]);
1302 test_plain_skip::<FloatType>(Bytes::from(data_bytes), 3, 1, -1, &data[1..]);
1303 }
1304
1305 #[test]
1306 fn test_plain_skip_all_float() {
1307 let data = [PI_f32, 2.414, 12.51];
1308 let data_bytes = FloatType::to_byte_array(&data[..]);
1309 test_plain_skip::<FloatType>(Bytes::from(data_bytes), 3, 4, -1, &[]);
1310 }
1311
1312 #[test]
1313 fn test_plain_skip_double() {
1314 let data = [PI_f64, 2.414f64, 12.51f64];
1315 let data_bytes = DoubleType::to_byte_array(&data[..]);
1316 test_plain_skip::<DoubleType>(Bytes::from(data_bytes), 3, 1, -1, &data[1..]);
1317 }
1318
1319 #[test]
1320 fn test_plain_skip_all_double() {
1321 let data = [PI_f64, 2.414f64, 12.51f64];
1322 let data_bytes = DoubleType::to_byte_array(&data[..]);
1323 test_plain_skip::<DoubleType>(Bytes::from(data_bytes), 3, 5, -1, &[]);
1324 }
1325
1326 #[test]
1327 fn test_plain_decode_double() {
1328 let data = [PI_f64, 2.414f64, 12.51f64];
1329 let data_bytes = DoubleType::to_byte_array(&data[..]);
1330 let mut buffer = [0.0f64; 3];
1331 test_plain_decode::<DoubleType>(Bytes::from(data_bytes), 3, -1, &mut buffer[..], &data[..]);
1332 }
1333
/// Four PLAIN-encoded Int96 values round-trip through the decoder.
#[test]
fn test_plain_decode_int96() {
    let triples: [(u32, u32, u32); 4] = [(11, 22, 33), (44, 55, 66), (10, 20, 30), (40, 50, 60)];
    let mut values = [Int96::new(); 4];
    for (slot, (a, b, c)) in values.iter_mut().zip(triples) {
        slot.set_data(a, b, c);
    }
    let encoded = Bytes::from(Int96Type::to_byte_array(&values[..]));
    let mut decoded = [Int96::new(); 4];
    test_plain_decode::<Int96Type>(encoded, 4, -1, &mut decoded[..], &values[..]);
}

/// Skipping two of four Int96 values leaves the last two decodable.
#[test]
fn test_plain_skip_int96() {
    let triples: [(u32, u32, u32); 4] = [(11, 22, 33), (44, 55, 66), (10, 20, 30), (40, 50, 60)];
    let mut values = [Int96::new(); 4];
    for (slot, (a, b, c)) in values.iter_mut().zip(triples) {
        slot.set_data(a, b, c);
    }
    let encoded = Bytes::from(Int96Type::to_byte_array(&values[..]));
    test_plain_skip::<Int96Type>(encoded, 4, 2, -1, &values[2..]);
}

/// Skipping 8 Int96 values from a 4-value page drains the decoder.
#[test]
fn test_plain_skip_all_int96() {
    let triples: [(u32, u32, u32); 4] = [(11, 22, 33), (44, 55, 66), (10, 20, 30), (40, 50, 60)];
    let mut values = [Int96::new(); 4];
    for (slot, (a, b, c)) in values.iter_mut().zip(triples) {
        slot.set_data(a, b, c);
    }
    let encoded = Bytes::from(Int96Type::to_byte_array(&values[..]));
    test_plain_skip::<Int96Type>(encoded, 4, 8, -1, &[]);
}
1367
/// Ten bit-packed PLAIN booleans round-trip through the decoder.
#[test]
fn test_plain_decode_bool() {
    let bits = [false, true, false, false, true, false, true, true, false, true];
    let encoded = Bytes::from(BoolType::to_byte_array(&bits[..]));
    let mut decoded = [false; 10];
    test_plain_decode::<BoolType>(encoded, bits.len(), -1, &mut decoded[..], &bits[..]);
}

/// Skipping five of ten booleans leaves the last five decodable.
#[test]
fn test_plain_skip_bool() {
    let bits = [false, true, false, false, true, false, true, true, false, true];
    let encoded = Bytes::from(BoolType::to_byte_array(&bits[..]));
    test_plain_skip::<BoolType>(encoded, bits.len(), 5, -1, &bits[5..]);
}

/// Skipping 20 booleans from a 10-value page drains the decoder.
#[test]
fn test_plain_skip_all_bool() {
    let bits = [false, true, false, false, true, false, true, true, false, true];
    let encoded = Bytes::from(BoolType::to_byte_array(&bits[..]));
    test_plain_skip::<BoolType>(encoded, bits.len(), 20, -1, &[]);
}
1395
/// Two length-prefixed PLAIN byte arrays round-trip through the decoder.
#[test]
fn test_plain_decode_byte_array() {
    let values: Vec<ByteArray> = ["hello", "parquet"]
        .into_iter()
        .map(|text| {
            let mut value = ByteArray::new();
            value.set_data(Bytes::from(String::from(text)));
            value
        })
        .collect();
    let encoded = Bytes::from(ByteArrayType::to_byte_array(&values[..]));
    let mut decoded = vec![ByteArray::new(); 2];
    test_plain_decode::<ByteArrayType>(encoded, 2, -1, &mut decoded[..], &values[..]);
}

/// Skipping the first of two byte arrays leaves the second decodable.
#[test]
fn test_plain_skip_byte_array() {
    let values: Vec<ByteArray> = ["hello", "parquet"]
        .into_iter()
        .map(|text| {
            let mut value = ByteArray::new();
            value.set_data(Bytes::from(String::from(text)));
            value
        })
        .collect();
    let encoded = Bytes::from(ByteArrayType::to_byte_array(&values[..]));
    test_plain_skip::<ByteArrayType>(encoded, 2, 1, -1, &values[1..]);
}

/// Skipping exactly as many byte arrays as the page holds drains the decoder.
#[test]
fn test_plain_skip_all_byte_array() {
    let values: Vec<ByteArray> = ["hello", "parquet"]
        .into_iter()
        .map(|text| {
            let mut value = ByteArray::new();
            value.set_data(Bytes::from(String::from(text)));
            value
        })
        .collect();
    let encoded = Bytes::from(ByteArrayType::to_byte_array(&values[..]));
    test_plain_skip::<ByteArrayType>(encoded, 2, 2, -1, &[]);
}
1429
/// Three 4-byte PLAIN fixed-length byte arrays round-trip through the decoder.
#[test]
fn test_plain_decode_fixed_len_byte_array() {
    let values: Vec<FixedLenByteArray> = ["bird", "come", "flow"]
        .into_iter()
        .map(|text| {
            let mut value = FixedLenByteArray::default();
            value.set_data(Bytes::from(String::from(text)));
            value
        })
        .collect();
    let encoded = Bytes::from(FixedLenByteArrayType::to_byte_array(&values[..]));
    let mut decoded = vec![FixedLenByteArray::default(); 3];
    test_plain_decode::<FixedLenByteArrayType>(encoded, 3, 4, &mut decoded[..], &values[..]);
}

/// Skipping the first of three fixed-length arrays leaves the last two decodable.
#[test]
fn test_plain_skip_fixed_len_byte_array() {
    let values: Vec<FixedLenByteArray> = ["bird", "come", "flow"]
        .into_iter()
        .map(|text| {
            let mut value = FixedLenByteArray::default();
            value.set_data(Bytes::from(String::from(text)));
            value
        })
        .collect();
    let encoded = Bytes::from(FixedLenByteArrayType::to_byte_array(&values[..]));
    test_plain_skip::<FixedLenByteArrayType>(encoded, 3, 1, 4, &values[1..]);
}

/// Skipping 6 fixed-length arrays from a 3-value page drains the decoder.
#[test]
fn test_plain_skip_all_fixed_len_byte_array() {
    let values: Vec<FixedLenByteArray> = ["bird", "come", "flow"]
        .into_iter()
        .map(|text| {
            let mut value = FixedLenByteArray::default();
            value.set_data(Bytes::from(String::from(text)));
            value
        })
        .collect();
    let encoded = Bytes::from(FixedLenByteArrayType::to_byte_array(&values[..]));
    test_plain_skip::<FixedLenByteArrayType>(encoded, 3, 6, 4, &[]);
}
1466
/// Handing an empty buffer to the dictionary decoder must fail with an EOF error
/// about the bit width.
#[test]
fn test_dict_decoder_empty_data() {
    let mut decoder = DictDecoder::<Int32Type>::new();
    let message = decoder
        .set_data(Bytes::new(), 10)
        .unwrap_err()
        .to_string();
    assert_eq!(message, "EOF: Not enough bytes to decode bit_width");
}
1473
1474 fn test_plain_decode<T: DataType>(
1475 data: Bytes,
1476 num_values: usize,
1477 type_length: i32,
1478 buffer: &mut [T::T],
1479 expected: &[T::T],
1480 ) {
1481 let mut decoder: PlainDecoder<T> = PlainDecoder::new(type_length);
1482 let result = decoder.set_data(data, num_values);
1483 assert!(result.is_ok());
1484 let result = decoder.get(buffer);
1485 assert!(result.is_ok());
1486 assert_eq!(decoder.values_left(), 0);
1487 assert_eq!(buffer, expected);
1488 }
1489
1490 fn test_plain_skip<T: DataType>(
1491 data: Bytes,
1492 num_values: usize,
1493 skip: usize,
1494 type_length: i32,
1495 expected: &[T::T],
1496 ) {
1497 let mut decoder: PlainDecoder<T> = PlainDecoder::new(type_length);
1498 let result = decoder.set_data(data, num_values);
1499 assert!(result.is_ok());
1500 let skipped = decoder.skip(skip).expect("skipping values");
1501
1502 if skip >= num_values {
1503 assert_eq!(skipped, num_values);
1504
1505 let mut buffer = vec![T::T::default(); 1];
1506 let remaining = decoder.get(&mut buffer).expect("getting remaining values");
1507 assert_eq!(remaining, 0);
1508 } else {
1509 assert_eq!(skipped, skip);
1510 let mut buffer = vec![T::T::default(); num_values - skip];
1511 let remaining = decoder.get(&mut buffer).expect("getting remaining values");
1512 assert_eq!(remaining, num_values - skip);
1513 assert_eq!(decoder.values_left(), 0);
1514 assert_eq!(buffer, expected);
1515 }
1516 }
1517
1518 fn test_plain_decode_spaced<T: DataType>(
1519 data: Bytes,
1520 num_values: usize,
1521 type_length: i32,
1522 buffer: &mut [T::T],
1523 num_nulls: usize,
1524 valid_bits: &[u8],
1525 expected: &[T::T],
1526 ) {
1527 let mut decoder: PlainDecoder<T> = PlainDecoder::new(type_length);
1528 let result = decoder.set_data(data, num_values);
1529 assert!(result.is_ok());
1530 let result = decoder.get_spaced(buffer, num_nulls, valid_bits);
1531 assert!(result.is_ok());
1532 assert_eq!(num_values + num_nulls, result.unwrap());
1533 assert_eq!(decoder.values_left(), 0);
1534 assert_eq!(buffer, expected);
1535 }
1536
/// Using the RLE value encoder with i32 data must panic: it is boolean-only.
#[test]
#[should_panic(expected = "RleValueEncoder only supports BoolType")]
fn test_rle_value_encode_int32_not_supported() {
    let mut enc = RleValueEncoder::<Int32Type>::new();
    enc.put(&[1, 2, 3, 4]).unwrap();
}

/// Using the RLE value decoder with i32 data must panic: it is boolean-only.
#[test]
#[should_panic(expected = "RleValueDecoder only supports BoolType")]
fn test_rle_value_decode_int32_not_supported() {
    let mut dec = RleValueDecoder::<Int32Type>::new();
    let page = Bytes::from(vec![5, 0, 0, 0]);
    dec.set_data(page, 1).unwrap();
}

/// A one-byte buffer is too short to hold the size prefix, so `set_data` must fail.
#[test]
fn test_rle_value_decode_missing_size() {
    let mut dec = RleValueDecoder::<BoolType>::new();
    let outcome = dec.set_data(Bytes::from(vec![0]), 1);
    assert!(outcome.is_err());
}

/// A size prefix (presumably 5, little-endian) with no payload behind it must be rejected.
#[test]
fn test_rle_value_decode_missing_data() {
    let mut dec = RleValueDecoder::<BoolType>::new();
    let outcome = dec.set_data(Bytes::from(vec![5, 0, 0, 0]), 1);
    assert!(outcome.is_err());
}

/// Random boolean batches of 256, 257 and 126 values round-trip through RLE.
#[test]
fn test_rle_value_decode_bool_decode() {
    let batches: Vec<Vec<bool>> = [256, 257, 126]
        .into_iter()
        .map(|len| BoolType::gen_vec(-1, len))
        .collect();
    test_rle_value_decode::<BoolType>(batches);
}
1573
/// Querying the offset before any data has been set must panic.
#[test]
#[should_panic(expected = "Bit reader is not initialized")]
fn test_delta_bit_packed_not_initialized_offset() {
    let fresh = DeltaBitPackDecoder::<Int32Type>::new();
    fresh.get_offset();
}

/// Decoding before any data has been set must panic.
#[test]
#[should_panic(expected = "Bit reader is not initialized")]
fn test_delta_bit_packed_not_initialized_get() {
    let mut fresh = DeltaBitPackDecoder::<Int32Type>::new();
    let mut out: Vec<i32> = Vec::new();
    fresh.get(&mut out).unwrap();
}
1590
/// An empty batch round-trips through DELTA_BINARY_PACKED.
#[test]
fn test_delta_bit_packed_int32_empty() {
    test_delta_bit_packed_decode::<Int32Type>(vec![Vec::new()]);
}

/// The sequence 1..=8 repeated four times (32 values) round-trips.
#[test]
fn test_delta_bit_packed_int32_repeat() {
    let block_data: Vec<i32> = (1..=8).cycle().take(32).collect();
    test_delta_bit_packed_decode::<Int32Type>(vec![block_data]);
}

/// Skipping 10 of, and then more than all of, the repeating 1..=8 data.
#[test]
fn test_skip_delta_bit_packed_int32_repeat() {
    let block_data: Vec<i32> = (1..=8).cycle().take(32).collect();
    test_skip::<Int32Type>(block_data.clone(), Encoding::DELTA_BINARY_PACKED, 10);
    test_skip::<Int32Type>(block_data, Encoding::DELTA_BINARY_PACKED, 100);
}

/// A short batch with mixed-sign deltas round-trips.
#[test]
fn test_delta_bit_packed_int32_uneven() {
    let block_data: Vec<i32> = vec![1, -2, 3, -4, 5, 6, 7, 8, 9, 10, 11];
    test_delta_bit_packed_decode::<Int32Type>(vec![block_data]);
}

/// Skipping 5 of, and then more than all of, the mixed-sign batch.
#[test]
fn test_skip_delta_bit_packed_int32_uneven() {
    let block_data: Vec<i32> = vec![1, -2, 3, -4, 5, 6, 7, 8, 9, 10, 11];
    test_skip::<Int32Type>(block_data.clone(), Encoding::DELTA_BINARY_PACKED, 5);
    test_skip::<Int32Type>(block_data, Encoding::DELTA_BINARY_PACKED, 100);
}
1628
/// Constant positive and constant negative runs each round-trip.
#[test]
fn test_delta_bit_packed_int32_same_values() {
    let runs: [Vec<i32>; 2] = [
        vec![
            127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127,
        ],
        vec![
            -127, -127, -127, -127, -127, -127, -127, -127, -127, -127, -127, -127, -127, -127,
            -127, -127,
        ],
    ];
    for run in runs {
        test_delta_bit_packed_decode::<Int32Type>(vec![run]);
    }
}

/// Skipping part of, and then more than all of, each constant run.
#[test]
fn test_skip_delta_bit_packed_int32_same_values() {
    let runs: [Vec<i32>; 2] = [
        vec![
            127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127,
        ],
        vec![
            -127, -127, -127, -127, -127, -127, -127, -127, -127, -127, -127, -127, -127, -127,
            -127, -127,
        ],
    ];
    for run in runs {
        test_skip::<Int32Type>(run.clone(), Encoding::DELTA_BINARY_PACKED, 5);
        test_skip::<Int32Type>(run, Encoding::DELTA_BINARY_PACKED, 100);
    }
}
1658
/// Alternating i32 extremes, whose deltas overflow i32, round-trip.
#[test]
fn test_delta_bit_packed_int32_min_max() {
    let (lo, hi) = (i32::MIN, i32::MAX);
    let block_data = vec![lo, lo, lo, hi, lo, hi, lo, hi];
    test_delta_bit_packed_decode::<Int32Type>(vec![block_data]);
}

/// Skipping part of, and then more than all of, the alternating i32 extremes.
#[test]
fn test_skip_delta_bit_packed_int32_min_max() {
    let (lo, hi) = (i32::MIN, i32::MAX);
    let block_data = vec![lo, lo, lo, hi, lo, hi, lo, hi];
    test_skip::<Int32Type>(block_data.clone(), Encoding::DELTA_BINARY_PACKED, 5);
    test_skip::<Int32Type>(block_data, Encoding::DELTA_BINARY_PACKED, 100);
}
1689
/// Random i32 batches of 64, 128 and 64 values round-trip.
#[test]
fn test_delta_bit_packed_int32_multiple_blocks() {
    let batches: Vec<Vec<i32>> = [64, 128, 64]
        .into_iter()
        .map(|len| Int32Type::gen_vec(-1, len))
        .collect();
    test_delta_bit_packed_decode::<Int32Type>(batches);
}

/// Random i32 batches of 256 and 257 values round-trip.
#[test]
fn test_delta_bit_packed_int32_data_across_blocks() {
    let batches = vec![Int32Type::gen_vec(-1, 256), Int32Type::gen_vec(-1, 257)];
    test_delta_bit_packed_decode::<Int32Type>(batches);
}

/// An empty batch between two non-empty ones does not disturb decoding.
#[test]
fn test_delta_bit_packed_int32_with_empty_blocks() {
    let batches = vec![
        Int32Type::gen_vec(-1, 128),
        Vec::new(),
        Int32Type::gen_vec(-1, 64),
    ];
    test_delta_bit_packed_decode::<Int32Type>(batches);
}

/// An empty i64 batch round-trips.
#[test]
fn test_delta_bit_packed_int64_empty() {
    test_delta_bit_packed_decode::<Int64Type>(vec![Vec::new()]);
}

/// Alternating i64 extremes (MIN, MAX, ...), whose deltas overflow i64, round-trip.
#[test]
fn test_delta_bit_packed_int64_min_max() {
    let block_data: Vec<i64> = [i64::MIN, i64::MAX].repeat(4);
    test_delta_bit_packed_decode::<Int64Type>(vec![block_data]);
}

/// Random i64 batches of 64, 128 and 64 values round-trip.
#[test]
fn test_delta_bit_packed_int64_multiple_blocks() {
    let batches: Vec<Vec<i64>> = [64, 128, 64]
        .into_iter()
        .map(|len| Int64Type::gen_vec(-1, len))
        .collect();
    test_delta_bit_packed_decode::<Int64Type>(batches);
}
1749
/// A header declaring zero miniblocks per block is rejected by `set_data`.
#[test]
fn test_delta_bit_packed_zero_miniblocks() {
    // Block size 128 (VLQ 0x80 0x01), followed by a miniblock count of 0.
    let header: Vec<u8> = vec![128, 1, 0];
    let mut decoder = DeltaBitPackDecoder::<Int32Type>::new();
    let message = decoder
        .set_data(Bytes::from(header), 0)
        .unwrap_err()
        .to_string();
    assert_eq!(
        message,
        "Parquet error: cannot have zero miniblocks per block"
    );
}
1764
/// Decodes a hand-built DELTA_BINARY_PACKED page that yields [29, 43, 89]. Checks the
/// reader offset after the page header and after decoding.
#[test]
fn test_delta_bit_packed_decoder_sample() {
    // Page header: block size 128 (VLQ 0x80 0x01), 4 miniblocks, 3 values, and the
    // first value 29 (zigzag 58). Block header: min delta 14 (zigzag 28) and miniblock
    // bit widths 6, 0, 0, 0. Then the packed deltas, zero-padded to 34 bytes in total.
    let mut data_bytes: Vec<u8> = vec![128, 1, 4, 3, 58, 28, 6, 0, 0, 0, 0, 8];
    data_bytes.resize(34, 0);

    let mut decoder: DeltaBitPackDecoder<Int32Type> = DeltaBitPackDecoder::new();
    decoder.set_data(Bytes::from(data_bytes), 3).unwrap();
    // Only the 5-byte page header has been consumed so far.
    assert_eq!(decoder.get_offset(), 5);

    let mut result = vec![0; 3];
    decoder.get(&mut result).unwrap();
    assert_eq!(decoder.get_offset(), 34);
    assert_eq!(result, vec![29, 43, 89]);
}
1781
/// The decoder must consume the padding of a partially used final miniblock. It must
/// ignore the bit width of an unused miniblock. It must report a truncated miniblock as
/// an error.
#[test]
fn test_delta_bit_packed_padding() {
    let header: Vec<u8> = vec![
        // Block size 256 (VLQ 0x80 0x02).
        128, 2,
        // 4 miniblocks per block, i.e. 64 values per miniblock.
        4,
        // 419 values in total (VLQ: 35 | 0x80, 3).
        128 + 35, 3,
        // First value: zigzag 7.
        7,
    ];

    // Block 1: min delta 0, miniblock bit widths 0, 1, 0, 0.
    let block1_header: Vec<u8> = vec![0, 0, 1, 0, 0];
    // The width-1 miniblock: 64 bits.
    let block1: Vec<u8> = vec![0xFF; 8];

    // Block 2: min delta 0, bit widths 0, 1, 2 and 0xFF. The last miniblock is unused,
    // so its (invalid) width must not matter.
    let block2_header: Vec<u8> = vec![0, 0, 1, 2, 0xFF];
    // Width-1 miniblock (8 bytes) plus a fully padded width-2 miniblock (16 bytes).
    let block2: Vec<u8> = vec![0xFF; 24];

    let data: Vec<u8> = [header, block1_header, block1, block2_header, block2].concat();
    let length = data.len();
    let encoded = Bytes::from(data);

    // Sanity-check the header fields with a raw bit reader.
    let mut reader = BitReader::new(encoded.clone());
    assert_eq!(reader.get_vlq_int().unwrap(), 256);
    assert_eq!(reader.get_vlq_int().unwrap(), 4);
    assert_eq!(reader.get_vlq_int().unwrap(), 419);
    assert_eq!(reader.get_vlq_int().unwrap(), 7);

    // Output has room for one more value than the page holds.
    let mut output = vec![0_i32; 420];

    let mut decoder = DeltaBitPackDecoder::<Int32Type>::new();
    decoder.set_data(encoded.clone(), 0).unwrap();
    assert_eq!(decoder.get(&mut output).unwrap(), 419);
    assert_eq!(decoder.get_offset(), length);

    // Truncate after 12 bytes: 6 header bytes, 5 block-header bytes, 1 data byte.
    decoder.set_data(encoded.slice(..12), 0).unwrap();
    let err = decoder.get(&mut output).unwrap_err().to_string();
    assert!(
        err.contains("Expected to read 64 values from miniblock got 8"),
        "{}",
        err
    );
}
1858
/// 10240 copies of a single i32 value round-trip.
#[test]
fn test_delta_bit_packed_int32_single_value_large() {
    let constant: Vec<i32> = vec![3; 10240];
    test_delta_bit_packed_decode::<Int32Type>(vec![constant]);
}

/// Skipping 50, then 5000, values from 10240 copies of one i32 value.
#[test]
fn test_delta_bit_packed_int32_single_value_skip_large() {
    let constant: Vec<i32> = vec![3; 10240];
    test_skip::<Int32Type>(constant.clone(), Encoding::DELTA_BINARY_PACKED, 50);
    test_skip::<Int32Type>(constant, Encoding::DELTA_BINARY_PACKED, 5000);
}

/// The strictly increasing i32 sequence 0..10240 round-trips.
#[test]
fn test_delta_bit_packed_int32_increasing_value_large() {
    let ramp: Vec<i32> = (0i32..10240).collect();
    test_delta_bit_packed_decode::<Int32Type>(vec![ramp]);
}

/// Skipping 50, then 5000, values from the increasing i32 sequence.
#[test]
fn test_delta_bit_packed_int32_increasing_value_skip_large() {
    let ramp: Vec<i32> = (0i32..10240).collect();
    test_skip::<Int32Type>(ramp.clone(), Encoding::DELTA_BINARY_PACKED, 50);
    test_skip::<Int32Type>(ramp, Encoding::DELTA_BINARY_PACKED, 5000);
}

/// A staircase (each value repeated twice) round-trips.
#[test]
fn test_delta_bit_packed_int32_stepped_value_large() {
    let stairs: Vec<i32> = (0i32..10240).map(|i| i / 2).collect();
    test_delta_bit_packed_decode::<Int32Type>(vec![stairs]);
}

/// Skipping 50, then 5000, values from the staircase sequence.
#[test]
fn test_delta_bit_packed_int32_stepped_value_skip_large() {
    let stairs: Vec<i32> = (0i32..10240).map(|i| i / 2).collect();
    test_skip::<Int32Type>(stairs.clone(), Encoding::DELTA_BINARY_PACKED, 50);
    test_skip::<Int32Type>(stairs, Encoding::DELTA_BINARY_PACKED, 5000);
}

/// Five 133-value batches with different shapes are encoded together and round-trip.
#[test]
fn test_delta_bit_packed_int32_mixed_large() {
    const BLOCK_SIZE: i32 = 133;
    let batches: Vec<Vec<i32>> = vec![
        (0..BLOCK_SIZE).map(|i| (i * 7) % 11).collect(),
        vec![3; BLOCK_SIZE as usize],
        (0..BLOCK_SIZE).map(|i| (i * 5) % 13).collect(),
        (0..BLOCK_SIZE).collect(),
        (0..BLOCK_SIZE).map(|i| (i * 3) % 17).collect(),
    ];
    test_delta_bit_packed_decode::<Int32Type>(batches);
}

/// 10240 copies of a single i64 value round-trip.
#[test]
fn test_delta_bit_packed_int64_single_value_large() {
    let constant: Vec<i64> = vec![5; 10240];
    test_delta_bit_packed_decode::<Int64Type>(vec![constant]);
}

/// The strictly increasing i64 sequence 0..10240 round-trips.
#[test]
fn test_delta_bit_packed_int64_increasing_value_large() {
    let ramp: Vec<i64> = (0i64..10240).collect();
    test_delta_bit_packed_decode::<Int64Type>(vec![ramp]);
}
1928
/// Five identical byte arrays, in batches of 1, 2 and 2, round-trip through
/// DELTA_BYTE_ARRAY.
#[test]
fn test_delta_byte_array_same_arrays() {
    let same = || ByteArray::from(vec![1, 2, 3, 4, 5, 6]);
    let batches = vec![vec![same()], vec![same(), same()], vec![same(), same()]];
    test_delta_byte_array_decode(batches);
}

/// Byte arrays with no shared content round-trip through DELTA_BYTE_ARRAY.
#[test]
fn test_delta_byte_array_unique_arrays() {
    let ba = |bytes: &[u8]| ByteArray::from(bytes.to_vec());
    let batches = vec![
        vec![ba(&[1])],
        vec![ba(&[2, 3]), ba(&[4, 5, 6])],
        vec![ba(&[7, 8]), ba(&[9, 0, 1, 2])],
    ];
    test_delta_byte_array_decode(batches);
}

/// A single byte array round-trips through DELTA_BYTE_ARRAY.
#[test]
fn test_delta_byte_array_single_array() {
    let only = ByteArray::from(vec![1, 2, 3, 4, 5, 6]);
    test_delta_byte_array_decode(vec![vec![only]]);
}
1963
/// Two f32 batches with distinctive byte patterns round-trip through BYTE_STREAM_SPLIT.
#[test]
fn test_byte_stream_split_multiple_f32() {
    let first = vec![
        f32::from_le_bytes([0xAA, 0xBB, 0xCC, 0xDD]),
        f32::from_le_bytes([0x00, 0x11, 0x22, 0x33]),
    ];
    let second = vec![f32::from_le_bytes([0xA3, 0xB4, 0xC5, 0xD6])];
    test_byte_stream_split_decode::<FloatType>(vec![first, second], -1);
}

/// Two f64 values round-trip through BYTE_STREAM_SPLIT.
#[test]
fn test_byte_stream_split_f64() {
    let batch = vec![
        f64::from_le_bytes([0, 1, 2, 3, 4, 5, 6, 7]),
        f64::from_le_bytes([8, 9, 10, 11, 12, 13, 14, 15]),
    ];
    test_byte_stream_split_decode::<DoubleType>(vec![batch], -1);
}

/// Two i32 batches with distinctive byte patterns round-trip through BYTE_STREAM_SPLIT.
#[test]
fn test_byte_stream_split_multiple_i32() {
    let first = vec![
        i32::from_le_bytes([0xAA, 0xBB, 0xCC, 0xDD]),
        i32::from_le_bytes([0x00, 0x11, 0x22, 0x33]),
    ];
    let second = vec![i32::from_le_bytes([0xA3, 0xB4, 0xC5, 0xD6])];
    test_byte_stream_split_decode::<Int32Type>(vec![first, second], -1);
}

/// Two i64 values round-trip through BYTE_STREAM_SPLIT.
#[test]
fn test_byte_stream_split_i64() {
    let batch = vec![
        i64::from_le_bytes([0, 1, 2, 3, 4, 5, 6, 7]),
        i64::from_le_bytes([8, 9, 10, 11, 12, 13, 14, 15]),
    ];
    test_byte_stream_split_decode::<Int64Type>(vec![batch], -1);
}
2005
2006 fn test_byte_stream_split_flba(type_width: usize) {
2007 let data = vec![
2008 vec![
2009 FixedLenByteArrayType::r#gen(type_width as i32),
2010 FixedLenByteArrayType::r#gen(type_width as i32),
2011 ],
2012 vec![FixedLenByteArrayType::r#gen(type_width as i32)],
2013 ];
2014 test_byte_stream_split_decode::<FixedLenByteArrayType>(data, type_width as i32);
2015 }
2016
/// BYTE_STREAM_SPLIT round-trip with an odd 5-byte FLBA width.
#[test]
fn test_byte_stream_split_flba5() {
    let width: usize = 5;
    test_byte_stream_split_flba(width);
}

/// BYTE_STREAM_SPLIT round-trip with a 16-byte FLBA width.
#[test]
fn test_byte_stream_split_flba16() {
    let width: usize = 16;
    test_byte_stream_split_flba(width);
}

/// BYTE_STREAM_SPLIT round-trip with a 19-byte FLBA width.
#[test]
fn test_byte_stream_split_flba19() {
    let width: usize = 19;
    test_byte_stream_split_flba(width);
}
2031
/// Encoding a 4-byte value into a 5-byte FLBA column must panic.
#[test]
#[should_panic(expected = "Mismatched FixedLenByteArray sizes: 4 != 5")]
fn test_byte_stream_split_flba_mismatch() {
    let well_sized = vec![
        FixedLenByteArray::from(vec![0xAA, 0xAB, 0xAC, 0xAD, 0xAE]),
        FixedLenByteArray::from(vec![0xBA, 0xBB, 0xBC, 0xBD, 0xBE]),
    ];
    let too_short = vec![FixedLenByteArray::from(vec![0xCA, 0xCB, 0xCC, 0xCD])];
    test_byte_stream_split_decode::<FixedLenByteArrayType>(vec![well_sized, too_short], 5);
}

/// Five bytes of input cannot be split into 4-byte-wide values.
#[test]
#[should_panic(expected = "Input data length is not a multiple of type width 4")]
fn test_byte_stream_split_flba_bad_input() {
    let five_bytes = Bytes::from(vec![1, 2, 3, 4, 5]);
    let mut decoder = VariableWidthByteStreamSplitDecoder::<FixedLenByteArrayType>::new(4);
    decoder.set_data(five_bytes, 1).unwrap();
}
2053
/// Skipping within, and past the end of, BYTE_STREAM_SPLIT float and double pages.
#[test]
fn test_skip_byte_stream_split() {
    let floats: Vec<f32> = vec![0.3, 0.4, 0.1, 4.10];
    let doubles: Vec<f64> = floats.iter().map(|&x| x as f64).collect();
    test_skip::<FloatType>(floats, Encoding::BYTE_STREAM_SPLIT, 2);
    test_skip::<DoubleType>(doubles, Encoding::BYTE_STREAM_SPLIT, 100);
}

/// Skipping within, and past the end of, BYTE_STREAM_SPLIT i32 and i64 pages.
#[test]
fn test_skip_byte_stream_split_ints() {
    let ints: Vec<i32> = vec![3, 4, 1, 5];
    let longs: Vec<i64> = ints.iter().map(|&x| i64::from(x)).collect();
    test_skip::<Int32Type>(ints, Encoding::BYTE_STREAM_SPLIT, 2);
    test_skip::<Int64Type>(longs, Encoding::BYTE_STREAM_SPLIT, 100);
}
2075
2076 fn test_rle_value_decode<T: DataType>(data: Vec<Vec<T::T>>) {
2077 test_encode_decode::<T>(data, Encoding::RLE, -1);
2078 }
2079
2080 fn test_delta_bit_packed_decode<T: DataType>(data: Vec<Vec<T::T>>) {
2081 test_encode_decode::<T>(data, Encoding::DELTA_BINARY_PACKED, -1);
2082 }
2083
2084 fn test_byte_stream_split_decode<T: DataType>(data: Vec<Vec<T::T>>, type_width: i32) {
2085 test_encode_decode::<T>(data, Encoding::BYTE_STREAM_SPLIT, type_width);
2086 }
2087
2088 fn test_delta_byte_array_decode(data: Vec<Vec<ByteArray>>) {
2089 test_encode_decode::<ByteArrayType>(data, Encoding::DELTA_BYTE_ARRAY, -1);
2090 }
2091
2092 fn test_encode_decode<T: DataType>(data: Vec<Vec<T::T>>, encoding: Encoding, type_width: i32) {
2097 let col_descr = create_test_col_desc_ptr(type_width, T::get_physical_type());
2098
2099 let mut encoder = get_encoder::<T>(encoding, &col_descr).expect("get encoder");
2101
2102 for v in &data[..] {
2103 encoder.put(&v[..]).expect("ok to encode");
2104 }
2105 let bytes = encoder.flush_buffer().expect("ok to flush buffer");
2106
2107 let expected: Vec<T::T> = data.iter().flat_map(|s| s.clone()).collect();
2109
2110 let mut decoder = get_decoder::<T>(col_descr, encoding).expect("get decoder");
2112
2113 let mut result = vec![T::T::default(); expected.len()];
2114 decoder
2115 .set_data(bytes, expected.len())
2116 .expect("ok to set data");
2117 let mut result_num_values = 0;
2118 while decoder.values_left() > 0 {
2119 result_num_values += decoder
2120 .get(&mut result[result_num_values..])
2121 .expect("ok to decode");
2122 }
2123 assert_eq!(result_num_values, expected.len());
2124 assert_eq!(result, expected);
2125 }
2126
2127 fn test_skip<T: DataType>(data: Vec<T::T>, encoding: Encoding, skip: usize) {
2128 let col_descr = create_test_col_desc_ptr(-1, T::get_physical_type());
2131
2132 let mut encoder = get_encoder::<T>(encoding, &col_descr).expect("get encoder");
2134
2135 encoder.put(&data).expect("ok to encode");
2136
2137 let bytes = encoder.flush_buffer().expect("ok to flush buffer");
2138
2139 let mut decoder = get_decoder::<T>(col_descr, encoding).expect("get decoder");
2140 decoder.set_data(bytes, data.len()).expect("ok to set data");
2141
2142 if skip >= data.len() {
2143 let skipped = decoder.skip(skip).expect("ok to skip");
2144 assert_eq!(skipped, data.len());
2145
2146 let skipped_again = decoder.skip(skip).expect("ok to skip again");
2147 assert_eq!(skipped_again, 0);
2148 } else {
2149 let skipped = decoder.skip(skip).expect("ok to skip");
2150 assert_eq!(skipped, skip);
2151
2152 let remaining = data.len() - skip;
2153
2154 let expected = &data[skip..];
2155 let mut buffer = vec![T::T::default(); remaining];
2156 let fetched = decoder.get(&mut buffer).expect("ok to decode");
2157 assert_eq!(remaining, fetched);
2158 assert_eq!(&buffer, expected);
2159 }
2160 }
2161
2162 fn create_and_check_decoder<T: DataType>(encoding: Encoding, err: Option<ParquetError>) {
2163 let descr = create_test_col_desc_ptr(-1, T::get_physical_type());
2164 let decoder = get_decoder::<T>(descr, encoding);
2165 match err {
2166 Some(parquet_error) => {
2167 assert_eq!(
2168 decoder.err().unwrap().to_string(),
2169 parquet_error.to_string()
2170 );
2171 }
2172 None => {
2173 assert_eq!(decoder.unwrap().encoding(), encoding);
2174 }
2175 }
2176 }
2177
2178 fn create_test_col_desc_ptr(type_len: i32, t: Type) -> ColumnDescPtr {
2180 let ty = SchemaType::primitive_type_builder("t", t)
2181 .with_length(type_len)
2182 .build()
2183 .unwrap();
2184 Arc::new(ColumnDescriptor::new(
2185 Arc::new(ty),
2186 0,
2187 0,
2188 ColumnPath::new(vec![]),
2189 ))
2190 }
2191
/// Encodes `v` as the 4-byte little-endian length prefix used by PLAIN-encoded
/// BYTE_ARRAY values.
///
/// The original used native-endian bytes, which produced a wrong prefix on big-endian
/// hosts.
///
/// # Panics
/// Panics if `v` does not fit in a `u32`. The original `as u32` cast silently truncated.
fn usize_to_bytes(v: usize) -> [u8; 4] {
    u32::try_from(v)
        .expect("byte array length must fit in a u32 length prefix")
        .to_le_bytes()
}
2195
2196 trait ToByteArray<T: DataType> {
2198 #[allow(clippy::wrong_self_convention)]
2199 fn to_byte_array(data: &[T::T]) -> Vec<u8>;
2200 }
2201
2202 macro_rules! to_byte_array_impl {
2203 ($ty: ty) => {
2204 impl ToByteArray<$ty> for $ty {
2205 #[allow(clippy::wrong_self_convention)]
2206 fn to_byte_array(data: &[<$ty as DataType>::T]) -> Vec<u8> {
2207 <$ty as DataType>::T::slice_as_bytes(data).to_vec()
2208 }
2209 }
2210 };
2211 }
2212
2213 to_byte_array_impl!(Int32Type);
2214 to_byte_array_impl!(Int64Type);
2215 to_byte_array_impl!(FloatType);
2216 to_byte_array_impl!(DoubleType);
2217
2218 impl ToByteArray<BoolType> for BoolType {
2219 #[allow(clippy::wrong_self_convention)]
2220 fn to_byte_array(data: &[bool]) -> Vec<u8> {
2221 let mut v = vec![];
2222 for (i, item) in data.iter().enumerate() {
2223 if i % 8 == 0 {
2224 v.push(0);
2225 }
2226 if *item {
2227 v[i / 8] |= 1 << (i % 8);
2228 }
2229 }
2230 v
2231 }
2232 }
2233
2234 impl ToByteArray<Int96Type> for Int96Type {
2235 #[allow(clippy::wrong_self_convention)]
2236 fn to_byte_array(data: &[Int96]) -> Vec<u8> {
2237 let mut v = vec![];
2238 for d in data {
2239 v.extend_from_slice(d.as_bytes());
2240 }
2241 v
2242 }
2243 }
2244
2245 impl ToByteArray<ByteArrayType> for ByteArrayType {
2246 #[allow(clippy::wrong_self_convention)]
2247 fn to_byte_array(data: &[ByteArray]) -> Vec<u8> {
2248 let mut v = vec![];
2249 for d in data {
2250 let buf = d.data();
2251 let len = &usize_to_bytes(buf.len());
2252 v.extend_from_slice(len);
2253 v.extend(buf);
2254 }
2255 v
2256 }
2257 }
2258
2259 impl ToByteArray<FixedLenByteArrayType> for FixedLenByteArrayType {
2260 #[allow(clippy::wrong_self_convention)]
2261 fn to_byte_array(data: &[FixedLenByteArray]) -> Vec<u8> {
2262 let mut v = vec![];
2263 for d in data {
2264 let buf = d.data();
2265 v.extend(buf);
2266 }
2267 v
2268 }
2269 }
2270
/// A miniblock bit width of 33 is invalid for Int32. Both `get` and `skip` must reject
/// it with a descriptive error rather than misreading the data.
#[test]
fn test_delta_bit_packed_invalid_bit_width() {
    // Built as a literal instead of push-by-push, which needed a clippy allow.
    let buffer: Vec<u8> = vec![
        // Page header: block size 128 (VLQ 0x80 0x01).
        128, 1,
        // 4 miniblocks per block.
        4,
        // 32 values in total.
        32,
        // First value: zigzag 0.
        0,
        // Block header: min delta, zigzag 0.
        0,
        // Miniblock bit widths; 33 exceeds the 32 bits of an i32.
        33, 0, 0, 0,
    ];
    let corrupted_buffer = Bytes::from(buffer);

    let mut decoder = DeltaBitPackDecoder::<Int32Type>::new();
    decoder.set_data(corrupted_buffer.clone(), 32).unwrap();
    let mut read_buffer = vec![0; 32];
    let err = decoder.get(&mut read_buffer).unwrap_err();
    assert!(
        err.to_string()
            .contains("Invalid delta bit width 33 which is larger than expected 32"),
        "{}",
        err
    );

    let mut decoder = DeltaBitPackDecoder::<Int32Type>::new();
    decoder.set_data(corrupted_buffer, 32).unwrap();
    let err = decoder.skip(32).unwrap_err();
    assert!(
        err.to_string()
            .contains("Invalid delta bit width 33 which is larger than expected 32"),
        "{}",
        err
    );
}
2317}