1use std::{cmp, marker::PhantomData};
21
22use crate::basic::*;
23use crate::data_type::private::ParquetValueType;
24use crate::data_type::*;
25use crate::encodings::rle::RleEncoder;
26use crate::errors::{ParquetError, Result};
27use crate::schema::types::ColumnDescPtr;
28use crate::util::bit_util::{num_required_bits, BitWriter};
29
30use byte_stream_split_encoder::{ByteStreamSplitEncoder, VariableWidthByteStreamSplitEncoder};
31use bytes::Bytes;
32pub use dict_encoder::DictEncoder;
33
34mod byte_stream_split_encoder;
35mod dict_encoder;
36
37pub trait Encoder<T: DataType>: Send {
45 fn put(&mut self, values: &[T::T]) -> Result<()>;
47
48 #[cfg(test)]
53 fn put_spaced(&mut self, values: &[T::T], valid_bits: &[u8]) -> Result<usize> {
54 let num_values = values.len();
55 let mut buffer = Vec::with_capacity(num_values);
56 for (i, item) in values.iter().enumerate().take(num_values) {
58 if crate::util::bit_util::get_bit(valid_bits, i) {
59 buffer.push(item.clone());
60 }
61 }
62 self.put(&buffer[..])?;
63 Ok(buffer.len())
64 }
65
66 fn encoding(&self) -> Encoding;
68
69 fn estimated_data_encoded_size(&self) -> usize;
72
73 fn estimated_memory_size(&self) -> usize;
75
76 fn flush_buffer(&mut self) -> Result<Bytes>;
79}
80
81pub fn get_encoder<T: DataType>(
84 encoding: Encoding,
85 descr: &ColumnDescPtr,
86) -> Result<Box<dyn Encoder<T>>> {
87 let encoder: Box<dyn Encoder<T>> = match encoding {
88 Encoding::PLAIN => Box::new(PlainEncoder::new()),
89 Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => {
90 return Err(general_err!(
91 "Cannot initialize this encoding through this function"
92 ));
93 }
94 Encoding::RLE => Box::new(RleValueEncoder::new()),
95 Encoding::DELTA_BINARY_PACKED => Box::new(DeltaBitPackEncoder::new()),
96 Encoding::DELTA_LENGTH_BYTE_ARRAY => Box::new(DeltaLengthByteArrayEncoder::new()),
97 Encoding::DELTA_BYTE_ARRAY => Box::new(DeltaByteArrayEncoder::new()),
98 Encoding::BYTE_STREAM_SPLIT => match T::get_physical_type() {
99 Type::FIXED_LEN_BYTE_ARRAY => Box::new(VariableWidthByteStreamSplitEncoder::new(
100 descr.type_length(),
101 )),
102 _ => Box::new(ByteStreamSplitEncoder::new()),
103 },
104 e => return Err(nyi_err!("Encoding {} is not supported", e)),
105 };
106 Ok(encoder)
107}
108
109pub struct PlainEncoder<T: DataType> {
124 buffer: Vec<u8>,
125 bit_writer: BitWriter,
126 _phantom: PhantomData<T>,
127}
128
129impl<T: DataType> Default for PlainEncoder<T> {
130 fn default() -> Self {
131 Self::new()
132 }
133}
134
135impl<T: DataType> PlainEncoder<T> {
136 pub fn new() -> Self {
138 Self {
139 buffer: vec![],
140 bit_writer: BitWriter::new(256),
141 _phantom: PhantomData,
142 }
143 }
144}
145
146impl<T: DataType> Encoder<T> for PlainEncoder<T> {
147 #[cold]
151 fn encoding(&self) -> Encoding {
152 Encoding::PLAIN
153 }
154
155 fn estimated_data_encoded_size(&self) -> usize {
156 self.buffer.len() + self.bit_writer.bytes_written()
157 }
158
159 #[inline]
160 fn flush_buffer(&mut self) -> Result<Bytes> {
161 self.buffer
162 .extend_from_slice(self.bit_writer.flush_buffer());
163 self.bit_writer.clear();
164 Ok(std::mem::take(&mut self.buffer).into())
165 }
166
167 #[inline]
168 fn put(&mut self, values: &[T::T]) -> Result<()> {
169 T::T::encode(values, &mut self.buffer, &mut self.bit_writer)?;
170 Ok(())
171 }
172
173 fn estimated_memory_size(&self) -> usize {
175 self.buffer.capacity() * std::mem::size_of::<u8>() + self.bit_writer.estimated_memory_size()
176 }
177}
178
179const DEFAULT_RLE_BUFFER_LEN: usize = 1024;
183
184pub struct RleValueEncoder<T: DataType> {
187 encoder: Option<RleEncoder>,
190 _phantom: PhantomData<T>,
191}
192
193impl<T: DataType> Default for RleValueEncoder<T> {
194 fn default() -> Self {
195 Self::new()
196 }
197}
198
199impl<T: DataType> RleValueEncoder<T> {
200 pub fn new() -> Self {
202 Self {
203 encoder: None,
204 _phantom: PhantomData,
205 }
206 }
207}
208
209impl<T: DataType> Encoder<T> for RleValueEncoder<T> {
210 #[inline]
211 fn put(&mut self, values: &[T::T]) -> Result<()> {
212 ensure_phys_ty!(Type::BOOLEAN, "RleValueEncoder only supports BoolType");
213
214 let rle_encoder = self.encoder.get_or_insert_with(|| {
215 let mut buffer = Vec::with_capacity(DEFAULT_RLE_BUFFER_LEN);
216 buffer.extend_from_slice(&[0; 4]);
218 RleEncoder::new_from_buf(1, buffer)
219 });
220
221 for value in values {
222 let value = value.as_u64()?;
223 rle_encoder.put(value)
224 }
225 Ok(())
226 }
227
228 #[cold]
232 fn encoding(&self) -> Encoding {
233 Encoding::RLE
234 }
235
236 #[inline]
237 fn estimated_data_encoded_size(&self) -> usize {
238 match self.encoder {
239 Some(ref enc) => enc.len(),
240 None => 0,
241 }
242 }
243
244 #[inline]
245 fn flush_buffer(&mut self) -> Result<Bytes> {
246 ensure_phys_ty!(Type::BOOLEAN, "RleValueEncoder only supports BoolType");
247 let rle_encoder = self
248 .encoder
249 .take()
250 .expect("RLE value encoder is not initialized");
251
252 let mut buf = rle_encoder.consume();
254 assert!(buf.len() >= 4, "should have had padding inserted");
255
256 let len = (buf.len() - 4) as i32;
258 buf[..4].copy_from_slice(&len.to_le_bytes());
259
260 Ok(buf.into())
261 }
262
263 fn estimated_memory_size(&self) -> usize {
265 self.encoder
266 .as_ref()
267 .map_or(0, |enc| enc.estimated_memory_size())
268 }
269}
270
/// Size passed to `BitWriter::new` for the writer that holds the page header.
const MAX_PAGE_HEADER_WRITER_SIZE: usize = 32;
/// Size passed to `BitWriter::new` for the writer that holds encoded blocks.
const DEFAULT_BIT_WRITER_SIZE: usize = 1024 * 1024;
/// Number of mini blocks in each block.
const DEFAULT_NUM_MINI_BLOCKS: usize = 4;

/// Encoder for the `DELTA_BINARY_PACKED` encoding of INT32 / INT64 values.
///
/// The first value since the last flush is stored in the page header; every later
/// value is stored as the delta from its predecessor. Deltas are grouped into blocks
/// of `block_size`, each split into `num_mini_blocks` bit-packed mini blocks.
pub struct DeltaBitPackEncoder<T: DataType> {
    // Receives the page header, written at flush time.
    page_header_writer: BitWriter,
    // Receives the encoded blocks.
    bit_writer: BitWriter,
    // Number of values passed to `put` since the last flush.
    total_values: usize,
    // First value since the last flush; written to the page header.
    first_value: i64,
    // Most recent value, used as the base for the next delta.
    current_value: i64,
    // Number of deltas per block.
    block_size: usize,
    // Number of deltas per mini block.
    mini_block_size: usize,
    // Number of mini blocks per block.
    num_mini_blocks: usize,
    // Number of deltas currently buffered in `deltas`.
    values_in_block: usize,
    // Delta buffer for the block being built (allocated with length `block_size`).
    deltas: Vec<i64>,
    _phantom: PhantomData<T>,
}

impl<T: DataType> Default for DeltaBitPackEncoder<T> {
    fn default() -> Self {
        Self::new()
    }
}
320
321impl<T: DataType> DeltaBitPackEncoder<T> {
322 pub fn new() -> Self {
324 Self::assert_supported_type();
325
326 let mini_block_size = match T::T::PHYSICAL_TYPE {
328 Type::INT32 => 32,
329 Type::INT64 => 64,
330 _ => unreachable!(),
331 };
332
333 let num_mini_blocks = DEFAULT_NUM_MINI_BLOCKS;
334 let block_size = mini_block_size * num_mini_blocks;
335 assert_eq!(block_size % 128, 0);
336
337 DeltaBitPackEncoder {
338 page_header_writer: BitWriter::new(MAX_PAGE_HEADER_WRITER_SIZE),
339 bit_writer: BitWriter::new(DEFAULT_BIT_WRITER_SIZE),
340 total_values: 0,
341 first_value: 0,
342 current_value: 0, block_size, mini_block_size,
345 num_mini_blocks,
346 values_in_block: 0, deltas: vec![0; block_size],
348 _phantom: PhantomData,
349 }
350 }
351
352 fn write_page_header(&mut self) {
355 self.page_header_writer.put_vlq_int(self.block_size as u64);
361 self.page_header_writer
363 .put_vlq_int(self.num_mini_blocks as u64);
364 self.page_header_writer
366 .put_vlq_int(self.total_values as u64);
367 self.page_header_writer.put_zigzag_vlq_int(self.first_value);
369 }
370
371 #[inline(never)]
373 fn flush_block_values(&mut self) -> Result<()> {
374 if self.values_in_block == 0 {
375 return Ok(());
376 }
377
378 let mut min_delta = i64::MAX;
379 for i in 0..self.values_in_block {
380 min_delta = cmp::min(min_delta, self.deltas[i]);
381 }
382
383 self.bit_writer.put_zigzag_vlq_int(min_delta);
385
386 let offset = self.bit_writer.skip(self.num_mini_blocks);
388
389 for i in 0..self.num_mini_blocks {
390 let n = cmp::min(self.mini_block_size, self.values_in_block);
393 if n == 0 {
394 let pad_value = cfg!(test).then(|| 0xFF).unwrap_or(0);
398 for j in i..self.num_mini_blocks {
399 self.bit_writer.write_at(offset + j, pad_value);
400 }
401 break;
402 }
403
404 let mut max_delta = i64::MIN;
406 for j in 0..n {
407 max_delta = cmp::max(max_delta, self.deltas[i * self.mini_block_size + j]);
408 }
409
410 let bit_width = num_required_bits(self.subtract_u64(max_delta, min_delta)) as usize;
412 self.bit_writer.write_at(offset + i, bit_width as u8);
413
414 for j in 0..n {
416 let packed_value =
417 self.subtract_u64(self.deltas[i * self.mini_block_size + j], min_delta);
418 self.bit_writer.put_value(packed_value, bit_width);
419 }
420
421 for _ in n..self.mini_block_size {
423 self.bit_writer.put_value(0, bit_width);
424 }
425
426 self.values_in_block -= n;
427 }
428
429 assert_eq!(
430 self.values_in_block, 0,
431 "Expected 0 values in block, found {}",
432 self.values_in_block
433 );
434 Ok(())
435 }
436}
437
438impl<T: DataType> Encoder<T> for DeltaBitPackEncoder<T> {
441 fn put(&mut self, values: &[T::T]) -> Result<()> {
442 if values.is_empty() {
443 return Ok(());
444 }
445
446 let mut idx = if self.total_values == 0 {
448 self.first_value = self.as_i64(values, 0);
449 self.current_value = self.first_value;
450 1
451 } else {
452 0
453 };
454 self.total_values += values.len();
456
457 while idx < values.len() {
459 let value = self.as_i64(values, idx);
460 self.deltas[self.values_in_block] = self.subtract(value, self.current_value);
461 self.current_value = value;
462 idx += 1;
463 self.values_in_block += 1;
464 if self.values_in_block == self.block_size {
465 self.flush_block_values()?;
466 }
467 }
468 Ok(())
469 }
470
471 #[cold]
475 fn encoding(&self) -> Encoding {
476 Encoding::DELTA_BINARY_PACKED
477 }
478
479 fn estimated_data_encoded_size(&self) -> usize {
480 self.bit_writer.bytes_written()
481 }
482
483 fn flush_buffer(&mut self) -> Result<Bytes> {
484 self.flush_block_values()?;
486 self.write_page_header();
488
489 let mut buffer = Vec::new();
490 buffer.extend_from_slice(self.page_header_writer.flush_buffer());
491 buffer.extend_from_slice(self.bit_writer.flush_buffer());
492
493 self.page_header_writer.clear();
495 self.bit_writer.clear();
496 self.total_values = 0;
497 self.first_value = 0;
498 self.current_value = 0;
499 self.values_in_block = 0;
500
501 Ok(buffer.into())
502 }
503
504 fn estimated_memory_size(&self) -> usize {
506 self.page_header_writer.estimated_memory_size()
507 + self.bit_writer.estimated_memory_size()
508 + self.deltas.capacity() * std::mem::size_of::<i64>()
509 + std::mem::size_of::<Self>()
510 }
511}
512
/// Physical-type-specific helpers used by [`DeltaBitPackEncoder`].
trait DeltaBitPackEncoderConversion<T: DataType> {
    /// Rejects any `T` whose physical type is not INT32 or INT64.
    fn assert_supported_type();

    /// Returns `values[index]` widened to `i64`.
    fn as_i64(&self, values: &[T::T], index: usize) -> i64;

    /// Returns `left - right`, wrapping at the bit width of `T`'s physical type.
    fn subtract(&self, left: i64, right: i64) -> i64;

    /// Like `subtract`, but reinterprets the wrapped difference as unsigned
    /// (zero-extended from 32 bits for INT32).
    fn subtract_u64(&self, left: i64, right: i64) -> u64;
}
524
525impl<T: DataType> DeltaBitPackEncoderConversion<T> for DeltaBitPackEncoder<T> {
526 #[inline]
527 fn assert_supported_type() {
528 ensure_phys_ty!(
529 Type::INT32 | Type::INT64,
530 "DeltaBitPackDecoder only supports Int32Type and Int64Type"
531 );
532 }
533
534 #[inline]
535 fn as_i64(&self, values: &[T::T], index: usize) -> i64 {
536 values[index]
537 .as_i64()
538 .expect("DeltaBitPackDecoder only supports Int32Type and Int64Type")
539 }
540
541 #[inline]
542 fn subtract(&self, left: i64, right: i64) -> i64 {
543 match T::get_physical_type() {
545 Type::INT32 => (left as i32).wrapping_sub(right as i32) as i64,
546 Type::INT64 => left.wrapping_sub(right),
547 _ => panic!("DeltaBitPackDecoder only supports Int32Type and Int64Type"),
548 }
549 }
550
551 #[inline]
552 fn subtract_u64(&self, left: i64, right: i64) -> u64 {
553 match T::get_physical_type() {
554 Type::INT32 => (left as i32).wrapping_sub(right as i32) as u32 as u64,
556 Type::INT64 => left.wrapping_sub(right) as u64,
557 _ => panic!("DeltaBitPackDecoder only supports Int32Type and Int64Type"),
558 }
559 }
560}
561
562pub struct DeltaLengthByteArrayEncoder<T: DataType> {
569 len_encoder: DeltaBitPackEncoder<Int32Type>,
571 data: Vec<ByteArray>,
573 encoded_size: usize,
575 _phantom: PhantomData<T>,
576}
577
578impl<T: DataType> Default for DeltaLengthByteArrayEncoder<T> {
579 fn default() -> Self {
580 Self::new()
581 }
582}
583
584impl<T: DataType> DeltaLengthByteArrayEncoder<T> {
585 pub fn new() -> Self {
587 Self {
588 len_encoder: DeltaBitPackEncoder::new(),
589 data: vec![],
590 encoded_size: 0,
591 _phantom: PhantomData,
592 }
593 }
594}
595
596impl<T: DataType> Encoder<T> for DeltaLengthByteArrayEncoder<T> {
597 fn put(&mut self, values: &[T::T]) -> Result<()> {
598 ensure_phys_ty!(
599 Type::BYTE_ARRAY | Type::FIXED_LEN_BYTE_ARRAY,
600 "DeltaLengthByteArrayEncoder only supports ByteArrayType"
601 );
602
603 let val_it = || {
604 values
605 .iter()
606 .map(|x| x.as_any().downcast_ref::<ByteArray>().unwrap())
607 };
608
609 let lengths: Vec<i32> = val_it().map(|byte_array| byte_array.len() as i32).collect();
610 self.len_encoder.put(&lengths)?;
611 for byte_array in val_it() {
612 self.encoded_size += byte_array.len();
613 self.data.push(byte_array.clone());
614 }
615
616 Ok(())
617 }
618
619 #[cold]
623 fn encoding(&self) -> Encoding {
624 Encoding::DELTA_LENGTH_BYTE_ARRAY
625 }
626
627 fn estimated_data_encoded_size(&self) -> usize {
628 self.len_encoder.estimated_data_encoded_size() + self.encoded_size
629 }
630
631 fn flush_buffer(&mut self) -> Result<Bytes> {
632 ensure_phys_ty!(
633 Type::BYTE_ARRAY | Type::FIXED_LEN_BYTE_ARRAY,
634 "DeltaLengthByteArrayEncoder only supports ByteArrayType"
635 );
636
637 let mut total_bytes = vec![];
638 let lengths = self.len_encoder.flush_buffer()?;
639 total_bytes.extend_from_slice(&lengths);
640 self.data.iter().for_each(|byte_array| {
641 total_bytes.extend_from_slice(byte_array.data());
642 });
643 self.data.clear();
644 self.encoded_size = 0;
645
646 Ok(total_bytes.into())
647 }
648
649 fn estimated_memory_size(&self) -> usize {
651 self.len_encoder.estimated_memory_size() + self.data.len() + std::mem::size_of::<Self>()
652 }
653}
654
655pub struct DeltaByteArrayEncoder<T: DataType> {
661 prefix_len_encoder: DeltaBitPackEncoder<Int32Type>,
662 suffix_writer: DeltaLengthByteArrayEncoder<ByteArrayType>,
663 previous: Vec<u8>,
664 _phantom: PhantomData<T>,
665}
666
667impl<T: DataType> Default for DeltaByteArrayEncoder<T> {
668 fn default() -> Self {
669 Self::new()
670 }
671}
672
673impl<T: DataType> DeltaByteArrayEncoder<T> {
674 pub fn new() -> Self {
676 Self {
677 prefix_len_encoder: DeltaBitPackEncoder::new(),
678 suffix_writer: DeltaLengthByteArrayEncoder::new(),
679 previous: vec![],
680 _phantom: PhantomData,
681 }
682 }
683}
684
685impl<T: DataType> Encoder<T> for DeltaByteArrayEncoder<T> {
686 fn put(&mut self, values: &[T::T]) -> Result<()> {
687 let mut prefix_lengths: Vec<i32> = vec![];
688 let mut suffixes: Vec<ByteArray> = vec![];
689
690 let values = values
691 .iter()
692 .map(|x| x.as_any())
693 .map(|x| match T::get_physical_type() {
694 Type::BYTE_ARRAY => x.downcast_ref::<ByteArray>().unwrap(),
695 Type::FIXED_LEN_BYTE_ARRAY => x.downcast_ref::<FixedLenByteArray>().unwrap(),
696 _ => panic!(
697 "DeltaByteArrayEncoder only supports ByteArrayType and FixedLenByteArrayType"
698 ),
699 });
700
701 for byte_array in values {
702 let current = byte_array.data();
703 let prefix_len = cmp::min(self.previous.len(), current.len());
706 let mut match_len = 0;
707 while match_len < prefix_len && self.previous[match_len] == current[match_len] {
708 match_len += 1;
709 }
710 prefix_lengths.push(match_len as i32);
711 suffixes.push(byte_array.slice(match_len, byte_array.len() - match_len));
712 self.previous.clear();
714 self.previous.extend_from_slice(current);
715 }
716 self.prefix_len_encoder.put(&prefix_lengths)?;
717 self.suffix_writer.put(&suffixes)?;
718
719 Ok(())
720 }
721
722 #[cold]
726 fn encoding(&self) -> Encoding {
727 Encoding::DELTA_BYTE_ARRAY
728 }
729
730 fn estimated_data_encoded_size(&self) -> usize {
731 self.prefix_len_encoder.estimated_data_encoded_size()
732 + self.suffix_writer.estimated_data_encoded_size()
733 }
734
735 fn flush_buffer(&mut self) -> Result<Bytes> {
736 match T::get_physical_type() {
737 Type::BYTE_ARRAY | Type::FIXED_LEN_BYTE_ARRAY => {
738 let mut total_bytes = vec![];
741 let lengths = self.prefix_len_encoder.flush_buffer()?;
743 total_bytes.extend_from_slice(&lengths);
744 let suffixes = self.suffix_writer.flush_buffer()?;
746 total_bytes.extend_from_slice(&suffixes);
747
748 self.previous.clear();
749 Ok(total_bytes.into())
750 }
751 _ => panic!(
752 "DeltaByteArrayEncoder only supports ByteArrayType and FixedLenByteArrayType"
753 ),
754 }
755 }
756
757 fn estimated_memory_size(&self) -> usize {
759 self.prefix_len_encoder.estimated_memory_size()
760 + self.suffix_writer.estimated_memory_size()
761 + (self.previous.capacity() * std::mem::size_of::<u8>())
762 }
763}
764
#[cfg(test)]
mod tests {
    use super::*;

    use std::sync::Arc;

    use crate::encodings::decoding::{get_decoder, Decoder, DictDecoder, PlainDecoder};
    use crate::schema::types::{ColumnDescPtr, ColumnDescriptor, ColumnPath, Type as SchemaType};
    use crate::util::bit_util;
    use crate::util::test_common::rand_gen::{random_bytes, RandGen};

    // Number of values generated by each round-trip test.
    const TEST_SET_SIZE: usize = 1024;

    // Checks which encodings `get_encoder` constructs and which it rejects.
    #[test]
    fn test_get_encoders() {
        // Encodings that `get_encoder` builds directly.
        create_and_check_encoder::<Int32Type>(0, Encoding::PLAIN, None);
        create_and_check_encoder::<Int32Type>(0, Encoding::DELTA_BINARY_PACKED, None);
        create_and_check_encoder::<Int32Type>(0, Encoding::DELTA_LENGTH_BYTE_ARRAY, None);
        create_and_check_encoder::<Int32Type>(0, Encoding::DELTA_BYTE_ARRAY, None);
        create_and_check_encoder::<BoolType>(0, Encoding::RLE, None);

        // Dictionary encodings are rejected with a general error.
        create_and_check_encoder::<Int32Type>(
            0,
            Encoding::RLE_DICTIONARY,
            Some(general_err!(
                "Cannot initialize this encoding through this function"
            )),
        );
        create_and_check_encoder::<Int32Type>(
            0,
            Encoding::PLAIN_DICTIONARY,
            Some(general_err!(
                "Cannot initialize this encoding through this function"
            )),
        );

        // Encodings without an encoder report not-yet-implemented.
        #[allow(deprecated)]
        create_and_check_encoder::<Int32Type>(
            0,
            Encoding::BIT_PACKED,
            Some(nyi_err!("Encoding BIT_PACKED is not supported")),
        );
    }

    #[test]
    fn test_bool() {
        BoolType::test(Encoding::PLAIN, TEST_SET_SIZE, -1);
        BoolType::test(Encoding::PLAIN_DICTIONARY, TEST_SET_SIZE, -1);
        BoolType::test(Encoding::RLE, TEST_SET_SIZE, -1);
    }

    #[test]
    fn test_i32() {
        Int32Type::test(Encoding::PLAIN, TEST_SET_SIZE, -1);
        Int32Type::test(Encoding::PLAIN_DICTIONARY, TEST_SET_SIZE, -1);
        Int32Type::test(Encoding::DELTA_BINARY_PACKED, TEST_SET_SIZE, -1);
        Int32Type::test(Encoding::BYTE_STREAM_SPLIT, TEST_SET_SIZE, -1);
    }

    #[test]
    fn test_i64() {
        Int64Type::test(Encoding::PLAIN, TEST_SET_SIZE, -1);
        Int64Type::test(Encoding::PLAIN_DICTIONARY, TEST_SET_SIZE, -1);
        Int64Type::test(Encoding::DELTA_BINARY_PACKED, TEST_SET_SIZE, -1);
        Int64Type::test(Encoding::BYTE_STREAM_SPLIT, TEST_SET_SIZE, -1);
    }

    #[test]
    fn test_i96() {
        Int96Type::test(Encoding::PLAIN, TEST_SET_SIZE, -1);
        Int96Type::test(Encoding::PLAIN_DICTIONARY, TEST_SET_SIZE, -1);
    }

    #[test]
    fn test_float() {
        FloatType::test(Encoding::PLAIN, TEST_SET_SIZE, -1);
        FloatType::test(Encoding::PLAIN_DICTIONARY, TEST_SET_SIZE, -1);
        FloatType::test(Encoding::BYTE_STREAM_SPLIT, TEST_SET_SIZE, -1);
    }

    #[test]
    fn test_double() {
        DoubleType::test(Encoding::PLAIN, TEST_SET_SIZE, -1);
        DoubleType::test(Encoding::PLAIN_DICTIONARY, TEST_SET_SIZE, -1);
        DoubleType::test(Encoding::BYTE_STREAM_SPLIT, TEST_SET_SIZE, -1);
    }

    #[test]
    fn test_byte_array() {
        ByteArrayType::test(Encoding::PLAIN, TEST_SET_SIZE, -1);
        ByteArrayType::test(Encoding::PLAIN_DICTIONARY, TEST_SET_SIZE, -1);
        ByteArrayType::test(Encoding::DELTA_LENGTH_BYTE_ARRAY, TEST_SET_SIZE, -1);
        ByteArrayType::test(Encoding::DELTA_BYTE_ARRAY, TEST_SET_SIZE, -1);
    }

    // Fixed-length values are 100 bytes wide.
    #[test]
    fn test_fixed_len_byte_array() {
        FixedLenByteArrayType::test(Encoding::PLAIN, TEST_SET_SIZE, 100);
        FixedLenByteArrayType::test(Encoding::PLAIN_DICTIONARY, TEST_SET_SIZE, 100);
        FixedLenByteArrayType::test(Encoding::DELTA_BYTE_ARRAY, TEST_SET_SIZE, 100);
        FixedLenByteArrayType::test(Encoding::BYTE_STREAM_SPLIT, TEST_SET_SIZE, 100);
    }

    // Checks `DictEncoder::dict_encoded_size` before a put, after it, and after a flush.
    #[test]
    fn test_dict_encoded_size() {
        fn run_test<T: DataType>(type_length: i32, values: &[T::T], expected_size: usize) {
            let mut encoder = create_test_dict_encoder::<T>(type_length);
            assert_eq!(encoder.dict_encoded_size(), 0);
            encoder.put(values).unwrap();
            assert_eq!(encoder.dict_encoded_size(), expected_size);
            encoder.flush_buffer().unwrap();
            // Flushing the data buffer does not change the dictionary size.
            assert_eq!(encoder.dict_encoded_size(), expected_size);
        }

        run_test::<BoolType>(-1, &[true, false, true, false, true], 2);
        run_test::<Int32Type>(-1, &[1i32, 2i32, 3i32, 4i32, 5i32], 20);
        run_test::<Int64Type>(-1, &[1i64, 2i64, 3i64, 4i64, 5i64], 40);
        run_test::<FloatType>(-1, &[1f32, 2f32, 3f32, 4f32, 5f32], 20);
        run_test::<DoubleType>(-1, &[1f64, 2f64, 3f64, 4f64, 5f64], 40);
        run_test::<Int96Type>(
            -1,
            &[Int96::from(vec![1, 2, 3]), Int96::from(vec![2, 3, 4])],
            24,
        );
        run_test::<ByteArrayType>(-1, &[ByteArray::from("abcd"), ByteArray::from("efj")], 15);
        run_test::<FixedLenByteArrayType>(
            2,
            &[ByteArray::from("ab").into(), ByteArray::from("bc").into()],
            4,
        );
    }

    // Checks `estimated_data_encoded_size` when fresh, after a put, and after a flush.
    #[test]
    fn test_estimated_data_encoded_size() {
        fn run_test<T: DataType>(
            encoding: Encoding,
            type_length: i32,
            values: &[T::T],
            initial_size: usize,
            max_size: usize,
            flush_size: usize,
        ) {
            let mut encoder = match encoding {
                Encoding::PLAIN_DICTIONARY | Encoding::RLE_DICTIONARY => {
                    Box::new(create_test_dict_encoder::<T>(type_length))
                }
                _ => create_test_encoder::<T>(type_length, encoding),
            };
            assert_eq!(encoder.estimated_data_encoded_size(), initial_size);

            encoder.put(values).unwrap();
            assert_eq!(encoder.estimated_data_encoded_size(), max_size);

            encoder.flush_buffer().unwrap();
            assert_eq!(encoder.estimated_data_encoded_size(), flush_size);
        }

        // PLAIN: 1024 values of 4 bytes each.
        run_test::<Int32Type>(Encoding::PLAIN, -1, &[123; 1024], 0, 4096, 0);

        // Dictionary: estimate for two distinct values.
        run_test::<Int32Type>(Encoding::RLE_DICTIONARY, -1, &[123, 1024], 0, 2, 0);

        // DELTA_BINARY_PACKED only counts fully encoded blocks; the trailing partial
        // block is still buffered. With constant input, presumably each of the 7
        // complete blocks costs 5 bytes (min delta + 4 bit widths, zero-width data).
        run_test::<Int32Type>(Encoding::DELTA_BINARY_PACKED, -1, &[123; 1024], 0, 35, 0);

        // RLE booleans: two runs of 16.
        let mut values = vec![];
        values.extend_from_slice(&[true; 16]);
        values.extend_from_slice(&[false; 16]);
        run_test::<BoolType>(Encoding::RLE, -1, &values, 0, 6, 0);

        // DELTA_LENGTH_BYTE_ARRAY: the 5 buffered value bytes ("ab" + "abc").
        run_test::<ByteArrayType>(
            Encoding::DELTA_LENGTH_BYTE_ARRAY,
            -1,
            &[ByteArray::from("ab"), ByteArray::from("abc")],
            0,
            5,
            0,
        );

        // DELTA_BYTE_ARRAY: suffix bytes "ab" + "c".
        run_test::<ByteArrayType>(
            Encoding::DELTA_BYTE_ARRAY,
            -1,
            &[ByteArray::from("ab"), ByteArray::from("abc")],
            0,
            3,
            0,
        );

        // BYTE_STREAM_SPLIT: two 4-byte floats.
        run_test::<FloatType>(Encoding::BYTE_STREAM_SPLIT, -1, &[0.1, 0.2], 0, 8, 0);
    }

    // Checks the exact BYTE_STREAM_SPLIT layout: byte k of every value goes into
    // stream k, and the streams are concatenated.
    #[test]
    fn test_byte_stream_split_example_f32() {
        let mut encoder = create_test_encoder::<FloatType>(0, Encoding::BYTE_STREAM_SPLIT);
        let mut decoder = create_test_decoder::<FloatType>(0, Encoding::BYTE_STREAM_SPLIT);

        let input = vec![
            f32::from_le_bytes([0xAA, 0xBB, 0xCC, 0xDD]),
            f32::from_le_bytes([0x00, 0x11, 0x22, 0x33]),
            f32::from_le_bytes([0xA3, 0xB4, 0xC5, 0xD6]),
        ];

        encoder.put(&input).unwrap();
        let encoded = encoder.flush_buffer().unwrap();

        assert_eq!(
            encoded,
            Bytes::from(vec![
                0xAA_u8, 0x00, 0xA3, 0xBB, 0x11, 0xB4, 0xCC, 0x22, 0xC5, 0xDD, 0x33, 0xD6
            ])
        );

        let mut decoded = vec![0.0; input.len()];
        decoder.set_data(encoded, input.len()).unwrap();
        decoder.get(&mut decoded).unwrap();

        assert_eq!(decoded, input);
    }

    // Regression test: DELTA_BYTE_ARRAY data is round-tripped as two pages through
    // the same encoder/decoder pair. The second page starts with a value ("aa") that
    // shares a prefix with the last value of the first page ("aaa"), and it must
    // still decode correctly.
    #[test]
    fn test_issue_47() {
        let mut encoder = create_test_encoder::<ByteArrayType>(0, Encoding::DELTA_BYTE_ARRAY);
        let mut decoder = create_test_decoder::<ByteArrayType>(0, Encoding::DELTA_BYTE_ARRAY);

        let input = vec![
            ByteArray::from("aa"),
            ByteArray::from("aaa"),
            ByteArray::from("aa"),
            ByteArray::from("aaa"),
        ];

        let mut output = vec![ByteArray::default(); input.len()];

        let mut result = put_and_get(&mut encoder, &mut decoder, &input[..2], &mut output[..2]);
        assert!(
            result.is_ok(),
            "first put_and_get() failed with: {}",
            result.unwrap_err()
        );
        result = put_and_get(&mut encoder, &mut decoder, &input[2..], &mut output[2..]);
        assert!(
            result.is_ok(),
            "second put_and_get() failed with: {}",
            result.unwrap_err()
        );
        assert_eq!(output, input);
    }

    /// Round-trip test driver: dispatches dictionary encodings to
    /// `test_dict_internal` and all others to `test_internal`, failing on any error.
    trait EncodingTester<T: DataType> {
        fn test(enc: Encoding, total: usize, type_length: i32) {
            let result = match enc {
                Encoding::PLAIN_DICTIONARY | Encoding::RLE_DICTIONARY => {
                    Self::test_dict_internal(total, type_length)
                }
                enc => Self::test_internal(enc, total, type_length),
            };

            assert!(
                result.is_ok(),
                "Expected result to be OK but got err:\n {}",
                result.unwrap_err()
            );
        }

        fn test_internal(enc: Encoding, total: usize, type_length: i32) -> Result<()>;

        fn test_dict_internal(total: usize, type_length: i32) -> Result<()>;
    }

    impl<T: DataType + RandGen<T>> EncodingTester<T> for T {
        // Round-trips random values through `get_encoder` / `get_decoder`: first with a
        // random validity bitmap (spaced), then twice densely to exercise reuse after
        // a flush.
        fn test_internal(enc: Encoding, total: usize, type_length: i32) -> Result<()> {
            let mut encoder = create_test_encoder::<T>(type_length, enc);
            let mut decoder = create_test_decoder::<T>(type_length, enc);
            let mut values = <T as RandGen<T>>::gen_vec(type_length, total);
            let mut result_data = vec![T::T::default(); total];

            // Random validity bitmap with one bit per value.
            let num_bytes = bit_util::ceil(total as i64, 8);
            let valid_bits = random_bytes(num_bytes as usize);
            let values_written = encoder.put_spaced(&values[..], &valid_bits[..])?;
            let data = encoder.flush_buffer()?;
            decoder.set_data(data, values_written)?;
            // Second argument is the number of null (unset) slots.
            let _ = decoder.get_spaced(
                &mut result_data[..],
                values.len() - values_written,
                &valid_bits[..],
            )?;

            // Valid slots hold the original values; null slots keep the default.
            for i in 0..total {
                if bit_util::get_bit(&valid_bits[..], i) {
                    assert_eq!(result_data[i], values[i]);
                } else {
                    assert_eq!(result_data[i], T::T::default());
                }
            }

            let mut actual_total = put_and_get(
                &mut encoder,
                &mut decoder,
                &values[..],
                &mut result_data[..],
            )?;
            assert_eq!(actual_total, total);
            assert_eq!(result_data, values);

            // Fresh values through the same encoder/decoder after a flush.
            values = <T as RandGen<T>>::gen_vec(type_length, total);
            actual_total = put_and_get(
                &mut encoder,
                &mut decoder,
                &values[..],
                &mut result_data[..],
            )?;
            assert_eq!(actual_total, total);
            assert_eq!(result_data, values);

            Ok(())
        }

        // Round-trips random values through `DictEncoder` / `DictDecoder`, with the
        // dictionary page decoded by a `PlainDecoder`, twice on the same encoder.
        fn test_dict_internal(total: usize, type_length: i32) -> Result<()> {
            let mut encoder = create_test_dict_encoder::<T>(type_length);
            let mut values = <T as RandGen<T>>::gen_vec(type_length, total);
            encoder.put(&values[..])?;

            let mut data = encoder.flush_buffer()?;
            let mut decoder = create_test_dict_decoder::<T>();
            let mut dict_decoder = PlainDecoder::<T>::new(type_length);
            dict_decoder.set_data(encoder.write_dict()?, encoder.num_entries())?;
            decoder.set_dict(Box::new(dict_decoder))?;
            let mut result_data = vec![T::T::default(); total];
            decoder.set_data(data, total)?;
            let mut actual_total = decoder.get(&mut result_data)?;

            assert_eq!(actual_total, total);
            assert_eq!(result_data, values);

            // Second batch on the same encoder; the dictionary is rewritten and reloaded.
            values = <T as RandGen<T>>::gen_vec(type_length, total);
            encoder.put(&values[..])?;
            data = encoder.flush_buffer()?;

            let mut dict_decoder = PlainDecoder::<T>::new(type_length);
            dict_decoder.set_data(encoder.write_dict()?, encoder.num_entries())?;
            decoder.set_dict(Box::new(dict_decoder))?;
            decoder.set_data(data, total)?;
            actual_total = decoder.get(&mut result_data)?;

            assert_eq!(actual_total, total);
            assert_eq!(result_data, values);

            Ok(())
        }
    }

    // Encodes `input`, flushes, decodes into `output`, and returns the decoder's count.
    fn put_and_get<T: DataType>(
        encoder: &mut Box<dyn Encoder<T>>,
        decoder: &mut Box<dyn Decoder<T>>,
        input: &[T::T],
        output: &mut [T::T],
    ) -> Result<usize> {
        encoder.put(input)?;
        let data = encoder.flush_buffer()?;
        decoder.set_data(data, input.len())?;
        decoder.get(output)
    }

    // Asserts that `get_encoder` either fails with `err` (compared by message) or, when
    // `err` is `None`, succeeds with an encoder reporting `encoding`.
    fn create_and_check_encoder<T: DataType>(
        type_length: i32,
        encoding: Encoding,
        err: Option<ParquetError>,
    ) {
        let desc = create_test_col_desc_ptr(type_length, T::get_physical_type());
        let encoder = get_encoder::<T>(encoding, &desc);
        match err {
            Some(parquet_error) => {
                assert_eq!(
                    encoder.err().unwrap().to_string(),
                    parquet_error.to_string()
                )
            }
            None => assert_eq!(encoder.unwrap().encoding(), encoding),
        }
    }

    // Builds a descriptor for a primitive column "t" of physical type `t` and length
    // `type_len`, with zero max definition/repetition levels.
    fn create_test_col_desc_ptr(type_len: i32, t: Type) -> ColumnDescPtr {
        let ty = SchemaType::primitive_type_builder("t", t)
            .with_length(type_len)
            .build()
            .unwrap();
        Arc::new(ColumnDescriptor::new(
            Arc::new(ty),
            0,
            0,
            ColumnPath::new(vec![]),
        ))
    }

    fn create_test_encoder<T: DataType>(type_len: i32, enc: Encoding) -> Box<dyn Encoder<T>> {
        let desc = create_test_col_desc_ptr(type_len, T::get_physical_type());
        get_encoder(enc, &desc).unwrap()
    }

    fn create_test_decoder<T: DataType>(type_len: i32, enc: Encoding) -> Box<dyn Decoder<T>> {
        let desc = create_test_col_desc_ptr(type_len, T::get_physical_type());
        get_decoder(desc, enc).unwrap()
    }

    fn create_test_dict_encoder<T: DataType>(type_len: i32) -> DictEncoder<T> {
        let desc = create_test_col_desc_ptr(type_len, T::get_physical_type());
        DictEncoder::<T>::new(desc)
    }

    fn create_test_dict_decoder<T: DataType>() -> DictDecoder<T> {
        DictDecoder::<T>::new()
    }
}