1pub use thrift::protocol::TCompactOutputProtocol;
21use thrift::protocol::{
22 TFieldIdentifier, TInputProtocol, TListIdentifier, TMapIdentifier, TMessageIdentifier,
23 TOutputProtocol, TSetIdentifier, TStructIdentifier, TType,
24};
25
26pub trait TSerializable: Sized {
30 fn read_from_in_protocol<T: TInputProtocol>(i_prot: &mut T) -> thrift::Result<Self>;
32 fn write_to_out_protocol<T: TOutputProtocol>(&self, o_prot: &mut T) -> thrift::Result<()>;
34}
35
36pub(crate) struct TCompactSliceInputProtocol<'a> {
40 buf: &'a [u8],
41 last_read_field_id: i16,
43 read_field_id_stack: Vec<i16>,
45 pending_read_bool_value: Option<bool>,
49}
50
51impl<'a> TCompactSliceInputProtocol<'a> {
52 pub fn new(buf: &'a [u8]) -> Self {
53 Self {
54 buf,
55 last_read_field_id: 0,
56 read_field_id_stack: Vec::with_capacity(16),
57 pending_read_bool_value: None,
58 }
59 }
60
61 pub fn as_slice(&self) -> &'a [u8] {
62 self.buf
63 }
64
65 fn read_vlq(&mut self) -> thrift::Result<u64> {
66 let mut in_progress = 0;
67 let mut shift = 0;
68 loop {
69 let byte = self.read_byte()?;
70 in_progress |= ((byte & 0x7F) as u64).wrapping_shl(shift);
71 shift += 7;
72 if byte & 0x80 == 0 {
73 return Ok(in_progress);
74 }
75 }
76 }
77
78 fn read_zig_zag(&mut self) -> thrift::Result<i64> {
79 let val = self.read_vlq()?;
80 Ok((val >> 1) as i64 ^ -((val & 1) as i64))
81 }
82
83 fn read_list_set_begin(&mut self) -> thrift::Result<(TType, i32)> {
84 let header = self.read_byte()?;
85 let element_type = collection_u8_to_type(header & 0x0F)?;
86
87 let possible_element_count = (header & 0xF0) >> 4;
88 let element_count = if possible_element_count != 15 {
89 possible_element_count as i32
91 } else {
92 self.read_vlq()? as _
93 };
94
95 Ok((element_type, element_count))
96 }
97}
98
/// Expands to an `Err` holding a Thrift protocol error of kind
/// `NotImplemented`, for protocol operations this reader does not support.
macro_rules! thrift_unimplemented {
    () => {{
        let unsupported = thrift::ProtocolError {
            kind: thrift::ProtocolErrorKind::NotImplemented,
            message: String::from("not implemented"),
        };
        Err(thrift::Error::Protocol(unsupported))
    }};
}
107
108impl TInputProtocol for TCompactSliceInputProtocol<'_> {
109 fn read_message_begin(&mut self) -> thrift::Result<TMessageIdentifier> {
110 unimplemented!()
111 }
112
113 fn read_message_end(&mut self) -> thrift::Result<()> {
114 thrift_unimplemented!()
115 }
116
117 fn read_struct_begin(&mut self) -> thrift::Result<Option<TStructIdentifier>> {
118 self.read_field_id_stack.push(self.last_read_field_id);
119 self.last_read_field_id = 0;
120 Ok(None)
121 }
122
123 fn read_struct_end(&mut self) -> thrift::Result<()> {
124 self.last_read_field_id = self
125 .read_field_id_stack
126 .pop()
127 .expect("should have previous field ids");
128 Ok(())
129 }
130
131 fn read_field_begin(&mut self) -> thrift::Result<TFieldIdentifier> {
132 let field_type = self.read_byte()?;
136 let field_delta = (field_type & 0xF0) >> 4;
137 let field_type = match field_type & 0x0F {
138 0x01 => {
139 self.pending_read_bool_value = Some(true);
140 Ok(TType::Bool)
141 }
142 0x02 => {
143 self.pending_read_bool_value = Some(false);
144 Ok(TType::Bool)
145 }
146 ttu8 => u8_to_type(ttu8),
147 }?;
148
149 match field_type {
150 TType::Stop => Ok(
151 TFieldIdentifier::new::<Option<String>, String, Option<i16>>(
152 None,
153 TType::Stop,
154 None,
155 ),
156 ),
157 _ => {
158 if field_delta != 0 {
159 self.last_read_field_id = self
160 .last_read_field_id
161 .checked_add(field_delta as i16)
162 .map_or_else(
163 || {
164 Err(thrift::Error::Protocol(thrift::ProtocolError {
165 kind: thrift::ProtocolErrorKind::InvalidData,
166 message: format!(
167 "cannot add {} to {}",
168 field_delta, self.last_read_field_id
169 ),
170 }))
171 },
172 Ok,
173 )?;
174 } else {
175 self.last_read_field_id = self.read_i16()?;
176 };
177
178 Ok(TFieldIdentifier {
179 name: None,
180 field_type,
181 id: Some(self.last_read_field_id),
182 })
183 }
184 }
185 }
186
187 fn read_field_end(&mut self) -> thrift::Result<()> {
188 Ok(())
189 }
190
191 fn read_bool(&mut self) -> thrift::Result<bool> {
192 match self.pending_read_bool_value.take() {
193 Some(b) => Ok(b),
194 None => {
195 let b = self.read_byte()?;
196 match b {
201 0x01 => Ok(true),
202 0x00 | 0x02 => Ok(false),
203 unkn => Err(thrift::Error::Protocol(thrift::ProtocolError {
204 kind: thrift::ProtocolErrorKind::InvalidData,
205 message: format!("cannot convert {} into bool", unkn),
206 })),
207 }
208 }
209 }
210 }
211
212 fn read_bytes(&mut self) -> thrift::Result<Vec<u8>> {
213 let len = self.read_vlq()? as usize;
214 let ret = self.buf.get(..len).ok_or_else(eof_error)?.to_vec();
215 self.buf = &self.buf[len..];
216 Ok(ret)
217 }
218
219 fn read_i8(&mut self) -> thrift::Result<i8> {
220 Ok(self.read_byte()? as _)
221 }
222
223 fn read_i16(&mut self) -> thrift::Result<i16> {
224 Ok(self.read_zig_zag()? as _)
225 }
226
227 fn read_i32(&mut self) -> thrift::Result<i32> {
228 Ok(self.read_zig_zag()? as _)
229 }
230
231 fn read_i64(&mut self) -> thrift::Result<i64> {
232 self.read_zig_zag()
233 }
234
235 fn read_double(&mut self) -> thrift::Result<f64> {
236 let slice = (self.buf[..8]).try_into().unwrap();
237 self.buf = &self.buf[8..];
238 Ok(f64::from_le_bytes(slice))
239 }
240
241 fn read_string(&mut self) -> thrift::Result<String> {
242 let bytes = self.read_bytes()?;
243 String::from_utf8(bytes).map_err(From::from)
244 }
245
246 fn read_list_begin(&mut self) -> thrift::Result<TListIdentifier> {
247 let (element_type, element_count) = self.read_list_set_begin()?;
248 Ok(TListIdentifier::new(element_type, element_count))
249 }
250
251 fn read_list_end(&mut self) -> thrift::Result<()> {
252 Ok(())
253 }
254
255 fn read_set_begin(&mut self) -> thrift::Result<TSetIdentifier> {
256 thrift_unimplemented!()
257 }
258
259 fn read_set_end(&mut self) -> thrift::Result<()> {
260 thrift_unimplemented!()
261 }
262
263 fn read_map_begin(&mut self) -> thrift::Result<TMapIdentifier> {
264 thrift_unimplemented!()
265 }
266
267 fn read_map_end(&mut self) -> thrift::Result<()> {
268 Ok(())
269 }
270
271 #[inline]
272 fn read_byte(&mut self) -> thrift::Result<u8> {
273 let ret = *self.buf.first().ok_or_else(eof_error)?;
274 self.buf = &self.buf[1..];
275 Ok(ret)
276 }
277}
278
279fn collection_u8_to_type(b: u8) -> thrift::Result<TType> {
280 match b {
281 0x01 | 0x02 => Ok(TType::Bool),
287 o => u8_to_type(o),
288 }
289}
290
291fn u8_to_type(b: u8) -> thrift::Result<TType> {
292 match b {
293 0x00 => Ok(TType::Stop),
294 0x03 => Ok(TType::I08), 0x04 => Ok(TType::I16),
296 0x05 => Ok(TType::I32),
297 0x06 => Ok(TType::I64),
298 0x07 => Ok(TType::Double),
299 0x08 => Ok(TType::String),
300 0x09 => Ok(TType::List),
301 0x0A => Ok(TType::Set),
302 0x0B => Ok(TType::Map),
303 0x0C => Ok(TType::Struct),
304 unkn => Err(thrift::Error::Protocol(thrift::ProtocolError {
305 kind: thrift::ProtocolErrorKind::InvalidData,
306 message: format!("cannot convert {} into TType", unkn),
307 })),
308 }
309}
310
311fn eof_error() -> thrift::Error {
312 thrift::Error::Transport(thrift::TransportError {
313 kind: thrift::TransportErrorKind::EndOfFile,
314 message: "Unexpected EOF".to_string(),
315 })
316}
317
#[cfg(test)]
mod tests {
    use crate::format::{BoundaryOrder, ColumnIndex};
    use crate::thrift::{TCompactSliceInputProtocol, TSerializable};

    /// Decodes `encoded` as a `ColumnIndex` and asserts that it equals an
    /// index whose `null_pages` is `[false, true]`, whose min/max value lists
    /// are empty, whose boundary order is `UNORDERED`, and whose optional
    /// fields are all unset.
    fn assert_decodes_to_false_true_null_pages(encoded: &[u8]) {
        let mut protocol = TCompactSliceInputProtocol::new(encoded);
        let index = ColumnIndex::read_from_in_protocol(&mut protocol).unwrap();
        let expected = ColumnIndex {
            null_pages: vec![false, true],
            min_values: vec![],
            max_values: vec![],
            boundary_order: BoundaryOrder::UNORDERED,
            null_counts: None,
            repetition_level_histograms: None,
            definition_level_histograms: None,
        };

        assert_eq!(&index, &expected);
    }

    #[test]
    pub fn read_boolean_list_field_type() {
        // 0x19       field header: id delta 1, type 9 (list)
        // 0x21       list header: 2 elements, element type 1 (bool)
        // 2, 1       elements: false, true
        // 0x19, 8    next field: list with 0 elements of type 8
        // 0x19, 8    next field: list with 0 elements of type 8
        // 0x15, 0    next field: type 5 (i32), zig-zag value 0
        // 0          stop
        assert_decodes_to_false_true_null_pages(&[
            0x19, 0x21, 2, 1, 0x19, 8, 0x19, 8, 0x15, 0, 0,
        ]);
    }

    #[test]
    pub fn read_boolean_list_alternative_encoding() {
        // Same layout as above, but the list header uses element type 2 for
        // bool (0x22) and `false` is written as 0 instead of 2.
        assert_decodes_to_false_true_null_pages(&[
            0x19, 0x22, 0, 1, 0x19, 8, 0x19, 8, 0x15, 0, 0,
        ]);
    }
}