1use std::{cmp, mem::size_of};
38
39use bytes::Bytes;
40
41use crate::errors::{ParquetError, Result};
42use crate::util::bit_util::{self, BitReader, BitWriter, FromBytes};
43
/// Maximum number of 8-value groups a single bit-packed run may contain. The run
/// header is one byte holding `(num_groups << 1) | 1`, so `num_groups` must fit
/// in 7 bits (current value: 64, checked before the count reaches it).
const MAX_GROUPS_PER_BIT_PACKED_RUN: usize = 1 << 6;
46
/// An RLE/bit-packed hybrid encoder.
pub struct RleEncoder {
    /// Number of bits needed to encode each value; must be in `[0, 64]`
    /// (asserted in `new_from_buf`).
    bit_width: u8,

    /// Underlying writer that owns the output buffer.
    bit_writer: BitWriter,

    /// Values buffered for the current group; a group is always 8 values.
    buffered_values: [u64; 8],

    /// Number of valid entries in `buffered_values` (< 8 between `put` calls).
    num_buffered_values: usize,

    /// The most recently seen value, used to detect repeated runs.
    current_value: u64,

    /// How many times in a row `current_value` has been seen. Once this reaches
    /// 8, the run is eligible for RLE encoding.
    repeat_count: usize,

    /// Number of values already committed to the open bit-packed run; does not
    /// include values still in `buffered_values`.
    bit_packed_count: usize,

    /// Byte offset of the reserved indicator byte of the open bit-packed run,
    /// or -1 when no bit-packed run is open.
    indicator_byte_pos: i64,
}
77
impl RleEncoder {
    /// Creates an encoder for values of `bit_width` bits, backed by a fresh
    /// buffer with capacity for `buffer_len` bytes.
    #[allow(unused)]
    pub fn new(bit_width: u8, buffer_len: usize) -> Self {
        let buffer = Vec::with_capacity(buffer_len);
        RleEncoder::new_from_buf(bit_width, buffer)
    }

    /// Creates an encoder that appends its output to the provided `buffer`.
    ///
    /// # Panics
    ///
    /// Panics if `bit_width` is greater than 64.
    pub fn new_from_buf(bit_width: u8, buffer: Vec<u8>) -> Self {
        assert!(bit_width <= 64);
        let bit_writer = BitWriter::new_from_buf(buffer);
        RleEncoder {
            bit_width,
            bit_writer,
            buffered_values: [0; 8],
            num_buffered_values: 0,
            current_value: 0,
            repeat_count: 0,
            bit_packed_count: 0,
            // -1 means no bit-packed run is currently open.
            indicator_byte_pos: -1,
        }
    }

    /// Returns the maximum number of bytes needed to encode `num_values` values
    /// of `bit_width` bits: the larger of the worst-case bit-packed size and the
    /// worst-case RLE size.
    pub fn max_buffer_size(bit_width: u8, num_values: usize) -> usize {
        // Worst case is encoding everything as minimal runs of 8 values each.
        let num_runs = bit_util::ceil(num_values, 8);

        // Each group of 8 values packed at `bit_width` bits occupies exactly
        // `bit_width` bytes.
        let bytes_per_run = bit_width as usize;

        // One indicator byte per run plus the packed payload.
        let bit_packed_max_size = num_runs + num_runs * bytes_per_run;

        // For short runs the VLQ length prefix of an RLE run is a single byte.
        let rle_len_prefix = 1;

        // Smallest RLE run: prefix byte plus the value rounded up to whole bytes.
        let min_rle_run_size = rle_len_prefix + bit_util::ceil(bit_width as usize, 8);

        let rle_max_size = num_runs * min_rle_run_size;

        bit_packed_max_size.max(rle_max_size)
    }

    /// Encodes `value`, which must be representable with `bit_width` bits.
    ///
    /// Values are buffered in groups of 8; after each full group the encoder
    /// decides between extending an RLE run and bit-packing the group.
    #[inline]
    pub fn put(&mut self, value: u64) {
        if self.current_value == value {
            self.repeat_count += 1;
            if self.repeat_count > 8 {
                // Already inside an RLE run; the counter alone tracks it, no
                // need to buffer the value again.
                return;
            }
        } else {
            if self.repeat_count >= 8 {
                // A qualifying RLE run just ended; emit it before starting over.
                debug_assert_eq!(self.bit_packed_count, 0);
                self.flush_rle_run();
            }
            self.repeat_count = 1;
            self.current_value = value;
        }

        self.buffered_values[self.num_buffered_values] = value;
        self.num_buffered_values += 1;
        if self.num_buffered_values == 8 {
            // A full group is buffered: choose RLE vs bit-packing for it.
            debug_assert_eq!(self.bit_packed_count % 8, 0);
            self.flush_buffered_values();
        }
    }

    /// Returns the bytes written so far (does not flush pending values).
    #[inline]
    #[allow(unused)]
    pub fn buffer(&self) -> &[u8] {
        self.bit_writer.buffer()
    }

    /// Returns the number of bytes written so far.
    #[inline]
    pub fn len(&self) -> usize {
        self.bit_writer.bytes_written()
    }

    /// Returns true if nothing has been written yet.
    #[allow(unused)]
    pub fn is_empty(&self) -> bool {
        self.bit_writer.bytes_written() == 0
    }

    /// Flushes all pending values and returns the underlying buffer, consuming
    /// the encoder.
    #[inline]
    pub fn consume(mut self) -> Vec<u8> {
        self.flush();
        self.bit_writer.consume()
    }

    /// Flushes all pending values and returns a view of the encoded bytes,
    /// keeping the encoder usable.
    #[inline]
    #[allow(unused)]
    pub fn flush_buffer(&mut self) -> &[u8] {
        self.flush();
        self.bit_writer.flush_buffer()
    }

    /// Resets the encoder to its initial state, discarding all written data.
    #[inline]
    #[allow(unused)]
    pub fn clear(&mut self) {
        self.bit_writer.clear();
        self.num_buffered_values = 0;
        self.current_value = 0;
        self.repeat_count = 0;
        self.bit_packed_count = 0;
        self.indicator_byte_pos = -1;
    }

    /// Flushes any pending values, closing whichever run (RLE or bit-packed)
    /// is currently open.
    #[inline]
    pub fn flush(&mut self) {
        if self.bit_packed_count > 0 || self.repeat_count > 0 || self.num_buffered_values > 0 {
            // The pending values form a pure RLE run only if no bit-packed run
            // is open and every buffered value belongs to the repeat run.
            let all_repeat = self.bit_packed_count == 0
                && (self.repeat_count == self.num_buffered_values || self.num_buffered_values == 0);
            if self.repeat_count > 0 && all_repeat {
                self.flush_rle_run();
            } else {
                if self.num_buffered_values > 0 {
                    // Pad the last group of the bit-packed run with zeros, as
                    // groups are always a multiple of 8 values.
                    while self.num_buffered_values < 8 {
                        self.buffered_values[self.num_buffered_values] = 0;
                        self.num_buffered_values += 1;
                    }
                }
                self.bit_packed_count += self.num_buffered_values;
                self.flush_bit_packed_run(true);
                self.repeat_count = 0;
            }
        }
    }

    /// Emits the current RLE run: a VLQ header of `run_len << 1` (low bit 0
    /// marks RLE) followed by the repeated value, byte-aligned.
    fn flush_rle_run(&mut self) {
        debug_assert!(self.repeat_count > 0);
        let indicator_value = self.repeat_count << 1;
        self.bit_writer.put_vlq_int(indicator_value as u64);
        self.bit_writer.put_aligned(
            self.current_value,
            bit_util::ceil(self.bit_width as usize, 8),
        );
        self.num_buffered_values = 0;
        self.repeat_count = 0;
    }

    /// Appends the buffered values to the open bit-packed run, reserving the
    /// run's indicator byte first if the run is just being started. When
    /// `update_indicator_byte` is true the run is closed: its header
    /// `(num_groups << 1) | 1` is back-patched into the reserved byte.
    fn flush_bit_packed_run(&mut self, update_indicator_byte: bool) {
        if self.indicator_byte_pos < 0 {
            // Reserve one byte for the header; it is filled in when the run closes.
            self.indicator_byte_pos = self.bit_writer.skip(1) as i64;
        }

        // Write the buffered values as packed literals of `bit_width` bits each.
        for v in &self.buffered_values[..self.num_buffered_values] {
            self.bit_writer.put_value(*v, self.bit_width as usize);
        }
        self.num_buffered_values = 0;
        if update_indicator_byte {
            // The header stores the number of 8-value groups, not values.
            let num_groups = self.bit_packed_count / 8;
            let indicator_byte = ((num_groups << 1) | 1) as u8;
            self.bit_writer
                .put_aligned_offset(indicator_byte, 1, self.indicator_byte_pos as usize);
            self.indicator_byte_pos = -1;
            self.bit_packed_count = 0;
        }
    }

    /// Decides how to emit a full group of 8 buffered values: all-repeated
    /// groups are left to become an RLE run; otherwise the group is appended to
    /// the bit-packed run, which is closed when it reaches the maximum group
    /// count its one-byte header can describe.
    fn flush_buffered_values(&mut self) {
        if self.repeat_count >= 8 {
            // The whole group repeats one value; it will be emitted as RLE.
            self.num_buffered_values = 0;
            if self.bit_packed_count > 0 {
                // Switching to RLE: close the open bit-packed run first.
                debug_assert_eq!(self.bit_packed_count % 8, 0);
                self.flush_bit_packed_run(true)
            }
            return;
        }

        self.bit_packed_count += self.num_buffered_values;
        let num_groups = self.bit_packed_count / 8;
        if num_groups + 1 >= MAX_GROUPS_PER_BIT_PACKED_RUN {
            // The run is about to exceed what the header byte can encode.
            debug_assert!(self.indicator_byte_pos >= 0);
            self.flush_bit_packed_run(true);
        } else {
            self.flush_bit_packed_run(false);
        }
        self.repeat_count = 0;
    }

    /// Returns an estimate of the heap plus inline memory used by this encoder.
    pub(crate) fn estimated_memory_size(&self) -> usize {
        self.bit_writer.estimated_memory_size() + std::mem::size_of::<Self>()
    }
}
286
/// Size of the scratch buffer of dictionary indices used by
/// `RleDecoder::get_batch_with_dict`.
const RLE_DECODER_INDEX_BUFFER_SIZE: usize = 1024;
289
/// An RLE/bit-packed hybrid decoder.
pub struct RleDecoder {
    /// Number of bits used to encode each value.
    bit_width: u8,

    /// Reader over the input buffer; `None` until `set_data` is called.
    bit_reader: Option<BitReader>,

    /// Scratch buffer for dictionary indices, lazily allocated by
    /// `get_batch_with_dict`.
    index_buf: Option<Box<[i32; RLE_DECODER_INDEX_BUFFER_SIZE]>>,

    /// Number of values remaining in the current RLE run.
    rle_left: u32,

    /// Number of values remaining in the current bit-packed run.
    bit_packed_left: u32,

    /// The repeated value of the current RLE run, if inside one.
    current_value: Option<u64>,
}
310
311impl RleDecoder {
312 pub fn new(bit_width: u8) -> Self {
313 RleDecoder {
314 bit_width,
315 rle_left: 0,
316 bit_packed_left: 0,
317 bit_reader: None,
318 index_buf: None,
319 current_value: None,
320 }
321 }
322
323 #[inline]
324 pub fn set_data(&mut self, data: Bytes) -> Result<()> {
325 if let Some(ref mut bit_reader) = self.bit_reader {
326 bit_reader.reset(data);
327 } else {
328 self.bit_reader = Some(BitReader::new(data));
329 }
330
331 let _ = self.reload()?;
335 Ok(())
336 }
337
338 #[inline(never)]
341 #[allow(unused)]
342 pub fn get<T: FromBytes>(&mut self) -> Result<Option<T>> {
343 assert!(size_of::<T>() <= 8);
344
345 while self.rle_left == 0 && self.bit_packed_left == 0 {
346 if !self.reload()? {
347 return Ok(None);
348 }
349 }
350
351 let value = if self.rle_left > 0 {
352 let rle_value = T::try_from_le_slice(
353 &self
354 .current_value
355 .as_mut()
356 .ok_or_else(|| general_err!("current_value should be Some"))?
357 .to_ne_bytes(),
358 )?;
359 self.rle_left -= 1;
360 rle_value
361 } else {
362 let bit_reader = self
364 .bit_reader
365 .as_mut()
366 .ok_or_else(|| general_err!("bit_reader should be Some"))?;
367 let bit_packed_value = bit_reader
368 .get_value(self.bit_width as usize)
369 .ok_or_else(|| eof_err!("Not enough data for 'bit_packed_value'"))?;
370 self.bit_packed_left -= 1;
371 bit_packed_value
372 };
373
374 Ok(Some(value))
375 }
376
377 #[inline(never)]
378 pub fn get_batch<T: FromBytes + Clone>(&mut self, buffer: &mut [T]) -> Result<usize> {
379 assert!(size_of::<T>() <= 8);
380
381 let mut values_read = 0;
382 while values_read < buffer.len() {
383 if self.rle_left > 0 {
384 let num_values = cmp::min(buffer.len() - values_read, self.rle_left as usize);
385 let repeated_value =
386 T::try_from_le_slice(&self.current_value.as_mut().unwrap().to_ne_bytes())?;
387 buffer[values_read..values_read + num_values].fill(repeated_value);
388 self.rle_left -= num_values as u32;
389 values_read += num_values;
390 } else if self.bit_packed_left > 0 {
391 let mut num_values =
392 cmp::min(buffer.len() - values_read, self.bit_packed_left as usize);
393 let bit_reader = self
394 .bit_reader
395 .as_mut()
396 .ok_or_else(|| ParquetError::General("bit_reader should be set".into()))?;
397
398 num_values = bit_reader.get_batch::<T>(
399 &mut buffer[values_read..values_read + num_values],
400 self.bit_width as usize,
401 );
402 if num_values == 0 {
403 self.bit_packed_left = 0;
405 continue;
406 }
407 self.bit_packed_left -= num_values as u32;
408 values_read += num_values;
409 } else if !self.reload()? {
410 break;
411 }
412 }
413
414 Ok(values_read)
415 }
416
417 #[inline(never)]
418 pub fn skip(&mut self, num_values: usize) -> Result<usize> {
419 let mut values_skipped = 0;
420 while values_skipped < num_values {
421 if self.rle_left > 0 {
422 let num_values = cmp::min(num_values - values_skipped, self.rle_left as usize);
423 self.rle_left -= num_values as u32;
424 values_skipped += num_values;
425 } else if self.bit_packed_left > 0 {
426 let mut num_values =
427 cmp::min(num_values - values_skipped, self.bit_packed_left as usize);
428 let bit_reader = self
429 .bit_reader
430 .as_mut()
431 .ok_or_else(|| general_err!("bit_reader should be set"))?;
432
433 num_values = bit_reader.skip(num_values, self.bit_width as usize);
434 if num_values == 0 {
435 self.bit_packed_left = 0;
437 continue;
438 }
439 self.bit_packed_left -= num_values as u32;
440 values_skipped += num_values;
441 } else if !self.reload()? {
442 break;
443 }
444 }
445
446 Ok(values_skipped)
447 }
448
449 #[inline(never)]
450 pub fn get_batch_with_dict<T>(
451 &mut self,
452 dict: &[T],
453 buffer: &mut [T],
454 max_values: usize,
455 ) -> Result<usize>
456 where
457 T: Default + Clone,
458 {
459 assert!(buffer.len() >= max_values);
460
461 let mut values_read = 0;
462 while values_read < max_values {
463 let index_buf = self.index_buf.get_or_insert_with(|| Box::new([0; 1024]));
464
465 if self.rle_left > 0 {
466 let num_values = cmp::min(max_values - values_read, self.rle_left as usize);
467 let dict_idx = self.current_value.unwrap() as usize;
468 let dict_value = dict[dict_idx].clone();
469
470 buffer[values_read..values_read + num_values].fill(dict_value);
471
472 self.rle_left -= num_values as u32;
473 values_read += num_values;
474 } else if self.bit_packed_left > 0 {
475 let bit_reader = self
476 .bit_reader
477 .as_mut()
478 .ok_or_else(|| general_err!("bit_reader should be set"))?;
479
480 loop {
481 let to_read = index_buf
482 .len()
483 .min(max_values - values_read)
484 .min(self.bit_packed_left as usize);
485
486 if to_read == 0 {
487 break;
488 }
489
490 let num_values = bit_reader
491 .get_batch::<i32>(&mut index_buf[..to_read], self.bit_width as usize);
492 if num_values == 0 {
493 self.bit_packed_left = 0;
495 break;
496 }
497 buffer[values_read..values_read + num_values]
498 .iter_mut()
499 .zip(index_buf[..num_values].iter())
500 .for_each(|(b, i)| b.clone_from(&dict[*i as usize]));
501 self.bit_packed_left -= num_values as u32;
502 values_read += num_values;
503 if num_values < to_read {
504 break;
505 }
506 }
507 } else if !self.reload()? {
508 break;
509 }
510 }
511
512 Ok(values_read)
513 }
514
515 #[inline]
516 fn reload(&mut self) -> Result<bool> {
517 let bit_reader = self
518 .bit_reader
519 .as_mut()
520 .ok_or_else(|| general_err!("bit_reader should be set"))?;
521
522 if let Some(indicator_value) = bit_reader.get_vlq_int() {
523 if indicator_value == 0 {
527 return Ok(false);
528 }
529 if indicator_value & 1 == 1 {
530 self.bit_packed_left = ((indicator_value >> 1) * 8) as u32;
531 } else {
532 self.rle_left = (indicator_value >> 1) as u32;
533 let value_width = bit_util::ceil(self.bit_width as usize, 8);
534 self.current_value = bit_reader.get_aligned::<u64>(value_width);
535 self.current_value.ok_or_else(|| {
536 general_err!("parquet_data_error: not enough data for RLE decoding")
537 })?;
538 }
539 Ok(true)
540 } else {
541 Ok(false)
542 }
543 }
544}
545
#[cfg(test)]
mod tests {
    use super::*;

    use crate::util::bit_util::ceil;
    use rand::{self, Rng, SeedableRng, distr::StandardUniform, rng};

    // Largest bit width exercised by the round-trip tests.
    const MAX_WIDTH: usize = 32;

    #[test]
    fn test_rle_decode_int32() {
        // Bit-packed run: one group of 8 values (0..=7) at bit width 3.
        let data = vec![0x03, 0x88, 0xC6, 0xFA];
        let mut decoder: RleDecoder = RleDecoder::new(3);
        decoder.set_data(data.into()).unwrap();
        let mut buffer = vec![0; 8];
        let expected = vec![0, 1, 2, 3, 4, 5, 6, 7];
        let result = decoder.get_batch::<i32>(&mut buffer);
        assert!(result.is_ok());
        assert_eq!(buffer, expected);
    }

    #[test]
    fn test_rle_skip_int32() {
        // Same encoding as test_rle_decode_int32; skip 2, then read the rest.
        let data = vec![0x03, 0x88, 0xC6, 0xFA];
        let mut decoder: RleDecoder = RleDecoder::new(3);
        decoder.set_data(data.into()).unwrap();
        let expected = vec![2, 3, 4, 5, 6, 7];
        let skipped = decoder.skip(2).expect("skipping values");
        assert_eq!(skipped, 2);

        let mut buffer = vec![0; 6];
        let remaining = decoder
            .get_batch::<i32>(&mut buffer)
            .expect("getting remaining");
        assert_eq!(remaining, 6);
        assert_eq!(buffer, expected);
    }

    #[test]
    fn test_rle_consume_flush_buffer() {
        // `flush_buffer()` and `consume()` must yield identical encodings.
        let data = vec![1, 1, 1, 2, 2, 3, 3, 3];
        let mut encoder1 = RleEncoder::new(3, 256);
        let mut encoder2 = RleEncoder::new(3, 256);
        for value in data {
            encoder1.put(value as u64);
            encoder2.put(value as u64);
        }
        let res1 = encoder1.flush_buffer();
        let res2 = encoder2.consume();
        assert_eq!(res1, &res2[..]);
    }

    #[test]
    fn test_rle_decode_bool() {
        // Two RLE runs: 50 trues (0x64 = 50 << 1) then 50 falses.
        let data1 = vec![0x64, 0x01, 0x64, 0x00];

        // Bit-packed runs of alternating false/true, 100 values total.
        let data2 = vec![
            0x1B, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0x0A,
        ];

        let mut decoder: RleDecoder = RleDecoder::new(1);
        decoder.set_data(data1.into()).unwrap();
        let mut buffer = vec![false; 100];
        let mut expected = vec![];
        for i in 0..100 {
            if i < 50 {
                expected.push(true);
            } else {
                expected.push(false);
            }
        }
        let result = decoder.get_batch::<bool>(&mut buffer);
        assert!(result.is_ok());
        assert_eq!(buffer, expected);

        decoder.set_data(data2.into()).unwrap();
        let mut buffer = vec![false; 100];
        let mut expected = vec![];
        for i in 0..100 {
            if i % 2 == 0 {
                expected.push(false);
            } else {
                expected.push(true);
            }
        }
        let result = decoder.get_batch::<bool>(&mut buffer);
        assert!(result.is_ok());
        assert_eq!(buffer, expected);
    }

    #[test]
    fn test_rle_skip_bool() {
        // Same fixtures as test_rle_decode_bool, consumed via skip + get_batch.
        let data1 = vec![0x64, 0x01, 0x64, 0x00];

        let data2 = vec![
            0x1B, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0x0A,
        ];

        let mut decoder: RleDecoder = RleDecoder::new(1);
        decoder.set_data(data1.into()).unwrap();
        let mut buffer = vec![true; 50];
        let expected = vec![false; 50];

        let skipped = decoder.skip(50).expect("skipping first 50");
        assert_eq!(skipped, 50);
        let remainder = decoder
            .get_batch::<bool>(&mut buffer)
            .expect("getting remaining 50");
        assert_eq!(remainder, 50);
        assert_eq!(buffer, expected);

        decoder.set_data(data2.into()).unwrap();
        let mut buffer = vec![false; 50];
        let mut expected = vec![];
        for i in 0..50 {
            if i % 2 == 0 {
                expected.push(false);
            } else {
                expected.push(true);
            }
        }
        let skipped = decoder.skip(50).expect("skipping first 50");
        assert_eq!(skipped, 50);
        let remainder = decoder
            .get_batch::<bool>(&mut buffer)
            .expect("getting remaining 50");
        assert_eq!(remainder, 50);
        assert_eq!(buffer, expected);
    }

    #[test]
    fn test_rle_decode_with_dict_int32() {
        // Three RLE runs of dictionary indices: 3x idx 0, 4x idx 1, 5x idx 2.
        let dict = vec![10, 20, 30];
        let data = vec![0x06, 0x00, 0x08, 0x01, 0x0A, 0x02];
        let mut decoder: RleDecoder = RleDecoder::new(3);
        decoder.set_data(data.into()).unwrap();
        let mut buffer = vec![0; 12];
        let expected = vec![10, 10, 10, 20, 20, 20, 20, 30, 30, 30, 30, 30];
        let result = decoder.get_batch_with_dict::<i32>(&dict, &mut buffer, 12);
        assert!(result.is_ok());
        assert_eq!(buffer, expected);

        // Bit-packed indices decoding to a repeating ddd/eee/fff pattern.
        let dict = vec!["aaa", "bbb", "ccc", "ddd", "eee", "fff"];
        let data = vec![0x03, 0x63, 0xC7, 0x8E, 0x03, 0x65, 0x0B];
        let mut decoder: RleDecoder = RleDecoder::new(3);
        decoder.set_data(data.into()).unwrap();
        let mut buffer = vec![""; 12];
        let expected = vec![
            "ddd", "eee", "fff", "ddd", "eee", "fff", "ddd", "eee", "fff", "eee", "fff", "fff",
        ];
        let result =
            decoder.get_batch_with_dict::<&str>(dict.as_slice(), buffer.as_mut_slice(), 12);
        assert!(result.is_ok());
        assert_eq!(buffer, expected);
    }

    #[test]
    fn test_rle_skip_dict() {
        // Same fixtures as test_rle_decode_with_dict_int32, with leading skips.
        let dict = vec![10, 20, 30];
        let data = vec![0x06, 0x00, 0x08, 0x01, 0x0A, 0x02];
        let mut decoder: RleDecoder = RleDecoder::new(3);
        decoder.set_data(data.into()).unwrap();
        let mut buffer = vec![0; 10];
        let expected = vec![10, 20, 20, 20, 20, 30, 30, 30, 30, 30];
        let skipped = decoder.skip(2).expect("skipping two values");
        assert_eq!(skipped, 2);
        let remainder = decoder
            .get_batch_with_dict::<i32>(&dict, &mut buffer, 10)
            .expect("getting remainder");
        assert_eq!(remainder, 10);
        assert_eq!(buffer, expected);

        let dict = vec!["aaa", "bbb", "ccc", "ddd", "eee", "fff"];
        let data = vec![0x03, 0x63, 0xC7, 0x8E, 0x03, 0x65, 0x0B];
        let mut decoder: RleDecoder = RleDecoder::new(3);
        decoder.set_data(data.into()).unwrap();
        let mut buffer = vec![""; 8];
        let expected = vec!["eee", "fff", "ddd", "eee", "fff", "eee", "fff", "fff"];
        let skipped = decoder.skip(4).expect("skipping four values");
        assert_eq!(skipped, 4);
        let remainder = decoder
            .get_batch_with_dict::<&str>(dict.as_slice(), buffer.as_mut_slice(), 8)
            .expect("getting remainder");
        assert_eq!(remainder, 8);
        assert_eq!(buffer, expected);
    }

    // Encodes `values` with `bit_width`, optionally asserts the exact encoding
    // and encoded length (-1 skips the length check), then decodes the buffer
    // back both one value at a time and as a batch.
    fn validate_rle(
        values: &[i64],
        bit_width: u8,
        expected_encoding: Option<&[u8]>,
        expected_len: i32,
    ) {
        let buffer_len = 64 * 1024;
        let mut encoder = RleEncoder::new(bit_width, buffer_len);
        for v in values {
            encoder.put(*v as u64)
        }
        let buffer: Bytes = encoder.consume().into();
        if expected_len != -1 {
            assert_eq!(buffer.len(), expected_len as usize);
        }
        if let Some(b) = expected_encoding {
            assert_eq!(buffer.as_ref(), b);
        }

        // Verify decoding one value at a time.
        let mut decoder = RleDecoder::new(bit_width);
        decoder.set_data(buffer.clone()).unwrap();
        for v in values {
            let val: i64 = decoder
                .get()
                .expect("get() should be OK")
                .expect("get() should return more value");
            assert_eq!(val, *v);
        }

        // Verify batch decoding.
        decoder.set_data(buffer).unwrap();
        let mut values_read: Vec<i64> = vec![0; values.len()];
        decoder
            .get_batch(&mut values_read[..])
            .expect("get_batch() should be OK");
        assert_eq!(&values_read[..], values);
    }

    #[test]
    fn test_rle_specific_sequences() {
        let mut expected_buffer = Vec::new();
        // 50 zeros followed by 50 ones: two RLE runs.
        let mut values = vec![0; 50];
        values.resize(100, 1);

        expected_buffer.push(50 << 1);
        expected_buffer.push(0);
        expected_buffer.push(50 << 1);
        expected_buffer.push(1);

        // Widths up to 8 store each RLE value in one byte: identical encoding.
        for width in 1..9 {
            validate_rle(&values[..], width, Some(&expected_buffer[..]), 4);
        }
        // Wider widths need ceil(width / 8) bytes per RLE value.
        for width in 9..MAX_WIDTH + 1 {
            validate_rle(
                &values[..],
                width as u8,
                None,
                2 * (1 + bit_util::ceil(width as i64, 8) as i32),
            );
        }

        // Alternating 0/1 values force bit-packed encoding.
        values.clear();
        expected_buffer.clear();
        for i in 0..101 {
            values.push(i % 2);
        }
        let num_groups = bit_util::ceil(100, 8) as u8;
        expected_buffer.push((num_groups << 1) | 1);
        expected_buffer.resize(expected_buffer.len() + 100 / 8, 0b10101010);

        // Trailing values of the last group, zero-padded.
        expected_buffer.push(0b00001010);
        validate_rle(
            &values,
            1,
            Some(&expected_buffer[..]),
            1 + num_groups as i32,
        );
        for width in 2..MAX_WIDTH + 1 {
            let num_values = bit_util::ceil(100, 8) * 8;
            validate_rle(
                &values,
                width as u8,
                None,
                1 + bit_util::ceil(width as i64 * num_values, 8) as i32,
            );
        }
    }

    // Round-trips `num_vals` values at `bit_width`: cycling 0..mod_val when
    // `value == -1`, or the constant `value` otherwise.
    fn test_rle_values(bit_width: usize, num_vals: usize, value: i32) {
        let mod_val = if bit_width == 64 {
            1
        } else {
            1u64 << bit_width
        };
        let mut values: Vec<i64> = vec![];
        for v in 0..num_vals {
            let val = if value == -1 {
                v as i64 % mod_val as i64
            } else {
                value as i64
            };
            values.push(val);
        }
        validate_rle(&values, bit_width as u8, None, -1);
    }

    #[test]
    fn test_values() {
        for width in 1..MAX_WIDTH + 1 {
            test_rle_values(width, 1, -1);
            test_rle_values(width, 1024, -1);
            test_rle_values(width, 1024, 0);
            test_rle_values(width, 1024, 1);
        }
    }

    #[test]
    fn test_truncated_rle() {
        // Bit-packed header promising 3 groups (24 values at width 8) but only
        // 20 bytes of payload: the decoder should yield just 20 values.
        let mut data: Vec<u8> = vec![
            (3 << 1) | 1, // bit-packed header for 3 groups of 8 values
        ];
        data.extend(std::iter::repeat_n(0xFF, 20));
        let data: Bytes = data.into();

        let mut decoder = RleDecoder::new(8);
        decoder.set_data(data.clone()).unwrap();

        let mut output = vec![0_u16; 100];
        let read = decoder.get_batch(&mut output).unwrap();

        assert_eq!(read, 20);
        assert!(output.iter().take(20).all(|x| *x == 255));

        // Same truncated input through the dictionary path.
        decoder.set_data(data).unwrap();

        let dict: Vec<u16> = (0..256).collect();
        let mut output = vec![0_u16; 100];
        let read = decoder
            .get_batch_with_dict(&dict, &mut output, 100)
            .unwrap();

        assert_eq!(read, 20);
        assert!(output.iter().take(20).all(|x| *x == 255));
    }

    #[test]
    fn test_rle_padded() {
        let values: Vec<i16> = vec![0, 1, 1, 3, 1, 0];
        let bit_width = 2;
        let buffer_len = RleEncoder::max_buffer_size(bit_width, values.len());
        let mut encoder = RleEncoder::new(bit_width, buffer_len + 1);
        for v in &values {
            encoder.put(*v as u64)
        }

        // Append a spurious trailing byte after the encoded data.
        let mut buffer = encoder.consume();
        buffer.push(0);

        let mut decoder = RleDecoder::new(bit_width);
        decoder.set_data(buffer.into()).unwrap();

        // Ask for 12 values even though only 6 were encoded.
        let mut actual_values: Vec<i16> = vec![0; 12];
        let r = decoder
            .get_batch(&mut actual_values)
            .expect("get_batch() should be OK");

        // The 6 values plus the zero padding of the last group make 8.
        assert_eq!(r, 8);
        assert_eq!(actual_values[..6], values);
        assert_eq!(actual_values[6], 0);
        assert_eq!(actual_values[7], 0);
    }

    #[test]
    fn test_long_run() {
        // Hand-build a bit-packed run much longer than the encoder would emit,
        // and verify both decode paths handle it.
        let mut writer = BitWriter::new(1024);
        let bit_width = 1;

        let num_values = 2002;

        // At bit width 1, each 8-value group occupies one byte.
        let run_bytes = ceil(num_values * bit_width, 8) as u64;
        writer.put_vlq_int((run_bytes << 1) | 1);
        for _ in 0..run_bytes {
            writer.put_aligned(0xFF_u8, 1);
        }
        let buffer: Bytes = writer.consume().into();

        let mut decoder = RleDecoder::new(1);
        decoder.set_data(buffer.clone()).unwrap();

        let mut decoded: Vec<i16> = vec![0; num_values];
        let r = decoder.get_batch(&mut decoded).unwrap();
        assert_eq!(r, num_values);
        assert_eq!(vec![1; num_values], decoded);

        decoder.set_data(buffer).unwrap();
        let r = decoder
            .get_batch_with_dict(&[0, 23], &mut decoded, num_values)
            .unwrap();
        assert_eq!(r, num_values);
        assert_eq!(vec![23; num_values], decoded);
    }

    #[test]
    fn test_rle_specific_roundtrip() {
        let bit_width = 1;
        let values: Vec<i16> = vec![0, 1, 1, 1, 1, 0, 0, 0, 0, 1];
        let buffer_len = RleEncoder::max_buffer_size(bit_width, values.len());
        let mut encoder = RleEncoder::new(bit_width, buffer_len);
        for v in &values {
            encoder.put(*v as u64)
        }
        let buffer = encoder.consume();
        let mut decoder = RleDecoder::new(bit_width);
        decoder.set_data(Bytes::from(buffer)).unwrap();
        let mut actual_values: Vec<i16> = vec![0; values.len()];
        decoder
            .get_batch(&mut actual_values)
            .expect("get_batch() should be OK");
        assert_eq!(actual_values, values);
    }

    // Encodes `values` and decodes them back both singly and as a batch.
    fn test_round_trip(values: &[i32], bit_width: u8) {
        let buffer_len = 64 * 1024;
        let mut encoder = RleEncoder::new(bit_width, buffer_len);
        for v in values {
            encoder.put(*v as u64)
        }

        let buffer = Bytes::from(encoder.consume());

        // Verify decoding one value at a time.
        let mut decoder = RleDecoder::new(bit_width);
        decoder.set_data(buffer.clone()).unwrap();
        for v in values {
            let val = decoder
                .get::<i32>()
                .expect("get() should be OK")
                .expect("get() should return value");
            assert_eq!(val, *v);
        }

        // Verify batch decoding.
        let mut decoder = RleDecoder::new(bit_width);
        decoder.set_data(buffer).unwrap();
        let mut values_read: Vec<i32> = vec![0; values.len()];
        decoder
            .get_batch(&mut values_read[..])
            .expect("get_batch() should be OK");
        assert_eq!(&values_read[..], values);
    }

    #[test]
    fn test_random() {
        let seed_len = 32;
        let niters = 50;
        let ngroups = 1000;
        let max_group_size = 15;
        let mut values = vec![];

        for _ in 0..niters {
            values.clear();
            // Derive a fresh reproducible RNG seed for each iteration.
            let rng = rng();
            let seed_vec: Vec<u8> = rng
                .sample_iter::<u8, _>(&StandardUniform)
                .take(seed_len)
                .collect();
            let mut seed = [0u8; 32];
            seed.copy_from_slice(&seed_vec[0..seed_len]);
            let mut r#gen = rand::rngs::StdRng::from_seed(seed);

            // Build alternating runs of 0s and 1s with random lengths.
            let mut parity = false;
            for _ in 0..ngroups {
                let mut group_size = r#gen.random_range(1..20);
                if group_size > max_group_size {
                    group_size = 1;
                }
                for _ in 0..group_size {
                    values.push(parity as i32);
                }
                parity = !parity;
            }
            let bit_width = bit_util::num_required_bits(values.len() as u64);
            assert!(bit_width < 64);
            test_round_trip(&values[..], bit_width);
        }
    }
}
1067}