Skip to main content

parquet/encodings/
decoding.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Contains all supported decoders for Parquet.
19
20use bytes::Bytes;
21use num_traits::{FromPrimitive, WrappingAdd};
22use std::{cmp, marker::PhantomData, mem};
23
24use super::rle::RleDecoder;
25
26use crate::basic::*;
27use crate::data_type::private::ParquetValueType;
28use crate::data_type::*;
29use crate::encodings::decoding::byte_stream_split_decoder::{
30    ByteStreamSplitDecoder, VariableWidthByteStreamSplitDecoder,
31};
32use crate::errors::{ParquetError, Result};
33use crate::schema::types::ColumnDescPtr;
34use crate::util::bit_util::{self, BitReader, FromBitpacked};
35
36mod byte_stream_split_decoder;
37
38pub(crate) mod private {
39    use super::*;
40
41    /// A trait that allows getting a [`Decoder`] implementation for a [`DataType`] with
42    /// the corresponding [`ParquetValueType`]. This is necessary to support
43    /// [`Decoder`] implementations that may not be applicable for all [`DataType`]
44    /// and by extension all [`ParquetValueType`]
45    pub trait GetDecoder {
46        fn get_decoder<T: DataType<T = Self>>(
47            descr: ColumnDescPtr,
48            encoding: Encoding,
49        ) -> Result<Box<dyn Decoder<T>>> {
50            get_decoder_default(descr, encoding)
51        }
52    }
53
54    fn get_decoder_default<T: DataType>(
55        descr: ColumnDescPtr,
56        encoding: Encoding,
57    ) -> Result<Box<dyn Decoder<T>>> {
58        match encoding {
59            Encoding::PLAIN => Ok(Box::new(PlainDecoder::new(descr.type_length()))),
60            Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => Err(general_err!(
61                "Cannot initialize this encoding through this function"
62            )),
63            Encoding::RLE
64            | Encoding::DELTA_BINARY_PACKED
65            | Encoding::DELTA_BYTE_ARRAY
66            | Encoding::DELTA_LENGTH_BYTE_ARRAY => Err(general_err!(
67                "Encoding {} is not supported for type",
68                encoding
69            )),
70            e => Err(nyi_err!("Encoding {} is not supported", e)),
71        }
72    }
73
74    impl GetDecoder for bool {
75        fn get_decoder<T: DataType<T = Self>>(
76            descr: ColumnDescPtr,
77            encoding: Encoding,
78        ) -> Result<Box<dyn Decoder<T>>> {
79            match encoding {
80                Encoding::RLE => Ok(Box::new(RleValueDecoder::new())),
81                _ => get_decoder_default(descr, encoding),
82            }
83        }
84    }
85
86    impl GetDecoder for i32 {
87        fn get_decoder<T: DataType<T = Self>>(
88            descr: ColumnDescPtr,
89            encoding: Encoding,
90        ) -> Result<Box<dyn Decoder<T>>> {
91            match encoding {
92                Encoding::BYTE_STREAM_SPLIT => Ok(Box::new(ByteStreamSplitDecoder::new())),
93                Encoding::DELTA_BINARY_PACKED => Ok(Box::new(DeltaBitPackDecoder::new())),
94                _ => get_decoder_default(descr, encoding),
95            }
96        }
97    }
98
99    impl GetDecoder for i64 {
100        fn get_decoder<T: DataType<T = Self>>(
101            descr: ColumnDescPtr,
102            encoding: Encoding,
103        ) -> Result<Box<dyn Decoder<T>>> {
104            match encoding {
105                Encoding::BYTE_STREAM_SPLIT => Ok(Box::new(ByteStreamSplitDecoder::new())),
106                Encoding::DELTA_BINARY_PACKED => Ok(Box::new(DeltaBitPackDecoder::new())),
107                _ => get_decoder_default(descr, encoding),
108            }
109        }
110    }
111
112    impl GetDecoder for f32 {
113        fn get_decoder<T: DataType<T = Self>>(
114            descr: ColumnDescPtr,
115            encoding: Encoding,
116        ) -> Result<Box<dyn Decoder<T>>> {
117            match encoding {
118                Encoding::BYTE_STREAM_SPLIT => Ok(Box::new(ByteStreamSplitDecoder::new())),
119                _ => get_decoder_default(descr, encoding),
120            }
121        }
122    }
123    impl GetDecoder for f64 {
124        fn get_decoder<T: DataType<T = Self>>(
125            descr: ColumnDescPtr,
126            encoding: Encoding,
127        ) -> Result<Box<dyn Decoder<T>>> {
128            match encoding {
129                Encoding::BYTE_STREAM_SPLIT => Ok(Box::new(ByteStreamSplitDecoder::new())),
130                _ => get_decoder_default(descr, encoding),
131            }
132        }
133    }
134
135    impl GetDecoder for ByteArray {
136        fn get_decoder<T: DataType<T = Self>>(
137            descr: ColumnDescPtr,
138            encoding: Encoding,
139        ) -> Result<Box<dyn Decoder<T>>> {
140            match encoding {
141                Encoding::DELTA_BYTE_ARRAY => Ok(Box::new(DeltaByteArrayDecoder::new())),
142                Encoding::DELTA_LENGTH_BYTE_ARRAY => {
143                    Ok(Box::new(DeltaLengthByteArrayDecoder::new()))
144                }
145                _ => get_decoder_default(descr, encoding),
146            }
147        }
148    }
149
150    impl GetDecoder for FixedLenByteArray {
151        fn get_decoder<T: DataType<T = Self>>(
152            descr: ColumnDescPtr,
153            encoding: Encoding,
154        ) -> Result<Box<dyn Decoder<T>>> {
155            match encoding {
156                Encoding::BYTE_STREAM_SPLIT => Ok(Box::new(
157                    VariableWidthByteStreamSplitDecoder::new(descr.type_length()),
158                )),
159                Encoding::DELTA_BYTE_ARRAY => Ok(Box::new(DeltaByteArrayDecoder::new())),
160                _ => get_decoder_default(descr, encoding),
161            }
162        }
163    }
164
165    impl GetDecoder for Int96 {}
166}
167
168// ----------------------------------------------------------------------
169// Decoders
170
171/// A Parquet decoder for the data type `T`.
172pub trait Decoder<T: DataType>: Send {
173    /// Sets the data to decode to be `data`, which should contain `num_values` of values
174    /// to decode.
175    fn set_data(&mut self, data: Bytes, num_values: usize) -> Result<()>;
176
177    /// Consumes values from this decoder and write the results to `buffer`. This will try
178    /// to fill up `buffer`.
179    ///
180    /// Returns the actual number of values decoded, which should be equal to
181    /// `buffer.len()` unless the remaining number of values is less than
182    /// `buffer.len()`.
183    fn get(&mut self, buffer: &mut [T::T]) -> Result<usize>;
184
185    /// Consume values from this decoder and write the results to `buffer`, leaving
186    /// "spaces" for null values.
187    ///
188    /// `null_count` is the number of nulls we expect to see in `buffer`, after reading.
189    /// `valid_bits` stores the valid bit for each value in the buffer. It should contain
190    ///   at least number of bits that equal to `buffer.len()`.
191    ///
192    /// Returns the actual number of values decoded.
193    ///
194    /// # Panics
195    ///
196    /// Panics if `null_count` is greater than `buffer.len()`.
197    fn get_spaced(
198        &mut self,
199        buffer: &mut [T::T],
200        null_count: usize,
201        valid_bits: &[u8],
202    ) -> Result<usize> {
203        assert!(buffer.len() >= null_count);
204
205        // TODO: check validity of the input arguments?
206        if null_count == 0 {
207            return self.get(buffer);
208        }
209
210        let num_values = buffer.len();
211        let values_to_read = num_values - null_count;
212        let values_read = self.get(buffer)?;
213        if values_read != values_to_read {
214            return Err(general_err!(
215                "Number of values read: {}, doesn't match expected: {}",
216                values_read,
217                values_to_read
218            ));
219        }
220        let mut values_to_move = values_read;
221        for i in (0..num_values).rev() {
222            if bit_util::get_bit(valid_bits, i) {
223                values_to_move -= 1;
224                buffer.swap(i, values_to_move);
225            }
226        }
227
228        Ok(num_values)
229    }
230
231    /// Returns the number of values left in this decoder stream.
232    fn values_left(&self) -> usize;
233
234    /// Returns the encoding for this decoder.
235    fn encoding(&self) -> Encoding;
236
237    /// Skip the specified number of values in this decoder stream.
238    fn skip(&mut self, num_values: usize) -> Result<usize>;
239}
240
241/// Gets a decoder for the column descriptor `descr` and encoding type `encoding`.
242///
243/// NOTE: the primitive type in `descr` MUST match the data type `T`, otherwise
244/// disastrous consequence could occur.
245pub fn get_decoder<T: DataType>(
246    descr: ColumnDescPtr,
247    encoding: Encoding,
248) -> Result<Box<dyn Decoder<T>>> {
249    use self::private::GetDecoder;
250    T::T::get_decoder(descr, encoding)
251}
252
253// ----------------------------------------------------------------------
254// PLAIN Decoding
255
256#[derive(Default)]
257pub struct PlainDecoderDetails {
258    // The remaining number of values in the byte array
259    pub(crate) num_values: usize,
260
261    // The current starting index in the byte array. Not used when `T` is bool.
262    pub(crate) start: usize,
263
264    // The length for the type `T`. Only used when `T` is `FixedLenByteArrayType`
265    pub(crate) type_length: i32,
266
267    // The byte array to decode from. Not set if `T` is bool.
268    pub(crate) data: Option<Bytes>,
269
270    // Read `data` bit by bit. Only set if `T` is bool.
271    pub(crate) bit_reader: Option<BitReader>,
272}
273
274/// Plain decoding that supports all types.
275///
276/// Values are encoded back to back. For native types, data is encoded as little endian.
277/// Floating point types are encoded in IEEE.
278/// See [`PlainEncoder`](crate::encoding::PlainEncoder) for more information.
279pub struct PlainDecoder<T: DataType> {
280    // The binary details needed for decoding
281    inner: PlainDecoderDetails,
282
283    // To allow `T` in the generic parameter for this struct. This doesn't take any
284    // space.
285    _phantom: PhantomData<T>,
286}
287
288impl<T: DataType> PlainDecoder<T> {
289    /// Creates new plain decoder.
290    pub fn new(type_length: i32) -> Self {
291        PlainDecoder {
292            inner: PlainDecoderDetails {
293                type_length,
294                num_values: 0,
295                start: 0,
296                data: None,
297                bit_reader: None,
298            },
299            _phantom: PhantomData,
300        }
301    }
302}
303
304impl<T: DataType> Decoder<T> for PlainDecoder<T> {
305    #[inline]
306    fn set_data(&mut self, data: Bytes, num_values: usize) -> Result<()> {
307        T::T::set_data(&mut self.inner, data, num_values);
308        Ok(())
309    }
310
311    #[inline]
312    fn values_left(&self) -> usize {
313        self.inner.num_values
314    }
315
316    #[inline]
317    fn encoding(&self) -> Encoding {
318        Encoding::PLAIN
319    }
320
321    #[inline]
322    fn get(&mut self, buffer: &mut [T::T]) -> Result<usize> {
323        T::T::decode(buffer, &mut self.inner)
324    }
325
326    #[inline]
327    fn skip(&mut self, num_values: usize) -> Result<usize> {
328        T::T::skip(&mut self.inner, num_values)
329    }
330}
331
332// ----------------------------------------------------------------------
333// RLE_DICTIONARY/PLAIN_DICTIONARY Decoding
334
335/// Dictionary decoder.
336///
337/// The dictionary encoding builds a dictionary of values encountered in a given column.
338/// The dictionary is be stored in a dictionary page per column chunk.
339/// See [`DictEncoder`](crate::encoding::DictEncoder) for more information.
340pub struct DictDecoder<T: DataType> {
341    // The dictionary, which maps ids to the values
342    dictionary: Vec<T::T>,
343
344    // Whether `dictionary` has been initialized
345    has_dictionary: bool,
346
347    // The decoder for the value ids
348    rle_decoder: Option<RleDecoder>,
349
350    // Number of values left in the data stream
351    num_values: usize,
352}
353
354impl<T: DataType> Default for DictDecoder<T> {
355    fn default() -> Self {
356        Self::new()
357    }
358}
359
360impl<T: DataType> DictDecoder<T> {
361    /// Creates new dictionary decoder.
362    pub fn new() -> Self {
363        Self {
364            dictionary: vec![],
365            has_dictionary: false,
366            rle_decoder: None,
367            num_values: 0,
368        }
369    }
370
371    /// Decodes and sets values for dictionary using `decoder` decoder.
372    pub fn set_dict(&mut self, mut decoder: Box<dyn Decoder<T>>) -> Result<()> {
373        let num_values = decoder.values_left();
374        self.dictionary.resize(num_values, T::T::default());
375        let _ = decoder.get(&mut self.dictionary)?;
376        self.has_dictionary = true;
377        Ok(())
378    }
379}
380
381impl<T: DataType> Decoder<T> for DictDecoder<T> {
382    fn set_data(&mut self, data: Bytes, num_values: usize) -> Result<()> {
383        // First byte in `data` is bit width
384        if data.is_empty() {
385            return Err(eof_err!("Not enough bytes to decode bit_width"));
386        }
387
388        let bit_width = data.as_ref()[0];
389        if bit_width > 32 {
390            return Err(general_err!(
391                "Invalid or corrupted RLE bit width {}. Max allowed is 32",
392                bit_width
393            ));
394        }
395        let mut rle_decoder = RleDecoder::new(bit_width);
396        rle_decoder.set_data(data.slice(1..))?;
397        self.num_values = num_values;
398        self.rle_decoder = Some(rle_decoder);
399        Ok(())
400    }
401
402    fn get(&mut self, buffer: &mut [T::T]) -> Result<usize> {
403        assert!(self.rle_decoder.is_some());
404        assert!(self.has_dictionary, "Must call set_dict() first!");
405
406        let rle = self.rle_decoder.as_mut().unwrap();
407        let num_values = cmp::min(buffer.len(), self.num_values);
408        rle.get_batch_with_dict(&self.dictionary[..], buffer, num_values)
409    }
410
411    /// Number of values left in this decoder stream
412    fn values_left(&self) -> usize {
413        self.num_values
414    }
415
416    fn encoding(&self) -> Encoding {
417        Encoding::RLE_DICTIONARY
418    }
419
420    fn skip(&mut self, num_values: usize) -> Result<usize> {
421        assert!(self.rle_decoder.is_some());
422        assert!(self.has_dictionary, "Must call set_dict() first!");
423
424        let rle = self.rle_decoder.as_mut().unwrap();
425        let num_values = cmp::min(num_values, self.num_values);
426        rle.skip(num_values)
427    }
428}
429
430// ----------------------------------------------------------------------
431// RLE Decoding
432
433/// RLE/Bit-Packing hybrid decoding for values.
434/// Currently is used only for data pages v2 and supports boolean types.
435/// See [`RleValueEncoder`](crate::encoding::RleValueEncoder) for more information.
436pub struct RleValueDecoder<T: DataType> {
437    values_left: usize,
438    decoder: RleDecoder,
439    _phantom: PhantomData<T>,
440}
441
442impl<T: DataType> Default for RleValueDecoder<T> {
443    fn default() -> Self {
444        Self::new()
445    }
446}
447
448impl<T: DataType> RleValueDecoder<T> {
449    pub fn new() -> Self {
450        Self {
451            values_left: 0,
452            decoder: RleDecoder::new(1),
453            _phantom: PhantomData,
454        }
455    }
456}
457
458impl<T: DataType> Decoder<T> for RleValueDecoder<T>
459where
460    T::T: FromBitpacked,
461{
462    #[inline]
463    fn set_data(&mut self, data: Bytes, num_values: usize) -> Result<()> {
464        // Only support RLE value reader for boolean values with bit width of 1.
465        ensure_phys_ty!(Type::BOOLEAN, "RleValueDecoder only supports BoolType");
466
467        // We still need to remove prefix of i32 from the stream.
468        const I32_SIZE: usize = mem::size_of::<i32>();
469        if data.len() < I32_SIZE {
470            return Err(eof_err!("Not enough bytes to decode"));
471        }
472        let data_size = bit_util::read_num_bytes::<i32>(I32_SIZE, data.as_ref()) as usize;
473        if data.len() - I32_SIZE < data_size {
474            return Err(eof_err!("Not enough bytes to decode"));
475        }
476
477        self.decoder = RleDecoder::new(1);
478        self.decoder
479            .set_data(data.slice(I32_SIZE..I32_SIZE + data_size))?;
480        self.values_left = num_values;
481        Ok(())
482    }
483
484    #[inline]
485    fn values_left(&self) -> usize {
486        self.values_left
487    }
488
489    #[inline]
490    fn encoding(&self) -> Encoding {
491        Encoding::RLE
492    }
493
494    #[inline]
495    fn get(&mut self, buffer: &mut [T::T]) -> Result<usize> {
496        let num_values = cmp::min(buffer.len(), self.values_left);
497        let values_read = self.decoder.get_batch(&mut buffer[..num_values])?;
498        self.values_left -= values_read;
499        Ok(values_read)
500    }
501
502    #[inline]
503    fn skip(&mut self, num_values: usize) -> Result<usize> {
504        let num_values = cmp::min(num_values, self.values_left);
505        let values_skipped = self.decoder.skip(num_values)?;
506        self.values_left -= values_skipped;
507        Ok(values_skipped)
508    }
509}
510
511// ----------------------------------------------------------------------
512// DELTA_BINARY_PACKED Decoding
513
514/// Delta binary packed decoder.
515/// Supports INT32 and INT64 types.
516/// See [`DeltaBitPackEncoder`](crate::encoding::DeltaBitPackEncoder) for more
517/// information.
518pub struct DeltaBitPackDecoder<T: DataType> {
519    bit_reader: BitReader,
520    initialized: bool,
521
522    // Header info
523    /// The number of values in each block
524    block_size: usize,
525    /// The number of values that remain to be read in the current page
526    values_left: usize,
527    /// The number of mini-blocks in each block
528    mini_blocks_per_block: usize,
529    /// The number of values in each mini block
530    values_per_mini_block: usize,
531
532    // Per block info
533    /// The minimum delta in the block
534    min_delta: T::T,
535    /// The byte offset of the end of the current block
536    block_end_offset: usize,
537    /// The index on the current mini block
538    mini_block_idx: usize,
539    /// The bit widths of each mini block in the current block
540    mini_block_bit_widths: Vec<u8>,
541    /// The number of values remaining in the current mini block
542    mini_block_remaining: usize,
543
544    /// The first value from the block header if not consumed
545    first_value: Option<T::T>,
546    /// The last value to compute offsets from
547    last_value: T::T,
548}
549
550impl<T: DataType> Default for DeltaBitPackDecoder<T>
551where
552    T::T: Default + FromPrimitive + WrappingAdd + Copy,
553{
554    fn default() -> Self {
555        Self::new()
556    }
557}
558
559impl<T: DataType> DeltaBitPackDecoder<T>
560where
561    T::T: Default + FromPrimitive + WrappingAdd + Copy,
562{
563    /// Creates new delta bit packed decoder.
564    pub fn new() -> Self {
565        Self {
566            bit_reader: BitReader::from(vec![]),
567            initialized: false,
568            block_size: 0,
569            values_left: 0,
570            mini_blocks_per_block: 0,
571            values_per_mini_block: 0,
572            min_delta: Default::default(),
573            mini_block_idx: 0,
574            mini_block_bit_widths: vec![],
575            mini_block_remaining: 0,
576            block_end_offset: 0,
577            first_value: None,
578            last_value: Default::default(),
579        }
580    }
581
582    /// Returns the current offset
583    pub fn get_offset(&self) -> usize {
584        assert!(self.initialized, "Bit reader is not initialized");
585        match self.values_left {
586            // If we've exhausted this page report the end of the current block
587            // as we may not have consumed the trailing padding
588            //
589            // The max is necessary to handle pages which don't contain more than
590            // one value and therefore have no blocks, but still contain a page header
591            0 => self.bit_reader.get_byte_offset().max(self.block_end_offset),
592            _ => self.bit_reader.get_byte_offset(),
593        }
594    }
595
596    /// Initializes the next block and the first mini block within it
597    #[inline]
598    fn next_block(&mut self) -> Result<()> {
599        let min_delta = self
600            .bit_reader
601            .get_zigzag_vlq_int()
602            .ok_or_else(|| eof_err!("Not enough data to decode 'min_delta'"))?;
603
604        self.min_delta =
605            T::T::from_i64(min_delta).ok_or_else(|| general_err!("'min_delta' too large"))?;
606
607        self.mini_block_bit_widths.clear();
608        self.bit_reader
609            .get_aligned_bytes(&mut self.mini_block_bit_widths, self.mini_blocks_per_block);
610
611        let mut offset = self.bit_reader.get_byte_offset();
612        let mut remaining = self.values_left;
613
614        // Compute the end offset of the current block
615        for b in &mut self.mini_block_bit_widths {
616            if remaining == 0 {
617                // Specification requires handling arbitrary bit widths
618                // for trailing mini blocks
619                *b = 0;
620            }
621            remaining = remaining.saturating_sub(self.values_per_mini_block);
622            offset += *b as usize * self.values_per_mini_block / 8;
623        }
624        self.block_end_offset = offset;
625
626        if self.mini_block_bit_widths.len() != self.mini_blocks_per_block {
627            return Err(eof_err!("insufficient mini block bit widths"));
628        }
629
630        self.mini_block_remaining = self.values_per_mini_block;
631        self.mini_block_idx = 0;
632
633        Ok(())
634    }
635
636    /// Initializes the next mini block
637    #[inline]
638    fn next_mini_block(&mut self) -> Result<()> {
639        if self.mini_block_idx + 1 < self.mini_block_bit_widths.len() {
640            self.mini_block_idx += 1;
641            self.mini_block_remaining = self.values_per_mini_block;
642            Ok(())
643        } else {
644            self.next_block()
645        }
646    }
647
648    /// Verify the bit width is smaller then the integer type that it is trying to decode.
649    #[inline]
650    fn check_bit_width(&self, bit_width: usize) -> Result<()> {
651        if bit_width > std::mem::size_of::<T::T>() * 8 {
652            return Err(general_err!(
653                "Invalid delta bit width {} which is larger than expected {} ",
654                bit_width,
655                std::mem::size_of::<T::T>() * 8
656            ));
657        }
658        Ok(())
659    }
660}
661
662impl<T: DataType> Decoder<T> for DeltaBitPackDecoder<T>
663where
664    T::T: Default + FromPrimitive + FromBitpacked + WrappingAdd + Copy,
665{
666    // # of total values is derived from encoding
667    #[inline]
668    fn set_data(&mut self, data: Bytes, _index: usize) -> Result<()> {
669        self.bit_reader = BitReader::new(data);
670        self.initialized = true;
671
672        // Read header information
673        self.block_size = self
674            .bit_reader
675            .get_vlq_int()
676            .ok_or_else(|| eof_err!("Not enough data to decode 'block_size'"))?
677            .try_into()
678            .map_err(|_| general_err!("invalid 'block_size'"))?;
679
680        self.mini_blocks_per_block = self
681            .bit_reader
682            .get_vlq_int()
683            .ok_or_else(|| eof_err!("Not enough data to decode 'mini_blocks_per_block'"))?
684            .try_into()
685            .map_err(|_| general_err!("invalid 'mini_blocks_per_block'"))?;
686
687        if self.mini_blocks_per_block == 0 {
688            return Err(general_err!("cannot have zero miniblocks per block"));
689        }
690
691        self.values_left = self
692            .bit_reader
693            .get_vlq_int()
694            .ok_or_else(|| eof_err!("Not enough data to decode 'values_left'"))?
695            .try_into()
696            .map_err(|_| general_err!("invalid 'values_left'"))?;
697
698        let first_value = self
699            .bit_reader
700            .get_zigzag_vlq_int()
701            .ok_or_else(|| eof_err!("Not enough data to decode 'first_value'"))?;
702
703        self.first_value =
704            Some(T::T::from_i64(first_value).ok_or_else(|| general_err!("first value too large"))?);
705
706        if self.block_size % 128 != 0 {
707            return Err(general_err!(
708                "'block_size' must be a multiple of 128, got {}",
709                self.block_size
710            ));
711        }
712
713        if self.block_size % self.mini_blocks_per_block != 0 {
714            return Err(general_err!(
715                "'block_size' must be a multiple of 'mini_blocks_per_block' got {} and {}",
716                self.block_size,
717                self.mini_blocks_per_block
718            ));
719        }
720
721        // Reset decoding state
722        self.mini_block_idx = 0;
723        self.values_per_mini_block = self.block_size / self.mini_blocks_per_block;
724        self.mini_block_remaining = 0;
725        self.mini_block_bit_widths.clear();
726
727        if self.values_per_mini_block % 32 != 0 {
728            return Err(general_err!(
729                "'values_per_mini_block' must be a multiple of 32 got {}",
730                self.values_per_mini_block
731            ));
732        }
733
734        Ok(())
735    }
736
737    fn get(&mut self, buffer: &mut [T::T]) -> Result<usize> {
738        assert!(self.initialized, "Bit reader is not initialized");
739        if buffer.is_empty() {
740            return Ok(0);
741        }
742
743        let mut read = 0;
744        let to_read = buffer.len().min(self.values_left);
745
746        if let Some(value) = self.first_value.take() {
747            self.last_value = value;
748            buffer[0] = value;
749            read += 1;
750            self.values_left -= 1;
751        }
752
753        while read != to_read {
754            if self.mini_block_remaining == 0 {
755                self.next_mini_block()?;
756            }
757
758            let bit_width = self.mini_block_bit_widths[self.mini_block_idx] as usize;
759            self.check_bit_width(bit_width)?;
760            let batch_to_read = self.mini_block_remaining.min(to_read - read);
761
762            let batch_read = self
763                .bit_reader
764                .get_batch(&mut buffer[read..read + batch_to_read], bit_width);
765
766            if batch_read != batch_to_read {
767                return Err(general_err!(
768                    "Expected to read {} values from miniblock got {}",
769                    batch_to_read,
770                    batch_read
771                ));
772            }
773
774            // At this point we have read the deltas to `buffer` we now need to offset
775            // these to get back to the original values that were encoded
776            //
777            // Optimization: if the bit_width for the miniblock is 0, then we can employ
778            // a faster decoding method than setting `value[i] = value[i-1] + value[i] + min_delta`.
779            // Where min_delta is 0 (all values in the miniblock are the same), we can simply
780            // set all values to `self.last_value`. In the case of non-zero min_delta (values
781            // in the mini-block form an arithmetic progression) each value can be computed via
782            // `value[i] = (i + 1) * min_delta + last_value`. In both cases we remove the
783            // dependence on the preceding value.
784            // Kudos to @pitrou for the idea https://github.com/apache/arrow/pull/49296
785            let min_delta = self.min_delta.as_i64()?;
786            if bit_width == 0 {
787                if min_delta == 0 {
788                    buffer[read..read + batch_read].fill(self.last_value);
789                } else {
790                    // the c++ version multiplies min_delta by the iter index, but doing
791                    // wrapping_mul through T::T was a bit slower. this is still
792                    // faster than before.
793                    let mut delta = self.min_delta;
794                    for v in &mut buffer[read..read + batch_read] {
795                        *v = self.last_value.wrapping_add(&delta);
796                        delta = delta.wrapping_add(&self.min_delta);
797                    }
798
799                    self.last_value = buffer[read + batch_read - 1];
800                }
801            } else {
802                // It is OK for deltas to contain "overflowed" values after encoding,
803                // e.g. i64::MAX - i64::MIN, so we use `wrapping_add` to "overflow" again and
804                // restore original value.
805                if min_delta == 0 {
806                    for v in &mut buffer[read..read + batch_read] {
807                        *v = v.wrapping_add(&self.last_value);
808                        self.last_value = *v;
809                    }
810                } else {
811                    for v in &mut buffer[read..read + batch_read] {
812                        *v = v
813                            .wrapping_add(&self.min_delta)
814                            .wrapping_add(&self.last_value);
815                        self.last_value = *v;
816                    }
817                }
818            }
819
820            read += batch_read;
821            self.mini_block_remaining -= batch_read;
822            self.values_left -= batch_read;
823        }
824
825        Ok(to_read)
826    }
827
828    fn values_left(&self) -> usize {
829        self.values_left
830    }
831
832    fn encoding(&self) -> Encoding {
833        Encoding::DELTA_BINARY_PACKED
834    }
835
836    fn skip(&mut self, num_values: usize) -> Result<usize> {
837        let mut skip = 0;
838        let to_skip = num_values.min(self.values_left);
839        if to_skip == 0 {
840            return Ok(0);
841        }
842
843        // try to consume first value in header.
844        if let Some(value) = self.first_value.take() {
845            self.last_value = value;
846            skip += 1;
847            self.values_left -= 1;
848        }
849
850        let mini_block_batch_size = match T::T::PHYSICAL_TYPE {
851            Type::INT32 => 32,
852            Type::INT64 => 64,
853            _ => unreachable!(),
854        };
855
856        let mut skip_buffer = vec![T::T::default(); mini_block_batch_size];
857        while skip < to_skip {
858            if self.mini_block_remaining == 0 {
859                self.next_mini_block()?;
860            }
861
862            let bit_width = self.mini_block_bit_widths[self.mini_block_idx] as usize;
863            self.check_bit_width(bit_width)?;
864            let mini_block_to_skip = self.mini_block_remaining.min(to_skip - skip);
865
866            let min_delta = self.min_delta.as_i64()?;
867            if bit_width == 0 {
868                // All remainders are zero: every delta equals min_delta exactly.
869                // Advance last_value by n * min_delta with no bit reads.
870                if min_delta != 0 {
871                    let total = min_delta.wrapping_mul(mini_block_to_skip as i64);
872                    let step = T::T::from_i64(total)
873                        .ok_or_else(|| general_err!("delta*n overflow in skip"))?;
874                    self.last_value = self.last_value.wrapping_add(&step);
875                }
876                // bit_width=0 payloads occupy zero bytes; no bit_reader advancement needed.
877            } else {
878                // bw>0: must decode to track last_value for subsequent get() calls.
879                let skip_count = self
880                    .bit_reader
881                    .get_batch(&mut skip_buffer[0..mini_block_to_skip], bit_width);
882
883                if skip_count != mini_block_to_skip {
884                    return Err(general_err!(
885                        "Expected to skip {} values from mini block got {}.",
886                        mini_block_to_skip,
887                        skip_count
888                    ));
889                }
890
891                if min_delta == 0 {
892                    for v in &mut skip_buffer[0..skip_count] {
893                        *v = v.wrapping_add(&self.last_value);
894                        self.last_value = *v;
895                    }
896                } else {
897                    for v in &mut skip_buffer[0..skip_count] {
898                        *v = v
899                            .wrapping_add(&self.min_delta)
900                            .wrapping_add(&self.last_value);
901                        self.last_value = *v;
902                    }
903                }
904            }
905
906            skip += mini_block_to_skip;
907            self.mini_block_remaining -= mini_block_to_skip;
908            self.values_left -= mini_block_to_skip;
909        }
910
911        Ok(to_skip)
912    }
913}
914
915// ----------------------------------------------------------------------
916// DELTA_LENGTH_BYTE_ARRAY Decoding
917
918/// Delta length byte array decoder.
919///
920/// Only applied to byte arrays to separate the length values and the data, the lengths
921/// are encoded using DELTA_BINARY_PACKED encoding.
922/// See [`DeltaLengthByteArrayEncoder`](crate::encoding::DeltaLengthByteArrayEncoder)
923/// for more information.
924pub struct DeltaLengthByteArrayDecoder<T: DataType> {
925    // Lengths for each byte array in `data`
926    // TODO: add memory tracker to this
927    lengths: Vec<i32>,
928
929    // Current index into `lengths`
930    current_idx: usize,
931
932    // Concatenated byte array data
933    data: Option<Bytes>,
934
935    // Offset into `data`, always point to the beginning of next byte array.
936    offset: usize,
937
938    // Number of values left in this decoder stream
939    num_values: usize,
940
941    // Placeholder to allow `T` as generic parameter
942    _phantom: PhantomData<T>,
943}
944
945impl<T: DataType> Default for DeltaLengthByteArrayDecoder<T> {
946    fn default() -> Self {
947        Self::new()
948    }
949}
950
951impl<T: DataType> DeltaLengthByteArrayDecoder<T> {
952    /// Creates new delta length byte array decoder.
953    pub fn new() -> Self {
954        Self {
955            lengths: vec![],
956            current_idx: 0,
957            data: None,
958            offset: 0,
959            num_values: 0,
960            _phantom: PhantomData,
961        }
962    }
963}
964
965impl<T: DataType> Decoder<T> for DeltaLengthByteArrayDecoder<T> {
966    fn set_data(&mut self, data: Bytes, num_values: usize) -> Result<()> {
967        match T::get_physical_type() {
968            Type::BYTE_ARRAY => {
969                let mut len_decoder = DeltaBitPackDecoder::<Int32Type>::new();
970                len_decoder.set_data(data.clone(), num_values)?;
971                let num_lengths = len_decoder.values_left();
972                self.lengths.resize(num_lengths, 0);
973                len_decoder.get(&mut self.lengths[..])?;
974
975                self.data = Some(data.slice(len_decoder.get_offset()..));
976                self.offset = 0;
977                self.current_idx = 0;
978                self.num_values = num_lengths;
979                Ok(())
980            }
981            _ => Err(general_err!(
982                "DeltaLengthByteArrayDecoder only support ByteArrayType"
983            )),
984        }
985    }
986
987    fn get(&mut self, buffer: &mut [T::T]) -> Result<usize> {
988        match T::get_physical_type() {
989            Type::BYTE_ARRAY => {
990                assert!(self.data.is_some());
991
992                let data = self.data.as_ref().unwrap();
993                let num_values = cmp::min(buffer.len(), self.num_values);
994
995                for item in buffer.iter_mut().take(num_values) {
996                    let len = self.lengths[self.current_idx] as usize;
997                    item.set_from_bytes(data.slice(self.offset..self.offset + len));
998
999                    self.offset += len;
1000                    self.current_idx += 1;
1001                }
1002
1003                self.num_values -= num_values;
1004                Ok(num_values)
1005            }
1006            _ => Err(general_err!(
1007                "DeltaLengthByteArrayDecoder only support ByteArrayType"
1008            )),
1009        }
1010    }
1011
1012    fn values_left(&self) -> usize {
1013        self.num_values
1014    }
1015
1016    fn encoding(&self) -> Encoding {
1017        Encoding::DELTA_LENGTH_BYTE_ARRAY
1018    }
1019
1020    fn skip(&mut self, num_values: usize) -> Result<usize> {
1021        match T::get_physical_type() {
1022            Type::BYTE_ARRAY => {
1023                let num_values = cmp::min(num_values, self.num_values);
1024
1025                let next_offset: i32 = self.lengths
1026                    [self.current_idx..self.current_idx + num_values]
1027                    .iter()
1028                    .sum();
1029
1030                self.current_idx += num_values;
1031                self.offset += next_offset as usize;
1032
1033                self.num_values -= num_values;
1034                Ok(num_values)
1035            }
1036            other_type => Err(general_err!(
1037                "DeltaLengthByteArrayDecoder not support {}, only support byte array",
1038                other_type
1039            )),
1040        }
1041    }
1042}
1043
1044// ----------------------------------------------------------------------
1045// DELTA_BYTE_ARRAY Decoding
1046
1047/// Delta byte array decoder.
1048///
1049/// Prefix lengths are encoded using `DELTA_BINARY_PACKED` encoding, Suffixes are stored
1050/// using `DELTA_LENGTH_BYTE_ARRAY` encoding.
1051/// See [`DeltaByteArrayEncoder`](crate::encoding::DeltaByteArrayEncoder) for more
1052/// information.
1053pub struct DeltaByteArrayDecoder<T: DataType> {
1054    // Prefix lengths for each byte array
1055    // TODO: add memory tracker to this
1056    prefix_lengths: Vec<i32>,
1057
1058    // The current index into `prefix_lengths`,
1059    current_idx: usize,
1060
1061    // Decoder for all suffixes, the # of which should be the same as
1062    // `prefix_lengths.len()`
1063    suffix_decoder: Option<DeltaLengthByteArrayDecoder<ByteArrayType>>,
1064
1065    // The last byte array, used to derive the current prefix
1066    // Stored as Bytes to avoid clone allocation when creating output
1067    previous_value: Bytes,
1068
1069    // Number of values left
1070    num_values: usize,
1071
1072    // Placeholder to allow `T` as generic parameter
1073    _phantom: PhantomData<T>,
1074}
1075
1076impl<T: DataType> Default for DeltaByteArrayDecoder<T> {
1077    fn default() -> Self {
1078        Self::new()
1079    }
1080}
1081
1082impl<T: DataType> DeltaByteArrayDecoder<T> {
1083    /// Creates new delta byte array decoder.
1084    pub fn new() -> Self {
1085        Self {
1086            prefix_lengths: vec![],
1087            current_idx: 0,
1088            suffix_decoder: None,
1089            previous_value: Bytes::new(),
1090            num_values: 0,
1091            _phantom: PhantomData,
1092        }
1093    }
1094}
1095
1096impl<T: DataType> Decoder<T> for DeltaByteArrayDecoder<T> {
1097    fn set_data(&mut self, data: Bytes, num_values: usize) -> Result<()> {
1098        match T::get_physical_type() {
1099            Type::BYTE_ARRAY | Type::FIXED_LEN_BYTE_ARRAY => {
1100                let mut prefix_len_decoder = DeltaBitPackDecoder::<Int32Type>::new();
1101                prefix_len_decoder.set_data(data.clone(), num_values)?;
1102                let num_prefixes = prefix_len_decoder.values_left();
1103                self.prefix_lengths.resize(num_prefixes, 0);
1104                prefix_len_decoder.get(&mut self.prefix_lengths[..])?;
1105
1106                let mut suffix_decoder = DeltaLengthByteArrayDecoder::new();
1107                suffix_decoder
1108                    .set_data(data.slice(prefix_len_decoder.get_offset()..), num_values)?;
1109                self.suffix_decoder = Some(suffix_decoder);
1110                self.num_values = num_prefixes;
1111                self.current_idx = 0;
1112                self.previous_value = Bytes::new();
1113                Ok(())
1114            }
1115            _ => Err(general_err!(
1116                "DeltaByteArrayDecoder only supports ByteArrayType and FixedLenByteArrayType"
1117            )),
1118        }
1119    }
1120
1121    fn get(&mut self, buffer: &mut [T::T]) -> Result<usize> {
1122        match T::get_physical_type() {
1123            Type::BYTE_ARRAY | Type::FIXED_LEN_BYTE_ARRAY => {
1124                let num_values = cmp::min(buffer.len(), self.num_values);
1125                let mut v: [ByteArray; 1] = [ByteArray::new(); 1];
1126                for item in buffer.iter_mut().take(num_values) {
1127                    // Process suffix
1128                    // TODO: this is awkward - maybe we should add a non-vectorized API?
1129                    let suffix_decoder = self
1130                        .suffix_decoder
1131                        .as_mut()
1132                        .expect("decoder not initialized");
1133                    suffix_decoder.get(&mut v[..])?;
1134                    let suffix = v[0].data();
1135
1136                    // Extract current prefix length, can be 0
1137                    let prefix_len = usize::try_from(self.prefix_lengths[self.current_idx])
1138                        .map_err(|_| {
1139                            general_err!(
1140                                "Invalid DELTA_BYTE_ARRAY prefix length {}",
1141                                self.prefix_lengths[self.current_idx]
1142                            )
1143                        })?;
1144
1145                    if prefix_len > self.previous_value.len() {
1146                        return Err(general_err!(
1147                            "Invalid DELTA_BYTE_ARRAY prefix length {} exceeds previous value length {}",
1148                            prefix_len,
1149                            self.previous_value.len()
1150                        ));
1151                    }
1152
1153                    // Concatenate prefix with suffix
1154                    let mut result = Vec::with_capacity(prefix_len + suffix.len());
1155                    result.extend_from_slice(&self.previous_value[0..prefix_len]);
1156                    result.extend_from_slice(suffix);
1157
1158                    let data = Bytes::from(result);
1159                    item.set_from_bytes(data.clone());
1160
1161                    self.previous_value = data;
1162                    self.current_idx += 1;
1163                }
1164
1165                self.num_values -= num_values;
1166                Ok(num_values)
1167            }
1168            _ => Err(general_err!(
1169                "DeltaByteArrayDecoder only supports ByteArrayType and FixedLenByteArrayType"
1170            )),
1171        }
1172    }
1173
1174    fn values_left(&self) -> usize {
1175        self.num_values
1176    }
1177
1178    fn encoding(&self) -> Encoding {
1179        Encoding::DELTA_BYTE_ARRAY
1180    }
1181
1182    fn skip(&mut self, num_values: usize) -> Result<usize> {
1183        let mut buffer = vec![T::T::default(); num_values];
1184        self.get(&mut buffer)
1185    }
1186}
1187
1188#[cfg(test)]
1189mod tests {
1190    use super::{super::encoding::*, *};
1191
1192    use std::f32::consts::PI as PI_f32;
1193    use std::f64::consts::PI as PI_f64;
1194    use std::sync::Arc;
1195
1196    use crate::schema::types::{ColumnDescPtr, ColumnDescriptor, ColumnPath, Type as SchemaType};
1197    use crate::util::test_common::rand_gen::RandGen;
1198
1199    #[test]
1200    fn test_delta_byte_array_invalid_prefix_len_returns_error() {
1201        let col_descr = create_test_col_desc_ptr(-1, Type::BYTE_ARRAY);
1202
1203        let mut encoder =
1204            get_encoder::<ByteArrayType>(Encoding::DELTA_BYTE_ARRAY, &col_descr).unwrap();
1205        let input = vec![ByteArray::from("a"), ByteArray::from("ab")];
1206        encoder.put(&input).unwrap();
1207        let encoded = encoder.flush_buffer().unwrap();
1208
1209        // First, decode just the prefix-length stream so we know where the suffix stream starts.
1210        let mut prefix_len_decoder = DeltaBitPackDecoder::<Int32Type>::new();
1211        prefix_len_decoder
1212            .set_data(encoded.clone(), input.len())
1213            .unwrap();
1214        let num_prefixes = prefix_len_decoder.values_left();
1215        let mut prefix_lengths = vec![0; num_prefixes];
1216        prefix_len_decoder.get(&mut prefix_lengths).unwrap();
1217
1218        // check: valid encoding should produce prefix lengths [0, 1]
1219        assert_eq!(prefix_lengths, vec![0, 1]);
1220
1221        let prefix_stream_end = prefix_len_decoder.get_offset();
1222
1223        // Corrupt the prefix-length stream itself:
1224        // replace it with a valid DELTA_BINARY_PACKED stream for [1, 1],
1225        // so the first decoded prefix length becomes impossible because previous_value is empty.
1226        let mut prefix_encoder = get_encoder::<Int32Type>(
1227            Encoding::DELTA_BINARY_PACKED,
1228            &create_test_col_desc_ptr(-1, Type::INT32),
1229        )
1230        .unwrap();
1231        prefix_encoder.put(&[1i32, 1i32]).unwrap();
1232        let corrupted_prefix = prefix_encoder.flush_buffer().unwrap();
1233
1234        let mut corrupted = Vec::new();
1235        corrupted.extend_from_slice(corrupted_prefix.as_ref());
1236        corrupted.extend_from_slice(&encoded[prefix_stream_end..]);
1237
1238        let mut decoder = DeltaByteArrayDecoder::<ByteArrayType>::new();
1239        decoder
1240            .set_data(Bytes::from(corrupted), input.len())
1241            .unwrap();
1242
1243        let mut out = vec![ByteArray::new(); input.len()];
1244
1245        let err = decoder.get(&mut out).unwrap_err();
1246        assert!(
1247            err.to_string()
1248                .contains("Invalid DELTA_BYTE_ARRAY prefix length"),
1249            "{}",
1250            err
1251        );
1252    }
1253
1254    #[test]
1255    fn test_delta_byte_array_negative_prefix_len_returns_error() {
1256        let col_descr = create_test_col_desc_ptr(-1, Type::BYTE_ARRAY);
1257
1258        let mut encoder =
1259            get_encoder::<ByteArrayType>(Encoding::DELTA_BYTE_ARRAY, &col_descr).unwrap();
1260        let input = vec![ByteArray::from("a"), ByteArray::from("ab")];
1261        encoder.put(&input).unwrap();
1262        let encoded = encoder.flush_buffer().unwrap();
1263
1264        let mut decoder = DeltaByteArrayDecoder::<ByteArrayType>::new();
1265        decoder.set_data(encoded, input.len()).unwrap();
1266
1267        // Force a negative prefix length after decoder initialization
1268        decoder.prefix_lengths[0] = -1;
1269        let mut out = vec![ByteArray::new(); input.len()];
1270
1271        let err = decoder.get(&mut out).unwrap_err();
1272        assert!(
1273            err.to_string()
1274                .contains("Invalid DELTA_BYTE_ARRAY prefix length"),
1275            "{}",
1276            err
1277        );
1278    }
1279
1280    #[test]
1281    fn test_get_decoders() {
1282        // supported encodings
1283        create_and_check_decoder::<Int32Type>(Encoding::PLAIN, None);
1284        create_and_check_decoder::<Int32Type>(Encoding::DELTA_BINARY_PACKED, None);
1285        create_and_check_decoder::<ByteArrayType>(Encoding::DELTA_LENGTH_BYTE_ARRAY, None);
1286        create_and_check_decoder::<ByteArrayType>(Encoding::DELTA_BYTE_ARRAY, None);
1287        create_and_check_decoder::<BoolType>(Encoding::RLE, None);
1288
1289        // error when initializing
1290        create_and_check_decoder::<Int32Type>(
1291            Encoding::RLE_DICTIONARY,
1292            Some(general_err!(
1293                "Cannot initialize this encoding through this function"
1294            )),
1295        );
1296        create_and_check_decoder::<Int32Type>(
1297            Encoding::PLAIN_DICTIONARY,
1298            Some(general_err!(
1299                "Cannot initialize this encoding through this function"
1300            )),
1301        );
1302        create_and_check_decoder::<Int32Type>(
1303            Encoding::DELTA_LENGTH_BYTE_ARRAY,
1304            Some(general_err!(
1305                "Encoding DELTA_LENGTH_BYTE_ARRAY is not supported for type"
1306            )),
1307        );
1308        create_and_check_decoder::<Int32Type>(
1309            Encoding::DELTA_BYTE_ARRAY,
1310            Some(general_err!(
1311                "Encoding DELTA_BYTE_ARRAY is not supported for type"
1312            )),
1313        );
1314
1315        // unsupported
1316        #[allow(deprecated)]
1317        create_and_check_decoder::<Int32Type>(
1318            Encoding::BIT_PACKED,
1319            Some(nyi_err!("Encoding BIT_PACKED is not supported")),
1320        );
1321    }
1322
1323    #[test]
1324    fn test_plain_decode_int32() {
1325        let data = [42, 18, 52];
1326        let data_bytes = Int32Type::to_byte_array(&data[..]);
1327        let mut buffer = [0; 3];
1328        test_plain_decode::<Int32Type>(Bytes::from(data_bytes), 3, -1, &mut buffer[..], &data[..]);
1329    }
1330
1331    #[test]
1332    fn test_plain_skip_int32() {
1333        let data = [42, 18, 52];
1334        let data_bytes = Int32Type::to_byte_array(&data[..]);
1335        test_plain_skip::<Int32Type>(Bytes::from(data_bytes), 3, 1, -1, &data[1..]);
1336    }
1337
1338    #[test]
1339    fn test_plain_skip_all_int32() {
1340        let data = [42, 18, 52];
1341        let data_bytes = Int32Type::to_byte_array(&data[..]);
1342        test_plain_skip::<Int32Type>(Bytes::from(data_bytes), 3, 5, -1, &[]);
1343    }
1344
1345    #[test]
1346    fn test_plain_decode_int32_spaced() {
1347        let data = [42, 18, 52];
1348        let expected_data = [0, 42, 0, 18, 0, 0, 52, 0];
1349        let data_bytes = Int32Type::to_byte_array(&data[..]);
1350        let mut buffer = [0; 8];
1351        let num_nulls = 5;
1352        let valid_bits = [0b01001010];
1353        test_plain_decode_spaced::<Int32Type>(
1354            Bytes::from(data_bytes),
1355            3,
1356            -1,
1357            &mut buffer[..],
1358            num_nulls,
1359            &valid_bits,
1360            &expected_data[..],
1361        );
1362    }
1363
1364    #[test]
1365    fn test_plain_decode_int64() {
1366        let data = [42, 18, 52];
1367        let data_bytes = Int64Type::to_byte_array(&data[..]);
1368        let mut buffer = [0; 3];
1369        test_plain_decode::<Int64Type>(Bytes::from(data_bytes), 3, -1, &mut buffer[..], &data[..]);
1370    }
1371
1372    #[test]
1373    fn test_plain_skip_int64() {
1374        let data = [42, 18, 52];
1375        let data_bytes = Int64Type::to_byte_array(&data[..]);
1376        test_plain_skip::<Int64Type>(Bytes::from(data_bytes), 3, 2, -1, &data[2..]);
1377    }
1378
1379    #[test]
1380    fn test_plain_skip_all_int64() {
1381        let data = [42, 18, 52];
1382        let data_bytes = Int64Type::to_byte_array(&data[..]);
1383        test_plain_skip::<Int64Type>(Bytes::from(data_bytes), 3, 3, -1, &[]);
1384    }
1385
1386    #[test]
1387    fn test_plain_decode_float() {
1388        let data = [PI_f32, 2.414, 12.51];
1389        let data_bytes = FloatType::to_byte_array(&data[..]);
1390        let mut buffer = [0.0; 3];
1391        test_plain_decode::<FloatType>(Bytes::from(data_bytes), 3, -1, &mut buffer[..], &data[..]);
1392    }
1393
1394    #[test]
1395    fn test_plain_skip_float() {
1396        let data = [PI_f32, 2.414, 12.51];
1397        let data_bytes = FloatType::to_byte_array(&data[..]);
1398        test_plain_skip::<FloatType>(Bytes::from(data_bytes), 3, 1, -1, &data[1..]);
1399    }
1400
1401    #[test]
1402    fn test_plain_skip_all_float() {
1403        let data = [PI_f32, 2.414, 12.51];
1404        let data_bytes = FloatType::to_byte_array(&data[..]);
1405        test_plain_skip::<FloatType>(Bytes::from(data_bytes), 3, 4, -1, &[]);
1406    }
1407
1408    #[test]
1409    fn test_plain_skip_double() {
1410        let data = [PI_f64, 2.414f64, 12.51f64];
1411        let data_bytes = DoubleType::to_byte_array(&data[..]);
1412        test_plain_skip::<DoubleType>(Bytes::from(data_bytes), 3, 1, -1, &data[1..]);
1413    }
1414
1415    #[test]
1416    fn test_plain_skip_all_double() {
1417        let data = [PI_f64, 2.414f64, 12.51f64];
1418        let data_bytes = DoubleType::to_byte_array(&data[..]);
1419        test_plain_skip::<DoubleType>(Bytes::from(data_bytes), 3, 5, -1, &[]);
1420    }
1421
1422    #[test]
1423    fn test_plain_decode_double() {
1424        let data = [PI_f64, 2.414f64, 12.51f64];
1425        let data_bytes = DoubleType::to_byte_array(&data[..]);
1426        let mut buffer = [0.0f64; 3];
1427        test_plain_decode::<DoubleType>(Bytes::from(data_bytes), 3, -1, &mut buffer[..], &data[..]);
1428    }
1429
1430    #[test]
1431    fn test_plain_decode_int96() {
1432        let mut data = [Int96::new(); 4];
1433        data[0].set_data(11, 22, 33);
1434        data[1].set_data(44, 55, 66);
1435        data[2].set_data(10, 20, 30);
1436        data[3].set_data(40, 50, 60);
1437        let data_bytes = Int96Type::to_byte_array(&data[..]);
1438        let mut buffer = [Int96::new(); 4];
1439        test_plain_decode::<Int96Type>(Bytes::from(data_bytes), 4, -1, &mut buffer[..], &data[..]);
1440    }
1441
1442    #[test]
1443    fn test_plain_skip_int96() {
1444        let mut data = [Int96::new(); 4];
1445        data[0].set_data(11, 22, 33);
1446        data[1].set_data(44, 55, 66);
1447        data[2].set_data(10, 20, 30);
1448        data[3].set_data(40, 50, 60);
1449        let data_bytes = Int96Type::to_byte_array(&data[..]);
1450        test_plain_skip::<Int96Type>(Bytes::from(data_bytes), 4, 2, -1, &data[2..]);
1451    }
1452
1453    #[test]
1454    fn test_plain_skip_all_int96() {
1455        let mut data = [Int96::new(); 4];
1456        data[0].set_data(11, 22, 33);
1457        data[1].set_data(44, 55, 66);
1458        data[2].set_data(10, 20, 30);
1459        data[3].set_data(40, 50, 60);
1460        let data_bytes = Int96Type::to_byte_array(&data[..]);
1461        test_plain_skip::<Int96Type>(Bytes::from(data_bytes), 4, 8, -1, &[]);
1462    }
1463
1464    #[test]
1465    fn test_plain_decode_bool() {
1466        let data = [
1467            false, true, false, false, true, false, true, true, false, true,
1468        ];
1469        let data_bytes = BoolType::to_byte_array(&data[..]);
1470        let mut buffer = [false; 10];
1471        test_plain_decode::<BoolType>(Bytes::from(data_bytes), 10, -1, &mut buffer[..], &data[..]);
1472    }
1473
1474    #[test]
1475    fn test_plain_skip_bool() {
1476        let data = [
1477            false, true, false, false, true, false, true, true, false, true,
1478        ];
1479        let data_bytes = BoolType::to_byte_array(&data[..]);
1480        test_plain_skip::<BoolType>(Bytes::from(data_bytes), 10, 5, -1, &data[5..]);
1481    }
1482
1483    #[test]
1484    fn test_plain_skip_all_bool() {
1485        let data = [
1486            false, true, false, false, true, false, true, true, false, true,
1487        ];
1488        let data_bytes = BoolType::to_byte_array(&data[..]);
1489        test_plain_skip::<BoolType>(Bytes::from(data_bytes), 10, 20, -1, &[]);
1490    }
1491
1492    #[test]
1493    fn test_plain_decode_byte_array() {
1494        let mut data = vec![ByteArray::new(); 2];
1495        data[0].set_data(Bytes::from(String::from("hello")));
1496        data[1].set_data(Bytes::from(String::from("parquet")));
1497        let data_bytes = ByteArrayType::to_byte_array(&data[..]);
1498        let mut buffer = vec![ByteArray::new(); 2];
1499        test_plain_decode::<ByteArrayType>(
1500            Bytes::from(data_bytes),
1501            2,
1502            -1,
1503            &mut buffer[..],
1504            &data[..],
1505        );
1506    }
1507
1508    #[test]
1509    fn test_plain_skip_byte_array() {
1510        let mut data = vec![ByteArray::new(); 2];
1511        data[0].set_data(Bytes::from(String::from("hello")));
1512        data[1].set_data(Bytes::from(String::from("parquet")));
1513        let data_bytes = ByteArrayType::to_byte_array(&data[..]);
1514        test_plain_skip::<ByteArrayType>(Bytes::from(data_bytes), 2, 1, -1, &data[1..]);
1515    }
1516
1517    #[test]
1518    fn test_plain_skip_all_byte_array() {
1519        let mut data = vec![ByteArray::new(); 2];
1520        data[0].set_data(Bytes::from(String::from("hello")));
1521        data[1].set_data(Bytes::from(String::from("parquet")));
1522        let data_bytes = ByteArrayType::to_byte_array(&data[..]);
1523        test_plain_skip::<ByteArrayType>(Bytes::from(data_bytes), 2, 2, -1, &[]);
1524    }
1525
1526    #[test]
1527    fn test_plain_decode_fixed_len_byte_array() {
1528        let mut data = vec![FixedLenByteArray::default(); 3];
1529        data[0].set_data(Bytes::from(String::from("bird")));
1530        data[1].set_data(Bytes::from(String::from("come")));
1531        data[2].set_data(Bytes::from(String::from("flow")));
1532        let data_bytes = FixedLenByteArrayType::to_byte_array(&data[..]);
1533        let mut buffer = vec![FixedLenByteArray::default(); 3];
1534        test_plain_decode::<FixedLenByteArrayType>(
1535            Bytes::from(data_bytes),
1536            3,
1537            4,
1538            &mut buffer[..],
1539            &data[..],
1540        );
1541    }
1542
1543    #[test]
1544    fn test_plain_skip_fixed_len_byte_array() {
1545        let mut data = vec![FixedLenByteArray::default(); 3];
1546        data[0].set_data(Bytes::from(String::from("bird")));
1547        data[1].set_data(Bytes::from(String::from("come")));
1548        data[2].set_data(Bytes::from(String::from("flow")));
1549        let data_bytes = FixedLenByteArrayType::to_byte_array(&data[..]);
1550        test_plain_skip::<FixedLenByteArrayType>(Bytes::from(data_bytes), 3, 1, 4, &data[1..]);
1551    }
1552
1553    #[test]
1554    fn test_plain_skip_all_fixed_len_byte_array() {
1555        let mut data = vec![FixedLenByteArray::default(); 3];
1556        data[0].set_data(Bytes::from(String::from("bird")));
1557        data[1].set_data(Bytes::from(String::from("come")));
1558        data[2].set_data(Bytes::from(String::from("flow")));
1559        let data_bytes = FixedLenByteArrayType::to_byte_array(&data[..]);
1560        test_plain_skip::<FixedLenByteArrayType>(Bytes::from(data_bytes), 3, 6, 4, &[]);
1561    }
1562
1563    #[test]
1564    fn test_dict_decoder_empty_data() {
1565        let mut decoder = DictDecoder::<Int32Type>::new();
1566        let err = decoder.set_data(Bytes::new(), 10).unwrap_err();
1567        assert_eq!(err.to_string(), "EOF: Not enough bytes to decode bit_width");
1568    }
1569
1570    fn test_plain_decode<T: DataType>(
1571        data: Bytes,
1572        num_values: usize,
1573        type_length: i32,
1574        buffer: &mut [T::T],
1575        expected: &[T::T],
1576    ) {
1577        let mut decoder: PlainDecoder<T> = PlainDecoder::new(type_length);
1578        let result = decoder.set_data(data, num_values);
1579        assert!(result.is_ok());
1580        let result = decoder.get(buffer);
1581        assert!(result.is_ok());
1582        assert_eq!(decoder.values_left(), 0);
1583        assert_eq!(buffer, expected);
1584    }
1585
1586    fn test_plain_skip<T: DataType>(
1587        data: Bytes,
1588        num_values: usize,
1589        skip: usize,
1590        type_length: i32,
1591        expected: &[T::T],
1592    ) {
1593        let mut decoder: PlainDecoder<T> = PlainDecoder::new(type_length);
1594        let result = decoder.set_data(data, num_values);
1595        assert!(result.is_ok());
1596        let skipped = decoder.skip(skip).expect("skipping values");
1597
1598        if skip >= num_values {
1599            assert_eq!(skipped, num_values);
1600
1601            let mut buffer = vec![T::T::default(); 1];
1602            let remaining = decoder.get(&mut buffer).expect("getting remaining values");
1603            assert_eq!(remaining, 0);
1604        } else {
1605            assert_eq!(skipped, skip);
1606            let mut buffer = vec![T::T::default(); num_values - skip];
1607            let remaining = decoder.get(&mut buffer).expect("getting remaining values");
1608            assert_eq!(remaining, num_values - skip);
1609            assert_eq!(decoder.values_left(), 0);
1610            assert_eq!(buffer, expected);
1611        }
1612    }
1613
1614    fn test_plain_decode_spaced<T: DataType>(
1615        data: Bytes,
1616        num_values: usize,
1617        type_length: i32,
1618        buffer: &mut [T::T],
1619        num_nulls: usize,
1620        valid_bits: &[u8],
1621        expected: &[T::T],
1622    ) {
1623        let mut decoder: PlainDecoder<T> = PlainDecoder::new(type_length);
1624        let result = decoder.set_data(data, num_values);
1625        assert!(result.is_ok());
1626        let result = decoder.get_spaced(buffer, num_nulls, valid_bits);
1627        assert!(result.is_ok());
1628        assert_eq!(num_values + num_nulls, result.unwrap());
1629        assert_eq!(decoder.values_left(), 0);
1630        assert_eq!(buffer, expected);
1631    }
1632
1633    #[test]
1634    #[should_panic(expected = "RleValueEncoder only supports BoolType")]
1635    fn test_rle_value_encode_int32_not_supported() {
1636        let mut encoder = RleValueEncoder::<Int32Type>::new();
1637        encoder.put(&[1, 2, 3, 4]).unwrap();
1638    }
1639
1640    #[test]
1641    #[should_panic(expected = "RleValueDecoder only supports BoolType")]
1642    fn test_rle_value_decode_int32_not_supported() {
1643        let mut decoder = RleValueDecoder::<Int32Type>::new();
1644        decoder.set_data(Bytes::from(vec![5, 0, 0, 0]), 1).unwrap();
1645    }
1646
1647    #[test]
1648    fn test_rle_value_decode_missing_size() {
1649        let mut decoder = RleValueDecoder::<BoolType>::new();
1650        assert!(decoder.set_data(Bytes::from(vec![0]), 1).is_err());
1651    }
1652
1653    #[test]
1654    fn test_rle_value_decode_missing_data() {
1655        let mut decoder = RleValueDecoder::<BoolType>::new();
1656        assert!(decoder.set_data(Bytes::from(vec![5, 0, 0, 0]), 1).is_err());
1657    }
1658
1659    #[test]
1660    fn test_rle_value_decode_bool_decode() {
1661        // Test multiple 'put' calls on the same encoder
1662        let data = vec![
1663            BoolType::gen_vec(-1, 256),
1664            BoolType::gen_vec(-1, 257),
1665            BoolType::gen_vec(-1, 126),
1666        ];
1667        test_rle_value_decode::<BoolType>(data);
1668    }
1669
1670    #[test]
1671    #[should_panic(expected = "Bit reader is not initialized")]
1672    fn test_delta_bit_packed_not_initialized_offset() {
1673        // Fail if set_data() is not called before get_offset()
1674        let decoder = DeltaBitPackDecoder::<Int32Type>::new();
1675        decoder.get_offset();
1676    }
1677
1678    #[test]
1679    #[should_panic(expected = "Bit reader is not initialized")]
1680    fn test_delta_bit_packed_not_initialized_get() {
1681        // Fail if set_data() is not called before get()
1682        let mut decoder = DeltaBitPackDecoder::<Int32Type>::new();
1683        let mut buffer = vec![];
1684        decoder.get(&mut buffer).unwrap();
1685    }
1686
1687    #[test]
1688    fn test_delta_bit_packed_int32_empty() {
1689        let data = vec![vec![0; 0]];
1690        test_delta_bit_packed_decode::<Int32Type>(data);
1691    }
1692
1693    #[test]
1694    fn test_delta_bit_packed_int32_repeat() {
1695        let block_data = vec![
1696            1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5,
1697            6, 7, 8,
1698        ];
1699        test_delta_bit_packed_decode::<Int32Type>(vec![block_data]);
1700    }
1701
1702    #[test]
1703    fn test_skip_delta_bit_packed_int32_repeat() {
1704        let block_data = vec![
1705            1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5,
1706            6, 7, 8,
1707        ];
1708        test_skip::<Int32Type>(block_data.clone(), Encoding::DELTA_BINARY_PACKED, 10);
1709        test_skip::<Int32Type>(block_data, Encoding::DELTA_BINARY_PACKED, 100);
1710    }
1711
1712    #[test]
1713    fn test_delta_bit_packed_int32_uneven() {
1714        let block_data = vec![1, -2, 3, -4, 5, 6, 7, 8, 9, 10, 11];
1715        test_delta_bit_packed_decode::<Int32Type>(vec![block_data]);
1716    }
1717
1718    #[test]
1719    fn test_skip_delta_bit_packed_int32_uneven() {
1720        let block_data = vec![1, -2, 3, -4, 5, 6, 7, 8, 9, 10, 11];
1721        test_skip::<Int32Type>(block_data.clone(), Encoding::DELTA_BINARY_PACKED, 5);
1722        test_skip::<Int32Type>(block_data, Encoding::DELTA_BINARY_PACKED, 100);
1723    }
1724
1725    #[test]
1726    fn test_delta_bit_packed_int32_same_values() {
1727        let block_data = vec![
1728            127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127,
1729        ];
1730        test_delta_bit_packed_decode::<Int32Type>(vec![block_data]);
1731
1732        let block_data = vec![
1733            -127, -127, -127, -127, -127, -127, -127, -127, -127, -127, -127, -127, -127, -127,
1734            -127, -127,
1735        ];
1736        test_delta_bit_packed_decode::<Int32Type>(vec![block_data]);
1737    }
1738
1739    #[test]
1740    fn test_skip_delta_bit_packed_int32_same_values() {
1741        let block_data = vec![
1742            127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127,
1743        ];
1744        test_skip::<Int32Type>(block_data.clone(), Encoding::DELTA_BINARY_PACKED, 5);
1745        test_skip::<Int32Type>(block_data, Encoding::DELTA_BINARY_PACKED, 100);
1746
1747        let block_data = vec![
1748            -127, -127, -127, -127, -127, -127, -127, -127, -127, -127, -127, -127, -127, -127,
1749            -127, -127,
1750        ];
1751        test_skip::<Int32Type>(block_data.clone(), Encoding::DELTA_BINARY_PACKED, 5);
1752        test_skip::<Int32Type>(block_data, Encoding::DELTA_BINARY_PACKED, 100);
1753    }
1754
1755    #[test]
1756    fn test_delta_bit_packed_int32_min_max() {
1757        let block_data = vec![
1758            i32::MIN,
1759            i32::MIN,
1760            i32::MIN,
1761            i32::MAX,
1762            i32::MIN,
1763            i32::MAX,
1764            i32::MIN,
1765            i32::MAX,
1766        ];
1767        test_delta_bit_packed_decode::<Int32Type>(vec![block_data]);
1768    }
1769
1770    #[test]
1771    fn test_skip_delta_bit_packed_int32_min_max() {
1772        let block_data = vec![
1773            i32::MIN,
1774            i32::MIN,
1775            i32::MIN,
1776            i32::MAX,
1777            i32::MIN,
1778            i32::MAX,
1779            i32::MIN,
1780            i32::MAX,
1781        ];
1782        test_skip::<Int32Type>(block_data.clone(), Encoding::DELTA_BINARY_PACKED, 5);
1783        test_skip::<Int32Type>(block_data, Encoding::DELTA_BINARY_PACKED, 100);
1784    }
1785
1786    #[test]
1787    fn test_skip_delta_bit_packed_bw0_uniform_step_i32() {
1788        // Uniform-step column: every delta equals min_delta, so bw=0 miniblocks.
1789        // Partial skip must advance last_value by n * min_delta (min_delta != 0 path).
1790        let data: Vec<i32> = (0..128).map(|i| i * 7).collect();
1791        test_skip::<Int32Type>(data.clone(), Encoding::DELTA_BINARY_PACKED, 50);
1792        test_skip::<Int32Type>(data, Encoding::DELTA_BINARY_PACKED, 200);
1793    }
1794
1795    #[test]
1796    fn test_skip_delta_bit_packed_bw0_uniform_step_i64() {
1797        // Same as above for i64.
1798        let data: Vec<i64> = (0..128).map(|i| i * 100).collect();
1799        test_skip::<Int64Type>(data.clone(), Encoding::DELTA_BINARY_PACKED, 50);
1800        test_skip::<Int64Type>(data, Encoding::DELTA_BINARY_PACKED, 200);
1801    }
1802
1803    #[test]
1804    fn test_delta_bit_packed_int32_multiple_blocks() {
1805        // Test multiple 'put' calls on the same encoder
1806        let data = vec![
1807            Int32Type::gen_vec(-1, 64),
1808            Int32Type::gen_vec(-1, 128),
1809            Int32Type::gen_vec(-1, 64),
1810        ];
1811        test_delta_bit_packed_decode::<Int32Type>(data);
1812    }
1813
1814    #[test]
1815    fn test_delta_bit_packed_int32_data_across_blocks() {
1816        // Test multiple 'put' calls on the same encoder
1817        let data = vec![Int32Type::gen_vec(-1, 256), Int32Type::gen_vec(-1, 257)];
1818        test_delta_bit_packed_decode::<Int32Type>(data);
1819    }
1820
1821    #[test]
1822    fn test_delta_bit_packed_int32_with_empty_blocks() {
1823        let data = vec![
1824            Int32Type::gen_vec(-1, 128),
1825            vec![0; 0],
1826            Int32Type::gen_vec(-1, 64),
1827        ];
1828        test_delta_bit_packed_decode::<Int32Type>(data);
1829    }
1830
1831    #[test]
1832    fn test_delta_bit_packed_int64_empty() {
1833        let data = vec![vec![0; 0]];
1834        test_delta_bit_packed_decode::<Int64Type>(data);
1835    }
1836
1837    #[test]
1838    fn test_delta_bit_packed_int64_min_max() {
1839        let block_data = vec![
1840            i64::MIN,
1841            i64::MAX,
1842            i64::MIN,
1843            i64::MAX,
1844            i64::MIN,
1845            i64::MAX,
1846            i64::MIN,
1847            i64::MAX,
1848        ];
1849        test_delta_bit_packed_decode::<Int64Type>(vec![block_data]);
1850    }
1851
1852    #[test]
1853    fn test_delta_bit_packed_int64_multiple_blocks() {
1854        // Test multiple 'put' calls on the same encoder
1855        let data = vec![
1856            Int64Type::gen_vec(-1, 64),
1857            Int64Type::gen_vec(-1, 128),
1858            Int64Type::gen_vec(-1, 64),
1859        ];
1860        test_delta_bit_packed_decode::<Int64Type>(data);
1861    }
1862
1863    #[test]
1864    fn test_delta_bit_packed_zero_miniblocks() {
1865        // It is invalid for mini_blocks_per_block to be 0
1866        let data = vec![
1867            128, 1, // block_size = 128
1868            0, // mini_blocks_per_block = 0
1869        ];
1870        let mut decoder = DeltaBitPackDecoder::<Int32Type>::new();
1871        let err = decoder.set_data(data.into(), 0).unwrap_err();
1872        assert_eq!(
1873            err.to_string(),
1874            "Parquet error: cannot have zero miniblocks per block"
1875        );
1876    }
1877
1878    #[test]
1879    fn test_delta_bit_packed_decoder_sample() {
1880        let data_bytes = vec![
1881            128, 1, 4, 3, 58, 28, 6, 0, 0, 0, 0, 8, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
1882            0, 0, 0, 0, 0, 0,
1883        ];
1884        let mut decoder: DeltaBitPackDecoder<Int32Type> = DeltaBitPackDecoder::new();
1885        decoder.set_data(data_bytes.into(), 3).unwrap();
1886        // check exact offsets, because when reading partial values we end up with
1887        // some data not being read from bit reader
1888        assert_eq!(decoder.get_offset(), 5);
1889        let mut result = vec![0, 0, 0];
1890        decoder.get(&mut result).unwrap();
1891        assert_eq!(decoder.get_offset(), 34);
1892        assert_eq!(result, vec![29, 43, 89]);
1893    }
1894
1895    #[test]
1896    fn test_delta_bit_packed_padding() {
1897        // Page header
1898        let header = vec![
1899            // Page Header
1900
1901            // Block Size - 256
1902            128,
1903            2,
1904            // Miniblocks in block,
1905            4,
1906            // Total value count - 419
1907            128 + 35,
1908            3,
1909            // First value - 7
1910            7,
1911        ];
1912
1913        // Block Header
1914        let block1_header = vec![
1915            0, // Min delta
1916            0, 1, 0, 0, // Bit widths
1917        ];
1918
1919        // Mini-block 1 - bit width 0 => 0 bytes
1920        // Mini-block 2 - bit width 1 => 8 bytes
1921        // Mini-block 3 - bit width 0 => 0 bytes
1922        // Mini-block 4 - bit width 0 => 0 bytes
1923        let block1 = vec![0xFF; 8];
1924
1925        // Block Header
1926        let block2_header = vec![
1927            0, // Min delta
1928            0, 1, 2, 0xFF, // Bit widths, including non-zero padding
1929        ];
1930
1931        // Mini-block 1 - bit width 0 => 0 bytes
1932        // Mini-block 2 - bit width 1 => 8 bytes
1933        // Mini-block 3 - bit width 2 => 16 bytes
1934        // Mini-block 4 - padding => no bytes
1935        let block2 = vec![0xFF; 24];
1936
1937        let data: Vec<u8> = header
1938            .into_iter()
1939            .chain(block1_header)
1940            .chain(block1)
1941            .chain(block2_header)
1942            .chain(block2)
1943            .collect();
1944
1945        let length = data.len();
1946
1947        let ptr = Bytes::from(data);
1948        let mut reader = BitReader::new(ptr.clone());
1949        assert_eq!(reader.get_vlq_int().unwrap(), 256);
1950        assert_eq!(reader.get_vlq_int().unwrap(), 4);
1951        assert_eq!(reader.get_vlq_int().unwrap(), 419);
1952        assert_eq!(reader.get_vlq_int().unwrap(), 7);
1953
1954        // Test output buffer larger than needed and not exact multiple of block size
1955        let mut output = vec![0_i32; 420];
1956
1957        let mut decoder = DeltaBitPackDecoder::<Int32Type>::new();
1958        decoder.set_data(ptr.clone(), 0).unwrap();
1959        assert_eq!(decoder.get(&mut output).unwrap(), 419);
1960        assert_eq!(decoder.get_offset(), length);
1961
1962        // Test with truncated buffer
1963        decoder.set_data(ptr.slice(..12), 0).unwrap();
1964        let err = decoder.get(&mut output).unwrap_err().to_string();
1965        assert!(
1966            err.contains("Expected to read 64 values from miniblock got 8"),
1967            "{}",
1968            err
1969        );
1970    }
1971
1972    #[test]
1973    fn test_delta_bit_packed_int32_single_value_large() {
1974        let block_data = vec![3; 10240];
1975        test_delta_bit_packed_decode::<Int32Type>(vec![block_data]);
1976    }
1977
1978    #[test]
1979    fn test_delta_bit_packed_int32_single_value_skip_large() {
1980        let block_data = vec![3; 10240];
1981        test_skip::<Int32Type>(block_data.clone(), Encoding::DELTA_BINARY_PACKED, 50);
1982        test_skip::<Int32Type>(block_data, Encoding::DELTA_BINARY_PACKED, 5000);
1983    }
1984
1985    #[test]
1986    fn test_delta_bit_packed_int32_increasing_value_large() {
1987        let block_data = (0i32..10240).collect();
1988        test_delta_bit_packed_decode::<Int32Type>(vec![block_data]);
1989    }
1990
1991    #[test]
1992    fn test_delta_bit_packed_int32_increasing_value_skip_large() {
1993        let block_data = (0i32..10240).collect::<Vec<i32>>();
1994        test_skip::<Int32Type>(block_data.clone(), Encoding::DELTA_BINARY_PACKED, 50);
1995        test_skip::<Int32Type>(block_data, Encoding::DELTA_BINARY_PACKED, 5000);
1996    }
1997
1998    #[test]
1999    fn test_delta_bit_packed_int32_stepped_value_large() {
2000        let block_data = (0i32..10240).map(|i| i / 2).collect();
2001        test_delta_bit_packed_decode::<Int32Type>(vec![block_data]);
2002    }
2003
2004    #[test]
2005    fn test_delta_bit_packed_int32_stepped_value_skip_large() {
2006        let block_data = (0i32..10240).map(|i| i / 2).collect::<Vec<i32>>();
2007        test_skip::<Int32Type>(block_data.clone(), Encoding::DELTA_BINARY_PACKED, 50);
2008        test_skip::<Int32Type>(block_data, Encoding::DELTA_BINARY_PACKED, 5000);
2009    }
2010
2011    #[test]
2012    fn test_delta_bit_packed_int32_mixed_large() {
2013        // should be enough for 4 mini-blocks plus a little so we get some
2014        // mixed mini-blocks
2015        const BLOCK_SIZE: i32 = 133;
2016        let block1_data = (0..BLOCK_SIZE).map(|i| (i * 7) % 11).collect();
2017        let block2_data = vec![3; BLOCK_SIZE as usize];
2018        let block3_data = (0..BLOCK_SIZE).map(|i| (i * 5) % 13).collect();
2019        let block4_data = (0..BLOCK_SIZE).collect();
2020        let block5_data = (0..BLOCK_SIZE).map(|i| (i * 3) % 17).collect();
2021        test_delta_bit_packed_decode::<Int32Type>(vec![
2022            block1_data,
2023            block2_data,
2024            block3_data,
2025            block4_data,
2026            block5_data,
2027        ]);
2028    }
2029
2030    #[test]
2031    fn test_delta_bit_packed_int64_single_value_large() {
2032        let block_data = vec![5; 10240];
2033        test_delta_bit_packed_decode::<Int64Type>(vec![block_data]);
2034    }
2035
2036    #[test]
2037    fn test_delta_bit_packed_int64_increasing_value_large() {
2038        let block_data = (0i64..10240).collect();
2039        test_delta_bit_packed_decode::<Int64Type>(vec![block_data]);
2040    }
2041
2042    #[test]
2043    fn test_delta_byte_array_same_arrays() {
2044        let data = vec![
2045            vec![ByteArray::from(vec![1, 2, 3, 4, 5, 6])],
2046            vec![
2047                ByteArray::from(vec![1, 2, 3, 4, 5, 6]),
2048                ByteArray::from(vec![1, 2, 3, 4, 5, 6]),
2049            ],
2050            vec![
2051                ByteArray::from(vec![1, 2, 3, 4, 5, 6]),
2052                ByteArray::from(vec![1, 2, 3, 4, 5, 6]),
2053            ],
2054        ];
2055        test_delta_byte_array_decode(data);
2056    }
2057
2058    #[test]
2059    fn test_delta_byte_array_unique_arrays() {
2060        let data = vec![
2061            vec![ByteArray::from(vec![1])],
2062            vec![ByteArray::from(vec![2, 3]), ByteArray::from(vec![4, 5, 6])],
2063            vec![
2064                ByteArray::from(vec![7, 8]),
2065                ByteArray::from(vec![9, 0, 1, 2]),
2066            ],
2067        ];
2068        test_delta_byte_array_decode(data);
2069    }
2070
2071    #[test]
2072    fn test_delta_byte_array_single_array() {
2073        let data = vec![vec![ByteArray::from(vec![1, 2, 3, 4, 5, 6])]];
2074        test_delta_byte_array_decode(data);
2075    }
2076
2077    #[test]
2078    fn test_byte_stream_split_multiple_f32() {
2079        let data = vec![
2080            vec![
2081                f32::from_le_bytes([0xAA, 0xBB, 0xCC, 0xDD]),
2082                f32::from_le_bytes([0x00, 0x11, 0x22, 0x33]),
2083            ],
2084            vec![f32::from_le_bytes([0xA3, 0xB4, 0xC5, 0xD6])],
2085        ];
2086        test_byte_stream_split_decode::<FloatType>(data, -1);
2087    }
2088
2089    #[test]
2090    fn test_byte_stream_split_f64() {
2091        let data = vec![vec![
2092            f64::from_le_bytes([0, 1, 2, 3, 4, 5, 6, 7]),
2093            f64::from_le_bytes([8, 9, 10, 11, 12, 13, 14, 15]),
2094        ]];
2095        test_byte_stream_split_decode::<DoubleType>(data, -1);
2096    }
2097
2098    #[test]
2099    fn test_byte_stream_split_multiple_i32() {
2100        let data = vec![
2101            vec![
2102                i32::from_le_bytes([0xAA, 0xBB, 0xCC, 0xDD]),
2103                i32::from_le_bytes([0x00, 0x11, 0x22, 0x33]),
2104            ],
2105            vec![i32::from_le_bytes([0xA3, 0xB4, 0xC5, 0xD6])],
2106        ];
2107        test_byte_stream_split_decode::<Int32Type>(data, -1);
2108    }
2109
2110    #[test]
2111    fn test_byte_stream_split_i64() {
2112        let data = vec![vec![
2113            i64::from_le_bytes([0, 1, 2, 3, 4, 5, 6, 7]),
2114            i64::from_le_bytes([8, 9, 10, 11, 12, 13, 14, 15]),
2115        ]];
2116        test_byte_stream_split_decode::<Int64Type>(data, -1);
2117    }
2118
2119    fn test_byte_stream_split_flba(type_width: usize) {
2120        let data = vec![
2121            vec![
2122                FixedLenByteArrayType::r#gen(type_width as i32),
2123                FixedLenByteArrayType::r#gen(type_width as i32),
2124            ],
2125            vec![FixedLenByteArrayType::r#gen(type_width as i32)],
2126        ];
2127        test_byte_stream_split_decode::<FixedLenByteArrayType>(data, type_width as i32);
2128    }
2129
2130    #[test]
2131    fn test_byte_stream_split_flba5() {
2132        test_byte_stream_split_flba(5);
2133    }
2134
2135    #[test]
2136    fn test_byte_stream_split_flba16() {
2137        test_byte_stream_split_flba(16);
2138    }
2139
2140    #[test]
2141    fn test_byte_stream_split_flba19() {
2142        test_byte_stream_split_flba(19);
2143    }
2144
2145    #[test]
2146    #[should_panic(expected = "Mismatched FixedLenByteArray sizes: 4 != 5")]
2147    fn test_byte_stream_split_flba_mismatch() {
2148        let data = vec![
2149            vec![
2150                FixedLenByteArray::from(vec![0xAA, 0xAB, 0xAC, 0xAD, 0xAE]),
2151                FixedLenByteArray::from(vec![0xBA, 0xBB, 0xBC, 0xBD, 0xBE]),
2152            ],
2153            vec![FixedLenByteArray::from(vec![0xCA, 0xCB, 0xCC, 0xCD])],
2154        ];
2155        test_byte_stream_split_decode::<FixedLenByteArrayType>(data, 5);
2156    }
2157
2158    #[test]
2159    #[should_panic(expected = "Input data length is not a multiple of type width 4")]
2160    fn test_byte_stream_split_flba_bad_input() {
2161        let mut decoder = VariableWidthByteStreamSplitDecoder::<FixedLenByteArrayType>::new(4);
2162        decoder
2163            .set_data(Bytes::from(vec![1, 2, 3, 4, 5]), 1)
2164            .unwrap();
2165    }
2166
2167    #[test]
2168    fn test_skip_byte_stream_split() {
2169        let block_data = vec![0.3, 0.4, 0.1, 4.10];
2170        test_skip::<FloatType>(block_data.clone(), Encoding::BYTE_STREAM_SPLIT, 2);
2171        test_skip::<DoubleType>(
2172            block_data.into_iter().map(|x| x as f64).collect(),
2173            Encoding::BYTE_STREAM_SPLIT,
2174            100,
2175        );
2176    }
2177
2178    #[test]
2179    fn test_skip_byte_stream_split_ints() {
2180        let block_data = vec![3, 4, 1, 5];
2181        test_skip::<Int32Type>(block_data.clone(), Encoding::BYTE_STREAM_SPLIT, 2);
2182        test_skip::<Int64Type>(
2183            block_data.into_iter().map(|x| x as i64).collect(),
2184            Encoding::BYTE_STREAM_SPLIT,
2185            100,
2186        );
2187    }
2188
2189    fn test_rle_value_decode<T: DataType>(data: Vec<Vec<T::T>>) {
2190        test_encode_decode::<T>(data, Encoding::RLE, -1);
2191    }
2192
2193    fn test_delta_bit_packed_decode<T: DataType>(data: Vec<Vec<T::T>>) {
2194        test_encode_decode::<T>(data, Encoding::DELTA_BINARY_PACKED, -1);
2195    }
2196
2197    fn test_byte_stream_split_decode<T: DataType>(data: Vec<Vec<T::T>>, type_width: i32) {
2198        test_encode_decode::<T>(data, Encoding::BYTE_STREAM_SPLIT, type_width);
2199    }
2200
2201    fn test_delta_byte_array_decode(data: Vec<Vec<ByteArray>>) {
2202        test_encode_decode::<ByteArrayType>(data, Encoding::DELTA_BYTE_ARRAY, -1);
2203    }
2204
2205    // Input data represents vector of data slices to write (test multiple `put()` calls)
2206    // For example,
2207    //   vec![vec![1, 2, 3]] invokes `put()` once and writes {1, 2, 3}
2208    //   vec![vec![1, 2], vec![3]] invokes `put()` twice and writes {1, 2, 3}
2209    fn test_encode_decode<T: DataType>(data: Vec<Vec<T::T>>, encoding: Encoding, type_width: i32) {
2210        let col_descr = create_test_col_desc_ptr(type_width, T::get_physical_type());
2211
2212        // Encode data
2213        let mut encoder = get_encoder::<T>(encoding, &col_descr).expect("get encoder");
2214
2215        for v in &data[..] {
2216            encoder.put(&v[..]).expect("ok to encode");
2217        }
2218        let bytes = encoder.flush_buffer().expect("ok to flush buffer");
2219
2220        // Flatten expected data as contiguous array of values
2221        let expected: Vec<T::T> = data.iter().flat_map(|s| s.clone()).collect();
2222
2223        // Decode data and compare with original
2224        let mut decoder = get_decoder::<T>(col_descr, encoding).expect("get decoder");
2225
2226        let mut result = vec![T::T::default(); expected.len()];
2227        decoder
2228            .set_data(bytes, expected.len())
2229            .expect("ok to set data");
2230        let mut result_num_values = 0;
2231        while decoder.values_left() > 0 {
2232            result_num_values += decoder
2233                .get(&mut result[result_num_values..])
2234                .expect("ok to decode");
2235        }
2236        assert_eq!(result_num_values, expected.len());
2237        assert_eq!(result, expected);
2238    }
2239
2240    fn test_skip<T: DataType>(data: Vec<T::T>, encoding: Encoding, skip: usize) {
2241        // Type length should not really matter for encode/decode test,
2242        // otherwise change it based on type
2243        let col_descr = create_test_col_desc_ptr(-1, T::get_physical_type());
2244
2245        // Encode data
2246        let mut encoder = get_encoder::<T>(encoding, &col_descr).expect("get encoder");
2247
2248        encoder.put(&data).expect("ok to encode");
2249
2250        let bytes = encoder.flush_buffer().expect("ok to flush buffer");
2251
2252        let mut decoder = get_decoder::<T>(col_descr, encoding).expect("get decoder");
2253        decoder.set_data(bytes, data.len()).expect("ok to set data");
2254
2255        if skip >= data.len() {
2256            let skipped = decoder.skip(skip).expect("ok to skip");
2257            assert_eq!(skipped, data.len());
2258
2259            let skipped_again = decoder.skip(skip).expect("ok to skip again");
2260            assert_eq!(skipped_again, 0);
2261        } else {
2262            let skipped = decoder.skip(skip).expect("ok to skip");
2263            assert_eq!(skipped, skip);
2264
2265            let remaining = data.len() - skip;
2266
2267            let expected = &data[skip..];
2268            let mut buffer = vec![T::T::default(); remaining];
2269            let fetched = decoder.get(&mut buffer).expect("ok to decode");
2270            assert_eq!(remaining, fetched);
2271            assert_eq!(&buffer, expected);
2272        }
2273    }
2274
2275    fn create_and_check_decoder<T: DataType>(encoding: Encoding, err: Option<ParquetError>) {
2276        let descr = create_test_col_desc_ptr(-1, T::get_physical_type());
2277        let decoder = get_decoder::<T>(descr, encoding);
2278        match err {
2279            Some(parquet_error) => {
2280                assert_eq!(
2281                    decoder.err().unwrap().to_string(),
2282                    parquet_error.to_string()
2283                );
2284            }
2285            None => {
2286                assert_eq!(decoder.unwrap().encoding(), encoding);
2287            }
2288        }
2289    }
2290
2291    // Creates test column descriptor.
2292    fn create_test_col_desc_ptr(type_len: i32, t: Type) -> ColumnDescPtr {
2293        let ty = SchemaType::primitive_type_builder("t", t)
2294            .with_length(type_len)
2295            .build()
2296            .unwrap();
2297        Arc::new(ColumnDescriptor::new(
2298            Arc::new(ty),
2299            0,
2300            0,
2301            ColumnPath::new(vec![]),
2302        ))
2303    }
2304
2305    fn usize_to_bytes(v: usize) -> [u8; 4] {
2306        (v as u32).to_ne_bytes()
2307    }
2308
2309    /// A util trait to convert slices of different types to byte arrays
2310    trait ToByteArray<T: DataType> {
2311        #[allow(clippy::wrong_self_convention)]
2312        fn to_byte_array(data: &[T::T]) -> Vec<u8>;
2313    }
2314
2315    macro_rules! to_byte_array_impl {
2316        ($ty: ty) => {
2317            impl ToByteArray<$ty> for $ty {
2318                #[allow(clippy::wrong_self_convention)]
2319                fn to_byte_array(data: &[<$ty as DataType>::T]) -> Vec<u8> {
2320                    <$ty as DataType>::T::slice_as_bytes(data).to_vec()
2321                }
2322            }
2323        };
2324    }
2325
2326    to_byte_array_impl!(Int32Type);
2327    to_byte_array_impl!(Int64Type);
2328    to_byte_array_impl!(FloatType);
2329    to_byte_array_impl!(DoubleType);
2330
2331    impl ToByteArray<BoolType> for BoolType {
2332        #[allow(clippy::wrong_self_convention)]
2333        fn to_byte_array(data: &[bool]) -> Vec<u8> {
2334            let mut v = vec![];
2335            for (i, item) in data.iter().enumerate() {
2336                if i % 8 == 0 {
2337                    v.push(0);
2338                }
2339                if *item {
2340                    v[i / 8] |= 1 << (i % 8);
2341                }
2342            }
2343            v
2344        }
2345    }
2346
2347    impl ToByteArray<Int96Type> for Int96Type {
2348        #[allow(clippy::wrong_self_convention)]
2349        fn to_byte_array(data: &[Int96]) -> Vec<u8> {
2350            let mut v = vec![];
2351            for d in data {
2352                v.extend_from_slice(d.as_bytes());
2353            }
2354            v
2355        }
2356    }
2357
2358    impl ToByteArray<ByteArrayType> for ByteArrayType {
2359        #[allow(clippy::wrong_self_convention)]
2360        fn to_byte_array(data: &[ByteArray]) -> Vec<u8> {
2361            let mut v = vec![];
2362            for d in data {
2363                let buf = d.data();
2364                let len = &usize_to_bytes(buf.len());
2365                v.extend_from_slice(len);
2366                v.extend(buf);
2367            }
2368            v
2369        }
2370    }
2371
2372    impl ToByteArray<FixedLenByteArrayType> for FixedLenByteArrayType {
2373        #[allow(clippy::wrong_self_convention)]
2374        fn to_byte_array(data: &[FixedLenByteArray]) -> Vec<u8> {
2375            let mut v = vec![];
2376            for d in data {
2377                let buf = d.data();
2378                v.extend(buf);
2379            }
2380            v
2381        }
2382    }
2383
2384    #[test]
2385    // Allow initializing a vector and pushing to it for clarity in this test
2386    #[allow(clippy::vec_init_then_push)]
2387    fn test_delta_bit_packed_invalid_bit_width() {
2388        // Manually craft a buffer with an invalid bit width
2389        let mut buffer = vec![];
2390        // block_size = 128
2391        buffer.push(128);
2392        buffer.push(1);
2393        // mini_blocks_per_block = 4
2394        buffer.push(4);
2395        // num_values = 32
2396        buffer.push(32);
2397        // first_value = 0
2398        buffer.push(0);
2399        // min_delta = 0
2400        buffer.push(0);
2401        // bit_widths, one for each of the 4 mini blocks
2402        buffer.push(33); // Invalid bit width
2403        buffer.push(0);
2404        buffer.push(0);
2405        buffer.push(0);
2406
2407        let corrupted_buffer = Bytes::from(buffer);
2408
2409        let mut decoder = DeltaBitPackDecoder::<Int32Type>::new();
2410        decoder.set_data(corrupted_buffer.clone(), 32).unwrap();
2411        let mut read_buffer = vec![0; 32];
2412        let err = decoder.get(&mut read_buffer).unwrap_err();
2413        assert!(
2414            err.to_string()
2415                .contains("Invalid delta bit width 33 which is larger than expected 32"),
2416            "{}",
2417            err
2418        );
2419
2420        let mut decoder = DeltaBitPackDecoder::<Int32Type>::new();
2421        decoder.set_data(corrupted_buffer, 32).unwrap();
2422        let err = decoder.skip(32).unwrap_err();
2423        assert!(
2424            err.to_string()
2425                .contains("Invalid delta bit width 33 which is larger than expected 32"),
2426            "{}",
2427            err
2428        );
2429    }
2430}