Skip to main content

parquet/arrow/buffer/
offset_buffer.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::arrow::buffer::bit_util::iter_set_bits_rev;
19use crate::arrow::record_reader::buffer::ValuesBuffer;
20use crate::errors::{ParquetError, Result};
21use crate::util::utf8::check_valid_utf8;
22use arrow_array::{ArrayRef, OffsetSizeTrait, make_array};
23use arrow_buffer::{ArrowNativeType, Buffer};
24use arrow_data::ArrayDataBuilder;
25use arrow_schema::DataType as ArrowType;
26
/// A buffer of variable-sized byte arrays that can be converted into
/// a corresponding [`ArrayRef`]
///
/// The byte arrays are stored back-to-back in `values`, with `offsets`
/// recording where each one ends. Element `i` occupies
/// `values[offsets[i]..offsets[i + 1]]`.
#[derive(Debug)]
pub struct OffsetBuffer<I: OffsetSizeTrait> {
    /// End offsets into `values`, preceded by an initial `I::default()` entry.
    ///
    /// Buffers created through [`OffsetBuffer::with_capacity`] always hold at
    /// least that initial entry, so this contains one more entry than the
    /// number of byte arrays in the buffer.
    pub offsets: Vec<I>,
    /// The concatenated bytes of every byte array pushed so far
    pub values: Vec<u8>,
}
34
35impl<I: OffsetSizeTrait> OffsetBuffer<I> {
36    /// Create a new `OffsetBuffer` with capacity for at least `capacity` elements
37    ///
38    /// Pre-allocates the offsets vector to avoid reallocations during reading.
39    /// The values vector is not pre-allocated as its size is unpredictable.
40    pub fn with_capacity(capacity: usize) -> Self {
41        let mut offsets = Vec::with_capacity(capacity + 1);
42        offsets.push(I::default());
43        Self {
44            offsets,
45            values: Vec::new(),
46        }
47    }
48
49    /// Returns the number of byte arrays in this buffer
50    pub fn len(&self) -> usize {
51        self.offsets.len() - 1
52    }
53
54    pub fn is_empty(&self) -> bool {
55        self.len() == 0
56    }
57
58    /// If `validate_utf8` this verifies that the first character of `data` is
59    /// the start of a UTF-8 codepoint
60    ///
61    /// Note: This does not verify that the entirety of `data` is valid
62    /// UTF-8. This should be done by calling [`Self::check_valid_utf8`] after
63    /// all data has been written
64    pub fn try_push(&mut self, data: &[u8], validate_utf8: bool) -> Result<()> {
65        if validate_utf8 {
66            if let Some(&b) = data.first() {
67                // A valid code-point iff it does not start with 0b10xxxxxx
68                // Bit-magic taken from `std::str::is_char_boundary`
69                if (b as i8) < -0x40 {
70                    return Err(ParquetError::General(
71                        "encountered non UTF-8 data".to_string(),
72                    ));
73                }
74            }
75        }
76
77        self.values.extend_from_slice(data);
78
79        let index_offset = I::from_usize(self.values.len())
80            .ok_or_else(|| general_err!("index overflow decoding byte array"))?;
81
82        self.offsets.push(index_offset);
83        Ok(())
84    }
85
86    /// Extends this buffer with a list of keys
87    ///
88    /// For each value `key` in `keys` this will insert
89    /// `&dict_values[dict_offsets[key]..dict_offsets[key+1]]`
90    ///
91    /// Note: This will validate offsets are valid
92    pub fn extend_from_dictionary<K: ArrowNativeType, V: ArrowNativeType>(
93        &mut self,
94        keys: &[K],
95        dict_offsets: &[V],
96        dict_values: &[u8],
97    ) -> Result<()> {
98        self.offsets.reserve(keys.len());
99
100        for key in keys {
101            let index = key.as_usize();
102            if index + 1 >= dict_offsets.len() {
103                return Err(general_err!(
104                    "dictionary key beyond bounds of dictionary: 0..{}",
105                    dict_offsets.len().saturating_sub(1)
106                ));
107            }
108            let start_offset = dict_offsets[index].as_usize();
109            let end_offset = dict_offsets[index + 1].as_usize();
110
111            // Dictionary values are verified when decoding dictionary page
112            self.values
113                .extend_from_slice(&dict_values[start_offset..end_offset]);
114            let index_offset = I::from_usize(self.values.len())
115                .ok_or_else(|| general_err!("index overflow decoding byte array"))?;
116            self.offsets.push(index_offset);
117        }
118        Ok(())
119    }
120
121    /// Validates that `&self.values[start_offset..]` is a valid UTF-8 sequence
122    ///
123    /// This MUST be combined with validating that the offsets start on a character
124    /// boundary, otherwise it would be possible for the values array to be a valid UTF-8
125    /// sequence, but not the individual string slices it contains
126    ///
127    /// [`Self::try_push`] can perform this validation check on insertion
128    pub fn check_valid_utf8(&self, start_offset: usize) -> Result<()> {
129        check_valid_utf8(&self.values.as_slice()[start_offset..])
130    }
131
132    /// Converts this into an [`ArrayRef`] with the provided `data_type` and `null_buffer`
133    pub fn into_array(self, null_buffer: Option<Buffer>, data_type: ArrowType) -> ArrayRef {
134        let array_data_builder = ArrayDataBuilder::new(data_type)
135            .len(self.len())
136            .add_buffer(Buffer::from_vec(self.offsets))
137            .add_buffer(Buffer::from_vec(self.values))
138            .null_bit_buffer(null_buffer);
139
140        let data = match cfg!(debug_assertions) {
141            true => array_data_builder.build().unwrap(),
142            false => unsafe { array_data_builder.build_unchecked() },
143        };
144
145        make_array(data)
146    }
147}
148
149impl<I: OffsetSizeTrait> ValuesBuffer for OffsetBuffer<I> {
150    fn with_capacity(capacity: usize) -> Self {
151        Self::with_capacity(capacity)
152    }
153
154    fn pad_nulls(
155        &mut self,
156        read_offset: usize,
157        values_read: usize,
158        levels_read: usize,
159        valid_mask: &[u8],
160    ) -> Result<()> {
161        if self.offsets.len() != read_offset + values_read + 1 {
162            return Err(general_err!(
163                "found inconsistent offsets while padding nulls: expected {} offsets, got {}",
164                read_offset + values_read + 1,
165                self.offsets.len()
166            ));
167        }
168        self.offsets
169            .resize(read_offset + levels_read + 1, I::default());
170
171        let offsets = &mut self.offsets;
172
173        let mut last_pos = read_offset + levels_read + 1;
174        let mut last_start_offset = I::from_usize(self.values.len()).unwrap();
175
176        let values_range = read_offset..read_offset + values_read;
177        for (value_pos, level_pos) in values_range
178            .clone()
179            .rev()
180            .zip(iter_set_bits_rev(valid_mask))
181        {
182            if level_pos < value_pos || level_pos >= last_pos {
183                return Err(general_err!("found corrupt level data while padding nulls"));
184            }
185
186            let end_offset = offsets[value_pos + 1];
187            let start_offset = offsets[value_pos];
188
189            // Fill in any nulls
190            for x in &mut offsets[level_pos + 1..last_pos] {
191                *x = end_offset;
192            }
193
194            if level_pos == value_pos {
195                return Ok(());
196            }
197
198            offsets[level_pos] = start_offset;
199            last_pos = level_pos;
200            last_start_offset = start_offset;
201        }
202
203        // Pad leading nulls up to `last_offset`
204        for x in &mut offsets[values_range.start + 1..last_pos] {
205            *x = last_start_offset
206        }
207        Ok(())
208    }
209}
210
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::{Array, LargeStringArray, StringArray};

    /// Collects the contents of `array`, which must be a [`StringArray`]
    fn utf8_values(array: &ArrayRef) -> Vec<Option<&str>> {
        let strings = array.as_any().downcast_ref::<StringArray>().unwrap();
        strings.iter().collect()
    }

    /// A freshly created buffer converts into an empty array
    #[test]
    fn test_offset_buffer_empty() {
        let array = OffsetBuffer::<i32>::with_capacity(0).into_array(None, ArrowType::Utf8);
        let strings = array.as_any().downcast_ref::<StringArray>().unwrap();
        assert_eq!(strings.len(), 0);
    }

    /// Values pushed directly and values copied out of a dictionary are
    /// appended in order
    #[test]
    fn test_offset_buffer_append() {
        let mut buffer = OffsetBuffer::<i64>::with_capacity(0);
        for pushed in ["hello", "bar"] {
            buffer.try_push(pushed.as_bytes(), true).unwrap();
        }

        // The dictionary holds "ab", "cd", "e" and "f"
        let dict_keys: [i32; 4] = [1, 3, 0, 2];
        let dict_offsets: [i32; 5] = [0, 2, 4, 5, 6];
        buffer
            .extend_from_dictionary(&dict_keys, &dict_offsets, "abcdef".as_bytes())
            .unwrap();

        let array = buffer.into_array(None, ArrowType::LargeUtf8);
        let strings = array.as_any().downcast_ref::<LargeStringArray>().unwrap();
        let actual: Vec<&str> = strings.iter().map(Option::unwrap).collect();
        assert_eq!(actual, vec!["hello", "bar", "cd", "f", "ab", "e"]);
    }

    /// A buffer can be swapped out for a fresh one and both remain usable
    #[test]
    fn test_offset_buffer() {
        let mut buffer = OffsetBuffer::<i32>::with_capacity(0);
        for word in ["hello", "world", "cupcakes", "a", "b", "c"] {
            buffer.try_push(word.as_bytes(), false).unwrap();
        }

        // Take everything written so far, leaving an empty buffer behind
        let split = std::mem::replace(&mut buffer, OffsetBuffer::with_capacity(0));
        let array = split.into_array(None, ArrowType::Utf8);
        assert_eq!(
            utf8_values(&array),
            vec![
                Some("hello"),
                Some("world"),
                Some("cupcakes"),
                Some("a"),
                Some("b"),
                Some("c")
            ]
        );

        buffer.try_push("test".as_bytes(), false).unwrap();
        let array = buffer.into_array(None, ArrowType::Utf8);
        assert_eq!(utf8_values(&array), vec![Some("test")]);
    }

    /// Padding moves each value to the position of its set bit in the mask
    #[test]
    fn test_offset_buffer_pad_nulls() {
        let values = ["a", "b", "c", "def", "gh"];
        let mut buffer = OffsetBuffer::<i32>::with_capacity(0);
        for value in values {
            buffer.try_push(value.as_bytes(), false).unwrap();
        }

        let valid = [
            true, false, false, true, false, true, false, true, true, false, false,
        ];
        let valid_mask = Buffer::from_iter(valid);

        // Both trailing and leading nulls. The first value is treated as already
        // read, so it is excluded from the value and level counts.
        let values_read = values.len() - 1;
        let levels_read = valid.len() - 1;
        buffer
            .pad_nulls(1, values_read, levels_read, valid_mask.as_slice())
            .unwrap();

        let array = buffer.into_array(Some(valid_mask), ArrowType::Utf8);
        assert_eq!(
            utf8_values(&array),
            vec![
                Some("a"),
                None,
                None,
                Some("b"),
                None,
                Some("c"),
                None,
                Some("def"),
                Some("gh"),
                None,
                None
            ]
        );
    }

    /// `try_push` only checks the leading byte; `check_valid_utf8` checks the rest
    #[test]
    fn test_utf8_validation() {
        let two_byte: &[u8] = &[0b11001000, 0b10001000];
        let three_byte: &[u8] = &[0b11101000, 0b10001000, 0b10001000];
        let four_byte: &[u8] = &[0b11110010, 0b10101000, 0b10101001, 0b10100101];

        // Sanity check the fixtures themselves
        for sequence in [two_byte, three_byte, four_byte] {
            std::str::from_utf8(sequence).unwrap();
        }

        let mut buffer = OffsetBuffer::<i32>::with_capacity(0);
        for sequence in [two_byte, three_byte, four_byte] {
            buffer.try_push(sequence, true).unwrap();
        }

        // Cannot append string starting with incomplete codepoint
        let fragments = [
            &two_byte[1..],
            &three_byte[1..],
            &three_byte[2..],
            &four_byte[1..],
            &four_byte[2..],
            &four_byte[3..],
        ];
        for fragment in fragments {
            buffer.try_push(fragment, true).unwrap_err();
        }

        // Can append data containing an incomplete codepoint
        buffer.try_push(&[0b01111111, 0b10111111], true).unwrap();

        assert_eq!(buffer.len(), 4);
        assert_eq!(buffer.values.len(), 11);

        buffer.try_push(three_byte, true).unwrap();

        // Should fail due to incomplete codepoint
        buffer.check_valid_utf8(0).unwrap_err();

        // After broken codepoint -> success
        buffer.check_valid_utf8(11).unwrap();

        // Fails if run from middle of codepoint
        buffer.check_valid_utf8(12).unwrap_err();
    }

    /// Corrupt input must produce a decode error rather than panicking
    #[test]
    fn test_pad_nulls_corrupt_input_returns_err() {
        // Offsets inconsistent with `values_read`: only one value was pushed,
        // but three are claimed to have been read.
        let mut single = OffsetBuffer::<i32>::with_capacity(0);
        single.try_push("a".as_bytes(), false).unwrap();
        let valid_mask = Buffer::from_iter([true, false, false]);
        let err = single
            .pad_nulls(0, 3, 3, valid_mask.as_slice())
            .unwrap_err();
        assert!(
            err.to_string().contains("inconsistent offsets"),
            "unexpected error: {err}"
        );

        // Valid mask has fewer set bits than `values_read`, which previously
        // tripped an assertion in the null-padding loop.
        let mut triple = OffsetBuffer::<i32>::with_capacity(0);
        for value in ["a", "b", "c"] {
            triple.try_push(value.as_bytes(), false).unwrap();
        }
        let valid_mask = Buffer::from_iter([true, false, false]);
        let err = triple
            .pad_nulls(0, 3, 3, valid_mask.as_slice())
            .unwrap_err();
        assert!(
            err.to_string().contains("corrupt level data"),
            "unexpected error: {err}"
        );
    }

    /// Padding a buffer without any values yields an array of nulls
    #[test]
    fn test_pad_nulls_empty() {
        let valid_mask = Buffer::from_iter([false; 9]);
        let mut buffer = OffsetBuffer::<i32>::with_capacity(0);
        buffer.pad_nulls(0, 0, 9, valid_mask.as_slice()).unwrap();

        let array = buffer.into_array(Some(valid_mask), ArrowType::Utf8);
        let strings = array.as_any().downcast_ref::<StringArray>().unwrap();

        assert_eq!(strings.len(), 9);
        assert!(strings.iter().all(|entry| entry.is_none()));
    }
}