1use std::{cmp, marker::PhantomData};
21
22use crate::basic::*;
23use crate::data_type::private::ParquetValueType;
24use crate::data_type::*;
25use crate::encodings::rle::RleEncoder;
26use crate::errors::{ParquetError, Result};
27use crate::schema::types::ColumnDescPtr;
28use crate::util::bit_util::{BitWriter, num_required_bits};
29
30use byte_stream_split_encoder::{ByteStreamSplitEncoder, VariableWidthByteStreamSplitEncoder};
31use bytes::Bytes;
32pub use dict_encoder::DictEncoder;
33
34mod byte_stream_split_encoder;
35mod dict_encoder;
36
37pub trait Encoder<T: DataType>: Send {
45 fn put(&mut self, values: &[T::T]) -> Result<()>;
47
48 #[cfg(test)]
53 fn put_spaced(&mut self, values: &[T::T], valid_bits: &[u8]) -> Result<usize> {
54 let num_values = values.len();
55 let mut buffer = Vec::with_capacity(num_values);
56 for (i, item) in values.iter().enumerate().take(num_values) {
58 if crate::util::bit_util::get_bit(valid_bits, i) {
59 buffer.push(item.clone());
60 }
61 }
62 self.put(&buffer[..])?;
63 Ok(buffer.len())
64 }
65
66 fn encoding(&self) -> Encoding;
68
69 fn estimated_data_encoded_size(&self) -> usize;
72
73 fn estimated_memory_size(&self) -> usize;
75
76 fn flush_buffer(&mut self) -> Result<Bytes>;
79}
80
81pub fn get_encoder<T: DataType>(
84 encoding: Encoding,
85 descr: &ColumnDescPtr,
86) -> Result<Box<dyn Encoder<T>>> {
87 let encoder: Box<dyn Encoder<T>> = match encoding {
88 Encoding::PLAIN => Box::new(PlainEncoder::new()),
89 Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => {
90 return Err(general_err!(
91 "Cannot initialize this encoding through this function"
92 ));
93 }
94 Encoding::RLE => Box::new(RleValueEncoder::new()),
95 Encoding::DELTA_BINARY_PACKED => Box::new(DeltaBitPackEncoder::new()),
96 Encoding::DELTA_LENGTH_BYTE_ARRAY => Box::new(DeltaLengthByteArrayEncoder::new()),
97 Encoding::DELTA_BYTE_ARRAY => Box::new(DeltaByteArrayEncoder::new()),
98 Encoding::BYTE_STREAM_SPLIT => match T::get_physical_type() {
99 Type::FIXED_LEN_BYTE_ARRAY => Box::new(VariableWidthByteStreamSplitEncoder::new(
100 descr.type_length(),
101 )),
102 _ => Box::new(ByteStreamSplitEncoder::new()),
103 },
104 e => return Err(nyi_err!("Encoding {} is not supported", e)),
105 };
106 Ok(encoder)
107}
108
109pub struct PlainEncoder<T: DataType> {
124 buffer: Vec<u8>,
125 bit_writer: BitWriter,
126 _phantom: PhantomData<T>,
127}
128
129impl<T: DataType> Default for PlainEncoder<T> {
130 fn default() -> Self {
131 Self::new()
132 }
133}
134
135impl<T: DataType> PlainEncoder<T> {
136 pub fn new() -> Self {
138 Self {
139 buffer: vec![],
140 bit_writer: BitWriter::new(256),
141 _phantom: PhantomData,
142 }
143 }
144}
145
146impl<T: DataType> Encoder<T> for PlainEncoder<T> {
147 #[cold]
151 fn encoding(&self) -> Encoding {
152 Encoding::PLAIN
153 }
154
155 fn estimated_data_encoded_size(&self) -> usize {
156 self.buffer.len() + self.bit_writer.bytes_written()
157 }
158
159 #[inline]
160 fn flush_buffer(&mut self) -> Result<Bytes> {
161 self.buffer
162 .extend_from_slice(self.bit_writer.flush_buffer());
163 self.bit_writer.clear();
164 Ok(std::mem::take(&mut self.buffer).into())
165 }
166
167 #[inline]
168 fn put(&mut self, values: &[T::T]) -> Result<()> {
169 T::T::encode(values, &mut self.buffer, &mut self.bit_writer)?;
170 Ok(())
171 }
172
173 fn estimated_memory_size(&self) -> usize {
175 self.buffer.capacity() * std::mem::size_of::<u8>() + self.bit_writer.estimated_memory_size()
176 }
177}
178
/// Initial capacity, in bytes, of the buffer handed to the RLE encoder (1 KiB).
const DEFAULT_RLE_BUFFER_LEN: usize = 1 << 10;
183
184pub struct RleValueEncoder<T: DataType> {
187 encoder: Option<RleEncoder>,
190 _phantom: PhantomData<T>,
191}
192
193impl<T: DataType> Default for RleValueEncoder<T> {
194 fn default() -> Self {
195 Self::new()
196 }
197}
198
199impl<T: DataType> RleValueEncoder<T> {
200 pub fn new() -> Self {
202 Self {
203 encoder: None,
204 _phantom: PhantomData,
205 }
206 }
207}
208
209impl<T: DataType> Encoder<T> for RleValueEncoder<T> {
210 #[inline]
211 fn put(&mut self, values: &[T::T]) -> Result<()> {
212 ensure_phys_ty!(Type::BOOLEAN, "RleValueEncoder only supports BoolType");
213
214 let rle_encoder = self.encoder.get_or_insert_with(|| {
215 let mut buffer = Vec::with_capacity(DEFAULT_RLE_BUFFER_LEN);
216 buffer.extend_from_slice(&[0; 4]);
218 RleEncoder::new_from_buf(1, buffer)
219 });
220
221 for value in values {
222 let value = value.as_u64()?;
223 rle_encoder.put(value)
224 }
225 Ok(())
226 }
227
228 #[cold]
232 fn encoding(&self) -> Encoding {
233 Encoding::RLE
234 }
235
236 #[inline]
237 fn estimated_data_encoded_size(&self) -> usize {
238 match self.encoder {
239 Some(ref enc) => enc.len(),
240 None => 0,
241 }
242 }
243
244 #[inline]
245 fn flush_buffer(&mut self) -> Result<Bytes> {
246 ensure_phys_ty!(Type::BOOLEAN, "RleValueEncoder only supports BoolType");
247 let rle_encoder = self
248 .encoder
249 .take()
250 .expect("RLE value encoder is not initialized");
251
252 let mut buf = rle_encoder.consume();
254 assert!(buf.len() >= 4, "should have had padding inserted");
255
256 let len = (buf.len() - 4) as i32;
258 buf[..4].copy_from_slice(&len.to_le_bytes());
259
260 Ok(buf.into())
261 }
262
263 fn estimated_memory_size(&self) -> usize {
265 self.encoder
266 .as_ref()
267 .map_or(0, |enc| enc.estimated_memory_size())
268 }
269}
270
/// Capacity, in bytes, given to the writer that holds the page header.
const MAX_PAGE_HEADER_WRITER_SIZE: usize = 32;
/// Capacity, in bytes, given to the writer that holds the block data (1 MiB).
const DEFAULT_BIT_WRITER_SIZE: usize = 1 << 20;
/// Number of mini blocks per block.
const DEFAULT_NUM_MINI_BLOCKS: usize = 4;
277
278pub struct DeltaBitPackEncoder<T: DataType> {
302 page_header_writer: BitWriter,
303 bit_writer: BitWriter,
304 total_values: usize,
305 first_value: i64,
306 current_value: i64,
307 block_size: usize,
308 mini_block_size: usize,
309 num_mini_blocks: usize,
310 values_in_block: usize,
311 deltas: Vec<i64>,
312 _phantom: PhantomData<T>,
313}
314
315impl<T: DataType> Default for DeltaBitPackEncoder<T> {
316 fn default() -> Self {
317 Self::new()
318 }
319}
320
321impl<T: DataType> DeltaBitPackEncoder<T> {
322 pub fn new() -> Self {
324 Self::assert_supported_type();
325
326 let mini_block_size = match T::T::PHYSICAL_TYPE {
328 Type::INT32 => 32,
329 Type::INT64 => 64,
330 _ => unreachable!(),
331 };
332
333 let num_mini_blocks = DEFAULT_NUM_MINI_BLOCKS;
334 let block_size = mini_block_size * num_mini_blocks;
335 assert_eq!(block_size % 128, 0);
336
337 DeltaBitPackEncoder {
338 page_header_writer: BitWriter::new(MAX_PAGE_HEADER_WRITER_SIZE),
339 bit_writer: BitWriter::new(DEFAULT_BIT_WRITER_SIZE),
340 total_values: 0,
341 first_value: 0,
342 current_value: 0, block_size, mini_block_size,
345 num_mini_blocks,
346 values_in_block: 0, deltas: vec![0; block_size],
348 _phantom: PhantomData,
349 }
350 }
351
352 fn write_page_header(&mut self) {
355 self.page_header_writer.put_vlq_int(self.block_size as u64);
361 self.page_header_writer
363 .put_vlq_int(self.num_mini_blocks as u64);
364 self.page_header_writer
366 .put_vlq_int(self.total_values as u64);
367 self.page_header_writer.put_zigzag_vlq_int(self.first_value);
369 }
370
371 #[inline(never)]
373 fn flush_block_values(&mut self) -> Result<()> {
374 if self.values_in_block == 0 {
375 return Ok(());
376 }
377
378 let mut min_delta = i64::MAX;
379 for i in 0..self.values_in_block {
380 min_delta = cmp::min(min_delta, self.deltas[i]);
381 }
382
383 self.bit_writer.put_zigzag_vlq_int(min_delta);
385
386 let offset = self.bit_writer.skip(self.num_mini_blocks);
388
389 for i in 0..self.num_mini_blocks {
390 let n = cmp::min(self.mini_block_size, self.values_in_block);
393 if n == 0 {
394 let pad_value = cfg!(test).then(|| 0xFF).unwrap_or(0);
398 for j in i..self.num_mini_blocks {
399 self.bit_writer.write_at(offset + j, pad_value);
400 }
401 break;
402 }
403
404 let mut max_delta = i64::MIN;
406 for j in 0..n {
407 max_delta = cmp::max(max_delta, self.deltas[i * self.mini_block_size + j]);
408 }
409
410 let bit_width = num_required_bits(self.subtract_u64(max_delta, min_delta)) as usize;
412 self.bit_writer.write_at(offset + i, bit_width as u8);
413
414 for j in 0..n {
416 let packed_value =
417 self.subtract_u64(self.deltas[i * self.mini_block_size + j], min_delta);
418 self.bit_writer.put_value(packed_value, bit_width);
419 }
420
421 for _ in n..self.mini_block_size {
423 self.bit_writer.put_value(0, bit_width);
424 }
425
426 self.values_in_block -= n;
427 }
428
429 assert_eq!(
430 self.values_in_block, 0,
431 "Expected 0 values in block, found {}",
432 self.values_in_block
433 );
434 Ok(())
435 }
436}
437
438impl<T: DataType> Encoder<T> for DeltaBitPackEncoder<T> {
441 fn put(&mut self, values: &[T::T]) -> Result<()> {
442 if values.is_empty() {
443 return Ok(());
444 }
445
446 let mut idx = if self.total_values == 0 {
448 self.first_value = self.as_i64(values, 0);
449 self.current_value = self.first_value;
450 1
451 } else {
452 0
453 };
454 self.total_values += values.len();
456
457 while idx < values.len() {
459 let value = self.as_i64(values, idx);
460 self.deltas[self.values_in_block] = self.subtract(value, self.current_value);
461 self.current_value = value;
462 idx += 1;
463 self.values_in_block += 1;
464 if self.values_in_block == self.block_size {
465 self.flush_block_values()?;
466 }
467 }
468 Ok(())
469 }
470
471 #[cold]
475 fn encoding(&self) -> Encoding {
476 Encoding::DELTA_BINARY_PACKED
477 }
478
479 fn estimated_data_encoded_size(&self) -> usize {
480 self.bit_writer.bytes_written()
481 }
482
483 fn flush_buffer(&mut self) -> Result<Bytes> {
484 self.flush_block_values()?;
486 self.write_page_header();
488
489 let mut buffer = Vec::new();
490 buffer.extend_from_slice(self.page_header_writer.flush_buffer());
491 buffer.extend_from_slice(self.bit_writer.flush_buffer());
492
493 self.page_header_writer.clear();
495 self.bit_writer.clear();
496 self.total_values = 0;
497 self.first_value = 0;
498 self.current_value = 0;
499 self.values_in_block = 0;
500
501 Ok(buffer.into())
502 }
503
504 fn estimated_memory_size(&self) -> usize {
506 self.page_header_writer.estimated_memory_size()
507 + self.bit_writer.estimated_memory_size()
508 + self.deltas.capacity() * std::mem::size_of::<i64>()
509 + std::mem::size_of::<Self>()
510 }
511}
512
513trait DeltaBitPackEncoderConversion<T: DataType> {
515 fn assert_supported_type();
517
518 fn as_i64(&self, values: &[T::T], index: usize) -> i64;
519
520 fn subtract(&self, left: i64, right: i64) -> i64;
521
522 fn subtract_u64(&self, left: i64, right: i64) -> u64;
523}
524
/// Message used when the delta bit-packing encoder is given an unsupported type.
///
/// Names the encoder; the previous text said `DeltaBitPackDecoder`.
const DELTA_BIT_PACK_TYPE_ERROR: &str =
    "DeltaBitPackEncoder only supports Int32Type, UInt32Type, Int64Type, and UInt64Type";
527
528impl<T: DataType> DeltaBitPackEncoderConversion<T> for DeltaBitPackEncoder<T> {
529 #[inline]
530 fn assert_supported_type() {
531 ensure_phys_ty!(Type::INT32 | Type::INT64, "{}", DELTA_BIT_PACK_TYPE_ERROR);
532 }
533
534 #[inline]
535 fn as_i64(&self, values: &[T::T], index: usize) -> i64 {
536 values[index].as_i64().expect(DELTA_BIT_PACK_TYPE_ERROR)
537 }
538
539 #[inline]
540 fn subtract(&self, left: i64, right: i64) -> i64 {
541 match T::get_physical_type() {
543 Type::INT32 => (left as i32).wrapping_sub(right as i32) as i64,
544 Type::INT64 => left.wrapping_sub(right),
545 _ => panic!("{}", DELTA_BIT_PACK_TYPE_ERROR),
546 }
547 }
548
549 #[inline]
550 fn subtract_u64(&self, left: i64, right: i64) -> u64 {
551 match T::get_physical_type() {
552 Type::INT32 => (left as i32).wrapping_sub(right as i32) as u32 as u64,
554 Type::INT64 => left.wrapping_sub(right) as u64,
555 _ => panic!("{}", DELTA_BIT_PACK_TYPE_ERROR),
556 }
557 }
558}
559
560pub struct DeltaLengthByteArrayEncoder<T: DataType> {
567 len_encoder: DeltaBitPackEncoder<Int32Type>,
569 data: Vec<ByteArray>,
571 encoded_size: usize,
573 _phantom: PhantomData<T>,
574}
575
576impl<T: DataType> Default for DeltaLengthByteArrayEncoder<T> {
577 fn default() -> Self {
578 Self::new()
579 }
580}
581
582impl<T: DataType> DeltaLengthByteArrayEncoder<T> {
583 pub fn new() -> Self {
585 Self {
586 len_encoder: DeltaBitPackEncoder::new(),
587 data: vec![],
588 encoded_size: 0,
589 _phantom: PhantomData,
590 }
591 }
592}
593
594impl<T: DataType> Encoder<T> for DeltaLengthByteArrayEncoder<T> {
595 fn put(&mut self, values: &[T::T]) -> Result<()> {
596 ensure_phys_ty!(
597 Type::BYTE_ARRAY | Type::FIXED_LEN_BYTE_ARRAY,
598 "DeltaLengthByteArrayEncoder only supports ByteArrayType"
599 );
600
601 let val_it = || {
602 values
603 .iter()
604 .map(|x| x.as_any().downcast_ref::<ByteArray>().unwrap())
605 };
606
607 let lengths: Vec<i32> = val_it().map(|byte_array| byte_array.len() as i32).collect();
608 self.len_encoder.put(&lengths)?;
609 for byte_array in val_it() {
610 self.encoded_size += byte_array.len();
611 self.data.push(byte_array.clone());
612 }
613
614 Ok(())
615 }
616
617 #[cold]
621 fn encoding(&self) -> Encoding {
622 Encoding::DELTA_LENGTH_BYTE_ARRAY
623 }
624
625 fn estimated_data_encoded_size(&self) -> usize {
626 self.len_encoder.estimated_data_encoded_size() + self.encoded_size
627 }
628
629 fn flush_buffer(&mut self) -> Result<Bytes> {
630 ensure_phys_ty!(
631 Type::BYTE_ARRAY | Type::FIXED_LEN_BYTE_ARRAY,
632 "DeltaLengthByteArrayEncoder only supports ByteArrayType"
633 );
634
635 let mut total_bytes = vec![];
636 let lengths = self.len_encoder.flush_buffer()?;
637 total_bytes.extend_from_slice(&lengths);
638 self.data.iter().for_each(|byte_array| {
639 total_bytes.extend_from_slice(byte_array.data());
640 });
641 self.data.clear();
642 self.encoded_size = 0;
643
644 Ok(total_bytes.into())
645 }
646
647 fn estimated_memory_size(&self) -> usize {
649 self.len_encoder.estimated_memory_size() + self.data.len() + std::mem::size_of::<Self>()
650 }
651}
652
653pub struct DeltaByteArrayEncoder<T: DataType> {
659 prefix_len_encoder: DeltaBitPackEncoder<Int32Type>,
660 suffix_writer: DeltaLengthByteArrayEncoder<ByteArrayType>,
661 previous: Vec<u8>,
662 _phantom: PhantomData<T>,
663}
664
665impl<T: DataType> Default for DeltaByteArrayEncoder<T> {
666 fn default() -> Self {
667 Self::new()
668 }
669}
670
671impl<T: DataType> DeltaByteArrayEncoder<T> {
672 pub fn new() -> Self {
674 Self {
675 prefix_len_encoder: DeltaBitPackEncoder::new(),
676 suffix_writer: DeltaLengthByteArrayEncoder::new(),
677 previous: vec![],
678 _phantom: PhantomData,
679 }
680 }
681}
682
683impl<T: DataType> Encoder<T> for DeltaByteArrayEncoder<T> {
684 fn put(&mut self, values: &[T::T]) -> Result<()> {
685 let mut prefix_lengths: Vec<i32> = vec![];
686 let mut suffixes: Vec<ByteArray> = vec![];
687
688 let values = values
689 .iter()
690 .map(|x| x.as_any())
691 .map(|x| match T::get_physical_type() {
692 Type::BYTE_ARRAY => x.downcast_ref::<ByteArray>().unwrap(),
693 Type::FIXED_LEN_BYTE_ARRAY => x.downcast_ref::<FixedLenByteArray>().unwrap(),
694 _ => panic!(
695 "DeltaByteArrayEncoder only supports ByteArrayType and FixedLenByteArrayType"
696 ),
697 });
698
699 for byte_array in values {
700 let current = byte_array.data();
701 let prefix_len = cmp::min(self.previous.len(), current.len());
704 let mut match_len = 0;
705 while match_len < prefix_len && self.previous[match_len] == current[match_len] {
706 match_len += 1;
707 }
708 prefix_lengths.push(match_len as i32);
709 suffixes.push(byte_array.slice(match_len, byte_array.len() - match_len));
710 self.previous.clear();
712 self.previous.extend_from_slice(current);
713 }
714 self.prefix_len_encoder.put(&prefix_lengths)?;
715 self.suffix_writer.put(&suffixes)?;
716
717 Ok(())
718 }
719
720 #[cold]
724 fn encoding(&self) -> Encoding {
725 Encoding::DELTA_BYTE_ARRAY
726 }
727
728 fn estimated_data_encoded_size(&self) -> usize {
729 self.prefix_len_encoder.estimated_data_encoded_size()
730 + self.suffix_writer.estimated_data_encoded_size()
731 }
732
733 fn flush_buffer(&mut self) -> Result<Bytes> {
734 match T::get_physical_type() {
735 Type::BYTE_ARRAY | Type::FIXED_LEN_BYTE_ARRAY => {
736 let mut total_bytes = vec![];
739 let lengths = self.prefix_len_encoder.flush_buffer()?;
741 total_bytes.extend_from_slice(&lengths);
742 let suffixes = self.suffix_writer.flush_buffer()?;
744 total_bytes.extend_from_slice(&suffixes);
745
746 self.previous.clear();
747 Ok(total_bytes.into())
748 }
749 _ => panic!(
750 "DeltaByteArrayEncoder only supports ByteArrayType and FixedLenByteArrayType"
751 ),
752 }
753 }
754
755 fn estimated_memory_size(&self) -> usize {
757 self.prefix_len_encoder.estimated_memory_size()
758 + self.suffix_writer.estimated_memory_size()
759 + (self.previous.capacity() * std::mem::size_of::<u8>())
760 }
761}
762
#[cfg(test)]
mod tests {
    use super::*;

    use std::sync::Arc;

    use crate::encodings::decoding::{Decoder, DictDecoder, PlainDecoder, get_decoder};
    use crate::schema::types::{ColumnDescPtr, ColumnDescriptor, ColumnPath, Type as SchemaType};
    use crate::util::bit_util;
    use crate::util::test_common::rand_gen::{RandGen, random_bytes};

    /// Number of random values generated for each round-trip test.
    const TEST_SET_SIZE: usize = 1024;

    #[test]
    fn test_get_encoders() {
        // Encodings that `get_encoder` constructs directly.
        for encoding in [
            Encoding::PLAIN,
            Encoding::DELTA_BINARY_PACKED,
            Encoding::DELTA_LENGTH_BYTE_ARRAY,
            Encoding::DELTA_BYTE_ARRAY,
        ] {
            create_and_check_encoder::<Int32Type>(0, encoding, None);
        }
        create_and_check_encoder::<BoolType>(0, Encoding::RLE, None);

        // Dictionary encodings are rejected by `get_encoder`.
        for encoding in [Encoding::RLE_DICTIONARY, Encoding::PLAIN_DICTIONARY] {
            create_and_check_encoder::<Int32Type>(
                0,
                encoding,
                Some(general_err!(
                    "Cannot initialize this encoding through this function"
                )),
            );
        }

        // Encodings without an encoder yield a "not supported" error.
        #[allow(deprecated)]
        create_and_check_encoder::<Int32Type>(
            0,
            Encoding::BIT_PACKED,
            Some(nyi_err!("Encoding BIT_PACKED is not supported")),
        );
    }

    #[test]
    fn test_bool() {
        for encoding in [Encoding::PLAIN, Encoding::PLAIN_DICTIONARY, Encoding::RLE] {
            BoolType::test(encoding, TEST_SET_SIZE, -1);
        }
    }

    #[test]
    fn test_i32() {
        for encoding in [
            Encoding::PLAIN,
            Encoding::PLAIN_DICTIONARY,
            Encoding::DELTA_BINARY_PACKED,
            Encoding::BYTE_STREAM_SPLIT,
        ] {
            Int32Type::test(encoding, TEST_SET_SIZE, -1);
        }
    }

    #[test]
    fn test_i64() {
        for encoding in [
            Encoding::PLAIN,
            Encoding::PLAIN_DICTIONARY,
            Encoding::DELTA_BINARY_PACKED,
            Encoding::BYTE_STREAM_SPLIT,
        ] {
            Int64Type::test(encoding, TEST_SET_SIZE, -1);
        }
    }

    #[test]
    fn test_i96() {
        for encoding in [Encoding::PLAIN, Encoding::PLAIN_DICTIONARY] {
            Int96Type::test(encoding, TEST_SET_SIZE, -1);
        }
    }

    #[test]
    fn test_float() {
        for encoding in [
            Encoding::PLAIN,
            Encoding::PLAIN_DICTIONARY,
            Encoding::BYTE_STREAM_SPLIT,
        ] {
            FloatType::test(encoding, TEST_SET_SIZE, -1);
        }
    }

    #[test]
    fn test_double() {
        for encoding in [
            Encoding::PLAIN,
            Encoding::PLAIN_DICTIONARY,
            Encoding::BYTE_STREAM_SPLIT,
        ] {
            DoubleType::test(encoding, TEST_SET_SIZE, -1);
        }
    }

    #[test]
    fn test_byte_array() {
        for encoding in [
            Encoding::PLAIN,
            Encoding::PLAIN_DICTIONARY,
            Encoding::DELTA_LENGTH_BYTE_ARRAY,
            Encoding::DELTA_BYTE_ARRAY,
        ] {
            ByteArrayType::test(encoding, TEST_SET_SIZE, -1);
        }
    }

    #[test]
    fn test_fixed_len_byte_array() {
        for encoding in [
            Encoding::PLAIN,
            Encoding::PLAIN_DICTIONARY,
            Encoding::DELTA_BYTE_ARRAY,
            Encoding::BYTE_STREAM_SPLIT,
        ] {
            FixedLenByteArrayType::test(encoding, TEST_SET_SIZE, 100);
        }
    }

    #[test]
    fn test_dict_encoded_size() {
        /// Checks the dictionary size before `put`, after `put`, and after a flush.
        fn run_test<T: DataType>(type_length: i32, values: &[T::T], expected_size: usize) {
            let mut dict_encoder = create_test_dict_encoder::<T>(type_length);
            assert_eq!(dict_encoder.dict_encoded_size(), 0);

            dict_encoder.put(values).unwrap();
            assert_eq!(dict_encoder.dict_encoded_size(), expected_size);

            // Flushing the data buffer must leave the dictionary size unchanged.
            dict_encoder.flush_buffer().unwrap();
            assert_eq!(dict_encoder.dict_encoded_size(), expected_size);
        }

        run_test::<BoolType>(-1, &[true, false, true, false, true], 2);
        run_test::<Int32Type>(-1, &[1i32, 2i32, 3i32, 4i32, 5i32], 20);
        run_test::<Int64Type>(-1, &[1i64, 2i64, 3i64, 4i64, 5i64], 40);
        run_test::<FloatType>(-1, &[1f32, 2f32, 3f32, 4f32, 5f32], 20);
        run_test::<DoubleType>(-1, &[1f64, 2f64, 3f64, 4f64, 5f64], 40);
        run_test::<Int96Type>(
            -1,
            &[Int96::from(vec![1, 2, 3]), Int96::from(vec![2, 3, 4])],
            24,
        );
        run_test::<ByteArrayType>(-1, &[ByteArray::from("abcd"), ByteArray::from("efj")], 15);
        run_test::<FixedLenByteArrayType>(
            2,
            &[ByteArray::from("ab").into(), ByteArray::from("bc").into()],
            4,
        );
    }

    #[test]
    fn test_estimated_data_encoded_size() {
        /// Checks the size estimate on a fresh encoder, after `put`, and after a flush.
        fn run_test<T: DataType>(
            encoding: Encoding,
            type_length: i32,
            values: &[T::T],
            initial_size: usize,
            max_size: usize,
            flush_size: usize,
        ) {
            let mut encoder: Box<dyn Encoder<T>> = match encoding {
                Encoding::PLAIN_DICTIONARY | Encoding::RLE_DICTIONARY => {
                    Box::new(create_test_dict_encoder::<T>(type_length))
                }
                _ => create_test_encoder::<T>(type_length, encoding),
            };
            assert_eq!(encoder.estimated_data_encoded_size(), initial_size);

            encoder.put(values).unwrap();
            assert_eq!(encoder.estimated_data_encoded_size(), max_size);

            encoder.flush_buffer().unwrap();
            assert_eq!(encoder.estimated_data_encoded_size(), flush_size);
        }

        // PLAIN
        run_test::<Int32Type>(Encoding::PLAIN, -1, &[123; 1024], 0, 4096, 0);

        // DICTIONARY
        run_test::<Int32Type>(Encoding::RLE_DICTIONARY, -1, &[123, 1024], 0, 2, 0);

        // DELTA_BINARY_PACKED
        run_test::<Int32Type>(Encoding::DELTA_BINARY_PACKED, -1, &[123; 1024], 0, 35, 0);

        // RLE: 16 `true` values followed by 16 `false` values.
        let mut bools = vec![true; 16];
        bools.resize(32, false);
        run_test::<BoolType>(Encoding::RLE, -1, &bools, 0, 6, 0);

        // DELTA_LENGTH_BYTE_ARRAY
        run_test::<ByteArrayType>(
            Encoding::DELTA_LENGTH_BYTE_ARRAY,
            -1,
            &[ByteArray::from("ab"), ByteArray::from("abc")],
            0,
            5,
            0,
        );

        // DELTA_BYTE_ARRAY
        run_test::<ByteArrayType>(
            Encoding::DELTA_BYTE_ARRAY,
            -1,
            &[ByteArray::from("ab"), ByteArray::from("abc")],
            0,
            3,
            0,
        );

        // BYTE_STREAM_SPLIT
        run_test::<FloatType>(Encoding::BYTE_STREAM_SPLIT, -1, &[0.1, 0.2], 0, 8, 0);
    }

    #[test]
    fn test_byte_stream_split_example_f32() {
        let mut encoder = create_test_encoder::<FloatType>(0, Encoding::BYTE_STREAM_SPLIT);
        let mut decoder = create_test_decoder::<FloatType>(0, Encoding::BYTE_STREAM_SPLIT);

        let input = vec![
            f32::from_le_bytes([0xAA, 0xBB, 0xCC, 0xDD]),
            f32::from_le_bytes([0x00, 0x11, 0x22, 0x33]),
            f32::from_le_bytes([0xA3, 0xB4, 0xC5, 0xD6]),
        ];

        encoder.put(&input).unwrap();
        let encoded = encoder.flush_buffer().unwrap();

        // Byte k of every value is grouped into stream k.
        let expected = Bytes::from(vec![
            0xAA_u8, 0x00, 0xA3, 0xBB, 0x11, 0xB4, 0xCC, 0x22, 0xC5, 0xDD, 0x33, 0xD6,
        ]);
        assert_eq!(encoded, expected);

        let mut decoded = vec![0.0; input.len()];
        decoder.set_data(encoded, input.len()).unwrap();
        decoder.get(&mut decoded).unwrap();

        assert_eq!(decoded, input);
    }

    #[test]
    fn test_issue_47() {
        let mut encoder = create_test_encoder::<ByteArrayType>(0, Encoding::DELTA_BYTE_ARRAY);
        let mut decoder = create_test_decoder::<ByteArrayType>(0, Encoding::DELTA_BYTE_ARRAY);

        let input = vec![
            ByteArray::from("aa"),
            ByteArray::from("aaa"),
            ByteArray::from("aa"),
            ByteArray::from("aaa"),
        ];
        let mut output = vec![ByteArray::default(); input.len()];

        // Encode and decode the input in two halves, flushing in between.
        if let Err(e) = put_and_get(&mut encoder, &mut decoder, &input[..2], &mut output[..2]) {
            panic!("first put_and_get() failed with: {}", e);
        }
        if let Err(e) = put_and_get(&mut encoder, &mut decoder, &input[2..], &mut output[2..]) {
            panic!("second put_and_get() failed with: {}", e);
        }
        assert_eq!(output, input);
    }

    /// Round-trip test driver, implemented for every data type that can
    /// generate random values.
    trait EncodingTester<T: DataType> {
        /// Runs the dictionary or the regular round trip, depending on `enc`,
        /// and panics if it reports an error.
        fn test(enc: Encoding, total: usize, type_length: i32) {
            let outcome = match enc {
                Encoding::PLAIN_DICTIONARY | Encoding::RLE_DICTIONARY => {
                    Self::test_dict_internal(total, type_length)
                }
                other => Self::test_internal(other, total, type_length),
            };

            if let Err(e) = outcome {
                panic!("Expected result to be OK but got err:\n {}", e);
            }
        }

        fn test_internal(enc: Encoding, total: usize, type_length: i32) -> Result<()>;

        fn test_dict_internal(total: usize, type_length: i32) -> Result<()>;
    }

    impl<T: DataType + RandGen<T>> EncodingTester<T> for T {
        fn test_internal(enc: Encoding, total: usize, type_length: i32) -> Result<()> {
            let mut encoder = create_test_encoder::<T>(type_length, enc);
            let mut decoder = create_test_decoder::<T>(type_length, enc);
            let mut values = <T as RandGen<T>>::gen_vec(type_length, total);
            let mut result_data = vec![T::T::default(); total];

            // Spaced round trip: a random validity bitmap selects the values to encode.
            let bitmap_len = bit_util::ceil(total as i64, 8);
            let valid_bits = random_bytes(bitmap_len as usize);
            let values_written = encoder.put_spaced(&values[..], &valid_bits[..])?;
            let encoded = encoder.flush_buffer()?;
            decoder.set_data(encoded, values_written)?;
            let _ = decoder.get_spaced(
                &mut result_data[..],
                values.len() - values_written,
                &valid_bits[..],
            )?;

            // Valid slots hold the original value, all others the default.
            for (i, (decoded, original)) in result_data.iter().zip(values.iter()).enumerate() {
                if bit_util::get_bit(&valid_bits[..], i) {
                    assert_eq!(decoded, original);
                } else {
                    assert_eq!(*decoded, T::T::default());
                }
            }

            // Dense round trip with the same values.
            let mut actual_total = put_and_get(
                &mut encoder,
                &mut decoder,
                &values[..],
                &mut result_data[..],
            )?;
            assert_eq!(actual_total, total);
            assert_eq!(result_data, values);

            // Second dense round trip with fresh values, reusing the encoder
            // and decoder after the flush.
            values = <T as RandGen<T>>::gen_vec(type_length, total);
            actual_total = put_and_get(
                &mut encoder,
                &mut decoder,
                &values[..],
                &mut result_data[..],
            )?;
            assert_eq!(actual_total, total);
            assert_eq!(result_data, values);

            Ok(())
        }

        fn test_dict_internal(total: usize, type_length: i32) -> Result<()> {
            let mut encoder = create_test_dict_encoder::<T>(type_length);
            let mut decoder = create_test_dict_decoder::<T>();
            let mut result_data = vec![T::T::default(); total];

            // Two rounds: the second reuses the encoder and decoder after a flush.
            for _round in 0..2 {
                let values = <T as RandGen<T>>::gen_vec(type_length, total);
                encoder.put(&values[..])?;
                let encoded = encoder.flush_buffer()?;

                // Hand the dictionary written so far to the decoder.
                let mut dict_decoder = PlainDecoder::<T>::new(type_length);
                dict_decoder.set_data(encoder.write_dict()?, encoder.num_entries())?;
                decoder.set_dict(Box::new(dict_decoder))?;

                decoder.set_data(encoded, total)?;
                let actual_total = decoder.get(&mut result_data)?;

                assert_eq!(actual_total, total);
                assert_eq!(result_data, values);
            }

            Ok(())
        }
    }

    /// Encodes `input`, flushes, and decodes into `output`; returns the
    /// decoder's reported value count.
    fn put_and_get<T: DataType>(
        encoder: &mut Box<dyn Encoder<T>>,
        decoder: &mut Box<dyn Decoder<T>>,
        input: &[T::T],
        output: &mut [T::T],
    ) -> Result<usize> {
        encoder.put(input)?;
        let encoded = encoder.flush_buffer()?;
        decoder.set_data(encoded, input.len())?;
        decoder.get(output)
    }

    /// Calls `get_encoder` and checks for either the expected error message
    /// or an encoder reporting the requested encoding.
    fn create_and_check_encoder<T: DataType>(
        type_length: i32,
        encoding: Encoding,
        err: Option<ParquetError>,
    ) {
        let desc = create_test_col_desc_ptr(type_length, T::get_physical_type());
        let outcome = get_encoder::<T>(encoding, &desc);
        if let Some(expected) = err {
            assert_eq!(outcome.err().unwrap().to_string(), expected.to_string());
        } else {
            assert_eq!(outcome.unwrap().encoding(), encoding);
        }
    }

    /// Builds a column descriptor for a primitive column named "t".
    fn create_test_col_desc_ptr(type_len: i32, t: Type) -> ColumnDescPtr {
        let primitive = SchemaType::primitive_type_builder("t", t)
            .with_length(type_len)
            .build()
            .unwrap();
        let descriptor = ColumnDescriptor::new(Arc::new(primitive), 0, 0, ColumnPath::new(vec![]));
        Arc::new(descriptor)
    }

    /// Creates an encoder for `enc` through `get_encoder`.
    fn create_test_encoder<T: DataType>(type_len: i32, enc: Encoding) -> Box<dyn Encoder<T>> {
        let desc = create_test_col_desc_ptr(type_len, T::get_physical_type());
        get_encoder(enc, &desc).unwrap()
    }

    /// Creates a decoder for `enc` through `get_decoder`.
    fn create_test_decoder<T: DataType>(type_len: i32, enc: Encoding) -> Box<dyn Decoder<T>> {
        let desc = create_test_col_desc_ptr(type_len, T::get_physical_type());
        get_decoder(desc, enc).unwrap()
    }

    /// Creates a dictionary encoder for `T`.
    fn create_test_dict_encoder<T: DataType>(type_len: i32) -> DictEncoder<T> {
        let desc = create_test_col_desc_ptr(type_len, T::get_physical_type());
        DictEncoder::<T>::new(desc)
    }

    /// Creates a dictionary decoder for `T`.
    fn create_test_dict_decoder<T: DataType>() -> DictDecoder<T> {
        DictDecoder::<T>::new()
    }
}