parquet/encodings/
rle.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Rle/Bit-Packing Hybrid Encoding
19//! The grammar for this encoding looks like the following (copied verbatim
20//! from <https://github.com/Parquet/parquet-format/blob/master/Encodings.md>):
21//!
22//! rle-bit-packed-hybrid: `<length>` `<encoded-data>`
23//! length := length of the `<encoded-data>` in bytes stored as 4 bytes little endian
24//! encoded-data := `<run>`*
25//! run := `<bit-packed-run>` | `<rle-run>`
26//! bit-packed-run := `<bit-packed-header>` `<bit-packed-values>`
27//! bit-packed-header := varint-encode(`<bit-pack-count>` << 1 | 1)
28//! we always bit-pack a multiple of 8 values at a time, so we only store the number of
29//! values / 8
30//! bit-pack-count := (number of values in this run) / 8
31//! bit-packed-values := *see 1 below*
32//! rle-run := `<rle-header>` `<repeated-value>`
33//! rle-header := varint-encode( (number of times repeated) << 1)
34//! repeated-value := value that is repeated, using a fixed-width of
35//! round-up-to-next-byte(bit-width)
36
37use std::{cmp, mem::size_of};
38
39use bytes::Bytes;
40
41use crate::errors::{ParquetError, Result};
42use crate::util::bit_util::{self, BitReader, BitWriter, FromBytes};
43
44/// Maximum groups of 8 values per bit-packed run. Current value is 64.
45const MAX_GROUPS_PER_BIT_PACKED_RUN: usize = 1 << 6;
46
/// A RLE/Bit-Packing hybrid encoder.
///
/// Values are buffered eight at a time; when a group completes, the encoder
/// chooses between extending an RLE run (repeated values) and appending the
/// group to a bit-packed run.
// TODO: tracking memory usage
pub struct RleEncoder {
    // Number of bits needed to encode the value. Must be in the range of [0, 64].
    bit_width: u8,

    // Underlying writer which holds an internal buffer.
    bit_writer: BitWriter,

    // Buffered values for bit-packed runs.
    buffered_values: [u64; 8],

    // Number of current buffered values. Must be less than 8 between `put` calls.
    num_buffered_values: usize,

    // The current (also last) value that was written and the count of how many
    // times in a row that value has been seen.
    current_value: u64,

    // The number of repetitions for `current_value`. If this gets too high we'd
    // switch to use RLE encoding.
    repeat_count: usize,

    // Number of bit-packed values in the current run. This doesn't include values
    // in `buffered_values`.
    bit_packed_count: usize,

    // The position of the indicator byte in the `bit_writer`, or -1 when no
    // bit-packed run is currently open.
    indicator_byte_pos: i64,
}
77
impl RleEncoder {
    /// Creates a new encoder with a fresh internal buffer of capacity `buffer_len`.
    #[allow(unused)]
    pub fn new(bit_width: u8, buffer_len: usize) -> Self {
        let buffer = Vec::with_capacity(buffer_len);
        RleEncoder::new_from_buf(bit_width, buffer)
    }

    /// Initialize the encoder from existing `buffer`
    pub fn new_from_buf(bit_width: u8, buffer: Vec<u8>) -> Self {
        let bit_writer = BitWriter::new_from_buf(buffer);
        RleEncoder {
            bit_width,
            bit_writer,
            buffered_values: [0; 8],
            num_buffered_values: 0,
            current_value: 0,
            repeat_count: 0,
            bit_packed_count: 0,
            // -1 marks "no bit-packed run open" (no indicator byte reserved yet).
            indicator_byte_pos: -1,
        }
    }

    /// Returns the maximum buffer size to encode `num_values` values with
    /// `bit_width`.
    pub fn max_buffer_size(bit_width: u8, num_values: usize) -> usize {
        // The maximum size occurs with the shortest possible runs of 8
        let num_runs = bit_util::ceil(num_values, 8);

        // The number of bytes in a run of 8
        let bytes_per_run = bit_width as usize;

        // The maximum size if stored as shortest possible bit packed runs of 8
        // (one indicator byte per run plus the packed payload)
        let bit_packed_max_size = num_runs + num_runs * bytes_per_run;

        // The length of `8` VLQ encoded
        let rle_len_prefix = 1;

        // The length of an RLE run of 8
        let min_rle_run_size = rle_len_prefix + bit_util::ceil(bit_width as usize, 8);

        // The maximum size if stored as shortest possible RLE runs of 8
        let rle_max_size = num_runs * min_rle_run_size;

        bit_packed_max_size.max(rle_max_size)
    }

    /// Encodes `value`, which must be representable with `bit_width` bits.
    #[inline]
    pub fn put(&mut self, value: u64) {
        // This function buffers 8 values at a time. After seeing 8 values, it
        // decides whether the current run should be encoded in bit-packed or RLE.
        if self.current_value == value {
            self.repeat_count += 1;
            if self.repeat_count > 8 {
                // A continuation of last value. No need to buffer; the value is
                // fully described by `current_value` + `repeat_count`.
                return;
            }
        } else {
            if self.repeat_count >= 8 {
                // The current RLE run has ended and we've gathered enough. Flush first.
                assert_eq!(self.bit_packed_count, 0);
                self.flush_rle_run();
            }
            self.repeat_count = 1;
            self.current_value = value;
        }

        self.buffered_values[self.num_buffered_values] = value;
        self.num_buffered_values += 1;
        if self.num_buffered_values == 8 {
            // Buffered values are full. Flush them.
            assert_eq!(self.bit_packed_count % 8, 0);
            self.flush_buffered_values();
        }
    }

    /// Returns the bytes written so far; does not flush pending values.
    #[inline]
    #[allow(unused)]
    pub fn buffer(&self) -> &[u8] {
        self.bit_writer.buffer()
    }

    /// Returns the number of bytes written so far; does not flush pending values.
    #[inline]
    pub fn len(&self) -> usize {
        self.bit_writer.bytes_written()
    }

    /// Returns `true` if nothing has been written to the underlying buffer yet.
    #[allow(unused)]
    pub fn is_empty(&self) -> bool {
        self.bit_writer.bytes_written() == 0
    }

    /// Flushes all pending values and returns the final encoded buffer,
    /// consuming the encoder.
    #[inline]
    pub fn consume(mut self) -> Vec<u8> {
        self.flush();
        self.bit_writer.consume()
    }

    /// Borrow equivalent of the `consume` method.
    /// Call `clear()` after invoking this method.
    #[inline]
    #[allow(unused)]
    pub fn flush_buffer(&mut self) -> &[u8] {
        self.flush();
        self.bit_writer.flush_buffer()
    }

    /// Clears the internal state so this encoder can be reused (e.g., after becoming
    /// full).
    #[inline]
    #[allow(unused)]
    pub fn clear(&mut self) {
        self.bit_writer.clear();
        self.num_buffered_values = 0;
        self.current_value = 0;
        self.repeat_count = 0;
        self.bit_packed_count = 0;
        self.indicator_byte_pos = -1;
    }

    /// Flushes all remaining values and return the final byte buffer maintained by the
    /// internal writer.
    #[inline]
    pub fn flush(&mut self) {
        if self.bit_packed_count > 0 || self.repeat_count > 0 || self.num_buffered_values > 0 {
            // RLE is only usable when no bit-packed run is open and every
            // outstanding buffered value belongs to the repeat run.
            let all_repeat = self.bit_packed_count == 0
                && (self.repeat_count == self.num_buffered_values || self.num_buffered_values == 0);
            if self.repeat_count > 0 && all_repeat {
                self.flush_rle_run();
            } else {
                // Buffer the last group of bit-packed values to 8 by padding with 0s.
                if self.num_buffered_values > 0 {
                    while self.num_buffered_values < 8 {
                        self.buffered_values[self.num_buffered_values] = 0;
                        self.num_buffered_values += 1;
                    }
                }
                self.bit_packed_count += self.num_buffered_values;
                self.flush_bit_packed_run(true);
                self.repeat_count = 0;
            }
        }
    }

    // Writes the pending RLE run: a VLQ header of `(repeat_count << 1)` followed
    // by the repeated value stored in `ceil(bit_width / 8)` bytes.
    fn flush_rle_run(&mut self) {
        assert!(self.repeat_count > 0);
        let indicator_value = self.repeat_count << 1;
        self.bit_writer.put_vlq_int(indicator_value as u64);
        self.bit_writer.put_aligned(
            self.current_value,
            bit_util::ceil(self.bit_width as usize, 8),
        );
        self.num_buffered_values = 0;
        self.repeat_count = 0;
    }

    // Writes the buffered values as bit-packed literals. When
    // `update_indicator_byte` is true the run is closed: the header byte
    // `(num_groups << 1) | 1` is patched into the previously reserved slot.
    fn flush_bit_packed_run(&mut self, update_indicator_byte: bool) {
        if self.indicator_byte_pos < 0 {
            // Reserve one byte for the run header; it is filled in when the run closes.
            self.indicator_byte_pos = self.bit_writer.skip(1) as i64;
        }

        // Write all buffered values as bit-packed literals
        for i in 0..self.num_buffered_values {
            self.bit_writer
                .put_value(self.buffered_values[i], self.bit_width as usize);
        }
        self.num_buffered_values = 0;
        if update_indicator_byte {
            // Write the indicator byte to the reserved position in `bit_writer`
            let num_groups = self.bit_packed_count / 8;
            let indicator_byte = ((num_groups << 1) | 1) as u8;
            self.bit_writer
                .put_aligned_offset(indicator_byte, 1, self.indicator_byte_pos as usize);
            self.indicator_byte_pos = -1;
            self.bit_packed_count = 0;
        }
    }

    // Called when 8 values have been buffered: either the whole group repeats
    // (keep accumulating toward an RLE run, closing any open bit-packed run)
    // or the group is appended to the current bit-packed run.
    #[inline(never)]
    fn flush_buffered_values(&mut self) {
        if self.repeat_count >= 8 {
            self.num_buffered_values = 0;
            if self.bit_packed_count > 0 {
                // In this case we choose RLE encoding. Flush the current buffered values
                // as bit-packed encoding.
                assert_eq!(self.bit_packed_count % 8, 0);
                self.flush_bit_packed_run(true)
            }
            return;
        }

        self.bit_packed_count += self.num_buffered_values;
        let num_groups = self.bit_packed_count / 8;
        if num_groups + 1 >= MAX_GROUPS_PER_BIT_PACKED_RUN {
            // We've reached the maximum value that can be hold in a single bit-packed
            // run.
            assert!(self.indicator_byte_pos >= 0);
            self.flush_bit_packed_run(true);
        } else {
            self.flush_bit_packed_run(false);
        }
        self.repeat_count = 0;
    }

    /// return the estimated memory size of this encoder.
    pub(crate) fn estimated_memory_size(&self) -> usize {
        self.bit_writer.estimated_memory_size() + std::mem::size_of::<Self>()
    }
}
287
/// Size, in number of `i32s` of buffer to use for RLE batch reading
const RLE_DECODER_INDEX_BUFFER_SIZE: usize = 1024;

/// A RLE/Bit-Packing hybrid decoder.
pub struct RleDecoder {
    // Number of bits used to encode the value. Must be between [0, 64].
    bit_width: u8,

    // Bit reader loaded with input buffer. `None` until `set_data` is called.
    bit_reader: Option<BitReader>,

    // Buffer used when `bit_reader` is not `None`, for batch reading of
    // dictionary indices. Allocated lazily on first use.
    index_buf: Option<Box<[i32; RLE_DECODER_INDEX_BUFFER_SIZE]>>,

    // The remaining number of values in RLE for this run
    rle_left: u32,

    // The remaining number of values in Bit-Packing for this run
    bit_packed_left: u32,

    // The current value for the case of RLE mode
    current_value: Option<u64>,
}
311
312impl RleDecoder {
313    pub fn new(bit_width: u8) -> Self {
314        RleDecoder {
315            bit_width,
316            rle_left: 0,
317            bit_packed_left: 0,
318            bit_reader: None,
319            index_buf: None,
320            current_value: None,
321        }
322    }
323
324    #[inline]
325    pub fn set_data(&mut self, data: Bytes) {
326        if let Some(ref mut bit_reader) = self.bit_reader {
327            bit_reader.reset(data);
328        } else {
329            self.bit_reader = Some(BitReader::new(data));
330        }
331
332        let _ = self.reload();
333    }
334
335    // These functions inline badly, they tend to inline and then create very large loop unrolls
336    // that damage L1d-cache occupancy. This results in a ~18% performance drop
337    #[inline(never)]
338    #[allow(unused)]
339    pub fn get<T: FromBytes>(&mut self) -> Result<Option<T>> {
340        assert!(size_of::<T>() <= 8);
341
342        while self.rle_left == 0 && self.bit_packed_left == 0 {
343            if !self.reload() {
344                return Ok(None);
345            }
346        }
347
348        let value = if self.rle_left > 0 {
349            let rle_value = T::try_from_le_slice(
350                &self
351                    .current_value
352                    .as_mut()
353                    .expect("current_value should be Some")
354                    .to_ne_bytes(),
355            )?;
356            self.rle_left -= 1;
357            rle_value
358        } else {
359            // self.bit_packed_left > 0
360            let bit_reader = self.bit_reader.as_mut().expect("bit_reader should be Some");
361            let bit_packed_value = bit_reader
362                .get_value(self.bit_width as usize)
363                .ok_or_else(|| eof_err!("Not enough data for 'bit_packed_value'"))?;
364            self.bit_packed_left -= 1;
365            bit_packed_value
366        };
367
368        Ok(Some(value))
369    }
370
371    #[inline(never)]
372    pub fn get_batch<T: FromBytes + Clone>(&mut self, buffer: &mut [T]) -> Result<usize> {
373        assert!(size_of::<T>() <= 8);
374
375        let mut values_read = 0;
376        while values_read < buffer.len() {
377            if self.rle_left > 0 {
378                let num_values = cmp::min(buffer.len() - values_read, self.rle_left as usize);
379                let repeated_value =
380                    T::try_from_le_slice(&self.current_value.as_mut().unwrap().to_ne_bytes())?;
381                buffer[values_read..values_read + num_values].fill(repeated_value);
382                self.rle_left -= num_values as u32;
383                values_read += num_values;
384            } else if self.bit_packed_left > 0 {
385                let mut num_values =
386                    cmp::min(buffer.len() - values_read, self.bit_packed_left as usize);
387                let bit_reader = self.bit_reader.as_mut().expect("bit_reader should be set");
388
389                num_values = bit_reader.get_batch::<T>(
390                    &mut buffer[values_read..values_read + num_values],
391                    self.bit_width as usize,
392                );
393                if num_values == 0 {
394                    // Handle writers which truncate the final block
395                    self.bit_packed_left = 0;
396                    continue;
397                }
398                self.bit_packed_left -= num_values as u32;
399                values_read += num_values;
400            } else if !self.reload() {
401                break;
402            }
403        }
404
405        Ok(values_read)
406    }
407
408    #[inline(never)]
409    pub fn skip(&mut self, num_values: usize) -> Result<usize> {
410        let mut values_skipped = 0;
411        while values_skipped < num_values {
412            if self.rle_left > 0 {
413                let num_values = cmp::min(num_values - values_skipped, self.rle_left as usize);
414                self.rle_left -= num_values as u32;
415                values_skipped += num_values;
416            } else if self.bit_packed_left > 0 {
417                let mut num_values =
418                    cmp::min(num_values - values_skipped, self.bit_packed_left as usize);
419                let bit_reader = self.bit_reader.as_mut().expect("bit_reader should be set");
420
421                num_values = bit_reader.skip(num_values, self.bit_width as usize);
422                if num_values == 0 {
423                    // Handle writers which truncate the final block
424                    self.bit_packed_left = 0;
425                    continue;
426                }
427                self.bit_packed_left -= num_values as u32;
428                values_skipped += num_values;
429            } else if !self.reload() {
430                break;
431            }
432        }
433
434        Ok(values_skipped)
435    }
436
437    #[inline(never)]
438    pub fn get_batch_with_dict<T>(
439        &mut self,
440        dict: &[T],
441        buffer: &mut [T],
442        max_values: usize,
443    ) -> Result<usize>
444    where
445        T: Default + Clone,
446    {
447        assert!(buffer.len() >= max_values);
448
449        let mut values_read = 0;
450        while values_read < max_values {
451            let index_buf = self.index_buf.get_or_insert_with(|| Box::new([0; 1024]));
452
453            if self.rle_left > 0 {
454                let num_values = cmp::min(max_values - values_read, self.rle_left as usize);
455                let dict_idx = self.current_value.unwrap() as usize;
456                let dict_value = dict[dict_idx].clone();
457
458                buffer[values_read..values_read + num_values].fill(dict_value);
459
460                self.rle_left -= num_values as u32;
461                values_read += num_values;
462            } else if self.bit_packed_left > 0 {
463                let bit_reader = self.bit_reader.as_mut().expect("bit_reader should be set");
464
465                loop {
466                    let to_read = index_buf
467                        .len()
468                        .min(max_values - values_read)
469                        .min(self.bit_packed_left as usize);
470
471                    if to_read == 0 {
472                        break;
473                    }
474
475                    let num_values = bit_reader
476                        .get_batch::<i32>(&mut index_buf[..to_read], self.bit_width as usize);
477                    if num_values == 0 {
478                        // Handle writers which truncate the final block
479                        self.bit_packed_left = 0;
480                        break;
481                    }
482                    buffer[values_read..values_read + num_values]
483                        .iter_mut()
484                        .zip(index_buf[..num_values].iter())
485                        .for_each(|(b, i)| b.clone_from(&dict[*i as usize]));
486                    self.bit_packed_left -= num_values as u32;
487                    values_read += num_values;
488                    if num_values < to_read {
489                        break;
490                    }
491                }
492            } else if !self.reload() {
493                break;
494            }
495        }
496
497        Ok(values_read)
498    }
499
500    #[inline]
501    fn reload(&mut self) -> bool {
502        let bit_reader = self.bit_reader.as_mut().expect("bit_reader should be set");
503
504        if let Some(indicator_value) = bit_reader.get_vlq_int() {
505            // fastparquet adds padding to the end of pages. This is not spec-compliant
506            // but is handled by the C++ implementation
507            // <https://github.com/apache/arrow/blob/8074496cb41bc8ec8fe9fc814ca5576d89a6eb94/cpp/src/arrow/util/rle_encoding.h#L653>
508            if indicator_value == 0 {
509                return false;
510            }
511            if indicator_value & 1 == 1 {
512                self.bit_packed_left = ((indicator_value >> 1) * 8) as u32;
513            } else {
514                self.rle_left = (indicator_value >> 1) as u32;
515                let value_width = bit_util::ceil(self.bit_width as usize, 8);
516                self.current_value = bit_reader.get_aligned::<u64>(value_width);
517                assert!(self.current_value.is_some());
518            }
519            true
520        } else {
521            false
522        }
523    }
524}
525
526#[cfg(test)]
527mod tests {
528    use super::*;
529
530    use crate::util::bit_util::ceil;
531    use rand::{self, distributions::Standard, thread_rng, Rng, SeedableRng};
532
533    const MAX_WIDTH: usize = 32;
534
535    #[test]
536    fn test_rle_decode_int32() {
537        // Test data: 0-7 with bit width 3
538        // 00000011 10001000 11000110 11111010
539        let data = vec![0x03, 0x88, 0xC6, 0xFA];
540        let mut decoder: RleDecoder = RleDecoder::new(3);
541        decoder.set_data(data.into());
542        let mut buffer = vec![0; 8];
543        let expected = vec![0, 1, 2, 3, 4, 5, 6, 7];
544        let result = decoder.get_batch::<i32>(&mut buffer);
545        assert!(result.is_ok());
546        assert_eq!(buffer, expected);
547    }
548
549    #[test]
550    fn test_rle_skip_int32() {
551        // Test data: 0-7 with bit width 3
552        // 00000011 10001000 11000110 11111010
553        let data = vec![0x03, 0x88, 0xC6, 0xFA];
554        let mut decoder: RleDecoder = RleDecoder::new(3);
555        decoder.set_data(data.into());
556        let expected = vec![2, 3, 4, 5, 6, 7];
557        let skipped = decoder.skip(2).expect("skipping values");
558        assert_eq!(skipped, 2);
559
560        let mut buffer = vec![0; 6];
561        let remaining = decoder
562            .get_batch::<i32>(&mut buffer)
563            .expect("getting remaining");
564        assert_eq!(remaining, 6);
565        assert_eq!(buffer, expected);
566    }
567
568    #[test]
569    fn test_rle_consume_flush_buffer() {
570        let data = vec![1, 1, 1, 2, 2, 3, 3, 3];
571        let mut encoder1 = RleEncoder::new(3, 256);
572        let mut encoder2 = RleEncoder::new(3, 256);
573        for value in data {
574            encoder1.put(value as u64);
575            encoder2.put(value as u64);
576        }
577        let res1 = encoder1.flush_buffer();
578        let res2 = encoder2.consume();
579        assert_eq!(res1, &res2[..]);
580    }
581
582    #[test]
583    fn test_rle_decode_bool() {
584        // RLE test data: 50 1s followed by 50 0s
585        // 01100100 00000001 01100100 00000000
586        let data1 = vec![0x64, 0x01, 0x64, 0x00];
587
588        // Bit-packing test data: alternating 1s and 0s, 100 total
589        // 100 / 8 = 13 groups
590        // 00011011 10101010 ... 00001010
591        let data2 = vec![
592            0x1B, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0x0A,
593        ];
594
595        let mut decoder: RleDecoder = RleDecoder::new(1);
596        decoder.set_data(data1.into());
597        let mut buffer = vec![false; 100];
598        let mut expected = vec![];
599        for i in 0..100 {
600            if i < 50 {
601                expected.push(true);
602            } else {
603                expected.push(false);
604            }
605        }
606        let result = decoder.get_batch::<bool>(&mut buffer);
607        assert!(result.is_ok());
608        assert_eq!(buffer, expected);
609
610        decoder.set_data(data2.into());
611        let mut buffer = vec![false; 100];
612        let mut expected = vec![];
613        for i in 0..100 {
614            if i % 2 == 0 {
615                expected.push(false);
616            } else {
617                expected.push(true);
618            }
619        }
620        let result = decoder.get_batch::<bool>(&mut buffer);
621        assert!(result.is_ok());
622        assert_eq!(buffer, expected);
623    }
624
625    #[test]
626    fn test_rle_skip_bool() {
627        // RLE test data: 50 1s followed by 50 0s
628        // 01100100 00000001 01100100 00000000
629        let data1 = vec![0x64, 0x01, 0x64, 0x00];
630
631        // Bit-packing test data: alternating 1s and 0s, 100 total
632        // 100 / 8 = 13 groups
633        // 00011011 10101010 ... 00001010
634        let data2 = vec![
635            0x1B, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0x0A,
636        ];
637
638        let mut decoder: RleDecoder = RleDecoder::new(1);
639        decoder.set_data(data1.into());
640        let mut buffer = vec![true; 50];
641        let expected = vec![false; 50];
642
643        let skipped = decoder.skip(50).expect("skipping first 50");
644        assert_eq!(skipped, 50);
645        let remainder = decoder
646            .get_batch::<bool>(&mut buffer)
647            .expect("getting remaining 50");
648        assert_eq!(remainder, 50);
649        assert_eq!(buffer, expected);
650
651        decoder.set_data(data2.into());
652        let mut buffer = vec![false; 50];
653        let mut expected = vec![];
654        for i in 0..50 {
655            if i % 2 == 0 {
656                expected.push(false);
657            } else {
658                expected.push(true);
659            }
660        }
661        let skipped = decoder.skip(50).expect("skipping first 50");
662        assert_eq!(skipped, 50);
663        let remainder = decoder
664            .get_batch::<bool>(&mut buffer)
665            .expect("getting remaining 50");
666        assert_eq!(remainder, 50);
667        assert_eq!(buffer, expected);
668    }
669
670    #[test]
671    fn test_rle_decode_with_dict_int32() {
672        // Test RLE encoding: 3 0s followed by 4 1s followed by 5 2s
673        // 00000110 00000000 00001000 00000001 00001010 00000010
674        let dict = vec![10, 20, 30];
675        let data = vec![0x06, 0x00, 0x08, 0x01, 0x0A, 0x02];
676        let mut decoder: RleDecoder = RleDecoder::new(3);
677        decoder.set_data(data.into());
678        let mut buffer = vec![0; 12];
679        let expected = vec![10, 10, 10, 20, 20, 20, 20, 30, 30, 30, 30, 30];
680        let result = decoder.get_batch_with_dict::<i32>(&dict, &mut buffer, 12);
681        assert!(result.is_ok());
682        assert_eq!(buffer, expected);
683
684        // Test bit-pack encoding: 345345345455 (2 groups: 8 and 4)
685        // 011 100 101 011 100 101 011 100 101 100 101 101
686        // 00000011 01100011 11000111 10001110 00000011 01100101 00001011
687        let dict = vec!["aaa", "bbb", "ccc", "ddd", "eee", "fff"];
688        let data = vec![0x03, 0x63, 0xC7, 0x8E, 0x03, 0x65, 0x0B];
689        let mut decoder: RleDecoder = RleDecoder::new(3);
690        decoder.set_data(data.into());
691        let mut buffer = vec![""; 12];
692        let expected = vec![
693            "ddd", "eee", "fff", "ddd", "eee", "fff", "ddd", "eee", "fff", "eee", "fff", "fff",
694        ];
695        let result =
696            decoder.get_batch_with_dict::<&str>(dict.as_slice(), buffer.as_mut_slice(), 12);
697        assert!(result.is_ok());
698        assert_eq!(buffer, expected);
699    }
700
701    #[test]
702    fn test_rle_skip_dict() {
703        // Test RLE encoding: 3 0s followed by 4 1s followed by 5 2s
704        // 00000110 00000000 00001000 00000001 00001010 00000010
705        let dict = vec![10, 20, 30];
706        let data = vec![0x06, 0x00, 0x08, 0x01, 0x0A, 0x02];
707        let mut decoder: RleDecoder = RleDecoder::new(3);
708        decoder.set_data(data.into());
709        let mut buffer = vec![0; 10];
710        let expected = vec![10, 20, 20, 20, 20, 30, 30, 30, 30, 30];
711        let skipped = decoder.skip(2).expect("skipping two values");
712        assert_eq!(skipped, 2);
713        let remainder = decoder
714            .get_batch_with_dict::<i32>(&dict, &mut buffer, 10)
715            .expect("getting remainder");
716        assert_eq!(remainder, 10);
717        assert_eq!(buffer, expected);
718
719        // Test bit-pack encoding: 345345345455 (2 groups: 8 and 4)
720        // 011 100 101 011 100 101 011 100 101 100 101 101
721        // 00000011 01100011 11000111 10001110 00000011 01100101 00001011
722        let dict = vec!["aaa", "bbb", "ccc", "ddd", "eee", "fff"];
723        let data = vec![0x03, 0x63, 0xC7, 0x8E, 0x03, 0x65, 0x0B];
724        let mut decoder: RleDecoder = RleDecoder::new(3);
725        decoder.set_data(data.into());
726        let mut buffer = vec![""; 8];
727        let expected = vec!["eee", "fff", "ddd", "eee", "fff", "eee", "fff", "fff"];
728        let skipped = decoder.skip(4).expect("skipping four values");
729        assert_eq!(skipped, 4);
730        let remainder = decoder
731            .get_batch_with_dict::<&str>(dict.as_slice(), buffer.as_mut_slice(), 8)
732            .expect("getting remainder");
733        assert_eq!(remainder, 8);
734        assert_eq!(buffer, expected);
735    }
736
737    fn validate_rle(
738        values: &[i64],
739        bit_width: u8,
740        expected_encoding: Option<&[u8]>,
741        expected_len: i32,
742    ) {
743        let buffer_len = 64 * 1024;
744        let mut encoder = RleEncoder::new(bit_width, buffer_len);
745        for v in values {
746            encoder.put(*v as u64)
747        }
748        let buffer: Bytes = encoder.consume().into();
749        if expected_len != -1 {
750            assert_eq!(buffer.len(), expected_len as usize);
751        }
752        if let Some(b) = expected_encoding {
753            assert_eq!(buffer.as_ref(), b);
754        }
755
756        // Verify read
757        let mut decoder = RleDecoder::new(bit_width);
758        decoder.set_data(buffer.clone());
759        for v in values {
760            let val: i64 = decoder
761                .get()
762                .expect("get() should be OK")
763                .expect("get() should return more value");
764            assert_eq!(val, *v);
765        }
766
767        // Verify batch read
768        decoder.set_data(buffer);
769        let mut values_read: Vec<i64> = vec![0; values.len()];
770        decoder
771            .get_batch(&mut values_read[..])
772            .expect("get_batch() should be OK");
773        assert_eq!(&values_read[..], values);
774    }
775
    #[test]
    fn test_rle_specific_sequences() {
        // 50 zeros followed by 50 ones: encodes as two RLE runs.
        let mut expected_buffer = Vec::new();
        let mut values = vec![0; 50];
        values.resize(100, 1);

        // Each run is a VLQ header `(50 << 1)` plus the repeated value, which
        // fits in a single byte for widths <= 8.
        expected_buffer.push(50 << 1);
        expected_buffer.push(0);
        expected_buffer.push(50 << 1);
        expected_buffer.push(1);

        for width in 1..9 {
            validate_rle(&values[..], width, Some(&expected_buffer[..]), 4);
        }
        for width in 9..MAX_WIDTH + 1 {
            // Wider values take `ceil(width / 8)` bytes per repeated value.
            validate_rle(
                &values[..],
                width as u8,
                None,
                2 * (1 + bit_util::ceil(width as i64, 8) as i32),
            );
        }

        // Test 101 alternating 0's and 1's (0, 1, ..., 0): forces bit-packing.
        values.clear();
        expected_buffer.clear();
        for i in 0..101 {
            values.push(i % 2);
        }
        let num_groups = bit_util::ceil(100, 8) as u8;
        expected_buffer.push((num_groups << 1) | 1);
        expected_buffer.resize(expected_buffer.len() + 100 / 8, 0b10101010);

        // Final byte holds the last values 0, 1, 0, 1, 0, padded with 0.
        expected_buffer.push(0b00001010);
        validate_rle(
            &values,
            1,
            Some(&expected_buffer[..]),
            1 + num_groups as i32,
        );
        for width in 2..MAX_WIDTH + 1 {
            // Encoded length covers the value count padded up to a multiple of 8.
            let num_values = bit_util::ceil(100, 8) * 8;
            validate_rle(
                &values,
                width as u8,
                None,
                1 + bit_util::ceil(width as i64 * num_values, 8) as i32,
            );
        }
    }
827
    // Runs `validate_rle` on `num_vals` values of width `bit_width`. If `value` is -1,
    // alternating values (v modulo 2^bit_width) are used; otherwise every value is
    // `value`.
    fn test_rle_values(bit_width: usize, num_vals: usize, value: i32) {
        // For bit_width == 64, `1u64 << 64` would overflow; fall back to 1 so the
        // alternating case degenerates to all zeros.
        let mod_val = if bit_width == 64 {
            1
        } else {
            1u64 << bit_width
        };
        let mut values: Vec<i64> = vec![];
        for v in 0..num_vals {
            let val = if value == -1 {
                v as i64 % mod_val as i64
            } else {
                value as i64
            };
            values.push(val);
        }
        // expected_len of -1: presumably tells validate_rle to skip the exact
        // encoded-length check — confirm against validate_rle's definition.
        validate_rle(&values, bit_width as u8, None, -1);
    }
847
848    #[test]
849    fn test_values() {
850        for width in 1..MAX_WIDTH + 1 {
851            test_rle_values(width, 1, -1);
852            test_rle_values(width, 1024, -1);
853            test_rle_values(width, 1024, 0);
854            test_rle_values(width, 1024, 1);
855        }
856    }
857
858    #[test]
859    fn test_truncated_rle() {
860        // The final bit packed run within a page may not be a multiple of 8 values
861        // Unfortunately the specification stores `(bit-packed-run-len) / 8`
862        // This means we don't necessarily know how many values are present
863        // and some writers may not add padding to compensate for this ambiguity
864
865        // Bit pack encode 20 values with a bit width of 8
866        let mut data: Vec<u8> = vec![
867            (3 << 1) | 1, // bit-packed run of 3 * 8
868        ];
869        data.extend(std::iter::repeat(0xFF).take(20));
870        let data: Bytes = data.into();
871
872        let mut decoder = RleDecoder::new(8);
873        decoder.set_data(data.clone());
874
875        let mut output = vec![0_u16; 100];
876        let read = decoder.get_batch(&mut output).unwrap();
877
878        assert_eq!(read, 20);
879        assert!(output.iter().take(20).all(|x| *x == 255));
880
881        // Reset decoder
882        decoder.set_data(data);
883
884        let dict: Vec<u16> = (0..256).collect();
885        let mut output = vec![0_u16; 100];
886        let read = decoder
887            .get_batch_with_dict(&dict, &mut output, 100)
888            .unwrap();
889
890        assert_eq!(read, 20);
891        assert!(output.iter().take(20).all(|x| *x == 255));
892    }
893
894    #[test]
895    fn test_rle_padded() {
896        let values: Vec<i16> = vec![0, 1, 1, 3, 1, 0];
897        let bit_width = 2;
898        let buffer_len = RleEncoder::max_buffer_size(bit_width, values.len());
899        let mut encoder = RleEncoder::new(bit_width, buffer_len + 1);
900        for v in &values {
901            encoder.put(*v as u64)
902        }
903
904        let mut buffer = encoder.consume();
905        buffer.push(0);
906
907        let mut decoder = RleDecoder::new(bit_width);
908        decoder.set_data(buffer.into());
909
910        // We don't always reliably know how many non-null values are contained in a page
911        // and so the decoder must work correctly without a precise value count
912        let mut actual_values: Vec<i16> = vec![0; 12];
913        let r = decoder
914            .get_batch(&mut actual_values)
915            .expect("get_batch() should be OK");
916
917        // Should decode 8 values despite only encoding 6 as length of
918        // bit packed run is always multiple of 8
919        assert_eq!(r, 8);
920        assert_eq!(actual_values[..6], values);
921        assert_eq!(actual_values[6], 0);
922        assert_eq!(actual_values[7], 0);
923    }
924
925    #[test]
926    fn test_long_run() {
927        // This writer does not write runs longer than 504 values as this allows
928        // encoding the run header as a single byte
929        //
930        // This tests that the decoder correctly handles longer runs
931
932        let mut writer = BitWriter::new(1024);
933        let bit_width = 1;
934
935        // Choose a non-multiple of 8 larger than 1024 so that the length
936        // of the run is ambiguous, as the encoding only stores `num_values / 8`
937        let num_values = 2002;
938
939        // bit-packed header
940        let run_bytes = ceil(num_values * bit_width, 8) as u64;
941        writer.put_vlq_int((run_bytes << 1) | 1);
942        for _ in 0..run_bytes {
943            writer.put_aligned(0xFF_u8, 1);
944        }
945        let buffer: Bytes = writer.consume().into();
946
947        let mut decoder = RleDecoder::new(1);
948        decoder.set_data(buffer.clone());
949
950        let mut decoded: Vec<i16> = vec![0; num_values];
951        let r = decoder.get_batch(&mut decoded).unwrap();
952        assert_eq!(r, num_values);
953        assert_eq!(vec![1; num_values], decoded);
954
955        decoder.set_data(buffer);
956        let r = decoder
957            .get_batch_with_dict(&[0, 23], &mut decoded, num_values)
958            .unwrap();
959        assert_eq!(r, num_values);
960        assert_eq!(vec![23; num_values], decoded);
961    }
962
963    #[test]
964    fn test_rle_specific_roundtrip() {
965        let bit_width = 1;
966        let values: Vec<i16> = vec![0, 1, 1, 1, 1, 0, 0, 0, 0, 1];
967        let buffer_len = RleEncoder::max_buffer_size(bit_width, values.len());
968        let mut encoder = RleEncoder::new(bit_width, buffer_len);
969        for v in &values {
970            encoder.put(*v as u64)
971        }
972        let buffer = encoder.consume();
973        let mut decoder = RleDecoder::new(bit_width);
974        decoder.set_data(Bytes::from(buffer));
975        let mut actual_values: Vec<i16> = vec![0; values.len()];
976        decoder
977            .get_batch(&mut actual_values)
978            .expect("get_batch() should be OK");
979        assert_eq!(actual_values, values);
980    }
981
982    fn test_round_trip(values: &[i32], bit_width: u8) {
983        let buffer_len = 64 * 1024;
984        let mut encoder = RleEncoder::new(bit_width, buffer_len);
985        for v in values {
986            encoder.put(*v as u64)
987        }
988
989        let buffer = Bytes::from(encoder.consume());
990
991        // Verify read
992        let mut decoder = RleDecoder::new(bit_width);
993        decoder.set_data(buffer.clone());
994        for v in values {
995            let val = decoder
996                .get::<i32>()
997                .expect("get() should be OK")
998                .expect("get() should return value");
999            assert_eq!(val, *v);
1000        }
1001
1002        // Verify batch read
1003        let mut decoder = RleDecoder::new(bit_width);
1004        decoder.set_data(buffer);
1005        let mut values_read: Vec<i32> = vec![0; values.len()];
1006        decoder
1007            .get_batch(&mut values_read[..])
1008            .expect("get_batch() should be OK");
1009        assert_eq!(&values_read[..], values);
1010    }
1011
1012    #[test]
1013    fn test_random() {
1014        let seed_len = 32;
1015        let niters = 50;
1016        let ngroups = 1000;
1017        let max_group_size = 15;
1018        let mut values = vec![];
1019
1020        for _ in 0..niters {
1021            values.clear();
1022            let rng = thread_rng();
1023            let seed_vec: Vec<u8> = rng.sample_iter::<u8, _>(&Standard).take(seed_len).collect();
1024            let mut seed = [0u8; 32];
1025            seed.copy_from_slice(&seed_vec[0..seed_len]);
1026            let mut gen = rand::rngs::StdRng::from_seed(seed);
1027
1028            let mut parity = false;
1029            for _ in 0..ngroups {
1030                let mut group_size = gen.gen_range(1..20);
1031                if group_size > max_group_size {
1032                    group_size = 1;
1033                }
1034                for _ in 0..group_size {
1035                    values.push(parity as i32);
1036                }
1037                parity = !parity;
1038            }
1039            let bit_width = bit_util::num_required_bits(values.len() as u64);
1040            assert!(bit_width < 64);
1041            test_round_trip(&values[..], bit_width);
1042        }
1043    }
1044}