Skip to main content

parquet/encodings/
rle.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Rle/Bit-Packing Hybrid Encoding
19//! The grammar for this encoding looks like the following (copied verbatim
20//! from <https://github.com/Parquet/parquet-format/blob/master/Encodings.md>):
21//!
22//! rle-bit-packed-hybrid: `<length>` `<encoded-data>`
23//! length := length of the `<encoded-data>` in bytes stored as 4 bytes little endian
24//! encoded-data := `<run>`*
25//! run := `<bit-packed-run>` | `<rle-run>`
26//! bit-packed-run := `<bit-packed-header>` `<bit-packed-values>`
27//! bit-packed-header := varint-encode(`<bit-pack-count>` << 1 | 1)
28//! we always bit-pack a multiple of 8 values at a time, so we only store the number of
29//! values / 8
30//! bit-pack-count := (number of values in this run) / 8
31//! bit-packed-values := *see 1 below*
32//! rle-run := `<rle-header>` `<repeated-value>`
33//! rle-header := varint-encode( (number of times repeated) << 1)
34//! repeated-value := value that is repeated, using a fixed-width of
35//! round-up-to-next-byte(bit-width)
36
37use std::{cmp, mem::size_of};
38
39use bytes::Bytes;
40
41use crate::errors::{ParquetError, Result};
42use crate::util::bit_util::{self, BitReader, BitWriter, FromBytes};
43
44/// Maximum groups of 8 values per bit-packed run. Current value is 64.
45const MAX_GROUPS_PER_BIT_PACKED_RUN: usize = 1 << 6;
46
/// A RLE/Bit-Packing hybrid encoder.
// TODO: tracking memory usage
pub struct RleEncoder {
    // Number of bits needed to encode the value. Must be in the range of [0, 64].
    bit_width: u8,

    // Underlying writer which holds an internal buffer.
    bit_writer: BitWriter,

    // Buffered values for bit-packed runs. At most 8 values are collected here
    // before they are flushed as one bit-packed group.
    buffered_values: [u64; 8],

    // Number of current buffered values. Must be less than 8.
    num_buffered_values: usize,

    // The current (also last) value that was written and the count of how many
    // times in a row that value has been seen.
    current_value: u64,

    // The number of repetitions for `current_value`. If this gets too high we'd
    // switch to use RLE encoding.
    repeat_count: usize,

    // Number of bit-packed values in the current run. This doesn't include values
    // in `buffered_values`.
    bit_packed_count: usize,

    // The position of the indicator byte in the `bit_writer`. -1 when no byte
    // has been reserved for the current bit-packed run yet.
    indicator_byte_pos: i64,
}
77
impl RleEncoder {
    /// Creates a new encoder whose internal buffer is pre-allocated with
    /// `buffer_len` bytes of capacity.
    #[allow(unused)]
    pub fn new(bit_width: u8, buffer_len: usize) -> Self {
        let buffer = Vec::with_capacity(buffer_len);
        RleEncoder::new_from_buf(bit_width, buffer)
    }

    /// Initialize the encoder from existing `buffer`
    ///
    /// # Panics
    ///
    /// Panics if `bit_width` is greater than 64.
    pub fn new_from_buf(bit_width: u8, buffer: Vec<u8>) -> Self {
        assert!(bit_width <= 64);
        let bit_writer = BitWriter::new_from_buf(buffer);
        RleEncoder {
            bit_width,
            bit_writer,
            buffered_values: [0; 8],
            num_buffered_values: 0,
            current_value: 0,
            repeat_count: 0,
            bit_packed_count: 0,
            // -1 marks that no indicator byte slot is currently reserved.
            indicator_byte_pos: -1,
        }
    }

    /// Returns the maximum buffer size to encode `num_values` values with
    /// `bit_width`.
    pub fn max_buffer_size(bit_width: u8, num_values: usize) -> usize {
        // The maximum size occurs with the shortest possible runs of 8
        let num_runs = bit_util::ceil(num_values, 8);

        // The number of bytes in a run of 8
        let bytes_per_run = bit_width as usize;

        // The maximum size if stored as shortest possible bit packed runs of 8
        // (one indicator byte per run plus the packed values)
        let bit_packed_max_size = num_runs + num_runs * bytes_per_run;

        // The length of `8` VLQ encoded
        let rle_len_prefix = 1;

        // The length of an RLE run of 8
        let min_rle_run_size = rle_len_prefix + bit_util::ceil(bit_width as usize, 8);

        // The maximum size if stored as shortest possible RLE runs of 8
        let rle_max_size = num_runs * min_rle_run_size;

        bit_packed_max_size.max(rle_max_size)
    }

    /// Encodes `value`, which must be representable with `bit_width` bits.
    #[inline]
    pub fn put(&mut self, value: u64) {
        // This function buffers 8 values at a time. After seeing 8 values, it
        // decides whether the current run should be encoded in bit-packed or RLE.
        if self.current_value == value {
            self.repeat_count += 1;
            if self.repeat_count > 8 {
                // A continuation of last value. No need to buffer.
                return;
            }
        } else {
            if self.repeat_count >= 8 {
                // The current RLE run has ended and we've gathered enough. Flush first.
                debug_assert_eq!(self.bit_packed_count, 0);
                self.flush_rle_run();
            }
            self.repeat_count = 1;
            self.current_value = value;
        }

        self.buffered_values[self.num_buffered_values] = value;
        self.num_buffered_values += 1;
        if self.num_buffered_values == 8 {
            // Buffered values are full. Flush them.
            debug_assert_eq!(self.bit_packed_count % 8, 0);
            self.flush_buffered_values();
        }
    }

    /// Returns the bytes written so far (does not include still-buffered values).
    #[inline]
    #[allow(unused)]
    pub fn buffer(&self) -> &[u8] {
        self.bit_writer.buffer()
    }

    /// Returns the number of bytes written so far (does not include
    /// still-buffered values).
    #[inline]
    pub fn len(&self) -> usize {
        self.bit_writer.bytes_written()
    }

    /// Returns `true` if no bytes have been written yet.
    #[allow(unused)]
    pub fn is_empty(&self) -> bool {
        self.bit_writer.bytes_written() == 0
    }

    /// Flushes any remaining values and returns the underlying byte buffer,
    /// consuming the encoder.
    #[inline]
    pub fn consume(mut self) -> Vec<u8> {
        self.flush();
        self.bit_writer.consume()
    }

    /// Borrow equivalent of the `consume` method.
    /// Call `clear()` after invoking this method.
    #[inline]
    #[allow(unused)]
    pub fn flush_buffer(&mut self) -> &[u8] {
        self.flush();
        self.bit_writer.flush_buffer()
    }

    /// Clears the internal state so this encoder can be reused (e.g., after becoming
    /// full).
    #[inline]
    #[allow(unused)]
    pub fn clear(&mut self) {
        self.bit_writer.clear();
        self.num_buffered_values = 0;
        self.current_value = 0;
        self.repeat_count = 0;
        self.bit_packed_count = 0;
        self.indicator_byte_pos = -1;
    }

    /// Flushes all remaining values and return the final byte buffer maintained by the
    /// internal writer.
    #[inline]
    pub fn flush(&mut self) {
        if self.bit_packed_count > 0 || self.repeat_count > 0 || self.num_buffered_values > 0 {
            // The pending values can be emitted as an RLE run only when no
            // bit-packed run is in progress and everything buffered repeats.
            let all_repeat = self.bit_packed_count == 0
                && (self.repeat_count == self.num_buffered_values || self.num_buffered_values == 0);
            if self.repeat_count > 0 && all_repeat {
                self.flush_rle_run();
            } else {
                // Buffer the last group of bit-packed values to 8 by padding with 0s.
                if self.num_buffered_values > 0 {
                    while self.num_buffered_values < 8 {
                        self.buffered_values[self.num_buffered_values] = 0;
                        self.num_buffered_values += 1;
                    }
                }
                self.bit_packed_count += self.num_buffered_values;
                self.flush_bit_packed_run(true);
                self.repeat_count = 0;
            }
        }
    }

    // Writes out the pending RLE run: a VLQ header carrying the repeat count
    // (LSB = 0 marks an RLE run), followed by the repeated value padded to a
    // whole number of bytes.
    fn flush_rle_run(&mut self) {
        debug_assert!(self.repeat_count > 0);
        let indicator_value = self.repeat_count << 1;
        self.bit_writer.put_vlq_int(indicator_value as u64);
        self.bit_writer.put_aligned(
            self.current_value,
            bit_util::ceil(self.bit_width as usize, 8),
        );
        self.num_buffered_values = 0;
        self.repeat_count = 0;
    }

    // Writes the buffered values as bit-packed literals. When
    // `update_indicator_byte` is set, also finalizes the run by back-filling
    // the reserved header byte with `(num_groups << 1) | 1`.
    fn flush_bit_packed_run(&mut self, update_indicator_byte: bool) {
        if self.indicator_byte_pos < 0 {
            // Reserve one byte up front for the run header; it is back-filled
            // once the final group count is known.
            self.indicator_byte_pos = self.bit_writer.skip(1) as i64;
        }

        // Write all buffered values as bit-packed literals
        for v in &self.buffered_values[..self.num_buffered_values] {
            self.bit_writer.put_value(*v, self.bit_width as usize);
        }
        self.num_buffered_values = 0;
        if update_indicator_byte {
            // Write the indicator byte to the reserved position in `bit_writer`
            let num_groups = self.bit_packed_count / 8;
            let indicator_byte = ((num_groups << 1) | 1) as u8;
            self.bit_writer
                .put_aligned_offset(indicator_byte, 1, self.indicator_byte_pos as usize);
            self.indicator_byte_pos = -1;
            self.bit_packed_count = 0;
        }
    }

    // Called once 8 values are buffered: either hand them off to an RLE run
    // (when the value repeated at least 8 times) or emit them bit-packed.
    fn flush_buffered_values(&mut self) {
        if self.repeat_count >= 8 {
            // Prefer RLE: the buffered values are all `current_value` and are
            // tracked by `repeat_count` instead of the buffer.
            self.num_buffered_values = 0;
            if self.bit_packed_count > 0 {
                // In this case we choose RLE encoding. Flush the current buffered values
                // as bit-packed encoding.
                debug_assert_eq!(self.bit_packed_count % 8, 0);
                self.flush_bit_packed_run(true)
            }
            return;
        }

        self.bit_packed_count += self.num_buffered_values;
        let num_groups = self.bit_packed_count / 8;
        if num_groups + 1 >= MAX_GROUPS_PER_BIT_PACKED_RUN {
            // We've reached the maximum number of groups that can be held in a
            // single bit-packed run.
            debug_assert!(self.indicator_byte_pos >= 0);
            self.flush_bit_packed_run(true);
        } else {
            self.flush_bit_packed_run(false);
        }
        self.repeat_count = 0;
    }

    /// return the estimated memory size of this encoder.
    pub(crate) fn estimated_memory_size(&self) -> usize {
        self.bit_writer.estimated_memory_size() + std::mem::size_of::<Self>()
    }
}
286
/// Size, in number of `i32`s, of the buffer used for RLE batch reading
const RLE_DECODER_INDEX_BUFFER_SIZE: usize = 1024;

/// A RLE/Bit-Packing hybrid decoder.
pub struct RleDecoder {
    // Number of bits used to encode the value. Must be between [0, 64].
    bit_width: u8,

    // Bit reader loaded with input buffer.
    bit_reader: Option<BitReader>,

    // Buffer used when `bit_reader` is not `None`, for batch reading of
    // dictionary indices. Lazily allocated on first use.
    index_buf: Option<Box<[i32; RLE_DECODER_INDEX_BUFFER_SIZE]>>,

    // The remaining number of values in RLE for this run
    rle_left: u32,

    // The remaining number of values in Bit-Packing for this run
    bit_packed_left: u32,

    // The current value for the case of RLE mode
    current_value: Option<u64>,
}
310
311impl RleDecoder {
312    pub fn new(bit_width: u8) -> Self {
313        RleDecoder {
314            bit_width,
315            rle_left: 0,
316            bit_packed_left: 0,
317            bit_reader: None,
318            index_buf: None,
319            current_value: None,
320        }
321    }
322
323    #[inline]
324    pub fn set_data(&mut self, data: Bytes) -> Result<()> {
325        if let Some(ref mut bit_reader) = self.bit_reader {
326            bit_reader.reset(data);
327        } else {
328            self.bit_reader = Some(BitReader::new(data));
329        }
330
331        // Initialize decoder state. The boolean only reports whether the first run contained data,
332        // and `get`/`get_batch` already interpret that result to drive iteration. We only need
333        // errors propagated here, so the flag returned is intentionally ignored.
334        let _ = self.reload()?;
335        Ok(())
336    }
337
338    // These functions inline badly, they tend to inline and then create very large loop unrolls
339    // that damage L1d-cache occupancy. This results in a ~18% performance drop
340    #[inline(never)]
341    #[allow(unused)]
342    pub fn get<T: FromBytes>(&mut self) -> Result<Option<T>> {
343        assert!(size_of::<T>() <= 8);
344
345        while self.rle_left == 0 && self.bit_packed_left == 0 {
346            if !self.reload()? {
347                return Ok(None);
348            }
349        }
350
351        let value = if self.rle_left > 0 {
352            let rle_value = T::try_from_le_slice(
353                &self
354                    .current_value
355                    .as_mut()
356                    .ok_or_else(|| general_err!("current_value should be Some"))?
357                    .to_ne_bytes(),
358            )?;
359            self.rle_left -= 1;
360            rle_value
361        } else {
362            // self.bit_packed_left > 0
363            let bit_reader = self
364                .bit_reader
365                .as_mut()
366                .ok_or_else(|| general_err!("bit_reader should be Some"))?;
367            let bit_packed_value = bit_reader
368                .get_value(self.bit_width as usize)
369                .ok_or_else(|| eof_err!("Not enough data for 'bit_packed_value'"))?;
370            self.bit_packed_left -= 1;
371            bit_packed_value
372        };
373
374        Ok(Some(value))
375    }
376
377    #[inline(never)]
378    pub fn get_batch<T: FromBytes + Clone>(&mut self, buffer: &mut [T]) -> Result<usize> {
379        assert!(size_of::<T>() <= 8);
380
381        let mut values_read = 0;
382        while values_read < buffer.len() {
383            if self.rle_left > 0 {
384                let num_values = cmp::min(buffer.len() - values_read, self.rle_left as usize);
385                let repeated_value =
386                    T::try_from_le_slice(&self.current_value.as_mut().unwrap().to_ne_bytes())?;
387                buffer[values_read..values_read + num_values].fill(repeated_value);
388                self.rle_left -= num_values as u32;
389                values_read += num_values;
390            } else if self.bit_packed_left > 0 {
391                let mut num_values =
392                    cmp::min(buffer.len() - values_read, self.bit_packed_left as usize);
393                let bit_reader = self
394                    .bit_reader
395                    .as_mut()
396                    .ok_or_else(|| ParquetError::General("bit_reader should be set".into()))?;
397
398                num_values = bit_reader.get_batch::<T>(
399                    &mut buffer[values_read..values_read + num_values],
400                    self.bit_width as usize,
401                );
402                if num_values == 0 {
403                    // Handle writers which truncate the final block
404                    self.bit_packed_left = 0;
405                    continue;
406                }
407                self.bit_packed_left -= num_values as u32;
408                values_read += num_values;
409            } else if !self.reload()? {
410                break;
411            }
412        }
413
414        Ok(values_read)
415    }
416
417    #[inline(never)]
418    pub fn skip(&mut self, num_values: usize) -> Result<usize> {
419        let mut values_skipped = 0;
420        while values_skipped < num_values {
421            if self.rle_left > 0 {
422                let num_values = cmp::min(num_values - values_skipped, self.rle_left as usize);
423                self.rle_left -= num_values as u32;
424                values_skipped += num_values;
425            } else if self.bit_packed_left > 0 {
426                let mut num_values =
427                    cmp::min(num_values - values_skipped, self.bit_packed_left as usize);
428                let bit_reader = self
429                    .bit_reader
430                    .as_mut()
431                    .ok_or_else(|| general_err!("bit_reader should be set"))?;
432
433                num_values = bit_reader.skip(num_values, self.bit_width as usize);
434                if num_values == 0 {
435                    // Handle writers which truncate the final block
436                    self.bit_packed_left = 0;
437                    continue;
438                }
439                self.bit_packed_left -= num_values as u32;
440                values_skipped += num_values;
441            } else if !self.reload()? {
442                break;
443            }
444        }
445
446        Ok(values_skipped)
447    }
448
449    #[inline(never)]
450    pub fn get_batch_with_dict<T>(
451        &mut self,
452        dict: &[T],
453        buffer: &mut [T],
454        max_values: usize,
455    ) -> Result<usize>
456    where
457        T: Default + Clone,
458    {
459        assert!(buffer.len() >= max_values);
460
461        let mut values_read = 0;
462        while values_read < max_values {
463            let index_buf = self.index_buf.get_or_insert_with(|| Box::new([0; 1024]));
464
465            if self.rle_left > 0 {
466                let num_values = cmp::min(max_values - values_read, self.rle_left as usize);
467                let dict_idx = self.current_value.unwrap() as usize;
468                let dict_value = dict[dict_idx].clone();
469
470                buffer[values_read..values_read + num_values].fill(dict_value);
471
472                self.rle_left -= num_values as u32;
473                values_read += num_values;
474            } else if self.bit_packed_left > 0 {
475                let bit_reader = self
476                    .bit_reader
477                    .as_mut()
478                    .ok_or_else(|| general_err!("bit_reader should be set"))?;
479
480                loop {
481                    let to_read = index_buf
482                        .len()
483                        .min(max_values - values_read)
484                        .min(self.bit_packed_left as usize);
485
486                    if to_read == 0 {
487                        break;
488                    }
489
490                    let num_values = bit_reader
491                        .get_batch::<i32>(&mut index_buf[..to_read], self.bit_width as usize);
492                    if num_values == 0 {
493                        // Handle writers which truncate the final block
494                        self.bit_packed_left = 0;
495                        break;
496                    }
497                    buffer[values_read..values_read + num_values]
498                        .iter_mut()
499                        .zip(index_buf[..num_values].iter())
500                        .for_each(|(b, i)| b.clone_from(&dict[*i as usize]));
501                    self.bit_packed_left -= num_values as u32;
502                    values_read += num_values;
503                    if num_values < to_read {
504                        break;
505                    }
506                }
507            } else if !self.reload()? {
508                break;
509            }
510        }
511
512        Ok(values_read)
513    }
514
515    #[inline]
516    fn reload(&mut self) -> Result<bool> {
517        let bit_reader = self
518            .bit_reader
519            .as_mut()
520            .ok_or_else(|| general_err!("bit_reader should be set"))?;
521
522        if let Some(indicator_value) = bit_reader.get_vlq_int() {
523            // fastparquet adds padding to the end of pages. This is not spec-compliant
524            // but is handled by the C++ implementation
525            // <https://github.com/apache/arrow/blob/8074496cb41bc8ec8fe9fc814ca5576d89a6eb94/cpp/src/arrow/util/rle_encoding.h#L653>
526            if indicator_value == 0 {
527                return Ok(false);
528            }
529            if indicator_value & 1 == 1 {
530                self.bit_packed_left = ((indicator_value >> 1) * 8) as u32;
531            } else {
532                self.rle_left = (indicator_value >> 1) as u32;
533                let value_width = bit_util::ceil(self.bit_width as usize, 8);
534                self.current_value = bit_reader.get_aligned::<u64>(value_width);
535                self.current_value.ok_or_else(|| {
536                    general_err!("parquet_data_error: not enough data for RLE decoding")
537                })?;
538            }
539            Ok(true)
540        } else {
541            Ok(false)
542        }
543    }
544}
545
546#[cfg(test)]
547mod tests {
548    use super::*;
549
550    use crate::util::bit_util::ceil;
551    use rand::{self, Rng, SeedableRng, distr::StandardUniform, rng};
552
553    const MAX_WIDTH: usize = 32;
554
555    #[test]
556    fn test_rle_decode_int32() {
557        // Test data: 0-7 with bit width 3
558        // 00000011 10001000 11000110 11111010
559        let data = vec![0x03, 0x88, 0xC6, 0xFA];
560        let mut decoder: RleDecoder = RleDecoder::new(3);
561        decoder.set_data(data.into()).unwrap();
562        let mut buffer = vec![0; 8];
563        let expected = vec![0, 1, 2, 3, 4, 5, 6, 7];
564        let result = decoder.get_batch::<i32>(&mut buffer);
565        assert!(result.is_ok());
566        assert_eq!(buffer, expected);
567    }
568
569    #[test]
570    fn test_rle_skip_int32() {
571        // Test data: 0-7 with bit width 3
572        // 00000011 10001000 11000110 11111010
573        let data = vec![0x03, 0x88, 0xC6, 0xFA];
574        let mut decoder: RleDecoder = RleDecoder::new(3);
575        decoder.set_data(data.into()).unwrap();
576        let expected = vec![2, 3, 4, 5, 6, 7];
577        let skipped = decoder.skip(2).expect("skipping values");
578        assert_eq!(skipped, 2);
579
580        let mut buffer = vec![0; 6];
581        let remaining = decoder
582            .get_batch::<i32>(&mut buffer)
583            .expect("getting remaining");
584        assert_eq!(remaining, 6);
585        assert_eq!(buffer, expected);
586    }
587
588    #[test]
589    fn test_rle_consume_flush_buffer() {
590        let data = vec![1, 1, 1, 2, 2, 3, 3, 3];
591        let mut encoder1 = RleEncoder::new(3, 256);
592        let mut encoder2 = RleEncoder::new(3, 256);
593        for value in data {
594            encoder1.put(value as u64);
595            encoder2.put(value as u64);
596        }
597        let res1 = encoder1.flush_buffer();
598        let res2 = encoder2.consume();
599        assert_eq!(res1, &res2[..]);
600    }
601
    #[test]
    fn test_rle_decode_bool() {
        // RLE test data: 50 1s followed by 50 0s
        // 01100100 00000001 01100100 00000000
        let data1 = vec![0x64, 0x01, 0x64, 0x00];

        // Bit-packing test data: alternating 1s and 0s, 100 total
        // 100 / 8 = 13 groups
        // 00011011 10101010 ... 00001010
        let data2 = vec![
            0x1B, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0x0A,
        ];

        let mut decoder: RleDecoder = RleDecoder::new(1);
        decoder.set_data(data1.into()).unwrap();
        let mut buffer = vec![false; 100];
        let mut expected = vec![];
        for i in 0..100 {
            if i < 50 {
                expected.push(true);
            } else {
                expected.push(false);
            }
        }
        let result = decoder.get_batch::<bool>(&mut buffer);
        assert!(result.is_ok());
        assert_eq!(buffer, expected);

        // The same decoder instance is reused for a second page of data.
        decoder.set_data(data2.into()).unwrap();
        let mut buffer = vec![false; 100];
        let mut expected = vec![];
        for i in 0..100 {
            if i % 2 == 0 {
                expected.push(false);
            } else {
                expected.push(true);
            }
        }
        let result = decoder.get_batch::<bool>(&mut buffer);
        assert!(result.is_ok());
        assert_eq!(buffer, expected);
    }
644
    #[test]
    fn test_rle_skip_bool() {
        // RLE test data: 50 1s followed by 50 0s
        // 01100100 00000001 01100100 00000000
        let data1 = vec![0x64, 0x01, 0x64, 0x00];

        // Bit-packing test data: alternating 1s and 0s, 100 total
        // 100 / 8 = 13 groups
        // 00011011 10101010 ... 00001010
        let data2 = vec![
            0x1B, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0x0A,
        ];

        // Skip the leading run of 1s; only the 50 trailing 0s should remain.
        let mut decoder: RleDecoder = RleDecoder::new(1);
        decoder.set_data(data1.into()).unwrap();
        let mut buffer = vec![true; 50];
        let expected = vec![false; 50];

        let skipped = decoder.skip(50).expect("skipping first 50");
        assert_eq!(skipped, 50);
        let remainder = decoder
            .get_batch::<bool>(&mut buffer)
            .expect("getting remaining 50");
        assert_eq!(remainder, 50);
        assert_eq!(buffer, expected);

        // Same check against the bit-packed page; the alternating pattern
        // resumes mid-stream after the skip.
        decoder.set_data(data2.into()).unwrap();
        let mut buffer = vec![false; 50];
        let mut expected = vec![];
        for i in 0..50 {
            if i % 2 == 0 {
                expected.push(false);
            } else {
                expected.push(true);
            }
        }
        let skipped = decoder.skip(50).expect("skipping first 50");
        assert_eq!(skipped, 50);
        let remainder = decoder
            .get_batch::<bool>(&mut buffer)
            .expect("getting remaining 50");
        assert_eq!(remainder, 50);
        assert_eq!(buffer, expected);
    }
689
    #[test]
    fn test_rle_decode_with_dict_int32() {
        // Test RLE encoding: 3 0s followed by 4 1s followed by 5 2s
        // 00000110 00000000 00001000 00000001 00001010 00000010
        let dict = vec![10, 20, 30];
        let data = vec![0x06, 0x00, 0x08, 0x01, 0x0A, 0x02];
        let mut decoder: RleDecoder = RleDecoder::new(3);
        decoder.set_data(data.into()).unwrap();
        let mut buffer = vec![0; 12];
        let expected = vec![10, 10, 10, 20, 20, 20, 20, 30, 30, 30, 30, 30];
        let result = decoder.get_batch_with_dict::<i32>(&dict, &mut buffer, 12);
        assert!(result.is_ok());
        assert_eq!(buffer, expected);

        // Test bit-pack encoding: 345345345455 (2 groups: 8 and 4)
        // 011 100 101 011 100 101 011 100 101 100 101 101
        // 00000011 01100011 11000111 10001110 00000011 01100101 00001011
        let dict = vec!["aaa", "bbb", "ccc", "ddd", "eee", "fff"];
        let data = vec![0x03, 0x63, 0xC7, 0x8E, 0x03, 0x65, 0x0B];
        let mut decoder: RleDecoder = RleDecoder::new(3);
        decoder.set_data(data.into()).unwrap();
        let mut buffer = vec![""; 12];
        let expected = vec![
            "ddd", "eee", "fff", "ddd", "eee", "fff", "ddd", "eee", "fff", "eee", "fff", "fff",
        ];
        let result =
            decoder.get_batch_with_dict::<&str>(dict.as_slice(), buffer.as_mut_slice(), 12);
        assert!(result.is_ok());
        assert_eq!(buffer, expected);
    }
720
    #[test]
    fn test_rle_skip_dict() {
        // Test RLE encoding: 3 0s followed by 4 1s followed by 5 2s
        // 00000110 00000000 00001000 00000001 00001010 00000010
        let dict = vec![10, 20, 30];
        let data = vec![0x06, 0x00, 0x08, 0x01, 0x0A, 0x02];
        let mut decoder: RleDecoder = RleDecoder::new(3);
        decoder.set_data(data.into()).unwrap();
        let mut buffer = vec![0; 10];
        // Two values skipped from the leading run of three 10s.
        let expected = vec![10, 20, 20, 20, 20, 30, 30, 30, 30, 30];
        let skipped = decoder.skip(2).expect("skipping two values");
        assert_eq!(skipped, 2);
        let remainder = decoder
            .get_batch_with_dict::<i32>(&dict, &mut buffer, 10)
            .expect("getting remainder");
        assert_eq!(remainder, 10);
        assert_eq!(buffer, expected);

        // Test bit-pack encoding: 345345345455 (2 groups: 8 and 4)
        // 011 100 101 011 100 101 011 100 101 100 101 101
        // 00000011 01100011 11000111 10001110 00000011 01100101 00001011
        let dict = vec!["aaa", "bbb", "ccc", "ddd", "eee", "fff"];
        let data = vec![0x03, 0x63, 0xC7, 0x8E, 0x03, 0x65, 0x0B];
        let mut decoder: RleDecoder = RleDecoder::new(3);
        decoder.set_data(data.into()).unwrap();
        let mut buffer = vec![""; 8];
        let expected = vec!["eee", "fff", "ddd", "eee", "fff", "eee", "fff", "fff"];
        let skipped = decoder.skip(4).expect("skipping four values");
        assert_eq!(skipped, 4);
        let remainder = decoder
            .get_batch_with_dict::<&str>(dict.as_slice(), buffer.as_mut_slice(), 8)
            .expect("getting remainder");
        assert_eq!(remainder, 8);
        assert_eq!(buffer, expected);
    }
756
    /// Round-trips `values` through an `RleEncoder`/`RleDecoder` pair with the
    /// given `bit_width`.
    ///
    /// `expected_encoding == None` skips the byte-for-byte comparison of the
    /// encoded output; `expected_len == -1` skips the encoded-length check.
    /// Decoding is verified twice: value by value via `get()`, then in bulk
    /// via `get_batch()`.
    fn validate_rle(
        values: &[i64],
        bit_width: u8,
        expected_encoding: Option<&[u8]>,
        expected_len: i32,
    ) {
        let buffer_len = 64 * 1024;
        let mut encoder = RleEncoder::new(bit_width, buffer_len);
        for v in values {
            encoder.put(*v as u64)
        }
        let buffer: Bytes = encoder.consume().into();
        if expected_len != -1 {
            assert_eq!(buffer.len(), expected_len as usize);
        }
        if let Some(b) = expected_encoding {
            assert_eq!(buffer.as_ref(), b);
        }

        // Verify read
        let mut decoder = RleDecoder::new(bit_width);
        decoder.set_data(buffer.clone()).unwrap();
        for v in values {
            let val: i64 = decoder
                .get()
                .expect("get() should be OK")
                .expect("get() should return more value");
            assert_eq!(val, *v);
        }

        // Verify batch read
        decoder.set_data(buffer).unwrap();
        let mut values_read: Vec<i64> = vec![0; values.len()];
        decoder
            .get_batch(&mut values_read[..])
            .expect("get_batch() should be OK");
        assert_eq!(&values_read[..], values);
    }
795
    #[test]
    fn test_rle_specific_sequences() {
        // 50 zeros then 50 ones: two RLE runs regardless of bit width.
        let mut expected_buffer = Vec::new();
        let mut values = vec![0; 50];
        values.resize(100, 1);

        expected_buffer.push(50 << 1);
        expected_buffer.push(0);
        expected_buffer.push(50 << 1);
        expected_buffer.push(1);

        // For widths up to 8 the repeated value fits in one byte, so the exact
        // encoding is known; for wider values only the total length is checked.
        for width in 1..9 {
            validate_rle(&values[..], width, Some(&expected_buffer[..]), 4);
        }
        for width in 9..MAX_WIDTH + 1 {
            validate_rle(
                &values[..],
                width as u8,
                None,
                2 * (1 + bit_util::ceil(width as i64, 8) as i32),
            );
        }

        // Test 100 0's and 1's alternating
        values.clear();
        expected_buffer.clear();
        for i in 0..101 {
            values.push(i % 2);
        }
        let num_groups = bit_util::ceil(100, 8) as u8;
        expected_buffer.push((num_groups << 1) | 1);
        expected_buffer.resize(expected_buffer.len() + 100 / 8, 0b10101010);

        // For the last 4 0 and 1's, padded with 0.
        expected_buffer.push(0b00001010);
        validate_rle(
            &values,
            1,
            Some(&expected_buffer[..]),
            1 + num_groups as i32,
        );
        for width in 2..MAX_WIDTH + 1 {
            let num_values = bit_util::ceil(100, 8) * 8;
            validate_rle(
                &values,
                width as u8,
                None,
                1 + bit_util::ceil(width as i64 * num_values, 8) as i32,
            );
        }
    }
847
848    // `validate_rle` on `num_vals` with width `bit_width`. If `value` is -1, that value
849    // is used, otherwise alternating values are used.
850    fn test_rle_values(bit_width: usize, num_vals: usize, value: i32) {
851        let mod_val = if bit_width == 64 {
852            1
853        } else {
854            1u64 << bit_width
855        };
856        let mut values: Vec<i64> = vec![];
857        for v in 0..num_vals {
858            let val = if value == -1 {
859                v as i64 % mod_val as i64
860            } else {
861                value as i64
862            };
863            values.push(val);
864        }
865        validate_rle(&values, bit_width as u8, None, -1);
866    }
867
868    #[test]
869    fn test_values() {
870        for width in 1..MAX_WIDTH + 1 {
871            test_rle_values(width, 1, -1);
872            test_rle_values(width, 1024, -1);
873            test_rle_values(width, 1024, 0);
874            test_rle_values(width, 1024, 1);
875        }
876    }
877
878    #[test]
879    fn test_truncated_rle() {
880        // The final bit packed run within a page may not be a multiple of 8 values
881        // Unfortunately the specification stores `(bit-packed-run-len) / 8`
882        // This means we don't necessarily know how many values are present
883        // and some writers may not add padding to compensate for this ambiguity
884
885        // Bit pack encode 20 values with a bit width of 8
886        let mut data: Vec<u8> = vec![
887            (3 << 1) | 1, // bit-packed run of 3 * 8
888        ];
889        data.extend(std::iter::repeat_n(0xFF, 20));
890        let data: Bytes = data.into();
891
892        let mut decoder = RleDecoder::new(8);
893        decoder.set_data(data.clone()).unwrap();
894
895        let mut output = vec![0_u16; 100];
896        let read = decoder.get_batch(&mut output).unwrap();
897
898        assert_eq!(read, 20);
899        assert!(output.iter().take(20).all(|x| *x == 255));
900
901        // Reset decoder
902        decoder.set_data(data).unwrap();
903
904        let dict: Vec<u16> = (0..256).collect();
905        let mut output = vec![0_u16; 100];
906        let read = decoder
907            .get_batch_with_dict(&dict, &mut output, 100)
908            .unwrap();
909
910        assert_eq!(read, 20);
911        assert!(output.iter().take(20).all(|x| *x == 255));
912    }
913
914    #[test]
915    fn test_rle_padded() {
916        let values: Vec<i16> = vec![0, 1, 1, 3, 1, 0];
917        let bit_width = 2;
918        let buffer_len = RleEncoder::max_buffer_size(bit_width, values.len());
919        let mut encoder = RleEncoder::new(bit_width, buffer_len + 1);
920        for v in &values {
921            encoder.put(*v as u64)
922        }
923
924        let mut buffer = encoder.consume();
925        buffer.push(0);
926
927        let mut decoder = RleDecoder::new(bit_width);
928        decoder.set_data(buffer.into()).unwrap();
929
930        // We don't always reliably know how many non-null values are contained in a page
931        // and so the decoder must work correctly without a precise value count
932        let mut actual_values: Vec<i16> = vec![0; 12];
933        let r = decoder
934            .get_batch(&mut actual_values)
935            .expect("get_batch() should be OK");
936
937        // Should decode 8 values despite only encoding 6 as length of
938        // bit packed run is always multiple of 8
939        assert_eq!(r, 8);
940        assert_eq!(actual_values[..6], values);
941        assert_eq!(actual_values[6], 0);
942        assert_eq!(actual_values[7], 0);
943    }
944
945    #[test]
946    fn test_long_run() {
947        // This writer does not write runs longer than 504 values as this allows
948        // encoding the run header as a single byte
949        //
950        // This tests that the decoder correctly handles longer runs
951
952        let mut writer = BitWriter::new(1024);
953        let bit_width = 1;
954
955        // Choose a non-multiple of 8 larger than 1024 so that the length
956        // of the run is ambiguous, as the encoding only stores `num_values / 8`
957        let num_values = 2002;
958
959        // bit-packed header
960        let run_bytes = ceil(num_values * bit_width, 8) as u64;
961        writer.put_vlq_int((run_bytes << 1) | 1);
962        for _ in 0..run_bytes {
963            writer.put_aligned(0xFF_u8, 1);
964        }
965        let buffer: Bytes = writer.consume().into();
966
967        let mut decoder = RleDecoder::new(1);
968        decoder.set_data(buffer.clone()).unwrap();
969
970        let mut decoded: Vec<i16> = vec![0; num_values];
971        let r = decoder.get_batch(&mut decoded).unwrap();
972        assert_eq!(r, num_values);
973        assert_eq!(vec![1; num_values], decoded);
974
975        decoder.set_data(buffer).unwrap();
976        let r = decoder
977            .get_batch_with_dict(&[0, 23], &mut decoded, num_values)
978            .unwrap();
979        assert_eq!(r, num_values);
980        assert_eq!(vec![23; num_values], decoded);
981    }
982
983    #[test]
984    fn test_rle_specific_roundtrip() {
985        let bit_width = 1;
986        let values: Vec<i16> = vec![0, 1, 1, 1, 1, 0, 0, 0, 0, 1];
987        let buffer_len = RleEncoder::max_buffer_size(bit_width, values.len());
988        let mut encoder = RleEncoder::new(bit_width, buffer_len);
989        for v in &values {
990            encoder.put(*v as u64)
991        }
992        let buffer = encoder.consume();
993        let mut decoder = RleDecoder::new(bit_width);
994        decoder.set_data(Bytes::from(buffer)).unwrap();
995        let mut actual_values: Vec<i16> = vec![0; values.len()];
996        decoder
997            .get_batch(&mut actual_values)
998            .expect("get_batch() should be OK");
999        assert_eq!(actual_values, values);
1000    }
1001
1002    fn test_round_trip(values: &[i32], bit_width: u8) {
1003        let buffer_len = 64 * 1024;
1004        let mut encoder = RleEncoder::new(bit_width, buffer_len);
1005        for v in values {
1006            encoder.put(*v as u64)
1007        }
1008
1009        let buffer = Bytes::from(encoder.consume());
1010
1011        // Verify read
1012        let mut decoder = RleDecoder::new(bit_width);
1013        decoder.set_data(buffer.clone()).unwrap();
1014        for v in values {
1015            let val = decoder
1016                .get::<i32>()
1017                .expect("get() should be OK")
1018                .expect("get() should return value");
1019            assert_eq!(val, *v);
1020        }
1021
1022        // Verify batch read
1023        let mut decoder = RleDecoder::new(bit_width);
1024        decoder.set_data(buffer).unwrap();
1025        let mut values_read: Vec<i32> = vec![0; values.len()];
1026        decoder
1027            .get_batch(&mut values_read[..])
1028            .expect("get_batch() should be OK");
1029        assert_eq!(&values_read[..], values);
1030    }
1031
1032    #[test]
1033    fn test_random() {
1034        let seed_len = 32;
1035        let niters = 50;
1036        let ngroups = 1000;
1037        let max_group_size = 15;
1038        let mut values = vec![];
1039
1040        for _ in 0..niters {
1041            values.clear();
1042            let rng = rng();
1043            let seed_vec: Vec<u8> = rng
1044                .sample_iter::<u8, _>(&StandardUniform)
1045                .take(seed_len)
1046                .collect();
1047            let mut seed = [0u8; 32];
1048            seed.copy_from_slice(&seed_vec[0..seed_len]);
1049            let mut r#gen = rand::rngs::StdRng::from_seed(seed);
1050
1051            let mut parity = false;
1052            for _ in 0..ngroups {
1053                let mut group_size = r#gen.random_range(1..20);
1054                if group_size > max_group_size {
1055                    group_size = 1;
1056                }
1057                for _ in 0..group_size {
1058                    values.push(parity as i32);
1059                }
1060                parity = !parity;
1061            }
1062            let bit_width = bit_util::num_required_bits(values.len() as u64);
1063            assert!(bit_width < 64);
1064            test_round_trip(&values[..], bit_width);
1065        }
1066    }
1067}