// parquet/arrow/buffer/offset_buffer.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::arrow::buffer::bit_util::iter_set_bits_rev;
19use crate::arrow::record_reader::buffer::ValuesBuffer;
20use crate::errors::{ParquetError, Result};
21use crate::util::utf8::check_valid_utf8;
22use arrow_array::{ArrayRef, OffsetSizeTrait, make_array};
23use arrow_buffer::{ArrowNativeType, Buffer};
24use arrow_data::ArrayDataBuilder;
25use arrow_schema::DataType as ArrowType;
26
/// A buffer of variable-sized byte arrays that can be converted into
/// a corresponding [`ArrayRef`]
#[derive(Debug)]
pub struct OffsetBuffer<I: OffsetSizeTrait> {
    /// Value offsets in Arrow layout: entry `i` is the start of value `i` in
    /// `values` and entry `i + 1` its end. Invariant: always holds at least
    /// one element (a leading zero pushed by `with_capacity`), so the buffer
    /// contains `offsets.len() - 1` values.
    pub offsets: Vec<I>,
    /// The concatenated bytes of all values, indexed by `offsets`
    pub values: Vec<u8>,
}
34
35impl<I: OffsetSizeTrait> OffsetBuffer<I> {
36    /// Create a new `OffsetBuffer` with capacity for at least `capacity` elements
37    ///
38    /// Pre-allocates the offsets vector to avoid reallocations during reading.
39    /// The values vector is not pre-allocated as its size is unpredictable.
40    pub fn with_capacity(capacity: usize) -> Self {
41        let mut offsets = Vec::with_capacity(capacity + 1);
42        offsets.push(I::default());
43        Self {
44            offsets,
45            values: Vec::new(),
46        }
47    }
48
49    /// Returns the number of byte arrays in this buffer
50    pub fn len(&self) -> usize {
51        self.offsets.len() - 1
52    }
53
54    pub fn is_empty(&self) -> bool {
55        self.len() == 0
56    }
57
58    /// If `validate_utf8` this verifies that the first character of `data` is
59    /// the start of a UTF-8 codepoint
60    ///
61    /// Note: This does not verify that the entirety of `data` is valid
62    /// UTF-8. This should be done by calling [`Self::check_valid_utf8`] after
63    /// all data has been written
64    pub fn try_push(&mut self, data: &[u8], validate_utf8: bool) -> Result<()> {
65        if validate_utf8 {
66            if let Some(&b) = data.first() {
67                // A valid code-point iff it does not start with 0b10xxxxxx
68                // Bit-magic taken from `std::str::is_char_boundary`
69                if (b as i8) < -0x40 {
70                    return Err(ParquetError::General(
71                        "encountered non UTF-8 data".to_string(),
72                    ));
73                }
74            }
75        }
76
77        self.values.extend_from_slice(data);
78
79        let index_offset = I::from_usize(self.values.len())
80            .ok_or_else(|| general_err!("index overflow decoding byte array"))?;
81
82        self.offsets.push(index_offset);
83        Ok(())
84    }
85
86    /// Extends this buffer with a list of keys
87    ///
88    /// For each value `key` in `keys` this will insert
89    /// `&dict_values[dict_offsets[key]..dict_offsets[key+1]]`
90    ///
91    /// Note: This will validate offsets are valid
92    pub fn extend_from_dictionary<K: ArrowNativeType, V: ArrowNativeType>(
93        &mut self,
94        keys: &[K],
95        dict_offsets: &[V],
96        dict_values: &[u8],
97    ) -> Result<()> {
98        self.offsets.reserve(keys.len());
99
100        for key in keys {
101            let index = key.as_usize();
102            if index + 1 >= dict_offsets.len() {
103                return Err(general_err!(
104                    "dictionary key beyond bounds of dictionary: 0..{}",
105                    dict_offsets.len().saturating_sub(1)
106                ));
107            }
108            let start_offset = dict_offsets[index].as_usize();
109            let end_offset = dict_offsets[index + 1].as_usize();
110
111            // Dictionary values are verified when decoding dictionary page
112            self.values
113                .extend_from_slice(&dict_values[start_offset..end_offset]);
114            let index_offset = I::from_usize(self.values.len())
115                .ok_or_else(|| general_err!("index overflow decoding byte array"))?;
116            self.offsets.push(index_offset);
117        }
118        Ok(())
119    }
120
121    /// Validates that `&self.values[start_offset..]` is a valid UTF-8 sequence
122    ///
123    /// This MUST be combined with validating that the offsets start on a character
124    /// boundary, otherwise it would be possible for the values array to be a valid UTF-8
125    /// sequence, but not the individual string slices it contains
126    ///
127    /// [`Self::try_push`] can perform this validation check on insertion
128    pub fn check_valid_utf8(&self, start_offset: usize) -> Result<()> {
129        check_valid_utf8(&self.values.as_slice()[start_offset..])
130    }
131
132    /// Converts this into an [`ArrayRef`] with the provided `data_type` and `null_buffer`
133    pub fn into_array(self, null_buffer: Option<Buffer>, data_type: ArrowType) -> ArrayRef {
134        let array_data_builder = ArrayDataBuilder::new(data_type)
135            .len(self.len())
136            .add_buffer(Buffer::from_vec(self.offsets))
137            .add_buffer(Buffer::from_vec(self.values))
138            .null_bit_buffer(null_buffer);
139
140        let data = match cfg!(debug_assertions) {
141            true => array_data_builder.build().unwrap(),
142            false => unsafe { array_data_builder.build_unchecked() },
143        };
144
145        make_array(data)
146    }
147}
148
impl<I: OffsetSizeTrait> ValuesBuffer for OffsetBuffer<I> {
    /// Delegates to the inherent constructor, which seeds the leading zero offset
    fn with_capacity(capacity: usize) -> Self {
        Self::with_capacity(capacity)
    }

    /// Spreads the `values_read` offsets starting at `read_offset` out over
    /// `levels_read` slots, in place and in reverse, so that positions whose
    /// bit is not set in `valid_mask` become zero-length (null) entries.
    ///
    /// On entry the offsets for the read values are packed contiguously after
    /// `read_offset` (asserted below); on exit offset `read_offset + i` belongs
    /// to level `i`, with null slots repeating their neighbour's offset.
    fn pad_nulls(
        &mut self,
        read_offset: usize,
        values_read: usize,
        levels_read: usize,
        valid_mask: &[u8],
    ) {
        // Precondition: exactly the packed values (plus the leading offset) are present
        assert_eq!(self.offsets.len(), read_offset + values_read + 1);
        // Grow to the final size; the I::default() placeholders are overwritten below
        self.offsets
            .resize(read_offset + levels_read + 1, I::default());

        let offsets = &mut self.offsets;

        // Exclusive upper bound of the region not yet rewritten
        let mut last_pos = read_offset + levels_read + 1;
        // Start offset of the most recently relocated value; used to pad any
        // leading nulls after the loop. Initialised to the end of `values` for
        // the case where no value is relocated.
        let mut last_start_offset = I::from_usize(self.values.len()).unwrap();

        // Walk packed value positions and set validity bits together, both in
        // reverse, so each value can be moved right without clobbering
        // offsets that are still to be read
        let values_range = read_offset..read_offset + values_read;
        for (value_pos, level_pos) in values_range
            .clone()
            .rev()
            .zip(iter_set_bits_rev(valid_mask))
        {
            // A value can only move to a later (or equal) level slot
            assert!(level_pos >= value_pos);
            assert!(level_pos < last_pos);

            let end_offset = offsets[value_pos + 1];
            let start_offset = offsets[value_pos];

            // Fill in any nulls between this value and the previously placed
            // one: they all share this value's end offset (zero-length slots)
            for x in &mut offsets[level_pos + 1..last_pos] {
                *x = end_offset;
            }

            // Once source and destination coincide, everything to the left is
            // already in its final place — nothing more to move
            if level_pos == value_pos {
                return;
            }

            offsets[level_pos] = start_offset;
            last_pos = level_pos;
            last_start_offset = start_offset;
        }

        // Pad leading nulls up to `last_offset`
        for x in &mut offsets[values_range.start + 1..last_pos] {
            *x = last_start_offset
        }
    }
}
202
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::{Array, LargeStringArray, StringArray};

    /// An empty buffer converts to a zero-length StringArray
    #[test]
    fn test_offset_buffer_empty() {
        let buffer = OffsetBuffer::<i32>::with_capacity(0);
        let array = buffer.into_array(None, ArrowType::Utf8);
        let strings = array.as_any().downcast_ref::<StringArray>().unwrap();
        assert_eq!(strings.len(), 0);
    }

    /// Mixing `try_push` and `extend_from_dictionary` preserves insertion order
    #[test]
    fn test_offset_buffer_append() {
        let mut buffer = OffsetBuffer::<i64>::with_capacity(0);
        buffer.try_push("hello".as_bytes(), true).unwrap();
        buffer.try_push("bar".as_bytes(), true).unwrap();
        // Keys 1,3,0,2 select slices [2..4], [5..6], [0..2], [4..5] of "abcdef"
        buffer
            .extend_from_dictionary(&[1, 3, 0, 2], &[0, 2, 4, 5, 6], "abcdef".as_bytes())
            .unwrap();

        let array = buffer.into_array(None, ArrowType::LargeUtf8);
        let strings = array.as_any().downcast_ref::<LargeStringArray>().unwrap();
        assert_eq!(
            strings.iter().map(|x| x.unwrap()).collect::<Vec<_>>(),
            vec!["hello", "bar", "cd", "f", "ab", "e"]
        )
    }

    /// A buffer replaced via `mem::replace` leaves a fresh, usable buffer behind
    #[test]
    fn test_offset_buffer() {
        let mut buffer = OffsetBuffer::<i32>::with_capacity(0);
        for v in ["hello", "world", "cupcakes", "a", "b", "c"] {
            buffer.try_push(v.as_bytes(), false).unwrap()
        }
        let split = std::mem::replace(&mut buffer, OffsetBuffer::with_capacity(0));

        let array = split.into_array(None, ArrowType::Utf8);
        let strings = array.as_any().downcast_ref::<StringArray>().unwrap();
        assert_eq!(
            strings.iter().map(|x| x.unwrap()).collect::<Vec<_>>(),
            vec!["hello", "world", "cupcakes", "a", "b", "c"]
        );

        // The replacement buffer starts empty and accepts new values
        buffer.try_push("test".as_bytes(), false).unwrap();
        let array = buffer.into_array(None, ArrowType::Utf8);
        let strings = array.as_any().downcast_ref::<StringArray>().unwrap();
        assert_eq!(
            strings.iter().map(|x| x.unwrap()).collect::<Vec<_>>(),
            vec!["test"]
        );
    }

    /// `pad_nulls` spreads 4 packed values (skipping the first, "a") over 10
    /// level slots according to the validity mask, with trailing and leading nulls
    #[test]
    fn test_offset_buffer_pad_nulls() {
        let mut buffer = OffsetBuffer::<i32>::with_capacity(0);
        let values = ["a", "b", "c", "def", "gh"];
        for v in &values {
            buffer.try_push(v.as_bytes(), false).unwrap()
        }

        let valid = [
            true, false, false, true, false, true, false, true, true, false, false,
        ];
        let valid_mask = Buffer::from_iter(valid.iter().copied());

        // Both trailing and leading nulls
        buffer.pad_nulls(1, values.len() - 1, valid.len() - 1, valid_mask.as_slice());

        let array = buffer.into_array(Some(valid_mask), ArrowType::Utf8);
        let strings = array.as_any().downcast_ref::<StringArray>().unwrap();
        assert_eq!(
            strings.iter().collect::<Vec<_>>(),
            vec![
                Some("a"),
                None,
                None,
                Some("b"),
                None,
                Some("c"),
                None,
                Some("def"),
                Some("gh"),
                None,
                None
            ]
        );
    }

    /// `try_push` rejects data starting with a UTF-8 continuation byte, but
    /// only checks the first byte — full validation is `check_valid_utf8`
    #[test]
    fn test_utf8_validation() {
        // Sanity-check the fixtures really are valid UTF-8
        let valid_2_byte_utf8 = &[0b11001000, 0b10001000];
        std::str::from_utf8(valid_2_byte_utf8).unwrap();
        let valid_3_byte_utf8 = &[0b11101000, 0b10001000, 0b10001000];
        std::str::from_utf8(valid_3_byte_utf8).unwrap();
        let valid_4_byte_utf8 = &[0b11110010, 0b10101000, 0b10101001, 0b10100101];
        std::str::from_utf8(valid_4_byte_utf8).unwrap();

        let mut buffer = OffsetBuffer::<i32>::with_capacity(0);
        buffer.try_push(valid_2_byte_utf8, true).unwrap();
        buffer.try_push(valid_3_byte_utf8, true).unwrap();
        buffer.try_push(valid_4_byte_utf8, true).unwrap();

        // Cannot append string starting with incomplete codepoint
        buffer.try_push(&valid_2_byte_utf8[1..], true).unwrap_err();
        buffer.try_push(&valid_3_byte_utf8[1..], true).unwrap_err();
        buffer.try_push(&valid_3_byte_utf8[2..], true).unwrap_err();
        buffer.try_push(&valid_4_byte_utf8[1..], true).unwrap_err();
        buffer.try_push(&valid_4_byte_utf8[2..], true).unwrap_err();
        buffer.try_push(&valid_4_byte_utf8[3..], true).unwrap_err();

        // Can append data containing an incomplete codepoint
        // (first byte 0b01111111 is a valid boundary; the dangling
        // continuation byte is only caught by check_valid_utf8)
        buffer.try_push(&[0b01111111, 0b10111111], true).unwrap();

        assert_eq!(buffer.len(), 4);
        assert_eq!(buffer.values.len(), 11);

        buffer.try_push(valid_3_byte_utf8, true).unwrap();

        // Should fail due to incomplete codepoint
        buffer.check_valid_utf8(0).unwrap_err();

        // After broken codepoint -> success
        buffer.check_valid_utf8(11).unwrap();

        // Fails if run from middle of codepoint
        buffer.check_valid_utf8(12).unwrap_err();
    }

    /// `pad_nulls` with zero values read yields an all-null array
    #[test]
    fn test_pad_nulls_empty() {
        let mut buffer = OffsetBuffer::<i32>::with_capacity(0);
        let valid_mask = Buffer::from_iter(std::iter::repeat_n(false, 9));
        buffer.pad_nulls(0, 0, 9, valid_mask.as_slice());

        let array = buffer.into_array(Some(valid_mask), ArrowType::Utf8);
        let strings = array.as_any().downcast_ref::<StringArray>().unwrap();

        assert_eq!(strings.len(), 9);
        assert!(strings.iter().all(|x| x.is_none()))
    }
}
345}