Skip to main content

parquet/encodings/encoding/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Contains all supported encoders for Parquet.
19
20use std::{cmp, marker::PhantomData};
21
22use crate::basic::*;
23use crate::data_type::private::ParquetValueType;
24use crate::data_type::*;
25use crate::encodings::rle::RleEncoder;
26use crate::errors::{ParquetError, Result};
27use crate::schema::types::ColumnDescPtr;
28use crate::util::bit_util::{BitWriter, num_required_bits};
29
30use byte_stream_split_encoder::{ByteStreamSplitEncoder, VariableWidthByteStreamSplitEncoder};
31use bytes::Bytes;
32pub use dict_encoder::DictEncoder;
33
34mod byte_stream_split_encoder;
35mod dict_encoder;
36
37// ----------------------------------------------------------------------
38// Encoders
39
40/// An Parquet encoder for the data type `T`.
41///
42/// Currently this allocates internal buffers for the encoded values. After done putting
43/// values, caller should call `flush_buffer()` to get an immutable buffer pointer.
44pub trait Encoder<T: DataType>: Send {
45    /// Encodes data from `values`.
46    fn put(&mut self, values: &[T::T]) -> Result<()>;
47
48    /// Encodes data from `values`, which contains spaces for null values, that is
49    /// identified by `valid_bits`.
50    ///
51    /// Returns the number of non-null values encoded.
52    #[cfg(test)]
53    fn put_spaced(&mut self, values: &[T::T], valid_bits: &[u8]) -> Result<usize> {
54        let num_values = values.len();
55        let mut buffer = Vec::with_capacity(num_values);
56        // TODO: this is pretty inefficient. Revisit in future.
57        for (i, item) in values.iter().enumerate().take(num_values) {
58            if crate::util::bit_util::get_bit(valid_bits, i) {
59                buffer.push(item.clone());
60            }
61        }
62        self.put(&buffer[..])?;
63        Ok(buffer.len())
64    }
65
66    /// Returns the encoding type of this encoder.
67    fn encoding(&self) -> Encoding;
68
69    /// Returns an estimate of the encoded data, in bytes.
70    /// Method call must be O(1).
71    fn estimated_data_encoded_size(&self) -> usize;
72
73    /// Returns an estimate of the memory use of this encoder, in bytes
74    fn estimated_memory_size(&self) -> usize;
75
76    /// Flushes the underlying byte buffer that's being processed by this encoder, and
77    /// return the immutable copy of it. This will also reset the internal state.
78    fn flush_buffer(&mut self) -> Result<Bytes>;
79}
80
81/// Gets a encoder for the particular data type `T` and encoding `encoding`. Memory usage
82/// for the encoder instance is tracked by `mem_tracker`.
83pub fn get_encoder<T: DataType>(
84    encoding: Encoding,
85    descr: &ColumnDescPtr,
86) -> Result<Box<dyn Encoder<T>>> {
87    let encoder: Box<dyn Encoder<T>> = match encoding {
88        Encoding::PLAIN => Box::new(PlainEncoder::new()),
89        Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => {
90            return Err(general_err!(
91                "Cannot initialize this encoding through this function"
92            ));
93        }
94        Encoding::RLE => Box::new(RleValueEncoder::new()),
95        Encoding::DELTA_BINARY_PACKED => Box::new(DeltaBitPackEncoder::new()),
96        Encoding::DELTA_LENGTH_BYTE_ARRAY => Box::new(DeltaLengthByteArrayEncoder::new()),
97        Encoding::DELTA_BYTE_ARRAY => Box::new(DeltaByteArrayEncoder::new()),
98        Encoding::BYTE_STREAM_SPLIT => match T::get_physical_type() {
99            Type::FIXED_LEN_BYTE_ARRAY => Box::new(VariableWidthByteStreamSplitEncoder::new(
100                descr.type_length(),
101            )),
102            _ => Box::new(ByteStreamSplitEncoder::new()),
103        },
104        e => return Err(nyi_err!("Encoding {} is not supported", e)),
105    };
106    Ok(encoder)
107}
108
109// ----------------------------------------------------------------------
110// Plain encoding
111
112/// Plain encoding that supports all types.
113/// Values are encoded back to back.
114/// The plain encoding is used whenever a more efficient encoding can not be used.
115/// It stores the data in the following format:
116/// - BOOLEAN - 1 bit per value, 0 is false; 1 is true.
117/// - INT32 - 4 bytes per value, stored as little-endian.
118/// - INT64 - 8 bytes per value, stored as little-endian.
119/// - FLOAT - 4 bytes per value, stored as IEEE little-endian.
120/// - DOUBLE - 8 bytes per value, stored as IEEE little-endian.
121/// - BYTE_ARRAY - 4 byte length stored as little endian, followed by bytes.
122/// - FIXED_LEN_BYTE_ARRAY - just the bytes are stored.
123pub struct PlainEncoder<T: DataType> {
124    buffer: Vec<u8>,
125    bit_writer: BitWriter,
126    _phantom: PhantomData<T>,
127}
128
129impl<T: DataType> Default for PlainEncoder<T> {
130    fn default() -> Self {
131        Self::new()
132    }
133}
134
135impl<T: DataType> PlainEncoder<T> {
136    /// Creates new plain encoder.
137    pub fn new() -> Self {
138        Self {
139            buffer: vec![],
140            bit_writer: BitWriter::new(256),
141            _phantom: PhantomData,
142        }
143    }
144}
145
146impl<T: DataType> Encoder<T> for PlainEncoder<T> {
147    // Performance Note:
148    // As far as can be seen these functions are rarely called and as such we can hint to the
149    // compiler that they dont need to be folded into hot locations in the final output.
150    #[cold]
151    fn encoding(&self) -> Encoding {
152        Encoding::PLAIN
153    }
154
155    fn estimated_data_encoded_size(&self) -> usize {
156        self.buffer.len() + self.bit_writer.bytes_written()
157    }
158
159    #[inline]
160    fn flush_buffer(&mut self) -> Result<Bytes> {
161        self.buffer
162            .extend_from_slice(self.bit_writer.flush_buffer());
163        self.bit_writer.clear();
164        Ok(std::mem::take(&mut self.buffer).into())
165    }
166
167    #[inline]
168    fn put(&mut self, values: &[T::T]) -> Result<()> {
169        T::T::encode(values, &mut self.buffer, &mut self.bit_writer)?;
170        Ok(())
171    }
172
173    /// Return the estimated memory size of this encoder.
174    fn estimated_memory_size(&self) -> usize {
175        self.buffer.capacity() * std::mem::size_of::<u8>() + self.bit_writer.estimated_memory_size()
176    }
177}
178
179// ----------------------------------------------------------------------
180// RLE encoding
181
182const DEFAULT_RLE_BUFFER_LEN: usize = 1024;
183
184/// RLE/Bit-Packing hybrid encoding for values.
185/// Currently is used only for data pages v2 and supports boolean types.
186pub struct RleValueEncoder<T: DataType> {
187    // Buffer with raw values that we collect,
188    // when flushing buffer they are encoded using RLE encoder
189    encoder: Option<RleEncoder>,
190    _phantom: PhantomData<T>,
191}
192
193impl<T: DataType> Default for RleValueEncoder<T> {
194    fn default() -> Self {
195        Self::new()
196    }
197}
198
199impl<T: DataType> RleValueEncoder<T> {
200    /// Creates new rle value encoder.
201    pub fn new() -> Self {
202        Self {
203            encoder: None,
204            _phantom: PhantomData,
205        }
206    }
207}
208
209impl<T: DataType> Encoder<T> for RleValueEncoder<T> {
210    #[inline]
211    fn put(&mut self, values: &[T::T]) -> Result<()> {
212        ensure_phys_ty!(Type::BOOLEAN, "RleValueEncoder only supports BoolType");
213
214        let rle_encoder = self.encoder.get_or_insert_with(|| {
215            let mut buffer = Vec::with_capacity(DEFAULT_RLE_BUFFER_LEN);
216            // Reserve space for length
217            buffer.extend_from_slice(&[0; 4]);
218            RleEncoder::new_from_buf(1, buffer)
219        });
220
221        for value in values {
222            let value = value.as_u64()?;
223            rle_encoder.put(value)
224        }
225        Ok(())
226    }
227
228    // Performance Note:
229    // As far as can be seen these functions are rarely called and as such we can hint to the
230    // compiler that they dont need to be folded into hot locations in the final output.
231    #[cold]
232    fn encoding(&self) -> Encoding {
233        Encoding::RLE
234    }
235
236    #[inline]
237    fn estimated_data_encoded_size(&self) -> usize {
238        match self.encoder {
239            Some(ref enc) => enc.len(),
240            None => 0,
241        }
242    }
243
244    #[inline]
245    fn flush_buffer(&mut self) -> Result<Bytes> {
246        ensure_phys_ty!(Type::BOOLEAN, "RleValueEncoder only supports BoolType");
247        let rle_encoder = self
248            .encoder
249            .take()
250            .expect("RLE value encoder is not initialized");
251
252        // Flush all encoder buffers and raw values
253        let mut buf = rle_encoder.consume();
254        assert!(buf.len() >= 4, "should have had padding inserted");
255
256        // Note that buf does not have any offset, all data is encoded bytes
257        let len = (buf.len() - 4) as i32;
258        buf[..4].copy_from_slice(&len.to_le_bytes());
259
260        Ok(buf.into())
261    }
262
263    /// return the estimated memory size of this encoder.
264    fn estimated_memory_size(&self) -> usize {
265        self.encoder
266            .as_ref()
267            .map_or(0, |enc| enc.estimated_memory_size())
268    }
269}
270
271// ----------------------------------------------------------------------
272// DELTA_BINARY_PACKED encoding
273
274const MAX_PAGE_HEADER_WRITER_SIZE: usize = 32;
275const DEFAULT_BIT_WRITER_SIZE: usize = 1024 * 1024;
276const DEFAULT_NUM_MINI_BLOCKS: usize = 4;
277
278/// Delta bit packed encoder.
279/// Consists of a header followed by blocks of delta encoded values binary packed.
280///
281/// Delta-binary-packing:
282/// ```shell
283///   [page-header] [block 1], [block 2], ... [block N]
284/// ```
285///
286/// Each page header consists of:
287/// ```shell
288///   [block size] [number of miniblocks in a block] [total value count] [first value]
289/// ```
290///
291/// Each block consists of:
292/// ```shell
293///   [min delta] [list of bitwidths of miniblocks] [miniblocks]
294/// ```
295///
296/// Current implementation writes values in `put` method, multiple calls to `put` to
297/// existing block or start new block if block size is exceeded. Calling `flush_buffer`
298/// writes out all data and resets internal state, including page header.
299///
300/// Supports only INT32 and INT64.
301pub struct DeltaBitPackEncoder<T: DataType> {
302    page_header_writer: BitWriter,
303    bit_writer: BitWriter,
304    total_values: usize,
305    first_value: i64,
306    current_value: i64,
307    block_size: usize,
308    mini_block_size: usize,
309    num_mini_blocks: usize,
310    values_in_block: usize,
311    deltas: Vec<i64>,
312    _phantom: PhantomData<T>,
313}
314
315impl<T: DataType> Default for DeltaBitPackEncoder<T> {
316    fn default() -> Self {
317        Self::new()
318    }
319}
320
321impl<T: DataType> DeltaBitPackEncoder<T> {
322    /// Creates new delta bit packed encoder.
323    pub fn new() -> Self {
324        Self::assert_supported_type();
325
326        // Size miniblocks so that they can be efficiently decoded
327        let mini_block_size = match T::T::PHYSICAL_TYPE {
328            Type::INT32 => 32,
329            Type::INT64 => 64,
330            _ => unreachable!(),
331        };
332
333        let num_mini_blocks = DEFAULT_NUM_MINI_BLOCKS;
334        let block_size = mini_block_size * num_mini_blocks;
335        assert_eq!(block_size % 128, 0);
336
337        DeltaBitPackEncoder {
338            page_header_writer: BitWriter::new(MAX_PAGE_HEADER_WRITER_SIZE),
339            bit_writer: BitWriter::new(DEFAULT_BIT_WRITER_SIZE),
340            total_values: 0,
341            first_value: 0,
342            current_value: 0, // current value to keep adding deltas
343            block_size,       // can write fewer values than block size for last block
344            mini_block_size,
345            num_mini_blocks,
346            values_in_block: 0, // will be at most block_size
347            deltas: vec![0; block_size],
348            _phantom: PhantomData,
349        }
350    }
351
352    /// Writes page header for blocks, this method is invoked when we are done encoding
353    /// values. It is also okay to encode when no values have been provided
354    fn write_page_header(&mut self) {
355        // We ignore the result of each 'put' operation, because
356        // MAX_PAGE_HEADER_WRITER_SIZE is chosen to fit all header values and
357        // guarantees that writes will not fail.
358
359        // Write the size of each block
360        self.page_header_writer.put_vlq_int(self.block_size as u64);
361        // Write the number of mini blocks
362        self.page_header_writer
363            .put_vlq_int(self.num_mini_blocks as u64);
364        // Write the number of all values (including non-encoded first value)
365        self.page_header_writer
366            .put_vlq_int(self.total_values as u64);
367        // Write first value
368        self.page_header_writer.put_zigzag_vlq_int(self.first_value);
369    }
370
371    // Write current delta buffer (<= 'block size' values) into bit writer
372    #[inline(never)]
373    fn flush_block_values(&mut self) -> Result<()> {
374        if self.values_in_block == 0 {
375            return Ok(());
376        }
377
378        let mut min_delta = i64::MAX;
379        for i in 0..self.values_in_block {
380            min_delta = cmp::min(min_delta, self.deltas[i]);
381        }
382
383        // Write min delta
384        self.bit_writer.put_zigzag_vlq_int(min_delta);
385
386        // Slice to store bit width for each mini block
387        let offset = self.bit_writer.skip(self.num_mini_blocks);
388
389        for i in 0..self.num_mini_blocks {
390            // Find how many values we need to encode - either block size or whatever
391            // values left
392            let n = cmp::min(self.mini_block_size, self.values_in_block);
393            if n == 0 {
394                // Decoders should be agnostic to the padding value, we therefore use 0xFF
395                // when running tests. However, not all implementations may handle this correctly
396                // so pad with 0 when not running tests
397                let pad_value = cfg!(test).then(|| 0xFF).unwrap_or(0);
398                for j in i..self.num_mini_blocks {
399                    self.bit_writer.write_at(offset + j, pad_value);
400                }
401                break;
402            }
403
404            // Compute the max delta in current mini block
405            let mut max_delta = i64::MIN;
406            for j in 0..n {
407                max_delta = cmp::max(max_delta, self.deltas[i * self.mini_block_size + j]);
408            }
409
410            // Compute bit width to store (max_delta - min_delta)
411            let bit_width = num_required_bits(self.subtract_u64(max_delta, min_delta)) as usize;
412            self.bit_writer.write_at(offset + i, bit_width as u8);
413
414            // Encode values in current mini block using min_delta and bit_width
415            for j in 0..n {
416                let packed_value =
417                    self.subtract_u64(self.deltas[i * self.mini_block_size + j], min_delta);
418                self.bit_writer.put_value(packed_value, bit_width);
419            }
420
421            // Pad the last block (n < mini_block_size)
422            for _ in n..self.mini_block_size {
423                self.bit_writer.put_value(0, bit_width);
424            }
425
426            self.values_in_block -= n;
427        }
428
429        assert_eq!(
430            self.values_in_block, 0,
431            "Expected 0 values in block, found {}",
432            self.values_in_block
433        );
434        Ok(())
435    }
436}
437
438// Implementation is shared between Int32Type and Int64Type,
439// see `DeltaBitPackEncoderConversion` below for specifics.
440impl<T: DataType> Encoder<T> for DeltaBitPackEncoder<T> {
441    fn put(&mut self, values: &[T::T]) -> Result<()> {
442        if values.is_empty() {
443            return Ok(());
444        }
445
446        // Define values to encode, initialize state
447        let mut idx = if self.total_values == 0 {
448            self.first_value = self.as_i64(values, 0);
449            self.current_value = self.first_value;
450            1
451        } else {
452            0
453        };
454        // Add all values (including first value)
455        self.total_values += values.len();
456
457        // Write block
458        while idx < values.len() {
459            let value = self.as_i64(values, idx);
460            self.deltas[self.values_in_block] = self.subtract(value, self.current_value);
461            self.current_value = value;
462            idx += 1;
463            self.values_in_block += 1;
464            if self.values_in_block == self.block_size {
465                self.flush_block_values()?;
466            }
467        }
468        Ok(())
469    }
470
471    // Performance Note:
472    // As far as can be seen these functions are rarely called and as such we can hint to the
473    // compiler that they dont need to be folded into hot locations in the final output.
474    #[cold]
475    fn encoding(&self) -> Encoding {
476        Encoding::DELTA_BINARY_PACKED
477    }
478
479    fn estimated_data_encoded_size(&self) -> usize {
480        self.bit_writer.bytes_written()
481    }
482
483    fn flush_buffer(&mut self) -> Result<Bytes> {
484        // Write remaining values
485        self.flush_block_values()?;
486        // Write page header with total values
487        self.write_page_header();
488
489        let mut buffer = Vec::new();
490        buffer.extend_from_slice(self.page_header_writer.flush_buffer());
491        buffer.extend_from_slice(self.bit_writer.flush_buffer());
492
493        // Reset state
494        self.page_header_writer.clear();
495        self.bit_writer.clear();
496        self.total_values = 0;
497        self.first_value = 0;
498        self.current_value = 0;
499        self.values_in_block = 0;
500
501        Ok(buffer.into())
502    }
503
504    /// return the estimated memory size of this encoder.
505    fn estimated_memory_size(&self) -> usize {
506        self.page_header_writer.estimated_memory_size()
507            + self.bit_writer.estimated_memory_size()
508            + self.deltas.capacity() * std::mem::size_of::<i64>()
509            + std::mem::size_of::<Self>()
510    }
511}
512
513/// Helper trait to define specific conversions and subtractions when computing deltas
514trait DeltaBitPackEncoderConversion<T: DataType> {
515    // Method should panic if type is not supported, otherwise no-op
516    fn assert_supported_type();
517
518    fn as_i64(&self, values: &[T::T], index: usize) -> i64;
519
520    fn subtract(&self, left: i64, right: i64) -> i64;
521
522    fn subtract_u64(&self, left: i64, right: i64) -> u64;
523}
524
525const DELTA_BIT_PACK_TYPE_ERROR: &str =
526    "DeltaBitPackDecoder only supports Int32Type, UInt32Type, Int64Type, and UInt64Type";
527
528impl<T: DataType> DeltaBitPackEncoderConversion<T> for DeltaBitPackEncoder<T> {
529    #[inline]
530    fn assert_supported_type() {
531        ensure_phys_ty!(Type::INT32 | Type::INT64, "{}", DELTA_BIT_PACK_TYPE_ERROR);
532    }
533
534    #[inline]
535    fn as_i64(&self, values: &[T::T], index: usize) -> i64 {
536        values[index].as_i64().expect(DELTA_BIT_PACK_TYPE_ERROR)
537    }
538
539    #[inline]
540    fn subtract(&self, left: i64, right: i64) -> i64 {
541        // It is okay for values to overflow, wrapping_sub wrapping around at the boundary
542        match T::get_physical_type() {
543            Type::INT32 => (left as i32).wrapping_sub(right as i32) as i64,
544            Type::INT64 => left.wrapping_sub(right),
545            _ => panic!("{}", DELTA_BIT_PACK_TYPE_ERROR),
546        }
547    }
548
549    #[inline]
550    fn subtract_u64(&self, left: i64, right: i64) -> u64 {
551        match T::get_physical_type() {
552            // Conversion of i32 -> u32 -> u64 is to avoid non-zero left most bytes in int repr
553            Type::INT32 => (left as i32).wrapping_sub(right as i32) as u32 as u64,
554            Type::INT64 => left.wrapping_sub(right) as u64,
555            _ => panic!("{}", DELTA_BIT_PACK_TYPE_ERROR),
556        }
557    }
558}
559
560// ----------------------------------------------------------------------
561// DELTA_LENGTH_BYTE_ARRAY encoding
562
563/// Encoding for byte arrays to separate the length values and the data.
564/// The lengths are encoded using DELTA_BINARY_PACKED encoding, data is
565/// stored as raw bytes.
566pub struct DeltaLengthByteArrayEncoder<T: DataType> {
567    // length encoder
568    len_encoder: DeltaBitPackEncoder<Int32Type>,
569    // byte array data
570    data: Vec<ByteArray>,
571    // data size in bytes of encoded values
572    encoded_size: usize,
573    _phantom: PhantomData<T>,
574}
575
576impl<T: DataType> Default for DeltaLengthByteArrayEncoder<T> {
577    fn default() -> Self {
578        Self::new()
579    }
580}
581
582impl<T: DataType> DeltaLengthByteArrayEncoder<T> {
583    /// Creates new delta length byte array encoder.
584    pub fn new() -> Self {
585        Self {
586            len_encoder: DeltaBitPackEncoder::new(),
587            data: vec![],
588            encoded_size: 0,
589            _phantom: PhantomData,
590        }
591    }
592}
593
594impl<T: DataType> Encoder<T> for DeltaLengthByteArrayEncoder<T> {
595    fn put(&mut self, values: &[T::T]) -> Result<()> {
596        ensure_phys_ty!(
597            Type::BYTE_ARRAY | Type::FIXED_LEN_BYTE_ARRAY,
598            "DeltaLengthByteArrayEncoder only supports ByteArrayType"
599        );
600
601        let val_it = || {
602            values
603                .iter()
604                .map(|x| x.as_any().downcast_ref::<ByteArray>().unwrap())
605        };
606
607        let lengths: Vec<i32> = val_it().map(|byte_array| byte_array.len() as i32).collect();
608        self.len_encoder.put(&lengths)?;
609        for byte_array in val_it() {
610            self.encoded_size += byte_array.len();
611            self.data.push(byte_array.clone());
612        }
613
614        Ok(())
615    }
616
617    // Performance Note:
618    // As far as can be seen these functions are rarely called and as such we can hint to the
619    // compiler that they dont need to be folded into hot locations in the final output.
620    #[cold]
621    fn encoding(&self) -> Encoding {
622        Encoding::DELTA_LENGTH_BYTE_ARRAY
623    }
624
625    fn estimated_data_encoded_size(&self) -> usize {
626        self.len_encoder.estimated_data_encoded_size() + self.encoded_size
627    }
628
629    fn flush_buffer(&mut self) -> Result<Bytes> {
630        ensure_phys_ty!(
631            Type::BYTE_ARRAY | Type::FIXED_LEN_BYTE_ARRAY,
632            "DeltaLengthByteArrayEncoder only supports ByteArrayType"
633        );
634
635        let mut total_bytes = vec![];
636        let lengths = self.len_encoder.flush_buffer()?;
637        total_bytes.extend_from_slice(&lengths);
638        self.data.iter().for_each(|byte_array| {
639            total_bytes.extend_from_slice(byte_array.data());
640        });
641        self.data.clear();
642        self.encoded_size = 0;
643
644        Ok(total_bytes.into())
645    }
646
647    /// return the estimated memory size of this encoder.
648    fn estimated_memory_size(&self) -> usize {
649        self.len_encoder.estimated_memory_size() + self.data.len() + std::mem::size_of::<Self>()
650    }
651}
652
653// ----------------------------------------------------------------------
654// DELTA_BYTE_ARRAY encoding
655
656/// Encoding for byte arrays, prefix lengths are encoded using DELTA_BINARY_PACKED
657/// encoding, followed by suffixes with DELTA_LENGTH_BYTE_ARRAY encoding.
658pub struct DeltaByteArrayEncoder<T: DataType> {
659    prefix_len_encoder: DeltaBitPackEncoder<Int32Type>,
660    suffix_writer: DeltaLengthByteArrayEncoder<ByteArrayType>,
661    previous: Vec<u8>,
662    _phantom: PhantomData<T>,
663}
664
665impl<T: DataType> Default for DeltaByteArrayEncoder<T> {
666    fn default() -> Self {
667        Self::new()
668    }
669}
670
671impl<T: DataType> DeltaByteArrayEncoder<T> {
672    /// Creates new delta byte array encoder.
673    pub fn new() -> Self {
674        Self {
675            prefix_len_encoder: DeltaBitPackEncoder::new(),
676            suffix_writer: DeltaLengthByteArrayEncoder::new(),
677            previous: vec![],
678            _phantom: PhantomData,
679        }
680    }
681}
682
683impl<T: DataType> Encoder<T> for DeltaByteArrayEncoder<T> {
684    fn put(&mut self, values: &[T::T]) -> Result<()> {
685        let mut prefix_lengths: Vec<i32> = vec![];
686        let mut suffixes: Vec<ByteArray> = vec![];
687
688        let values = values
689            .iter()
690            .map(|x| x.as_any())
691            .map(|x| match T::get_physical_type() {
692                Type::BYTE_ARRAY => x.downcast_ref::<ByteArray>().unwrap(),
693                Type::FIXED_LEN_BYTE_ARRAY => x.downcast_ref::<FixedLenByteArray>().unwrap(),
694                _ => panic!(
695                    "DeltaByteArrayEncoder only supports ByteArrayType and FixedLenByteArrayType"
696                ),
697            });
698
699        for byte_array in values {
700            let current = byte_array.data();
701            // Maximum prefix length that is shared between previous value and current
702            // value
703            let prefix_len = cmp::min(self.previous.len(), current.len());
704            let mut match_len = 0;
705            while match_len < prefix_len && self.previous[match_len] == current[match_len] {
706                match_len += 1;
707            }
708            prefix_lengths.push(match_len as i32);
709            suffixes.push(byte_array.slice(match_len, byte_array.len() - match_len));
710            // Update previous for the next prefix
711            self.previous.clear();
712            self.previous.extend_from_slice(current);
713        }
714        self.prefix_len_encoder.put(&prefix_lengths)?;
715        self.suffix_writer.put(&suffixes)?;
716
717        Ok(())
718    }
719
720    // Performance Note:
721    // As far as can be seen these functions are rarely called and as such we can hint to the
722    // compiler that they dont need to be folded into hot locations in the final output.
723    #[cold]
724    fn encoding(&self) -> Encoding {
725        Encoding::DELTA_BYTE_ARRAY
726    }
727
728    fn estimated_data_encoded_size(&self) -> usize {
729        self.prefix_len_encoder.estimated_data_encoded_size()
730            + self.suffix_writer.estimated_data_encoded_size()
731    }
732
733    fn flush_buffer(&mut self) -> Result<Bytes> {
734        match T::get_physical_type() {
735            Type::BYTE_ARRAY | Type::FIXED_LEN_BYTE_ARRAY => {
736                // TODO: investigate if we can merge lengths and suffixes
737                // without copying data into new vector.
738                let mut total_bytes = vec![];
739                // Insert lengths ...
740                let lengths = self.prefix_len_encoder.flush_buffer()?;
741                total_bytes.extend_from_slice(&lengths);
742                // ... followed by suffixes
743                let suffixes = self.suffix_writer.flush_buffer()?;
744                total_bytes.extend_from_slice(&suffixes);
745
746                self.previous.clear();
747                Ok(total_bytes.into())
748            }
749            _ => panic!(
750                "DeltaByteArrayEncoder only supports ByteArrayType and FixedLenByteArrayType"
751            ),
752        }
753    }
754
755    /// return the estimated memory size of this encoder.
756    fn estimated_memory_size(&self) -> usize {
757        self.prefix_len_encoder.estimated_memory_size()
758            + self.suffix_writer.estimated_memory_size()
759            + (self.previous.capacity() * std::mem::size_of::<u8>())
760    }
761}
762
763#[cfg(test)]
764mod tests {
765    use super::*;
766
767    use std::sync::Arc;
768
769    use crate::encodings::decoding::{Decoder, DictDecoder, PlainDecoder, get_decoder};
770    use crate::schema::types::{ColumnDescPtr, ColumnDescriptor, ColumnPath, Type as SchemaType};
771    use crate::util::bit_util;
772    use crate::util::test_common::rand_gen::{RandGen, random_bytes};
773
774    const TEST_SET_SIZE: usize = 1024;
775
776    #[test]
777    fn test_get_encoders() {
778        // supported encodings
779        create_and_check_encoder::<Int32Type>(0, Encoding::PLAIN, None);
780        create_and_check_encoder::<Int32Type>(0, Encoding::DELTA_BINARY_PACKED, None);
781        create_and_check_encoder::<Int32Type>(0, Encoding::DELTA_LENGTH_BYTE_ARRAY, None);
782        create_and_check_encoder::<Int32Type>(0, Encoding::DELTA_BYTE_ARRAY, None);
783        create_and_check_encoder::<BoolType>(0, Encoding::RLE, None);
784
785        // error when initializing
786        create_and_check_encoder::<Int32Type>(
787            0,
788            Encoding::RLE_DICTIONARY,
789            Some(general_err!(
790                "Cannot initialize this encoding through this function"
791            )),
792        );
793        create_and_check_encoder::<Int32Type>(
794            0,
795            Encoding::PLAIN_DICTIONARY,
796            Some(general_err!(
797                "Cannot initialize this encoding through this function"
798            )),
799        );
800
801        // unsupported
802        #[allow(deprecated)]
803        create_and_check_encoder::<Int32Type>(
804            0,
805            Encoding::BIT_PACKED,
806            Some(nyi_err!("Encoding BIT_PACKED is not supported")),
807        );
808    }
809
810    #[test]
811    fn test_bool() {
812        BoolType::test(Encoding::PLAIN, TEST_SET_SIZE, -1);
813        BoolType::test(Encoding::PLAIN_DICTIONARY, TEST_SET_SIZE, -1);
814        BoolType::test(Encoding::RLE, TEST_SET_SIZE, -1);
815    }
816
817    #[test]
818    fn test_i32() {
819        Int32Type::test(Encoding::PLAIN, TEST_SET_SIZE, -1);
820        Int32Type::test(Encoding::PLAIN_DICTIONARY, TEST_SET_SIZE, -1);
821        Int32Type::test(Encoding::DELTA_BINARY_PACKED, TEST_SET_SIZE, -1);
822        Int32Type::test(Encoding::BYTE_STREAM_SPLIT, TEST_SET_SIZE, -1);
823    }
824
825    #[test]
826    fn test_i64() {
827        Int64Type::test(Encoding::PLAIN, TEST_SET_SIZE, -1);
828        Int64Type::test(Encoding::PLAIN_DICTIONARY, TEST_SET_SIZE, -1);
829        Int64Type::test(Encoding::DELTA_BINARY_PACKED, TEST_SET_SIZE, -1);
830        Int64Type::test(Encoding::BYTE_STREAM_SPLIT, TEST_SET_SIZE, -1);
831    }
832
833    #[test]
834    fn test_i96() {
835        Int96Type::test(Encoding::PLAIN, TEST_SET_SIZE, -1);
836        Int96Type::test(Encoding::PLAIN_DICTIONARY, TEST_SET_SIZE, -1);
837    }
838
839    #[test]
840    fn test_float() {
841        FloatType::test(Encoding::PLAIN, TEST_SET_SIZE, -1);
842        FloatType::test(Encoding::PLAIN_DICTIONARY, TEST_SET_SIZE, -1);
843        FloatType::test(Encoding::BYTE_STREAM_SPLIT, TEST_SET_SIZE, -1);
844    }
845
846    #[test]
847    fn test_double() {
848        DoubleType::test(Encoding::PLAIN, TEST_SET_SIZE, -1);
849        DoubleType::test(Encoding::PLAIN_DICTIONARY, TEST_SET_SIZE, -1);
850        DoubleType::test(Encoding::BYTE_STREAM_SPLIT, TEST_SET_SIZE, -1);
851    }
852
853    #[test]
854    fn test_byte_array() {
855        ByteArrayType::test(Encoding::PLAIN, TEST_SET_SIZE, -1);
856        ByteArrayType::test(Encoding::PLAIN_DICTIONARY, TEST_SET_SIZE, -1);
857        ByteArrayType::test(Encoding::DELTA_LENGTH_BYTE_ARRAY, TEST_SET_SIZE, -1);
858        ByteArrayType::test(Encoding::DELTA_BYTE_ARRAY, TEST_SET_SIZE, -1);
859    }
860
861    #[test]
862    fn test_fixed_len_byte_array() {
863        FixedLenByteArrayType::test(Encoding::PLAIN, TEST_SET_SIZE, 100);
864        FixedLenByteArrayType::test(Encoding::PLAIN_DICTIONARY, TEST_SET_SIZE, 100);
865        FixedLenByteArrayType::test(Encoding::DELTA_BYTE_ARRAY, TEST_SET_SIZE, 100);
866        FixedLenByteArrayType::test(Encoding::BYTE_STREAM_SPLIT, TEST_SET_SIZE, 100);
867    }
868
869    #[test]
870    fn test_dict_encoded_size() {
871        fn run_test<T: DataType>(type_length: i32, values: &[T::T], expected_size: usize) {
872            let mut encoder = create_test_dict_encoder::<T>(type_length);
873            assert_eq!(encoder.dict_encoded_size(), 0);
874            encoder.put(values).unwrap();
875            assert_eq!(encoder.dict_encoded_size(), expected_size);
876            // We do not reset encoded size of the dictionary keys after flush_buffer
877            encoder.flush_buffer().unwrap();
878            assert_eq!(encoder.dict_encoded_size(), expected_size);
879        }
880
881        // Only 2 variations of values 1 byte each
882        run_test::<BoolType>(-1, &[true, false, true, false, true], 2);
883        run_test::<Int32Type>(-1, &[1i32, 2i32, 3i32, 4i32, 5i32], 20);
884        run_test::<Int64Type>(-1, &[1i64, 2i64, 3i64, 4i64, 5i64], 40);
885        run_test::<FloatType>(-1, &[1f32, 2f32, 3f32, 4f32, 5f32], 20);
886        run_test::<DoubleType>(-1, &[1f64, 2f64, 3f64, 4f64, 5f64], 40);
887        // Int96: len + reference
888        run_test::<Int96Type>(
889            -1,
890            &[Int96::from(vec![1, 2, 3]), Int96::from(vec![2, 3, 4])],
891            24,
892        );
893        run_test::<ByteArrayType>(-1, &[ByteArray::from("abcd"), ByteArray::from("efj")], 15);
894        run_test::<FixedLenByteArrayType>(
895            2,
896            &[ByteArray::from("ab").into(), ByteArray::from("bc").into()],
897            4,
898        );
899    }
900
901    #[test]
902    fn test_estimated_data_encoded_size() {
903        fn run_test<T: DataType>(
904            encoding: Encoding,
905            type_length: i32,
906            values: &[T::T],
907            initial_size: usize,
908            max_size: usize,
909            flush_size: usize,
910        ) {
911            let mut encoder = match encoding {
912                Encoding::PLAIN_DICTIONARY | Encoding::RLE_DICTIONARY => {
913                    Box::new(create_test_dict_encoder::<T>(type_length))
914                }
915                _ => create_test_encoder::<T>(type_length, encoding),
916            };
917            assert_eq!(encoder.estimated_data_encoded_size(), initial_size);
918
919            encoder.put(values).unwrap();
920            assert_eq!(encoder.estimated_data_encoded_size(), max_size);
921
922            encoder.flush_buffer().unwrap();
923            assert_eq!(encoder.estimated_data_encoded_size(), flush_size);
924        }
925
926        // PLAIN
927        run_test::<Int32Type>(Encoding::PLAIN, -1, &[123; 1024], 0, 4096, 0);
928
929        // DICTIONARY
930        // NOTE: The final size is almost the same because the dictionary entries are
931        // preserved after encoded values have been written.
932        run_test::<Int32Type>(Encoding::RLE_DICTIONARY, -1, &[123, 1024], 0, 2, 0);
933
934        // DELTA_BINARY_PACKED
935        run_test::<Int32Type>(Encoding::DELTA_BINARY_PACKED, -1, &[123; 1024], 0, 35, 0);
936
937        // RLE
938        let mut values = vec![];
939        values.extend_from_slice(&[true; 16]);
940        values.extend_from_slice(&[false; 16]);
941        run_test::<BoolType>(Encoding::RLE, -1, &values, 0, 6, 0);
942
943        // DELTA_LENGTH_BYTE_ARRAY
944        run_test::<ByteArrayType>(
945            Encoding::DELTA_LENGTH_BYTE_ARRAY,
946            -1,
947            &[ByteArray::from("ab"), ByteArray::from("abc")],
948            0,
949            5, // only value bytes, length encoder is not flushed yet
950            0,
951        );
952
953        // DELTA_BYTE_ARRAY
954        run_test::<ByteArrayType>(
955            Encoding::DELTA_BYTE_ARRAY,
956            -1,
957            &[ByteArray::from("ab"), ByteArray::from("abc")],
958            0,
959            3, // only suffix bytes, length encoder is not flushed yet
960            0,
961        );
962
963        // BYTE_STREAM_SPLIT
964        run_test::<FloatType>(Encoding::BYTE_STREAM_SPLIT, -1, &[0.1, 0.2], 0, 8, 0);
965    }
966
967    #[test]
968    fn test_byte_stream_split_example_f32() {
969        // Test data from https://github.com/apache/parquet-format/blob/2a481fe1aad64ff770e21734533bb7ef5a057dac/Encodings.md#byte-stream-split-byte_stream_split--9
970        let mut encoder = create_test_encoder::<FloatType>(0, Encoding::BYTE_STREAM_SPLIT);
971        let mut decoder = create_test_decoder::<FloatType>(0, Encoding::BYTE_STREAM_SPLIT);
972
973        let input = vec![
974            f32::from_le_bytes([0xAA, 0xBB, 0xCC, 0xDD]),
975            f32::from_le_bytes([0x00, 0x11, 0x22, 0x33]),
976            f32::from_le_bytes([0xA3, 0xB4, 0xC5, 0xD6]),
977        ];
978
979        encoder.put(&input).unwrap();
980        let encoded = encoder.flush_buffer().unwrap();
981
982        assert_eq!(
983            encoded,
984            Bytes::from(vec![
985                0xAA_u8, 0x00, 0xA3, 0xBB, 0x11, 0xB4, 0xCC, 0x22, 0xC5, 0xDD, 0x33, 0xD6
986            ])
987        );
988
989        let mut decoded = vec![0.0; input.len()];
990        decoder.set_data(encoded, input.len()).unwrap();
991        decoder.get(&mut decoded).unwrap();
992
993        assert_eq!(decoded, input);
994    }
995
996    // See: https://github.com/sunchao/parquet-rs/issues/47
997    #[test]
998    fn test_issue_47() {
999        let mut encoder = create_test_encoder::<ByteArrayType>(0, Encoding::DELTA_BYTE_ARRAY);
1000        let mut decoder = create_test_decoder::<ByteArrayType>(0, Encoding::DELTA_BYTE_ARRAY);
1001
1002        let input = vec![
1003            ByteArray::from("aa"),
1004            ByteArray::from("aaa"),
1005            ByteArray::from("aa"),
1006            ByteArray::from("aaa"),
1007        ];
1008
1009        let mut output = vec![ByteArray::default(); input.len()];
1010
1011        let mut result = put_and_get(&mut encoder, &mut decoder, &input[..2], &mut output[..2]);
1012        assert!(
1013            result.is_ok(),
1014            "first put_and_get() failed with: {}",
1015            result.unwrap_err()
1016        );
1017        result = put_and_get(&mut encoder, &mut decoder, &input[2..], &mut output[2..]);
1018        assert!(
1019            result.is_ok(),
1020            "second put_and_get() failed with: {}",
1021            result.unwrap_err()
1022        );
1023        assert_eq!(output, input);
1024    }
1025
1026    trait EncodingTester<T: DataType> {
1027        fn test(enc: Encoding, total: usize, type_length: i32) {
1028            let result = match enc {
1029                Encoding::PLAIN_DICTIONARY | Encoding::RLE_DICTIONARY => {
1030                    Self::test_dict_internal(total, type_length)
1031                }
1032                enc => Self::test_internal(enc, total, type_length),
1033            };
1034
1035            assert!(
1036                result.is_ok(),
1037                "Expected result to be OK but got err:\n {}",
1038                result.unwrap_err()
1039            );
1040        }
1041
1042        fn test_internal(enc: Encoding, total: usize, type_length: i32) -> Result<()>;
1043
1044        fn test_dict_internal(total: usize, type_length: i32) -> Result<()>;
1045    }
1046
1047    impl<T: DataType + RandGen<T>> EncodingTester<T> for T {
1048        fn test_internal(enc: Encoding, total: usize, type_length: i32) -> Result<()> {
1049            let mut encoder = create_test_encoder::<T>(type_length, enc);
1050            let mut decoder = create_test_decoder::<T>(type_length, enc);
1051            let mut values = <T as RandGen<T>>::gen_vec(type_length, total);
1052            let mut result_data = vec![T::T::default(); total];
1053
1054            // Test put/get spaced.
1055            let num_bytes = bit_util::ceil(total as i64, 8);
1056            let valid_bits = random_bytes(num_bytes as usize);
1057            let values_written = encoder.put_spaced(&values[..], &valid_bits[..])?;
1058            let data = encoder.flush_buffer()?;
1059            decoder.set_data(data, values_written)?;
1060            let _ = decoder.get_spaced(
1061                &mut result_data[..],
1062                values.len() - values_written,
1063                &valid_bits[..],
1064            )?;
1065
1066            // Check equality
1067            for i in 0..total {
1068                if bit_util::get_bit(&valid_bits[..], i) {
1069                    assert_eq!(result_data[i], values[i]);
1070                } else {
1071                    assert_eq!(result_data[i], T::T::default());
1072                }
1073            }
1074
1075            let mut actual_total = put_and_get(
1076                &mut encoder,
1077                &mut decoder,
1078                &values[..],
1079                &mut result_data[..],
1080            )?;
1081            assert_eq!(actual_total, total);
1082            assert_eq!(result_data, values);
1083
1084            // Encode more data after flush and test with decoder
1085
1086            values = <T as RandGen<T>>::gen_vec(type_length, total);
1087            actual_total = put_and_get(
1088                &mut encoder,
1089                &mut decoder,
1090                &values[..],
1091                &mut result_data[..],
1092            )?;
1093            assert_eq!(actual_total, total);
1094            assert_eq!(result_data, values);
1095
1096            Ok(())
1097        }
1098
1099        fn test_dict_internal(total: usize, type_length: i32) -> Result<()> {
1100            let mut encoder = create_test_dict_encoder::<T>(type_length);
1101            let mut values = <T as RandGen<T>>::gen_vec(type_length, total);
1102            encoder.put(&values[..])?;
1103
1104            let mut data = encoder.flush_buffer()?;
1105            let mut decoder = create_test_dict_decoder::<T>();
1106            let mut dict_decoder = PlainDecoder::<T>::new(type_length);
1107            dict_decoder.set_data(encoder.write_dict()?, encoder.num_entries())?;
1108            decoder.set_dict(Box::new(dict_decoder))?;
1109            let mut result_data = vec![T::T::default(); total];
1110            decoder.set_data(data, total)?;
1111            let mut actual_total = decoder.get(&mut result_data)?;
1112
1113            assert_eq!(actual_total, total);
1114            assert_eq!(result_data, values);
1115
1116            // Encode more data after flush and test with decoder
1117
1118            values = <T as RandGen<T>>::gen_vec(type_length, total);
1119            encoder.put(&values[..])?;
1120            data = encoder.flush_buffer()?;
1121
1122            let mut dict_decoder = PlainDecoder::<T>::new(type_length);
1123            dict_decoder.set_data(encoder.write_dict()?, encoder.num_entries())?;
1124            decoder.set_dict(Box::new(dict_decoder))?;
1125            decoder.set_data(data, total)?;
1126            actual_total = decoder.get(&mut result_data)?;
1127
1128            assert_eq!(actual_total, total);
1129            assert_eq!(result_data, values);
1130
1131            Ok(())
1132        }
1133    }
1134
1135    fn put_and_get<T: DataType>(
1136        encoder: &mut Box<dyn Encoder<T>>,
1137        decoder: &mut Box<dyn Decoder<T>>,
1138        input: &[T::T],
1139        output: &mut [T::T],
1140    ) -> Result<usize> {
1141        encoder.put(input)?;
1142        let data = encoder.flush_buffer()?;
1143        decoder.set_data(data, input.len())?;
1144        decoder.get(output)
1145    }
1146
1147    fn create_and_check_encoder<T: DataType>(
1148        type_length: i32,
1149        encoding: Encoding,
1150        err: Option<ParquetError>,
1151    ) {
1152        let desc = create_test_col_desc_ptr(type_length, T::get_physical_type());
1153        let encoder = get_encoder::<T>(encoding, &desc);
1154        match err {
1155            Some(parquet_error) => {
1156                assert_eq!(
1157                    encoder.err().unwrap().to_string(),
1158                    parquet_error.to_string()
1159                )
1160            }
1161            None => assert_eq!(encoder.unwrap().encoding(), encoding),
1162        }
1163    }
1164
1165    // Creates test column descriptor.
1166    fn create_test_col_desc_ptr(type_len: i32, t: Type) -> ColumnDescPtr {
1167        let ty = SchemaType::primitive_type_builder("t", t)
1168            .with_length(type_len)
1169            .build()
1170            .unwrap();
1171        Arc::new(ColumnDescriptor::new(
1172            Arc::new(ty),
1173            0,
1174            0,
1175            ColumnPath::new(vec![]),
1176        ))
1177    }
1178
1179    fn create_test_encoder<T: DataType>(type_len: i32, enc: Encoding) -> Box<dyn Encoder<T>> {
1180        let desc = create_test_col_desc_ptr(type_len, T::get_physical_type());
1181        get_encoder(enc, &desc).unwrap()
1182    }
1183
1184    fn create_test_decoder<T: DataType>(type_len: i32, enc: Encoding) -> Box<dyn Decoder<T>> {
1185        let desc = create_test_col_desc_ptr(type_len, T::get_physical_type());
1186        get_decoder(desc, enc).unwrap()
1187    }
1188
1189    fn create_test_dict_encoder<T: DataType>(type_len: i32) -> DictEncoder<T> {
1190        let desc = create_test_col_desc_ptr(type_len, T::get_physical_type());
1191        DictEncoder::<T>::new(desc)
1192    }
1193
1194    fn create_test_dict_decoder<T: DataType>() -> DictDecoder<T> {
1195        DictDecoder::<T>::new()
1196    }
1197}