parquet/encodings/encoding/
byte_stream_split_encoder.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::basic::{Encoding, Type};
19use crate::data_type::{AsBytes, DataType, SliceAsBytes};
20
21use crate::errors::{ParquetError, Result};
22
23use super::Encoder;
24
25use bytes::{BufMut, Bytes};
26use std::cmp;
27use std::marker::PhantomData;
28
29pub struct ByteStreamSplitEncoder<T> {
30    buffer: Vec<u8>,
31    _p: PhantomData<T>,
32}
33
34impl<T: DataType> ByteStreamSplitEncoder<T> {
35    pub(crate) fn new() -> Self {
36        Self {
37            buffer: Vec::new(),
38            _p: PhantomData,
39        }
40    }
41}
42
43// Here we assume src contains the full data (which it must, since we
44// can only know where to split the streams once all data is collected).
45// We iterate over the input bytes and write them to their strided output
46// byte locations.
47fn split_streams_const<const TYPE_SIZE: usize>(src: &[u8], dst: &mut [u8]) {
48    let stride = src.len() / TYPE_SIZE;
49    for i in 0..stride {
50        for j in 0..TYPE_SIZE {
51            dst[i + j * stride] = src[i * TYPE_SIZE + j];
52        }
53    }
54}
55
56// Like above, but type_size is not known at compile time.
57fn split_streams_variable(src: &[u8], dst: &mut [u8], type_size: usize) {
58    const BLOCK_SIZE: usize = 4;
59    let stride = src.len() / type_size;
60    for j in (0..type_size).step_by(BLOCK_SIZE) {
61        let jrange = cmp::min(BLOCK_SIZE, type_size - j);
62        for i in 0..stride {
63            for jj in 0..jrange {
64                dst[i + (j + jj) * stride] = src[i * type_size + j + jj];
65            }
66        }
67    }
68}
69
70impl<T: DataType> Encoder<T> for ByteStreamSplitEncoder<T> {
71    fn put(&mut self, values: &[T::T]) -> Result<()> {
72        self.buffer
73            .extend(<T as DataType>::T::slice_as_bytes(values));
74
75        ensure_phys_ty!(
76            Type::FLOAT | Type::DOUBLE | Type::INT32 | Type::INT64,
77            "ByteStreamSplitEncoder does not support Int96, Boolean, or ByteArray types"
78        );
79
80        Ok(())
81    }
82
83    fn encoding(&self) -> Encoding {
84        Encoding::BYTE_STREAM_SPLIT
85    }
86
87    fn estimated_data_encoded_size(&self) -> usize {
88        self.buffer.len()
89    }
90
91    fn flush_buffer(&mut self) -> Result<Bytes> {
92        let mut encoded = vec![0; self.buffer.len()];
93        let type_size = T::get_type_size();
94        match type_size {
95            4 => split_streams_const::<4>(&self.buffer, &mut encoded),
96            8 => split_streams_const::<8>(&self.buffer, &mut encoded),
97            _ => {
98                return Err(general_err!(
99                    "byte stream split unsupported for data types of size {} bytes",
100                    type_size
101                ));
102            }
103        }
104
105        self.buffer.clear();
106        Ok(encoded.into())
107    }
108
109    /// return the estimated memory size of this encoder.
110    fn estimated_memory_size(&self) -> usize {
111        self.buffer.capacity() * std::mem::size_of::<u8>()
112    }
113}
114
115pub struct VariableWidthByteStreamSplitEncoder<T> {
116    buffer: Vec<u8>,
117    type_width: usize,
118    _p: PhantomData<T>,
119}
120
121impl<T: DataType> VariableWidthByteStreamSplitEncoder<T> {
122    pub(crate) fn new(type_length: i32) -> Self {
123        Self {
124            buffer: Vec::new(),
125            type_width: type_length as usize,
126            _p: PhantomData,
127        }
128    }
129}
130
131fn put_fixed<T: DataType, const TYPE_SIZE: usize>(dst: &mut [u8], values: &[T::T]) {
132    let mut idx = 0;
133    values.iter().for_each(|x| {
134        let bytes = x.as_bytes();
135        if bytes.len() != TYPE_SIZE {
136            panic!(
137                "Mismatched FixedLenByteArray sizes: {} != {}",
138                bytes.len(),
139                TYPE_SIZE
140            );
141        }
142        dst[idx..(TYPE_SIZE + idx)].copy_from_slice(&bytes[..TYPE_SIZE]);
143        idx += TYPE_SIZE;
144    });
145}
146
147fn put_variable<T: DataType>(dst: &mut [u8], values: &[T::T], type_width: usize) {
148    let mut idx = 0;
149    values.iter().for_each(|x| {
150        let bytes = x.as_bytes();
151        if bytes.len() != type_width {
152            panic!(
153                "Mismatched FixedLenByteArray sizes: {} != {}",
154                bytes.len(),
155                type_width
156            );
157        }
158        dst[idx..idx + type_width].copy_from_slice(bytes);
159        idx += type_width;
160    });
161}
162
163impl<T: DataType> Encoder<T> for VariableWidthByteStreamSplitEncoder<T> {
164    fn put(&mut self, values: &[T::T]) -> Result<()> {
165        ensure_phys_ty!(
166            Type::FIXED_LEN_BYTE_ARRAY,
167            "VariableWidthByteStreamSplitEncoder only supports FixedLenByteArray types"
168        );
169
170        // FixedLenByteArray is implemented as ByteArray, so there may be gaps making
171        // slice_as_bytes untenable
172        let idx = self.buffer.len();
173        let data_len = values.len() * self.type_width;
174        // Ensure enough capacity for the new data
175        self.buffer.reserve(values.len() * self.type_width);
176        // ...and extend the size of buffer to allow direct access
177        self.buffer.put_bytes(0_u8, data_len);
178        // Get a slice of the buffer corresponding to the location of the new data
179        let out_buf = &mut self.buffer[idx..idx + data_len];
180
181        // Now copy `values` into the buffer. For `type_width` <= 8 use a fixed size when
182        // performing the copy as it is significantly faster.
183        match self.type_width {
184            2 => put_fixed::<T, 2>(out_buf, values),
185            3 => put_fixed::<T, 3>(out_buf, values),
186            4 => put_fixed::<T, 4>(out_buf, values),
187            5 => put_fixed::<T, 5>(out_buf, values),
188            6 => put_fixed::<T, 6>(out_buf, values),
189            7 => put_fixed::<T, 7>(out_buf, values),
190            8 => put_fixed::<T, 8>(out_buf, values),
191            _ => put_variable::<T>(out_buf, values, self.type_width),
192        }
193
194        Ok(())
195    }
196
197    fn encoding(&self) -> Encoding {
198        Encoding::BYTE_STREAM_SPLIT
199    }
200
201    fn estimated_data_encoded_size(&self) -> usize {
202        self.buffer.len()
203    }
204
205    fn flush_buffer(&mut self) -> Result<Bytes> {
206        let mut encoded = vec![0; self.buffer.len()];
207        let type_size = match T::get_physical_type() {
208            Type::FIXED_LEN_BYTE_ARRAY => self.type_width,
209            _ => T::get_type_size(),
210        };
211        // split_streams_const() is faster up to type_width == 8
212        match type_size {
213            2 => split_streams_const::<2>(&self.buffer, &mut encoded),
214            3 => split_streams_const::<3>(&self.buffer, &mut encoded),
215            4 => split_streams_const::<4>(&self.buffer, &mut encoded),
216            5 => split_streams_const::<5>(&self.buffer, &mut encoded),
217            6 => split_streams_const::<6>(&self.buffer, &mut encoded),
218            7 => split_streams_const::<7>(&self.buffer, &mut encoded),
219            8 => split_streams_const::<8>(&self.buffer, &mut encoded),
220            _ => split_streams_variable(&self.buffer, &mut encoded, type_size),
221        }
222
223        self.buffer.clear();
224        Ok(encoded.into())
225    }
226
227    /// return the estimated memory size of this encoder.
228    fn estimated_memory_size(&self) -> usize {
229        self.buffer.capacity() * std::mem::size_of::<u8>()
230    }
231}