use std::{cmp, mem::size_of};

use bytes::Bytes;

use crate::errors::{ParquetError, Result};
use crate::util::bit_util::{self, BitReader, BitWriter, FromBytes};

/// Maximum number of groups of 8 values in a single bit-packed run.
const MAX_GROUPS_PER_BIT_PACKED_RUN: usize = 1 << 6;
pub struct RleEncoder {
    /// Number of bits needed to encode each value (at most 64).
    bit_width: u8,

    /// Underlying writer that holds the encoded bytes.
    bit_writer: BitWriter,

    /// The buffered values for the current group of (up to) 8 values.
    buffered_values: [u64; 8],

    /// Number of values currently buffered in `buffered_values`.
    num_buffered_values: usize,

    /// The last value seen, used to detect repeated runs.
    current_value: u64,

    /// Number of consecutive occurrences of `current_value`. An RLE run is only
    /// emitted once this reaches at least 8.
    repeat_count: usize,

    /// Number of values in the bit-packed run currently being written.
    bit_packed_count: usize,

    /// Byte offset of the indicator byte for the in-progress bit-packed run, or
    /// -1 if no bit-packed run is open.
    indicator_byte_pos: i64,
}

impl RleEncoder {
    #[allow(unused)]
    pub fn new(bit_width: u8, buffer_len: usize) -> Self {
        let buffer = Vec::with_capacity(buffer_len);
        RleEncoder::new_from_buf(bit_width, buffer)
    }

    /// Initializes a new RLE encoder with `bit_width`, writing into an existing
    /// output `buffer`.
    pub fn new_from_buf(bit_width: u8, buffer: Vec<u8>) -> Self {
        let bit_writer = BitWriter::new_from_buf(buffer);
        RleEncoder {
            bit_width,
            bit_writer,
            buffered_values: [0; 8],
            num_buffered_values: 0,
            current_value: 0,
            repeat_count: 0,
            bit_packed_count: 0,
            indicator_byte_pos: -1,
        }
    }

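    /// Returns the maximum buffer size (in bytes) needed to encode `num_values`
    /// values with the given `bit_width`, assuming the worst case of runs of
    /// only 8 values each.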
    pub fn max_buffer_size(bit_width: u8, num_values: usize) -> usize {
        // The worst case is runs of only 8 values each.
        let num_runs = bit_util::ceil(num_values, 8);

        // Number of bytes needed to bit-pack one group of 8 values.
        let bytes_per_run = bit_width as usize;

        // Worst case if every run is bit-packed: one indicator byte per run plus
        // the packed group.
        let bit_packed_max_size = num_runs + num_runs * bytes_per_run;

        // An RLE run of 8 values needs a one-byte length prefix plus the
        // repeated value, padded to a whole number of bytes.
        let rle_len_prefix = 1;

        let min_rle_run_size = rle_len_prefix + bit_util::ceil(bit_width as usize, 8);

        // Worst case if every run is an RLE run of 8 values.
        let rle_max_size = num_runs * min_rle_run_size;

        bit_packed_max_size.max(rle_max_size)
    }

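    /// Encodes `value`, which must be representable with at most `bit_width` bits.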
    #[inline]
    pub fn put(&mut self, value: u64) {
        // Values are buffered 8 at a time; once a group is full, it is either
        // appended to a bit-packed run or folded into a repeated (RLE) run.
        if self.current_value == value {
            self.repeat_count += 1;
            if self.repeat_count > 8 {
                // Continuation of a long repeated run; no need to buffer the value.
                return;
            }
        } else {
            if self.repeat_count >= 8 {
                // The previous repeated run is long enough to emit as RLE; flush it
                // before starting to track the new value.
                assert_eq!(self.bit_packed_count, 0);
                self.flush_rle_run();
            }
            self.repeat_count = 1;
            self.current_value = value;
        }

        self.buffered_values[self.num_buffered_values] = value;
        self.num_buffered_values += 1;
        if self.num_buffered_values == 8 {
            // The group is full; decide how it should be flushed.
            assert_eq!(self.bit_packed_count % 8, 0);
            self.flush_buffered_values();
        }
    }

    /// Returns the bytes written so far, without flushing any pending run.
    #[inline]
    #[allow(unused)]
    pub fn buffer(&self) -> &[u8] {
        self.bit_writer.buffer()
    }

    /// Returns the number of bytes written so far.
    #[inline]
    pub fn len(&self) -> usize {
        self.bit_writer.bytes_written()
    }

    #[allow(unused)]
    pub fn is_empty(&self) -> bool {
        self.bit_writer.bytes_written() == 0
    }

    /// Flushes any pending run and consumes the encoder, returning the encoded bytes.
    #[inline]
    pub fn consume(mut self) -> Vec<u8> {
        self.flush();
        self.bit_writer.consume()
    }

    /// Flushes any pending run and returns a view of the encoded bytes so far.
    #[inline]
    #[allow(unused)]
    pub fn flush_buffer(&mut self) -> &[u8] {
        self.flush();
        self.bit_writer.flush_buffer()
    }

    /// Clears the underlying buffer and resets the encoder state.
    #[inline]
    #[allow(unused)]
    pub fn clear(&mut self) {
        self.bit_writer.clear();
        self.num_buffered_values = 0;
        self.current_value = 0;
        self.repeat_count = 0;
        self.bit_packed_count = 0;
        self.indicator_byte_pos = -1;
    }

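    /// Flushes all buffered values to the underlying writer, closing any run
    /// that is currently in progress.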
    #[inline]
    pub fn flush(&mut self) {
        if self.bit_packed_count > 0 || self.repeat_count > 0 || self.num_buffered_values > 0 {
            // If everything pending is a repetition of the same value, emit it as an
            // RLE run; otherwise pad the last group to 8 values and bit-pack it.
            let all_repeat = self.bit_packed_count == 0
                && (self.repeat_count == self.num_buffered_values || self.num_buffered_values == 0);
            if self.repeat_count > 0 && all_repeat {
                self.flush_rle_run();
            } else {
                if self.num_buffered_values > 0 {
                    // Pad the last group to 8 values with zeros.
                    while self.num_buffered_values < 8 {
                        self.buffered_values[self.num_buffered_values] = 0;
                        self.num_buffered_values += 1;
                    }
                }
                self.bit_packed_count += self.num_buffered_values;
                self.flush_bit_packed_run(true);
                self.repeat_count = 0;
            }
        }
    }

    fn flush_rle_run(&mut self) {
        assert!(self.repeat_count > 0);
        // An RLE run is the run length shifted left by one (LSB clear), written as
        // a ULEB128 varint, followed by the repeated value in ceil(bit_width / 8)
        // bytes.
        let indicator_value = self.repeat_count << 1;
        self.bit_writer.put_vlq_int(indicator_value as u64);
        self.bit_writer.put_aligned(
            self.current_value,
            bit_util::ceil(self.bit_width as usize, 8),
        );
        self.num_buffered_values = 0;
        self.repeat_count = 0;
    }

    fn flush_bit_packed_run(&mut self, update_indicator_byte: bool) {
        if self.indicator_byte_pos < 0 {
            // Reserve a byte for the indicator; it is back-filled once the run closes.
            self.indicator_byte_pos = self.bit_writer.skip(1) as i64;
        }

        for i in 0..self.num_buffered_values {
            self.bit_writer
                .put_value(self.buffered_values[i], self.bit_width as usize);
        }
        self.num_buffered_values = 0;
        if update_indicator_byte {
            // The indicator byte stores the number of 8-value groups shifted left by
            // one, with the LSB set to mark a bit-packed run.
            let num_groups = self.bit_packed_count / 8;
            let indicator_byte = ((num_groups << 1) | 1) as u8;
            self.bit_writer
                .put_aligned_offset(indicator_byte, 1, self.indicator_byte_pos as usize);
            self.indicator_byte_pos = -1;
            self.bit_packed_count = 0;
        }
    }

    #[inline(never)]
    fn flush_buffered_values(&mut self) {
        if self.repeat_count >= 8 {
            // All buffered values belong to the current repeated run; discard them
            // (the run will be emitted as RLE later), closing any open bit-packed run.
            self.num_buffered_values = 0;
            if self.bit_packed_count > 0 {
                assert_eq!(self.bit_packed_count % 8, 0);
                self.flush_bit_packed_run(true)
            }
            return;
        }

        self.bit_packed_count += self.num_buffered_values;
        let num_groups = self.bit_packed_count / 8;
        if num_groups + 1 >= MAX_GROUPS_PER_BIT_PACKED_RUN {
            // The run is about to hit the group limit; close it now.
            assert!(self.indicator_byte_pos >= 0);
            self.flush_bit_packed_run(true);
        } else {
            self.flush_bit_packed_run(false);
        }
        self.repeat_count = 0;
    }

    /// Returns an estimate of the memory used by this encoder, in bytes.
    pub(crate) fn estimated_memory_size(&self) -> usize {
        self.bit_writer.estimated_memory_size() + std::mem::size_of::<Self>()
    }
}

/// Size of the buffer of decoded indices used by `get_batch_with_dict`.
const RLE_DECODER_INDEX_BUFFER_SIZE: usize = 1024;

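/// Decoder for values written with the RLE/bit-packing hybrid encoding produced
/// by [`RleEncoder`].
///
/// Illustrative sketch of typical usage (added for documentation, assuming the
/// type is in scope; not compiled as a doctest):
///
/// ```ignore
/// let mut decoder = RleDecoder::new(3);
/// // `encoded_bytes` is assumed to be a Vec<u8> produced by an RleEncoder.
/// decoder.set_data(encoded_bytes.into());
/// let mut values = vec![0i32; 8];
/// let read = decoder.get_batch::<i32>(&mut values).unwrap();
/// ```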
pub struct RleDecoder {
    /// Number of bits used to encode each value.
    bit_width: u8,

    /// Underlying reader over the encoded bytes.
    bit_reader: Option<BitReader>,

    /// Buffer of decoded indices used when translating bit-packed runs through a
    /// dictionary.
    index_buf: Option<Box<[i32; RLE_DECODER_INDEX_BUFFER_SIZE]>>,

    /// Number of values remaining in the current RLE run.
    rle_left: u32,

    /// Number of values remaining in the current bit-packed run.
    bit_packed_left: u32,

    /// The repeated value of the current RLE run, if any.
    current_value: Option<u64>,
}

impl RleDecoder {
    pub fn new(bit_width: u8) -> Self {
        RleDecoder {
            bit_width,
            rle_left: 0,
            bit_packed_left: 0,
            bit_reader: None,
            index_buf: None,
            current_value: None,
        }
    }

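    /// Sets (or resets) the encoded `data` to decode and positions the decoder
    /// at the start of the first run.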
    #[inline]
    pub fn set_data(&mut self, data: Bytes) {
        if let Some(ref mut bit_reader) = self.bit_reader {
            bit_reader.reset(data);
        } else {
            self.bit_reader = Some(BitReader::new(data));
        }

        let _ = self.reload();
    }

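    /// Decodes and returns the next value, or `None` once all values have been read.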
    #[inline(never)]
    #[allow(unused)]
    pub fn get<T: FromBytes>(&mut self) -> Result<Option<T>> {
        assert!(size_of::<T>() <= 8);

        // Advance to the next run if the current one is exhausted.
        while self.rle_left == 0 && self.bit_packed_left == 0 {
            if !self.reload() {
                return Ok(None);
            }
        }

        let value = if self.rle_left > 0 {
            let rle_value = T::try_from_le_slice(
                &self
                    .current_value
                    .as_mut()
                    .expect("current_value should be Some")
                    .to_ne_bytes(),
            )?;
            self.rle_left -= 1;
            rle_value
        } else {
            let bit_reader = self.bit_reader.as_mut().expect("bit_reader should be Some");
            let bit_packed_value = bit_reader
                .get_value(self.bit_width as usize)
                .ok_or_else(|| eof_err!("Not enough data for 'bit_packed_value'"))?;
            self.bit_packed_left -= 1;
            bit_packed_value
        };

        Ok(Some(value))
    }

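    /// Decodes values into `buffer` until it is full or the encoded data is
    /// exhausted, returning how many values were read.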
    #[inline(never)]
    pub fn get_batch<T: FromBytes + Clone>(&mut self, buffer: &mut [T]) -> Result<usize> {
        assert!(size_of::<T>() <= 8);

        let mut values_read = 0;
        while values_read < buffer.len() {
            if self.rle_left > 0 {
                let num_values = cmp::min(buffer.len() - values_read, self.rle_left as usize);
                let repeated_value =
                    T::try_from_le_slice(&self.current_value.as_mut().unwrap().to_ne_bytes())?;
                buffer[values_read..values_read + num_values].fill(repeated_value);
                self.rle_left -= num_values as u32;
                values_read += num_values;
            } else if self.bit_packed_left > 0 {
                let mut num_values =
                    cmp::min(buffer.len() - values_read, self.bit_packed_left as usize);
                let bit_reader = self.bit_reader.as_mut().expect("bit_reader should be set");

                num_values = bit_reader.get_batch::<T>(
                    &mut buffer[values_read..values_read + num_values],
                    self.bit_width as usize,
                );
                if num_values == 0 {
                    // The underlying data was truncated mid-run; treat the
                    // bit-packed run as exhausted.
                    self.bit_packed_left = 0;
                    continue;
                }
                self.bit_packed_left -= num_values as u32;
                values_read += num_values;
            } else if !self.reload() {
                break;
            }
        }

        Ok(values_read)
    }

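    /// Skips up to `num_values` values, returning how many were actually skipped.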
    #[inline(never)]
    pub fn skip(&mut self, num_values: usize) -> Result<usize> {
        let mut values_skipped = 0;
        while values_skipped < num_values {
            if self.rle_left > 0 {
                let num_values = cmp::min(num_values - values_skipped, self.rle_left as usize);
                self.rle_left -= num_values as u32;
                values_skipped += num_values;
            } else if self.bit_packed_left > 0 {
                let mut num_values =
                    cmp::min(num_values - values_skipped, self.bit_packed_left as usize);
                let bit_reader = self.bit_reader.as_mut().expect("bit_reader should be set");

                num_values = bit_reader.skip(num_values, self.bit_width as usize);
                if num_values == 0 {
                    // The underlying data was truncated mid-run; treat the
                    // bit-packed run as exhausted.
                    self.bit_packed_left = 0;
                    continue;
                }
                self.bit_packed_left -= num_values as u32;
                values_skipped += num_values;
            } else if !self.reload() {
                break;
            }
        }

        Ok(values_skipped)
    }

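    /// Decodes at most `max_values` dictionary indices and writes the
    /// corresponding entries from `dict` into `buffer`, returning how many
    /// values were produced.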
    #[inline(never)]
    pub fn get_batch_with_dict<T>(
        &mut self,
        dict: &[T],
        buffer: &mut [T],
        max_values: usize,
    ) -> Result<usize>
    where
        T: Default + Clone,
    {
        assert!(buffer.len() >= max_values);

        let mut values_read = 0;
        while values_read < max_values {
            let index_buf = self
                .index_buf
                .get_or_insert_with(|| Box::new([0; RLE_DECODER_INDEX_BUFFER_SIZE]));

            if self.rle_left > 0 {
                let num_values = cmp::min(max_values - values_read, self.rle_left as usize);
                let dict_idx = self.current_value.unwrap() as usize;
                let dict_value = dict[dict_idx].clone();

                buffer[values_read..values_read + num_values].fill(dict_value);

                self.rle_left -= num_values as u32;
                values_read += num_values;
            } else if self.bit_packed_left > 0 {
                let bit_reader = self.bit_reader.as_mut().expect("bit_reader should be set");

                // Decode bit-packed indices into `index_buf` in chunks, then map
                // them through the dictionary.
                loop {
                    let to_read = index_buf
                        .len()
                        .min(max_values - values_read)
                        .min(self.bit_packed_left as usize);

                    if to_read == 0 {
                        break;
                    }

                    let num_values = bit_reader
                        .get_batch::<i32>(&mut index_buf[..to_read], self.bit_width as usize);
                    if num_values == 0 {
                        // The underlying data was truncated mid-run; treat the
                        // bit-packed run as exhausted.
                        self.bit_packed_left = 0;
                        break;
                    }
                    buffer[values_read..values_read + num_values]
                        .iter_mut()
                        .zip(index_buf[..num_values].iter())
                        .for_each(|(b, i)| b.clone_from(&dict[*i as usize]));
                    self.bit_packed_left -= num_values as u32;
                    values_read += num_values;
                    if num_values < to_read {
                        break;
                    }
                }
            } else if !self.reload() {
                break;
            }
        }

        Ok(values_read)
    }

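    /// Reads the next run header, returning `false` if the data is exhausted or
    /// a zero-length indicator (trailing padding) is encountered.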
    #[inline]
    fn reload(&mut self) -> bool {
        let bit_reader = self.bit_reader.as_mut().expect("bit_reader should be set");

        if let Some(indicator_value) = bit_reader.get_vlq_int() {
            // A zero indicator would describe an empty run; treat it as the end of
            // the data (some writers pad pages with trailing zero bytes).
            if indicator_value == 0 {
                return false;
            }
            // The least significant bit distinguishes a bit-packed run (1) from an
            // RLE run (0).
            if indicator_value & 1 == 1 {
                self.bit_packed_left = ((indicator_value >> 1) * 8) as u32;
            } else {
                self.rle_left = (indicator_value >> 1) as u32;
                let value_width = bit_util::ceil(self.bit_width as usize, 8);
                self.current_value = bit_reader.get_aligned::<u64>(value_width);
                assert!(self.current_value.is_some());
            }
            true
        } else {
            false
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    use crate::util::bit_util::ceil;
    use rand::{self, distributions::Standard, thread_rng, Rng, SeedableRng};

    const MAX_WIDTH: usize = 32;

    #[test]
    fn test_rle_decode_int32() {
        // A single bit-packed run: header 0x03 = (1 group << 1) | 1, followed by
        // the values 0..=7 packed at 3 bits each.
        let data = vec![0x03, 0x88, 0xC6, 0xFA];
        let mut decoder: RleDecoder = RleDecoder::new(3);
        decoder.set_data(data.into());
        let mut buffer = vec![0; 8];
        let expected = vec![0, 1, 2, 3, 4, 5, 6, 7];
        let result = decoder.get_batch::<i32>(&mut buffer);
        assert!(result.is_ok());
        assert_eq!(buffer, expected);
    }

    #[test]
    fn test_rle_skip_int32() {
        // Same bit-packed encoding of 0..=7 as in `test_rle_decode_int32`.
        let data = vec![0x03, 0x88, 0xC6, 0xFA];
        let mut decoder: RleDecoder = RleDecoder::new(3);
        decoder.set_data(data.into());
        let expected = vec![2, 3, 4, 5, 6, 7];
        let skipped = decoder.skip(2).expect("skipping values");
        assert_eq!(skipped, 2);

        let mut buffer = vec![0; 6];
        let remaining = decoder
            .get_batch::<i32>(&mut buffer)
            .expect("getting remaining");
        assert_eq!(remaining, 6);
        assert_eq!(buffer, expected);
    }

    #[test]
    fn test_rle_consume_flush_buffer() {
        let data = vec![1, 1, 1, 2, 2, 3, 3, 3];
        let mut encoder1 = RleEncoder::new(3, 256);
        let mut encoder2 = RleEncoder::new(3, 256);
        for value in data {
            encoder1.put(value as u64);
            encoder2.put(value as u64);
        }
        let res1 = encoder1.flush_buffer();
        let res2 = encoder2.consume();
        assert_eq!(res1, &res2[..]);
    }

    #[test]
    fn test_rle_decode_bool() {
        // Two RLE runs: 50 1s followed by 50 0s.
        let data1 = vec![0x64, 0x01, 0x64, 0x00];

        // One bit-packed run of alternating 0s and 1s.
        let data2 = vec![
            0x1B, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0x0A,
        ];

        let mut decoder: RleDecoder = RleDecoder::new(1);
        decoder.set_data(data1.into());
        let mut buffer = vec![false; 100];
        let mut expected = vec![];
        for i in 0..100 {
            if i < 50 {
                expected.push(true);
            } else {
                expected.push(false);
            }
        }
        let result = decoder.get_batch::<bool>(&mut buffer);
        assert!(result.is_ok());
        assert_eq!(buffer, expected);

        decoder.set_data(data2.into());
        let mut buffer = vec![false; 100];
        let mut expected = vec![];
        for i in 0..100 {
            if i % 2 == 0 {
                expected.push(false);
            } else {
                expected.push(true);
            }
        }
        let result = decoder.get_batch::<bool>(&mut buffer);
        assert!(result.is_ok());
        assert_eq!(buffer, expected);
    }

    #[test]
    fn test_rle_skip_bool() {
        // Two RLE runs: 50 1s followed by 50 0s.
        let data1 = vec![0x64, 0x01, 0x64, 0x00];

        // One bit-packed run of alternating 0s and 1s.
        let data2 = vec![
            0x1B, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0x0A,
        ];

        let mut decoder: RleDecoder = RleDecoder::new(1);
        decoder.set_data(data1.into());
        let mut buffer = vec![true; 50];
        let expected = vec![false; 50];

        let skipped = decoder.skip(50).expect("skipping first 50");
        assert_eq!(skipped, 50);
        let remainder = decoder
            .get_batch::<bool>(&mut buffer)
            .expect("getting remaining 50");
        assert_eq!(remainder, 50);
        assert_eq!(buffer, expected);

        decoder.set_data(data2.into());
        let mut buffer = vec![false; 50];
        let mut expected = vec![];
        for i in 0..50 {
            if i % 2 == 0 {
                expected.push(false);
            } else {
                expected.push(true);
            }
        }
        let skipped = decoder.skip(50).expect("skipping first 50");
        assert_eq!(skipped, 50);
        let remainder = decoder
            .get_batch::<bool>(&mut buffer)
            .expect("getting remaining 50");
        assert_eq!(remainder, 50);
        assert_eq!(buffer, expected);
    }

    #[test]
    fn test_rle_decode_with_dict_int32() {
        // Three RLE runs of dictionary indices: 3 x index 0, 4 x index 1, 5 x index 2.
        let dict = vec![10, 20, 30];
        let data = vec![0x06, 0x00, 0x08, 0x01, 0x0A, 0x02];
        let mut decoder: RleDecoder = RleDecoder::new(3);
        decoder.set_data(data.into());
        let mut buffer = vec![0; 12];
        let expected = vec![10, 10, 10, 20, 20, 20, 20, 30, 30, 30, 30, 30];
        let result = decoder.get_batch_with_dict::<i32>(&dict, &mut buffer, 12);
        assert!(result.is_ok());
        assert_eq!(buffer, expected);

        // Two bit-packed runs (each 0x03 header starts one group of 8) holding
        // dictionary indices packed at 3 bits each.
        let dict = vec!["aaa", "bbb", "ccc", "ddd", "eee", "fff"];
        let data = vec![0x03, 0x63, 0xC7, 0x8E, 0x03, 0x65, 0x0B];
        let mut decoder: RleDecoder = RleDecoder::new(3);
        decoder.set_data(data.into());
        let mut buffer = vec![""; 12];
        let expected = vec![
            "ddd", "eee", "fff", "ddd", "eee", "fff", "ddd", "eee", "fff", "eee", "fff", "fff",
        ];
        let result =
            decoder.get_batch_with_dict::<&str>(dict.as_slice(), buffer.as_mut_slice(), 12);
        assert!(result.is_ok());
        assert_eq!(buffer, expected);
    }

    #[test]
    fn test_rle_skip_dict() {
        // Same encoded dictionary indices as in `test_rle_decode_with_dict_int32`.
        let dict = vec![10, 20, 30];
        let data = vec![0x06, 0x00, 0x08, 0x01, 0x0A, 0x02];
        let mut decoder: RleDecoder = RleDecoder::new(3);
        decoder.set_data(data.into());
        let mut buffer = vec![0; 10];
        let expected = vec![10, 20, 20, 20, 20, 30, 30, 30, 30, 30];
        let skipped = decoder.skip(2).expect("skipping two values");
        assert_eq!(skipped, 2);
        let remainder = decoder
            .get_batch_with_dict::<i32>(&dict, &mut buffer, 10)
            .expect("getting remainder");
        assert_eq!(remainder, 10);
        assert_eq!(buffer, expected);

        let dict = vec!["aaa", "bbb", "ccc", "ddd", "eee", "fff"];
        let data = vec![0x03, 0x63, 0xC7, 0x8E, 0x03, 0x65, 0x0B];
        let mut decoder: RleDecoder = RleDecoder::new(3);
        decoder.set_data(data.into());
        let mut buffer = vec![""; 8];
        let expected = vec!["eee", "fff", "ddd", "eee", "fff", "eee", "fff", "fff"];
        let skipped = decoder.skip(4).expect("skipping four values");
        assert_eq!(skipped, 4);
        let remainder = decoder
            .get_batch_with_dict::<&str>(dict.as_slice(), buffer.as_mut_slice(), 8)
            .expect("getting remainder");
        assert_eq!(remainder, 8);
        assert_eq!(buffer, expected);
    }

    fn validate_rle(
        values: &[i64],
        bit_width: u8,
        expected_encoding: Option<&[u8]>,
        expected_len: i32,
    ) {
        let buffer_len = 64 * 1024;
        let mut encoder = RleEncoder::new(bit_width, buffer_len);
        for v in values {
            encoder.put(*v as u64)
        }
        let buffer: Bytes = encoder.consume().into();
        // An `expected_len` of -1 means the encoded length is not checked.
        if expected_len != -1 {
            assert_eq!(buffer.len(), expected_len as usize);
        }
        if let Some(b) = expected_encoding {
            assert_eq!(buffer.as_ref(), b);
        }

        // Verify the decoded values one at a time.
        let mut decoder = RleDecoder::new(bit_width);
        decoder.set_data(buffer.clone());
        for v in values {
            let val: i64 = decoder
                .get()
                .expect("get() should be OK")
                .expect("get() should return more value");
            assert_eq!(val, *v);
        }

        // Verify the decoded values as a batch.
        decoder.set_data(buffer);
        let mut values_read: Vec<i64> = vec![0; values.len()];
        decoder
            .get_batch(&mut values_read[..])
            .expect("get_batch() should be OK");
        assert_eq!(&values_read[..], values);
    }

    #[test]
    fn test_rle_specific_sequences() {
        let mut expected_buffer = Vec::new();
        // 50 0s followed by 50 1s: two RLE runs.
        let mut values = vec![0; 50];
        values.resize(100, 1);

        expected_buffer.push(50 << 1);
        expected_buffer.push(0);
        expected_buffer.push(50 << 1);
        expected_buffer.push(1);

        // For bit widths 1..=8 the repeated value fits in one byte, so the
        // encoding is always 4 bytes.
        for width in 1..9 {
            validate_rle(&values[..], width, Some(&expected_buffer[..]), 4);
        }
        // For wider values each run needs ceil(width / 8) bytes for the value.
        for width in 9..MAX_WIDTH + 1 {
            validate_rle(
                &values[..],
                width as u8,
                None,
                2 * (1 + bit_util::ceil(width as i64, 8) as i32),
            );
        }

        // 101 alternating 0s and 1s: a single bit-packed run of 13 groups,
        // padded to 104 values.
        values.clear();
        expected_buffer.clear();
        for i in 0..101 {
            values.push(i % 2);
        }
        let num_groups = bit_util::ceil(100, 8) as u8;
        expected_buffer.push((num_groups << 1) | 1);
        expected_buffer.resize(expected_buffer.len() + 100 / 8, 0b10101010);

        // The last byte holds the final five values (0, 1, 0, 1, 0) plus padding.
        expected_buffer.push(0b00001010);
        validate_rle(
            &values,
            1,
            Some(&expected_buffer[..]),
            1 + num_groups as i32,
        );
        for width in 2..MAX_WIDTH + 1 {
            let num_values = bit_util::ceil(100, 8) * 8;
            validate_rle(
                &values,
                width as u8,
                None,
                1 + bit_util::ceil(width as i64 * num_values, 8) as i32,
            );
        }
    }

    fn test_rle_values(bit_width: usize, num_vals: usize, value: i32) {
        // A `value` of -1 generates incrementing values modulo 2^bit_width;
        // otherwise the same `value` is repeated.
        let mod_val = if bit_width == 64 {
            1
        } else {
            1u64 << bit_width
        };
        let mut values: Vec<i64> = vec![];
        for v in 0..num_vals {
            let val = if value == -1 {
                v as i64 % mod_val as i64
            } else {
                value as i64
            };
            values.push(val);
        }
        validate_rle(&values, bit_width as u8, None, -1);
    }

    #[test]
    fn test_values() {
        for width in 1..MAX_WIDTH + 1 {
            test_rle_values(width, 1, -1);
            test_rle_values(width, 1024, -1);
            test_rle_values(width, 1024, 0);
            test_rle_values(width, 1024, 1);
        }
    }

    #[test]
    fn test_truncated_rle() {
        // Header for a bit-packed run of 3 groups (24 values) at bit width 8, but
        // only 20 bytes of packed data follow.
        let mut data: Vec<u8> = vec![(3 << 1) | 1];
        data.extend(std::iter::repeat(0xFF).take(20));
        let data: Bytes = data.into();

        let mut decoder = RleDecoder::new(8);
        decoder.set_data(data.clone());

        let mut output = vec![0_u16; 100];
        let read = decoder.get_batch(&mut output).unwrap();

        assert_eq!(read, 20);
        assert!(output.iter().take(20).all(|x| *x == 255));

        // The dictionary path must handle the truncated run in the same way.
        decoder.set_data(data);

        let dict: Vec<u16> = (0..256).collect();
        let mut output = vec![0_u16; 100];
        let read = decoder
            .get_batch_with_dict(&dict, &mut output, 100)
            .unwrap();

        assert_eq!(read, 20);
        assert!(output.iter().take(20).all(|x| *x == 255));
    }

    #[test]
    fn test_rle_padded() {
        let values: Vec<i16> = vec![0, 1, 1, 3, 1, 0];
        let bit_width = 2;
        let buffer_len = RleEncoder::max_buffer_size(bit_width, values.len());
        let mut encoder = RleEncoder::new(bit_width, buffer_len + 1);
        for v in &values {
            encoder.put(*v as u64)
        }

        // Add a byte of trailing padding after the encoded data.
        let mut buffer = encoder.consume();
        buffer.push(0);

        let mut decoder = RleDecoder::new(bit_width);
        decoder.set_data(buffer.into());

        let mut actual_values: Vec<i16> = vec![0; 12];
        let r = decoder
            .get_batch(&mut actual_values)
            .expect("get_batch() should be OK");

        // The final group is padded to 8 values with zeros, so the decoder returns
        // the 6 encoded values plus 2 zeros, then stops at the padding byte.
        assert_eq!(r, 8);
        assert_eq!(actual_values[..6], values);
        assert_eq!(actual_values[6], 0);
        assert_eq!(actual_values[7], 0);
    }

    #[test]
    fn test_long_run() {
        // Construct a bit-packed run longer than the encoder itself would produce
        // (runs are capped at MAX_GROUPS_PER_BIT_PACKED_RUN groups) and check
        // that the decoder handles it.
        let mut writer = BitWriter::new(1024);
        let bit_width = 1;

        let num_values = 2002;

        // With a bit width of 1 each byte holds one group of 8 values, so
        // `run_bytes` is also the number of groups in the header.
        let run_bytes = ceil(num_values * bit_width, 8) as u64;
        writer.put_vlq_int((run_bytes << 1) | 1);
        for _ in 0..run_bytes {
            writer.put_aligned(0xFF_u8, 1);
        }
        let buffer: Bytes = writer.consume().into();

        let mut decoder = RleDecoder::new(1);
        decoder.set_data(buffer.clone());

        let mut decoded: Vec<i16> = vec![0; num_values];
        let r = decoder.get_batch(&mut decoded).unwrap();
        assert_eq!(r, num_values);
        assert_eq!(vec![1; num_values], decoded);

        decoder.set_data(buffer);
        let r = decoder
            .get_batch_with_dict(&[0, 23], &mut decoded, num_values)
            .unwrap();
        assert_eq!(r, num_values);
        assert_eq!(vec![23; num_values], decoded);
    }

962
963 #[test]
964 fn test_rle_specific_roundtrip() {
965 let bit_width = 1;
966 let values: Vec<i16> = vec![0, 1, 1, 1, 1, 0, 0, 0, 0, 1];
967 let buffer_len = RleEncoder::max_buffer_size(bit_width, values.len());
968 let mut encoder = RleEncoder::new(bit_width, buffer_len);
969 for v in &values {
970 encoder.put(*v as u64)
971 }
972 let buffer = encoder.consume();
973 let mut decoder = RleDecoder::new(bit_width);
974 decoder.set_data(Bytes::from(buffer));
975 let mut actual_values: Vec<i16> = vec![0; values.len()];
976 decoder
977 .get_batch(&mut actual_values)
978 .expect("get_batch() should be OK");
979 assert_eq!(actual_values, values);
980 }
981
    fn test_round_trip(values: &[i32], bit_width: u8) {
        let buffer_len = 64 * 1024;
        let mut encoder = RleEncoder::new(bit_width, buffer_len);
        for v in values {
            encoder.put(*v as u64)
        }

        let buffer = Bytes::from(encoder.consume());

        // Verify the decoded values one at a time.
        let mut decoder = RleDecoder::new(bit_width);
        decoder.set_data(buffer.clone());
        for v in values {
            let val = decoder
                .get::<i32>()
                .expect("get() should be OK")
                .expect("get() should return value");
            assert_eq!(val, *v);
        }

        // Verify the decoded values as a batch.
        let mut decoder = RleDecoder::new(bit_width);
        decoder.set_data(buffer);
        let mut values_read: Vec<i32> = vec![0; values.len()];
        decoder
            .get_batch(&mut values_read[..])
            .expect("get_batch() should be OK");
        assert_eq!(&values_read[..], values);
    }

    #[test]
    fn test_random() {
        // Round-trip randomly sized runs of alternating 0s and 1s.
        let seed_len = 32;
        let niters = 50;
        let ngroups = 1000;
        let max_group_size = 15;
        let mut values = vec![];

        for _ in 0..niters {
            values.clear();
            let rng = thread_rng();
            let seed_vec: Vec<u8> = rng.sample_iter::<u8, _>(&Standard).take(seed_len).collect();
            let mut seed = [0u8; 32];
            seed.copy_from_slice(&seed_vec[0..seed_len]);
            let mut gen = rand::rngs::StdRng::from_seed(seed);

            let mut parity = false;
            for _ in 0..ngroups {
                let mut group_size = gen.gen_range(1..20);
                if group_size > max_group_size {
                    group_size = 1;
                }
                for _ in 0..group_size {
                    values.push(parity as i32);
                }
                parity = !parity;
            }
            let bit_width = bit_util::num_required_bits(values.len() as u64);
            assert!(bit_width < 64);
            test_round_trip(&values[..], bit_width);
        }
    }
}