Skip to main content

parquet/encodings/
decoding.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
17
//! Contains all supported decoders for Parquet.
19
20use bytes::Bytes;
21use num_traits::{FromPrimitive, WrappingAdd};
22use std::{cmp, marker::PhantomData, mem};
23
24use super::rle::RleDecoder;
25
26use crate::basic::*;
27use crate::data_type::private::ParquetValueType;
28use crate::data_type::*;
29use crate::encodings::decoding::byte_stream_split_decoder::{
30    ByteStreamSplitDecoder, VariableWidthByteStreamSplitDecoder,
31};
32use crate::errors::{ParquetError, Result};
33use crate::schema::types::ColumnDescPtr;
34use crate::util::bit_util::{self, BitReader};
35
36mod byte_stream_split_decoder;
37
38pub(crate) mod private {
39    use super::*;
40
41    /// A trait that allows getting a [`Decoder`] implementation for a [`DataType`] with
42    /// the corresponding [`ParquetValueType`]. This is necessary to support
43    /// [`Decoder`] implementations that may not be applicable for all [`DataType`]
44    /// and by extension all [`ParquetValueType`]
45    pub trait GetDecoder {
46        fn get_decoder<T: DataType<T = Self>>(
47            descr: ColumnDescPtr,
48            encoding: Encoding,
49        ) -> Result<Box<dyn Decoder<T>>> {
50            get_decoder_default(descr, encoding)
51        }
52    }
53
54    fn get_decoder_default<T: DataType>(
55        descr: ColumnDescPtr,
56        encoding: Encoding,
57    ) -> Result<Box<dyn Decoder<T>>> {
58        match encoding {
59            Encoding::PLAIN => Ok(Box::new(PlainDecoder::new(descr.type_length()))),
60            Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => Err(general_err!(
61                "Cannot initialize this encoding through this function"
62            )),
63            Encoding::RLE
64            | Encoding::DELTA_BINARY_PACKED
65            | Encoding::DELTA_BYTE_ARRAY
66            | Encoding::DELTA_LENGTH_BYTE_ARRAY => Err(general_err!(
67                "Encoding {} is not supported for type",
68                encoding
69            )),
70            e => Err(nyi_err!("Encoding {} is not supported", e)),
71        }
72    }
73
74    impl GetDecoder for bool {
75        fn get_decoder<T: DataType<T = Self>>(
76            descr: ColumnDescPtr,
77            encoding: Encoding,
78        ) -> Result<Box<dyn Decoder<T>>> {
79            match encoding {
80                Encoding::RLE => Ok(Box::new(RleValueDecoder::new())),
81                _ => get_decoder_default(descr, encoding),
82            }
83        }
84    }
85
86    impl GetDecoder for i32 {
87        fn get_decoder<T: DataType<T = Self>>(
88            descr: ColumnDescPtr,
89            encoding: Encoding,
90        ) -> Result<Box<dyn Decoder<T>>> {
91            match encoding {
92                Encoding::BYTE_STREAM_SPLIT => Ok(Box::new(ByteStreamSplitDecoder::new())),
93                Encoding::DELTA_BINARY_PACKED => Ok(Box::new(DeltaBitPackDecoder::new())),
94                _ => get_decoder_default(descr, encoding),
95            }
96        }
97    }
98
99    impl GetDecoder for i64 {
100        fn get_decoder<T: DataType<T = Self>>(
101            descr: ColumnDescPtr,
102            encoding: Encoding,
103        ) -> Result<Box<dyn Decoder<T>>> {
104            match encoding {
105                Encoding::BYTE_STREAM_SPLIT => Ok(Box::new(ByteStreamSplitDecoder::new())),
106                Encoding::DELTA_BINARY_PACKED => Ok(Box::new(DeltaBitPackDecoder::new())),
107                _ => get_decoder_default(descr, encoding),
108            }
109        }
110    }
111
112    impl GetDecoder for f32 {
113        fn get_decoder<T: DataType<T = Self>>(
114            descr: ColumnDescPtr,
115            encoding: Encoding,
116        ) -> Result<Box<dyn Decoder<T>>> {
117            match encoding {
118                Encoding::BYTE_STREAM_SPLIT => Ok(Box::new(ByteStreamSplitDecoder::new())),
119                _ => get_decoder_default(descr, encoding),
120            }
121        }
122    }
123    impl GetDecoder for f64 {
124        fn get_decoder<T: DataType<T = Self>>(
125            descr: ColumnDescPtr,
126            encoding: Encoding,
127        ) -> Result<Box<dyn Decoder<T>>> {
128            match encoding {
129                Encoding::BYTE_STREAM_SPLIT => Ok(Box::new(ByteStreamSplitDecoder::new())),
130                _ => get_decoder_default(descr, encoding),
131            }
132        }
133    }
134
135    impl GetDecoder for ByteArray {
136        fn get_decoder<T: DataType<T = Self>>(
137            descr: ColumnDescPtr,
138            encoding: Encoding,
139        ) -> Result<Box<dyn Decoder<T>>> {
140            match encoding {
141                Encoding::DELTA_BYTE_ARRAY => Ok(Box::new(DeltaByteArrayDecoder::new())),
142                Encoding::DELTA_LENGTH_BYTE_ARRAY => {
143                    Ok(Box::new(DeltaLengthByteArrayDecoder::new()))
144                }
145                _ => get_decoder_default(descr, encoding),
146            }
147        }
148    }
149
150    impl GetDecoder for FixedLenByteArray {
151        fn get_decoder<T: DataType<T = Self>>(
152            descr: ColumnDescPtr,
153            encoding: Encoding,
154        ) -> Result<Box<dyn Decoder<T>>> {
155            match encoding {
156                Encoding::BYTE_STREAM_SPLIT => Ok(Box::new(
157                    VariableWidthByteStreamSplitDecoder::new(descr.type_length()),
158                )),
159                Encoding::DELTA_BYTE_ARRAY => Ok(Box::new(DeltaByteArrayDecoder::new())),
160                _ => get_decoder_default(descr, encoding),
161            }
162        }
163    }
164
165    impl GetDecoder for Int96 {}
166}
167
168// ----------------------------------------------------------------------
169// Decoders
170
171/// A Parquet decoder for the data type `T`.
172pub trait Decoder<T: DataType>: Send {
173    /// Sets the data to decode to be `data`, which should contain `num_values` of values
174    /// to decode.
175    fn set_data(&mut self, data: Bytes, num_values: usize) -> Result<()>;
176
177    /// Consumes values from this decoder and write the results to `buffer`. This will try
178    /// to fill up `buffer`.
179    ///
180    /// Returns the actual number of values decoded, which should be equal to
181    /// `buffer.len()` unless the remaining number of values is less than
182    /// `buffer.len()`.
183    fn get(&mut self, buffer: &mut [T::T]) -> Result<usize>;
184
185    /// Consume values from this decoder and write the results to `buffer`, leaving
186    /// "spaces" for null values.
187    ///
188    /// `null_count` is the number of nulls we expect to see in `buffer`, after reading.
189    /// `valid_bits` stores the valid bit for each value in the buffer. It should contain
190    ///   at least number of bits that equal to `buffer.len()`.
191    ///
192    /// Returns the actual number of values decoded.
193    ///
194    /// # Panics
195    ///
196    /// Panics if `null_count` is greater than `buffer.len()`.
197    fn get_spaced(
198        &mut self,
199        buffer: &mut [T::T],
200        null_count: usize,
201        valid_bits: &[u8],
202    ) -> Result<usize> {
203        assert!(buffer.len() >= null_count);
204
205        // TODO: check validity of the input arguments?
206        if null_count == 0 {
207            return self.get(buffer);
208        }
209
210        let num_values = buffer.len();
211        let values_to_read = num_values - null_count;
212        let values_read = self.get(buffer)?;
213        if values_read != values_to_read {
214            return Err(general_err!(
215                "Number of values read: {}, doesn't match expected: {}",
216                values_read,
217                values_to_read
218            ));
219        }
220        let mut values_to_move = values_read;
221        for i in (0..num_values).rev() {
222            if bit_util::get_bit(valid_bits, i) {
223                values_to_move -= 1;
224                buffer.swap(i, values_to_move);
225            }
226        }
227
228        Ok(num_values)
229    }
230
231    /// Returns the number of values left in this decoder stream.
232    fn values_left(&self) -> usize;
233
234    /// Returns the encoding for this decoder.
235    fn encoding(&self) -> Encoding;
236
237    /// Skip the specified number of values in this decoder stream.
238    fn skip(&mut self, num_values: usize) -> Result<usize>;
239}
240
241/// Gets a decoder for the column descriptor `descr` and encoding type `encoding`.
242///
243/// NOTE: the primitive type in `descr` MUST match the data type `T`, otherwise
244/// disastrous consequence could occur.
245pub fn get_decoder<T: DataType>(
246    descr: ColumnDescPtr,
247    encoding: Encoding,
248) -> Result<Box<dyn Decoder<T>>> {
249    use self::private::GetDecoder;
250    T::T::get_decoder(descr, encoding)
251}
252
253// ----------------------------------------------------------------------
254// PLAIN Decoding
255
256#[derive(Default)]
257pub struct PlainDecoderDetails {
258    // The remaining number of values in the byte array
259    pub(crate) num_values: usize,
260
261    // The current starting index in the byte array. Not used when `T` is bool.
262    pub(crate) start: usize,
263
264    // The length for the type `T`. Only used when `T` is `FixedLenByteArrayType`
265    pub(crate) type_length: i32,
266
267    // The byte array to decode from. Not set if `T` is bool.
268    pub(crate) data: Option<Bytes>,
269
270    // Read `data` bit by bit. Only set if `T` is bool.
271    pub(crate) bit_reader: Option<BitReader>,
272}
273
274/// Plain decoding that supports all types.
275///
276/// Values are encoded back to back. For native types, data is encoded as little endian.
277/// Floating point types are encoded in IEEE.
278/// See [`PlainEncoder`](crate::encoding::PlainEncoder) for more information.
279pub struct PlainDecoder<T: DataType> {
280    // The binary details needed for decoding
281    inner: PlainDecoderDetails,
282
283    // To allow `T` in the generic parameter for this struct. This doesn't take any
284    // space.
285    _phantom: PhantomData<T>,
286}
287
288impl<T: DataType> PlainDecoder<T> {
289    /// Creates new plain decoder.
290    pub fn new(type_length: i32) -> Self {
291        PlainDecoder {
292            inner: PlainDecoderDetails {
293                type_length,
294                num_values: 0,
295                start: 0,
296                data: None,
297                bit_reader: None,
298            },
299            _phantom: PhantomData,
300        }
301    }
302}
303
304impl<T: DataType> Decoder<T> for PlainDecoder<T> {
305    #[inline]
306    fn set_data(&mut self, data: Bytes, num_values: usize) -> Result<()> {
307        T::T::set_data(&mut self.inner, data, num_values);
308        Ok(())
309    }
310
311    #[inline]
312    fn values_left(&self) -> usize {
313        self.inner.num_values
314    }
315
316    #[inline]
317    fn encoding(&self) -> Encoding {
318        Encoding::PLAIN
319    }
320
321    #[inline]
322    fn get(&mut self, buffer: &mut [T::T]) -> Result<usize> {
323        T::T::decode(buffer, &mut self.inner)
324    }
325
326    #[inline]
327    fn skip(&mut self, num_values: usize) -> Result<usize> {
328        T::T::skip(&mut self.inner, num_values)
329    }
330}
331
332// ----------------------------------------------------------------------
333// RLE_DICTIONARY/PLAIN_DICTIONARY Decoding
334
335/// Dictionary decoder.
336///
337/// The dictionary encoding builds a dictionary of values encountered in a given column.
338/// The dictionary is be stored in a dictionary page per column chunk.
339/// See [`DictEncoder`](crate::encoding::DictEncoder) for more information.
340pub struct DictDecoder<T: DataType> {
341    // The dictionary, which maps ids to the values
342    dictionary: Vec<T::T>,
343
344    // Whether `dictionary` has been initialized
345    has_dictionary: bool,
346
347    // The decoder for the value ids
348    rle_decoder: Option<RleDecoder>,
349
350    // Number of values left in the data stream
351    num_values: usize,
352}
353
354impl<T: DataType> Default for DictDecoder<T> {
355    fn default() -> Self {
356        Self::new()
357    }
358}
359
360impl<T: DataType> DictDecoder<T> {
361    /// Creates new dictionary decoder.
362    pub fn new() -> Self {
363        Self {
364            dictionary: vec![],
365            has_dictionary: false,
366            rle_decoder: None,
367            num_values: 0,
368        }
369    }
370
371    /// Decodes and sets values for dictionary using `decoder` decoder.
372    pub fn set_dict(&mut self, mut decoder: Box<dyn Decoder<T>>) -> Result<()> {
373        let num_values = decoder.values_left();
374        self.dictionary.resize(num_values, T::T::default());
375        let _ = decoder.get(&mut self.dictionary)?;
376        self.has_dictionary = true;
377        Ok(())
378    }
379}
380
381impl<T: DataType> Decoder<T> for DictDecoder<T> {
382    fn set_data(&mut self, data: Bytes, num_values: usize) -> Result<()> {
383        // First byte in `data` is bit width
384        if data.is_empty() {
385            return Err(eof_err!("Not enough bytes to decode bit_width"));
386        }
387
388        let bit_width = data.as_ref()[0];
389        if bit_width > 32 {
390            return Err(general_err!(
391                "Invalid or corrupted RLE bit width {}. Max allowed is 32",
392                bit_width
393            ));
394        }
395        let mut rle_decoder = RleDecoder::new(bit_width);
396        rle_decoder.set_data(data.slice(1..))?;
397        self.num_values = num_values;
398        self.rle_decoder = Some(rle_decoder);
399        Ok(())
400    }
401
402    fn get(&mut self, buffer: &mut [T::T]) -> Result<usize> {
403        assert!(self.rle_decoder.is_some());
404        assert!(self.has_dictionary, "Must call set_dict() first!");
405
406        let rle = self.rle_decoder.as_mut().unwrap();
407        let num_values = cmp::min(buffer.len(), self.num_values);
408        rle.get_batch_with_dict(&self.dictionary[..], buffer, num_values)
409    }
410
411    /// Number of values left in this decoder stream
412    fn values_left(&self) -> usize {
413        self.num_values
414    }
415
416    fn encoding(&self) -> Encoding {
417        Encoding::RLE_DICTIONARY
418    }
419
420    fn skip(&mut self, num_values: usize) -> Result<usize> {
421        assert!(self.rle_decoder.is_some());
422        assert!(self.has_dictionary, "Must call set_dict() first!");
423
424        let rle = self.rle_decoder.as_mut().unwrap();
425        let num_values = cmp::min(num_values, self.num_values);
426        rle.skip(num_values)
427    }
428}
429
430// ----------------------------------------------------------------------
431// RLE Decoding
432
433/// RLE/Bit-Packing hybrid decoding for values.
434/// Currently is used only for data pages v2 and supports boolean types.
435/// See [`RleValueEncoder`](crate::encoding::RleValueEncoder) for more information.
436pub struct RleValueDecoder<T: DataType> {
437    values_left: usize,
438    decoder: RleDecoder,
439    _phantom: PhantomData<T>,
440}
441
442impl<T: DataType> Default for RleValueDecoder<T> {
443    fn default() -> Self {
444        Self::new()
445    }
446}
447
448impl<T: DataType> RleValueDecoder<T> {
449    pub fn new() -> Self {
450        Self {
451            values_left: 0,
452            decoder: RleDecoder::new(1),
453            _phantom: PhantomData,
454        }
455    }
456}
457
458impl<T: DataType> Decoder<T> for RleValueDecoder<T> {
459    #[inline]
460    fn set_data(&mut self, data: Bytes, num_values: usize) -> Result<()> {
461        // Only support RLE value reader for boolean values with bit width of 1.
462        ensure_phys_ty!(Type::BOOLEAN, "RleValueDecoder only supports BoolType");
463
464        // We still need to remove prefix of i32 from the stream.
465        const I32_SIZE: usize = mem::size_of::<i32>();
466        if data.len() < I32_SIZE {
467            return Err(eof_err!("Not enough bytes to decode"));
468        }
469        let data_size = bit_util::read_num_bytes::<i32>(I32_SIZE, data.as_ref()) as usize;
470        if data.len() - I32_SIZE < data_size {
471            return Err(eof_err!("Not enough bytes to decode"));
472        }
473
474        self.decoder = RleDecoder::new(1);
475        self.decoder
476            .set_data(data.slice(I32_SIZE..I32_SIZE + data_size))?;
477        self.values_left = num_values;
478        Ok(())
479    }
480
481    #[inline]
482    fn values_left(&self) -> usize {
483        self.values_left
484    }
485
486    #[inline]
487    fn encoding(&self) -> Encoding {
488        Encoding::RLE
489    }
490
491    #[inline]
492    fn get(&mut self, buffer: &mut [T::T]) -> Result<usize> {
493        let num_values = cmp::min(buffer.len(), self.values_left);
494        let values_read = self.decoder.get_batch(&mut buffer[..num_values])?;
495        self.values_left -= values_read;
496        Ok(values_read)
497    }
498
499    #[inline]
500    fn skip(&mut self, num_values: usize) -> Result<usize> {
501        let num_values = cmp::min(num_values, self.values_left);
502        let values_skipped = self.decoder.skip(num_values)?;
503        self.values_left -= values_skipped;
504        Ok(values_skipped)
505    }
506}
507
508// ----------------------------------------------------------------------
509// DELTA_BINARY_PACKED Decoding
510
511/// Delta binary packed decoder.
512/// Supports INT32 and INT64 types.
513/// See [`DeltaBitPackEncoder`](crate::encoding::DeltaBitPackEncoder) for more
514/// information.
515pub struct DeltaBitPackDecoder<T: DataType> {
516    bit_reader: BitReader,
517    initialized: bool,
518
519    // Header info
520    /// The number of values in each block
521    block_size: usize,
522    /// The number of values that remain to be read in the current page
523    values_left: usize,
524    /// The number of mini-blocks in each block
525    mini_blocks_per_block: usize,
526    /// The number of values in each mini block
527    values_per_mini_block: usize,
528
529    // Per block info
530    /// The minimum delta in the block
531    min_delta: T::T,
532    /// The byte offset of the end of the current block
533    block_end_offset: usize,
534    /// The index on the current mini block
535    mini_block_idx: usize,
536    /// The bit widths of each mini block in the current block
537    mini_block_bit_widths: Vec<u8>,
538    /// The number of values remaining in the current mini block
539    mini_block_remaining: usize,
540
541    /// The first value from the block header if not consumed
542    first_value: Option<T::T>,
543    /// The last value to compute offsets from
544    last_value: T::T,
545}
546
547impl<T: DataType> Default for DeltaBitPackDecoder<T>
548where
549    T::T: Default + FromPrimitive + WrappingAdd + Copy,
550{
551    fn default() -> Self {
552        Self::new()
553    }
554}
555
556impl<T: DataType> DeltaBitPackDecoder<T>
557where
558    T::T: Default + FromPrimitive + WrappingAdd + Copy,
559{
560    /// Creates new delta bit packed decoder.
561    pub fn new() -> Self {
562        Self {
563            bit_reader: BitReader::from(vec![]),
564            initialized: false,
565            block_size: 0,
566            values_left: 0,
567            mini_blocks_per_block: 0,
568            values_per_mini_block: 0,
569            min_delta: Default::default(),
570            mini_block_idx: 0,
571            mini_block_bit_widths: vec![],
572            mini_block_remaining: 0,
573            block_end_offset: 0,
574            first_value: None,
575            last_value: Default::default(),
576        }
577    }
578
579    /// Returns the current offset
580    pub fn get_offset(&self) -> usize {
581        assert!(self.initialized, "Bit reader is not initialized");
582        match self.values_left {
583            // If we've exhausted this page report the end of the current block
584            // as we may not have consumed the trailing padding
585            //
586            // The max is necessary to handle pages which don't contain more than
587            // one value and therefore have no blocks, but still contain a page header
588            0 => self.bit_reader.get_byte_offset().max(self.block_end_offset),
589            _ => self.bit_reader.get_byte_offset(),
590        }
591    }
592
593    /// Initializes the next block and the first mini block within it
594    #[inline]
595    fn next_block(&mut self) -> Result<()> {
596        let min_delta = self
597            .bit_reader
598            .get_zigzag_vlq_int()
599            .ok_or_else(|| eof_err!("Not enough data to decode 'min_delta'"))?;
600
601        self.min_delta =
602            T::T::from_i64(min_delta).ok_or_else(|| general_err!("'min_delta' too large"))?;
603
604        self.mini_block_bit_widths.clear();
605        self.bit_reader
606            .get_aligned_bytes(&mut self.mini_block_bit_widths, self.mini_blocks_per_block);
607
608        let mut offset = self.bit_reader.get_byte_offset();
609        let mut remaining = self.values_left;
610
611        // Compute the end offset of the current block
612        for b in &mut self.mini_block_bit_widths {
613            if remaining == 0 {
614                // Specification requires handling arbitrary bit widths
615                // for trailing mini blocks
616                *b = 0;
617            }
618            remaining = remaining.saturating_sub(self.values_per_mini_block);
619            offset += *b as usize * self.values_per_mini_block / 8;
620        }
621        self.block_end_offset = offset;
622
623        if self.mini_block_bit_widths.len() != self.mini_blocks_per_block {
624            return Err(eof_err!("insufficient mini block bit widths"));
625        }
626
627        self.mini_block_remaining = self.values_per_mini_block;
628        self.mini_block_idx = 0;
629
630        Ok(())
631    }
632
633    /// Initializes the next mini block
634    #[inline]
635    fn next_mini_block(&mut self) -> Result<()> {
636        if self.mini_block_idx + 1 < self.mini_block_bit_widths.len() {
637            self.mini_block_idx += 1;
638            self.mini_block_remaining = self.values_per_mini_block;
639            Ok(())
640        } else {
641            self.next_block()
642        }
643    }
644
645    /// Verify the bit width is smaller then the integer type that it is trying to decode.
646    #[inline]
647    fn check_bit_width(&self, bit_width: usize) -> Result<()> {
648        if bit_width > std::mem::size_of::<T::T>() * 8 {
649            return Err(general_err!(
650                "Invalid delta bit width {} which is larger than expected {} ",
651                bit_width,
652                std::mem::size_of::<T::T>() * 8
653            ));
654        }
655        Ok(())
656    }
657}
658
659impl<T: DataType> Decoder<T> for DeltaBitPackDecoder<T>
660where
661    T::T: Default + FromPrimitive + WrappingAdd + Copy,
662{
663    // # of total values is derived from encoding
664    #[inline]
665    fn set_data(&mut self, data: Bytes, _index: usize) -> Result<()> {
666        self.bit_reader = BitReader::new(data);
667        self.initialized = true;
668
669        // Read header information
670        self.block_size = self
671            .bit_reader
672            .get_vlq_int()
673            .ok_or_else(|| eof_err!("Not enough data to decode 'block_size'"))?
674            .try_into()
675            .map_err(|_| general_err!("invalid 'block_size'"))?;
676
677        self.mini_blocks_per_block = self
678            .bit_reader
679            .get_vlq_int()
680            .ok_or_else(|| eof_err!("Not enough data to decode 'mini_blocks_per_block'"))?
681            .try_into()
682            .map_err(|_| general_err!("invalid 'mini_blocks_per_block'"))?;
683
684        if self.mini_blocks_per_block == 0 {
685            return Err(general_err!("cannot have zero miniblocks per block"));
686        }
687
688        self.values_left = self
689            .bit_reader
690            .get_vlq_int()
691            .ok_or_else(|| eof_err!("Not enough data to decode 'values_left'"))?
692            .try_into()
693            .map_err(|_| general_err!("invalid 'values_left'"))?;
694
695        let first_value = self
696            .bit_reader
697            .get_zigzag_vlq_int()
698            .ok_or_else(|| eof_err!("Not enough data to decode 'first_value'"))?;
699
700        self.first_value =
701            Some(T::T::from_i64(first_value).ok_or_else(|| general_err!("first value too large"))?);
702
703        if self.block_size % 128 != 0 {
704            return Err(general_err!(
705                "'block_size' must be a multiple of 128, got {}",
706                self.block_size
707            ));
708        }
709
710        if self.block_size % self.mini_blocks_per_block != 0 {
711            return Err(general_err!(
712                "'block_size' must be a multiple of 'mini_blocks_per_block' got {} and {}",
713                self.block_size,
714                self.mini_blocks_per_block
715            ));
716        }
717
718        // Reset decoding state
719        self.mini_block_idx = 0;
720        self.values_per_mini_block = self.block_size / self.mini_blocks_per_block;
721        self.mini_block_remaining = 0;
722        self.mini_block_bit_widths.clear();
723
724        if self.values_per_mini_block % 32 != 0 {
725            return Err(general_err!(
726                "'values_per_mini_block' must be a multiple of 32 got {}",
727                self.values_per_mini_block
728            ));
729        }
730
731        Ok(())
732    }
733
734    fn get(&mut self, buffer: &mut [T::T]) -> Result<usize> {
735        assert!(self.initialized, "Bit reader is not initialized");
736        if buffer.is_empty() {
737            return Ok(0);
738        }
739
740        let mut read = 0;
741        let to_read = buffer.len().min(self.values_left);
742
743        if let Some(value) = self.first_value.take() {
744            self.last_value = value;
745            buffer[0] = value;
746            read += 1;
747            self.values_left -= 1;
748        }
749
750        while read != to_read {
751            if self.mini_block_remaining == 0 {
752                self.next_mini_block()?;
753            }
754
755            let bit_width = self.mini_block_bit_widths[self.mini_block_idx] as usize;
756            self.check_bit_width(bit_width)?;
757            let batch_to_read = self.mini_block_remaining.min(to_read - read);
758
759            let batch_read = self
760                .bit_reader
761                .get_batch(&mut buffer[read..read + batch_to_read], bit_width);
762
763            if batch_read != batch_to_read {
764                return Err(general_err!(
765                    "Expected to read {} values from miniblock got {}",
766                    batch_to_read,
767                    batch_read
768                ));
769            }
770
771            // At this point we have read the deltas to `buffer` we now need to offset
772            // these to get back to the original values that were encoded
773            //
774            // Optimization: if the bit_width for the miniblock is 0, then we can employ
775            // a faster decoding method than setting `value[i] = value[i-1] + value[i] + min_delta`.
776            // Where min_delta is 0 (all values in the miniblock are the same), we can simply
777            // set all values to `self.last_value`. In the case of non-zero min_delta (values
778            // in the mini-block form an arithmetic progression) each value can be computed via
779            // `value[i] = (i + 1) * min_delta + last_value`. In both cases we remove the
780            // dependence on the preceding value.
781            // Kudos to @pitrou for the idea https://github.com/apache/arrow/pull/49296
782            let min_delta = self.min_delta.as_i64()?;
783            if bit_width == 0 {
784                if min_delta == 0 {
785                    buffer[read..read + batch_read].fill(self.last_value);
786                } else {
787                    // the c++ version multiplies min_delta by the iter index, but doing
788                    // wrapping_mul through T::T was a bit slower. this is still
789                    // faster than before.
790                    let mut delta = self.min_delta;
791                    for v in &mut buffer[read..read + batch_read] {
792                        *v = self.last_value.wrapping_add(&delta);
793                        delta = delta.wrapping_add(&self.min_delta);
794                    }
795
796                    self.last_value = buffer[read + batch_read - 1];
797                }
798            } else {
799                // It is OK for deltas to contain "overflowed" values after encoding,
800                // e.g. i64::MAX - i64::MIN, so we use `wrapping_add` to "overflow" again and
801                // restore original value.
802                if min_delta == 0 {
803                    for v in &mut buffer[read..read + batch_read] {
804                        *v = v.wrapping_add(&self.last_value);
805                        self.last_value = *v;
806                    }
807                } else {
808                    for v in &mut buffer[read..read + batch_read] {
809                        *v = v
810                            .wrapping_add(&self.min_delta)
811                            .wrapping_add(&self.last_value);
812                        self.last_value = *v;
813                    }
814                }
815            }
816
817            read += batch_read;
818            self.mini_block_remaining -= batch_read;
819            self.values_left -= batch_read;
820        }
821
822        Ok(to_read)
823    }
824
825    fn values_left(&self) -> usize {
826        self.values_left
827    }
828
829    fn encoding(&self) -> Encoding {
830        Encoding::DELTA_BINARY_PACKED
831    }
832
833    fn skip(&mut self, num_values: usize) -> Result<usize> {
834        let mut skip = 0;
835        let to_skip = num_values.min(self.values_left);
836        if to_skip == 0 {
837            return Ok(0);
838        }
839
840        // try to consume first value in header.
841        if let Some(value) = self.first_value.take() {
842            self.last_value = value;
843            skip += 1;
844            self.values_left -= 1;
845        }
846
847        let mini_block_batch_size = match T::T::PHYSICAL_TYPE {
848            Type::INT32 => 32,
849            Type::INT64 => 64,
850            _ => unreachable!(),
851        };
852
853        let mut skip_buffer = vec![T::T::default(); mini_block_batch_size];
854        while skip < to_skip {
855            if self.mini_block_remaining == 0 {
856                self.next_mini_block()?;
857            }
858
859            let bit_width = self.mini_block_bit_widths[self.mini_block_idx] as usize;
860            self.check_bit_width(bit_width)?;
861            let mini_block_to_skip = self.mini_block_remaining.min(to_skip - skip);
862            let mini_block_should_skip = mini_block_to_skip;
863
864            let skip_count = self
865                .bit_reader
866                .get_batch(&mut skip_buffer[0..mini_block_to_skip], bit_width);
867
868            if skip_count != mini_block_to_skip {
869                return Err(general_err!(
870                    "Expected to skip {} values from mini block got {}.",
871                    mini_block_batch_size,
872                    skip_count
873                ));
874            }
875
876            // see commentary in self.get() above regarding optimizations
877            let min_delta = self.min_delta.as_i64()?;
878            if bit_width == 0 {
879                // if min_delta == 0, there's nothing to do. self.last_value is unchanged
880                if min_delta != 0 {
881                    let mut delta = self.min_delta;
882                    for v in &mut skip_buffer[0..skip_count] {
883                        *v = self.last_value.wrapping_add(&delta);
884                        delta = delta.wrapping_add(&self.min_delta);
885                    }
886
887                    self.last_value = skip_buffer[skip_count - 1];
888                }
889            } else if min_delta == 0 {
890                for v in &mut skip_buffer[0..skip_count] {
891                    *v = v.wrapping_add(&self.last_value);
892
893                    self.last_value = *v;
894                }
895            } else {
896                for v in &mut skip_buffer[0..skip_count] {
897                    *v = v
898                        .wrapping_add(&self.min_delta)
899                        .wrapping_add(&self.last_value);
900
901                    self.last_value = *v;
902                }
903            }
904
905            skip += mini_block_should_skip;
906            self.mini_block_remaining -= mini_block_should_skip;
907            self.values_left -= mini_block_should_skip;
908        }
909
910        Ok(to_skip)
911    }
912}
913
914// ----------------------------------------------------------------------
915// DELTA_LENGTH_BYTE_ARRAY Decoding
916
/// Delta length byte array decoder.
///
/// Only applied to byte arrays to separate the length values and the data, the lengths
/// are encoded using DELTA_BINARY_PACKED encoding.
/// See [`DeltaLengthByteArrayEncoder`](crate::encoding::DeltaLengthByteArrayEncoder)
/// for more information.
pub struct DeltaLengthByteArrayDecoder<T: DataType> {
    // Length of each byte array in `data`; filled in full by `set_data` from the
    // DELTA_BINARY_PACKED length header.
    // TODO: add memory tracker to this
    lengths: Vec<i32>,

    // Index into `lengths` of the next value to be returned or skipped
    current_idx: usize,

    // Concatenated byte array data: the input bytes that follow the length header.
    // `None` until `set_data` has been called.
    data: Option<Bytes>,

    // Offset into `data`, always pointing to the beginning of the next byte array.
    offset: usize,

    // Number of values left in this decoder stream
    num_values: usize,

    // Placeholder to allow `T` as generic parameter
    _phantom: PhantomData<T>,
}
943
944impl<T: DataType> Default for DeltaLengthByteArrayDecoder<T> {
945    fn default() -> Self {
946        Self::new()
947    }
948}
949
950impl<T: DataType> DeltaLengthByteArrayDecoder<T> {
951    /// Creates new delta length byte array decoder.
952    pub fn new() -> Self {
953        Self {
954            lengths: vec![],
955            current_idx: 0,
956            data: None,
957            offset: 0,
958            num_values: 0,
959            _phantom: PhantomData,
960        }
961    }
962}
963
964impl<T: DataType> Decoder<T> for DeltaLengthByteArrayDecoder<T> {
965    fn set_data(&mut self, data: Bytes, num_values: usize) -> Result<()> {
966        match T::get_physical_type() {
967            Type::BYTE_ARRAY => {
968                let mut len_decoder = DeltaBitPackDecoder::<Int32Type>::new();
969                len_decoder.set_data(data.clone(), num_values)?;
970                let num_lengths = len_decoder.values_left();
971                self.lengths.resize(num_lengths, 0);
972                len_decoder.get(&mut self.lengths[..])?;
973
974                self.data = Some(data.slice(len_decoder.get_offset()..));
975                self.offset = 0;
976                self.current_idx = 0;
977                self.num_values = num_lengths;
978                Ok(())
979            }
980            _ => Err(general_err!(
981                "DeltaLengthByteArrayDecoder only support ByteArrayType"
982            )),
983        }
984    }
985
986    fn get(&mut self, buffer: &mut [T::T]) -> Result<usize> {
987        match T::get_physical_type() {
988            Type::BYTE_ARRAY => {
989                assert!(self.data.is_some());
990
991                let data = self.data.as_ref().unwrap();
992                let num_values = cmp::min(buffer.len(), self.num_values);
993
994                for item in buffer.iter_mut().take(num_values) {
995                    let len = self.lengths[self.current_idx] as usize;
996                    item.set_from_bytes(data.slice(self.offset..self.offset + len));
997
998                    self.offset += len;
999                    self.current_idx += 1;
1000                }
1001
1002                self.num_values -= num_values;
1003                Ok(num_values)
1004            }
1005            _ => Err(general_err!(
1006                "DeltaLengthByteArrayDecoder only support ByteArrayType"
1007            )),
1008        }
1009    }
1010
1011    fn values_left(&self) -> usize {
1012        self.num_values
1013    }
1014
1015    fn encoding(&self) -> Encoding {
1016        Encoding::DELTA_LENGTH_BYTE_ARRAY
1017    }
1018
1019    fn skip(&mut self, num_values: usize) -> Result<usize> {
1020        match T::get_physical_type() {
1021            Type::BYTE_ARRAY => {
1022                let num_values = cmp::min(num_values, self.num_values);
1023
1024                let next_offset: i32 = self.lengths
1025                    [self.current_idx..self.current_idx + num_values]
1026                    .iter()
1027                    .sum();
1028
1029                self.current_idx += num_values;
1030                self.offset += next_offset as usize;
1031
1032                self.num_values -= num_values;
1033                Ok(num_values)
1034            }
1035            other_type => Err(general_err!(
1036                "DeltaLengthByteArrayDecoder not support {}, only support byte array",
1037                other_type
1038            )),
1039        }
1040    }
1041}
1042
1043// ----------------------------------------------------------------------
1044// DELTA_BYTE_ARRAY Decoding
1045
/// Delta byte array decoder.
///
/// Prefix lengths are encoded using `DELTA_BINARY_PACKED` encoding, Suffixes are stored
/// using `DELTA_LENGTH_BYTE_ARRAY` encoding.
/// See [`DeltaByteArrayEncoder`](crate::encoding::DeltaByteArrayEncoder) for more
/// information.
pub struct DeltaByteArrayDecoder<T: DataType> {
    // Prefix length of each byte array; filled in full by `set_data`.
    // TODO: add memory tracker to this
    prefix_lengths: Vec<i32>,

    // The current index into `prefix_lengths`, i.e. the next value to decode
    current_idx: usize,

    // Decoder for all suffixes, the # of which should be the same as
    // `prefix_lengths.len()`. `None` until `set_data` has been called.
    suffix_decoder: Option<DeltaLengthByteArrayDecoder<ByteArrayType>>,

    // The last byte array, used to derive the current prefix
    // Stored as Bytes to avoid clone allocation when creating output
    previous_value: Bytes,

    // Number of values left
    num_values: usize,

    // Placeholder to allow `T` as generic parameter
    _phantom: PhantomData<T>,
}
1074
1075impl<T: DataType> Default for DeltaByteArrayDecoder<T> {
1076    fn default() -> Self {
1077        Self::new()
1078    }
1079}
1080
1081impl<T: DataType> DeltaByteArrayDecoder<T> {
1082    /// Creates new delta byte array decoder.
1083    pub fn new() -> Self {
1084        Self {
1085            prefix_lengths: vec![],
1086            current_idx: 0,
1087            suffix_decoder: None,
1088            previous_value: Bytes::new(),
1089            num_values: 0,
1090            _phantom: PhantomData,
1091        }
1092    }
1093}
1094
1095impl<T: DataType> Decoder<T> for DeltaByteArrayDecoder<T> {
1096    fn set_data(&mut self, data: Bytes, num_values: usize) -> Result<()> {
1097        match T::get_physical_type() {
1098            Type::BYTE_ARRAY | Type::FIXED_LEN_BYTE_ARRAY => {
1099                let mut prefix_len_decoder = DeltaBitPackDecoder::<Int32Type>::new();
1100                prefix_len_decoder.set_data(data.clone(), num_values)?;
1101                let num_prefixes = prefix_len_decoder.values_left();
1102                self.prefix_lengths.resize(num_prefixes, 0);
1103                prefix_len_decoder.get(&mut self.prefix_lengths[..])?;
1104
1105                let mut suffix_decoder = DeltaLengthByteArrayDecoder::new();
1106                suffix_decoder
1107                    .set_data(data.slice(prefix_len_decoder.get_offset()..), num_values)?;
1108                self.suffix_decoder = Some(suffix_decoder);
1109                self.num_values = num_prefixes;
1110                self.current_idx = 0;
1111                self.previous_value = Bytes::new();
1112                Ok(())
1113            }
1114            _ => Err(general_err!(
1115                "DeltaByteArrayDecoder only supports ByteArrayType and FixedLenByteArrayType"
1116            )),
1117        }
1118    }
1119
1120    fn get(&mut self, buffer: &mut [T::T]) -> Result<usize> {
1121        match T::get_physical_type() {
1122            Type::BYTE_ARRAY | Type::FIXED_LEN_BYTE_ARRAY => {
1123                let num_values = cmp::min(buffer.len(), self.num_values);
1124                let mut v: [ByteArray; 1] = [ByteArray::new(); 1];
1125                for item in buffer.iter_mut().take(num_values) {
1126                    // Process suffix
1127                    // TODO: this is awkward - maybe we should add a non-vectorized API?
1128                    let suffix_decoder = self
1129                        .suffix_decoder
1130                        .as_mut()
1131                        .expect("decoder not initialized");
1132                    suffix_decoder.get(&mut v[..])?;
1133                    let suffix = v[0].data();
1134
1135                    // Extract current prefix length, can be 0
1136                    let prefix_len = self.prefix_lengths[self.current_idx] as usize;
1137
1138                    // Concatenate prefix with suffix
1139                    let mut result = Vec::with_capacity(prefix_len + suffix.len());
1140                    result.extend_from_slice(&self.previous_value[0..prefix_len]);
1141                    result.extend_from_slice(suffix);
1142
1143                    let data = Bytes::from(result);
1144                    item.set_from_bytes(data.clone());
1145
1146                    self.previous_value = data;
1147                    self.current_idx += 1;
1148                }
1149
1150                self.num_values -= num_values;
1151                Ok(num_values)
1152            }
1153            _ => Err(general_err!(
1154                "DeltaByteArrayDecoder only supports ByteArrayType and FixedLenByteArrayType"
1155            )),
1156        }
1157    }
1158
1159    fn values_left(&self) -> usize {
1160        self.num_values
1161    }
1162
1163    fn encoding(&self) -> Encoding {
1164        Encoding::DELTA_BYTE_ARRAY
1165    }
1166
1167    fn skip(&mut self, num_values: usize) -> Result<usize> {
1168        let mut buffer = vec![T::T::default(); num_values];
1169        self.get(&mut buffer)
1170    }
1171}
1172
1173#[cfg(test)]
1174mod tests {
1175    use super::{super::encoding::*, *};
1176
1177    use std::f32::consts::PI as PI_f32;
1178    use std::f64::consts::PI as PI_f64;
1179    use std::sync::Arc;
1180
1181    use crate::schema::types::{ColumnDescPtr, ColumnDescriptor, ColumnPath, Type as SchemaType};
1182    use crate::util::test_common::rand_gen::RandGen;
1183
1184    #[test]
1185    fn test_get_decoders() {
1186        // supported encodings
1187        create_and_check_decoder::<Int32Type>(Encoding::PLAIN, None);
1188        create_and_check_decoder::<Int32Type>(Encoding::DELTA_BINARY_PACKED, None);
1189        create_and_check_decoder::<ByteArrayType>(Encoding::DELTA_LENGTH_BYTE_ARRAY, None);
1190        create_and_check_decoder::<ByteArrayType>(Encoding::DELTA_BYTE_ARRAY, None);
1191        create_and_check_decoder::<BoolType>(Encoding::RLE, None);
1192
1193        // error when initializing
1194        create_and_check_decoder::<Int32Type>(
1195            Encoding::RLE_DICTIONARY,
1196            Some(general_err!(
1197                "Cannot initialize this encoding through this function"
1198            )),
1199        );
1200        create_and_check_decoder::<Int32Type>(
1201            Encoding::PLAIN_DICTIONARY,
1202            Some(general_err!(
1203                "Cannot initialize this encoding through this function"
1204            )),
1205        );
1206        create_and_check_decoder::<Int32Type>(
1207            Encoding::DELTA_LENGTH_BYTE_ARRAY,
1208            Some(general_err!(
1209                "Encoding DELTA_LENGTH_BYTE_ARRAY is not supported for type"
1210            )),
1211        );
1212        create_and_check_decoder::<Int32Type>(
1213            Encoding::DELTA_BYTE_ARRAY,
1214            Some(general_err!(
1215                "Encoding DELTA_BYTE_ARRAY is not supported for type"
1216            )),
1217        );
1218
1219        // unsupported
1220        #[allow(deprecated)]
1221        create_and_check_decoder::<Int32Type>(
1222            Encoding::BIT_PACKED,
1223            Some(nyi_err!("Encoding BIT_PACKED is not supported")),
1224        );
1225    }
1226
1227    #[test]
1228    fn test_plain_decode_int32() {
1229        let data = [42, 18, 52];
1230        let data_bytes = Int32Type::to_byte_array(&data[..]);
1231        let mut buffer = [0; 3];
1232        test_plain_decode::<Int32Type>(Bytes::from(data_bytes), 3, -1, &mut buffer[..], &data[..]);
1233    }
1234
1235    #[test]
1236    fn test_plain_skip_int32() {
1237        let data = [42, 18, 52];
1238        let data_bytes = Int32Type::to_byte_array(&data[..]);
1239        test_plain_skip::<Int32Type>(Bytes::from(data_bytes), 3, 1, -1, &data[1..]);
1240    }
1241
1242    #[test]
1243    fn test_plain_skip_all_int32() {
1244        let data = [42, 18, 52];
1245        let data_bytes = Int32Type::to_byte_array(&data[..]);
1246        test_plain_skip::<Int32Type>(Bytes::from(data_bytes), 3, 5, -1, &[]);
1247    }
1248
1249    #[test]
1250    fn test_plain_decode_int32_spaced() {
1251        let data = [42, 18, 52];
1252        let expected_data = [0, 42, 0, 18, 0, 0, 52, 0];
1253        let data_bytes = Int32Type::to_byte_array(&data[..]);
1254        let mut buffer = [0; 8];
1255        let num_nulls = 5;
1256        let valid_bits = [0b01001010];
1257        test_plain_decode_spaced::<Int32Type>(
1258            Bytes::from(data_bytes),
1259            3,
1260            -1,
1261            &mut buffer[..],
1262            num_nulls,
1263            &valid_bits,
1264            &expected_data[..],
1265        );
1266    }
1267
1268    #[test]
1269    fn test_plain_decode_int64() {
1270        let data = [42, 18, 52];
1271        let data_bytes = Int64Type::to_byte_array(&data[..]);
1272        let mut buffer = [0; 3];
1273        test_plain_decode::<Int64Type>(Bytes::from(data_bytes), 3, -1, &mut buffer[..], &data[..]);
1274    }
1275
1276    #[test]
1277    fn test_plain_skip_int64() {
1278        let data = [42, 18, 52];
1279        let data_bytes = Int64Type::to_byte_array(&data[..]);
1280        test_plain_skip::<Int64Type>(Bytes::from(data_bytes), 3, 2, -1, &data[2..]);
1281    }
1282
1283    #[test]
1284    fn test_plain_skip_all_int64() {
1285        let data = [42, 18, 52];
1286        let data_bytes = Int64Type::to_byte_array(&data[..]);
1287        test_plain_skip::<Int64Type>(Bytes::from(data_bytes), 3, 3, -1, &[]);
1288    }
1289
1290    #[test]
1291    fn test_plain_decode_float() {
1292        let data = [PI_f32, 2.414, 12.51];
1293        let data_bytes = FloatType::to_byte_array(&data[..]);
1294        let mut buffer = [0.0; 3];
1295        test_plain_decode::<FloatType>(Bytes::from(data_bytes), 3, -1, &mut buffer[..], &data[..]);
1296    }
1297
1298    #[test]
1299    fn test_plain_skip_float() {
1300        let data = [PI_f32, 2.414, 12.51];
1301        let data_bytes = FloatType::to_byte_array(&data[..]);
1302        test_plain_skip::<FloatType>(Bytes::from(data_bytes), 3, 1, -1, &data[1..]);
1303    }
1304
1305    #[test]
1306    fn test_plain_skip_all_float() {
1307        let data = [PI_f32, 2.414, 12.51];
1308        let data_bytes = FloatType::to_byte_array(&data[..]);
1309        test_plain_skip::<FloatType>(Bytes::from(data_bytes), 3, 4, -1, &[]);
1310    }
1311
1312    #[test]
1313    fn test_plain_skip_double() {
1314        let data = [PI_f64, 2.414f64, 12.51f64];
1315        let data_bytes = DoubleType::to_byte_array(&data[..]);
1316        test_plain_skip::<DoubleType>(Bytes::from(data_bytes), 3, 1, -1, &data[1..]);
1317    }
1318
1319    #[test]
1320    fn test_plain_skip_all_double() {
1321        let data = [PI_f64, 2.414f64, 12.51f64];
1322        let data_bytes = DoubleType::to_byte_array(&data[..]);
1323        test_plain_skip::<DoubleType>(Bytes::from(data_bytes), 3, 5, -1, &[]);
1324    }
1325
1326    #[test]
1327    fn test_plain_decode_double() {
1328        let data = [PI_f64, 2.414f64, 12.51f64];
1329        let data_bytes = DoubleType::to_byte_array(&data[..]);
1330        let mut buffer = [0.0f64; 3];
1331        test_plain_decode::<DoubleType>(Bytes::from(data_bytes), 3, -1, &mut buffer[..], &data[..]);
1332    }
1333
1334    #[test]
1335    fn test_plain_decode_int96() {
1336        let mut data = [Int96::new(); 4];
1337        data[0].set_data(11, 22, 33);
1338        data[1].set_data(44, 55, 66);
1339        data[2].set_data(10, 20, 30);
1340        data[3].set_data(40, 50, 60);
1341        let data_bytes = Int96Type::to_byte_array(&data[..]);
1342        let mut buffer = [Int96::new(); 4];
1343        test_plain_decode::<Int96Type>(Bytes::from(data_bytes), 4, -1, &mut buffer[..], &data[..]);
1344    }
1345
1346    #[test]
1347    fn test_plain_skip_int96() {
1348        let mut data = [Int96::new(); 4];
1349        data[0].set_data(11, 22, 33);
1350        data[1].set_data(44, 55, 66);
1351        data[2].set_data(10, 20, 30);
1352        data[3].set_data(40, 50, 60);
1353        let data_bytes = Int96Type::to_byte_array(&data[..]);
1354        test_plain_skip::<Int96Type>(Bytes::from(data_bytes), 4, 2, -1, &data[2..]);
1355    }
1356
1357    #[test]
1358    fn test_plain_skip_all_int96() {
1359        let mut data = [Int96::new(); 4];
1360        data[0].set_data(11, 22, 33);
1361        data[1].set_data(44, 55, 66);
1362        data[2].set_data(10, 20, 30);
1363        data[3].set_data(40, 50, 60);
1364        let data_bytes = Int96Type::to_byte_array(&data[..]);
1365        test_plain_skip::<Int96Type>(Bytes::from(data_bytes), 4, 8, -1, &[]);
1366    }
1367
1368    #[test]
1369    fn test_plain_decode_bool() {
1370        let data = [
1371            false, true, false, false, true, false, true, true, false, true,
1372        ];
1373        let data_bytes = BoolType::to_byte_array(&data[..]);
1374        let mut buffer = [false; 10];
1375        test_plain_decode::<BoolType>(Bytes::from(data_bytes), 10, -1, &mut buffer[..], &data[..]);
1376    }
1377
1378    #[test]
1379    fn test_plain_skip_bool() {
1380        let data = [
1381            false, true, false, false, true, false, true, true, false, true,
1382        ];
1383        let data_bytes = BoolType::to_byte_array(&data[..]);
1384        test_plain_skip::<BoolType>(Bytes::from(data_bytes), 10, 5, -1, &data[5..]);
1385    }
1386
1387    #[test]
1388    fn test_plain_skip_all_bool() {
1389        let data = [
1390            false, true, false, false, true, false, true, true, false, true,
1391        ];
1392        let data_bytes = BoolType::to_byte_array(&data[..]);
1393        test_plain_skip::<BoolType>(Bytes::from(data_bytes), 10, 20, -1, &[]);
1394    }
1395
1396    #[test]
1397    fn test_plain_decode_byte_array() {
1398        let mut data = vec![ByteArray::new(); 2];
1399        data[0].set_data(Bytes::from(String::from("hello")));
1400        data[1].set_data(Bytes::from(String::from("parquet")));
1401        let data_bytes = ByteArrayType::to_byte_array(&data[..]);
1402        let mut buffer = vec![ByteArray::new(); 2];
1403        test_plain_decode::<ByteArrayType>(
1404            Bytes::from(data_bytes),
1405            2,
1406            -1,
1407            &mut buffer[..],
1408            &data[..],
1409        );
1410    }
1411
1412    #[test]
1413    fn test_plain_skip_byte_array() {
1414        let mut data = vec![ByteArray::new(); 2];
1415        data[0].set_data(Bytes::from(String::from("hello")));
1416        data[1].set_data(Bytes::from(String::from("parquet")));
1417        let data_bytes = ByteArrayType::to_byte_array(&data[..]);
1418        test_plain_skip::<ByteArrayType>(Bytes::from(data_bytes), 2, 1, -1, &data[1..]);
1419    }
1420
1421    #[test]
1422    fn test_plain_skip_all_byte_array() {
1423        let mut data = vec![ByteArray::new(); 2];
1424        data[0].set_data(Bytes::from(String::from("hello")));
1425        data[1].set_data(Bytes::from(String::from("parquet")));
1426        let data_bytes = ByteArrayType::to_byte_array(&data[..]);
1427        test_plain_skip::<ByteArrayType>(Bytes::from(data_bytes), 2, 2, -1, &[]);
1428    }
1429
1430    #[test]
1431    fn test_plain_decode_fixed_len_byte_array() {
1432        let mut data = vec![FixedLenByteArray::default(); 3];
1433        data[0].set_data(Bytes::from(String::from("bird")));
1434        data[1].set_data(Bytes::from(String::from("come")));
1435        data[2].set_data(Bytes::from(String::from("flow")));
1436        let data_bytes = FixedLenByteArrayType::to_byte_array(&data[..]);
1437        let mut buffer = vec![FixedLenByteArray::default(); 3];
1438        test_plain_decode::<FixedLenByteArrayType>(
1439            Bytes::from(data_bytes),
1440            3,
1441            4,
1442            &mut buffer[..],
1443            &data[..],
1444        );
1445    }
1446
1447    #[test]
1448    fn test_plain_skip_fixed_len_byte_array() {
1449        let mut data = vec![FixedLenByteArray::default(); 3];
1450        data[0].set_data(Bytes::from(String::from("bird")));
1451        data[1].set_data(Bytes::from(String::from("come")));
1452        data[2].set_data(Bytes::from(String::from("flow")));
1453        let data_bytes = FixedLenByteArrayType::to_byte_array(&data[..]);
1454        test_plain_skip::<FixedLenByteArrayType>(Bytes::from(data_bytes), 3, 1, 4, &data[1..]);
1455    }
1456
1457    #[test]
1458    fn test_plain_skip_all_fixed_len_byte_array() {
1459        let mut data = vec![FixedLenByteArray::default(); 3];
1460        data[0].set_data(Bytes::from(String::from("bird")));
1461        data[1].set_data(Bytes::from(String::from("come")));
1462        data[2].set_data(Bytes::from(String::from("flow")));
1463        let data_bytes = FixedLenByteArrayType::to_byte_array(&data[..]);
1464        test_plain_skip::<FixedLenByteArrayType>(Bytes::from(data_bytes), 3, 6, 4, &[]);
1465    }
1466
1467    #[test]
1468    fn test_dict_decoder_empty_data() {
1469        let mut decoder = DictDecoder::<Int32Type>::new();
1470        let err = decoder.set_data(Bytes::new(), 10).unwrap_err();
1471        assert_eq!(err.to_string(), "EOF: Not enough bytes to decode bit_width");
1472    }
1473
1474    fn test_plain_decode<T: DataType>(
1475        data: Bytes,
1476        num_values: usize,
1477        type_length: i32,
1478        buffer: &mut [T::T],
1479        expected: &[T::T],
1480    ) {
1481        let mut decoder: PlainDecoder<T> = PlainDecoder::new(type_length);
1482        let result = decoder.set_data(data, num_values);
1483        assert!(result.is_ok());
1484        let result = decoder.get(buffer);
1485        assert!(result.is_ok());
1486        assert_eq!(decoder.values_left(), 0);
1487        assert_eq!(buffer, expected);
1488    }
1489
1490    fn test_plain_skip<T: DataType>(
1491        data: Bytes,
1492        num_values: usize,
1493        skip: usize,
1494        type_length: i32,
1495        expected: &[T::T],
1496    ) {
1497        let mut decoder: PlainDecoder<T> = PlainDecoder::new(type_length);
1498        let result = decoder.set_data(data, num_values);
1499        assert!(result.is_ok());
1500        let skipped = decoder.skip(skip).expect("skipping values");
1501
1502        if skip >= num_values {
1503            assert_eq!(skipped, num_values);
1504
1505            let mut buffer = vec![T::T::default(); 1];
1506            let remaining = decoder.get(&mut buffer).expect("getting remaining values");
1507            assert_eq!(remaining, 0);
1508        } else {
1509            assert_eq!(skipped, skip);
1510            let mut buffer = vec![T::T::default(); num_values - skip];
1511            let remaining = decoder.get(&mut buffer).expect("getting remaining values");
1512            assert_eq!(remaining, num_values - skip);
1513            assert_eq!(decoder.values_left(), 0);
1514            assert_eq!(buffer, expected);
1515        }
1516    }
1517
1518    fn test_plain_decode_spaced<T: DataType>(
1519        data: Bytes,
1520        num_values: usize,
1521        type_length: i32,
1522        buffer: &mut [T::T],
1523        num_nulls: usize,
1524        valid_bits: &[u8],
1525        expected: &[T::T],
1526    ) {
1527        let mut decoder: PlainDecoder<T> = PlainDecoder::new(type_length);
1528        let result = decoder.set_data(data, num_values);
1529        assert!(result.is_ok());
1530        let result = decoder.get_spaced(buffer, num_nulls, valid_bits);
1531        assert!(result.is_ok());
1532        assert_eq!(num_values + num_nulls, result.unwrap());
1533        assert_eq!(decoder.values_left(), 0);
1534        assert_eq!(buffer, expected);
1535    }
1536
1537    #[test]
1538    #[should_panic(expected = "RleValueEncoder only supports BoolType")]
1539    fn test_rle_value_encode_int32_not_supported() {
1540        let mut encoder = RleValueEncoder::<Int32Type>::new();
1541        encoder.put(&[1, 2, 3, 4]).unwrap();
1542    }
1543
1544    #[test]
1545    #[should_panic(expected = "RleValueDecoder only supports BoolType")]
1546    fn test_rle_value_decode_int32_not_supported() {
1547        let mut decoder = RleValueDecoder::<Int32Type>::new();
1548        decoder.set_data(Bytes::from(vec![5, 0, 0, 0]), 1).unwrap();
1549    }
1550
1551    #[test]
1552    fn test_rle_value_decode_missing_size() {
1553        let mut decoder = RleValueDecoder::<BoolType>::new();
1554        assert!(decoder.set_data(Bytes::from(vec![0]), 1).is_err());
1555    }
1556
1557    #[test]
1558    fn test_rle_value_decode_missing_data() {
1559        let mut decoder = RleValueDecoder::<BoolType>::new();
1560        assert!(decoder.set_data(Bytes::from(vec![5, 0, 0, 0]), 1).is_err());
1561    }
1562
1563    #[test]
1564    fn test_rle_value_decode_bool_decode() {
1565        // Test multiple 'put' calls on the same encoder
1566        let data = vec![
1567            BoolType::gen_vec(-1, 256),
1568            BoolType::gen_vec(-1, 257),
1569            BoolType::gen_vec(-1, 126),
1570        ];
1571        test_rle_value_decode::<BoolType>(data);
1572    }
1573
1574    #[test]
1575    #[should_panic(expected = "Bit reader is not initialized")]
1576    fn test_delta_bit_packed_not_initialized_offset() {
1577        // Fail if set_data() is not called before get_offset()
1578        let decoder = DeltaBitPackDecoder::<Int32Type>::new();
1579        decoder.get_offset();
1580    }
1581
1582    #[test]
1583    #[should_panic(expected = "Bit reader is not initialized")]
1584    fn test_delta_bit_packed_not_initialized_get() {
1585        // Fail if set_data() is not called before get()
1586        let mut decoder = DeltaBitPackDecoder::<Int32Type>::new();
1587        let mut buffer = vec![];
1588        decoder.get(&mut buffer).unwrap();
1589    }
1590
1591    #[test]
1592    fn test_delta_bit_packed_int32_empty() {
1593        let data = vec![vec![0; 0]];
1594        test_delta_bit_packed_decode::<Int32Type>(data);
1595    }
1596
1597    #[test]
1598    fn test_delta_bit_packed_int32_repeat() {
1599        let block_data = vec![
1600            1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5,
1601            6, 7, 8,
1602        ];
1603        test_delta_bit_packed_decode::<Int32Type>(vec![block_data]);
1604    }
1605
1606    #[test]
1607    fn test_skip_delta_bit_packed_int32_repeat() {
1608        let block_data = vec![
1609            1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5,
1610            6, 7, 8,
1611        ];
1612        test_skip::<Int32Type>(block_data.clone(), Encoding::DELTA_BINARY_PACKED, 10);
1613        test_skip::<Int32Type>(block_data, Encoding::DELTA_BINARY_PACKED, 100);
1614    }
1615
1616    #[test]
1617    fn test_delta_bit_packed_int32_uneven() {
1618        let block_data = vec![1, -2, 3, -4, 5, 6, 7, 8, 9, 10, 11];
1619        test_delta_bit_packed_decode::<Int32Type>(vec![block_data]);
1620    }
1621
1622    #[test]
1623    fn test_skip_delta_bit_packed_int32_uneven() {
1624        let block_data = vec![1, -2, 3, -4, 5, 6, 7, 8, 9, 10, 11];
1625        test_skip::<Int32Type>(block_data.clone(), Encoding::DELTA_BINARY_PACKED, 5);
1626        test_skip::<Int32Type>(block_data, Encoding::DELTA_BINARY_PACKED, 100);
1627    }
1628
1629    #[test]
1630    fn test_delta_bit_packed_int32_same_values() {
1631        let block_data = vec![
1632            127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127,
1633        ];
1634        test_delta_bit_packed_decode::<Int32Type>(vec![block_data]);
1635
1636        let block_data = vec![
1637            -127, -127, -127, -127, -127, -127, -127, -127, -127, -127, -127, -127, -127, -127,
1638            -127, -127,
1639        ];
1640        test_delta_bit_packed_decode::<Int32Type>(vec![block_data]);
1641    }
1642
1643    #[test]
1644    fn test_skip_delta_bit_packed_int32_same_values() {
1645        let block_data = vec![
1646            127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127,
1647        ];
1648        test_skip::<Int32Type>(block_data.clone(), Encoding::DELTA_BINARY_PACKED, 5);
1649        test_skip::<Int32Type>(block_data, Encoding::DELTA_BINARY_PACKED, 100);
1650
1651        let block_data = vec![
1652            -127, -127, -127, -127, -127, -127, -127, -127, -127, -127, -127, -127, -127, -127,
1653            -127, -127,
1654        ];
1655        test_skip::<Int32Type>(block_data.clone(), Encoding::DELTA_BINARY_PACKED, 5);
1656        test_skip::<Int32Type>(block_data, Encoding::DELTA_BINARY_PACKED, 100);
1657    }
1658
1659    #[test]
1660    fn test_delta_bit_packed_int32_min_max() {
1661        let block_data = vec![
1662            i32::MIN,
1663            i32::MIN,
1664            i32::MIN,
1665            i32::MAX,
1666            i32::MIN,
1667            i32::MAX,
1668            i32::MIN,
1669            i32::MAX,
1670        ];
1671        test_delta_bit_packed_decode::<Int32Type>(vec![block_data]);
1672    }
1673
1674    #[test]
1675    fn test_skip_delta_bit_packed_int32_min_max() {
1676        let block_data = vec![
1677            i32::MIN,
1678            i32::MIN,
1679            i32::MIN,
1680            i32::MAX,
1681            i32::MIN,
1682            i32::MAX,
1683            i32::MIN,
1684            i32::MAX,
1685        ];
1686        test_skip::<Int32Type>(block_data.clone(), Encoding::DELTA_BINARY_PACKED, 5);
1687        test_skip::<Int32Type>(block_data, Encoding::DELTA_BINARY_PACKED, 100);
1688    }
1689
1690    #[test]
1691    fn test_delta_bit_packed_int32_multiple_blocks() {
1692        // Test multiple 'put' calls on the same encoder
1693        let data = vec![
1694            Int32Type::gen_vec(-1, 64),
1695            Int32Type::gen_vec(-1, 128),
1696            Int32Type::gen_vec(-1, 64),
1697        ];
1698        test_delta_bit_packed_decode::<Int32Type>(data);
1699    }
1700
1701    #[test]
1702    fn test_delta_bit_packed_int32_data_across_blocks() {
1703        // Test multiple 'put' calls on the same encoder
1704        let data = vec![Int32Type::gen_vec(-1, 256), Int32Type::gen_vec(-1, 257)];
1705        test_delta_bit_packed_decode::<Int32Type>(data);
1706    }
1707
1708    #[test]
1709    fn test_delta_bit_packed_int32_with_empty_blocks() {
1710        let data = vec![
1711            Int32Type::gen_vec(-1, 128),
1712            vec![0; 0],
1713            Int32Type::gen_vec(-1, 64),
1714        ];
1715        test_delta_bit_packed_decode::<Int32Type>(data);
1716    }
1717
1718    #[test]
1719    fn test_delta_bit_packed_int64_empty() {
1720        let data = vec![vec![0; 0]];
1721        test_delta_bit_packed_decode::<Int64Type>(data);
1722    }
1723
1724    #[test]
1725    fn test_delta_bit_packed_int64_min_max() {
1726        let block_data = vec![
1727            i64::MIN,
1728            i64::MAX,
1729            i64::MIN,
1730            i64::MAX,
1731            i64::MIN,
1732            i64::MAX,
1733            i64::MIN,
1734            i64::MAX,
1735        ];
1736        test_delta_bit_packed_decode::<Int64Type>(vec![block_data]);
1737    }
1738
1739    #[test]
1740    fn test_delta_bit_packed_int64_multiple_blocks() {
1741        // Test multiple 'put' calls on the same encoder
1742        let data = vec![
1743            Int64Type::gen_vec(-1, 64),
1744            Int64Type::gen_vec(-1, 128),
1745            Int64Type::gen_vec(-1, 64),
1746        ];
1747        test_delta_bit_packed_decode::<Int64Type>(data);
1748    }
1749
1750    #[test]
1751    fn test_delta_bit_packed_zero_miniblocks() {
1752        // It is invalid for mini_blocks_per_block to be 0
1753        let data = vec![
1754            128, 1, // block_size = 128
1755            0, // mini_blocks_per_block = 0
1756        ];
1757        let mut decoder = DeltaBitPackDecoder::<Int32Type>::new();
1758        let err = decoder.set_data(data.into(), 0).unwrap_err();
1759        assert_eq!(
1760            err.to_string(),
1761            "Parquet error: cannot have zero miniblocks per block"
1762        );
1763    }
1764
1765    #[test]
1766    fn test_delta_bit_packed_decoder_sample() {
1767        let data_bytes = vec![
1768            128, 1, 4, 3, 58, 28, 6, 0, 0, 0, 0, 8, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
1769            0, 0, 0, 0, 0, 0,
1770        ];
1771        let mut decoder: DeltaBitPackDecoder<Int32Type> = DeltaBitPackDecoder::new();
1772        decoder.set_data(data_bytes.into(), 3).unwrap();
1773        // check exact offsets, because when reading partial values we end up with
1774        // some data not being read from bit reader
1775        assert_eq!(decoder.get_offset(), 5);
1776        let mut result = vec![0, 0, 0];
1777        decoder.get(&mut result).unwrap();
1778        assert_eq!(decoder.get_offset(), 34);
1779        assert_eq!(result, vec![29, 43, 89]);
1780    }
1781
1782    #[test]
1783    fn test_delta_bit_packed_padding() {
1784        // Page header
1785        let header = vec![
1786            // Page Header
1787
1788            // Block Size - 256
1789            128,
1790            2,
1791            // Miniblocks in block,
1792            4,
1793            // Total value count - 419
1794            128 + 35,
1795            3,
1796            // First value - 7
1797            7,
1798        ];
1799
1800        // Block Header
1801        let block1_header = vec![
1802            0, // Min delta
1803            0, 1, 0, 0, // Bit widths
1804        ];
1805
1806        // Mini-block 1 - bit width 0 => 0 bytes
1807        // Mini-block 2 - bit width 1 => 8 bytes
1808        // Mini-block 3 - bit width 0 => 0 bytes
1809        // Mini-block 4 - bit width 0 => 0 bytes
1810        let block1 = vec![0xFF; 8];
1811
1812        // Block Header
1813        let block2_header = vec![
1814            0, // Min delta
1815            0, 1, 2, 0xFF, // Bit widths, including non-zero padding
1816        ];
1817
1818        // Mini-block 1 - bit width 0 => 0 bytes
1819        // Mini-block 2 - bit width 1 => 8 bytes
1820        // Mini-block 3 - bit width 2 => 16 bytes
1821        // Mini-block 4 - padding => no bytes
1822        let block2 = vec![0xFF; 24];
1823
1824        let data: Vec<u8> = header
1825            .into_iter()
1826            .chain(block1_header)
1827            .chain(block1)
1828            .chain(block2_header)
1829            .chain(block2)
1830            .collect();
1831
1832        let length = data.len();
1833
1834        let ptr = Bytes::from(data);
1835        let mut reader = BitReader::new(ptr.clone());
1836        assert_eq!(reader.get_vlq_int().unwrap(), 256);
1837        assert_eq!(reader.get_vlq_int().unwrap(), 4);
1838        assert_eq!(reader.get_vlq_int().unwrap(), 419);
1839        assert_eq!(reader.get_vlq_int().unwrap(), 7);
1840
1841        // Test output buffer larger than needed and not exact multiple of block size
1842        let mut output = vec![0_i32; 420];
1843
1844        let mut decoder = DeltaBitPackDecoder::<Int32Type>::new();
1845        decoder.set_data(ptr.clone(), 0).unwrap();
1846        assert_eq!(decoder.get(&mut output).unwrap(), 419);
1847        assert_eq!(decoder.get_offset(), length);
1848
1849        // Test with truncated buffer
1850        decoder.set_data(ptr.slice(..12), 0).unwrap();
1851        let err = decoder.get(&mut output).unwrap_err().to_string();
1852        assert!(
1853            err.contains("Expected to read 64 values from miniblock got 8"),
1854            "{}",
1855            err
1856        );
1857    }
1858
1859    #[test]
1860    fn test_delta_bit_packed_int32_single_value_large() {
1861        let block_data = vec![3; 10240];
1862        test_delta_bit_packed_decode::<Int32Type>(vec![block_data]);
1863    }
1864
1865    #[test]
1866    fn test_delta_bit_packed_int32_single_value_skip_large() {
1867        let block_data = vec![3; 10240];
1868        test_skip::<Int32Type>(block_data.clone(), Encoding::DELTA_BINARY_PACKED, 50);
1869        test_skip::<Int32Type>(block_data, Encoding::DELTA_BINARY_PACKED, 5000);
1870    }
1871
1872    #[test]
1873    fn test_delta_bit_packed_int32_increasing_value_large() {
1874        let block_data = (0i32..10240).collect();
1875        test_delta_bit_packed_decode::<Int32Type>(vec![block_data]);
1876    }
1877
1878    #[test]
1879    fn test_delta_bit_packed_int32_increasing_value_skip_large() {
1880        let block_data = (0i32..10240).collect::<Vec<i32>>();
1881        test_skip::<Int32Type>(block_data.clone(), Encoding::DELTA_BINARY_PACKED, 50);
1882        test_skip::<Int32Type>(block_data, Encoding::DELTA_BINARY_PACKED, 5000);
1883    }
1884
1885    #[test]
1886    fn test_delta_bit_packed_int32_stepped_value_large() {
1887        let block_data = (0i32..10240).map(|i| i / 2).collect();
1888        test_delta_bit_packed_decode::<Int32Type>(vec![block_data]);
1889    }
1890
1891    #[test]
1892    fn test_delta_bit_packed_int32_stepped_value_skip_large() {
1893        let block_data = (0i32..10240).map(|i| i / 2).collect::<Vec<i32>>();
1894        test_skip::<Int32Type>(block_data.clone(), Encoding::DELTA_BINARY_PACKED, 50);
1895        test_skip::<Int32Type>(block_data, Encoding::DELTA_BINARY_PACKED, 5000);
1896    }
1897
1898    #[test]
1899    fn test_delta_bit_packed_int32_mixed_large() {
1900        // should be enough for 4 mini-blocks plus a little so we get some
1901        // mixed mini-blocks
1902        const BLOCK_SIZE: i32 = 133;
1903        let block1_data = (0..BLOCK_SIZE).map(|i| (i * 7) % 11).collect();
1904        let block2_data = vec![3; BLOCK_SIZE as usize];
1905        let block3_data = (0..BLOCK_SIZE).map(|i| (i * 5) % 13).collect();
1906        let block4_data = (0..BLOCK_SIZE).collect();
1907        let block5_data = (0..BLOCK_SIZE).map(|i| (i * 3) % 17).collect();
1908        test_delta_bit_packed_decode::<Int32Type>(vec![
1909            block1_data,
1910            block2_data,
1911            block3_data,
1912            block4_data,
1913            block5_data,
1914        ]);
1915    }
1916
1917    #[test]
1918    fn test_delta_bit_packed_int64_single_value_large() {
1919        let block_data = vec![5; 10240];
1920        test_delta_bit_packed_decode::<Int64Type>(vec![block_data]);
1921    }
1922
1923    #[test]
1924    fn test_delta_bit_packed_int64_increasing_value_large() {
1925        let block_data = (0i64..10240).collect();
1926        test_delta_bit_packed_decode::<Int64Type>(vec![block_data]);
1927    }
1928
1929    #[test]
1930    fn test_delta_byte_array_same_arrays() {
1931        let data = vec![
1932            vec![ByteArray::from(vec![1, 2, 3, 4, 5, 6])],
1933            vec![
1934                ByteArray::from(vec![1, 2, 3, 4, 5, 6]),
1935                ByteArray::from(vec![1, 2, 3, 4, 5, 6]),
1936            ],
1937            vec![
1938                ByteArray::from(vec![1, 2, 3, 4, 5, 6]),
1939                ByteArray::from(vec![1, 2, 3, 4, 5, 6]),
1940            ],
1941        ];
1942        test_delta_byte_array_decode(data);
1943    }
1944
1945    #[test]
1946    fn test_delta_byte_array_unique_arrays() {
1947        let data = vec![
1948            vec![ByteArray::from(vec![1])],
1949            vec![ByteArray::from(vec![2, 3]), ByteArray::from(vec![4, 5, 6])],
1950            vec![
1951                ByteArray::from(vec![7, 8]),
1952                ByteArray::from(vec![9, 0, 1, 2]),
1953            ],
1954        ];
1955        test_delta_byte_array_decode(data);
1956    }
1957
1958    #[test]
1959    fn test_delta_byte_array_single_array() {
1960        let data = vec![vec![ByteArray::from(vec![1, 2, 3, 4, 5, 6])]];
1961        test_delta_byte_array_decode(data);
1962    }
1963
1964    #[test]
1965    fn test_byte_stream_split_multiple_f32() {
1966        let data = vec![
1967            vec![
1968                f32::from_le_bytes([0xAA, 0xBB, 0xCC, 0xDD]),
1969                f32::from_le_bytes([0x00, 0x11, 0x22, 0x33]),
1970            ],
1971            vec![f32::from_le_bytes([0xA3, 0xB4, 0xC5, 0xD6])],
1972        ];
1973        test_byte_stream_split_decode::<FloatType>(data, -1);
1974    }
1975
1976    #[test]
1977    fn test_byte_stream_split_f64() {
1978        let data = vec![vec![
1979            f64::from_le_bytes([0, 1, 2, 3, 4, 5, 6, 7]),
1980            f64::from_le_bytes([8, 9, 10, 11, 12, 13, 14, 15]),
1981        ]];
1982        test_byte_stream_split_decode::<DoubleType>(data, -1);
1983    }
1984
1985    #[test]
1986    fn test_byte_stream_split_multiple_i32() {
1987        let data = vec![
1988            vec![
1989                i32::from_le_bytes([0xAA, 0xBB, 0xCC, 0xDD]),
1990                i32::from_le_bytes([0x00, 0x11, 0x22, 0x33]),
1991            ],
1992            vec![i32::from_le_bytes([0xA3, 0xB4, 0xC5, 0xD6])],
1993        ];
1994        test_byte_stream_split_decode::<Int32Type>(data, -1);
1995    }
1996
1997    #[test]
1998    fn test_byte_stream_split_i64() {
1999        let data = vec![vec![
2000            i64::from_le_bytes([0, 1, 2, 3, 4, 5, 6, 7]),
2001            i64::from_le_bytes([8, 9, 10, 11, 12, 13, 14, 15]),
2002        ]];
2003        test_byte_stream_split_decode::<Int64Type>(data, -1);
2004    }
2005
2006    fn test_byte_stream_split_flba(type_width: usize) {
2007        let data = vec![
2008            vec![
2009                FixedLenByteArrayType::r#gen(type_width as i32),
2010                FixedLenByteArrayType::r#gen(type_width as i32),
2011            ],
2012            vec![FixedLenByteArrayType::r#gen(type_width as i32)],
2013        ];
2014        test_byte_stream_split_decode::<FixedLenByteArrayType>(data, type_width as i32);
2015    }
2016
2017    #[test]
2018    fn test_byte_stream_split_flba5() {
2019        test_byte_stream_split_flba(5);
2020    }
2021
2022    #[test]
2023    fn test_byte_stream_split_flba16() {
2024        test_byte_stream_split_flba(16);
2025    }
2026
2027    #[test]
2028    fn test_byte_stream_split_flba19() {
2029        test_byte_stream_split_flba(19);
2030    }
2031
2032    #[test]
2033    #[should_panic(expected = "Mismatched FixedLenByteArray sizes: 4 != 5")]
2034    fn test_byte_stream_split_flba_mismatch() {
2035        let data = vec![
2036            vec![
2037                FixedLenByteArray::from(vec![0xAA, 0xAB, 0xAC, 0xAD, 0xAE]),
2038                FixedLenByteArray::from(vec![0xBA, 0xBB, 0xBC, 0xBD, 0xBE]),
2039            ],
2040            vec![FixedLenByteArray::from(vec![0xCA, 0xCB, 0xCC, 0xCD])],
2041        ];
2042        test_byte_stream_split_decode::<FixedLenByteArrayType>(data, 5);
2043    }
2044
2045    #[test]
2046    #[should_panic(expected = "Input data length is not a multiple of type width 4")]
2047    fn test_byte_stream_split_flba_bad_input() {
2048        let mut decoder = VariableWidthByteStreamSplitDecoder::<FixedLenByteArrayType>::new(4);
2049        decoder
2050            .set_data(Bytes::from(vec![1, 2, 3, 4, 5]), 1)
2051            .unwrap();
2052    }
2053
2054    #[test]
2055    fn test_skip_byte_stream_split() {
2056        let block_data = vec![0.3, 0.4, 0.1, 4.10];
2057        test_skip::<FloatType>(block_data.clone(), Encoding::BYTE_STREAM_SPLIT, 2);
2058        test_skip::<DoubleType>(
2059            block_data.into_iter().map(|x| x as f64).collect(),
2060            Encoding::BYTE_STREAM_SPLIT,
2061            100,
2062        );
2063    }
2064
2065    #[test]
2066    fn test_skip_byte_stream_split_ints() {
2067        let block_data = vec![3, 4, 1, 5];
2068        test_skip::<Int32Type>(block_data.clone(), Encoding::BYTE_STREAM_SPLIT, 2);
2069        test_skip::<Int64Type>(
2070            block_data.into_iter().map(|x| x as i64).collect(),
2071            Encoding::BYTE_STREAM_SPLIT,
2072            100,
2073        );
2074    }
2075
2076    fn test_rle_value_decode<T: DataType>(data: Vec<Vec<T::T>>) {
2077        test_encode_decode::<T>(data, Encoding::RLE, -1);
2078    }
2079
2080    fn test_delta_bit_packed_decode<T: DataType>(data: Vec<Vec<T::T>>) {
2081        test_encode_decode::<T>(data, Encoding::DELTA_BINARY_PACKED, -1);
2082    }
2083
2084    fn test_byte_stream_split_decode<T: DataType>(data: Vec<Vec<T::T>>, type_width: i32) {
2085        test_encode_decode::<T>(data, Encoding::BYTE_STREAM_SPLIT, type_width);
2086    }
2087
2088    fn test_delta_byte_array_decode(data: Vec<Vec<ByteArray>>) {
2089        test_encode_decode::<ByteArrayType>(data, Encoding::DELTA_BYTE_ARRAY, -1);
2090    }
2091
2092    // Input data represents vector of data slices to write (test multiple `put()` calls)
2093    // For example,
2094    //   vec![vec![1, 2, 3]] invokes `put()` once and writes {1, 2, 3}
2095    //   vec![vec![1, 2], vec![3]] invokes `put()` twice and writes {1, 2, 3}
2096    fn test_encode_decode<T: DataType>(data: Vec<Vec<T::T>>, encoding: Encoding, type_width: i32) {
2097        let col_descr = create_test_col_desc_ptr(type_width, T::get_physical_type());
2098
2099        // Encode data
2100        let mut encoder = get_encoder::<T>(encoding, &col_descr).expect("get encoder");
2101
2102        for v in &data[..] {
2103            encoder.put(&v[..]).expect("ok to encode");
2104        }
2105        let bytes = encoder.flush_buffer().expect("ok to flush buffer");
2106
2107        // Flatten expected data as contiguous array of values
2108        let expected: Vec<T::T> = data.iter().flat_map(|s| s.clone()).collect();
2109
2110        // Decode data and compare with original
2111        let mut decoder = get_decoder::<T>(col_descr, encoding).expect("get decoder");
2112
2113        let mut result = vec![T::T::default(); expected.len()];
2114        decoder
2115            .set_data(bytes, expected.len())
2116            .expect("ok to set data");
2117        let mut result_num_values = 0;
2118        while decoder.values_left() > 0 {
2119            result_num_values += decoder
2120                .get(&mut result[result_num_values..])
2121                .expect("ok to decode");
2122        }
2123        assert_eq!(result_num_values, expected.len());
2124        assert_eq!(result, expected);
2125    }
2126
2127    fn test_skip<T: DataType>(data: Vec<T::T>, encoding: Encoding, skip: usize) {
2128        // Type length should not really matter for encode/decode test,
2129        // otherwise change it based on type
2130        let col_descr = create_test_col_desc_ptr(-1, T::get_physical_type());
2131
2132        // Encode data
2133        let mut encoder = get_encoder::<T>(encoding, &col_descr).expect("get encoder");
2134
2135        encoder.put(&data).expect("ok to encode");
2136
2137        let bytes = encoder.flush_buffer().expect("ok to flush buffer");
2138
2139        let mut decoder = get_decoder::<T>(col_descr, encoding).expect("get decoder");
2140        decoder.set_data(bytes, data.len()).expect("ok to set data");
2141
2142        if skip >= data.len() {
2143            let skipped = decoder.skip(skip).expect("ok to skip");
2144            assert_eq!(skipped, data.len());
2145
2146            let skipped_again = decoder.skip(skip).expect("ok to skip again");
2147            assert_eq!(skipped_again, 0);
2148        } else {
2149            let skipped = decoder.skip(skip).expect("ok to skip");
2150            assert_eq!(skipped, skip);
2151
2152            let remaining = data.len() - skip;
2153
2154            let expected = &data[skip..];
2155            let mut buffer = vec![T::T::default(); remaining];
2156            let fetched = decoder.get(&mut buffer).expect("ok to decode");
2157            assert_eq!(remaining, fetched);
2158            assert_eq!(&buffer, expected);
2159        }
2160    }
2161
2162    fn create_and_check_decoder<T: DataType>(encoding: Encoding, err: Option<ParquetError>) {
2163        let descr = create_test_col_desc_ptr(-1, T::get_physical_type());
2164        let decoder = get_decoder::<T>(descr, encoding);
2165        match err {
2166            Some(parquet_error) => {
2167                assert_eq!(
2168                    decoder.err().unwrap().to_string(),
2169                    parquet_error.to_string()
2170                );
2171            }
2172            None => {
2173                assert_eq!(decoder.unwrap().encoding(), encoding);
2174            }
2175        }
2176    }
2177
2178    // Creates test column descriptor.
2179    fn create_test_col_desc_ptr(type_len: i32, t: Type) -> ColumnDescPtr {
2180        let ty = SchemaType::primitive_type_builder("t", t)
2181            .with_length(type_len)
2182            .build()
2183            .unwrap();
2184        Arc::new(ColumnDescriptor::new(
2185            Arc::new(ty),
2186            0,
2187            0,
2188            ColumnPath::new(vec![]),
2189        ))
2190    }
2191
2192    fn usize_to_bytes(v: usize) -> [u8; 4] {
2193        (v as u32).to_ne_bytes()
2194    }
2195
2196    /// A util trait to convert slices of different types to byte arrays
2197    trait ToByteArray<T: DataType> {
2198        #[allow(clippy::wrong_self_convention)]
2199        fn to_byte_array(data: &[T::T]) -> Vec<u8>;
2200    }
2201
2202    macro_rules! to_byte_array_impl {
2203        ($ty: ty) => {
2204            impl ToByteArray<$ty> for $ty {
2205                #[allow(clippy::wrong_self_convention)]
2206                fn to_byte_array(data: &[<$ty as DataType>::T]) -> Vec<u8> {
2207                    <$ty as DataType>::T::slice_as_bytes(data).to_vec()
2208                }
2209            }
2210        };
2211    }
2212
2213    to_byte_array_impl!(Int32Type);
2214    to_byte_array_impl!(Int64Type);
2215    to_byte_array_impl!(FloatType);
2216    to_byte_array_impl!(DoubleType);
2217
2218    impl ToByteArray<BoolType> for BoolType {
2219        #[allow(clippy::wrong_self_convention)]
2220        fn to_byte_array(data: &[bool]) -> Vec<u8> {
2221            let mut v = vec![];
2222            for (i, item) in data.iter().enumerate() {
2223                if i % 8 == 0 {
2224                    v.push(0);
2225                }
2226                if *item {
2227                    v[i / 8] |= 1 << (i % 8);
2228                }
2229            }
2230            v
2231        }
2232    }
2233
2234    impl ToByteArray<Int96Type> for Int96Type {
2235        #[allow(clippy::wrong_self_convention)]
2236        fn to_byte_array(data: &[Int96]) -> Vec<u8> {
2237            let mut v = vec![];
2238            for d in data {
2239                v.extend_from_slice(d.as_bytes());
2240            }
2241            v
2242        }
2243    }
2244
2245    impl ToByteArray<ByteArrayType> for ByteArrayType {
2246        #[allow(clippy::wrong_self_convention)]
2247        fn to_byte_array(data: &[ByteArray]) -> Vec<u8> {
2248            let mut v = vec![];
2249            for d in data {
2250                let buf = d.data();
2251                let len = &usize_to_bytes(buf.len());
2252                v.extend_from_slice(len);
2253                v.extend(buf);
2254            }
2255            v
2256        }
2257    }
2258
2259    impl ToByteArray<FixedLenByteArrayType> for FixedLenByteArrayType {
2260        #[allow(clippy::wrong_self_convention)]
2261        fn to_byte_array(data: &[FixedLenByteArray]) -> Vec<u8> {
2262            let mut v = vec![];
2263            for d in data {
2264                let buf = d.data();
2265                v.extend(buf);
2266            }
2267            v
2268        }
2269    }
2270
2271    #[test]
2272    // Allow initializing a vector and pushing to it for clarity in this test
2273    #[allow(clippy::vec_init_then_push)]
2274    fn test_delta_bit_packed_invalid_bit_width() {
2275        // Manually craft a buffer with an invalid bit width
2276        let mut buffer = vec![];
2277        // block_size = 128
2278        buffer.push(128);
2279        buffer.push(1);
2280        // mini_blocks_per_block = 4
2281        buffer.push(4);
2282        // num_values = 32
2283        buffer.push(32);
2284        // first_value = 0
2285        buffer.push(0);
2286        // min_delta = 0
2287        buffer.push(0);
2288        // bit_widths, one for each of the 4 mini blocks
2289        buffer.push(33); // Invalid bit width
2290        buffer.push(0);
2291        buffer.push(0);
2292        buffer.push(0);
2293
2294        let corrupted_buffer = Bytes::from(buffer);
2295
2296        let mut decoder = DeltaBitPackDecoder::<Int32Type>::new();
2297        decoder.set_data(corrupted_buffer.clone(), 32).unwrap();
2298        let mut read_buffer = vec![0; 32];
2299        let err = decoder.get(&mut read_buffer).unwrap_err();
2300        assert!(
2301            err.to_string()
2302                .contains("Invalid delta bit width 33 which is larger than expected 32"),
2303            "{}",
2304            err
2305        );
2306
2307        let mut decoder = DeltaBitPackDecoder::<Int32Type>::new();
2308        decoder.set_data(corrupted_buffer, 32).unwrap();
2309        let err = decoder.skip(32).unwrap_err();
2310        assert!(
2311            err.to_string()
2312                .contains("Invalid delta bit width 33 which is larger than expected 32"),
2313            "{}",
2314            err
2315        );
2316    }
2317}