parquet/encodings/encoding/
byte_stream_split_encoder.rs1use crate::basic::{Encoding, Type};
19use crate::data_type::{AsBytes, DataType, SliceAsBytes};
20
21use crate::errors::{ParquetError, Result};
22
23use super::Encoder;
24
25use bytes::{BufMut, Bytes};
26use std::cmp;
27use std::marker::PhantomData;
28
/// Encoder for fixed-width primitive values using the `BYTE_STREAM_SPLIT` encoding.
///
/// Values handed to `put` are appended to `buffer` as raw bytes in their natural
/// layout; the byte-stream split is performed only when the buffer is flushed.
/// `T` is a type-level marker only, which is why it is held in a `PhantomData`.
pub struct ByteStreamSplitEncoder<T> {
    /// Unsplit bytes of every value buffered since the last flush.
    buffer: Vec<u8>,
    /// Ties the encoder to its data type without storing a `T`.
    _p: PhantomData<T>,
}
33
34impl<T: DataType> ByteStreamSplitEncoder<T> {
35 pub(crate) fn new() -> Self {
36 Self {
37 buffer: Vec::new(),
38 _p: PhantomData,
39 }
40 }
41}
42
/// Scatters `TYPE_SIZE`-byte values from `src` into `TYPE_SIZE` byte streams in `dst`.
///
/// With `num_values = src.len() / TYPE_SIZE`, byte `b` of value `v` is written to
/// `dst[b * num_values + v]`, so stream `b` occupies `dst[b * num_values..][..num_values]`.
/// Any trailing partial value in `src` is ignored. The width is a const generic so
/// the compiler can specialise the inner loop for each supported size.
///
/// # Panics
///
/// Panics if `dst` is shorter than `num_values * TYPE_SIZE` bytes.
fn split_streams_const<const TYPE_SIZE: usize>(src: &[u8], dst: &mut [u8]) {
    let num_values = src.len() / TYPE_SIZE;
    for (value_idx, value_bytes) in src.chunks_exact(TYPE_SIZE).enumerate() {
        for (byte_idx, &byte) in value_bytes.iter().enumerate() {
            dst[byte_idx * num_values + value_idx] = byte;
        }
    }
}
55
/// Scatters `type_size`-byte values from `src` into `type_size` byte streams in `dst`.
///
/// This is the runtime-width counterpart of `split_streams_const`. With
/// `stride = src.len() / type_size`, byte `j` of value `i` is written to
/// `dst[i + j * stride]`. Any trailing partial value in `src` is ignored.
///
/// Byte positions are processed in groups of `BLOCK_SIZE`, so each pass over `src`
/// writes to at most `BLOCK_SIZE` output streams.
///
/// A `type_size` of zero means there are no bytes to move. It returns immediately
/// instead of dividing by zero.
///
/// # Panics
///
/// Panics if `dst` is shorter than `stride * type_size` bytes.
fn split_streams_variable(src: &[u8], dst: &mut [u8], type_size: usize) {
    const BLOCK_SIZE: usize = 4;
    // Zero-width values carry no bytes; computing `stride` below would otherwise
    // panic with a division by zero.
    if type_size == 0 {
        return;
    }
    let stride = src.len() / type_size;
    for j in (0..type_size).step_by(BLOCK_SIZE) {
        let jrange = cmp::min(BLOCK_SIZE, type_size - j);
        for i in 0..stride {
            for jj in 0..jrange {
                dst[i + (j + jj) * stride] = src[i * type_size + j + jj];
            }
        }
    }
}
69
70impl<T: DataType> Encoder<T> for ByteStreamSplitEncoder<T> {
71 fn put(&mut self, values: &[T::T]) -> Result<()> {
72 self.buffer
73 .extend(<T as DataType>::T::slice_as_bytes(values));
74
75 ensure_phys_ty!(
76 Type::FLOAT | Type::DOUBLE | Type::INT32 | Type::INT64,
77 "ByteStreamSplitEncoder does not support Int96, Boolean, or ByteArray types"
78 );
79
80 Ok(())
81 }
82
83 fn encoding(&self) -> Encoding {
84 Encoding::BYTE_STREAM_SPLIT
85 }
86
87 fn estimated_data_encoded_size(&self) -> usize {
88 self.buffer.len()
89 }
90
91 fn flush_buffer(&mut self) -> Result<Bytes> {
92 let mut encoded = vec![0; self.buffer.len()];
93 let type_size = T::get_type_size();
94 match type_size {
95 4 => split_streams_const::<4>(&self.buffer, &mut encoded),
96 8 => split_streams_const::<8>(&self.buffer, &mut encoded),
97 _ => {
98 return Err(general_err!(
99 "byte stream split unsupported for data types of size {} bytes",
100 type_size
101 ));
102 }
103 }
104
105 self.buffer.clear();
106 Ok(encoded.into())
107 }
108
109 fn estimated_memory_size(&self) -> usize {
111 self.buffer.capacity() * std::mem::size_of::<u8>()
112 }
113}
114
/// `BYTE_STREAM_SPLIT` encoder whose value width is supplied at construction time
/// rather than derived from `T`; its `put` accepts only FIXED_LEN_BYTE_ARRAY data.
///
/// Values are buffered unsplit and only rearranged into byte streams on flush.
pub struct VariableWidthByteStreamSplitEncoder<T> {
    /// Unsplit bytes of all values buffered since the last flush.
    buffer: Vec<u8>,
    /// Width in bytes of every buffered value.
    type_width: usize,
    /// Ties the encoder to its data type without storing a `T`.
    _p: PhantomData<T>,
}
120
121impl<T: DataType> VariableWidthByteStreamSplitEncoder<T> {
122 pub(crate) fn new(type_length: i32) -> Self {
123 Self {
124 buffer: Vec::new(),
125 type_width: type_length as usize,
126 _p: PhantomData,
127 }
128 }
129}
130
131fn put_fixed<T: DataType, const TYPE_SIZE: usize>(dst: &mut [u8], values: &[T::T]) {
132 let mut idx = 0;
133 values.iter().for_each(|x| {
134 let bytes = x.as_bytes();
135 if bytes.len() != TYPE_SIZE {
136 panic!(
137 "Mismatched FixedLenByteArray sizes: {} != {}",
138 bytes.len(),
139 TYPE_SIZE
140 );
141 }
142 dst[idx..(TYPE_SIZE + idx)].copy_from_slice(&bytes[..TYPE_SIZE]);
143 idx += TYPE_SIZE;
144 });
145}
146
147fn put_variable<T: DataType>(dst: &mut [u8], values: &[T::T], type_width: usize) {
148 let mut idx = 0;
149 values.iter().for_each(|x| {
150 let bytes = x.as_bytes();
151 if bytes.len() != type_width {
152 panic!(
153 "Mismatched FixedLenByteArray sizes: {} != {}",
154 bytes.len(),
155 type_width
156 );
157 }
158 dst[idx..idx + type_width].copy_from_slice(bytes);
159 idx += type_width;
160 });
161}
162
163impl<T: DataType> Encoder<T> for VariableWidthByteStreamSplitEncoder<T> {
164 fn put(&mut self, values: &[T::T]) -> Result<()> {
165 ensure_phys_ty!(
166 Type::FIXED_LEN_BYTE_ARRAY,
167 "VariableWidthByteStreamSplitEncoder only supports FixedLenByteArray types"
168 );
169
170 let idx = self.buffer.len();
173 let data_len = values.len() * self.type_width;
174 self.buffer.reserve(values.len() * self.type_width);
176 self.buffer.put_bytes(0_u8, data_len);
178 let out_buf = &mut self.buffer[idx..idx + data_len];
180
181 match self.type_width {
184 2 => put_fixed::<T, 2>(out_buf, values),
185 3 => put_fixed::<T, 3>(out_buf, values),
186 4 => put_fixed::<T, 4>(out_buf, values),
187 5 => put_fixed::<T, 5>(out_buf, values),
188 6 => put_fixed::<T, 6>(out_buf, values),
189 7 => put_fixed::<T, 7>(out_buf, values),
190 8 => put_fixed::<T, 8>(out_buf, values),
191 _ => put_variable::<T>(out_buf, values, self.type_width),
192 }
193
194 Ok(())
195 }
196
197 fn encoding(&self) -> Encoding {
198 Encoding::BYTE_STREAM_SPLIT
199 }
200
201 fn estimated_data_encoded_size(&self) -> usize {
202 self.buffer.len()
203 }
204
205 fn flush_buffer(&mut self) -> Result<Bytes> {
206 let mut encoded = vec![0; self.buffer.len()];
207 let type_size = match T::get_physical_type() {
208 Type::FIXED_LEN_BYTE_ARRAY => self.type_width,
209 _ => T::get_type_size(),
210 };
211 match type_size {
213 2 => split_streams_const::<2>(&self.buffer, &mut encoded),
214 3 => split_streams_const::<3>(&self.buffer, &mut encoded),
215 4 => split_streams_const::<4>(&self.buffer, &mut encoded),
216 5 => split_streams_const::<5>(&self.buffer, &mut encoded),
217 6 => split_streams_const::<6>(&self.buffer, &mut encoded),
218 7 => split_streams_const::<7>(&self.buffer, &mut encoded),
219 8 => split_streams_const::<8>(&self.buffer, &mut encoded),
220 _ => split_streams_variable(&self.buffer, &mut encoded, type_size),
221 }
222
223 self.buffer.clear();
224 Ok(encoded.into())
225 }
226
227 fn estimated_memory_size(&self) -> usize {
229 self.buffer.capacity() * std::mem::size_of::<u8>()
230 }
231}