parquet/encodings/decoding/
byte_stream_split_decoder.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::marker::PhantomData;
19
20use bytes::Bytes;
21
22use crate::basic::{Encoding, Type};
23use crate::data_type::private::ParquetValueType;
24use crate::data_type::{DataType, SliceAsBytes};
25use crate::errors::{ParquetError, Result};
26
27use super::Decoder;
28
/// Decoder for `BYTE_STREAM_SPLIT` encoded data whose values are 4 or 8 bytes
/// wide (any other value size is rejected when decoding).
pub struct ByteStreamSplitDecoder<T: DataType> {
    // Ties the decoder to its Parquet data type without storing a value of it.
    _phantom: PhantomData<T>,
    // The complete encoded data most recently handed to `set_data`.
    encoded_bytes: Bytes,
    // Number of values announced by the last `set_data` call.
    total_num_values: usize,
    // Number of values already returned by `get` or passed over by `skip`.
    values_decoded: usize,
}
35
36impl<T: DataType> ByteStreamSplitDecoder<T> {
37    pub(crate) fn new() -> Self {
38        Self {
39            _phantom: PhantomData,
40            encoded_bytes: Bytes::new(),
41            total_num_values: 0,
42            values_decoded: 0,
43        }
44    }
45}
46
/// Reassembles whole values from byte-stream-split data when the value width
/// is known at compile time.
///
/// `src` must contain the complete encoded data, because the positions of the
/// individual byte streams are only known once all of it has been collected:
/// byte `j` of a value is read from the stream starting at `j * stride`.
/// `dst` may be just a window of the output; `values_decoded` is the index of
/// the first value to reconstruct. Trailing bytes of `dst` that do not form a
/// whole value are left untouched.
///
/// Panics if `values_decoded` is past the end of `src`, or if a strided read
/// falls outside `src`.
fn join_streams_const<const TYPE_SIZE: usize>(
    src: &[u8],
    dst: &mut [u8],
    stride: usize,
    values_decoded: usize,
) {
    // Drop the values that were already produced so value `n` of this call
    // sits at offset `n` within every stream.
    let pending = &src[values_decoded..];
    for (value_idx, value) in dst.chunks_exact_mut(TYPE_SIZE).enumerate() {
        for (stream_idx, out_byte) in value.iter_mut().enumerate() {
            *out_byte = pending[value_idx + stream_idx * stride];
        }
    }
}
65
/// Reassembles whole values from byte-stream-split data when the value width
/// (`type_size`) is only known at run time.
///
/// `src` is the complete encoded data; byte `j` of a value is read from the
/// stream starting at `j * stride`. `values_decoded` is the index of the first
/// value to reconstruct into `dst`. Trailing bytes of `dst` that do not form a
/// whole value are left untouched.
///
/// Panics if `type_size` is zero, if `values_decoded` is past the end of
/// `src`, or if a strided read falls outside `src`.
fn join_streams_variable(
    src: &[u8],
    dst: &mut [u8],
    stride: usize,
    type_size: usize,
    values_decoded: usize,
) {
    // Skip the values that earlier calls already produced.
    let pending = &src[values_decoded..];
    for (row, value) in dst.chunks_exact_mut(type_size).enumerate() {
        for (stream, slot) in value.iter_mut().enumerate() {
            *slot = pending[row + stream * stride];
        }
    }
}
81
82impl<T: DataType> Decoder<T> for ByteStreamSplitDecoder<T> {
83    fn set_data(&mut self, data: Bytes, num_values: usize) -> Result<()> {
84        self.encoded_bytes = data;
85        self.total_num_values = num_values;
86        self.values_decoded = 0;
87
88        Ok(())
89    }
90
91    fn get(&mut self, buffer: &mut [<T as DataType>::T]) -> Result<usize> {
92        let total_remaining_values = self.values_left();
93        let num_values = buffer.len().min(total_remaining_values);
94        let buffer = &mut buffer[..num_values];
95
96        // SAFETY: i/f32 and i/f64 has no constraints on their internal representation, so we can modify it as we want
97        let raw_out_bytes = unsafe { <T as DataType>::T::slice_as_bytes_mut(buffer) };
98        let type_size = T::get_type_size();
99        let stride = self.encoded_bytes.len() / type_size;
100        match type_size {
101            4 => join_streams_const::<4>(
102                &self.encoded_bytes,
103                raw_out_bytes,
104                stride,
105                self.values_decoded,
106            ),
107            8 => join_streams_const::<8>(
108                &self.encoded_bytes,
109                raw_out_bytes,
110                stride,
111                self.values_decoded,
112            ),
113            _ => {
114                return Err(general_err!(
115                    "byte stream split unsupported for data types of size {} bytes",
116                    type_size
117                ));
118            }
119        }
120        self.values_decoded += num_values;
121
122        Ok(num_values)
123    }
124
125    fn values_left(&self) -> usize {
126        self.total_num_values - self.values_decoded
127    }
128
129    fn encoding(&self) -> Encoding {
130        Encoding::BYTE_STREAM_SPLIT
131    }
132
133    fn skip(&mut self, num_values: usize) -> Result<usize> {
134        let to_skip = usize::min(self.values_left(), num_values);
135        self.values_decoded += to_skip;
136        Ok(to_skip)
137    }
138}
139
/// Decoder for `BYTE_STREAM_SPLIT` encoded `FIXED_LEN_BYTE_ARRAY` data, where
/// the width of each value is supplied at construction time rather than
/// derived from the Rust type.
pub struct VariableWidthByteStreamSplitDecoder<T: DataType> {
    // Ties the decoder to its Parquet data type without storing a value of it.
    _phantom: PhantomData<T>,
    // The complete encoded data most recently accepted by `set_data`.
    encoded_bytes: Bytes,
    // Number of values announced by the last successful `set_data` call.
    total_num_values: usize,
    // Number of values already returned by `get` or passed over by `skip`.
    values_decoded: usize,
    // Width of every value in bytes, taken from the constructor argument.
    type_width: usize,
}
147
148impl<T: DataType> VariableWidthByteStreamSplitDecoder<T> {
149    pub(crate) fn new(type_length: i32) -> Self {
150        Self {
151            _phantom: PhantomData,
152            encoded_bytes: Bytes::new(),
153            total_num_values: 0,
154            values_decoded: 0,
155            type_width: type_length as usize,
156        }
157    }
158}
159
160impl<T: DataType> Decoder<T> for VariableWidthByteStreamSplitDecoder<T> {
161    fn set_data(&mut self, data: Bytes, num_values: usize) -> Result<()> {
162        // Rough check that all data elements are the same length
163        if data.len() % self.type_width != 0 {
164            return Err(general_err!(
165                "Input data length is not a multiple of type width {}",
166                self.type_width
167            ));
168        }
169
170        match T::get_physical_type() {
171            Type::FIXED_LEN_BYTE_ARRAY => {
172                self.encoded_bytes = data;
173                self.total_num_values = num_values;
174                self.values_decoded = 0;
175                Ok(())
176            }
177            _ => Err(general_err!(
178                "VariableWidthByteStreamSplitDecoder only supports FixedLenByteArrayType"
179            )),
180        }
181    }
182
183    fn get(&mut self, buffer: &mut [<T as DataType>::T]) -> Result<usize> {
184        let total_remaining_values = self.values_left();
185        let num_values = buffer.len().min(total_remaining_values);
186        let buffer = &mut buffer[..num_values];
187        let type_size = self.type_width;
188
189        // Since this is FIXED_LEN_BYTE_ARRAY data, we can't use slice_as_bytes_mut. Instead we'll
190        // have to do some data copies.
191        let mut tmp_vec = vec![0_u8; num_values * type_size];
192        let raw_out_bytes = tmp_vec.as_mut_slice();
193
194        let stride = self.encoded_bytes.len() / type_size;
195        match type_size {
196            2 => join_streams_const::<2>(
197                &self.encoded_bytes,
198                raw_out_bytes,
199                stride,
200                self.values_decoded,
201            ),
202            4 => join_streams_const::<4>(
203                &self.encoded_bytes,
204                raw_out_bytes,
205                stride,
206                self.values_decoded,
207            ),
208            8 => join_streams_const::<8>(
209                &self.encoded_bytes,
210                raw_out_bytes,
211                stride,
212                self.values_decoded,
213            ),
214            16 => join_streams_const::<16>(
215                &self.encoded_bytes,
216                raw_out_bytes,
217                stride,
218                self.values_decoded,
219            ),
220            _ => join_streams_variable(
221                &self.encoded_bytes,
222                raw_out_bytes,
223                stride,
224                type_size,
225                self.values_decoded,
226            ),
227        }
228        self.values_decoded += num_values;
229
230        // create a buffer from the vec so far (and leave a new Vec in its place)
231        let vec_with_data = std::mem::take(&mut tmp_vec);
232        // convert Vec to Bytes (which is a ref counted wrapper)
233        let bytes_with_data = Bytes::from(vec_with_data);
234        for (i, bi) in buffer.iter_mut().enumerate().take(num_values) {
235            // Get a view into the data, without also copying the bytes
236            let data = bytes_with_data.slice(i * type_size..(i + 1) * type_size);
237            bi.set_from_bytes(data);
238        }
239
240        Ok(num_values)
241    }
242
243    fn values_left(&self) -> usize {
244        self.total_num_values - self.values_decoded
245    }
246
247    fn encoding(&self) -> Encoding {
248        Encoding::BYTE_STREAM_SPLIT
249    }
250
251    fn skip(&mut self, num_values: usize) -> Result<usize> {
252        let to_skip = usize::min(self.values_left(), num_values);
253        self.values_decoded += to_skip;
254        Ok(to_skip)
255    }
256}