parquet/encodings/encoding/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Contains all supported encoders for Parquet.
19
20use std::{cmp, marker::PhantomData};
21
22use crate::basic::*;
23use crate::data_type::private::ParquetValueType;
24use crate::data_type::*;
25use crate::encodings::rle::RleEncoder;
26use crate::errors::{ParquetError, Result};
27use crate::schema::types::ColumnDescPtr;
28use crate::util::bit_util::{num_required_bits, BitWriter};
29
30use byte_stream_split_encoder::{ByteStreamSplitEncoder, VariableWidthByteStreamSplitEncoder};
31use bytes::Bytes;
32pub use dict_encoder::DictEncoder;
33
34mod byte_stream_split_encoder;
35mod dict_encoder;
36
37// ----------------------------------------------------------------------
38// Encoders
39
40/// An Parquet encoder for the data type `T`.
41///
42/// Currently this allocates internal buffers for the encoded values. After done putting
43/// values, caller should call `flush_buffer()` to get an immutable buffer pointer.
44pub trait Encoder<T: DataType>: Send {
45    /// Encodes data from `values`.
46    fn put(&mut self, values: &[T::T]) -> Result<()>;
47
48    /// Encodes data from `values`, which contains spaces for null values, that is
49    /// identified by `valid_bits`.
50    ///
51    /// Returns the number of non-null values encoded.
52    #[cfg(test)]
53    fn put_spaced(&mut self, values: &[T::T], valid_bits: &[u8]) -> Result<usize> {
54        let num_values = values.len();
55        let mut buffer = Vec::with_capacity(num_values);
56        // TODO: this is pretty inefficient. Revisit in future.
57        for (i, item) in values.iter().enumerate().take(num_values) {
58            if crate::util::bit_util::get_bit(valid_bits, i) {
59                buffer.push(item.clone());
60            }
61        }
62        self.put(&buffer[..])?;
63        Ok(buffer.len())
64    }
65
66    /// Returns the encoding type of this encoder.
67    fn encoding(&self) -> Encoding;
68
69    /// Returns an estimate of the encoded data, in bytes.
70    /// Method call must be O(1).
71    fn estimated_data_encoded_size(&self) -> usize;
72
73    /// Returns an estimate of the memory use of this encoder, in bytes
74    fn estimated_memory_size(&self) -> usize;
75
76    /// Flushes the underlying byte buffer that's being processed by this encoder, and
77    /// return the immutable copy of it. This will also reset the internal state.
78    fn flush_buffer(&mut self) -> Result<Bytes>;
79}
80
81/// Gets a encoder for the particular data type `T` and encoding `encoding`. Memory usage
82/// for the encoder instance is tracked by `mem_tracker`.
83pub fn get_encoder<T: DataType>(
84    encoding: Encoding,
85    descr: &ColumnDescPtr,
86) -> Result<Box<dyn Encoder<T>>> {
87    let encoder: Box<dyn Encoder<T>> = match encoding {
88        Encoding::PLAIN => Box::new(PlainEncoder::new()),
89        Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => {
90            return Err(general_err!(
91                "Cannot initialize this encoding through this function"
92            ));
93        }
94        Encoding::RLE => Box::new(RleValueEncoder::new()),
95        Encoding::DELTA_BINARY_PACKED => Box::new(DeltaBitPackEncoder::new()),
96        Encoding::DELTA_LENGTH_BYTE_ARRAY => Box::new(DeltaLengthByteArrayEncoder::new()),
97        Encoding::DELTA_BYTE_ARRAY => Box::new(DeltaByteArrayEncoder::new()),
98        Encoding::BYTE_STREAM_SPLIT => match T::get_physical_type() {
99            Type::FIXED_LEN_BYTE_ARRAY => Box::new(VariableWidthByteStreamSplitEncoder::new(
100                descr.type_length(),
101            )),
102            _ => Box::new(ByteStreamSplitEncoder::new()),
103        },
104        e => return Err(nyi_err!("Encoding {} is not supported", e)),
105    };
106    Ok(encoder)
107}
108
109// ----------------------------------------------------------------------
110// Plain encoding
111
/// Plain encoding that supports all types.
/// Values are encoded back to back.
/// The plain encoding is used whenever a more efficient encoding can not be used.
/// It stores the data in the following format:
/// - BOOLEAN - 1 bit per value, 0 is false; 1 is true.
/// - INT32 - 4 bytes per value, stored as little-endian.
/// - INT64 - 8 bytes per value, stored as little-endian.
/// - FLOAT - 4 bytes per value, stored as IEEE little-endian.
/// - DOUBLE - 8 bytes per value, stored as IEEE little-endian.
/// - BYTE_ARRAY - 4 byte length stored as little endian, followed by bytes.
/// - FIXED_LEN_BYTE_ARRAY - just the bytes are stored.
pub struct PlainEncoder<T: DataType> {
    // Byte output handed to `T::T::encode`; also the buffer returned on flush.
    buffer: Vec<u8>,
    // Bit-level output handed to `T::T::encode`; its contents are appended to
    // `buffer` when the encoder is flushed.
    bit_writer: BitWriter,
    // Ties the encoder to the data type `T` without storing a value of it.
    _phantom: PhantomData<T>,
}
128
129impl<T: DataType> Default for PlainEncoder<T> {
130    fn default() -> Self {
131        Self::new()
132    }
133}
134
135impl<T: DataType> PlainEncoder<T> {
136    /// Creates new plain encoder.
137    pub fn new() -> Self {
138        Self {
139            buffer: vec![],
140            bit_writer: BitWriter::new(256),
141            _phantom: PhantomData,
142        }
143    }
144}
145
146impl<T: DataType> Encoder<T> for PlainEncoder<T> {
147    // Performance Note:
148    // As far as can be seen these functions are rarely called and as such we can hint to the
149    // compiler that they dont need to be folded into hot locations in the final output.
150    #[cold]
151    fn encoding(&self) -> Encoding {
152        Encoding::PLAIN
153    }
154
155    fn estimated_data_encoded_size(&self) -> usize {
156        self.buffer.len() + self.bit_writer.bytes_written()
157    }
158
159    #[inline]
160    fn flush_buffer(&mut self) -> Result<Bytes> {
161        self.buffer
162            .extend_from_slice(self.bit_writer.flush_buffer());
163        self.bit_writer.clear();
164        Ok(std::mem::take(&mut self.buffer).into())
165    }
166
167    #[inline]
168    fn put(&mut self, values: &[T::T]) -> Result<()> {
169        T::T::encode(values, &mut self.buffer, &mut self.bit_writer)?;
170        Ok(())
171    }
172
173    /// Return the estimated memory size of this encoder.
174    fn estimated_memory_size(&self) -> usize {
175        self.buffer.capacity() * std::mem::size_of::<u8>() + self.bit_writer.estimated_memory_size()
176    }
177}
178
179// ----------------------------------------------------------------------
180// RLE encoding
181
// Initial capacity, in bytes, of the buffer handed to the RLE encoder that
// `RleValueEncoder` creates on first use.
const DEFAULT_RLE_BUFFER_LEN: usize = 1024;
183
/// RLE/Bit-Packing hybrid encoding for values.
/// Currently is used only for data pages v2 and supports boolean types.
pub struct RleValueEncoder<T: DataType> {
    // Underlying RLE encoder. It starts as `None`, is created by `put`, and is
    // taken out again (leaving `None`) by `flush_buffer`. Its buffer begins with
    // 4 reserved bytes that are filled with the encoded length on flush.
    encoder: Option<RleEncoder>,
    // Ties the encoder to the data type `T` without storing a value of it.
    _phantom: PhantomData<T>,
}
192
193impl<T: DataType> Default for RleValueEncoder<T> {
194    fn default() -> Self {
195        Self::new()
196    }
197}
198
199impl<T: DataType> RleValueEncoder<T> {
200    /// Creates new rle value encoder.
201    pub fn new() -> Self {
202        Self {
203            encoder: None,
204            _phantom: PhantomData,
205        }
206    }
207}
208
209impl<T: DataType> Encoder<T> for RleValueEncoder<T> {
210    #[inline]
211    fn put(&mut self, values: &[T::T]) -> Result<()> {
212        ensure_phys_ty!(Type::BOOLEAN, "RleValueEncoder only supports BoolType");
213
214        let rle_encoder = self.encoder.get_or_insert_with(|| {
215            let mut buffer = Vec::with_capacity(DEFAULT_RLE_BUFFER_LEN);
216            // Reserve space for length
217            buffer.extend_from_slice(&[0; 4]);
218            RleEncoder::new_from_buf(1, buffer)
219        });
220
221        for value in values {
222            let value = value.as_u64()?;
223            rle_encoder.put(value)
224        }
225        Ok(())
226    }
227
228    // Performance Note:
229    // As far as can be seen these functions are rarely called and as such we can hint to the
230    // compiler that they dont need to be folded into hot locations in the final output.
231    #[cold]
232    fn encoding(&self) -> Encoding {
233        Encoding::RLE
234    }
235
236    #[inline]
237    fn estimated_data_encoded_size(&self) -> usize {
238        match self.encoder {
239            Some(ref enc) => enc.len(),
240            None => 0,
241        }
242    }
243
244    #[inline]
245    fn flush_buffer(&mut self) -> Result<Bytes> {
246        ensure_phys_ty!(Type::BOOLEAN, "RleValueEncoder only supports BoolType");
247        let rle_encoder = self
248            .encoder
249            .take()
250            .expect("RLE value encoder is not initialized");
251
252        // Flush all encoder buffers and raw values
253        let mut buf = rle_encoder.consume();
254        assert!(buf.len() >= 4, "should have had padding inserted");
255
256        // Note that buf does not have any offset, all data is encoded bytes
257        let len = (buf.len() - 4) as i32;
258        buf[..4].copy_from_slice(&len.to_le_bytes());
259
260        Ok(buf.into())
261    }
262
263    /// return the estimated memory size of this encoder.
264    fn estimated_memory_size(&self) -> usize {
265        self.encoder
266            .as_ref()
267            .map_or(0, |enc| enc.estimated_memory_size())
268    }
269}
270
271// ----------------------------------------------------------------------
272// DELTA_BINARY_PACKED encoding
273
// Size passed to the bit writer that holds the delta page header; it is chosen
// to be large enough for all four header fields.
const MAX_PAGE_HEADER_WRITER_SIZE: usize = 32;
// Size passed to the bit writer that holds the encoded delta blocks.
const DEFAULT_BIT_WRITER_SIZE: usize = 1024 * 1024;
// Number of mini blocks per block used by `DeltaBitPackEncoder`.
const DEFAULT_NUM_MINI_BLOCKS: usize = 4;
277
/// Delta bit packed encoder.
/// Consists of a header followed by blocks of delta encoded values binary packed.
///
/// Delta-binary-packing:
/// ```shell
///   [page-header] [block 1], [block 2], ... [block N]
/// ```
///
/// Each page header consists of:
/// ```shell
///   [block size] [number of miniblocks in a block] [total value count] [first value]
/// ```
///
/// Each block consists of:
/// ```shell
///   [min delta] [list of bitwidths of miniblocks] [miniblocks]
/// ```
///
/// The current implementation buffers deltas in the `put` method. Successive calls
/// to `put` append to the current block, and a block is written out as soon as it
/// holds `block_size` deltas. Calling `flush_buffer` writes out all remaining data
/// and resets internal state, including the page header.
///
/// Supports only INT32 and INT64.
pub struct DeltaBitPackEncoder<T: DataType> {
    // Receives the page header, which is written at flush time.
    page_header_writer: BitWriter,
    // Receives the encoded blocks.
    bit_writer: BitWriter,
    // Number of values put since the last flush, including the first value.
    total_values: usize,
    // First value put since the last flush; stored in the page header rather
    // than as a delta.
    first_value: i64,
    // Most recently put value; the next delta is computed relative to it.
    current_value: i64,
    // Number of deltas per block (`mini_block_size * num_mini_blocks`).
    block_size: usize,
    // Number of deltas per mini block: 32 for INT32, 64 for INT64.
    mini_block_size: usize,
    // Number of mini blocks per block.
    num_mini_blocks: usize,
    // Number of deltas currently buffered in `deltas` (at most `block_size`).
    values_in_block: usize,
    // Delta buffer for the block being filled; allocated with `block_size` slots.
    deltas: Vec<i64>,
    // Ties the encoder to the data type `T` without storing a value of it.
    _phantom: PhantomData<T>,
}
314
315impl<T: DataType> Default for DeltaBitPackEncoder<T> {
316    fn default() -> Self {
317        Self::new()
318    }
319}
320
321impl<T: DataType> DeltaBitPackEncoder<T> {
322    /// Creates new delta bit packed encoder.
323    pub fn new() -> Self {
324        Self::assert_supported_type();
325
326        // Size miniblocks so that they can be efficiently decoded
327        let mini_block_size = match T::T::PHYSICAL_TYPE {
328            Type::INT32 => 32,
329            Type::INT64 => 64,
330            _ => unreachable!(),
331        };
332
333        let num_mini_blocks = DEFAULT_NUM_MINI_BLOCKS;
334        let block_size = mini_block_size * num_mini_blocks;
335        assert_eq!(block_size % 128, 0);
336
337        DeltaBitPackEncoder {
338            page_header_writer: BitWriter::new(MAX_PAGE_HEADER_WRITER_SIZE),
339            bit_writer: BitWriter::new(DEFAULT_BIT_WRITER_SIZE),
340            total_values: 0,
341            first_value: 0,
342            current_value: 0, // current value to keep adding deltas
343            block_size,       // can write fewer values than block size for last block
344            mini_block_size,
345            num_mini_blocks,
346            values_in_block: 0, // will be at most block_size
347            deltas: vec![0; block_size],
348            _phantom: PhantomData,
349        }
350    }
351
352    /// Writes page header for blocks, this method is invoked when we are done encoding
353    /// values. It is also okay to encode when no values have been provided
354    fn write_page_header(&mut self) {
355        // We ignore the result of each 'put' operation, because
356        // MAX_PAGE_HEADER_WRITER_SIZE is chosen to fit all header values and
357        // guarantees that writes will not fail.
358
359        // Write the size of each block
360        self.page_header_writer.put_vlq_int(self.block_size as u64);
361        // Write the number of mini blocks
362        self.page_header_writer
363            .put_vlq_int(self.num_mini_blocks as u64);
364        // Write the number of all values (including non-encoded first value)
365        self.page_header_writer
366            .put_vlq_int(self.total_values as u64);
367        // Write first value
368        self.page_header_writer.put_zigzag_vlq_int(self.first_value);
369    }
370
371    // Write current delta buffer (<= 'block size' values) into bit writer
372    #[inline(never)]
373    fn flush_block_values(&mut self) -> Result<()> {
374        if self.values_in_block == 0 {
375            return Ok(());
376        }
377
378        let mut min_delta = i64::MAX;
379        for i in 0..self.values_in_block {
380            min_delta = cmp::min(min_delta, self.deltas[i]);
381        }
382
383        // Write min delta
384        self.bit_writer.put_zigzag_vlq_int(min_delta);
385
386        // Slice to store bit width for each mini block
387        let offset = self.bit_writer.skip(self.num_mini_blocks);
388
389        for i in 0..self.num_mini_blocks {
390            // Find how many values we need to encode - either block size or whatever
391            // values left
392            let n = cmp::min(self.mini_block_size, self.values_in_block);
393            if n == 0 {
394                // Decoders should be agnostic to the padding value, we therefore use 0xFF
395                // when running tests. However, not all implementations may handle this correctly
396                // so pad with 0 when not running tests
397                let pad_value = cfg!(test).then(|| 0xFF).unwrap_or(0);
398                for j in i..self.num_mini_blocks {
399                    self.bit_writer.write_at(offset + j, pad_value);
400                }
401                break;
402            }
403
404            // Compute the max delta in current mini block
405            let mut max_delta = i64::MIN;
406            for j in 0..n {
407                max_delta = cmp::max(max_delta, self.deltas[i * self.mini_block_size + j]);
408            }
409
410            // Compute bit width to store (max_delta - min_delta)
411            let bit_width = num_required_bits(self.subtract_u64(max_delta, min_delta)) as usize;
412            self.bit_writer.write_at(offset + i, bit_width as u8);
413
414            // Encode values in current mini block using min_delta and bit_width
415            for j in 0..n {
416                let packed_value =
417                    self.subtract_u64(self.deltas[i * self.mini_block_size + j], min_delta);
418                self.bit_writer.put_value(packed_value, bit_width);
419            }
420
421            // Pad the last block (n < mini_block_size)
422            for _ in n..self.mini_block_size {
423                self.bit_writer.put_value(0, bit_width);
424            }
425
426            self.values_in_block -= n;
427        }
428
429        assert_eq!(
430            self.values_in_block, 0,
431            "Expected 0 values in block, found {}",
432            self.values_in_block
433        );
434        Ok(())
435    }
436}
437
438// Implementation is shared between Int32Type and Int64Type,
439// see `DeltaBitPackEncoderConversion` below for specifics.
440impl<T: DataType> Encoder<T> for DeltaBitPackEncoder<T> {
441    fn put(&mut self, values: &[T::T]) -> Result<()> {
442        if values.is_empty() {
443            return Ok(());
444        }
445
446        // Define values to encode, initialize state
447        let mut idx = if self.total_values == 0 {
448            self.first_value = self.as_i64(values, 0);
449            self.current_value = self.first_value;
450            1
451        } else {
452            0
453        };
454        // Add all values (including first value)
455        self.total_values += values.len();
456
457        // Write block
458        while idx < values.len() {
459            let value = self.as_i64(values, idx);
460            self.deltas[self.values_in_block] = self.subtract(value, self.current_value);
461            self.current_value = value;
462            idx += 1;
463            self.values_in_block += 1;
464            if self.values_in_block == self.block_size {
465                self.flush_block_values()?;
466            }
467        }
468        Ok(())
469    }
470
471    // Performance Note:
472    // As far as can be seen these functions are rarely called and as such we can hint to the
473    // compiler that they dont need to be folded into hot locations in the final output.
474    #[cold]
475    fn encoding(&self) -> Encoding {
476        Encoding::DELTA_BINARY_PACKED
477    }
478
479    fn estimated_data_encoded_size(&self) -> usize {
480        self.bit_writer.bytes_written()
481    }
482
483    fn flush_buffer(&mut self) -> Result<Bytes> {
484        // Write remaining values
485        self.flush_block_values()?;
486        // Write page header with total values
487        self.write_page_header();
488
489        let mut buffer = Vec::new();
490        buffer.extend_from_slice(self.page_header_writer.flush_buffer());
491        buffer.extend_from_slice(self.bit_writer.flush_buffer());
492
493        // Reset state
494        self.page_header_writer.clear();
495        self.bit_writer.clear();
496        self.total_values = 0;
497        self.first_value = 0;
498        self.current_value = 0;
499        self.values_in_block = 0;
500
501        Ok(buffer.into())
502    }
503
504    /// return the estimated memory size of this encoder.
505    fn estimated_memory_size(&self) -> usize {
506        self.page_header_writer.estimated_memory_size()
507            + self.bit_writer.estimated_memory_size()
508            + self.deltas.capacity() * std::mem::size_of::<i64>()
509            + std::mem::size_of::<Self>()
510    }
511}
512
/// Helper trait to define specific conversions and subtractions when computing deltas
trait DeltaBitPackEncoderConversion<T: DataType> {
    // Method should panic if type is not supported, otherwise no-op
    fn assert_supported_type();

    // Returns `values[index]` converted to `i64`.
    fn as_i64(&self, values: &[T::T], index: usize) -> i64;

    // Returns `left - right` as a signed delta.
    fn subtract(&self, left: i64, right: i64) -> i64;

    // Returns `left - right` as an unsigned value.
    fn subtract_u64(&self, left: i64, right: i64) -> u64;
}
524
525impl<T: DataType> DeltaBitPackEncoderConversion<T> for DeltaBitPackEncoder<T> {
526    #[inline]
527    fn assert_supported_type() {
528        ensure_phys_ty!(
529            Type::INT32 | Type::INT64,
530            "DeltaBitPackDecoder only supports Int32Type and Int64Type"
531        );
532    }
533
534    #[inline]
535    fn as_i64(&self, values: &[T::T], index: usize) -> i64 {
536        values[index]
537            .as_i64()
538            .expect("DeltaBitPackDecoder only supports Int32Type and Int64Type")
539    }
540
541    #[inline]
542    fn subtract(&self, left: i64, right: i64) -> i64 {
543        // It is okay for values to overflow, wrapping_sub wrapping around at the boundary
544        match T::get_physical_type() {
545            Type::INT32 => (left as i32).wrapping_sub(right as i32) as i64,
546            Type::INT64 => left.wrapping_sub(right),
547            _ => panic!("DeltaBitPackDecoder only supports Int32Type and Int64Type"),
548        }
549    }
550
551    #[inline]
552    fn subtract_u64(&self, left: i64, right: i64) -> u64 {
553        match T::get_physical_type() {
554            // Conversion of i32 -> u32 -> u64 is to avoid non-zero left most bytes in int repr
555            Type::INT32 => (left as i32).wrapping_sub(right as i32) as u32 as u64,
556            Type::INT64 => left.wrapping_sub(right) as u64,
557            _ => panic!("DeltaBitPackDecoder only supports Int32Type and Int64Type"),
558        }
559    }
560}
561
562// ----------------------------------------------------------------------
563// DELTA_LENGTH_BYTE_ARRAY encoding
564
/// Encoding for byte arrays to separate the length values and the data.
/// The lengths are encoded using DELTA_BINARY_PACKED encoding, data is
/// stored as raw bytes.
pub struct DeltaLengthByteArrayEncoder<T: DataType> {
    // Encoder for the value lengths
    len_encoder: DeltaBitPackEncoder<Int32Type>,
    // Byte arrays put since the last flush, in insertion order
    data: Vec<ByteArray>,
    // Total payload size, in bytes, of the byte arrays held in `data`
    encoded_size: usize,
    // Ties the encoder to the data type `T` without storing a value of it.
    _phantom: PhantomData<T>,
}
577
578impl<T: DataType> Default for DeltaLengthByteArrayEncoder<T> {
579    fn default() -> Self {
580        Self::new()
581    }
582}
583
584impl<T: DataType> DeltaLengthByteArrayEncoder<T> {
585    /// Creates new delta length byte array encoder.
586    pub fn new() -> Self {
587        Self {
588            len_encoder: DeltaBitPackEncoder::new(),
589            data: vec![],
590            encoded_size: 0,
591            _phantom: PhantomData,
592        }
593    }
594}
595
596impl<T: DataType> Encoder<T> for DeltaLengthByteArrayEncoder<T> {
597    fn put(&mut self, values: &[T::T]) -> Result<()> {
598        ensure_phys_ty!(
599            Type::BYTE_ARRAY | Type::FIXED_LEN_BYTE_ARRAY,
600            "DeltaLengthByteArrayEncoder only supports ByteArrayType"
601        );
602
603        let val_it = || {
604            values
605                .iter()
606                .map(|x| x.as_any().downcast_ref::<ByteArray>().unwrap())
607        };
608
609        let lengths: Vec<i32> = val_it().map(|byte_array| byte_array.len() as i32).collect();
610        self.len_encoder.put(&lengths)?;
611        for byte_array in val_it() {
612            self.encoded_size += byte_array.len();
613            self.data.push(byte_array.clone());
614        }
615
616        Ok(())
617    }
618
619    // Performance Note:
620    // As far as can be seen these functions are rarely called and as such we can hint to the
621    // compiler that they dont need to be folded into hot locations in the final output.
622    #[cold]
623    fn encoding(&self) -> Encoding {
624        Encoding::DELTA_LENGTH_BYTE_ARRAY
625    }
626
627    fn estimated_data_encoded_size(&self) -> usize {
628        self.len_encoder.estimated_data_encoded_size() + self.encoded_size
629    }
630
631    fn flush_buffer(&mut self) -> Result<Bytes> {
632        ensure_phys_ty!(
633            Type::BYTE_ARRAY | Type::FIXED_LEN_BYTE_ARRAY,
634            "DeltaLengthByteArrayEncoder only supports ByteArrayType"
635        );
636
637        let mut total_bytes = vec![];
638        let lengths = self.len_encoder.flush_buffer()?;
639        total_bytes.extend_from_slice(&lengths);
640        self.data.iter().for_each(|byte_array| {
641            total_bytes.extend_from_slice(byte_array.data());
642        });
643        self.data.clear();
644        self.encoded_size = 0;
645
646        Ok(total_bytes.into())
647    }
648
649    /// return the estimated memory size of this encoder.
650    fn estimated_memory_size(&self) -> usize {
651        self.len_encoder.estimated_memory_size() + self.data.len() + std::mem::size_of::<Self>()
652    }
653}
654
655// ----------------------------------------------------------------------
656// DELTA_BYTE_ARRAY encoding
657
/// Encoding for byte arrays, prefix lengths are encoded using DELTA_BINARY_PACKED
/// encoding, followed by suffixes with DELTA_LENGTH_BYTE_ARRAY encoding.
pub struct DeltaByteArrayEncoder<T: DataType> {
    // Encoder for the length of the prefix each value shares with its predecessor
    prefix_len_encoder: DeltaBitPackEncoder<Int32Type>,
    // Encoder for the remainder of each value after the shared prefix
    suffix_writer: DeltaLengthByteArrayEncoder<ByteArrayType>,
    // Bytes of the most recently put value; cleared on flush
    previous: Vec<u8>,
    // Ties the encoder to the data type `T` without storing a value of it.
    _phantom: PhantomData<T>,
}
666
667impl<T: DataType> Default for DeltaByteArrayEncoder<T> {
668    fn default() -> Self {
669        Self::new()
670    }
671}
672
673impl<T: DataType> DeltaByteArrayEncoder<T> {
674    /// Creates new delta byte array encoder.
675    pub fn new() -> Self {
676        Self {
677            prefix_len_encoder: DeltaBitPackEncoder::new(),
678            suffix_writer: DeltaLengthByteArrayEncoder::new(),
679            previous: vec![],
680            _phantom: PhantomData,
681        }
682    }
683}
684
685impl<T: DataType> Encoder<T> for DeltaByteArrayEncoder<T> {
686    fn put(&mut self, values: &[T::T]) -> Result<()> {
687        let mut prefix_lengths: Vec<i32> = vec![];
688        let mut suffixes: Vec<ByteArray> = vec![];
689
690        let values = values
691            .iter()
692            .map(|x| x.as_any())
693            .map(|x| match T::get_physical_type() {
694                Type::BYTE_ARRAY => x.downcast_ref::<ByteArray>().unwrap(),
695                Type::FIXED_LEN_BYTE_ARRAY => x.downcast_ref::<FixedLenByteArray>().unwrap(),
696                _ => panic!(
697                    "DeltaByteArrayEncoder only supports ByteArrayType and FixedLenByteArrayType"
698                ),
699            });
700
701        for byte_array in values {
702            let current = byte_array.data();
703            // Maximum prefix length that is shared between previous value and current
704            // value
705            let prefix_len = cmp::min(self.previous.len(), current.len());
706            let mut match_len = 0;
707            while match_len < prefix_len && self.previous[match_len] == current[match_len] {
708                match_len += 1;
709            }
710            prefix_lengths.push(match_len as i32);
711            suffixes.push(byte_array.slice(match_len, byte_array.len() - match_len));
712            // Update previous for the next prefix
713            self.previous.clear();
714            self.previous.extend_from_slice(current);
715        }
716        self.prefix_len_encoder.put(&prefix_lengths)?;
717        self.suffix_writer.put(&suffixes)?;
718
719        Ok(())
720    }
721
722    // Performance Note:
723    // As far as can be seen these functions are rarely called and as such we can hint to the
724    // compiler that they dont need to be folded into hot locations in the final output.
725    #[cold]
726    fn encoding(&self) -> Encoding {
727        Encoding::DELTA_BYTE_ARRAY
728    }
729
730    fn estimated_data_encoded_size(&self) -> usize {
731        self.prefix_len_encoder.estimated_data_encoded_size()
732            + self.suffix_writer.estimated_data_encoded_size()
733    }
734
735    fn flush_buffer(&mut self) -> Result<Bytes> {
736        match T::get_physical_type() {
737            Type::BYTE_ARRAY | Type::FIXED_LEN_BYTE_ARRAY => {
738                // TODO: investigate if we can merge lengths and suffixes
739                // without copying data into new vector.
740                let mut total_bytes = vec![];
741                // Insert lengths ...
742                let lengths = self.prefix_len_encoder.flush_buffer()?;
743                total_bytes.extend_from_slice(&lengths);
744                // ... followed by suffixes
745                let suffixes = self.suffix_writer.flush_buffer()?;
746                total_bytes.extend_from_slice(&suffixes);
747
748                self.previous.clear();
749                Ok(total_bytes.into())
750            }
751            _ => panic!(
752                "DeltaByteArrayEncoder only supports ByteArrayType and FixedLenByteArrayType"
753            ),
754        }
755    }
756
757    /// return the estimated memory size of this encoder.
758    fn estimated_memory_size(&self) -> usize {
759        self.prefix_len_encoder.estimated_memory_size()
760            + self.suffix_writer.estimated_memory_size()
761            + (self.previous.capacity() * std::mem::size_of::<u8>())
762    }
763}
764
765#[cfg(test)]
766mod tests {
767    use super::*;
768
769    use std::sync::Arc;
770
771    use crate::encodings::decoding::{get_decoder, Decoder, DictDecoder, PlainDecoder};
772    use crate::schema::types::{ColumnDescPtr, ColumnDescriptor, ColumnPath, Type as SchemaType};
773    use crate::util::bit_util;
774    use crate::util::test_common::rand_gen::{random_bytes, RandGen};
775
    // Size argument passed to every `test` call below (presumably the number of
    // values generated per run; confirm against the tester definition).
    const TEST_SET_SIZE: usize = 1024;
777
    // Checks which encodings can be requested for a column: the supported ones
    // are expected to succeed (`None`), the others to fail with the given error.
    #[test]
    fn test_get_encoders() {
        // supported encodings
        create_and_check_encoder::<Int32Type>(0, Encoding::PLAIN, None);
        create_and_check_encoder::<Int32Type>(0, Encoding::DELTA_BINARY_PACKED, None);
        create_and_check_encoder::<Int32Type>(0, Encoding::DELTA_LENGTH_BYTE_ARRAY, None);
        create_and_check_encoder::<Int32Type>(0, Encoding::DELTA_BYTE_ARRAY, None);
        create_and_check_encoder::<BoolType>(0, Encoding::RLE, None);

        // error when initializing: dictionary encodings are not created this way
        create_and_check_encoder::<Int32Type>(
            0,
            Encoding::RLE_DICTIONARY,
            Some(general_err!(
                "Cannot initialize this encoding through this function"
            )),
        );
        create_and_check_encoder::<Int32Type>(
            0,
            Encoding::PLAIN_DICTIONARY,
            Some(general_err!(
                "Cannot initialize this encoding through this function"
            )),
        );

        // unsupported; BIT_PACKED is deprecated, hence the lint allowance
        #[allow(deprecated)]
        create_and_check_encoder::<Int32Type>(
            0,
            Encoding::BIT_PACKED,
            Some(nyi_err!("Encoding BIT_PACKED is not supported")),
        );
    }
811
812    #[test]
813    fn test_bool() {
814        BoolType::test(Encoding::PLAIN, TEST_SET_SIZE, -1);
815        BoolType::test(Encoding::PLAIN_DICTIONARY, TEST_SET_SIZE, -1);
816        BoolType::test(Encoding::RLE, TEST_SET_SIZE, -1);
817    }
818
819    #[test]
820    fn test_i32() {
821        Int32Type::test(Encoding::PLAIN, TEST_SET_SIZE, -1);
822        Int32Type::test(Encoding::PLAIN_DICTIONARY, TEST_SET_SIZE, -1);
823        Int32Type::test(Encoding::DELTA_BINARY_PACKED, TEST_SET_SIZE, -1);
824        Int32Type::test(Encoding::BYTE_STREAM_SPLIT, TEST_SET_SIZE, -1);
825    }
826
827    #[test]
828    fn test_i64() {
829        Int64Type::test(Encoding::PLAIN, TEST_SET_SIZE, -1);
830        Int64Type::test(Encoding::PLAIN_DICTIONARY, TEST_SET_SIZE, -1);
831        Int64Type::test(Encoding::DELTA_BINARY_PACKED, TEST_SET_SIZE, -1);
832        Int64Type::test(Encoding::BYTE_STREAM_SPLIT, TEST_SET_SIZE, -1);
833    }
834
835    #[test]
836    fn test_i96() {
837        Int96Type::test(Encoding::PLAIN, TEST_SET_SIZE, -1);
838        Int96Type::test(Encoding::PLAIN_DICTIONARY, TEST_SET_SIZE, -1);
839    }
840
841    #[test]
842    fn test_float() {
843        FloatType::test(Encoding::PLAIN, TEST_SET_SIZE, -1);
844        FloatType::test(Encoding::PLAIN_DICTIONARY, TEST_SET_SIZE, -1);
845        FloatType::test(Encoding::BYTE_STREAM_SPLIT, TEST_SET_SIZE, -1);
846    }
847
848    #[test]
849    fn test_double() {
850        DoubleType::test(Encoding::PLAIN, TEST_SET_SIZE, -1);
851        DoubleType::test(Encoding::PLAIN_DICTIONARY, TEST_SET_SIZE, -1);
852        DoubleType::test(Encoding::BYTE_STREAM_SPLIT, TEST_SET_SIZE, -1);
853    }
854
855    #[test]
856    fn test_byte_array() {
857        ByteArrayType::test(Encoding::PLAIN, TEST_SET_SIZE, -1);
858        ByteArrayType::test(Encoding::PLAIN_DICTIONARY, TEST_SET_SIZE, -1);
859        ByteArrayType::test(Encoding::DELTA_LENGTH_BYTE_ARRAY, TEST_SET_SIZE, -1);
860        ByteArrayType::test(Encoding::DELTA_BYTE_ARRAY, TEST_SET_SIZE, -1);
861    }
862
863    #[test]
864    fn test_fixed_len_byte_array() {
865        FixedLenByteArrayType::test(Encoding::PLAIN, TEST_SET_SIZE, 100);
866        FixedLenByteArrayType::test(Encoding::PLAIN_DICTIONARY, TEST_SET_SIZE, 100);
867        FixedLenByteArrayType::test(Encoding::DELTA_BYTE_ARRAY, TEST_SET_SIZE, 100);
868        FixedLenByteArrayType::test(Encoding::BYTE_STREAM_SPLIT, TEST_SET_SIZE, 100);
869    }
870
871    #[test]
872    fn test_dict_encoded_size() {
873        fn run_test<T: DataType>(type_length: i32, values: &[T::T], expected_size: usize) {
874            let mut encoder = create_test_dict_encoder::<T>(type_length);
875            assert_eq!(encoder.dict_encoded_size(), 0);
876            encoder.put(values).unwrap();
877            assert_eq!(encoder.dict_encoded_size(), expected_size);
878            // We do not reset encoded size of the dictionary keys after flush_buffer
879            encoder.flush_buffer().unwrap();
880            assert_eq!(encoder.dict_encoded_size(), expected_size);
881        }
882
883        // Only 2 variations of values 1 byte each
884        run_test::<BoolType>(-1, &[true, false, true, false, true], 2);
885        run_test::<Int32Type>(-1, &[1i32, 2i32, 3i32, 4i32, 5i32], 20);
886        run_test::<Int64Type>(-1, &[1i64, 2i64, 3i64, 4i64, 5i64], 40);
887        run_test::<FloatType>(-1, &[1f32, 2f32, 3f32, 4f32, 5f32], 20);
888        run_test::<DoubleType>(-1, &[1f64, 2f64, 3f64, 4f64, 5f64], 40);
889        // Int96: len + reference
890        run_test::<Int96Type>(
891            -1,
892            &[Int96::from(vec![1, 2, 3]), Int96::from(vec![2, 3, 4])],
893            24,
894        );
895        run_test::<ByteArrayType>(-1, &[ByteArray::from("abcd"), ByteArray::from("efj")], 15);
896        run_test::<FixedLenByteArrayType>(
897            2,
898            &[ByteArray::from("ab").into(), ByteArray::from("bc").into()],
899            4,
900        );
901    }
902
903    #[test]
904    fn test_estimated_data_encoded_size() {
905        fn run_test<T: DataType>(
906            encoding: Encoding,
907            type_length: i32,
908            values: &[T::T],
909            initial_size: usize,
910            max_size: usize,
911            flush_size: usize,
912        ) {
913            let mut encoder = match encoding {
914                Encoding::PLAIN_DICTIONARY | Encoding::RLE_DICTIONARY => {
915                    Box::new(create_test_dict_encoder::<T>(type_length))
916                }
917                _ => create_test_encoder::<T>(type_length, encoding),
918            };
919            assert_eq!(encoder.estimated_data_encoded_size(), initial_size);
920
921            encoder.put(values).unwrap();
922            assert_eq!(encoder.estimated_data_encoded_size(), max_size);
923
924            encoder.flush_buffer().unwrap();
925            assert_eq!(encoder.estimated_data_encoded_size(), flush_size);
926        }
927
928        // PLAIN
929        run_test::<Int32Type>(Encoding::PLAIN, -1, &[123; 1024], 0, 4096, 0);
930
931        // DICTIONARY
932        // NOTE: The final size is almost the same because the dictionary entries are
933        // preserved after encoded values have been written.
934        run_test::<Int32Type>(Encoding::RLE_DICTIONARY, -1, &[123, 1024], 0, 2, 0);
935
936        // DELTA_BINARY_PACKED
937        run_test::<Int32Type>(Encoding::DELTA_BINARY_PACKED, -1, &[123; 1024], 0, 35, 0);
938
939        // RLE
940        let mut values = vec![];
941        values.extend_from_slice(&[true; 16]);
942        values.extend_from_slice(&[false; 16]);
943        run_test::<BoolType>(Encoding::RLE, -1, &values, 0, 6, 0);
944
945        // DELTA_LENGTH_BYTE_ARRAY
946        run_test::<ByteArrayType>(
947            Encoding::DELTA_LENGTH_BYTE_ARRAY,
948            -1,
949            &[ByteArray::from("ab"), ByteArray::from("abc")],
950            0,
951            5, // only value bytes, length encoder is not flushed yet
952            0,
953        );
954
955        // DELTA_BYTE_ARRAY
956        run_test::<ByteArrayType>(
957            Encoding::DELTA_BYTE_ARRAY,
958            -1,
959            &[ByteArray::from("ab"), ByteArray::from("abc")],
960            0,
961            3, // only suffix bytes, length encoder is not flushed yet
962            0,
963        );
964
965        // BYTE_STREAM_SPLIT
966        run_test::<FloatType>(Encoding::BYTE_STREAM_SPLIT, -1, &[0.1, 0.2], 0, 8, 0);
967    }
968
969    #[test]
970    fn test_byte_stream_split_example_f32() {
971        // Test data from https://github.com/apache/parquet-format/blob/2a481fe1aad64ff770e21734533bb7ef5a057dac/Encodings.md#byte-stream-split-byte_stream_split--9
972        let mut encoder = create_test_encoder::<FloatType>(0, Encoding::BYTE_STREAM_SPLIT);
973        let mut decoder = create_test_decoder::<FloatType>(0, Encoding::BYTE_STREAM_SPLIT);
974
975        let input = vec![
976            f32::from_le_bytes([0xAA, 0xBB, 0xCC, 0xDD]),
977            f32::from_le_bytes([0x00, 0x11, 0x22, 0x33]),
978            f32::from_le_bytes([0xA3, 0xB4, 0xC5, 0xD6]),
979        ];
980
981        encoder.put(&input).unwrap();
982        let encoded = encoder.flush_buffer().unwrap();
983
984        assert_eq!(
985            encoded,
986            Bytes::from(vec![
987                0xAA_u8, 0x00, 0xA3, 0xBB, 0x11, 0xB4, 0xCC, 0x22, 0xC5, 0xDD, 0x33, 0xD6
988            ])
989        );
990
991        let mut decoded = vec![0.0; input.len()];
992        decoder.set_data(encoded, input.len()).unwrap();
993        decoder.get(&mut decoded).unwrap();
994
995        assert_eq!(decoded, input);
996    }
997
998    // See: https://github.com/sunchao/parquet-rs/issues/47
999    #[test]
1000    fn test_issue_47() {
1001        let mut encoder = create_test_encoder::<ByteArrayType>(0, Encoding::DELTA_BYTE_ARRAY);
1002        let mut decoder = create_test_decoder::<ByteArrayType>(0, Encoding::DELTA_BYTE_ARRAY);
1003
1004        let input = vec![
1005            ByteArray::from("aa"),
1006            ByteArray::from("aaa"),
1007            ByteArray::from("aa"),
1008            ByteArray::from("aaa"),
1009        ];
1010
1011        let mut output = vec![ByteArray::default(); input.len()];
1012
1013        let mut result = put_and_get(&mut encoder, &mut decoder, &input[..2], &mut output[..2]);
1014        assert!(
1015            result.is_ok(),
1016            "first put_and_get() failed with: {}",
1017            result.unwrap_err()
1018        );
1019        result = put_and_get(&mut encoder, &mut decoder, &input[2..], &mut output[2..]);
1020        assert!(
1021            result.is_ok(),
1022            "second put_and_get() failed with: {}",
1023            result.unwrap_err()
1024        );
1025        assert_eq!(output, input);
1026    }
1027
1028    trait EncodingTester<T: DataType> {
1029        fn test(enc: Encoding, total: usize, type_length: i32) {
1030            let result = match enc {
1031                Encoding::PLAIN_DICTIONARY | Encoding::RLE_DICTIONARY => {
1032                    Self::test_dict_internal(total, type_length)
1033                }
1034                enc => Self::test_internal(enc, total, type_length),
1035            };
1036
1037            assert!(
1038                result.is_ok(),
1039                "Expected result to be OK but got err:\n {}",
1040                result.unwrap_err()
1041            );
1042        }
1043
1044        fn test_internal(enc: Encoding, total: usize, type_length: i32) -> Result<()>;
1045
1046        fn test_dict_internal(total: usize, type_length: i32) -> Result<()>;
1047    }
1048
1049    impl<T: DataType + RandGen<T>> EncodingTester<T> for T {
1050        fn test_internal(enc: Encoding, total: usize, type_length: i32) -> Result<()> {
1051            let mut encoder = create_test_encoder::<T>(type_length, enc);
1052            let mut decoder = create_test_decoder::<T>(type_length, enc);
1053            let mut values = <T as RandGen<T>>::gen_vec(type_length, total);
1054            let mut result_data = vec![T::T::default(); total];
1055
1056            // Test put/get spaced.
1057            let num_bytes = bit_util::ceil(total as i64, 8);
1058            let valid_bits = random_bytes(num_bytes as usize);
1059            let values_written = encoder.put_spaced(&values[..], &valid_bits[..])?;
1060            let data = encoder.flush_buffer()?;
1061            decoder.set_data(data, values_written)?;
1062            let _ = decoder.get_spaced(
1063                &mut result_data[..],
1064                values.len() - values_written,
1065                &valid_bits[..],
1066            )?;
1067
1068            // Check equality
1069            for i in 0..total {
1070                if bit_util::get_bit(&valid_bits[..], i) {
1071                    assert_eq!(result_data[i], values[i]);
1072                } else {
1073                    assert_eq!(result_data[i], T::T::default());
1074                }
1075            }
1076
1077            let mut actual_total = put_and_get(
1078                &mut encoder,
1079                &mut decoder,
1080                &values[..],
1081                &mut result_data[..],
1082            )?;
1083            assert_eq!(actual_total, total);
1084            assert_eq!(result_data, values);
1085
1086            // Encode more data after flush and test with decoder
1087
1088            values = <T as RandGen<T>>::gen_vec(type_length, total);
1089            actual_total = put_and_get(
1090                &mut encoder,
1091                &mut decoder,
1092                &values[..],
1093                &mut result_data[..],
1094            )?;
1095            assert_eq!(actual_total, total);
1096            assert_eq!(result_data, values);
1097
1098            Ok(())
1099        }
1100
1101        fn test_dict_internal(total: usize, type_length: i32) -> Result<()> {
1102            let mut encoder = create_test_dict_encoder::<T>(type_length);
1103            let mut values = <T as RandGen<T>>::gen_vec(type_length, total);
1104            encoder.put(&values[..])?;
1105
1106            let mut data = encoder.flush_buffer()?;
1107            let mut decoder = create_test_dict_decoder::<T>();
1108            let mut dict_decoder = PlainDecoder::<T>::new(type_length);
1109            dict_decoder.set_data(encoder.write_dict()?, encoder.num_entries())?;
1110            decoder.set_dict(Box::new(dict_decoder))?;
1111            let mut result_data = vec![T::T::default(); total];
1112            decoder.set_data(data, total)?;
1113            let mut actual_total = decoder.get(&mut result_data)?;
1114
1115            assert_eq!(actual_total, total);
1116            assert_eq!(result_data, values);
1117
1118            // Encode more data after flush and test with decoder
1119
1120            values = <T as RandGen<T>>::gen_vec(type_length, total);
1121            encoder.put(&values[..])?;
1122            data = encoder.flush_buffer()?;
1123
1124            let mut dict_decoder = PlainDecoder::<T>::new(type_length);
1125            dict_decoder.set_data(encoder.write_dict()?, encoder.num_entries())?;
1126            decoder.set_dict(Box::new(dict_decoder))?;
1127            decoder.set_data(data, total)?;
1128            actual_total = decoder.get(&mut result_data)?;
1129
1130            assert_eq!(actual_total, total);
1131            assert_eq!(result_data, values);
1132
1133            Ok(())
1134        }
1135    }
1136
1137    fn put_and_get<T: DataType>(
1138        encoder: &mut Box<dyn Encoder<T>>,
1139        decoder: &mut Box<dyn Decoder<T>>,
1140        input: &[T::T],
1141        output: &mut [T::T],
1142    ) -> Result<usize> {
1143        encoder.put(input)?;
1144        let data = encoder.flush_buffer()?;
1145        decoder.set_data(data, input.len())?;
1146        decoder.get(output)
1147    }
1148
1149    fn create_and_check_encoder<T: DataType>(
1150        type_length: i32,
1151        encoding: Encoding,
1152        err: Option<ParquetError>,
1153    ) {
1154        let desc = create_test_col_desc_ptr(type_length, T::get_physical_type());
1155        let encoder = get_encoder::<T>(encoding, &desc);
1156        match err {
1157            Some(parquet_error) => {
1158                assert_eq!(
1159                    encoder.err().unwrap().to_string(),
1160                    parquet_error.to_string()
1161                )
1162            }
1163            None => assert_eq!(encoder.unwrap().encoding(), encoding),
1164        }
1165    }
1166
1167    // Creates test column descriptor.
1168    fn create_test_col_desc_ptr(type_len: i32, t: Type) -> ColumnDescPtr {
1169        let ty = SchemaType::primitive_type_builder("t", t)
1170            .with_length(type_len)
1171            .build()
1172            .unwrap();
1173        Arc::new(ColumnDescriptor::new(
1174            Arc::new(ty),
1175            0,
1176            0,
1177            ColumnPath::new(vec![]),
1178        ))
1179    }
1180
1181    fn create_test_encoder<T: DataType>(type_len: i32, enc: Encoding) -> Box<dyn Encoder<T>> {
1182        let desc = create_test_col_desc_ptr(type_len, T::get_physical_type());
1183        get_encoder(enc, &desc).unwrap()
1184    }
1185
1186    fn create_test_decoder<T: DataType>(type_len: i32, enc: Encoding) -> Box<dyn Decoder<T>> {
1187        let desc = create_test_col_desc_ptr(type_len, T::get_physical_type());
1188        get_decoder(desc, enc).unwrap()
1189    }
1190
1191    fn create_test_dict_encoder<T: DataType>(type_len: i32) -> DictEncoder<T> {
1192        let desc = create_test_col_desc_ptr(type_len, T::get_physical_type());
1193        DictEncoder::<T>::new(desc)
1194    }
1195
1196    fn create_test_dict_decoder<T: DataType>() -> DictDecoder<T> {
1197        DictDecoder::<T>::new()
1198    }
1199}