// parquet/arrow/buffer/offset_buffer.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
17
18use crate::arrow::buffer::bit_util::iter_set_bits_rev;
19use crate::arrow::record_reader::buffer::ValuesBuffer;
20use crate::errors::{ParquetError, Result};
21use crate::util::utf8::check_valid_utf8;
22use arrow_array::{make_array, ArrayRef, OffsetSizeTrait};
23use arrow_buffer::{ArrowNativeType, Buffer};
24use arrow_data::ArrayDataBuilder;
25use arrow_schema::DataType as ArrowType;
26
27/// A buffer of variable-sized byte arrays that can be converted into
28/// a corresponding [`ArrayRef`]
29#[derive(Debug)]
30pub struct OffsetBuffer<I: OffsetSizeTrait> {
31    pub offsets: Vec<I>,
32    pub values: Vec<u8>,
33}
34
35impl<I: OffsetSizeTrait> Default for OffsetBuffer<I> {
36    fn default() -> Self {
37        let mut offsets = Vec::new();
38        offsets.resize(1, I::default());
39        Self {
40            offsets,
41            values: Vec::new(),
42        }
43    }
44}
45
46impl<I: OffsetSizeTrait> OffsetBuffer<I> {
47    /// Returns the number of byte arrays in this buffer
48    pub fn len(&self) -> usize {
49        self.offsets.len() - 1
50    }
51
52    pub fn is_empty(&self) -> bool {
53        self.len() == 0
54    }
55
56    /// If `validate_utf8` this verifies that the first character of `data` is
57    /// the start of a UTF-8 codepoint
58    ///
59    /// Note: This does not verify that the entirety of `data` is valid
60    /// UTF-8. This should be done by calling [`Self::check_valid_utf8`] after
61    /// all data has been written
62    pub fn try_push(&mut self, data: &[u8], validate_utf8: bool) -> Result<()> {
63        if validate_utf8 {
64            if let Some(&b) = data.first() {
65                // A valid code-point iff it does not start with 0b10xxxxxx
66                // Bit-magic taken from `std::str::is_char_boundary`
67                if (b as i8) < -0x40 {
68                    return Err(ParquetError::General(
69                        "encountered non UTF-8 data".to_string(),
70                    ));
71                }
72            }
73        }
74
75        self.values.extend_from_slice(data);
76
77        let index_offset = I::from_usize(self.values.len())
78            .ok_or_else(|| general_err!("index overflow decoding byte array"))?;
79
80        self.offsets.push(index_offset);
81        Ok(())
82    }
83
84    /// Extends this buffer with a list of keys
85    ///
86    /// For each value `key` in `keys` this will insert
87    /// `&dict_values[dict_offsets[key]..dict_offsets[key+1]]`
88    ///
89    /// Note: This will validate offsets are valid
90    pub fn extend_from_dictionary<K: ArrowNativeType, V: ArrowNativeType>(
91        &mut self,
92        keys: &[K],
93        dict_offsets: &[V],
94        dict_values: &[u8],
95    ) -> Result<()> {
96        for key in keys {
97            let index = key.as_usize();
98            if index + 1 >= dict_offsets.len() {
99                return Err(general_err!(
100                    "dictionary key beyond bounds of dictionary: 0..{}",
101                    dict_offsets.len().saturating_sub(1)
102                ));
103            }
104            let start_offset = dict_offsets[index].as_usize();
105            let end_offset = dict_offsets[index + 1].as_usize();
106
107            // Dictionary values are verified when decoding dictionary page
108            self.try_push(&dict_values[start_offset..end_offset], false)?;
109        }
110        Ok(())
111    }
112
113    /// Validates that `&self.values[start_offset..]` is a valid UTF-8 sequence
114    ///
115    /// This MUST be combined with validating that the offsets start on a character
116    /// boundary, otherwise it would be possible for the values array to be a valid UTF-8
117    /// sequence, but not the individual string slices it contains
118    ///
119    /// [`Self::try_push`] can perform this validation check on insertion
120    pub fn check_valid_utf8(&self, start_offset: usize) -> Result<()> {
121        check_valid_utf8(&self.values.as_slice()[start_offset..])
122    }
123
124    /// Converts this into an [`ArrayRef`] with the provided `data_type` and `null_buffer`
125    pub fn into_array(self, null_buffer: Option<Buffer>, data_type: ArrowType) -> ArrayRef {
126        let array_data_builder = ArrayDataBuilder::new(data_type)
127            .len(self.len())
128            .add_buffer(Buffer::from_vec(self.offsets))
129            .add_buffer(Buffer::from_vec(self.values))
130            .null_bit_buffer(null_buffer);
131
132        let data = match cfg!(debug_assertions) {
133            true => array_data_builder.build().unwrap(),
134            false => unsafe { array_data_builder.build_unchecked() },
135        };
136
137        make_array(data)
138    }
139}
140
impl<I: OffsetSizeTrait> ValuesBuffer for OffsetBuffer<I> {
    /// Spreads the `values_read` byte arrays appended after `read_offset` out
    /// across `levels_read` slots, giving every null slot an empty range.
    ///
    /// On entry `self.offsets` must hold exactly `read_offset + values_read + 1`
    /// offsets, i.e. the newly read values are packed densely at the end. On exit
    /// it holds `read_offset + levels_read + 1` offsets.
    ///
    /// Values are paired with set bits of `valid_mask` starting from the end. Bit
    /// positions are used directly as indices into `offsets`, so bit `i` refers
    /// to slot `i` of the whole buffer, not an index relative to `read_offset`.
    /// The walk runs back to front so each value can be moved to its final slot
    /// in place, without a scratch buffer.
    ///
    /// # Panics
    ///
    /// Panics if the offsets length precondition does not hold, if a set bit lies
    /// before the value it is paired with, or if `values.len()` does not fit in `I`.
    fn pad_nulls(
        &mut self,
        read_offset: usize,
        values_read: usize,
        levels_read: usize,
        valid_mask: &[u8],
    ) {
        assert_eq!(self.offsets.len(), read_offset + values_read + 1);
        // Grow to the final length; the new `I::default()` entries are placeholders
        // that the loop / leading-null padding below overwrite
        self.offsets
            .resize(read_offset + levels_read + 1, I::default());

        let offsets = &mut self.offsets;

        // Exclusive upper bound of the offsets not yet finalised
        let mut last_pos = read_offset + levels_read + 1;
        // Start offset of the most recently placed value; initialised to the end
        // of the values so that, with no values, leading nulls become empty ranges
        // at the end. `try_push` only accepts lengths that fit in `I`, so the
        // unwrap presumably cannot fail for buffers built through it.
        let mut last_start_offset = I::from_usize(self.values.len()).unwrap();

        let values_range = read_offset..read_offset + values_read;
        for (value_pos, level_pos) in values_range
            .clone()
            .rev()
            .zip(iter_set_bits_rev(valid_mask))
        {
            // A value can only move forward, and set bits are visited strictly
            // in decreasing order
            assert!(level_pos >= value_pos);
            assert!(level_pos < last_pos);

            // Read the value's range before any of these entries can be overwritten
            let end_offset = offsets[value_pos + 1];
            let start_offset = offsets[value_pos];

            // Fill in any nulls
            for x in &mut offsets[level_pos + 1..last_pos] {
                *x = end_offset;
            }

            // Once a value is already in its own slot, every earlier value is
            // too, so nothing before this point needs to move
            if level_pos == value_pos {
                return;
            }

            offsets[level_pos] = start_offset;
            last_pos = level_pos;
            last_start_offset = start_offset;
        }

        // Pad leading nulls (between `read_offset` and the first placed value)
        // up to `last_pos`, as empty ranges at the first value's start offset
        for x in &mut offsets[values_range.start + 1..last_pos] {
            *x = last_start_offset
        }
    }
}
190
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::{Array, LargeStringArray, StringArray};

    #[test]
    fn test_offset_buffer_empty() {
        let buffer = OffsetBuffer::<i32>::default();
        let array = buffer.into_array(None, ArrowType::Utf8);
        let strings = array.as_any().downcast_ref::<StringArray>().unwrap();
        assert_eq!(strings.len(), 0);
    }

    #[test]
    fn test_offset_buffer_is_empty() {
        let mut buffer = OffsetBuffer::<i32>::default();
        assert!(buffer.is_empty());
        assert_eq!(buffer.len(), 0);

        buffer.try_push("x".as_bytes(), true).unwrap();
        assert!(!buffer.is_empty());
        assert_eq!(buffer.len(), 1);
    }

    #[test]
    fn test_offset_buffer_append() {
        let mut buffer = OffsetBuffer::<i64>::default();
        buffer.try_push("hello".as_bytes(), true).unwrap();
        buffer.try_push("bar".as_bytes(), true).unwrap();
        buffer
            .extend_from_dictionary(&[1, 3, 0, 2], &[0, 2, 4, 5, 6], "abcdef".as_bytes())
            .unwrap();

        let array = buffer.into_array(None, ArrowType::LargeUtf8);
        let strings = array.as_any().downcast_ref::<LargeStringArray>().unwrap();
        assert_eq!(
            strings.iter().map(|x| x.unwrap()).collect::<Vec<_>>(),
            vec!["hello", "bar", "cd", "f", "ab", "e"]
        )
    }

    #[test]
    fn test_extend_from_dictionary_key_out_of_bounds() {
        let mut buffer = OffsetBuffer::<i32>::default();
        let dict_offsets: &[i32] = &[0, 2, 4, 5, 6];
        let dict_values = "abcdef".as_bytes();

        // Key 4 is one past the last dictionary entry (valid keys are 0..4)
        let err = buffer
            .extend_from_dictionary(&[4], dict_offsets, dict_values)
            .unwrap_err();
        assert!(err
            .to_string()
            .contains("dictionary key beyond bounds of dictionary: 0..4"));
        assert!(buffer.is_empty());

        // An empty dictionary rejects every key
        let empty: &[i32] = &[];
        let err = buffer
            .extend_from_dictionary(&[0], empty, dict_values)
            .unwrap_err();
        assert!(err
            .to_string()
            .contains("dictionary key beyond bounds of dictionary: 0..0"));
        assert!(buffer.is_empty());

        // The buffer remains usable after an error
        buffer
            .extend_from_dictionary(&[1], dict_offsets, dict_values)
            .unwrap();
        let array = buffer.into_array(None, ArrowType::Utf8);
        let strings = array.as_any().downcast_ref::<StringArray>().unwrap();
        assert_eq!(
            strings.iter().map(|x| x.unwrap()).collect::<Vec<_>>(),
            vec!["cd"]
        );
    }

    #[test]
    fn test_offset_buffer() {
        let mut buffer = OffsetBuffer::<i32>::default();
        for v in ["hello", "world", "cupcakes", "a", "b", "c"] {
            buffer.try_push(v.as_bytes(), false).unwrap()
        }
        let split = std::mem::take(&mut buffer);

        let array = split.into_array(None, ArrowType::Utf8);
        let strings = array.as_any().downcast_ref::<StringArray>().unwrap();
        assert_eq!(
            strings.iter().map(|x| x.unwrap()).collect::<Vec<_>>(),
            vec!["hello", "world", "cupcakes", "a", "b", "c"]
        );

        buffer.try_push("test".as_bytes(), false).unwrap();
        let array = buffer.into_array(None, ArrowType::Utf8);
        let strings = array.as_any().downcast_ref::<StringArray>().unwrap();
        assert_eq!(
            strings.iter().map(|x| x.unwrap()).collect::<Vec<_>>(),
            vec!["test"]
        );
    }

    #[test]
    fn test_offset_buffer_pad_nulls() {
        let mut buffer = OffsetBuffer::<i32>::default();
        let values = ["a", "b", "c", "def", "gh"];
        for v in &values {
            buffer.try_push(v.as_bytes(), false).unwrap()
        }

        let valid = [
            true, false, false, true, false, true, false, true, true, false, false,
        ];
        let valid_mask = Buffer::from_iter(valid.iter().copied());

        // Both trailing and leading nulls
        buffer.pad_nulls(1, values.len() - 1, valid.len() - 1, valid_mask.as_slice());

        let array = buffer.into_array(Some(valid_mask), ArrowType::Utf8);
        let strings = array.as_any().downcast_ref::<StringArray>().unwrap();
        assert_eq!(
            strings.iter().collect::<Vec<_>>(),
            vec![
                Some("a"),
                None,
                None,
                Some("b"),
                None,
                Some("c"),
                None,
                Some("def"),
                Some("gh"),
                None,
                None
            ]
        );
    }

    #[test]
    fn test_utf8_validation() {
        let valid_2_byte_utf8 = &[0b11001000, 0b10001000];
        std::str::from_utf8(valid_2_byte_utf8).unwrap();
        let valid_3_byte_utf8 = &[0b11101000, 0b10001000, 0b10001000];
        std::str::from_utf8(valid_3_byte_utf8).unwrap();
        let valid_4_byte_utf8 = &[0b11110010, 0b10101000, 0b10101001, 0b10100101];
        std::str::from_utf8(valid_4_byte_utf8).unwrap();

        let mut buffer = OffsetBuffer::<i32>::default();
        buffer.try_push(valid_2_byte_utf8, true).unwrap();
        buffer.try_push(valid_3_byte_utf8, true).unwrap();
        buffer.try_push(valid_4_byte_utf8, true).unwrap();

        // Cannot append string starting with incomplete codepoint
        buffer.try_push(&valid_2_byte_utf8[1..], true).unwrap_err();
        buffer.try_push(&valid_3_byte_utf8[1..], true).unwrap_err();
        buffer.try_push(&valid_3_byte_utf8[2..], true).unwrap_err();
        buffer.try_push(&valid_4_byte_utf8[1..], true).unwrap_err();
        buffer.try_push(&valid_4_byte_utf8[2..], true).unwrap_err();
        buffer.try_push(&valid_4_byte_utf8[3..], true).unwrap_err();

        // Can append data containing an incomplete codepoint
        buffer.try_push(&[0b01111111, 0b10111111], true).unwrap();

        assert_eq!(buffer.len(), 4);
        assert_eq!(buffer.values.len(), 11);

        buffer.try_push(valid_3_byte_utf8, true).unwrap();

        // Should fail due to incomplete codepoint
        buffer.check_valid_utf8(0).unwrap_err();

        // After broken codepoint -> success
        buffer.check_valid_utf8(11).unwrap();

        // Fails if run from middle of codepoint
        buffer.check_valid_utf8(12).unwrap_err();
    }

    #[test]
    fn test_pad_nulls_empty() {
        let mut buffer = OffsetBuffer::<i32>::default();
        let valid_mask = Buffer::from_iter(std::iter::repeat(false).take(9));
        buffer.pad_nulls(0, 0, 9, valid_mask.as_slice());

        let array = buffer.into_array(Some(valid_mask), ArrowType::Utf8);
        let strings = array.as_any().downcast_ref::<StringArray>().unwrap();

        assert_eq!(strings.len(), 9);
        assert!(strings.iter().all(|x| x.is_none()))
    }
}