Skip to main content

parquet/encodings/
decoding.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Contains all supported decoders for Parquet.
19
20use bytes::Bytes;
21use num_traits::{FromPrimitive, WrappingAdd};
22use std::{cmp, marker::PhantomData, mem};
23
24use super::rle::RleDecoder;
25
26use crate::basic::*;
27use crate::data_type::private::ParquetValueType;
28use crate::data_type::*;
29use crate::encodings::decoding::byte_stream_split_decoder::{
30    ByteStreamSplitDecoder, VariableWidthByteStreamSplitDecoder,
31};
32use crate::errors::{ParquetError, Result};
33use crate::schema::types::ColumnDescPtr;
34use crate::util::bit_util::{self, BitReader};
35
36mod byte_stream_split_decoder;
37
38pub(crate) mod private {
39    use super::*;
40
41    /// A trait that allows getting a [`Decoder`] implementation for a [`DataType`] with
42    /// the corresponding [`ParquetValueType`]. This is necessary to support
43    /// [`Decoder`] implementations that may not be applicable for all [`DataType`]
44    /// and by extension all [`ParquetValueType`]
45    pub trait GetDecoder {
46        fn get_decoder<T: DataType<T = Self>>(
47            descr: ColumnDescPtr,
48            encoding: Encoding,
49        ) -> Result<Box<dyn Decoder<T>>> {
50            get_decoder_default(descr, encoding)
51        }
52    }
53
54    fn get_decoder_default<T: DataType>(
55        descr: ColumnDescPtr,
56        encoding: Encoding,
57    ) -> Result<Box<dyn Decoder<T>>> {
58        match encoding {
59            Encoding::PLAIN => Ok(Box::new(PlainDecoder::new(descr.type_length()))),
60            Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => Err(general_err!(
61                "Cannot initialize this encoding through this function"
62            )),
63            Encoding::RLE
64            | Encoding::DELTA_BINARY_PACKED
65            | Encoding::DELTA_BYTE_ARRAY
66            | Encoding::DELTA_LENGTH_BYTE_ARRAY => Err(general_err!(
67                "Encoding {} is not supported for type",
68                encoding
69            )),
70            e => Err(nyi_err!("Encoding {} is not supported", e)),
71        }
72    }
73
74    impl GetDecoder for bool {
75        fn get_decoder<T: DataType<T = Self>>(
76            descr: ColumnDescPtr,
77            encoding: Encoding,
78        ) -> Result<Box<dyn Decoder<T>>> {
79            match encoding {
80                Encoding::RLE => Ok(Box::new(RleValueDecoder::new())),
81                _ => get_decoder_default(descr, encoding),
82            }
83        }
84    }
85
86    impl GetDecoder for i32 {
87        fn get_decoder<T: DataType<T = Self>>(
88            descr: ColumnDescPtr,
89            encoding: Encoding,
90        ) -> Result<Box<dyn Decoder<T>>> {
91            match encoding {
92                Encoding::BYTE_STREAM_SPLIT => Ok(Box::new(ByteStreamSplitDecoder::new())),
93                Encoding::DELTA_BINARY_PACKED => Ok(Box::new(DeltaBitPackDecoder::new())),
94                _ => get_decoder_default(descr, encoding),
95            }
96        }
97    }
98
99    impl GetDecoder for i64 {
100        fn get_decoder<T: DataType<T = Self>>(
101            descr: ColumnDescPtr,
102            encoding: Encoding,
103        ) -> Result<Box<dyn Decoder<T>>> {
104            match encoding {
105                Encoding::BYTE_STREAM_SPLIT => Ok(Box::new(ByteStreamSplitDecoder::new())),
106                Encoding::DELTA_BINARY_PACKED => Ok(Box::new(DeltaBitPackDecoder::new())),
107                _ => get_decoder_default(descr, encoding),
108            }
109        }
110    }
111
112    impl GetDecoder for f32 {
113        fn get_decoder<T: DataType<T = Self>>(
114            descr: ColumnDescPtr,
115            encoding: Encoding,
116        ) -> Result<Box<dyn Decoder<T>>> {
117            match encoding {
118                Encoding::BYTE_STREAM_SPLIT => Ok(Box::new(ByteStreamSplitDecoder::new())),
119                _ => get_decoder_default(descr, encoding),
120            }
121        }
122    }
123    impl GetDecoder for f64 {
124        fn get_decoder<T: DataType<T = Self>>(
125            descr: ColumnDescPtr,
126            encoding: Encoding,
127        ) -> Result<Box<dyn Decoder<T>>> {
128            match encoding {
129                Encoding::BYTE_STREAM_SPLIT => Ok(Box::new(ByteStreamSplitDecoder::new())),
130                _ => get_decoder_default(descr, encoding),
131            }
132        }
133    }
134
135    impl GetDecoder for ByteArray {
136        fn get_decoder<T: DataType<T = Self>>(
137            descr: ColumnDescPtr,
138            encoding: Encoding,
139        ) -> Result<Box<dyn Decoder<T>>> {
140            match encoding {
141                Encoding::DELTA_BYTE_ARRAY => Ok(Box::new(DeltaByteArrayDecoder::new())),
142                Encoding::DELTA_LENGTH_BYTE_ARRAY => {
143                    Ok(Box::new(DeltaLengthByteArrayDecoder::new()))
144                }
145                _ => get_decoder_default(descr, encoding),
146            }
147        }
148    }
149
150    impl GetDecoder for FixedLenByteArray {
151        fn get_decoder<T: DataType<T = Self>>(
152            descr: ColumnDescPtr,
153            encoding: Encoding,
154        ) -> Result<Box<dyn Decoder<T>>> {
155            match encoding {
156                Encoding::BYTE_STREAM_SPLIT => Ok(Box::new(
157                    VariableWidthByteStreamSplitDecoder::new(descr.type_length()),
158                )),
159                Encoding::DELTA_BYTE_ARRAY => Ok(Box::new(DeltaByteArrayDecoder::new())),
160                _ => get_decoder_default(descr, encoding),
161            }
162        }
163    }
164
165    impl GetDecoder for Int96 {}
166}
167
168// ----------------------------------------------------------------------
169// Decoders
170
171/// A Parquet decoder for the data type `T`.
172pub trait Decoder<T: DataType>: Send {
173    /// Sets the data to decode to be `data`, which should contain `num_values` of values
174    /// to decode.
175    fn set_data(&mut self, data: Bytes, num_values: usize) -> Result<()>;
176
177    /// Consumes values from this decoder and write the results to `buffer`. This will try
178    /// to fill up `buffer`.
179    ///
180    /// Returns the actual number of values decoded, which should be equal to
181    /// `buffer.len()` unless the remaining number of values is less than
182    /// `buffer.len()`.
183    fn get(&mut self, buffer: &mut [T::T]) -> Result<usize>;
184
185    /// Consume values from this decoder and write the results to `buffer`, leaving
186    /// "spaces" for null values.
187    ///
188    /// `null_count` is the number of nulls we expect to see in `buffer`, after reading.
189    /// `valid_bits` stores the valid bit for each value in the buffer. It should contain
190    ///   at least number of bits that equal to `buffer.len()`.
191    ///
192    /// Returns the actual number of values decoded.
193    ///
194    /// # Panics
195    ///
196    /// Panics if `null_count` is greater than `buffer.len()`.
197    fn get_spaced(
198        &mut self,
199        buffer: &mut [T::T],
200        null_count: usize,
201        valid_bits: &[u8],
202    ) -> Result<usize> {
203        assert!(buffer.len() >= null_count);
204
205        // TODO: check validity of the input arguments?
206        if null_count == 0 {
207            return self.get(buffer);
208        }
209
210        let num_values = buffer.len();
211        let values_to_read = num_values - null_count;
212        let values_read = self.get(buffer)?;
213        if values_read != values_to_read {
214            return Err(general_err!(
215                "Number of values read: {}, doesn't match expected: {}",
216                values_read,
217                values_to_read
218            ));
219        }
220        let mut values_to_move = values_read;
221        for i in (0..num_values).rev() {
222            if bit_util::get_bit(valid_bits, i) {
223                values_to_move -= 1;
224                buffer.swap(i, values_to_move);
225            }
226        }
227
228        Ok(num_values)
229    }
230
231    /// Returns the number of values left in this decoder stream.
232    fn values_left(&self) -> usize;
233
234    /// Returns the encoding for this decoder.
235    fn encoding(&self) -> Encoding;
236
237    /// Skip the specified number of values in this decoder stream.
238    fn skip(&mut self, num_values: usize) -> Result<usize>;
239}
240
241/// Gets a decoder for the column descriptor `descr` and encoding type `encoding`.
242///
243/// NOTE: the primitive type in `descr` MUST match the data type `T`, otherwise
244/// disastrous consequence could occur.
245pub fn get_decoder<T: DataType>(
246    descr: ColumnDescPtr,
247    encoding: Encoding,
248) -> Result<Box<dyn Decoder<T>>> {
249    use self::private::GetDecoder;
250    T::T::get_decoder(descr, encoding)
251}
252
253// ----------------------------------------------------------------------
254// PLAIN Decoding
255
256#[derive(Default)]
257pub struct PlainDecoderDetails {
258    // The remaining number of values in the byte array
259    pub(crate) num_values: usize,
260
261    // The current starting index in the byte array. Not used when `T` is bool.
262    pub(crate) start: usize,
263
264    // The length for the type `T`. Only used when `T` is `FixedLenByteArrayType`
265    pub(crate) type_length: i32,
266
267    // The byte array to decode from. Not set if `T` is bool.
268    pub(crate) data: Option<Bytes>,
269
270    // Read `data` bit by bit. Only set if `T` is bool.
271    pub(crate) bit_reader: Option<BitReader>,
272}
273
274/// Plain decoding that supports all types.
275///
276/// Values are encoded back to back. For native types, data is encoded as little endian.
277/// Floating point types are encoded in IEEE.
278/// See [`PlainEncoder`](crate::encoding::PlainEncoder) for more information.
279pub struct PlainDecoder<T: DataType> {
280    // The binary details needed for decoding
281    inner: PlainDecoderDetails,
282
283    // To allow `T` in the generic parameter for this struct. This doesn't take any
284    // space.
285    _phantom: PhantomData<T>,
286}
287
288impl<T: DataType> PlainDecoder<T> {
289    /// Creates new plain decoder.
290    pub fn new(type_length: i32) -> Self {
291        PlainDecoder {
292            inner: PlainDecoderDetails {
293                type_length,
294                num_values: 0,
295                start: 0,
296                data: None,
297                bit_reader: None,
298            },
299            _phantom: PhantomData,
300        }
301    }
302}
303
304impl<T: DataType> Decoder<T> for PlainDecoder<T> {
305    #[inline]
306    fn set_data(&mut self, data: Bytes, num_values: usize) -> Result<()> {
307        T::T::set_data(&mut self.inner, data, num_values);
308        Ok(())
309    }
310
311    #[inline]
312    fn values_left(&self) -> usize {
313        self.inner.num_values
314    }
315
316    #[inline]
317    fn encoding(&self) -> Encoding {
318        Encoding::PLAIN
319    }
320
321    #[inline]
322    fn get(&mut self, buffer: &mut [T::T]) -> Result<usize> {
323        T::T::decode(buffer, &mut self.inner)
324    }
325
326    #[inline]
327    fn skip(&mut self, num_values: usize) -> Result<usize> {
328        T::T::skip(&mut self.inner, num_values)
329    }
330}
331
332// ----------------------------------------------------------------------
333// RLE_DICTIONARY/PLAIN_DICTIONARY Decoding
334
335/// Dictionary decoder.
336///
337/// The dictionary encoding builds a dictionary of values encountered in a given column.
338/// The dictionary is be stored in a dictionary page per column chunk.
339/// See [`DictEncoder`](crate::encoding::DictEncoder) for more information.
340pub struct DictDecoder<T: DataType> {
341    // The dictionary, which maps ids to the values
342    dictionary: Vec<T::T>,
343
344    // Whether `dictionary` has been initialized
345    has_dictionary: bool,
346
347    // The decoder for the value ids
348    rle_decoder: Option<RleDecoder>,
349
350    // Number of values left in the data stream
351    num_values: usize,
352}
353
354impl<T: DataType> Default for DictDecoder<T> {
355    fn default() -> Self {
356        Self::new()
357    }
358}
359
360impl<T: DataType> DictDecoder<T> {
361    /// Creates new dictionary decoder.
362    pub fn new() -> Self {
363        Self {
364            dictionary: vec![],
365            has_dictionary: false,
366            rle_decoder: None,
367            num_values: 0,
368        }
369    }
370
371    /// Decodes and sets values for dictionary using `decoder` decoder.
372    pub fn set_dict(&mut self, mut decoder: Box<dyn Decoder<T>>) -> Result<()> {
373        let num_values = decoder.values_left();
374        self.dictionary.resize(num_values, T::T::default());
375        let _ = decoder.get(&mut self.dictionary)?;
376        self.has_dictionary = true;
377        Ok(())
378    }
379}
380
381impl<T: DataType> Decoder<T> for DictDecoder<T> {
382    fn set_data(&mut self, data: Bytes, num_values: usize) -> Result<()> {
383        // First byte in `data` is bit width
384        if data.is_empty() {
385            return Err(eof_err!("Not enough bytes to decode bit_width"));
386        }
387
388        let bit_width = data.as_ref()[0];
389        if bit_width > 32 {
390            return Err(general_err!(
391                "Invalid or corrupted RLE bit width {}. Max allowed is 32",
392                bit_width
393            ));
394        }
395        let mut rle_decoder = RleDecoder::new(bit_width);
396        rle_decoder.set_data(data.slice(1..))?;
397        self.num_values = num_values;
398        self.rle_decoder = Some(rle_decoder);
399        Ok(())
400    }
401
402    fn get(&mut self, buffer: &mut [T::T]) -> Result<usize> {
403        assert!(self.rle_decoder.is_some());
404        assert!(self.has_dictionary, "Must call set_dict() first!");
405
406        let rle = self.rle_decoder.as_mut().unwrap();
407        let num_values = cmp::min(buffer.len(), self.num_values);
408        rle.get_batch_with_dict(&self.dictionary[..], buffer, num_values)
409    }
410
411    /// Number of values left in this decoder stream
412    fn values_left(&self) -> usize {
413        self.num_values
414    }
415
416    fn encoding(&self) -> Encoding {
417        Encoding::RLE_DICTIONARY
418    }
419
420    fn skip(&mut self, num_values: usize) -> Result<usize> {
421        assert!(self.rle_decoder.is_some());
422        assert!(self.has_dictionary, "Must call set_dict() first!");
423
424        let rle = self.rle_decoder.as_mut().unwrap();
425        let num_values = cmp::min(num_values, self.num_values);
426        rle.skip(num_values)
427    }
428}
429
430// ----------------------------------------------------------------------
431// RLE Decoding
432
433/// RLE/Bit-Packing hybrid decoding for values.
434/// Currently is used only for data pages v2 and supports boolean types.
435/// See [`RleValueEncoder`](crate::encoding::RleValueEncoder) for more information.
436pub struct RleValueDecoder<T: DataType> {
437    values_left: usize,
438    decoder: RleDecoder,
439    _phantom: PhantomData<T>,
440}
441
442impl<T: DataType> Default for RleValueDecoder<T> {
443    fn default() -> Self {
444        Self::new()
445    }
446}
447
448impl<T: DataType> RleValueDecoder<T> {
449    pub fn new() -> Self {
450        Self {
451            values_left: 0,
452            decoder: RleDecoder::new(1),
453            _phantom: PhantomData,
454        }
455    }
456}
457
458impl<T: DataType> Decoder<T> for RleValueDecoder<T> {
459    #[inline]
460    fn set_data(&mut self, data: Bytes, num_values: usize) -> Result<()> {
461        // Only support RLE value reader for boolean values with bit width of 1.
462        ensure_phys_ty!(Type::BOOLEAN, "RleValueDecoder only supports BoolType");
463
464        // We still need to remove prefix of i32 from the stream.
465        const I32_SIZE: usize = mem::size_of::<i32>();
466        if data.len() < I32_SIZE {
467            return Err(eof_err!("Not enough bytes to decode"));
468        }
469        let data_size = bit_util::read_num_bytes::<i32>(I32_SIZE, data.as_ref()) as usize;
470        if data.len() - I32_SIZE < data_size {
471            return Err(eof_err!("Not enough bytes to decode"));
472        }
473
474        self.decoder = RleDecoder::new(1);
475        self.decoder
476            .set_data(data.slice(I32_SIZE..I32_SIZE + data_size))?;
477        self.values_left = num_values;
478        Ok(())
479    }
480
481    #[inline]
482    fn values_left(&self) -> usize {
483        self.values_left
484    }
485
486    #[inline]
487    fn encoding(&self) -> Encoding {
488        Encoding::RLE
489    }
490
491    #[inline]
492    fn get(&mut self, buffer: &mut [T::T]) -> Result<usize> {
493        let num_values = cmp::min(buffer.len(), self.values_left);
494        let values_read = self.decoder.get_batch(&mut buffer[..num_values])?;
495        self.values_left -= values_read;
496        Ok(values_read)
497    }
498
499    #[inline]
500    fn skip(&mut self, num_values: usize) -> Result<usize> {
501        let num_values = cmp::min(num_values, self.values_left);
502        let values_skipped = self.decoder.skip(num_values)?;
503        self.values_left -= values_skipped;
504        Ok(values_skipped)
505    }
506}
507
508// ----------------------------------------------------------------------
509// DELTA_BINARY_PACKED Decoding
510
511/// Delta binary packed decoder.
512/// Supports INT32 and INT64 types.
513/// See [`DeltaBitPackEncoder`](crate::encoding::DeltaBitPackEncoder) for more
514/// information.
515pub struct DeltaBitPackDecoder<T: DataType> {
516    bit_reader: BitReader,
517    initialized: bool,
518
519    // Header info
520    /// The number of values in each block
521    block_size: usize,
522    /// The number of values that remain to be read in the current page
523    values_left: usize,
524    /// The number of mini-blocks in each block
525    mini_blocks_per_block: usize,
526    /// The number of values in each mini block
527    values_per_mini_block: usize,
528
529    // Per block info
530    /// The minimum delta in the block
531    min_delta: T::T,
532    /// The byte offset of the end of the current block
533    block_end_offset: usize,
534    /// The index on the current mini block
535    mini_block_idx: usize,
536    /// The bit widths of each mini block in the current block
537    mini_block_bit_widths: Vec<u8>,
538    /// The number of values remaining in the current mini block
539    mini_block_remaining: usize,
540
541    /// The first value from the block header if not consumed
542    first_value: Option<T::T>,
543    /// The last value to compute offsets from
544    last_value: T::T,
545}
546
547impl<T: DataType> Default for DeltaBitPackDecoder<T>
548where
549    T::T: Default + FromPrimitive + WrappingAdd + Copy,
550{
551    fn default() -> Self {
552        Self::new()
553    }
554}
555
556impl<T: DataType> DeltaBitPackDecoder<T>
557where
558    T::T: Default + FromPrimitive + WrappingAdd + Copy,
559{
560    /// Creates new delta bit packed decoder.
561    pub fn new() -> Self {
562        Self {
563            bit_reader: BitReader::from(vec![]),
564            initialized: false,
565            block_size: 0,
566            values_left: 0,
567            mini_blocks_per_block: 0,
568            values_per_mini_block: 0,
569            min_delta: Default::default(),
570            mini_block_idx: 0,
571            mini_block_bit_widths: vec![],
572            mini_block_remaining: 0,
573            block_end_offset: 0,
574            first_value: None,
575            last_value: Default::default(),
576        }
577    }
578
579    /// Returns the current offset
580    pub fn get_offset(&self) -> usize {
581        assert!(self.initialized, "Bit reader is not initialized");
582        match self.values_left {
583            // If we've exhausted this page report the end of the current block
584            // as we may not have consumed the trailing padding
585            //
586            // The max is necessary to handle pages which don't contain more than
587            // one value and therefore have no blocks, but still contain a page header
588            0 => self.bit_reader.get_byte_offset().max(self.block_end_offset),
589            _ => self.bit_reader.get_byte_offset(),
590        }
591    }
592
593    /// Initializes the next block and the first mini block within it
594    #[inline]
595    fn next_block(&mut self) -> Result<()> {
596        let min_delta = self
597            .bit_reader
598            .get_zigzag_vlq_int()
599            .ok_or_else(|| eof_err!("Not enough data to decode 'min_delta'"))?;
600
601        self.min_delta =
602            T::T::from_i64(min_delta).ok_or_else(|| general_err!("'min_delta' too large"))?;
603
604        self.mini_block_bit_widths.clear();
605        self.bit_reader
606            .get_aligned_bytes(&mut self.mini_block_bit_widths, self.mini_blocks_per_block);
607
608        let mut offset = self.bit_reader.get_byte_offset();
609        let mut remaining = self.values_left;
610
611        // Compute the end offset of the current block
612        for b in &mut self.mini_block_bit_widths {
613            if remaining == 0 {
614                // Specification requires handling arbitrary bit widths
615                // for trailing mini blocks
616                *b = 0;
617            }
618            remaining = remaining.saturating_sub(self.values_per_mini_block);
619            offset += *b as usize * self.values_per_mini_block / 8;
620        }
621        self.block_end_offset = offset;
622
623        if self.mini_block_bit_widths.len() != self.mini_blocks_per_block {
624            return Err(eof_err!("insufficient mini block bit widths"));
625        }
626
627        self.mini_block_remaining = self.values_per_mini_block;
628        self.mini_block_idx = 0;
629
630        Ok(())
631    }
632
633    /// Initializes the next mini block
634    #[inline]
635    fn next_mini_block(&mut self) -> Result<()> {
636        if self.mini_block_idx + 1 < self.mini_block_bit_widths.len() {
637            self.mini_block_idx += 1;
638            self.mini_block_remaining = self.values_per_mini_block;
639            Ok(())
640        } else {
641            self.next_block()
642        }
643    }
644
645    /// Verify the bit width is smaller then the integer type that it is trying to decode.
646    #[inline]
647    fn check_bit_width(&self, bit_width: usize) -> Result<()> {
648        if bit_width > std::mem::size_of::<T::T>() * 8 {
649            return Err(general_err!(
650                "Invalid delta bit width {} which is larger than expected {} ",
651                bit_width,
652                std::mem::size_of::<T::T>() * 8
653            ));
654        }
655        Ok(())
656    }
657}
658
659impl<T: DataType> Decoder<T> for DeltaBitPackDecoder<T>
660where
661    T::T: Default + FromPrimitive + WrappingAdd + Copy,
662{
663    // # of total values is derived from encoding
664    #[inline]
665    fn set_data(&mut self, data: Bytes, _index: usize) -> Result<()> {
666        self.bit_reader = BitReader::new(data);
667        self.initialized = true;
668
669        // Read header information
670        self.block_size = self
671            .bit_reader
672            .get_vlq_int()
673            .ok_or_else(|| eof_err!("Not enough data to decode 'block_size'"))?
674            .try_into()
675            .map_err(|_| general_err!("invalid 'block_size'"))?;
676
677        self.mini_blocks_per_block = self
678            .bit_reader
679            .get_vlq_int()
680            .ok_or_else(|| eof_err!("Not enough data to decode 'mini_blocks_per_block'"))?
681            .try_into()
682            .map_err(|_| general_err!("invalid 'mini_blocks_per_block'"))?;
683
684        if self.mini_blocks_per_block == 0 {
685            return Err(general_err!("cannot have zero miniblocks per block"));
686        }
687
688        self.values_left = self
689            .bit_reader
690            .get_vlq_int()
691            .ok_or_else(|| eof_err!("Not enough data to decode 'values_left'"))?
692            .try_into()
693            .map_err(|_| general_err!("invalid 'values_left'"))?;
694
695        let first_value = self
696            .bit_reader
697            .get_zigzag_vlq_int()
698            .ok_or_else(|| eof_err!("Not enough data to decode 'first_value'"))?;
699
700        self.first_value =
701            Some(T::T::from_i64(first_value).ok_or_else(|| general_err!("first value too large"))?);
702
703        if self.block_size % 128 != 0 {
704            return Err(general_err!(
705                "'block_size' must be a multiple of 128, got {}",
706                self.block_size
707            ));
708        }
709
710        if self.block_size % self.mini_blocks_per_block != 0 {
711            return Err(general_err!(
712                "'block_size' must be a multiple of 'mini_blocks_per_block' got {} and {}",
713                self.block_size,
714                self.mini_blocks_per_block
715            ));
716        }
717
718        // Reset decoding state
719        self.mini_block_idx = 0;
720        self.values_per_mini_block = self.block_size / self.mini_blocks_per_block;
721        self.mini_block_remaining = 0;
722        self.mini_block_bit_widths.clear();
723
724        if self.values_per_mini_block % 32 != 0 {
725            return Err(general_err!(
726                "'values_per_mini_block' must be a multiple of 32 got {}",
727                self.values_per_mini_block
728            ));
729        }
730
731        Ok(())
732    }
733
734    fn get(&mut self, buffer: &mut [T::T]) -> Result<usize> {
735        assert!(self.initialized, "Bit reader is not initialized");
736        if buffer.is_empty() {
737            return Ok(0);
738        }
739
740        let mut read = 0;
741        let to_read = buffer.len().min(self.values_left);
742
743        if let Some(value) = self.first_value.take() {
744            self.last_value = value;
745            buffer[0] = value;
746            read += 1;
747            self.values_left -= 1;
748        }
749
750        while read != to_read {
751            if self.mini_block_remaining == 0 {
752                self.next_mini_block()?;
753            }
754
755            let bit_width = self.mini_block_bit_widths[self.mini_block_idx] as usize;
756            self.check_bit_width(bit_width)?;
757            let batch_to_read = self.mini_block_remaining.min(to_read - read);
758
759            let batch_read = self
760                .bit_reader
761                .get_batch(&mut buffer[read..read + batch_to_read], bit_width);
762
763            if batch_read != batch_to_read {
764                return Err(general_err!(
765                    "Expected to read {} values from miniblock got {}",
766                    batch_to_read,
767                    batch_read
768                ));
769            }
770
771            // At this point we have read the deltas to `buffer` we now need to offset
772            // these to get back to the original values that were encoded
773            for v in &mut buffer[read..read + batch_read] {
774                // It is OK for deltas to contain "overflowed" values after encoding,
775                // e.g. i64::MAX - i64::MIN, so we use `wrapping_add` to "overflow" again and
776                // restore original value.
777                *v = v
778                    .wrapping_add(&self.min_delta)
779                    .wrapping_add(&self.last_value);
780
781                self.last_value = *v;
782            }
783
784            read += batch_read;
785            self.mini_block_remaining -= batch_read;
786            self.values_left -= batch_read;
787        }
788
789        Ok(to_read)
790    }
791
792    fn values_left(&self) -> usize {
793        self.values_left
794    }
795
796    fn encoding(&self) -> Encoding {
797        Encoding::DELTA_BINARY_PACKED
798    }
799
800    fn skip(&mut self, num_values: usize) -> Result<usize> {
801        let mut skip = 0;
802        let to_skip = num_values.min(self.values_left);
803        if to_skip == 0 {
804            return Ok(0);
805        }
806
807        // try to consume first value in header.
808        if let Some(value) = self.first_value.take() {
809            self.last_value = value;
810            skip += 1;
811            self.values_left -= 1;
812        }
813
814        let mini_block_batch_size = match T::T::PHYSICAL_TYPE {
815            Type::INT32 => 32,
816            Type::INT64 => 64,
817            _ => unreachable!(),
818        };
819
820        let mut skip_buffer = vec![T::T::default(); mini_block_batch_size];
821        while skip < to_skip {
822            if self.mini_block_remaining == 0 {
823                self.next_mini_block()?;
824            }
825
826            let bit_width = self.mini_block_bit_widths[self.mini_block_idx] as usize;
827            self.check_bit_width(bit_width)?;
828            let mini_block_to_skip = self.mini_block_remaining.min(to_skip - skip);
829            let mini_block_should_skip = mini_block_to_skip;
830
831            let skip_count = self
832                .bit_reader
833                .get_batch(&mut skip_buffer[0..mini_block_to_skip], bit_width);
834
835            if skip_count != mini_block_to_skip {
836                return Err(general_err!(
837                    "Expected to skip {} values from mini block got {}.",
838                    mini_block_batch_size,
839                    skip_count
840                ));
841            }
842
843            for v in &mut skip_buffer[0..skip_count] {
844                *v = v
845                    .wrapping_add(&self.min_delta)
846                    .wrapping_add(&self.last_value);
847
848                self.last_value = *v;
849            }
850
851            skip += mini_block_should_skip;
852            self.mini_block_remaining -= mini_block_should_skip;
853            self.values_left -= mini_block_should_skip;
854        }
855
856        Ok(to_skip)
857    }
858}
859
860// ----------------------------------------------------------------------
861// DELTA_LENGTH_BYTE_ARRAY Decoding
862
863/// Delta length byte array decoder.
864///
865/// Only applied to byte arrays to separate the length values and the data, the lengths
866/// are encoded using DELTA_BINARY_PACKED encoding.
867/// See [`DeltaLengthByteArrayEncoder`](crate::encoding::DeltaLengthByteArrayEncoder)
868/// for more information.
869pub struct DeltaLengthByteArrayDecoder<T: DataType> {
870    // Lengths for each byte array in `data`
871    // TODO: add memory tracker to this
872    lengths: Vec<i32>,
873
874    // Current index into `lengths`
875    current_idx: usize,
876
877    // Concatenated byte array data
878    data: Option<Bytes>,
879
880    // Offset into `data`, always point to the beginning of next byte array.
881    offset: usize,
882
883    // Number of values left in this decoder stream
884    num_values: usize,
885
886    // Placeholder to allow `T` as generic parameter
887    _phantom: PhantomData<T>,
888}
889
890impl<T: DataType> Default for DeltaLengthByteArrayDecoder<T> {
891    fn default() -> Self {
892        Self::new()
893    }
894}
895
896impl<T: DataType> DeltaLengthByteArrayDecoder<T> {
897    /// Creates new delta length byte array decoder.
898    pub fn new() -> Self {
899        Self {
900            lengths: vec![],
901            current_idx: 0,
902            data: None,
903            offset: 0,
904            num_values: 0,
905            _phantom: PhantomData,
906        }
907    }
908}
909
910impl<T: DataType> Decoder<T> for DeltaLengthByteArrayDecoder<T> {
911    fn set_data(&mut self, data: Bytes, num_values: usize) -> Result<()> {
912        match T::get_physical_type() {
913            Type::BYTE_ARRAY => {
914                let mut len_decoder = DeltaBitPackDecoder::<Int32Type>::new();
915                len_decoder.set_data(data.clone(), num_values)?;
916                let num_lengths = len_decoder.values_left();
917                self.lengths.resize(num_lengths, 0);
918                len_decoder.get(&mut self.lengths[..])?;
919
920                self.data = Some(data.slice(len_decoder.get_offset()..));
921                self.offset = 0;
922                self.current_idx = 0;
923                self.num_values = num_lengths;
924                Ok(())
925            }
926            _ => Err(general_err!(
927                "DeltaLengthByteArrayDecoder only support ByteArrayType"
928            )),
929        }
930    }
931
932    fn get(&mut self, buffer: &mut [T::T]) -> Result<usize> {
933        match T::get_physical_type() {
934            Type::BYTE_ARRAY => {
935                assert!(self.data.is_some());
936
937                let data = self.data.as_ref().unwrap();
938                let num_values = cmp::min(buffer.len(), self.num_values);
939
940                for item in buffer.iter_mut().take(num_values) {
941                    let len = self.lengths[self.current_idx] as usize;
942                    item.set_from_bytes(data.slice(self.offset..self.offset + len));
943
944                    self.offset += len;
945                    self.current_idx += 1;
946                }
947
948                self.num_values -= num_values;
949                Ok(num_values)
950            }
951            _ => Err(general_err!(
952                "DeltaLengthByteArrayDecoder only support ByteArrayType"
953            )),
954        }
955    }
956
957    fn values_left(&self) -> usize {
958        self.num_values
959    }
960
961    fn encoding(&self) -> Encoding {
962        Encoding::DELTA_LENGTH_BYTE_ARRAY
963    }
964
965    fn skip(&mut self, num_values: usize) -> Result<usize> {
966        match T::get_physical_type() {
967            Type::BYTE_ARRAY => {
968                let num_values = cmp::min(num_values, self.num_values);
969
970                let next_offset: i32 = self.lengths
971                    [self.current_idx..self.current_idx + num_values]
972                    .iter()
973                    .sum();
974
975                self.current_idx += num_values;
976                self.offset += next_offset as usize;
977
978                self.num_values -= num_values;
979                Ok(num_values)
980            }
981            other_type => Err(general_err!(
982                "DeltaLengthByteArrayDecoder not support {}, only support byte array",
983                other_type
984            )),
985        }
986    }
987}
988
989// ----------------------------------------------------------------------
990// DELTA_BYTE_ARRAY Decoding
991
992/// Delta byte array decoder.
993///
994/// Prefix lengths are encoded using `DELTA_BINARY_PACKED` encoding, Suffixes are stored
995/// using `DELTA_LENGTH_BYTE_ARRAY` encoding.
996/// See [`DeltaByteArrayEncoder`](crate::encoding::DeltaByteArrayEncoder) for more
997/// information.
998pub struct DeltaByteArrayDecoder<T: DataType> {
999    // Prefix lengths for each byte array
1000    // TODO: add memory tracker to this
1001    prefix_lengths: Vec<i32>,
1002
1003    // The current index into `prefix_lengths`,
1004    current_idx: usize,
1005
1006    // Decoder for all suffixes, the # of which should be the same as
1007    // `prefix_lengths.len()`
1008    suffix_decoder: Option<DeltaLengthByteArrayDecoder<ByteArrayType>>,
1009
1010    // The last byte array, used to derive the current prefix
1011    // Stored as Bytes to avoid clone allocation when creating output
1012    previous_value: Bytes,
1013
1014    // Number of values left
1015    num_values: usize,
1016
1017    // Placeholder to allow `T` as generic parameter
1018    _phantom: PhantomData<T>,
1019}
1020
1021impl<T: DataType> Default for DeltaByteArrayDecoder<T> {
1022    fn default() -> Self {
1023        Self::new()
1024    }
1025}
1026
1027impl<T: DataType> DeltaByteArrayDecoder<T> {
1028    /// Creates new delta byte array decoder.
1029    pub fn new() -> Self {
1030        Self {
1031            prefix_lengths: vec![],
1032            current_idx: 0,
1033            suffix_decoder: None,
1034            previous_value: Bytes::new(),
1035            num_values: 0,
1036            _phantom: PhantomData,
1037        }
1038    }
1039}
1040
1041impl<T: DataType> Decoder<T> for DeltaByteArrayDecoder<T> {
1042    fn set_data(&mut self, data: Bytes, num_values: usize) -> Result<()> {
1043        match T::get_physical_type() {
1044            Type::BYTE_ARRAY | Type::FIXED_LEN_BYTE_ARRAY => {
1045                let mut prefix_len_decoder = DeltaBitPackDecoder::<Int32Type>::new();
1046                prefix_len_decoder.set_data(data.clone(), num_values)?;
1047                let num_prefixes = prefix_len_decoder.values_left();
1048                self.prefix_lengths.resize(num_prefixes, 0);
1049                prefix_len_decoder.get(&mut self.prefix_lengths[..])?;
1050
1051                let mut suffix_decoder = DeltaLengthByteArrayDecoder::new();
1052                suffix_decoder
1053                    .set_data(data.slice(prefix_len_decoder.get_offset()..), num_values)?;
1054                self.suffix_decoder = Some(suffix_decoder);
1055                self.num_values = num_prefixes;
1056                self.current_idx = 0;
1057                self.previous_value = Bytes::new();
1058                Ok(())
1059            }
1060            _ => Err(general_err!(
1061                "DeltaByteArrayDecoder only supports ByteArrayType and FixedLenByteArrayType"
1062            )),
1063        }
1064    }
1065
1066    fn get(&mut self, buffer: &mut [T::T]) -> Result<usize> {
1067        match T::get_physical_type() {
1068            Type::BYTE_ARRAY | Type::FIXED_LEN_BYTE_ARRAY => {
1069                let num_values = cmp::min(buffer.len(), self.num_values);
1070                let mut v: [ByteArray; 1] = [ByteArray::new(); 1];
1071                for item in buffer.iter_mut().take(num_values) {
1072                    // Process suffix
1073                    // TODO: this is awkward - maybe we should add a non-vectorized API?
1074                    let suffix_decoder = self
1075                        .suffix_decoder
1076                        .as_mut()
1077                        .expect("decoder not initialized");
1078                    suffix_decoder.get(&mut v[..])?;
1079                    let suffix = v[0].data();
1080
1081                    // Extract current prefix length, can be 0
1082                    let prefix_len = self.prefix_lengths[self.current_idx] as usize;
1083
1084                    // Concatenate prefix with suffix
1085                    let mut result = Vec::with_capacity(prefix_len + suffix.len());
1086                    result.extend_from_slice(&self.previous_value[0..prefix_len]);
1087                    result.extend_from_slice(suffix);
1088
1089                    let data = Bytes::from(result);
1090                    item.set_from_bytes(data.clone());
1091
1092                    self.previous_value = data;
1093                    self.current_idx += 1;
1094                }
1095
1096                self.num_values -= num_values;
1097                Ok(num_values)
1098            }
1099            _ => Err(general_err!(
1100                "DeltaByteArrayDecoder only supports ByteArrayType and FixedLenByteArrayType"
1101            )),
1102        }
1103    }
1104
1105    fn values_left(&self) -> usize {
1106        self.num_values
1107    }
1108
1109    fn encoding(&self) -> Encoding {
1110        Encoding::DELTA_BYTE_ARRAY
1111    }
1112
1113    fn skip(&mut self, num_values: usize) -> Result<usize> {
1114        let mut buffer = vec![T::T::default(); num_values];
1115        self.get(&mut buffer)
1116    }
1117}
1118
1119#[cfg(test)]
1120mod tests {
1121    use super::{super::encoding::*, *};
1122
1123    use std::f32::consts::PI as PI_f32;
1124    use std::f64::consts::PI as PI_f64;
1125    use std::sync::Arc;
1126
1127    use crate::schema::types::{ColumnDescPtr, ColumnDescriptor, ColumnPath, Type as SchemaType};
1128    use crate::util::test_common::rand_gen::RandGen;
1129
1130    #[test]
1131    fn test_get_decoders() {
1132        // supported encodings
1133        create_and_check_decoder::<Int32Type>(Encoding::PLAIN, None);
1134        create_and_check_decoder::<Int32Type>(Encoding::DELTA_BINARY_PACKED, None);
1135        create_and_check_decoder::<ByteArrayType>(Encoding::DELTA_LENGTH_BYTE_ARRAY, None);
1136        create_and_check_decoder::<ByteArrayType>(Encoding::DELTA_BYTE_ARRAY, None);
1137        create_and_check_decoder::<BoolType>(Encoding::RLE, None);
1138
1139        // error when initializing
1140        create_and_check_decoder::<Int32Type>(
1141            Encoding::RLE_DICTIONARY,
1142            Some(general_err!(
1143                "Cannot initialize this encoding through this function"
1144            )),
1145        );
1146        create_and_check_decoder::<Int32Type>(
1147            Encoding::PLAIN_DICTIONARY,
1148            Some(general_err!(
1149                "Cannot initialize this encoding through this function"
1150            )),
1151        );
1152        create_and_check_decoder::<Int32Type>(
1153            Encoding::DELTA_LENGTH_BYTE_ARRAY,
1154            Some(general_err!(
1155                "Encoding DELTA_LENGTH_BYTE_ARRAY is not supported for type"
1156            )),
1157        );
1158        create_and_check_decoder::<Int32Type>(
1159            Encoding::DELTA_BYTE_ARRAY,
1160            Some(general_err!(
1161                "Encoding DELTA_BYTE_ARRAY is not supported for type"
1162            )),
1163        );
1164
1165        // unsupported
1166        #[allow(deprecated)]
1167        create_and_check_decoder::<Int32Type>(
1168            Encoding::BIT_PACKED,
1169            Some(nyi_err!("Encoding BIT_PACKED is not supported")),
1170        );
1171    }
1172
1173    #[test]
1174    fn test_plain_decode_int32() {
1175        let data = [42, 18, 52];
1176        let data_bytes = Int32Type::to_byte_array(&data[..]);
1177        let mut buffer = [0; 3];
1178        test_plain_decode::<Int32Type>(Bytes::from(data_bytes), 3, -1, &mut buffer[..], &data[..]);
1179    }
1180
1181    #[test]
1182    fn test_plain_skip_int32() {
1183        let data = [42, 18, 52];
1184        let data_bytes = Int32Type::to_byte_array(&data[..]);
1185        test_plain_skip::<Int32Type>(Bytes::from(data_bytes), 3, 1, -1, &data[1..]);
1186    }
1187
1188    #[test]
1189    fn test_plain_skip_all_int32() {
1190        let data = [42, 18, 52];
1191        let data_bytes = Int32Type::to_byte_array(&data[..]);
1192        test_plain_skip::<Int32Type>(Bytes::from(data_bytes), 3, 5, -1, &[]);
1193    }
1194
1195    #[test]
1196    fn test_plain_decode_int32_spaced() {
1197        let data = [42, 18, 52];
1198        let expected_data = [0, 42, 0, 18, 0, 0, 52, 0];
1199        let data_bytes = Int32Type::to_byte_array(&data[..]);
1200        let mut buffer = [0; 8];
1201        let num_nulls = 5;
1202        let valid_bits = [0b01001010];
1203        test_plain_decode_spaced::<Int32Type>(
1204            Bytes::from(data_bytes),
1205            3,
1206            -1,
1207            &mut buffer[..],
1208            num_nulls,
1209            &valid_bits,
1210            &expected_data[..],
1211        );
1212    }
1213
1214    #[test]
1215    fn test_plain_decode_int64() {
1216        let data = [42, 18, 52];
1217        let data_bytes = Int64Type::to_byte_array(&data[..]);
1218        let mut buffer = [0; 3];
1219        test_plain_decode::<Int64Type>(Bytes::from(data_bytes), 3, -1, &mut buffer[..], &data[..]);
1220    }
1221
1222    #[test]
1223    fn test_plain_skip_int64() {
1224        let data = [42, 18, 52];
1225        let data_bytes = Int64Type::to_byte_array(&data[..]);
1226        test_plain_skip::<Int64Type>(Bytes::from(data_bytes), 3, 2, -1, &data[2..]);
1227    }
1228
1229    #[test]
1230    fn test_plain_skip_all_int64() {
1231        let data = [42, 18, 52];
1232        let data_bytes = Int64Type::to_byte_array(&data[..]);
1233        test_plain_skip::<Int64Type>(Bytes::from(data_bytes), 3, 3, -1, &[]);
1234    }
1235
1236    #[test]
1237    fn test_plain_decode_float() {
1238        let data = [PI_f32, 2.414, 12.51];
1239        let data_bytes = FloatType::to_byte_array(&data[..]);
1240        let mut buffer = [0.0; 3];
1241        test_plain_decode::<FloatType>(Bytes::from(data_bytes), 3, -1, &mut buffer[..], &data[..]);
1242    }
1243
1244    #[test]
1245    fn test_plain_skip_float() {
1246        let data = [PI_f32, 2.414, 12.51];
1247        let data_bytes = FloatType::to_byte_array(&data[..]);
1248        test_plain_skip::<FloatType>(Bytes::from(data_bytes), 3, 1, -1, &data[1..]);
1249    }
1250
1251    #[test]
1252    fn test_plain_skip_all_float() {
1253        let data = [PI_f32, 2.414, 12.51];
1254        let data_bytes = FloatType::to_byte_array(&data[..]);
1255        test_plain_skip::<FloatType>(Bytes::from(data_bytes), 3, 4, -1, &[]);
1256    }
1257
1258    #[test]
1259    fn test_plain_skip_double() {
1260        let data = [PI_f64, 2.414f64, 12.51f64];
1261        let data_bytes = DoubleType::to_byte_array(&data[..]);
1262        test_plain_skip::<DoubleType>(Bytes::from(data_bytes), 3, 1, -1, &data[1..]);
1263    }
1264
1265    #[test]
1266    fn test_plain_skip_all_double() {
1267        let data = [PI_f64, 2.414f64, 12.51f64];
1268        let data_bytes = DoubleType::to_byte_array(&data[..]);
1269        test_plain_skip::<DoubleType>(Bytes::from(data_bytes), 3, 5, -1, &[]);
1270    }
1271
1272    #[test]
1273    fn test_plain_decode_double() {
1274        let data = [PI_f64, 2.414f64, 12.51f64];
1275        let data_bytes = DoubleType::to_byte_array(&data[..]);
1276        let mut buffer = [0.0f64; 3];
1277        test_plain_decode::<DoubleType>(Bytes::from(data_bytes), 3, -1, &mut buffer[..], &data[..]);
1278    }
1279
1280    #[test]
1281    fn test_plain_decode_int96() {
1282        let mut data = [Int96::new(); 4];
1283        data[0].set_data(11, 22, 33);
1284        data[1].set_data(44, 55, 66);
1285        data[2].set_data(10, 20, 30);
1286        data[3].set_data(40, 50, 60);
1287        let data_bytes = Int96Type::to_byte_array(&data[..]);
1288        let mut buffer = [Int96::new(); 4];
1289        test_plain_decode::<Int96Type>(Bytes::from(data_bytes), 4, -1, &mut buffer[..], &data[..]);
1290    }
1291
1292    #[test]
1293    fn test_plain_skip_int96() {
1294        let mut data = [Int96::new(); 4];
1295        data[0].set_data(11, 22, 33);
1296        data[1].set_data(44, 55, 66);
1297        data[2].set_data(10, 20, 30);
1298        data[3].set_data(40, 50, 60);
1299        let data_bytes = Int96Type::to_byte_array(&data[..]);
1300        test_plain_skip::<Int96Type>(Bytes::from(data_bytes), 4, 2, -1, &data[2..]);
1301    }
1302
1303    #[test]
1304    fn test_plain_skip_all_int96() {
1305        let mut data = [Int96::new(); 4];
1306        data[0].set_data(11, 22, 33);
1307        data[1].set_data(44, 55, 66);
1308        data[2].set_data(10, 20, 30);
1309        data[3].set_data(40, 50, 60);
1310        let data_bytes = Int96Type::to_byte_array(&data[..]);
1311        test_plain_skip::<Int96Type>(Bytes::from(data_bytes), 4, 8, -1, &[]);
1312    }
1313
1314    #[test]
1315    fn test_plain_decode_bool() {
1316        let data = [
1317            false, true, false, false, true, false, true, true, false, true,
1318        ];
1319        let data_bytes = BoolType::to_byte_array(&data[..]);
1320        let mut buffer = [false; 10];
1321        test_plain_decode::<BoolType>(Bytes::from(data_bytes), 10, -1, &mut buffer[..], &data[..]);
1322    }
1323
1324    #[test]
1325    fn test_plain_skip_bool() {
1326        let data = [
1327            false, true, false, false, true, false, true, true, false, true,
1328        ];
1329        let data_bytes = BoolType::to_byte_array(&data[..]);
1330        test_plain_skip::<BoolType>(Bytes::from(data_bytes), 10, 5, -1, &data[5..]);
1331    }
1332
1333    #[test]
1334    fn test_plain_skip_all_bool() {
1335        let data = [
1336            false, true, false, false, true, false, true, true, false, true,
1337        ];
1338        let data_bytes = BoolType::to_byte_array(&data[..]);
1339        test_plain_skip::<BoolType>(Bytes::from(data_bytes), 10, 20, -1, &[]);
1340    }
1341
1342    #[test]
1343    fn test_plain_decode_byte_array() {
1344        let mut data = vec![ByteArray::new(); 2];
1345        data[0].set_data(Bytes::from(String::from("hello")));
1346        data[1].set_data(Bytes::from(String::from("parquet")));
1347        let data_bytes = ByteArrayType::to_byte_array(&data[..]);
1348        let mut buffer = vec![ByteArray::new(); 2];
1349        test_plain_decode::<ByteArrayType>(
1350            Bytes::from(data_bytes),
1351            2,
1352            -1,
1353            &mut buffer[..],
1354            &data[..],
1355        );
1356    }
1357
1358    #[test]
1359    fn test_plain_skip_byte_array() {
1360        let mut data = vec![ByteArray::new(); 2];
1361        data[0].set_data(Bytes::from(String::from("hello")));
1362        data[1].set_data(Bytes::from(String::from("parquet")));
1363        let data_bytes = ByteArrayType::to_byte_array(&data[..]);
1364        test_plain_skip::<ByteArrayType>(Bytes::from(data_bytes), 2, 1, -1, &data[1..]);
1365    }
1366
1367    #[test]
1368    fn test_plain_skip_all_byte_array() {
1369        let mut data = vec![ByteArray::new(); 2];
1370        data[0].set_data(Bytes::from(String::from("hello")));
1371        data[1].set_data(Bytes::from(String::from("parquet")));
1372        let data_bytes = ByteArrayType::to_byte_array(&data[..]);
1373        test_plain_skip::<ByteArrayType>(Bytes::from(data_bytes), 2, 2, -1, &[]);
1374    }
1375
1376    #[test]
1377    fn test_plain_decode_fixed_len_byte_array() {
1378        let mut data = vec![FixedLenByteArray::default(); 3];
1379        data[0].set_data(Bytes::from(String::from("bird")));
1380        data[1].set_data(Bytes::from(String::from("come")));
1381        data[2].set_data(Bytes::from(String::from("flow")));
1382        let data_bytes = FixedLenByteArrayType::to_byte_array(&data[..]);
1383        let mut buffer = vec![FixedLenByteArray::default(); 3];
1384        test_plain_decode::<FixedLenByteArrayType>(
1385            Bytes::from(data_bytes),
1386            3,
1387            4,
1388            &mut buffer[..],
1389            &data[..],
1390        );
1391    }
1392
1393    #[test]
1394    fn test_plain_skip_fixed_len_byte_array() {
1395        let mut data = vec![FixedLenByteArray::default(); 3];
1396        data[0].set_data(Bytes::from(String::from("bird")));
1397        data[1].set_data(Bytes::from(String::from("come")));
1398        data[2].set_data(Bytes::from(String::from("flow")));
1399        let data_bytes = FixedLenByteArrayType::to_byte_array(&data[..]);
1400        test_plain_skip::<FixedLenByteArrayType>(Bytes::from(data_bytes), 3, 1, 4, &data[1..]);
1401    }
1402
1403    #[test]
1404    fn test_plain_skip_all_fixed_len_byte_array() {
1405        let mut data = vec![FixedLenByteArray::default(); 3];
1406        data[0].set_data(Bytes::from(String::from("bird")));
1407        data[1].set_data(Bytes::from(String::from("come")));
1408        data[2].set_data(Bytes::from(String::from("flow")));
1409        let data_bytes = FixedLenByteArrayType::to_byte_array(&data[..]);
1410        test_plain_skip::<FixedLenByteArrayType>(Bytes::from(data_bytes), 3, 6, 4, &[]);
1411    }
1412
1413    #[test]
1414    fn test_dict_decoder_empty_data() {
1415        let mut decoder = DictDecoder::<Int32Type>::new();
1416        let err = decoder.set_data(Bytes::new(), 10).unwrap_err();
1417        assert_eq!(err.to_string(), "EOF: Not enough bytes to decode bit_width");
1418    }
1419
1420    fn test_plain_decode<T: DataType>(
1421        data: Bytes,
1422        num_values: usize,
1423        type_length: i32,
1424        buffer: &mut [T::T],
1425        expected: &[T::T],
1426    ) {
1427        let mut decoder: PlainDecoder<T> = PlainDecoder::new(type_length);
1428        let result = decoder.set_data(data, num_values);
1429        assert!(result.is_ok());
1430        let result = decoder.get(buffer);
1431        assert!(result.is_ok());
1432        assert_eq!(decoder.values_left(), 0);
1433        assert_eq!(buffer, expected);
1434    }
1435
1436    fn test_plain_skip<T: DataType>(
1437        data: Bytes,
1438        num_values: usize,
1439        skip: usize,
1440        type_length: i32,
1441        expected: &[T::T],
1442    ) {
1443        let mut decoder: PlainDecoder<T> = PlainDecoder::new(type_length);
1444        let result = decoder.set_data(data, num_values);
1445        assert!(result.is_ok());
1446        let skipped = decoder.skip(skip).expect("skipping values");
1447
1448        if skip >= num_values {
1449            assert_eq!(skipped, num_values);
1450
1451            let mut buffer = vec![T::T::default(); 1];
1452            let remaining = decoder.get(&mut buffer).expect("getting remaining values");
1453            assert_eq!(remaining, 0);
1454        } else {
1455            assert_eq!(skipped, skip);
1456            let mut buffer = vec![T::T::default(); num_values - skip];
1457            let remaining = decoder.get(&mut buffer).expect("getting remaining values");
1458            assert_eq!(remaining, num_values - skip);
1459            assert_eq!(decoder.values_left(), 0);
1460            assert_eq!(buffer, expected);
1461        }
1462    }
1463
1464    fn test_plain_decode_spaced<T: DataType>(
1465        data: Bytes,
1466        num_values: usize,
1467        type_length: i32,
1468        buffer: &mut [T::T],
1469        num_nulls: usize,
1470        valid_bits: &[u8],
1471        expected: &[T::T],
1472    ) {
1473        let mut decoder: PlainDecoder<T> = PlainDecoder::new(type_length);
1474        let result = decoder.set_data(data, num_values);
1475        assert!(result.is_ok());
1476        let result = decoder.get_spaced(buffer, num_nulls, valid_bits);
1477        assert!(result.is_ok());
1478        assert_eq!(num_values + num_nulls, result.unwrap());
1479        assert_eq!(decoder.values_left(), 0);
1480        assert_eq!(buffer, expected);
1481    }
1482
1483    #[test]
1484    #[should_panic(expected = "RleValueEncoder only supports BoolType")]
1485    fn test_rle_value_encode_int32_not_supported() {
1486        let mut encoder = RleValueEncoder::<Int32Type>::new();
1487        encoder.put(&[1, 2, 3, 4]).unwrap();
1488    }
1489
1490    #[test]
1491    #[should_panic(expected = "RleValueDecoder only supports BoolType")]
1492    fn test_rle_value_decode_int32_not_supported() {
1493        let mut decoder = RleValueDecoder::<Int32Type>::new();
1494        decoder.set_data(Bytes::from(vec![5, 0, 0, 0]), 1).unwrap();
1495    }
1496
1497    #[test]
1498    fn test_rle_value_decode_missing_size() {
1499        let mut decoder = RleValueDecoder::<BoolType>::new();
1500        assert!(decoder.set_data(Bytes::from(vec![0]), 1).is_err());
1501    }
1502
1503    #[test]
1504    fn test_rle_value_decode_missing_data() {
1505        let mut decoder = RleValueDecoder::<BoolType>::new();
1506        assert!(decoder.set_data(Bytes::from(vec![5, 0, 0, 0]), 1).is_err());
1507    }
1508
1509    #[test]
1510    fn test_rle_value_decode_bool_decode() {
1511        // Test multiple 'put' calls on the same encoder
1512        let data = vec![
1513            BoolType::gen_vec(-1, 256),
1514            BoolType::gen_vec(-1, 257),
1515            BoolType::gen_vec(-1, 126),
1516        ];
1517        test_rle_value_decode::<BoolType>(data);
1518    }
1519
1520    #[test]
1521    #[should_panic(expected = "Bit reader is not initialized")]
1522    fn test_delta_bit_packed_not_initialized_offset() {
1523        // Fail if set_data() is not called before get_offset()
1524        let decoder = DeltaBitPackDecoder::<Int32Type>::new();
1525        decoder.get_offset();
1526    }
1527
1528    #[test]
1529    #[should_panic(expected = "Bit reader is not initialized")]
1530    fn test_delta_bit_packed_not_initialized_get() {
1531        // Fail if set_data() is not called before get()
1532        let mut decoder = DeltaBitPackDecoder::<Int32Type>::new();
1533        let mut buffer = vec![];
1534        decoder.get(&mut buffer).unwrap();
1535    }
1536
1537    #[test]
1538    fn test_delta_bit_packed_int32_empty() {
1539        let data = vec![vec![0; 0]];
1540        test_delta_bit_packed_decode::<Int32Type>(data);
1541    }
1542
1543    #[test]
1544    fn test_delta_bit_packed_int32_repeat() {
1545        let block_data = vec![
1546            1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5,
1547            6, 7, 8,
1548        ];
1549        test_delta_bit_packed_decode::<Int32Type>(vec![block_data]);
1550    }
1551
1552    #[test]
1553    fn test_skip_delta_bit_packed_int32_repeat() {
1554        let block_data = vec![
1555            1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5,
1556            6, 7, 8,
1557        ];
1558        test_skip::<Int32Type>(block_data.clone(), Encoding::DELTA_BINARY_PACKED, 10);
1559        test_skip::<Int32Type>(block_data, Encoding::DELTA_BINARY_PACKED, 100);
1560    }
1561
1562    #[test]
1563    fn test_delta_bit_packed_int32_uneven() {
1564        let block_data = vec![1, -2, 3, -4, 5, 6, 7, 8, 9, 10, 11];
1565        test_delta_bit_packed_decode::<Int32Type>(vec![block_data]);
1566    }
1567
1568    #[test]
1569    fn test_skip_delta_bit_packed_int32_uneven() {
1570        let block_data = vec![1, -2, 3, -4, 5, 6, 7, 8, 9, 10, 11];
1571        test_skip::<Int32Type>(block_data.clone(), Encoding::DELTA_BINARY_PACKED, 5);
1572        test_skip::<Int32Type>(block_data, Encoding::DELTA_BINARY_PACKED, 100);
1573    }
1574
1575    #[test]
1576    fn test_delta_bit_packed_int32_same_values() {
1577        let block_data = vec![
1578            127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127,
1579        ];
1580        test_delta_bit_packed_decode::<Int32Type>(vec![block_data]);
1581
1582        let block_data = vec![
1583            -127, -127, -127, -127, -127, -127, -127, -127, -127, -127, -127, -127, -127, -127,
1584            -127, -127,
1585        ];
1586        test_delta_bit_packed_decode::<Int32Type>(vec![block_data]);
1587    }
1588
1589    #[test]
1590    fn test_skip_delta_bit_packed_int32_same_values() {
1591        let block_data = vec![
1592            127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127, 127,
1593        ];
1594        test_skip::<Int32Type>(block_data.clone(), Encoding::DELTA_BINARY_PACKED, 5);
1595        test_skip::<Int32Type>(block_data, Encoding::DELTA_BINARY_PACKED, 100);
1596
1597        let block_data = vec![
1598            -127, -127, -127, -127, -127, -127, -127, -127, -127, -127, -127, -127, -127, -127,
1599            -127, -127,
1600        ];
1601        test_skip::<Int32Type>(block_data.clone(), Encoding::DELTA_BINARY_PACKED, 5);
1602        test_skip::<Int32Type>(block_data, Encoding::DELTA_BINARY_PACKED, 100);
1603    }
1604
1605    #[test]
1606    fn test_delta_bit_packed_int32_min_max() {
1607        let block_data = vec![
1608            i32::MIN,
1609            i32::MIN,
1610            i32::MIN,
1611            i32::MAX,
1612            i32::MIN,
1613            i32::MAX,
1614            i32::MIN,
1615            i32::MAX,
1616        ];
1617        test_delta_bit_packed_decode::<Int32Type>(vec![block_data]);
1618    }
1619
1620    #[test]
1621    fn test_skip_delta_bit_packed_int32_min_max() {
1622        let block_data = vec![
1623            i32::MIN,
1624            i32::MIN,
1625            i32::MIN,
1626            i32::MAX,
1627            i32::MIN,
1628            i32::MAX,
1629            i32::MIN,
1630            i32::MAX,
1631        ];
1632        test_skip::<Int32Type>(block_data.clone(), Encoding::DELTA_BINARY_PACKED, 5);
1633        test_skip::<Int32Type>(block_data, Encoding::DELTA_BINARY_PACKED, 100);
1634    }
1635
1636    #[test]
1637    fn test_delta_bit_packed_int32_multiple_blocks() {
1638        // Test multiple 'put' calls on the same encoder
1639        let data = vec![
1640            Int32Type::gen_vec(-1, 64),
1641            Int32Type::gen_vec(-1, 128),
1642            Int32Type::gen_vec(-1, 64),
1643        ];
1644        test_delta_bit_packed_decode::<Int32Type>(data);
1645    }
1646
1647    #[test]
1648    fn test_delta_bit_packed_int32_data_across_blocks() {
1649        // Test multiple 'put' calls on the same encoder
1650        let data = vec![Int32Type::gen_vec(-1, 256), Int32Type::gen_vec(-1, 257)];
1651        test_delta_bit_packed_decode::<Int32Type>(data);
1652    }
1653
1654    #[test]
1655    fn test_delta_bit_packed_int32_with_empty_blocks() {
1656        let data = vec![
1657            Int32Type::gen_vec(-1, 128),
1658            vec![0; 0],
1659            Int32Type::gen_vec(-1, 64),
1660        ];
1661        test_delta_bit_packed_decode::<Int32Type>(data);
1662    }
1663
1664    #[test]
1665    fn test_delta_bit_packed_int64_empty() {
1666        let data = vec![vec![0; 0]];
1667        test_delta_bit_packed_decode::<Int64Type>(data);
1668    }
1669
1670    #[test]
1671    fn test_delta_bit_packed_int64_min_max() {
1672        let block_data = vec![
1673            i64::MIN,
1674            i64::MAX,
1675            i64::MIN,
1676            i64::MAX,
1677            i64::MIN,
1678            i64::MAX,
1679            i64::MIN,
1680            i64::MAX,
1681        ];
1682        test_delta_bit_packed_decode::<Int64Type>(vec![block_data]);
1683    }
1684
1685    #[test]
1686    fn test_delta_bit_packed_int64_multiple_blocks() {
1687        // Test multiple 'put' calls on the same encoder
1688        let data = vec![
1689            Int64Type::gen_vec(-1, 64),
1690            Int64Type::gen_vec(-1, 128),
1691            Int64Type::gen_vec(-1, 64),
1692        ];
1693        test_delta_bit_packed_decode::<Int64Type>(data);
1694    }
1695
1696    #[test]
1697    fn test_delta_bit_packed_zero_miniblocks() {
1698        // It is invalid for mini_blocks_per_block to be 0
1699        let data = vec![
1700            128, 1, // block_size = 128
1701            0, // mini_blocks_per_block = 0
1702        ];
1703        let mut decoder = DeltaBitPackDecoder::<Int32Type>::new();
1704        let err = decoder.set_data(data.into(), 0).unwrap_err();
1705        assert_eq!(
1706            err.to_string(),
1707            "Parquet error: cannot have zero miniblocks per block"
1708        );
1709    }
1710
1711    #[test]
1712    fn test_delta_bit_packed_decoder_sample() {
1713        let data_bytes = vec![
1714            128, 1, 4, 3, 58, 28, 6, 0, 0, 0, 0, 8, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
1715            0, 0, 0, 0, 0, 0,
1716        ];
1717        let mut decoder: DeltaBitPackDecoder<Int32Type> = DeltaBitPackDecoder::new();
1718        decoder.set_data(data_bytes.into(), 3).unwrap();
1719        // check exact offsets, because when reading partial values we end up with
1720        // some data not being read from bit reader
1721        assert_eq!(decoder.get_offset(), 5);
1722        let mut result = vec![0, 0, 0];
1723        decoder.get(&mut result).unwrap();
1724        assert_eq!(decoder.get_offset(), 34);
1725        assert_eq!(result, vec![29, 43, 89]);
1726    }
1727
1728    #[test]
1729    fn test_delta_bit_packed_padding() {
1730        // Page header
1731        let header = vec![
1732            // Page Header
1733
1734            // Block Size - 256
1735            128,
1736            2,
1737            // Miniblocks in block,
1738            4,
1739            // Total value count - 419
1740            128 + 35,
1741            3,
1742            // First value - 7
1743            7,
1744        ];
1745
1746        // Block Header
1747        let block1_header = vec![
1748            0, // Min delta
1749            0, 1, 0, 0, // Bit widths
1750        ];
1751
1752        // Mini-block 1 - bit width 0 => 0 bytes
1753        // Mini-block 2 - bit width 1 => 8 bytes
1754        // Mini-block 3 - bit width 0 => 0 bytes
1755        // Mini-block 4 - bit width 0 => 0 bytes
1756        let block1 = vec![0xFF; 8];
1757
1758        // Block Header
1759        let block2_header = vec![
1760            0, // Min delta
1761            0, 1, 2, 0xFF, // Bit widths, including non-zero padding
1762        ];
1763
1764        // Mini-block 1 - bit width 0 => 0 bytes
1765        // Mini-block 2 - bit width 1 => 8 bytes
1766        // Mini-block 3 - bit width 2 => 16 bytes
1767        // Mini-block 4 - padding => no bytes
1768        let block2 = vec![0xFF; 24];
1769
1770        let data: Vec<u8> = header
1771            .into_iter()
1772            .chain(block1_header)
1773            .chain(block1)
1774            .chain(block2_header)
1775            .chain(block2)
1776            .collect();
1777
1778        let length = data.len();
1779
1780        let ptr = Bytes::from(data);
1781        let mut reader = BitReader::new(ptr.clone());
1782        assert_eq!(reader.get_vlq_int().unwrap(), 256);
1783        assert_eq!(reader.get_vlq_int().unwrap(), 4);
1784        assert_eq!(reader.get_vlq_int().unwrap(), 419);
1785        assert_eq!(reader.get_vlq_int().unwrap(), 7);
1786
1787        // Test output buffer larger than needed and not exact multiple of block size
1788        let mut output = vec![0_i32; 420];
1789
1790        let mut decoder = DeltaBitPackDecoder::<Int32Type>::new();
1791        decoder.set_data(ptr.clone(), 0).unwrap();
1792        assert_eq!(decoder.get(&mut output).unwrap(), 419);
1793        assert_eq!(decoder.get_offset(), length);
1794
1795        // Test with truncated buffer
1796        decoder.set_data(ptr.slice(..12), 0).unwrap();
1797        let err = decoder.get(&mut output).unwrap_err().to_string();
1798        assert!(
1799            err.contains("Expected to read 64 values from miniblock got 8"),
1800            "{}",
1801            err
1802        );
1803    }
1804
1805    #[test]
1806    fn test_delta_byte_array_same_arrays() {
1807        let data = vec![
1808            vec![ByteArray::from(vec![1, 2, 3, 4, 5, 6])],
1809            vec![
1810                ByteArray::from(vec![1, 2, 3, 4, 5, 6]),
1811                ByteArray::from(vec![1, 2, 3, 4, 5, 6]),
1812            ],
1813            vec![
1814                ByteArray::from(vec![1, 2, 3, 4, 5, 6]),
1815                ByteArray::from(vec![1, 2, 3, 4, 5, 6]),
1816            ],
1817        ];
1818        test_delta_byte_array_decode(data);
1819    }
1820
1821    #[test]
1822    fn test_delta_byte_array_unique_arrays() {
1823        let data = vec![
1824            vec![ByteArray::from(vec![1])],
1825            vec![ByteArray::from(vec![2, 3]), ByteArray::from(vec![4, 5, 6])],
1826            vec![
1827                ByteArray::from(vec![7, 8]),
1828                ByteArray::from(vec![9, 0, 1, 2]),
1829            ],
1830        ];
1831        test_delta_byte_array_decode(data);
1832    }
1833
1834    #[test]
1835    fn test_delta_byte_array_single_array() {
1836        let data = vec![vec![ByteArray::from(vec![1, 2, 3, 4, 5, 6])]];
1837        test_delta_byte_array_decode(data);
1838    }
1839
1840    #[test]
1841    fn test_byte_stream_split_multiple_f32() {
1842        let data = vec![
1843            vec![
1844                f32::from_le_bytes([0xAA, 0xBB, 0xCC, 0xDD]),
1845                f32::from_le_bytes([0x00, 0x11, 0x22, 0x33]),
1846            ],
1847            vec![f32::from_le_bytes([0xA3, 0xB4, 0xC5, 0xD6])],
1848        ];
1849        test_byte_stream_split_decode::<FloatType>(data, -1);
1850    }
1851
1852    #[test]
1853    fn test_byte_stream_split_f64() {
1854        let data = vec![vec![
1855            f64::from_le_bytes([0, 1, 2, 3, 4, 5, 6, 7]),
1856            f64::from_le_bytes([8, 9, 10, 11, 12, 13, 14, 15]),
1857        ]];
1858        test_byte_stream_split_decode::<DoubleType>(data, -1);
1859    }
1860
1861    #[test]
1862    fn test_byte_stream_split_multiple_i32() {
1863        let data = vec![
1864            vec![
1865                i32::from_le_bytes([0xAA, 0xBB, 0xCC, 0xDD]),
1866                i32::from_le_bytes([0x00, 0x11, 0x22, 0x33]),
1867            ],
1868            vec![i32::from_le_bytes([0xA3, 0xB4, 0xC5, 0xD6])],
1869        ];
1870        test_byte_stream_split_decode::<Int32Type>(data, -1);
1871    }
1872
1873    #[test]
1874    fn test_byte_stream_split_i64() {
1875        let data = vec![vec![
1876            i64::from_le_bytes([0, 1, 2, 3, 4, 5, 6, 7]),
1877            i64::from_le_bytes([8, 9, 10, 11, 12, 13, 14, 15]),
1878        ]];
1879        test_byte_stream_split_decode::<Int64Type>(data, -1);
1880    }
1881
1882    fn test_byte_stream_split_flba(type_width: usize) {
1883        let data = vec![
1884            vec![
1885                FixedLenByteArrayType::r#gen(type_width as i32),
1886                FixedLenByteArrayType::r#gen(type_width as i32),
1887            ],
1888            vec![FixedLenByteArrayType::r#gen(type_width as i32)],
1889        ];
1890        test_byte_stream_split_decode::<FixedLenByteArrayType>(data, type_width as i32);
1891    }
1892
1893    #[test]
1894    fn test_byte_stream_split_flba5() {
1895        test_byte_stream_split_flba(5);
1896    }
1897
1898    #[test]
1899    fn test_byte_stream_split_flba16() {
1900        test_byte_stream_split_flba(16);
1901    }
1902
1903    #[test]
1904    fn test_byte_stream_split_flba19() {
1905        test_byte_stream_split_flba(19);
1906    }
1907
1908    #[test]
1909    #[should_panic(expected = "Mismatched FixedLenByteArray sizes: 4 != 5")]
1910    fn test_byte_stream_split_flba_mismatch() {
1911        let data = vec![
1912            vec![
1913                FixedLenByteArray::from(vec![0xAA, 0xAB, 0xAC, 0xAD, 0xAE]),
1914                FixedLenByteArray::from(vec![0xBA, 0xBB, 0xBC, 0xBD, 0xBE]),
1915            ],
1916            vec![FixedLenByteArray::from(vec![0xCA, 0xCB, 0xCC, 0xCD])],
1917        ];
1918        test_byte_stream_split_decode::<FixedLenByteArrayType>(data, 5);
1919    }
1920
1921    #[test]
1922    #[should_panic(expected = "Input data length is not a multiple of type width 4")]
1923    fn test_byte_stream_split_flba_bad_input() {
1924        let mut decoder = VariableWidthByteStreamSplitDecoder::<FixedLenByteArrayType>::new(4);
1925        decoder
1926            .set_data(Bytes::from(vec![1, 2, 3, 4, 5]), 1)
1927            .unwrap();
1928    }
1929
1930    #[test]
1931    fn test_skip_byte_stream_split() {
1932        let block_data = vec![0.3, 0.4, 0.1, 4.10];
1933        test_skip::<FloatType>(block_data.clone(), Encoding::BYTE_STREAM_SPLIT, 2);
1934        test_skip::<DoubleType>(
1935            block_data.into_iter().map(|x| x as f64).collect(),
1936            Encoding::BYTE_STREAM_SPLIT,
1937            100,
1938        );
1939    }
1940
1941    #[test]
1942    fn test_skip_byte_stream_split_ints() {
1943        let block_data = vec![3, 4, 1, 5];
1944        test_skip::<Int32Type>(block_data.clone(), Encoding::BYTE_STREAM_SPLIT, 2);
1945        test_skip::<Int64Type>(
1946            block_data.into_iter().map(|x| x as i64).collect(),
1947            Encoding::BYTE_STREAM_SPLIT,
1948            100,
1949        );
1950    }
1951
1952    fn test_rle_value_decode<T: DataType>(data: Vec<Vec<T::T>>) {
1953        test_encode_decode::<T>(data, Encoding::RLE, -1);
1954    }
1955
1956    fn test_delta_bit_packed_decode<T: DataType>(data: Vec<Vec<T::T>>) {
1957        test_encode_decode::<T>(data, Encoding::DELTA_BINARY_PACKED, -1);
1958    }
1959
1960    fn test_byte_stream_split_decode<T: DataType>(data: Vec<Vec<T::T>>, type_width: i32) {
1961        test_encode_decode::<T>(data, Encoding::BYTE_STREAM_SPLIT, type_width);
1962    }
1963
1964    fn test_delta_byte_array_decode(data: Vec<Vec<ByteArray>>) {
1965        test_encode_decode::<ByteArrayType>(data, Encoding::DELTA_BYTE_ARRAY, -1);
1966    }
1967
1968    // Input data represents vector of data slices to write (test multiple `put()` calls)
1969    // For example,
1970    //   vec![vec![1, 2, 3]] invokes `put()` once and writes {1, 2, 3}
1971    //   vec![vec![1, 2], vec![3]] invokes `put()` twice and writes {1, 2, 3}
1972    fn test_encode_decode<T: DataType>(data: Vec<Vec<T::T>>, encoding: Encoding, type_width: i32) {
1973        let col_descr = create_test_col_desc_ptr(type_width, T::get_physical_type());
1974
1975        // Encode data
1976        let mut encoder = get_encoder::<T>(encoding, &col_descr).expect("get encoder");
1977
1978        for v in &data[..] {
1979            encoder.put(&v[..]).expect("ok to encode");
1980        }
1981        let bytes = encoder.flush_buffer().expect("ok to flush buffer");
1982
1983        // Flatten expected data as contiguous array of values
1984        let expected: Vec<T::T> = data.iter().flat_map(|s| s.clone()).collect();
1985
1986        // Decode data and compare with original
1987        let mut decoder = get_decoder::<T>(col_descr, encoding).expect("get decoder");
1988
1989        let mut result = vec![T::T::default(); expected.len()];
1990        decoder
1991            .set_data(bytes, expected.len())
1992            .expect("ok to set data");
1993        let mut result_num_values = 0;
1994        while decoder.values_left() > 0 {
1995            result_num_values += decoder
1996                .get(&mut result[result_num_values..])
1997                .expect("ok to decode");
1998        }
1999        assert_eq!(result_num_values, expected.len());
2000        assert_eq!(result, expected);
2001    }
2002
2003    fn test_skip<T: DataType>(data: Vec<T::T>, encoding: Encoding, skip: usize) {
2004        // Type length should not really matter for encode/decode test,
2005        // otherwise change it based on type
2006        let col_descr = create_test_col_desc_ptr(-1, T::get_physical_type());
2007
2008        // Encode data
2009        let mut encoder = get_encoder::<T>(encoding, &col_descr).expect("get encoder");
2010
2011        encoder.put(&data).expect("ok to encode");
2012
2013        let bytes = encoder.flush_buffer().expect("ok to flush buffer");
2014
2015        let mut decoder = get_decoder::<T>(col_descr, encoding).expect("get decoder");
2016        decoder.set_data(bytes, data.len()).expect("ok to set data");
2017
2018        if skip >= data.len() {
2019            let skipped = decoder.skip(skip).expect("ok to skip");
2020            assert_eq!(skipped, data.len());
2021
2022            let skipped_again = decoder.skip(skip).expect("ok to skip again");
2023            assert_eq!(skipped_again, 0);
2024        } else {
2025            let skipped = decoder.skip(skip).expect("ok to skip");
2026            assert_eq!(skipped, skip);
2027
2028            let remaining = data.len() - skip;
2029
2030            let expected = &data[skip..];
2031            let mut buffer = vec![T::T::default(); remaining];
2032            let fetched = decoder.get(&mut buffer).expect("ok to decode");
2033            assert_eq!(remaining, fetched);
2034            assert_eq!(&buffer, expected);
2035        }
2036    }
2037
2038    fn create_and_check_decoder<T: DataType>(encoding: Encoding, err: Option<ParquetError>) {
2039        let descr = create_test_col_desc_ptr(-1, T::get_physical_type());
2040        let decoder = get_decoder::<T>(descr, encoding);
2041        match err {
2042            Some(parquet_error) => {
2043                assert_eq!(
2044                    decoder.err().unwrap().to_string(),
2045                    parquet_error.to_string()
2046                );
2047            }
2048            None => {
2049                assert_eq!(decoder.unwrap().encoding(), encoding);
2050            }
2051        }
2052    }
2053
2054    // Creates test column descriptor.
2055    fn create_test_col_desc_ptr(type_len: i32, t: Type) -> ColumnDescPtr {
2056        let ty = SchemaType::primitive_type_builder("t", t)
2057            .with_length(type_len)
2058            .build()
2059            .unwrap();
2060        Arc::new(ColumnDescriptor::new(
2061            Arc::new(ty),
2062            0,
2063            0,
2064            ColumnPath::new(vec![]),
2065        ))
2066    }
2067
2068    fn usize_to_bytes(v: usize) -> [u8; 4] {
2069        (v as u32).to_ne_bytes()
2070    }
2071
2072    /// A util trait to convert slices of different types to byte arrays
2073    trait ToByteArray<T: DataType> {
2074        #[allow(clippy::wrong_self_convention)]
2075        fn to_byte_array(data: &[T::T]) -> Vec<u8>;
2076    }
2077
2078    macro_rules! to_byte_array_impl {
2079        ($ty: ty) => {
2080            impl ToByteArray<$ty> for $ty {
2081                #[allow(clippy::wrong_self_convention)]
2082                fn to_byte_array(data: &[<$ty as DataType>::T]) -> Vec<u8> {
2083                    <$ty as DataType>::T::slice_as_bytes(data).to_vec()
2084                }
2085            }
2086        };
2087    }
2088
2089    to_byte_array_impl!(Int32Type);
2090    to_byte_array_impl!(Int64Type);
2091    to_byte_array_impl!(FloatType);
2092    to_byte_array_impl!(DoubleType);
2093
2094    impl ToByteArray<BoolType> for BoolType {
2095        #[allow(clippy::wrong_self_convention)]
2096        fn to_byte_array(data: &[bool]) -> Vec<u8> {
2097            let mut v = vec![];
2098            for (i, item) in data.iter().enumerate() {
2099                if i % 8 == 0 {
2100                    v.push(0);
2101                }
2102                if *item {
2103                    v[i / 8] |= 1 << (i % 8);
2104                }
2105            }
2106            v
2107        }
2108    }
2109
2110    impl ToByteArray<Int96Type> for Int96Type {
2111        #[allow(clippy::wrong_self_convention)]
2112        fn to_byte_array(data: &[Int96]) -> Vec<u8> {
2113            let mut v = vec![];
2114            for d in data {
2115                v.extend_from_slice(d.as_bytes());
2116            }
2117            v
2118        }
2119    }
2120
2121    impl ToByteArray<ByteArrayType> for ByteArrayType {
2122        #[allow(clippy::wrong_self_convention)]
2123        fn to_byte_array(data: &[ByteArray]) -> Vec<u8> {
2124            let mut v = vec![];
2125            for d in data {
2126                let buf = d.data();
2127                let len = &usize_to_bytes(buf.len());
2128                v.extend_from_slice(len);
2129                v.extend(buf);
2130            }
2131            v
2132        }
2133    }
2134
2135    impl ToByteArray<FixedLenByteArrayType> for FixedLenByteArrayType {
2136        #[allow(clippy::wrong_self_convention)]
2137        fn to_byte_array(data: &[FixedLenByteArray]) -> Vec<u8> {
2138            let mut v = vec![];
2139            for d in data {
2140                let buf = d.data();
2141                v.extend(buf);
2142            }
2143            v
2144        }
2145    }
2146
2147    #[test]
2148    // Allow initializing a vector and pushing to it for clarity in this test
2149    #[allow(clippy::vec_init_then_push)]
2150    fn test_delta_bit_packed_invalid_bit_width() {
2151        // Manually craft a buffer with an invalid bit width
2152        let mut buffer = vec![];
2153        // block_size = 128
2154        buffer.push(128);
2155        buffer.push(1);
2156        // mini_blocks_per_block = 4
2157        buffer.push(4);
2158        // num_values = 32
2159        buffer.push(32);
2160        // first_value = 0
2161        buffer.push(0);
2162        // min_delta = 0
2163        buffer.push(0);
2164        // bit_widths, one for each of the 4 mini blocks
2165        buffer.push(33); // Invalid bit width
2166        buffer.push(0);
2167        buffer.push(0);
2168        buffer.push(0);
2169
2170        let corrupted_buffer = Bytes::from(buffer);
2171
2172        let mut decoder = DeltaBitPackDecoder::<Int32Type>::new();
2173        decoder.set_data(corrupted_buffer.clone(), 32).unwrap();
2174        let mut read_buffer = vec![0; 32];
2175        let err = decoder.get(&mut read_buffer).unwrap_err();
2176        assert!(
2177            err.to_string()
2178                .contains("Invalid delta bit width 33 which is larger than expected 32"),
2179            "{}",
2180            err
2181        );
2182
2183        let mut decoder = DeltaBitPackDecoder::<Int32Type>::new();
2184        decoder.set_data(corrupted_buffer, 32).unwrap();
2185        let err = decoder.skip(32).unwrap_err();
2186        assert!(
2187            err.to_string()
2188                .contains("Invalid delta bit width 33 which is larger than expected 32"),
2189            "{}",
2190            err
2191        );
2192    }
2193}