parquet/encodings/decoding/
byte_stream_split_decoder.rs1use std::marker::PhantomData;
19
20use bytes::Bytes;
21
22use crate::basic::{Encoding, Type};
23use crate::data_type::private::ParquetValueType;
24use crate::data_type::{DataType, SliceAsBytes};
25use crate::errors::{ParquetError, Result};
26
27use super::Decoder;
28
29pub struct ByteStreamSplitDecoder<T: DataType> {
30 _phantom: PhantomData<T>,
31 encoded_bytes: Bytes,
32 total_num_values: usize,
33 values_decoded: usize,
34}
35
36impl<T: DataType> ByteStreamSplitDecoder<T> {
37 pub(crate) fn new() -> Self {
38 Self {
39 _phantom: PhantomData,
40 encoded_bytes: Bytes::new(),
41 total_num_values: 0,
42 values_decoded: 0,
43 }
44 }
45}
46
/// Re-interleaves byte streams into whole values for a compile-time value width.
///
/// `src` is treated as `TYPE_SIZE` consecutive streams spaced `stride` bytes
/// apart. Starting at value index `values_decoded`, byte `j` of the `i`-th
/// output value is read from `src[values_decoded + i + j * stride]` and written
/// to `dst[i * TYPE_SIZE + j]`. Only complete `TYPE_SIZE`-byte values that fit
/// in `dst` are written; any trailing partial value in `dst` is left untouched.
///
/// # Panics
///
/// Panics if `values_decoded` exceeds `src.len()` or if a computed source
/// index falls outside `src`.
fn join_streams_const<const TYPE_SIZE: usize>(
    src: &[u8],
    dst: &mut [u8],
    stride: usize,
    values_decoded: usize,
) {
    let pending = &src[values_decoded..];
    for (value_idx, value) in dst.chunks_exact_mut(TYPE_SIZE).enumerate() {
        for (stream_idx, byte) in value.iter_mut().enumerate() {
            *byte = pending[value_idx + stream_idx * stride];
        }
    }
}
65
/// Re-interleaves byte streams into whole values for a run-time value width.
///
/// Same layout as the const-width variant, with the width supplied as
/// `type_size`: byte `j` of the `i`-th output value is read from
/// `src[values_decoded + i + j * stride]` and stored at `dst[i * type_size + j]`.
/// Only complete `type_size`-byte values that fit in `dst` are written.
///
/// # Panics
///
/// Panics if `type_size` is zero, if `values_decoded` exceeds `src.len()`, or
/// if a computed source index falls outside `src`.
fn join_streams_variable(
    src: &[u8],
    dst: &mut [u8],
    stride: usize,
    type_size: usize,
    values_decoded: usize,
) {
    let remaining = &src[values_decoded..];
    for (row, out_value) in dst.chunks_exact_mut(type_size).enumerate() {
        for (stream, slot) in out_value.iter_mut().enumerate() {
            *slot = remaining[row + stream * stride];
        }
    }
}
81
82impl<T: DataType> Decoder<T> for ByteStreamSplitDecoder<T> {
83 fn set_data(&mut self, data: Bytes, num_values: usize) -> Result<()> {
84 self.encoded_bytes = data;
85 self.total_num_values = num_values;
86 self.values_decoded = 0;
87
88 Ok(())
89 }
90
91 fn get(&mut self, buffer: &mut [<T as DataType>::T]) -> Result<usize> {
92 let total_remaining_values = self.values_left();
93 let num_values = buffer.len().min(total_remaining_values);
94 let buffer = &mut buffer[..num_values];
95
96 let raw_out_bytes = unsafe { <T as DataType>::T::slice_as_bytes_mut(buffer) };
98 let type_size = T::get_type_size();
99 let stride = self.encoded_bytes.len() / type_size;
100 match type_size {
101 4 => join_streams_const::<4>(
102 &self.encoded_bytes,
103 raw_out_bytes,
104 stride,
105 self.values_decoded,
106 ),
107 8 => join_streams_const::<8>(
108 &self.encoded_bytes,
109 raw_out_bytes,
110 stride,
111 self.values_decoded,
112 ),
113 _ => {
114 return Err(general_err!(
115 "byte stream split unsupported for data types of size {} bytes",
116 type_size
117 ));
118 }
119 }
120 self.values_decoded += num_values;
121
122 Ok(num_values)
123 }
124
125 fn values_left(&self) -> usize {
126 self.total_num_values - self.values_decoded
127 }
128
129 fn encoding(&self) -> Encoding {
130 Encoding::BYTE_STREAM_SPLIT
131 }
132
133 fn skip(&mut self, num_values: usize) -> Result<usize> {
134 let to_skip = usize::min(self.values_left(), num_values);
135 self.values_decoded += to_skip;
136 Ok(to_skip)
137 }
138}
139
140pub struct VariableWidthByteStreamSplitDecoder<T: DataType> {
141 _phantom: PhantomData<T>,
142 encoded_bytes: Bytes,
143 total_num_values: usize,
144 values_decoded: usize,
145 type_width: usize,
146}
147
148impl<T: DataType> VariableWidthByteStreamSplitDecoder<T> {
149 pub(crate) fn new(type_length: i32) -> Self {
150 Self {
151 _phantom: PhantomData,
152 encoded_bytes: Bytes::new(),
153 total_num_values: 0,
154 values_decoded: 0,
155 type_width: type_length as usize,
156 }
157 }
158}
159
160impl<T: DataType> Decoder<T> for VariableWidthByteStreamSplitDecoder<T> {
161 fn set_data(&mut self, data: Bytes, num_values: usize) -> Result<()> {
162 if data.len() % self.type_width != 0 {
164 return Err(general_err!(
165 "Input data length is not a multiple of type width {}",
166 self.type_width
167 ));
168 }
169
170 match T::get_physical_type() {
171 Type::FIXED_LEN_BYTE_ARRAY => {
172 self.encoded_bytes = data;
173 self.total_num_values = num_values;
174 self.values_decoded = 0;
175 Ok(())
176 }
177 _ => Err(general_err!(
178 "VariableWidthByteStreamSplitDecoder only supports FixedLenByteArrayType"
179 )),
180 }
181 }
182
183 fn get(&mut self, buffer: &mut [<T as DataType>::T]) -> Result<usize> {
184 let total_remaining_values = self.values_left();
185 let num_values = buffer.len().min(total_remaining_values);
186 let buffer = &mut buffer[..num_values];
187 let type_size = self.type_width;
188
189 let mut tmp_vec = vec![0_u8; num_values * type_size];
192 let raw_out_bytes = tmp_vec.as_mut_slice();
193
194 let stride = self.encoded_bytes.len() / type_size;
195 match type_size {
196 2 => join_streams_const::<2>(
197 &self.encoded_bytes,
198 raw_out_bytes,
199 stride,
200 self.values_decoded,
201 ),
202 4 => join_streams_const::<4>(
203 &self.encoded_bytes,
204 raw_out_bytes,
205 stride,
206 self.values_decoded,
207 ),
208 8 => join_streams_const::<8>(
209 &self.encoded_bytes,
210 raw_out_bytes,
211 stride,
212 self.values_decoded,
213 ),
214 16 => join_streams_const::<16>(
215 &self.encoded_bytes,
216 raw_out_bytes,
217 stride,
218 self.values_decoded,
219 ),
220 _ => join_streams_variable(
221 &self.encoded_bytes,
222 raw_out_bytes,
223 stride,
224 type_size,
225 self.values_decoded,
226 ),
227 }
228 self.values_decoded += num_values;
229
230 let vec_with_data = std::mem::take(&mut tmp_vec);
232 let bytes_with_data = Bytes::from(vec_with_data);
234 for (i, bi) in buffer.iter_mut().enumerate().take(num_values) {
235 let data = bytes_with_data.slice(i * type_size..(i + 1) * type_size);
237 bi.set_from_bytes(data);
238 }
239
240 Ok(num_values)
241 }
242
243 fn values_left(&self) -> usize {
244 self.total_num_values - self.values_decoded
245 }
246
247 fn encoding(&self) -> Encoding {
248 Encoding::BYTE_STREAM_SPLIT
249 }
250
251 fn skip(&mut self, num_values: usize) -> Result<usize> {
252 let to_skip = usize::min(self.values_left(), num_values);
253 self.values_decoded += to_skip;
254 Ok(to_skip)
255 }
256}