parquet/
thrift.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Custom thrift definitions
19
20pub use thrift::protocol::TCompactOutputProtocol;
21use thrift::protocol::{
22    TFieldIdentifier, TInputProtocol, TListIdentifier, TMapIdentifier, TMessageIdentifier,
23    TOutputProtocol, TSetIdentifier, TStructIdentifier, TType,
24};
25
26/// Reads and writes the struct to Thrift protocols.
27///
28/// Unlike [`thrift::protocol::TSerializable`] this uses generics instead of trait objects
29pub trait TSerializable: Sized {
30    /// Reads the struct from the input Thrift protocol
31    fn read_from_in_protocol<T: TInputProtocol>(i_prot: &mut T) -> thrift::Result<Self>;
32    /// Writes the struct to the output Thrift protocol
33    fn write_to_out_protocol<T: TOutputProtocol>(&self, o_prot: &mut T) -> thrift::Result<()>;
34}
35
/// Thrift compact-protocol reader that decodes directly from a borrowed byte slice.
///
/// Intended as a faster alternative to [`TCompactInputProtocol`], which reads through a
/// transport instead of a slice.
///
/// [`TCompactInputProtocol`]: thrift::protocol::TCompactInputProtocol
pub(crate) struct TCompactSliceInputProtocol<'a> {
    /// Bytes that have not been consumed yet; every read advances this slice.
    buf: &'a [u8],
    /// Id of the field most recently decoded in the struct currently being read.
    last_read_field_id: i16,
    /// Saved `last_read_field_id` values of enclosing structs: one entry is pushed whenever a
    /// nested struct starts being read.
    read_field_id_stack: Vec<i16>,
    /// Value of a boolean field whose header was just decoded. The compact encoding packs a
    /// bool field's value into the same byte as its field header, but the value is only
    /// requested after the field id has been read, so it is stashed here until then.
    pending_read_bool_value: Option<bool>,
}
50
51impl<'a> TCompactSliceInputProtocol<'a> {
52    pub fn new(buf: &'a [u8]) -> Self {
53        Self {
54            buf,
55            last_read_field_id: 0,
56            read_field_id_stack: Vec::with_capacity(16),
57            pending_read_bool_value: None,
58        }
59    }
60
61    pub fn as_slice(&self) -> &'a [u8] {
62        self.buf
63    }
64
65    fn read_vlq(&mut self) -> thrift::Result<u64> {
66        let mut in_progress = 0;
67        let mut shift = 0;
68        loop {
69            let byte = self.read_byte()?;
70            in_progress |= ((byte & 0x7F) as u64).wrapping_shl(shift);
71            shift += 7;
72            if byte & 0x80 == 0 {
73                return Ok(in_progress);
74            }
75        }
76    }
77
78    fn read_zig_zag(&mut self) -> thrift::Result<i64> {
79        let val = self.read_vlq()?;
80        Ok((val >> 1) as i64 ^ -((val & 1) as i64))
81    }
82
83    fn read_list_set_begin(&mut self) -> thrift::Result<(TType, i32)> {
84        let header = self.read_byte()?;
85        let element_type = collection_u8_to_type(header & 0x0F)?;
86
87        let possible_element_count = (header & 0xF0) >> 4;
88        let element_count = if possible_element_count != 15 {
89            // high bits set high if count and type encoded separately
90            possible_element_count as i32
91        } else {
92            self.read_vlq()? as _
93        };
94
95        Ok((element_type, element_count))
96    }
97}
98
// Expands to an `Err` holding a `NotImplemented` Thrift protocol error; used by protocol
// methods that this reader does not support.
macro_rules! thrift_unimplemented {
    () => {
        Err(thrift::Error::Protocol(thrift::ProtocolError {
            message: String::from("not implemented"),
            kind: thrift::ProtocolErrorKind::NotImplemented,
        }))
    };
}
107
108impl TInputProtocol for TCompactSliceInputProtocol<'_> {
109    fn read_message_begin(&mut self) -> thrift::Result<TMessageIdentifier> {
110        unimplemented!()
111    }
112
113    fn read_message_end(&mut self) -> thrift::Result<()> {
114        thrift_unimplemented!()
115    }
116
117    fn read_struct_begin(&mut self) -> thrift::Result<Option<TStructIdentifier>> {
118        self.read_field_id_stack.push(self.last_read_field_id);
119        self.last_read_field_id = 0;
120        Ok(None)
121    }
122
123    fn read_struct_end(&mut self) -> thrift::Result<()> {
124        self.last_read_field_id = self
125            .read_field_id_stack
126            .pop()
127            .expect("should have previous field ids");
128        Ok(())
129    }
130
131    fn read_field_begin(&mut self) -> thrift::Result<TFieldIdentifier> {
132        // we can read at least one byte, which is:
133        // - the type
134        // - the field delta and the type
135        let field_type = self.read_byte()?;
136        let field_delta = (field_type & 0xF0) >> 4;
137        let field_type = match field_type & 0x0F {
138            0x01 => {
139                self.pending_read_bool_value = Some(true);
140                Ok(TType::Bool)
141            }
142            0x02 => {
143                self.pending_read_bool_value = Some(false);
144                Ok(TType::Bool)
145            }
146            ttu8 => u8_to_type(ttu8),
147        }?;
148
149        match field_type {
150            TType::Stop => Ok(
151                TFieldIdentifier::new::<Option<String>, String, Option<i16>>(
152                    None,
153                    TType::Stop,
154                    None,
155                ),
156            ),
157            _ => {
158                if field_delta != 0 {
159                    self.last_read_field_id = self
160                        .last_read_field_id
161                        .checked_add(field_delta as i16)
162                        .map_or_else(
163                            || {
164                                Err(thrift::Error::Protocol(thrift::ProtocolError {
165                                    kind: thrift::ProtocolErrorKind::InvalidData,
166                                    message: format!(
167                                        "cannot add {} to {}",
168                                        field_delta, self.last_read_field_id
169                                    ),
170                                }))
171                            },
172                            Ok,
173                        )?;
174                } else {
175                    self.last_read_field_id = self.read_i16()?;
176                };
177
178                Ok(TFieldIdentifier {
179                    name: None,
180                    field_type,
181                    id: Some(self.last_read_field_id),
182                })
183            }
184        }
185    }
186
187    fn read_field_end(&mut self) -> thrift::Result<()> {
188        Ok(())
189    }
190
191    fn read_bool(&mut self) -> thrift::Result<bool> {
192        match self.pending_read_bool_value.take() {
193            Some(b) => Ok(b),
194            None => {
195                let b = self.read_byte()?;
196                // Previous versions of the thrift specification said to use 0 and 1 inside collections,
197                // but that differed from existing implementations.
198                // The specification was updated in https://github.com/apache/thrift/commit/2c29c5665bc442e703480bb0ee60fe925ffe02e8.
199                // At least the go implementation seems to have followed the previously documented values.
200                match b {
201                    0x01 => Ok(true),
202                    0x00 | 0x02 => Ok(false),
203                    unkn => Err(thrift::Error::Protocol(thrift::ProtocolError {
204                        kind: thrift::ProtocolErrorKind::InvalidData,
205                        message: format!("cannot convert {} into bool", unkn),
206                    })),
207                }
208            }
209        }
210    }
211
212    fn read_bytes(&mut self) -> thrift::Result<Vec<u8>> {
213        let len = self.read_vlq()? as usize;
214        let ret = self.buf.get(..len).ok_or_else(eof_error)?.to_vec();
215        self.buf = &self.buf[len..];
216        Ok(ret)
217    }
218
219    fn read_i8(&mut self) -> thrift::Result<i8> {
220        Ok(self.read_byte()? as _)
221    }
222
223    fn read_i16(&mut self) -> thrift::Result<i16> {
224        Ok(self.read_zig_zag()? as _)
225    }
226
227    fn read_i32(&mut self) -> thrift::Result<i32> {
228        Ok(self.read_zig_zag()? as _)
229    }
230
231    fn read_i64(&mut self) -> thrift::Result<i64> {
232        self.read_zig_zag()
233    }
234
235    fn read_double(&mut self) -> thrift::Result<f64> {
236        let slice = (self.buf[..8]).try_into().unwrap();
237        self.buf = &self.buf[8..];
238        Ok(f64::from_le_bytes(slice))
239    }
240
241    fn read_string(&mut self) -> thrift::Result<String> {
242        let bytes = self.read_bytes()?;
243        String::from_utf8(bytes).map_err(From::from)
244    }
245
246    fn read_list_begin(&mut self) -> thrift::Result<TListIdentifier> {
247        let (element_type, element_count) = self.read_list_set_begin()?;
248        Ok(TListIdentifier::new(element_type, element_count))
249    }
250
251    fn read_list_end(&mut self) -> thrift::Result<()> {
252        Ok(())
253    }
254
255    fn read_set_begin(&mut self) -> thrift::Result<TSetIdentifier> {
256        thrift_unimplemented!()
257    }
258
259    fn read_set_end(&mut self) -> thrift::Result<()> {
260        thrift_unimplemented!()
261    }
262
263    fn read_map_begin(&mut self) -> thrift::Result<TMapIdentifier> {
264        thrift_unimplemented!()
265    }
266
267    fn read_map_end(&mut self) -> thrift::Result<()> {
268        Ok(())
269    }
270
271    #[inline]
272    fn read_byte(&mut self) -> thrift::Result<u8> {
273        let ret = *self.buf.first().ok_or_else(eof_error)?;
274        self.buf = &self.buf[1..];
275        Ok(ret)
276    }
277}
278
279fn collection_u8_to_type(b: u8) -> thrift::Result<TType> {
280    match b {
281        // For historical and compatibility reasons, a reader should be capable to deal with both cases.
282        // The only valid value in the original spec was 2, but due to an widespread implementation bug
283        // the defacto standard across large parts of the library became 1 instead.
284        // As a result, both values are now allowed.
285        // https://github.com/apache/thrift/blob/master/doc/specs/thrift-compact-protocol.md#list-and-set
286        0x01 | 0x02 => Ok(TType::Bool),
287        o => u8_to_type(o),
288    }
289}
290
291fn u8_to_type(b: u8) -> thrift::Result<TType> {
292    match b {
293        0x00 => Ok(TType::Stop),
294        0x03 => Ok(TType::I08), // equivalent to TType::Byte
295        0x04 => Ok(TType::I16),
296        0x05 => Ok(TType::I32),
297        0x06 => Ok(TType::I64),
298        0x07 => Ok(TType::Double),
299        0x08 => Ok(TType::String),
300        0x09 => Ok(TType::List),
301        0x0A => Ok(TType::Set),
302        0x0B => Ok(TType::Map),
303        0x0C => Ok(TType::Struct),
304        unkn => Err(thrift::Error::Protocol(thrift::ProtocolError {
305            kind: thrift::ProtocolErrorKind::InvalidData,
306            message: format!("cannot convert {} into TType", unkn),
307        })),
308    }
309}
310
311fn eof_error() -> thrift::Error {
312    thrift::Error::Transport(thrift::TransportError {
313        kind: thrift::TransportErrorKind::EndOfFile,
314        message: "Unexpected EOF".to_string(),
315    })
316}
317
#[cfg(test)]
mod tests {
    use crate::format::{BoundaryOrder, ColumnIndex};
    use crate::thrift::{TCompactSliceInputProtocol, TSerializable};

    /// Decodes a `ColumnIndex` from compact-protocol `bytes`, panicking on failure.
    fn decode_column_index(bytes: &[u8]) -> ColumnIndex {
        let mut protocol = TCompactSliceInputProtocol::new(bytes);
        ColumnIndex::read_from_in_protocol(&mut protocol).unwrap()
    }

    /// The index both encodings below describe: two pages (the second one null), empty
    /// min/max lists, and unordered boundaries.
    fn expected_column_index() -> ColumnIndex {
        ColumnIndex {
            null_pages: vec![false, true],
            min_values: vec![],
            max_values: vec![],
            boundary_order: BoundaryOrder::UNORDERED,
            null_counts: None,
            repetition_level_histograms: None,
            definition_level_histograms: None,
        }
    }

    #[test]
    pub fn read_boolean_list_field_type() {
        // Boolean collection type encoded as 0x01, as used by this crate when writing.
        // Values encoded as 1 (true) or 2 (false) as in the current version of the thrift
        // documentation.
        // Layout: field 1 list header (0x19), 2 bools (0x21) = [false, true]; fields 2 and 3
        // are empty string lists (0x19, 0x08); field 4 i32 (0x15) = 0; stop (0).
        let bytes: &[u8] = &[0x19, 0x21, 2, 1, 0x19, 8, 0x19, 8, 0x15, 0, 0];
        assert_eq!(decode_column_index(bytes), expected_column_index());
    }

    #[test]
    pub fn read_boolean_list_alternative_encoding() {
        // Boolean collection type encoded as 0x02, as allowed by the spec.
        // Values encoded as 1 (true) or 0 (false) as before the thrift documentation change
        // on 2024-12-13.
        let bytes: &[u8] = &[0x19, 0x22, 0, 1, 0x19, 8, 0x19, 8, 0x15, 0, 0];
        assert_eq!(decode_column_index(bytes), expected_column_index());
    }
}