Skip to main content

parquet/encodings/
decoding.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Contains all supported decoders for Parquet.
19
20use bytes::Bytes;
21use num_traits::{FromPrimitive, WrappingAdd};
22use std::{cmp, marker::PhantomData, mem};
23
24use super::rle::RleDecoder;
25
26use crate::basic::*;
27use crate::data_type::private::ParquetValueType;
28use crate::data_type::*;
29use crate::encodings::decoding::byte_stream_split_decoder::{
30    ByteStreamSplitDecoder, VariableWidthByteStreamSplitDecoder,
31};
32use crate::errors::{ParquetError, Result};
33use crate::schema::types::ColumnDescPtr;
34use crate::util::bit_util::{self, BitReader, FromBitpacked};
35
36mod byte_stream_split_decoder;
37
38pub(crate) mod private {
39    use super::*;
40
41    /// A trait that allows getting a [`Decoder`] implementation for a [`DataType`] with
42    /// the corresponding [`ParquetValueType`]. This is necessary to support
43    /// [`Decoder`] implementations that may not be applicable for all [`DataType`]
44    /// and by extension all [`ParquetValueType`]
45    pub trait GetDecoder {
46        fn get_decoder<T: DataType<T = Self>>(
47            descr: ColumnDescPtr,
48            encoding: Encoding,
49        ) -> Result<Box<dyn Decoder<T>>> {
50            get_decoder_default(descr, encoding)
51        }
52    }
53
54    fn get_decoder_default<T: DataType>(
55        descr: ColumnDescPtr,
56        encoding: Encoding,
57    ) -> Result<Box<dyn Decoder<T>>> {
58        match encoding {
59            Encoding::PLAIN => Ok(Box::new(PlainDecoder::new(descr.type_length()))),
60            Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => Err(general_err!(
61                "Cannot initialize this encoding through this function"
62            )),
63            Encoding::RLE
64            | Encoding::DELTA_BINARY_PACKED
65            | Encoding::DELTA_BYTE_ARRAY
66            | Encoding::DELTA_LENGTH_BYTE_ARRAY => Err(general_err!(
67                "Encoding {} is not supported for type",
68                encoding
69            )),
70            e => Err(nyi_err!("Encoding {} is not supported", e)),
71        }
72    }
73
74    impl GetDecoder for bool {
75        fn get_decoder<T: DataType<T = Self>>(
76            descr: ColumnDescPtr,
77            encoding: Encoding,
78        ) -> Result<Box<dyn Decoder<T>>> {
79            match encoding {
80                Encoding::RLE => Ok(Box::new(RleValueDecoder::new())),
81                _ => get_decoder_default(descr, encoding),
82            }
83        }
84    }
85
86    impl GetDecoder for i32 {
87        fn get_decoder<T: DataType<T = Self>>(
88            descr: ColumnDescPtr,
89            encoding: Encoding,
90        ) -> Result<Box<dyn Decoder<T>>> {
91            match encoding {
92                Encoding::BYTE_STREAM_SPLIT => Ok(Box::new(ByteStreamSplitDecoder::new())),
93                Encoding::DELTA_BINARY_PACKED => Ok(Box::new(DeltaBitPackDecoder::new())),
94                _ => get_decoder_default(descr, encoding),
95            }
96        }
97    }
98
99    impl GetDecoder for i64 {
100        fn get_decoder<T: DataType<T = Self>>(
101            descr: ColumnDescPtr,
102            encoding: Encoding,
103        ) -> Result<Box<dyn Decoder<T>>> {
104            match encoding {
105                Encoding::BYTE_STREAM_SPLIT => Ok(Box::new(ByteStreamSplitDecoder::new())),
106                Encoding::DELTA_BINARY_PACKED => Ok(Box::new(DeltaBitPackDecoder::new())),
107                _ => get_decoder_default(descr, encoding),
108            }
109        }
110    }
111
112    impl GetDecoder for f32 {
113        fn get_decoder<T: DataType<T = Self>>(
114            descr: ColumnDescPtr,
115            encoding: Encoding,
116        ) -> Result<Box<dyn Decoder<T>>> {
117            match encoding {
118                Encoding::BYTE_STREAM_SPLIT => Ok(Box::new(ByteStreamSplitDecoder::new())),
119                _ => get_decoder_default(descr, encoding),
120            }
121        }
122    }
123    impl GetDecoder for f64 {
124        fn get_decoder<T: DataType<T = Self>>(
125            descr: ColumnDescPtr,
126            encoding: Encoding,
127        ) -> Result<Box<dyn Decoder<T>>> {
128            match encoding {
129                Encoding::BYTE_STREAM_SPLIT => Ok(Box::new(ByteStreamSplitDecoder::new())),
130                _ => get_decoder_default(descr, encoding),
131            }
132        }
133    }
134
135    impl GetDecoder for ByteArray {
136        fn get_decoder<T: DataType<T = Self>>(
137            descr: ColumnDescPtr,
138            encoding: Encoding,
139        ) -> Result<Box<dyn Decoder<T>>> {
140            match encoding {
141                Encoding::DELTA_BYTE_ARRAY => Ok(Box::new(DeltaByteArrayDecoder::new())),
142                Encoding::DELTA_LENGTH_BYTE_ARRAY => {
143                    Ok(Box::new(DeltaLengthByteArrayDecoder::new()))
144                }
145                _ => get_decoder_default(descr, encoding),
146            }
147        }
148    }
149
150    impl GetDecoder for FixedLenByteArray {
151        fn get_decoder<T: DataType<T = Self>>(
152            descr: ColumnDescPtr,
153            encoding: Encoding,
154        ) -> Result<Box<dyn Decoder<T>>> {
155            match encoding {
156                Encoding::BYTE_STREAM_SPLIT => Ok(Box::new(
157                    VariableWidthByteStreamSplitDecoder::new(descr.type_length()),
158                )),
159                Encoding::DELTA_BYTE_ARRAY => Ok(Box::new(DeltaByteArrayDecoder::new())),
160                _ => get_decoder_default(descr, encoding),
161            }
162        }
163    }
164
165    impl GetDecoder for Int96 {}
166}
167
168// ----------------------------------------------------------------------
169// Decoders
170
171/// A Parquet decoder for the data type `T`.
172pub trait Decoder<T: DataType>: Send {
173    /// Sets the data to decode to be `data`, which should contain `num_values` of values
174    /// to decode.
175    fn set_data(&mut self, data: Bytes, num_values: usize) -> Result<()>;
176
177    /// Consumes values from this decoder and write the results to `buffer`. This will try
178    /// to fill up `buffer`.
179    ///
180    /// Returns the actual number of values decoded, which should be equal to
181    /// `buffer.len()` unless the remaining number of values is less than
182    /// `buffer.len()`.
183    fn get(&mut self, buffer: &mut [T::T]) -> Result<usize>;
184
185    /// Consume values from this decoder and write the results to `buffer`, leaving
186    /// "spaces" for null values.
187    ///
188    /// `null_count` is the number of nulls we expect to see in `buffer`, after reading.
189    /// `valid_bits` stores the valid bit for each value in the buffer. It should contain
190    ///   at least number of bits that equal to `buffer.len()`.
191    ///
192    /// Returns the actual number of values decoded.
193    ///
194    /// # Panics
195    ///
196    /// Panics if `null_count` is greater than `buffer.len()`.
197    fn get_spaced(
198        &mut self,
199        buffer: &mut [T::T],
200        null_count: usize,
201        valid_bits: &[u8],
202    ) -> Result<usize> {
203        assert!(buffer.len() >= null_count);
204
205        // TODO: check validity of the input arguments?
206        if null_count == 0 {
207            return self.get(buffer);
208        }
209
210        let num_values = buffer.len();
211        let values_to_read = num_values - null_count;
212        let values_read = self.get(buffer)?;
213        if values_read != values_to_read {
214            return Err(general_err!(
215                "Number of values read: {}, doesn't match expected: {}",
216                values_read,
217                values_to_read
218            ));
219        }
220        let mut values_to_move = values_read;
221        for i in (0..num_values).rev() {
222            if bit_util::get_bit(valid_bits, i) {
223                values_to_move -= 1;
224                buffer.swap(i, values_to_move);
225            }
226        }
227
228        Ok(num_values)
229    }
230
231    /// Returns the number of values left in this decoder stream.
232    fn values_left(&self) -> usize;
233
234    /// Returns the encoding for this decoder.
235    fn encoding(&self) -> Encoding;
236
237    /// Skip the specified number of values in this decoder stream.
238    fn skip(&mut self, num_values: usize) -> Result<usize>;
239}
240
241/// Gets a decoder for the column descriptor `descr` and encoding type `encoding`.
242///
243/// NOTE: the primitive type in `descr` MUST match the data type `T`, otherwise
244/// disastrous consequence could occur.
245pub fn get_decoder<T: DataType>(
246    descr: ColumnDescPtr,
247    encoding: Encoding,
248) -> Result<Box<dyn Decoder<T>>> {
249    use self::private::GetDecoder;
250    T::T::get_decoder(descr, encoding)
251}
252
253// ----------------------------------------------------------------------
254// PLAIN Decoding
255
256#[derive(Default)]
257pub struct PlainDecoderDetails {
258    // The remaining number of values in the byte array
259    pub(crate) num_values: usize,
260
261    // The current starting index in the byte array. Not used when `T` is bool.
262    pub(crate) start: usize,
263
264    // The length for the type `T`. Only used when `T` is `FixedLenByteArrayType`
265    pub(crate) type_length: i32,
266
267    // The byte array to decode from. Not set if `T` is bool.
268    pub(crate) data: Option<Bytes>,
269
270    // Read `data` bit by bit. Only set if `T` is bool.
271    pub(crate) bit_reader: Option<BitReader>,
272}
273
274/// Plain decoding that supports all types.
275///
276/// Values are encoded back to back. For native types, data is encoded as little endian.
277/// Floating point types are encoded in IEEE.
278/// See [`PlainEncoder`](crate::encoding::PlainEncoder) for more information.
279pub struct PlainDecoder<T: DataType> {
280    // The binary details needed for decoding
281    inner: PlainDecoderDetails,
282
283    // To allow `T` in the generic parameter for this struct. This doesn't take any
284    // space.
285    _phantom: PhantomData<T>,
286}
287
288impl<T: DataType> PlainDecoder<T> {
289    /// Creates new plain decoder.
290    pub fn new(type_length: i32) -> Self {
291        PlainDecoder {
292            inner: PlainDecoderDetails {
293                type_length,
294                num_values: 0,
295                start: 0,
296                data: None,
297                bit_reader: None,
298            },
299            _phantom: PhantomData,
300        }
301    }
302}
303
304impl<T: DataType> Decoder<T> for PlainDecoder<T> {
305    #[inline]
306    fn set_data(&mut self, data: Bytes, num_values: usize) -> Result<()> {
307        T::T::set_data(&mut self.inner, data, num_values);
308        Ok(())
309    }
310
311    #[inline]
312    fn values_left(&self) -> usize {
313        self.inner.num_values
314    }
315
316    #[inline]
317    fn encoding(&self) -> Encoding {
318        Encoding::PLAIN
319    }
320
321    #[inline]
322    fn get(&mut self, buffer: &mut [T::T]) -> Result<usize> {
323        T::T::decode(buffer, &mut self.inner)
324    }
325
326    #[inline]
327    fn skip(&mut self, num_values: usize) -> Result<usize> {
328        T::T::skip(&mut self.inner, num_values)
329    }
330}
331
332// ----------------------------------------------------------------------
333// RLE_DICTIONARY/PLAIN_DICTIONARY Decoding
334
335/// Dictionary decoder.
336///
337/// The dictionary encoding builds a dictionary of values encountered in a given column.
338/// The dictionary is be stored in a dictionary page per column chunk.
339/// See [`DictEncoder`](crate::encoding::DictEncoder) for more information.
340pub struct DictDecoder<T: DataType> {
341    // The dictionary, which maps ids to the values
342    dictionary: Vec<T::T>,
343
344    // Whether `dictionary` has been initialized
345    has_dictionary: bool,
346
347    // The decoder for the value ids
348    rle_decoder: Option<RleDecoder>,
349
350    // Number of values left in the data stream
351    num_values: usize,
352}
353
354impl<T: DataType> Default for DictDecoder<T> {
355    fn default() -> Self {
356        Self::new()
357    }
358}
359
360impl<T: DataType> DictDecoder<T> {
361    /// Creates new dictionary decoder.
362    pub fn new() -> Self {
363        Self {
364            dictionary: vec![],
365            has_dictionary: false,
366            rle_decoder: None,
367            num_values: 0,
368        }
369    }
370
371    /// Decodes and sets values for dictionary using `decoder` decoder.
372    pub fn set_dict(&mut self, mut decoder: Box<dyn Decoder<T>>) -> Result<()> {
373        let num_values = decoder.values_left();
374        self.dictionary.resize(num_values, T::T::default());
375        let _ = decoder.get(&mut self.dictionary)?;
376        self.has_dictionary = true;
377        Ok(())
378    }
379}
380
381impl<T: DataType> Decoder<T> for DictDecoder<T> {
382    fn set_data(&mut self, data: Bytes, num_values: usize) -> Result<()> {
383        // First byte in `data` is bit width
384        if data.is_empty() {
385            return Err(eof_err!("Not enough bytes to decode bit_width"));
386        }
387
388        let bit_width = data.as_ref()[0];
389        if bit_width > 32 {
390            return Err(general_err!(
391                "Invalid or corrupted RLE bit width {}. Max allowed is 32",
392                bit_width
393            ));
394        }
395        let mut rle_decoder = RleDecoder::new(bit_width);
396        rle_decoder.set_data(data.slice(1..))?;
397        self.num_values = num_values;
398        self.rle_decoder = Some(rle_decoder);
399        Ok(())
400    }
401
402    fn get(&mut self, buffer: &mut [T::T]) -> Result<usize> {
403        assert!(self.rle_decoder.is_some());
404        assert!(self.has_dictionary, "Must call set_dict() first!");
405
406        let rle = self.rle_decoder.as_mut().unwrap();
407        let num_values = cmp::min(buffer.len(), self.num_values);
408        rle.get_batch_with_dict(&self.dictionary[..], buffer, num_values)
409    }
410
411    /// Number of values left in this decoder stream
412    fn values_left(&self) -> usize {
413        self.num_values
414    }
415
416    fn encoding(&self) -> Encoding {
417        Encoding::RLE_DICTIONARY
418    }
419
420    fn skip(&mut self, num_values: usize) -> Result<usize> {
421        assert!(self.rle_decoder.is_some());
422        assert!(self.has_dictionary, "Must call set_dict() first!");
423
424        let rle = self.rle_decoder.as_mut().unwrap();
425        let num_values = cmp::min(num_values, self.num_values);
426        rle.skip(num_values)
427    }
428}
429
430// ----------------------------------------------------------------------
431// RLE Decoding
432
433/// RLE/Bit-Packing hybrid decoding for values.
434/// Currently is used only for data pages v2 and supports boolean types.
435/// See [`RleValueEncoder`](crate::encoding::RleValueEncoder) for more information.
436pub struct RleValueDecoder<T: DataType> {
437    values_left: usize,
438    decoder: RleDecoder,
439    _phantom: PhantomData<T>,
440}
441
442impl<T: DataType> Default for RleValueDecoder<T> {
443    fn default() -> Self {
444        Self::new()
445    }
446}
447
448impl<T: DataType> RleValueDecoder<T> {
449    pub fn new() -> Self {
450        Self {
451            values_left: 0,
452            decoder: RleDecoder::new(1),
453            _phantom: PhantomData,
454        }
455    }
456}
457
458impl<T: DataType> Decoder<T> for RleValueDecoder<T>
459where
460    T::T: FromBitpacked,
461{
462    #[inline]
463    fn set_data(&mut self, data: Bytes, num_values: usize) -> Result<()> {
464        // Only support RLE value reader for boolean values with bit width of 1.
465        ensure_phys_ty!(Type::BOOLEAN, "RleValueDecoder only supports BoolType");
466
467        // We still need to remove prefix of i32 from the stream.
468        const I32_SIZE: usize = mem::size_of::<i32>();
469        if data.len() < I32_SIZE {
470            return Err(eof_err!("Not enough bytes to decode"));
471        }
472        let data_size = bit_util::read_num_bytes::<i32>(I32_SIZE, data.as_ref()) as usize;
473        if data.len() - I32_SIZE < data_size {
474            return Err(eof_err!("Not enough bytes to decode"));
475        }
476
477        self.decoder = RleDecoder::new(1);
478        self.decoder
479            .set_data(data.slice(I32_SIZE..I32_SIZE + data_size))?;
480        self.values_left = num_values;
481        Ok(())
482    }
483
484    #[inline]
485    fn values_left(&self) -> usize {
486        self.values_left
487    }
488
489    #[inline]
490    fn encoding(&self) -> Encoding {
491        Encoding::RLE
492    }
493
494    #[inline]
495    fn get(&mut self, buffer: &mut [T::T]) -> Result<usize> {
496        let num_values = cmp::min(buffer.len(), self.values_left);
497        let values_read = self.decoder.get_batch(&mut buffer[..num_values])?;
498        self.values_left -= values_read;
499        Ok(values_read)
500    }
501
502    #[inline]
503    fn skip(&mut self, num_values: usize) -> Result<usize> {
504        let num_values = cmp::min(num_values, self.values_left);
505        let values_skipped = self.decoder.skip(num_values)?;
506        self.values_left -= values_skipped;
507        Ok(values_skipped)
508    }
509}
510
511// ----------------------------------------------------------------------
512// DELTA_BINARY_PACKED Decoding
513
514/// Delta binary packed decoder.
515/// Supports INT32 and INT64 types.
516/// See [`DeltaBitPackEncoder`](crate::encoding::DeltaBitPackEncoder) for more
517/// information.
518pub struct DeltaBitPackDecoder<T: DataType> {
519    bit_reader: BitReader,
520    initialized: bool,
521
522    // Header info
523    /// The number of values in each block
524    block_size: usize,
525    /// The number of values that remain to be read in the current page
526    values_left: usize,
527    /// The number of mini-blocks in each block
528    mini_blocks_per_block: usize,
529    /// The number of values in each mini block
530    values_per_mini_block: usize,
531
532    // Per block info
533    /// The minimum delta in the block
534    min_delta: T::T,
535    /// The byte offset of the end of the current block
536    block_end_offset: usize,
537    /// The index on the current mini block
538    mini_block_idx: usize,
539    /// The bit widths of each mini block in the current block
540    mini_block_bit_widths: Vec<u8>,
541    /// The number of values remaining in the current mini block
542    mini_block_remaining: usize,
543
544    /// The first value from the block header if not consumed
545    first_value: Option<T::T>,
546    /// The last value to compute offsets from
547    last_value: T::T,
548}
549
550impl<T: DataType> Default for DeltaBitPackDecoder<T>
551where
552    T::T: Default + FromPrimitive + WrappingAdd + Copy,
553{
554    fn default() -> Self {
555        Self::new()
556    }
557}
558
559impl<T: DataType> DeltaBitPackDecoder<T>
560where
561    T::T: Default + FromPrimitive + WrappingAdd + Copy,
562{
563    /// Creates new delta bit packed decoder.
564    pub fn new() -> Self {
565        Self {
566            bit_reader: BitReader::from(vec![]),
567            initialized: false,
568            block_size: 0,
569            values_left: 0,
570            mini_blocks_per_block: 0,
571            values_per_mini_block: 0,
572            min_delta: Default::default(),
573            mini_block_idx: 0,
574            mini_block_bit_widths: vec![],
575            mini_block_remaining: 0,
576            block_end_offset: 0,
577            first_value: None,
578            last_value: Default::default(),
579        }
580    }
581
582    /// Returns the current offset
583    pub fn get_offset(&self) -> usize {
584        assert!(self.initialized, "Bit reader is not initialized");
585        match self.values_left {
586            // If we've exhausted this page report the end of the current block
587            // as we may not have consumed the trailing padding
588            //
589            // The max is necessary to handle pages which don't contain more than
590            // one value and therefore have no blocks, but still contain a page header
591            0 => self.bit_reader.get_byte_offset().max(self.block_end_offset),
592            _ => self.bit_reader.get_byte_offset(),
593        }
594    }
595
596    /// Initializes the next block and the first mini block within it
597    #[inline]
598    fn next_block(&mut self) -> Result<()> {
599        let min_delta = self
600            .bit_reader
601            .get_zigzag_vlq_int()
602            .ok_or_else(|| eof_err!("Not enough data to decode 'min_delta'"))?;
603
604        self.min_delta =
605            T::T::from_i64(min_delta).ok_or_else(|| general_err!("'min_delta' too large"))?;
606
607        self.mini_block_bit_widths.clear();
608        self.bit_reader
609            .get_aligned_bytes(&mut self.mini_block_bit_widths, self.mini_blocks_per_block);
610
611        let mut offset = self.bit_reader.get_byte_offset();
612        let mut remaining = self.values_left;
613
614        // Compute the end offset of the current block
615        for b in &mut self.mini_block_bit_widths {
616            if remaining == 0 {
617                // Specification requires handling arbitrary bit widths
618                // for trailing mini blocks
619                *b = 0;
620            }
621            remaining = remaining.saturating_sub(self.values_per_mini_block);
622            offset += *b as usize * self.values_per_mini_block / 8;
623        }
624        self.block_end_offset = offset;
625
626        if self.mini_block_bit_widths.len() != self.mini_blocks_per_block {
627            return Err(eof_err!("insufficient mini block bit widths"));
628        }
629
630        self.mini_block_remaining = self.values_per_mini_block;
631        self.mini_block_idx = 0;
632
633        Ok(())
634    }
635
636    /// Initializes the next mini block
637    #[inline]
638    fn next_mini_block(&mut self) -> Result<()> {
639        if self.mini_block_idx + 1 < self.mini_block_bit_widths.len() {
640            self.mini_block_idx += 1;
641            self.mini_block_remaining = self.values_per_mini_block;
642            Ok(())
643        } else {
644            self.next_block()
645        }
646    }
647
648    /// Verify the bit width is smaller then the integer type that it is trying to decode.
649    #[inline]
650    fn check_bit_width(&self, bit_width: usize) -> Result<()> {
651        if bit_width > std::mem::size_of::<T::T>() * 8 {
652            return Err(general_err!(
653                "Invalid delta bit width {} which is larger than expected {} ",
654                bit_width,
655                std::mem::size_of::<T::T>() * 8
656            ));
657        }
658        Ok(())
659    }
660}
661
662impl<T: DataType> Decoder<T> for DeltaBitPackDecoder<T>
663where
664    T::T: Default + FromPrimitive + FromBitpacked + WrappingAdd + Copy,
665{
666    // # of total values is derived from encoding
667    #[inline]
668    fn set_data(&mut self, data: Bytes, _index: usize) -> Result<()> {
669        self.bit_reader = BitReader::new(data);
670        self.initialized = true;
671
672        // Read header information
673        self.block_size = self
674            .bit_reader
675            .get_vlq_int()
676            .ok_or_else(|| eof_err!("Not enough data to decode 'block_size'"))?
677            .try_into()
678            .map_err(|_| general_err!("invalid 'block_size'"))?;
679
680        self.mini_blocks_per_block = self
681            .bit_reader
682            .get_vlq_int()
683            .ok_or_else(|| eof_err!("Not enough data to decode 'mini_blocks_per_block'"))?
684            .try_into()
685            .map_err(|_| general_err!("invalid 'mini_blocks_per_block'"))?;
686
687        if self.mini_blocks_per_block == 0 {
688            return Err(general_err!("cannot have zero miniblocks per block"));
689        }
690
691        self.values_left = self
692            .bit_reader
693            .get_vlq_int()
694            .ok_or_else(|| eof_err!("Not enough data to decode 'values_left'"))?
695            .try_into()
696            .map_err(|_| general_err!("invalid 'values_left'"))?;
697
698        let first_value = self
699            .bit_reader
700            .get_zigzag_vlq_int()
701            .ok_or_else(|| eof_err!("Not enough data to decode 'first_value'"))?;
702
703        self.first_value =
704            Some(T::T::from_i64(first_value).ok_or_else(|| general_err!("first value too large"))?);
705
706        if self.block_size % 128 != 0 {
707            return Err(general_err!(
708                "'block_size' must be a multiple of 128, got {}",
709                self.block_size
710            ));
711        }
712
713        if self.block_size % self.mini_blocks_per_block != 0 {
714            return Err(general_err!(
715                "'block_size' must be a multiple of 'mini_blocks_per_block' got {} and {}",
716                self.block_size,
717                self.mini_blocks_per_block
718            ));
719        }
720
721        // Reset decoding state
722        self.mini_block_idx = 0;
723        self.values_per_mini_block = self.block_size / self.mini_blocks_per_block;
724        self.mini_block_remaining = 0;
725        self.mini_block_bit_widths.clear();
726
727        if self.values_per_mini_block % 32 != 0 {
728            return Err(general_err!(
729                "'values_per_mini_block' must be a multiple of 32 got {}",
730                self.values_per_mini_block
731            ));
732        }
733
734        Ok(())
735    }
736
737    fn get(&mut self, buffer: &mut [T::T]) -> Result<usize> {
738        assert!(self.initialized, "Bit reader is not initialized");
739        if buffer.is_empty() {
740            return Ok(0);
741        }
742
743        let mut read = 0;
744        let to_read = buffer.len().min(self.values_left);
745
746        if let Some(value) = self.first_value.take() {
747            self.last_value = value;
748            buffer[0] = value;
749            read += 1;
750            self.values_left -= 1;
751        }
752
753        while read != to_read {
754            if self.mini_block_remaining == 0 {
755                self.next_mini_block()?;
756            }
757
758            let bit_width = self.mini_block_bit_widths[self.mini_block_idx] as usize;
759            self.check_bit_width(bit_width)?;
760            let batch_to_read = self.mini_block_remaining.min(to_read - read);
761
762            let batch_read = self
763                .bit_reader
764                .get_batch(&mut buffer[read..read + batch_to_read], bit_width);
765
766            if batch_read != batch_to_read {
767                return Err(general_err!(
768                    "Expected to read {} values from miniblock got {}",
769                    batch_to_read,
770                    batch_read
771                ));
772            }
773
774            // At this point we have read the deltas to `buffer` we now need to offset
775            // these to get back to the original values that were encoded
776            //
777            // Optimization: if the bit_width for the miniblock is 0, then we can employ
778            // a faster decoding method than setting `value[i] = value[i-1] + value[i] + min_delta`.
779            // Where min_delta is 0 (all values in the miniblock are the same), we can simply
780            // set all values to `self.last_value`. In the case of non-zero min_delta (values
781            // in the mini-block form an arithmetic progression) each value can be computed via
782            // `value[i] = (i + 1) * min_delta + last_value`. In both cases we remove the
783            // dependence on the preceding value.
784            // Kudos to @pitrou for the idea https://github.com/apache/arrow/pull/49296
785            let min_delta = self.min_delta.as_i64()?;
786            if bit_width == 0 {
787                if min_delta == 0 {
788                    buffer[read..read + batch_read].fill(self.last_value);
789                } else {
790                    // the c++ version multiplies min_delta by the iter index, but doing
791                    // wrapping_mul through T::T was a bit slower. this is still
792                    // faster than before.
793                    let mut delta = self.min_delta;
794                    for v in &mut buffer[read..read + batch_read] {
795                        *v = self.last_value.wrapping_add(&delta);
796                        delta = delta.wrapping_add(&self.min_delta);
797                    }
798
799                    self.last_value = buffer[read + batch_read - 1];
800                }
801            } else {
802                // It is OK for deltas to contain "overflowed" values after encoding,
803                // e.g. i64::MAX - i64::MIN, so we use `wrapping_add` to "overflow" again and
804                // restore original value.
805                if min_delta == 0 {
806                    for v in &mut buffer[read..read + batch_read] {
807                        *v = v.wrapping_add(&self.last_value);
808                        self.last_value = *v;
809                    }
810                } else {
811                    for v in &mut buffer[read..read + batch_read] {
812                        *v = v
813                            .wrapping_add(&self.min_delta)
814                            .wrapping_add(&self.last_value);
815                        self.last_value = *v;
816                    }
817                }
818            }
819
820            read += batch_read;
821            self.mini_block_remaining -= batch_read;
822            self.values_left -= batch_read;
823        }
824
825        Ok(to_read)
826    }
827
828    fn values_left(&self) -> usize {
829        self.values_left
830    }
831
832    fn encoding(&self) -> Encoding {
833        Encoding::DELTA_BINARY_PACKED
834    }
835
836    fn skip(&mut self, num_values: usize) -> Result<usize> {
837        let mut skip = 0;
838        let to_skip = num_values.min(self.values_left);
839        if to_skip == 0 {
840            return Ok(0);
841        }
842
843        // try to consume first value in header.
844        if let Some(value) = self.first_value.take() {
845            self.last_value = value;
846            skip += 1;
847            self.values_left -= 1;
848        }
849
850        // See https://github.com/apache/arrow-rs/pull/9794.
851        // The parquet spec actually allows for miniblock sizes other than 32 or 64, but
852        // no current writers use anything else. Using values_per_mini_block directly
853        // for the skip_buffer doesn't allow stack allocation and leads to a significant
854        // drop in performance. We'll settle for erroring out here and come up with a
855        // better fix if writers ever start getting creative with block sizes.
856        let mini_block_batch_size = match self.values_per_mini_block {
857            32 => 32,
858            64 => 64,
859            _ => {
860                return Err(general_err!(
861                    "cannot skip miniblock of size {}",
862                    self.values_per_mini_block
863                ));
864            }
865        };
866
867        let mut skip_buffer = vec![T::T::default(); mini_block_batch_size];
868        while skip < to_skip {
869            if self.mini_block_remaining == 0 {
870                self.next_mini_block()?;
871            }
872
873            let bit_width = self.mini_block_bit_widths[self.mini_block_idx] as usize;
874            self.check_bit_width(bit_width)?;
875            let mini_block_to_skip = self.mini_block_remaining.min(to_skip - skip);
876
877            // see commentary in self.get() above regarding optimizations
878            let min_delta = self.min_delta.as_i64()?;
879            if bit_width == 0 {
880                // All remainders are zero: every delta equals min_delta exactly.
881                // Advance last_value by n * min_delta with no bit reads.
882                // When min_delta == 0 there is nothing to do: last_value is
883                // unchanged and no bytes are consumed from the bit reader.
884                if min_delta != 0 {
885                    let total = min_delta.wrapping_mul(mini_block_to_skip as i64);
886                    let step = T::T::from_i64(total)
887                        .ok_or_else(|| general_err!("delta*n overflow in skip"))?;
888                    self.last_value = self.last_value.wrapping_add(&step);
889                }
890                // bit_width=0 payloads occupy zero bytes; no bit_reader advancement needed.
891            } else {
892                // bw>0: must decode to track last_value for subsequent get() calls.
893                let skip_count = self
894                    .bit_reader
895                    .get_batch(&mut skip_buffer[0..mini_block_to_skip], bit_width);
896
897                if skip_count != mini_block_to_skip {
898                    return Err(general_err!(
899                        "Expected to skip {} values from mini block got {}.",
900                        mini_block_to_skip,
901                        skip_count
902                    ));
903                }
904
905                if min_delta == 0 {
906                    for v in &mut skip_buffer[0..skip_count] {
907                        *v = v.wrapping_add(&self.last_value);
908                        self.last_value = *v;
909                    }
910                } else {
911                    for v in &mut skip_buffer[0..skip_count] {
912                        *v = v
913                            .wrapping_add(&self.min_delta)
914                            .wrapping_add(&self.last_value);
915                        self.last_value = *v;
916                    }
917                }
918            }
919
920            skip += mini_block_to_skip;
921            self.mini_block_remaining -= mini_block_to_skip;
922            self.values_left -= mini_block_to_skip;
923        }
924
925        Ok(to_skip)
926    }
927}
928
929// ----------------------------------------------------------------------
930// DELTA_LENGTH_BYTE_ARRAY Decoding
931
/// Delta length byte array decoder.
///
/// Only applicable to byte arrays. The lengths of all values come first, encoded with
/// DELTA_BINARY_PACKED encoding. The concatenated bytes of the values follow them.
/// See [`DeltaLengthByteArrayEncoder`](crate::encoding::DeltaLengthByteArrayEncoder)
/// for more information.
pub struct DeltaLengthByteArrayDecoder<T: DataType> {
    // Length of each byte array in `data`, decoded up front by `set_data`
    // TODO: add memory tracker to this
    lengths: Vec<i32>,

    // Index into `lengths` of the next value to decode or skip
    current_idx: usize,

    // Concatenated byte array data that follows the length stream; `None` until
    // `set_data` is called
    data: Option<Bytes>,

    // Offset into `data`; always points to the beginning of the next byte array
    offset: usize,

    // Number of values left in this decoder stream
    num_values: usize,

    // Placeholder to allow `T` as generic parameter
    _phantom: PhantomData<T>,
}
958
959impl<T: DataType> Default for DeltaLengthByteArrayDecoder<T> {
960    fn default() -> Self {
961        Self::new()
962    }
963}
964
965impl<T: DataType> DeltaLengthByteArrayDecoder<T> {
966    /// Creates new delta length byte array decoder.
967    pub fn new() -> Self {
968        Self {
969            lengths: vec![],
970            current_idx: 0,
971            data: None,
972            offset: 0,
973            num_values: 0,
974            _phantom: PhantomData,
975        }
976    }
977}
978
979impl<T: DataType> Decoder<T> for DeltaLengthByteArrayDecoder<T> {
980    fn set_data(&mut self, data: Bytes, num_values: usize) -> Result<()> {
981        match T::get_physical_type() {
982            Type::BYTE_ARRAY => {
983                let mut len_decoder = DeltaBitPackDecoder::<Int32Type>::new();
984                len_decoder.set_data(data.clone(), num_values)?;
985                let num_lengths = len_decoder.values_left();
986                self.lengths.resize(num_lengths, 0);
987                len_decoder.get(&mut self.lengths[..])?;
988
989                self.data = Some(data.slice(len_decoder.get_offset()..));
990                self.offset = 0;
991                self.current_idx = 0;
992                self.num_values = num_lengths;
993                Ok(())
994            }
995            _ => Err(general_err!(
996                "DeltaLengthByteArrayDecoder only support ByteArrayType"
997            )),
998        }
999    }
1000
1001    fn get(&mut self, buffer: &mut [T::T]) -> Result<usize> {
1002        match T::get_physical_type() {
1003            Type::BYTE_ARRAY => {
1004                assert!(self.data.is_some());
1005
1006                let data = self.data.as_ref().unwrap();
1007                let num_values = cmp::min(buffer.len(), self.num_values);
1008
1009                for item in buffer.iter_mut().take(num_values) {
1010                    let len = self.lengths[self.current_idx] as usize;
1011                    item.set_from_bytes(data.slice(self.offset..self.offset + len));
1012
1013                    self.offset += len;
1014                    self.current_idx += 1;
1015                }
1016
1017                self.num_values -= num_values;
1018                Ok(num_values)
1019            }
1020            _ => Err(general_err!(
1021                "DeltaLengthByteArrayDecoder only support ByteArrayType"
1022            )),
1023        }
1024    }
1025
1026    fn values_left(&self) -> usize {
1027        self.num_values
1028    }
1029
1030    fn encoding(&self) -> Encoding {
1031        Encoding::DELTA_LENGTH_BYTE_ARRAY
1032    }
1033
1034    fn skip(&mut self, num_values: usize) -> Result<usize> {
1035        match T::get_physical_type() {
1036            Type::BYTE_ARRAY => {
1037                let num_values = cmp::min(num_values, self.num_values);
1038
1039                let next_offset: i32 = self.lengths
1040                    [self.current_idx..self.current_idx + num_values]
1041                    .iter()
1042                    .sum();
1043
1044                self.current_idx += num_values;
1045                self.offset += next_offset as usize;
1046
1047                self.num_values -= num_values;
1048                Ok(num_values)
1049            }
1050            other_type => Err(general_err!(
1051                "DeltaLengthByteArrayDecoder not support {}, only support byte array",
1052                other_type
1053            )),
1054        }
1055    }
1056}
1057
1058// ----------------------------------------------------------------------
1059// DELTA_BYTE_ARRAY Decoding
1060
/// Delta byte array decoder.
///
/// Each value is rebuilt from the first `prefix_length` bytes of the previous value,
/// followed by a suffix. The prefix lengths are encoded using `DELTA_BINARY_PACKED`
/// encoding. The suffixes follow them and are stored using `DELTA_LENGTH_BYTE_ARRAY`
/// encoding.
/// See [`DeltaByteArrayEncoder`](crate::encoding::DeltaByteArrayEncoder) for more
/// information.
pub struct DeltaByteArrayDecoder<T: DataType> {
    // Prefix length for each value, decoded up front by `set_data`
    // TODO: add memory tracker to this
    prefix_lengths: Vec<i32>,

    // Index into `prefix_lengths` of the next value to decode
    current_idx: usize,

    // Decoder for all suffixes. In valid data, the number of suffixes equals
    // `prefix_lengths.len()`. `None` until `set_data` is called.
    suffix_decoder: Option<DeltaLengthByteArrayDecoder<ByteArrayType>>,

    // The most recently decoded value; its leading bytes supply the next prefix.
    // Stored as Bytes so the same buffer can be handed to the output value instead
    // of being copied.
    previous_value: Bytes,

    // Number of values left to decode
    num_values: usize,

    // Placeholder to allow `T` as generic parameter
    _phantom: PhantomData<T>,
}
1089
1090impl<T: DataType> Default for DeltaByteArrayDecoder<T> {
1091    fn default() -> Self {
1092        Self::new()
1093    }
1094}
1095
1096impl<T: DataType> DeltaByteArrayDecoder<T> {
1097    /// Creates new delta byte array decoder.
1098    pub fn new() -> Self {
1099        Self {
1100            prefix_lengths: vec![],
1101            current_idx: 0,
1102            suffix_decoder: None,
1103            previous_value: Bytes::new(),
1104            num_values: 0,
1105            _phantom: PhantomData,
1106        }
1107    }
1108}
1109
1110impl<T: DataType> Decoder<T> for DeltaByteArrayDecoder<T> {
1111    fn set_data(&mut self, data: Bytes, num_values: usize) -> Result<()> {
1112        match T::get_physical_type() {
1113            Type::BYTE_ARRAY | Type::FIXED_LEN_BYTE_ARRAY => {
1114                let mut prefix_len_decoder = DeltaBitPackDecoder::<Int32Type>::new();
1115                prefix_len_decoder.set_data(data.clone(), num_values)?;
1116                let num_prefixes = prefix_len_decoder.values_left();
1117                self.prefix_lengths.resize(num_prefixes, 0);
1118                prefix_len_decoder.get(&mut self.prefix_lengths[..])?;
1119
1120                let mut suffix_decoder = DeltaLengthByteArrayDecoder::new();
1121                suffix_decoder
1122                    .set_data(data.slice(prefix_len_decoder.get_offset()..), num_values)?;
1123                self.suffix_decoder = Some(suffix_decoder);
1124                self.num_values = num_prefixes;
1125                self.current_idx = 0;
1126                self.previous_value = Bytes::new();
1127                Ok(())
1128            }
1129            _ => Err(general_err!(
1130                "DeltaByteArrayDecoder only supports ByteArrayType and FixedLenByteArrayType"
1131            )),
1132        }
1133    }
1134
1135    fn get(&mut self, buffer: &mut [T::T]) -> Result<usize> {
1136        match T::get_physical_type() {
1137            Type::BYTE_ARRAY | Type::FIXED_LEN_BYTE_ARRAY => {
1138                let num_values = cmp::min(buffer.len(), self.num_values);
1139                let mut v: [ByteArray; 1] = [ByteArray::new(); 1];
1140                for item in buffer.iter_mut().take(num_values) {
1141                    // Process suffix
1142                    // TODO: this is awkward - maybe we should add a non-vectorized API?
1143                    let suffix_decoder = self
1144                        .suffix_decoder
1145                        .as_mut()
1146                        .expect("decoder not initialized");
1147                    suffix_decoder.get(&mut v[..])?;
1148                    let suffix = v[0].data();
1149
1150                    // Extract current prefix length, can be 0
1151                    let prefix_len = usize::try_from(self.prefix_lengths[self.current_idx])
1152                        .map_err(|_| {
1153                            general_err!(
1154                                "Invalid DELTA_BYTE_ARRAY prefix length {}",
1155                                self.prefix_lengths[self.current_idx]
1156                            )
1157                        })?;
1158
1159                    if prefix_len > self.previous_value.len() {
1160                        return Err(general_err!(
1161                            "Invalid DELTA_BYTE_ARRAY prefix length {} exceeds previous value length {}",
1162                            prefix_len,
1163                            self.previous_value.len()
1164                        ));
1165                    }
1166
1167                    // Concatenate prefix with suffix
1168                    let mut result = Vec::with_capacity(prefix_len + suffix.len());
1169                    result.extend_from_slice(&self.previous_value[0..prefix_len]);
1170                    result.extend_from_slice(suffix);
1171
1172                    let data = Bytes::from(result);
1173                    item.set_from_bytes(data.clone());
1174
1175                    self.previous_value = data;
1176                    self.current_idx += 1;
1177                }
1178
1179                self.num_values -= num_values;
1180                Ok(num_values)
1181            }
1182            _ => Err(general_err!(
1183                "DeltaByteArrayDecoder only supports ByteArrayType and FixedLenByteArrayType"
1184            )),
1185        }
1186    }
1187
1188    fn values_left(&self) -> usize {
1189        self.num_values
1190    }
1191
1192    fn encoding(&self) -> Encoding {
1193        Encoding::DELTA_BYTE_ARRAY
1194    }
1195
1196    fn skip(&mut self, num_values: usize) -> Result<usize> {
1197        let mut buffer = vec![T::T::default(); num_values];
1198        self.get(&mut buffer)
1199    }
1200}
1201
1202#[cfg(test)]
1203mod tests {
1204    use super::{super::encoding::*, *};
1205
1206    use std::f32::consts::PI as PI_f32;
1207    use std::f64::consts::PI as PI_f64;
1208    use std::sync::Arc;
1209
1210    use crate::schema::types::{ColumnDescPtr, ColumnDescriptor, ColumnPath, Type as SchemaType};
1211    use crate::util::test_common::rand_gen::RandGen;
1212
1213    #[test]
1214    fn test_delta_byte_array_invalid_prefix_len_returns_error() {
1215        let col_descr = create_test_col_desc_ptr(-1, Type::BYTE_ARRAY);
1216
1217        let mut encoder =
1218            get_encoder::<ByteArrayType>(Encoding::DELTA_BYTE_ARRAY, &col_descr).unwrap();
1219        let input = vec![ByteArray::from("a"), ByteArray::from("ab")];
1220        encoder.put(&input).unwrap();
1221        let encoded = encoder.flush_buffer().unwrap();
1222
1223        // First, decode just the prefix-length stream so we know where the suffix stream starts.
1224        let mut prefix_len_decoder = DeltaBitPackDecoder::<Int32Type>::new();
1225        prefix_len_decoder
1226            .set_data(encoded.clone(), input.len())
1227            .unwrap();
1228        let num_prefixes = prefix_len_decoder.values_left();
1229        let mut prefix_lengths = vec![0; num_prefixes];
1230        prefix_len_decoder.get(&mut prefix_lengths).unwrap();
1231
1232        // check: valid encoding should produce prefix lengths [0, 1]
1233        assert_eq!(prefix_lengths, vec![0, 1]);
1234
1235        let prefix_stream_end = prefix_len_decoder.get_offset();
1236
1237        // Corrupt the prefix-length stream itself:
1238        // replace it with a valid DELTA_BINARY_PACKED stream for [1, 1],
1239        // so the first decoded prefix length becomes impossible because previous_value is empty.
1240        let mut prefix_encoder = get_encoder::<Int32Type>(
1241            Encoding::DELTA_BINARY_PACKED,
1242            &create_test_col_desc_ptr(-1, Type::INT32),
1243        )
1244        .unwrap();
1245        prefix_encoder.put(&[1i32, 1i32]).unwrap();
1246        let corrupted_prefix = prefix_encoder.flush_buffer().unwrap();
1247
1248        let mut corrupted = Vec::new();
1249        corrupted.extend_from_slice(corrupted_prefix.as_ref());
1250        corrupted.extend_from_slice(&encoded[prefix_stream_end..]);
1251
1252        let mut decoder = DeltaByteArrayDecoder::<ByteArrayType>::new();
1253        decoder
1254            .set_data(Bytes::from(corrupted), input.len())
1255            .unwrap();
1256
1257        let mut out = vec![ByteArray::new(); input.len()];
1258
1259        let err = decoder.get(&mut out).unwrap_err();
1260        assert!(
1261            err.to_string()
1262                .contains("Invalid DELTA_BYTE_ARRAY prefix length"),
1263            "{}",
1264            err
1265        );
1266    }
1267
1268    #[test]
1269    fn test_delta_byte_array_negative_prefix_len_returns_error() {
1270        let col_descr = create_test_col_desc_ptr(-1, Type::BYTE_ARRAY);
1271
1272        let mut encoder =
1273            get_encoder::<ByteArrayType>(Encoding::DELTA_BYTE_ARRAY, &col_descr).unwrap();
1274        let input = vec![ByteArray::from("a"), ByteArray::from("ab")];
1275        encoder.put(&input).unwrap();
1276        let encoded = encoder.flush_buffer().unwrap();
1277
1278        let mut decoder = DeltaByteArrayDecoder::<ByteArrayType>::new();
1279        decoder.set_data(encoded, input.len()).unwrap();
1280
1281        // Force a negative prefix length after decoder initialization
1282        decoder.prefix_lengths[0] = -1;
1283        let mut out = vec![ByteArray::new(); input.len()];
1284
1285        let err = decoder.get(&mut out).unwrap_err();
1286        assert!(
1287            err.to_string()
1288                .contains("Invalid DELTA_BYTE_ARRAY prefix length"),
1289            "{}",
1290            err
1291        );
1292    }
1293
1294    #[test]
1295    fn test_get_decoders() {
1296        // supported encodings
1297        create_and_check_decoder::<Int32Type>(Encoding::PLAIN, None);
1298        create_and_check_decoder::<Int32Type>(Encoding::DELTA_BINARY_PACKED, None);
1299        create_and_check_decoder::<ByteArrayType>(Encoding::DELTA_LENGTH_BYTE_ARRAY, None);
1300        create_and_check_decoder::<ByteArrayType>(Encoding::DELTA_BYTE_ARRAY, None);
1301        create_and_check_decoder::<BoolType>(Encoding::RLE, None);
1302
1303        // error when initializing
1304        create_and_check_decoder::<Int32Type>(
1305            Encoding::RLE_DICTIONARY,
1306            Some(general_err!(
1307                "Cannot initialize this encoding through this function"
1308            )),
1309        );
1310        create_and_check_decoder::<Int32Type>(
1311            Encoding::PLAIN_DICTIONARY,
1312            Some(general_err!(
1313                "Cannot initialize this encoding through this function"
1314            )),
1315        );
1316        create_and_check_decoder::<Int32Type>(
1317            Encoding::DELTA_LENGTH_BYTE_ARRAY,
1318            Some(general_err!(
1319                "Encoding DELTA_LENGTH_BYTE_ARRAY is not supported for type"
1320            )),
1321        );
1322        create_and_check_decoder::<Int32Type>(
1323            Encoding::DELTA_BYTE_ARRAY,
1324            Some(general_err!(
1325                "Encoding DELTA_BYTE_ARRAY is not supported for type"
1326            )),
1327        );
1328
1329        // unsupported
1330        #[allow(deprecated)]
1331        create_and_check_decoder::<Int32Type>(
1332            Encoding::BIT_PACKED,
1333            Some(nyi_err!("Encoding BIT_PACKED is not supported")),
1334        );
1335    }
1336
1337    #[test]
1338    fn test_plain_decode_int32() {
1339        let data = [42, 18, 52];
1340        let data_bytes = Int32Type::to_byte_array(&data[..]);
1341        let mut buffer = [0; 3];
1342        test_plain_decode::<Int32Type>(Bytes::from(data_bytes), 3, -1, &mut buffer[..], &data[..]);
1343    }
1344
1345    #[test]
1346    fn test_plain_skip_int32() {
1347        let data = [42, 18, 52];
1348        let data_bytes = Int32Type::to_byte_array(&data[..]);
1349        test_plain_skip::<Int32Type>(Bytes::from(data_bytes), 3, 1, -1, &data[1..]);
1350    }
1351
1352    #[test]
1353    fn test_plain_skip_all_int32() {
1354        let data = [42, 18, 52];
1355        let data_bytes = Int32Type::to_byte_array(&data[..]);
1356        test_plain_skip::<Int32Type>(Bytes::from(data_bytes), 3, 5, -1, &[]);
1357    }
1358
1359    #[test]
1360    fn test_plain_decode_int32_spaced() {
1361        let data = [42, 18, 52];
1362        let expected_data = [0, 42, 0, 18, 0, 0, 52, 0];
1363        let data_bytes = Int32Type::to_byte_array(&data[..]);
1364        let mut buffer = [0; 8];
1365        let num_nulls = 5;
1366        let valid_bits = [0b01001010];
1367        test_plain_decode_spaced::<Int32Type>(
1368            Bytes::from(data_bytes),
1369            3,
1370            -1,
1371            &mut buffer[..],
1372            num_nulls,
1373            &valid_bits,
1374            &expected_data[..],
1375        );
1376    }
1377
1378    #[test]
1379    fn test_plain_decode_int64() {
1380        let data = [42, 18, 52];
1381        let data_bytes = Int64Type::to_byte_array(&data[..]);
1382        let mut buffer = [0; 3];
1383        test_plain_decode::<Int64Type>(Bytes::from(data_bytes), 3, -1, &mut buffer[..], &data[..]);
1384    }
1385
1386    #[test]
1387    fn test_plain_skip_int64() {
1388        let data = [42, 18, 52];
1389        let data_bytes = Int64Type::to_byte_array(&data[..]);
1390        test_plain_skip::<Int64Type>(Bytes::from(data_bytes), 3, 2, -1, &data[2..]);
1391    }
1392
1393    #[test]
1394    fn test_plain_skip_all_int64() {
1395        let data = [42, 18, 52];
1396        let data_bytes = Int64Type::to_byte_array(&data[..]);
1397        test_plain_skip::<Int64Type>(Bytes::from(data_bytes), 3, 3, -1, &[]);
1398    }
1399
1400    #[test]
1401    fn test_plain_decode_float() {
1402        let data = [PI_f32, 2.414, 12.51];
1403        let data_bytes = FloatType::to_byte_array(&data[..]);
1404        let mut buffer = [0.0; 3];
1405        test_plain_decode::<FloatType>(Bytes::from(data_bytes), 3, -1, &mut buffer[..], &data[..]);
1406    }
1407
1408    #[test]
1409    fn test_plain_skip_float() {
1410        let data = [PI_f32, 2.414, 12.51];
1411        let data_bytes = FloatType::to_byte_array(&data[..]);
1412        test_plain_skip::<FloatType>(Bytes::from(data_bytes), 3, 1, -1, &data[1..]);
1413    }
1414
1415    #[test]
1416    fn test_plain_skip_all_float() {
1417        let data = [PI_f32, 2.414, 12.51];
1418        let data_bytes = FloatType::to_byte_array(&data[..]);
1419        test_plain_skip::<FloatType>(Bytes::from(data_bytes), 3, 4, -1, &[]);
1420    }
1421
1422    #[test]
1423    fn test_plain_skip_double() {
1424        let data = [PI_f64, 2.414f64, 12.51f64];
1425        let data_bytes = DoubleType::to_byte_array(&data[..]);
1426        test_plain_skip::<DoubleType>(Bytes::from(data_bytes), 3, 1, -1, &data[1..]);
1427    }
1428
1429    #[test]
1430    fn test_plain_skip_all_double() {
1431        let data = [PI_f64, 2.414f64, 12.51f64];
1432        let data_bytes = DoubleType::to_byte_array(&data[..]);
1433        test_plain_skip::<DoubleType>(Bytes::from(data_bytes), 3, 5, -1, &[]);
1434    }
1435
1436    #[test]
1437    fn test_plain_decode_double() {
1438        let data = [PI_f64, 2.414f64, 12.51f64];
1439        let data_bytes = DoubleType::to_byte_array(&data[..]);
1440        let mut buffer = [0.0f64; 3];
1441        test_plain_decode::<DoubleType>(Bytes::from(data_bytes), 3, -1, &mut buffer[..], &data[..]);
1442    }
1443
1444    #[test]
1445    fn test_plain_decode_int96() {
1446        let mut data = [Int96::new(); 4];
1447        data[0].set_data(11, 22, 33);
1448        data[1].set_data(44, 55, 66);
1449        data[2].set_data(10, 20, 30);
1450        data[3].set_data(40, 50, 60);
1451        let data_bytes = Int96Type::to_byte_array(&data[..]);
1452        let mut buffer = [Int96::new(); 4];
1453        test_plain_decode::<Int96Type>(Bytes::from(data_bytes), 4, -1, &mut buffer[..], &data[..]);
1454    }
1455
1456    #[test]
1457    fn test_plain_skip_int96() {
1458        let mut data = [Int96::new(); 4];
1459        data[0].set_data(11, 22, 33);
1460        data[1].set_data(44, 55, 66);
1461        data[2].set_data(10, 20, 30);
1462        data[3].set_data(40, 50, 60);
1463        let data_bytes = Int96Type::to_byte_array(&data[..]);
1464        test_plain_skip::<Int96Type>(Bytes::from(data_bytes), 4, 2, -1, &data[2..]);
1465    }
1466
1467    #[test]
1468    fn test_plain_skip_all_int96() {
1469        let mut data = [Int96::new(); 4];
1470        data[0].set_data(11, 22, 33);
1471        data[1].set_data(44, 55, 66);
1472        data[2].set_data(10, 20, 30);
1473        data[3].set_data(40, 50, 60);
1474        let data_bytes = Int96Type::to_byte_array(&data[..]);
1475        test_plain_skip::<Int96Type>(Bytes::from(data_bytes), 4, 8, -1, &[]);
1476    }
1477
1478    #[test]
1479    fn test_plain_decode_bool() {
1480        let data = [
1481            false, true, false, false, true, false, true, true, false, true,
1482        ];
1483        let data_bytes = BoolType::to_byte_array(&data[..]);
1484        let mut buffer = [false; 10];
1485        test_plain_decode::<BoolType>(Bytes::from(data_bytes), 10, -1, &mut buffer[..], &data[..]);
1486    }
1487
1488    #[test]
1489    fn test_plain_skip_bool() {
1490        let data = [
1491            false, true, false, false, true, false, true, true, false, true,
1492        ];
1493        let data_bytes = BoolType::to_byte_array(&data[..]);
1494        test_plain_skip::<BoolType>(Bytes::from(data_bytes), 10, 5, -1, &data[5..]);
1495    }
1496
1497    #[test]
1498    fn test_plain_skip_all_bool() {
1499        let data = [
1500            false, true, false, false, true, false, true, true, false, true,
1501        ];
1502        let data_bytes = BoolType::to_byte_array(&data[..]);
1503        test_plain_skip::<BoolType>(Bytes::from(data_bytes), 10, 20, -1, &[]);
1504    }
1505
1506    #[test]
1507    fn test_plain_decode_byte_array() {
1508        let mut data = vec![ByteArray::new(); 2];
1509        data[0].set_data(Bytes::from(String::from("hello")));
1510        data[1].set_data(Bytes::from(String::from("parquet")));
1511        let data_bytes = ByteArrayType::to_byte_array(&data[..]);
1512        let mut buffer = vec![ByteArray::new(); 2];
1513        test_plain_decode::<ByteArrayType>(
1514            Bytes::from(data_bytes),
1515            2,
1516            -1,
1517            &mut buffer[..],
1518            &data[..],
1519        );
1520    }
1521
1522    #[test]
1523    fn test_plain_skip_byte_array() {
1524        let mut data = vec![ByteArray::new(); 2];
1525        data[0].set_data(Bytes::from(String::from("hello")));
1526        data[1].set_data(Bytes::from(String::from("parquet")));
1527        let data_bytes = ByteArrayType::to_byte_array(&data[..]);
1528        test_plain_skip::<ByteArrayType>(Bytes::from(data_bytes), 2, 1, -1, &data[1..]);
1529    }
1530
1531    #[test]
1532    fn test_plain_skip_all_byte_array() {
1533        let mut data = vec![ByteArray::new(); 2];
1534        data[0].set_data(Bytes::from(String::from("hello")));
1535        data[1].set_data(Bytes::from(String::from("parquet")));
1536        let data_bytes = ByteArrayType::to_byte_array(&data[..]);
1537        test_plain_skip::<ByteArrayType>(Bytes::from(data_bytes), 2, 2, -1, &[]);
1538    }
1539
1540    #[test]
1541    fn test_plain_decode_fixed_len_byte_array() {
1542        let mut data = vec![FixedLenByteArray::default(); 3];
1543        data[0].set_data(Bytes::from(String::from("bird")));
1544        data[1].set_data(Bytes::from(String::from("come")));
1545        data[2].set_data(Bytes::from(String::from("flow")));
1546        let data_bytes = FixedLenByteArrayType::to_byte_array(&data[..]);
1547        let mut buffer = vec![FixedLenByteArray::default(); 3];
1548        test_plain_decode::<FixedLenByteArrayType>(
1549            Bytes::from(data_bytes),
1550            3,
1551            4,
1552            &mut buffer[..],
1553            &data[..],
1554        );
1555    }
1556
1557    #[test]
1558    fn test_plain_skip_fixed_len_byte_array() {
1559        let mut data = vec![FixedLenByteArray::default(); 3];
1560        data[0].set_data(Bytes::from(String::from("bird")));
1561        data[1].set_data(Bytes::from(String::from("come")));
1562        data[2].set_data(Bytes::from(String::from("flow")));
1563        let data_bytes = FixedLenByteArrayType::to_byte_array(&data[..]);
1564        test_plain_skip::<FixedLenByteArrayType>(Bytes::from(data_bytes), 3, 1, 4, &data[1..]);
1565    }
1566
1567    #[test]
1568    fn test_plain_skip_all_fixed_len_byte_array() {
1569        let mut data = vec![FixedLenByteArray::default(); 3];
1570        data[0].set_data(Bytes::from(String::from("bird")));
1571        data[1].set_data(Bytes::from(String::from("come")));
1572        data[2].set_data(Bytes::from(String::from("flow")));
1573        let data_bytes = FixedLenByteArrayType::to_byte_array(&data[..]);
1574        test_plain_skip::<FixedLenByteArrayType>(Bytes::from(data_bytes), 3, 6, 4, &[]);
1575    }
1576
1577    #[test]
1578    fn test_dict_decoder_empty_data() {
1579        let mut decoder = DictDecoder::<Int32Type>::new();
1580        let err = decoder.set_data(Bytes::new(), 10).unwrap_err();
1581        assert_eq!(err.to_string(), "EOF: Not enough bytes to decode bit_width");
1582    }
1583
1584    fn test_plain_decode<T: DataType>(
1585        data: Bytes,
1586        num_values: usize,
1587        type_length: i32,
1588        buffer: &mut [T::T],
1589        expected: &[T::T],
1590    ) {
1591        let mut decoder: PlainDecoder<T> = PlainDecoder::new(type_length);
1592        let result = decoder.set_data(data, num_values);
1593        assert!(result.is_ok());
1594        let result = decoder.get(buffer);
1595        assert!(result.is_ok());
1596        assert_eq!(decoder.values_left(), 0);
1597        assert_eq!(buffer, expected);
1598    }
1599
1600    fn test_plain_skip<T: DataType>(
1601        data: Bytes,
1602        num_values: usize,
1603        skip: usize,
1604        type_length: i32,
1605        expected: &[T::T],
1606    ) {
1607        let mut decoder: PlainDecoder<T> = PlainDecoder::new(type_length);
1608        let result = decoder.set_data(data, num_values);
1609        assert!(result.is_ok());
1610        let skipped = decoder.skip(skip).expect("skipping values");
1611
1612        if skip >= num_values {
1613            assert_eq!(skipped, num_values);
1614
1615            let mut buffer = vec![T::T::default(); 1];
1616            let remaining = decoder.get(&mut buffer).expect("getting remaining values");
1617            assert_eq!(remaining, 0);
1618        } else {
1619            assert_eq!(skipped, skip);
1620            let mut buffer = vec![T::T::default(); num_values - skip];
1621            let remaining = decoder.get(&mut buffer).expect("getting remaining values");
1622            assert_eq!(remaining, num_values - skip);
1623            assert_eq!(decoder.values_left(), 0);
1624            assert_eq!(buffer, expected);
1625        }
1626    }
1627
1628    fn test_plain_decode_spaced<T: DataType>(
1629        data: Bytes,
1630        num_values: usize,
1631        type_length: i32,
1632        buffer: &mut [T::T],
1633        num_nulls: usize,
1634        valid_bits: &[u8],
1635        expected: &[T::T],
1636    ) {
1637        let mut decoder: PlainDecoder<T> = PlainDecoder::new(type_length);
1638        let result = decoder.set_data(data, num_values);
1639        assert!(result.is_ok());
1640        let result = decoder.get_spaced(buffer, num_nulls, valid_bits);
1641        assert!(result.is_ok());
1642        assert_eq!(num_values + num_nulls, result.unwrap());
1643        assert_eq!(decoder.values_left(), 0);
1644        assert_eq!(buffer, expected);
1645    }
1646
1647    #[test]
1648    #[should_panic(expected = "RleValueEncoder only supports BoolType")]
1649    fn test_rle_value_encode_int32_not_supported() {
1650        let mut encoder = RleValueEncoder::<Int32Type>::new();
1651        encoder.put(&[1, 2, 3, 4]).unwrap();
1652    }
1653
1654    #[test]
1655    #[should_panic(expected = "RleValueDecoder only supports BoolType")]
1656    fn test_rle_value_decode_int32_not_supported() {
1657        let mut decoder = RleValueDecoder::<Int32Type>::new();
1658        decoder.set_data(Bytes::from(vec![5, 0, 0, 0]), 1).unwrap();
1659    }
1660
1661    #[test]
1662    fn test_rle_value_decode_missing_size() {
1663        let mut decoder = RleValueDecoder::<BoolType>::new();
1664        assert!(decoder.set_data(Bytes::from(vec![0]), 1).is_err());
1665    }
1666
1667    #[test]
1668    fn test_rle_value_decode_missing_data() {
1669        let mut decoder = RleValueDecoder::<BoolType>::new();
1670        assert!(decoder.set_data(Bytes::from(vec![5, 0, 0, 0]), 1).is_err());
1671    }
1672
1673    #[test]
1674    fn test_rle_value_decode_bool_decode() {
1675        // Test multiple 'put' calls on the same encoder
1676        let data = vec![
1677            BoolType::gen_vec(-1, 256),
1678            BoolType::gen_vec(-1, 257),
1679            BoolType::gen_vec(-1, 126),
1680        ];
1681        test_rle_value_decode::<BoolType>(data);
1682    }
1683
1684    #[test]
1685    #[should_panic(expected = "Bit reader is not initialized")]
1686    fn test_delta_bit_packed_not_initialized_offset() {
1687        // Fail if set_data() is not called before get_offset()
1688        let decoder = DeltaBitPackDecoder::<Int32Type>::new();
1689        decoder.get_offset();
1690    }
1691
1692    #[test]
1693    #[should_panic(expected = "Bit reader is not initialized")]
1694    fn test_delta_bit_packed_not_initialized_get() {
1695        // Fail if set_data() is not called before get()
1696        let mut decoder = DeltaBitPackDecoder::<Int32Type>::new();
1697        let mut buffer = vec![];
1698        decoder.get(&mut buffer).unwrap();
1699    }
1700
1701    #[test]
1702    fn test_delta_bit_packed_int32_empty() {
1703        let data = vec![vec![0; 0]];
1704        test_delta_bit_packed_decode::<Int32Type>(data);
1705    }
1706
1707    #[test]
1708    fn test_delta_bit_packed_int32_repeat() {
1709        let block_data = vec![
1710            1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5,
1711            6, 7, 8,
1712        ];
1713        test_delta_bit_packed_decode::<Int32Type>(vec![block_data]);
1714    }
1715
1716    #[test]
1717    fn test_skip_delta_bit_packed_int32_repeat() {
1718        let block_data = vec![
1719            1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5,
1720            6, 7, 8,
1721        ];
1722        test_skip::<Int32Type>(block_data.clone(), Encoding::DELTA_BINARY_PACKED, 10);
1723        test_skip::<Int32Type>(block_data, Encoding::DELTA_BINARY_PACKED, 100);
1724    }
1725
1726    #[test]
1727    fn test_delta_bit_packed_int32_uneven() {
1728        let block_data = vec![1, -2, 3, -4, 5, 6, 7, 8, 9, 10, 11];
1729        test_delta_bit_packed_decode::<Int32Type>(vec![block_data]);
1730    }
1731
1732    #[test]
1733    fn test_skip_delta_bit_packed_int32_uneven() {
1734        let block_data = vec![1, -2, 3, -4, 5, 6, 7, 8, 9, 10, 11];
1735        test_skip::<Int32Type>(block_data.clone(), Encoding::DELTA_BINARY_PACKED, 5);
1736        test_skip::<Int32Type>(block_data, Encoding::DELTA_BINARY_PACKED, 100);
1737    }
1738
1739    #[test]
1740    fn test_delta_bit_packed_int32_same_values() {
1741        let block_data = vec![
1742            127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127,
1743        ];
1744        test_delta_bit_packed_decode::<Int32Type>(vec![block_data]);
1745
1746        let block_data = vec![
1747            -127, -127, -127, -127, -127, -127, -127, -127, -127, -127, -127, -127, -127, -127,
1748            -127, -127,
1749        ];
1750        test_delta_bit_packed_decode::<Int32Type>(vec![block_data]);
1751    }
1752
1753    #[test]
1754    fn test_skip_delta_bit_packed_int32_same_values() {
1755        let block_data = vec![
1756            127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127,
1757        ];
1758        test_skip::<Int32Type>(block_data.clone(), Encoding::DELTA_BINARY_PACKED, 5);
1759        test_skip::<Int32Type>(block_data, Encoding::DELTA_BINARY_PACKED, 100);
1760
1761        let block_data = vec![
1762            -127, -127, -127, -127, -127, -127, -127, -127, -127, -127, -127, -127, -127, -127,
1763            -127, -127,
1764        ];
1765        test_skip::<Int32Type>(block_data.clone(), Encoding::DELTA_BINARY_PACKED, 5);
1766        test_skip::<Int32Type>(block_data, Encoding::DELTA_BINARY_PACKED, 100);
1767    }
1768
1769    #[test]
1770    fn test_delta_bit_packed_int32_min_max() {
1771        let block_data = vec![
1772            i32::MIN,
1773            i32::MIN,
1774            i32::MIN,
1775            i32::MAX,
1776            i32::MIN,
1777            i32::MAX,
1778            i32::MIN,
1779            i32::MAX,
1780        ];
1781        test_delta_bit_packed_decode::<Int32Type>(vec![block_data]);
1782    }
1783
1784    #[test]
1785    fn test_skip_delta_bit_packed_int32_min_max() {
1786        let block_data = vec![
1787            i32::MIN,
1788            i32::MIN,
1789            i32::MIN,
1790            i32::MAX,
1791            i32::MIN,
1792            i32::MAX,
1793            i32::MIN,
1794            i32::MAX,
1795        ];
1796        test_skip::<Int32Type>(block_data.clone(), Encoding::DELTA_BINARY_PACKED, 5);
1797        test_skip::<Int32Type>(block_data, Encoding::DELTA_BINARY_PACKED, 100);
1798    }
1799
1800    #[test]
1801    fn test_skip_delta_bit_packed_bw0_uniform_step_i32() {
1802        // Uniform-step column: every delta equals min_delta, so bw=0 miniblocks.
1803        // Partial skip must advance last_value by n * min_delta (min_delta != 0 path).
1804        let data: Vec<i32> = (0..128).map(|i| i * 7).collect();
1805        test_skip::<Int32Type>(data.clone(), Encoding::DELTA_BINARY_PACKED, 50);
1806        test_skip::<Int32Type>(data, Encoding::DELTA_BINARY_PACKED, 200);
1807    }
1808
1809    #[test]
1810    fn test_skip_delta_bit_packed_bw0_uniform_step_i64() {
1811        // Same as above for i64.
1812        let data: Vec<i64> = (0..128).map(|i| i * 100).collect();
1813        test_skip::<Int64Type>(data.clone(), Encoding::DELTA_BINARY_PACKED, 50);
1814        test_skip::<Int64Type>(data, Encoding::DELTA_BINARY_PACKED, 200);
1815    }
1816
1817    #[test]
1818    fn test_delta_bit_packed_int32_multiple_blocks() {
1819        // Test multiple 'put' calls on the same encoder
1820        let data = vec![
1821            Int32Type::gen_vec(-1, 64),
1822            Int32Type::gen_vec(-1, 128),
1823            Int32Type::gen_vec(-1, 64),
1824        ];
1825        test_delta_bit_packed_decode::<Int32Type>(data);
1826    }
1827
1828    #[test]
1829    fn test_delta_bit_packed_int32_data_across_blocks() {
1830        // Test multiple 'put' calls on the same encoder
1831        let data = vec![Int32Type::gen_vec(-1, 256), Int32Type::gen_vec(-1, 257)];
1832        test_delta_bit_packed_decode::<Int32Type>(data);
1833    }
1834
1835    #[test]
1836    fn test_delta_bit_packed_int32_with_empty_blocks() {
1837        let data = vec![
1838            Int32Type::gen_vec(-1, 128),
1839            vec![0; 0],
1840            Int32Type::gen_vec(-1, 64),
1841        ];
1842        test_delta_bit_packed_decode::<Int32Type>(data);
1843    }
1844
1845    #[test]
1846    fn test_delta_bit_packed_int64_empty() {
1847        let data = vec![vec![0; 0]];
1848        test_delta_bit_packed_decode::<Int64Type>(data);
1849    }
1850
1851    #[test]
1852    fn test_delta_bit_packed_int64_min_max() {
1853        let block_data = vec![
1854            i64::MIN,
1855            i64::MAX,
1856            i64::MIN,
1857            i64::MAX,
1858            i64::MIN,
1859            i64::MAX,
1860            i64::MIN,
1861            i64::MAX,
1862        ];
1863        test_delta_bit_packed_decode::<Int64Type>(vec![block_data]);
1864    }
1865
1866    #[test]
1867    fn test_delta_bit_packed_int64_multiple_blocks() {
1868        // Test multiple 'put' calls on the same encoder
1869        let data = vec![
1870            Int64Type::gen_vec(-1, 64),
1871            Int64Type::gen_vec(-1, 128),
1872            Int64Type::gen_vec(-1, 64),
1873        ];
1874        test_delta_bit_packed_decode::<Int64Type>(data);
1875    }
1876
1877    #[test]
1878    fn test_delta_bit_packed_zero_miniblocks() {
1879        // It is invalid for mini_blocks_per_block to be 0
1880        let data = vec![
1881            128, 1, // block_size = 128
1882            0, // mini_blocks_per_block = 0
1883        ];
1884        let mut decoder = DeltaBitPackDecoder::<Int32Type>::new();
1885        let err = decoder.set_data(data.into(), 0).unwrap_err();
1886        assert_eq!(
1887            err.to_string(),
1888            "Parquet error: cannot have zero miniblocks per block"
1889        );
1890    }
1891
1892    #[test]
1893    fn test_delta_bit_packed_decoder_sample() {
1894        let data_bytes = vec![
1895            128, 1, 4, 3, 58, 28, 6, 0, 0, 0, 0, 8, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
1896            0, 0, 0, 0, 0, 0,
1897        ];
1898        let mut decoder: DeltaBitPackDecoder<Int32Type> = DeltaBitPackDecoder::new();
1899        decoder.set_data(data_bytes.into(), 3).unwrap();
1900        // check exact offsets, because when reading partial values we end up with
1901        // some data not being read from bit reader
1902        assert_eq!(decoder.get_offset(), 5);
1903        let mut result = vec![0, 0, 0];
1904        decoder.get(&mut result).unwrap();
1905        assert_eq!(decoder.get_offset(), 34);
1906        assert_eq!(result, vec![29, 43, 89]);
1907    }
1908
1909    #[test]
1910    fn test_delta_bit_packed_padding() {
1911        // Page header
1912        let header = vec![
1913            // Page Header
1914
1915            // Block Size - 256
1916            128,
1917            2,
1918            // Miniblocks in block,
1919            4,
1920            // Total value count - 419
1921            128 + 35,
1922            3,
1923            // First value - 7
1924            7,
1925        ];
1926
1927        // Block Header
1928        let block1_header = vec![
1929            0, // Min delta
1930            0, 1, 0, 0, // Bit widths
1931        ];
1932
1933        // Mini-block 1 - bit width 0 => 0 bytes
1934        // Mini-block 2 - bit width 1 => 8 bytes
1935        // Mini-block 3 - bit width 0 => 0 bytes
1936        // Mini-block 4 - bit width 0 => 0 bytes
1937        let block1 = vec![0xFF; 8];
1938
1939        // Block Header
1940        let block2_header = vec![
1941            0, // Min delta
1942            0, 1, 2, 0xFF, // Bit widths, including non-zero padding
1943        ];
1944
1945        // Mini-block 1 - bit width 0 => 0 bytes
1946        // Mini-block 2 - bit width 1 => 8 bytes
1947        // Mini-block 3 - bit width 2 => 16 bytes
1948        // Mini-block 4 - padding => no bytes
1949        let block2 = vec![0xFF; 24];
1950
1951        let data: Vec<u8> = header
1952            .into_iter()
1953            .chain(block1_header)
1954            .chain(block1)
1955            .chain(block2_header)
1956            .chain(block2)
1957            .collect();
1958
1959        let length = data.len();
1960
1961        let ptr = Bytes::from(data);
1962        let mut reader = BitReader::new(ptr.clone());
1963        assert_eq!(reader.get_vlq_int().unwrap(), 256);
1964        assert_eq!(reader.get_vlq_int().unwrap(), 4);
1965        assert_eq!(reader.get_vlq_int().unwrap(), 419);
1966        assert_eq!(reader.get_vlq_int().unwrap(), 7);
1967
1968        // Test output buffer larger than needed and not exact multiple of block size
1969        let mut output = vec![0_i32; 420];
1970
1971        let mut decoder = DeltaBitPackDecoder::<Int32Type>::new();
1972        decoder.set_data(ptr.clone(), 0).unwrap();
1973        assert_eq!(decoder.get(&mut output).unwrap(), 419);
1974        assert_eq!(decoder.get_offset(), length);
1975
1976        // Test with truncated buffer
1977        decoder.set_data(ptr.slice(..12), 0).unwrap();
1978        let err = decoder.get(&mut output).unwrap_err().to_string();
1979        assert!(
1980            err.contains("Expected to read 64 values from miniblock got 8"),
1981            "{}",
1982            err
1983        );
1984    }
1985
1986    #[test]
1987    fn test_delta_bit_packed_int32_single_value_large() {
1988        let block_data = vec![3; 10240];
1989        test_delta_bit_packed_decode::<Int32Type>(vec![block_data]);
1990    }
1991
1992    #[test]
1993    fn test_delta_bit_packed_int32_single_value_skip_large() {
1994        let block_data = vec![3; 10240];
1995        test_skip::<Int32Type>(block_data.clone(), Encoding::DELTA_BINARY_PACKED, 50);
1996        test_skip::<Int32Type>(block_data, Encoding::DELTA_BINARY_PACKED, 5000);
1997    }
1998
1999    #[test]
2000    fn test_delta_bit_packed_int32_increasing_value_large() {
2001        let block_data = (0i32..10240).collect();
2002        test_delta_bit_packed_decode::<Int32Type>(vec![block_data]);
2003    }
2004
2005    #[test]
2006    fn test_delta_bit_packed_int32_increasing_value_skip_large() {
2007        let block_data = (0i32..10240).collect::<Vec<i32>>();
2008        test_skip::<Int32Type>(block_data.clone(), Encoding::DELTA_BINARY_PACKED, 50);
2009        test_skip::<Int32Type>(block_data, Encoding::DELTA_BINARY_PACKED, 5000);
2010    }
2011
2012    #[test]
2013    fn test_delta_bit_packed_int32_stepped_value_large() {
2014        let block_data = (0i32..10240).map(|i| i / 2).collect();
2015        test_delta_bit_packed_decode::<Int32Type>(vec![block_data]);
2016    }
2017
2018    #[test]
2019    fn test_delta_bit_packed_int32_stepped_value_skip_large() {
2020        let block_data = (0i32..10240).map(|i| i / 2).collect::<Vec<i32>>();
2021        test_skip::<Int32Type>(block_data.clone(), Encoding::DELTA_BINARY_PACKED, 50);
2022        test_skip::<Int32Type>(block_data, Encoding::DELTA_BINARY_PACKED, 5000);
2023    }
2024
2025    #[test]
2026    fn test_delta_bit_packed_int32_mixed_large() {
2027        // should be enough for 4 mini-blocks plus a little so we get some
2028        // mixed mini-blocks
2029        const BLOCK_SIZE: i32 = 133;
2030        let block1_data = (0..BLOCK_SIZE).map(|i| (i * 7) % 11).collect();
2031        let block2_data = vec![3; BLOCK_SIZE as usize];
2032        let block3_data = (0..BLOCK_SIZE).map(|i| (i * 5) % 13).collect();
2033        let block4_data = (0..BLOCK_SIZE).collect();
2034        let block5_data = (0..BLOCK_SIZE).map(|i| (i * 3) % 17).collect();
2035        test_delta_bit_packed_decode::<Int32Type>(vec![
2036            block1_data,
2037            block2_data,
2038            block3_data,
2039            block4_data,
2040            block5_data,
2041        ]);
2042    }
2043
2044    #[test]
2045    fn test_delta_bit_packed_int64_single_value_large() {
2046        let block_data = vec![5; 10240];
2047        test_delta_bit_packed_decode::<Int64Type>(vec![block_data]);
2048    }
2049
2050    #[test]
2051    fn test_delta_bit_packed_int64_increasing_value_large() {
2052        let block_data = (0i64..10240).collect();
2053        test_delta_bit_packed_decode::<Int64Type>(vec![block_data]);
2054    }
2055
2056    #[test]
2057    fn test_delta_byte_array_same_arrays() {
2058        let data = vec![
2059            vec![ByteArray::from(vec![1, 2, 3, 4, 5, 6])],
2060            vec![
2061                ByteArray::from(vec![1, 2, 3, 4, 5, 6]),
2062                ByteArray::from(vec![1, 2, 3, 4, 5, 6]),
2063            ],
2064            vec![
2065                ByteArray::from(vec![1, 2, 3, 4, 5, 6]),
2066                ByteArray::from(vec![1, 2, 3, 4, 5, 6]),
2067            ],
2068        ];
2069        test_delta_byte_array_decode(data);
2070    }
2071
2072    #[test]
2073    fn test_delta_byte_array_unique_arrays() {
2074        let data = vec![
2075            vec![ByteArray::from(vec![1])],
2076            vec![ByteArray::from(vec![2, 3]), ByteArray::from(vec![4, 5, 6])],
2077            vec![
2078                ByteArray::from(vec![7, 8]),
2079                ByteArray::from(vec![9, 0, 1, 2]),
2080            ],
2081        ];
2082        test_delta_byte_array_decode(data);
2083    }
2084
2085    #[test]
2086    fn test_delta_byte_array_single_array() {
2087        let data = vec![vec![ByteArray::from(vec![1, 2, 3, 4, 5, 6])]];
2088        test_delta_byte_array_decode(data);
2089    }
2090
2091    #[test]
2092    fn test_byte_stream_split_multiple_f32() {
2093        let data = vec![
2094            vec![
2095                f32::from_le_bytes([0xAA, 0xBB, 0xCC, 0xDD]),
2096                f32::from_le_bytes([0x00, 0x11, 0x22, 0x33]),
2097            ],
2098            vec![f32::from_le_bytes([0xA3, 0xB4, 0xC5, 0xD6])],
2099        ];
2100        test_byte_stream_split_decode::<FloatType>(data, -1);
2101    }
2102
2103    #[test]
2104    fn test_byte_stream_split_f64() {
2105        let data = vec![vec![
2106            f64::from_le_bytes([0, 1, 2, 3, 4, 5, 6, 7]),
2107            f64::from_le_bytes([8, 9, 10, 11, 12, 13, 14, 15]),
2108        ]];
2109        test_byte_stream_split_decode::<DoubleType>(data, -1);
2110    }
2111
2112    #[test]
2113    fn test_byte_stream_split_multiple_i32() {
2114        let data = vec![
2115            vec![
2116                i32::from_le_bytes([0xAA, 0xBB, 0xCC, 0xDD]),
2117                i32::from_le_bytes([0x00, 0x11, 0x22, 0x33]),
2118            ],
2119            vec![i32::from_le_bytes([0xA3, 0xB4, 0xC5, 0xD6])],
2120        ];
2121        test_byte_stream_split_decode::<Int32Type>(data, -1);
2122    }
2123
2124    #[test]
2125    fn test_byte_stream_split_i64() {
2126        let data = vec![vec![
2127            i64::from_le_bytes([0, 1, 2, 3, 4, 5, 6, 7]),
2128            i64::from_le_bytes([8, 9, 10, 11, 12, 13, 14, 15]),
2129        ]];
2130        test_byte_stream_split_decode::<Int64Type>(data, -1);
2131    }
2132
2133    fn test_byte_stream_split_flba(type_width: usize) {
2134        let data = vec![
2135            vec![
2136                FixedLenByteArrayType::r#gen(type_width as i32),
2137                FixedLenByteArrayType::r#gen(type_width as i32),
2138            ],
2139            vec![FixedLenByteArrayType::r#gen(type_width as i32)],
2140        ];
2141        test_byte_stream_split_decode::<FixedLenByteArrayType>(data, type_width as i32);
2142    }
2143
2144    #[test]
2145    fn test_byte_stream_split_flba5() {
2146        test_byte_stream_split_flba(5);
2147    }
2148
2149    #[test]
2150    fn test_byte_stream_split_flba16() {
2151        test_byte_stream_split_flba(16);
2152    }
2153
2154    #[test]
2155    fn test_byte_stream_split_flba19() {
2156        test_byte_stream_split_flba(19);
2157    }
2158
2159    #[test]
2160    #[should_panic(expected = "Mismatched FixedLenByteArray sizes: 4 != 5")]
2161    fn test_byte_stream_split_flba_mismatch() {
2162        let data = vec![
2163            vec![
2164                FixedLenByteArray::from(vec![0xAA, 0xAB, 0xAC, 0xAD, 0xAE]),
2165                FixedLenByteArray::from(vec![0xBA, 0xBB, 0xBC, 0xBD, 0xBE]),
2166            ],
2167            vec![FixedLenByteArray::from(vec![0xCA, 0xCB, 0xCC, 0xCD])],
2168        ];
2169        test_byte_stream_split_decode::<FixedLenByteArrayType>(data, 5);
2170    }
2171
2172    #[test]
2173    #[should_panic(expected = "Input data length is not a multiple of type width 4")]
2174    fn test_byte_stream_split_flba_bad_input() {
2175        let mut decoder = VariableWidthByteStreamSplitDecoder::<FixedLenByteArrayType>::new(4);
2176        decoder
2177            .set_data(Bytes::from(vec![1, 2, 3, 4, 5]), 1)
2178            .unwrap();
2179    }
2180
2181    #[test]
2182    fn test_skip_byte_stream_split() {
2183        let block_data = vec![0.3, 0.4, 0.1, 4.10];
2184        test_skip::<FloatType>(block_data.clone(), Encoding::BYTE_STREAM_SPLIT, 2);
2185        test_skip::<DoubleType>(
2186            block_data.into_iter().map(|x| x as f64).collect(),
2187            Encoding::BYTE_STREAM_SPLIT,
2188            100,
2189        );
2190    }
2191
2192    #[test]
2193    fn test_skip_byte_stream_split_ints() {
2194        let block_data = vec![3, 4, 1, 5];
2195        test_skip::<Int32Type>(block_data.clone(), Encoding::BYTE_STREAM_SPLIT, 2);
2196        test_skip::<Int64Type>(
2197            block_data.into_iter().map(|x| x as i64).collect(),
2198            Encoding::BYTE_STREAM_SPLIT,
2199            100,
2200        );
2201    }
2202
2203    fn test_rle_value_decode<T: DataType>(data: Vec<Vec<T::T>>) {
2204        test_encode_decode::<T>(data, Encoding::RLE, -1);
2205    }
2206
2207    fn test_delta_bit_packed_decode<T: DataType>(data: Vec<Vec<T::T>>) {
2208        test_encode_decode::<T>(data, Encoding::DELTA_BINARY_PACKED, -1);
2209    }
2210
2211    fn test_byte_stream_split_decode<T: DataType>(data: Vec<Vec<T::T>>, type_width: i32) {
2212        test_encode_decode::<T>(data, Encoding::BYTE_STREAM_SPLIT, type_width);
2213    }
2214
2215    fn test_delta_byte_array_decode(data: Vec<Vec<ByteArray>>) {
2216        test_encode_decode::<ByteArrayType>(data, Encoding::DELTA_BYTE_ARRAY, -1);
2217    }
2218
2219    // Input data represents vector of data slices to write (test multiple `put()` calls)
2220    // For example,
2221    //   vec![vec![1, 2, 3]] invokes `put()` once and writes {1, 2, 3}
2222    //   vec![vec![1, 2], vec![3]] invokes `put()` twice and writes {1, 2, 3}
2223    fn test_encode_decode<T: DataType>(data: Vec<Vec<T::T>>, encoding: Encoding, type_width: i32) {
2224        let col_descr = create_test_col_desc_ptr(type_width, T::get_physical_type());
2225
2226        // Encode data
2227        let mut encoder = get_encoder::<T>(encoding, &col_descr).expect("get encoder");
2228
2229        for v in &data[..] {
2230            encoder.put(&v[..]).expect("ok to encode");
2231        }
2232        let bytes = encoder.flush_buffer().expect("ok to flush buffer");
2233
2234        // Flatten expected data as contiguous array of values
2235        let expected: Vec<T::T> = data.iter().flat_map(|s| s.clone()).collect();
2236
2237        // Decode data and compare with original
2238        let mut decoder = get_decoder::<T>(col_descr, encoding).expect("get decoder");
2239
2240        let mut result = vec![T::T::default(); expected.len()];
2241        decoder
2242            .set_data(bytes, expected.len())
2243            .expect("ok to set data");
2244        let mut result_num_values = 0;
2245        while decoder.values_left() > 0 {
2246            result_num_values += decoder
2247                .get(&mut result[result_num_values..])
2248                .expect("ok to decode");
2249        }
2250        assert_eq!(result_num_values, expected.len());
2251        assert_eq!(result, expected);
2252    }
2253
2254    fn test_skip<T: DataType>(data: Vec<T::T>, encoding: Encoding, skip: usize) {
2255        // Type length should not really matter for encode/decode test,
2256        // otherwise change it based on type
2257        let col_descr = create_test_col_desc_ptr(-1, T::get_physical_type());
2258
2259        // Encode data
2260        let mut encoder = get_encoder::<T>(encoding, &col_descr).expect("get encoder");
2261
2262        encoder.put(&data).expect("ok to encode");
2263
2264        let bytes = encoder.flush_buffer().expect("ok to flush buffer");
2265
2266        let mut decoder = get_decoder::<T>(col_descr, encoding).expect("get decoder");
2267        decoder.set_data(bytes, data.len()).expect("ok to set data");
2268
2269        if skip >= data.len() {
2270            let skipped = decoder.skip(skip).expect("ok to skip");
2271            assert_eq!(skipped, data.len());
2272
2273            let skipped_again = decoder.skip(skip).expect("ok to skip again");
2274            assert_eq!(skipped_again, 0);
2275        } else {
2276            let skipped = decoder.skip(skip).expect("ok to skip");
2277            assert_eq!(skipped, skip);
2278
2279            let remaining = data.len() - skip;
2280
2281            let expected = &data[skip..];
2282            let mut buffer = vec![T::T::default(); remaining];
2283            let fetched = decoder.get(&mut buffer).expect("ok to decode");
2284            assert_eq!(remaining, fetched);
2285            assert_eq!(&buffer, expected);
2286        }
2287    }
2288
2289    fn create_and_check_decoder<T: DataType>(encoding: Encoding, err: Option<ParquetError>) {
2290        let descr = create_test_col_desc_ptr(-1, T::get_physical_type());
2291        let decoder = get_decoder::<T>(descr, encoding);
2292        match err {
2293            Some(parquet_error) => {
2294                assert_eq!(
2295                    decoder.err().unwrap().to_string(),
2296                    parquet_error.to_string()
2297                );
2298            }
2299            None => {
2300                assert_eq!(decoder.unwrap().encoding(), encoding);
2301            }
2302        }
2303    }
2304
2305    // Creates test column descriptor.
2306    fn create_test_col_desc_ptr(type_len: i32, t: Type) -> ColumnDescPtr {
2307        let ty = SchemaType::primitive_type_builder("t", t)
2308            .with_length(type_len)
2309            .build()
2310            .unwrap();
2311        Arc::new(ColumnDescriptor::new(
2312            Arc::new(ty),
2313            0,
2314            0,
2315            ColumnPath::new(vec![]),
2316        ))
2317    }
2318
2319    fn usize_to_bytes(v: usize) -> [u8; 4] {
2320        (v as u32).to_ne_bytes()
2321    }
2322
    /// A util trait to convert slices of different types to byte arrays
    ///
    /// The byte layout is defined by each implementation.
    trait ToByteArray<T: DataType> {
        // Presumably needed because this `to_*` function takes no `self`, which
        // clippy's `wrong_self_convention` lint flags.
        #[allow(clippy::wrong_self_convention)]
        fn to_byte_array(data: &[T::T]) -> Vec<u8>;
    }
2328
2329    macro_rules! to_byte_array_impl {
2330        ($ty: ty) => {
2331            impl ToByteArray<$ty> for $ty {
2332                #[allow(clippy::wrong_self_convention)]
2333                fn to_byte_array(data: &[<$ty as DataType>::T]) -> Vec<u8> {
2334                    <$ty as DataType>::T::slice_as_bytes(data).to_vec()
2335                }
2336            }
2337        };
2338    }
2339
2340    to_byte_array_impl!(Int32Type);
2341    to_byte_array_impl!(Int64Type);
2342    to_byte_array_impl!(FloatType);
2343    to_byte_array_impl!(DoubleType);
2344
2345    impl ToByteArray<BoolType> for BoolType {
2346        #[allow(clippy::wrong_self_convention)]
2347        fn to_byte_array(data: &[bool]) -> Vec<u8> {
2348            let mut v = vec![];
2349            for (i, item) in data.iter().enumerate() {
2350                if i % 8 == 0 {
2351                    v.push(0);
2352                }
2353                if *item {
2354                    v[i / 8] |= 1 << (i % 8);
2355                }
2356            }
2357            v
2358        }
2359    }
2360
2361    impl ToByteArray<Int96Type> for Int96Type {
2362        #[allow(clippy::wrong_self_convention)]
2363        fn to_byte_array(data: &[Int96]) -> Vec<u8> {
2364            let mut v = vec![];
2365            for d in data {
2366                v.extend_from_slice(d.as_bytes());
2367            }
2368            v
2369        }
2370    }
2371
2372    impl ToByteArray<ByteArrayType> for ByteArrayType {
2373        #[allow(clippy::wrong_self_convention)]
2374        fn to_byte_array(data: &[ByteArray]) -> Vec<u8> {
2375            let mut v = vec![];
2376            for d in data {
2377                let buf = d.data();
2378                let len = &usize_to_bytes(buf.len());
2379                v.extend_from_slice(len);
2380                v.extend(buf);
2381            }
2382            v
2383        }
2384    }
2385
2386    impl ToByteArray<FixedLenByteArrayType> for FixedLenByteArrayType {
2387        #[allow(clippy::wrong_self_convention)]
2388        fn to_byte_array(data: &[FixedLenByteArray]) -> Vec<u8> {
2389            let mut v = vec![];
2390            for d in data {
2391                let buf = d.data();
2392                v.extend(buf);
2393            }
2394            v
2395        }
2396    }
2397
2398    #[test]
2399    // Allow initializing a vector and pushing to it for clarity in this test
2400    #[allow(clippy::vec_init_then_push)]
2401    fn test_delta_bit_packed_invalid_bit_width() {
2402        // Manually craft a buffer with an invalid bit width
2403        let mut buffer = vec![];
2404        // block_size = 128
2405        buffer.push(128);
2406        buffer.push(1);
2407        // mini_blocks_per_block = 4
2408        buffer.push(4);
2409        // num_values = 32
2410        buffer.push(32);
2411        // first_value = 0
2412        buffer.push(0);
2413        // min_delta = 0
2414        buffer.push(0);
2415        // bit_widths, one for each of the 4 mini blocks
2416        buffer.push(33); // Invalid bit width
2417        buffer.push(0);
2418        buffer.push(0);
2419        buffer.push(0);
2420
2421        let corrupted_buffer = Bytes::from(buffer);
2422
2423        let mut decoder = DeltaBitPackDecoder::<Int32Type>::new();
2424        decoder.set_data(corrupted_buffer.clone(), 32).unwrap();
2425        let mut read_buffer = vec![0; 32];
2426        let err = decoder.get(&mut read_buffer).unwrap_err();
2427        assert!(
2428            err.to_string()
2429                .contains("Invalid delta bit width 33 which is larger than expected 32"),
2430            "{}",
2431            err
2432        );
2433
2434        let mut decoder = DeltaBitPackDecoder::<Int32Type>::new();
2435        decoder.set_data(corrupted_buffer, 32).unwrap();
2436        let err = decoder.skip(32).unwrap_err();
2437        assert!(
2438            err.to_string()
2439                .contains("Invalid delta bit width 33 which is larger than expected 32"),
2440            "{}",
2441            err
2442        );
2443    }
2444}