1use crate::arrow::buffer::bit_util::iter_set_bits_rev;
19use crate::arrow::record_reader::buffer::ValuesBuffer;
20use crate::errors::{ParquetError, Result};
21use crate::util::utf8::check_valid_utf8;
22use arrow_array::{ArrayRef, OffsetSizeTrait, make_array};
23use arrow_buffer::{ArrowNativeType, Buffer};
24use arrow_data::ArrayDataBuilder;
25use arrow_schema::DataType as ArrowType;
26
/// A growable buffer of variable-length byte values, stored as one flat
/// `values` byte vector plus an `offsets` vector, which
/// [`OffsetBuffer::into_array`] turns into an Arrow offset-based array.
#[derive(Debug)]
pub struct OffsetBuffer<I: OffsetSizeTrait> {
    /// Value `i` spans `values[offsets[i]..offsets[i + 1]]`; this vector always
    /// holds one more entry than the number of values.
    pub offsets: Vec<I>,
    /// The concatenated bytes of every value.
    pub values: Vec<u8>,
}
34
35impl<I: OffsetSizeTrait> OffsetBuffer<I> {
36 pub fn with_capacity(capacity: usize) -> Self {
41 let mut offsets = Vec::with_capacity(capacity + 1);
42 offsets.push(I::default());
43 Self {
44 offsets,
45 values: Vec::new(),
46 }
47 }
48
49 pub fn len(&self) -> usize {
51 self.offsets.len() - 1
52 }
53
54 pub fn is_empty(&self) -> bool {
55 self.len() == 0
56 }
57
58 pub fn try_push(&mut self, data: &[u8], validate_utf8: bool) -> Result<()> {
65 if validate_utf8 {
66 if let Some(&b) = data.first() {
67 if (b as i8) < -0x40 {
70 return Err(ParquetError::General(
71 "encountered non UTF-8 data".to_string(),
72 ));
73 }
74 }
75 }
76
77 self.values.extend_from_slice(data);
78
79 let index_offset = I::from_usize(self.values.len())
80 .ok_or_else(|| general_err!("index overflow decoding byte array"))?;
81
82 self.offsets.push(index_offset);
83 Ok(())
84 }
85
86 pub fn extend_from_dictionary<K: ArrowNativeType, V: ArrowNativeType>(
93 &mut self,
94 keys: &[K],
95 dict_offsets: &[V],
96 dict_values: &[u8],
97 ) -> Result<()> {
98 self.offsets.reserve(keys.len());
99
100 for key in keys {
101 let index = key.as_usize();
102 if index + 1 >= dict_offsets.len() {
103 return Err(general_err!(
104 "dictionary key beyond bounds of dictionary: 0..{}",
105 dict_offsets.len().saturating_sub(1)
106 ));
107 }
108 let start_offset = dict_offsets[index].as_usize();
109 let end_offset = dict_offsets[index + 1].as_usize();
110
111 self.values
113 .extend_from_slice(&dict_values[start_offset..end_offset]);
114 let index_offset = I::from_usize(self.values.len())
115 .ok_or_else(|| general_err!("index overflow decoding byte array"))?;
116 self.offsets.push(index_offset);
117 }
118 Ok(())
119 }
120
121 pub fn check_valid_utf8(&self, start_offset: usize) -> Result<()> {
129 check_valid_utf8(&self.values.as_slice()[start_offset..])
130 }
131
132 pub fn into_array(self, null_buffer: Option<Buffer>, data_type: ArrowType) -> ArrayRef {
134 let array_data_builder = ArrayDataBuilder::new(data_type)
135 .len(self.len())
136 .add_buffer(Buffer::from_vec(self.offsets))
137 .add_buffer(Buffer::from_vec(self.values))
138 .null_bit_buffer(null_buffer);
139
140 let data = match cfg!(debug_assertions) {
141 true => array_data_builder.build().unwrap(),
142 false => unsafe { array_data_builder.build_unchecked() },
143 };
144
145 make_array(data)
146 }
147}
148
149impl<I: OffsetSizeTrait> ValuesBuffer for OffsetBuffer<I> {
150 fn with_capacity(capacity: usize) -> Self {
151 Self::with_capacity(capacity)
152 }
153
154 fn pad_nulls(
155 &mut self,
156 read_offset: usize,
157 values_read: usize,
158 levels_read: usize,
159 valid_mask: &[u8],
160 ) -> Result<()> {
161 if self.offsets.len() != read_offset + values_read + 1 {
162 return Err(general_err!(
163 "found inconsistent offsets while padding nulls: expected {} offsets, got {}",
164 read_offset + values_read + 1,
165 self.offsets.len()
166 ));
167 }
168 self.offsets
169 .resize(read_offset + levels_read + 1, I::default());
170
171 let offsets = &mut self.offsets;
172
173 let mut last_pos = read_offset + levels_read + 1;
174 let mut last_start_offset = I::from_usize(self.values.len()).unwrap();
175
176 let values_range = read_offset..read_offset + values_read;
177 for (value_pos, level_pos) in values_range
178 .clone()
179 .rev()
180 .zip(iter_set_bits_rev(valid_mask))
181 {
182 if level_pos < value_pos || level_pos >= last_pos {
183 return Err(general_err!("found corrupt level data while padding nulls"));
184 }
185
186 let end_offset = offsets[value_pos + 1];
187 let start_offset = offsets[value_pos];
188
189 for x in &mut offsets[level_pos + 1..last_pos] {
191 *x = end_offset;
192 }
193
194 if level_pos == value_pos {
195 return Ok(());
196 }
197
198 offsets[level_pos] = start_offset;
199 last_pos = level_pos;
200 last_start_offset = start_offset;
201 }
202
203 for x in &mut offsets[values_range.start + 1..last_pos] {
205 *x = last_start_offset
206 }
207 Ok(())
208 }
209}
210
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::{Array, LargeStringArray, StringArray};

    // A buffer with no pushed values converts into an empty array.
    #[test]
    fn test_offset_buffer_empty() {
        let buffer = OffsetBuffer::<i32>::with_capacity(0);
        let array = buffer.into_array(None, ArrowType::Utf8);
        let strings = array.as_any().downcast_ref::<StringArray>().unwrap();
        assert_eq!(strings.len(), 0);
    }

    // Mixes direct pushes with dictionary lookups on an i64-offset buffer.
    #[test]
    fn test_offset_buffer_append() {
        let mut buffer = OffsetBuffer::<i64>::with_capacity(0);
        buffer.try_push("hello".as_bytes(), true).unwrap();
        buffer.try_push("bar".as_bytes(), true).unwrap();
        // Dictionary entries are "ab", "cd", "e", "f" (offsets 0, 2, 4, 5, 6),
        // so keys 1, 3, 0, 2 select "cd", "f", "ab", "e"
        buffer
            .extend_from_dictionary(&[1, 3, 0, 2], &[0, 2, 4, 5, 6], "abcdef".as_bytes())
            .unwrap();

        let array = buffer.into_array(None, ArrowType::LargeUtf8);
        let strings = array.as_any().downcast_ref::<LargeStringArray>().unwrap();
        assert_eq!(
            strings.iter().map(|x| x.unwrap()).collect::<Vec<_>>(),
            vec!["hello", "bar", "cd", "f", "ab", "e"]
        )
    }

    // Swapping a filled buffer out with `mem::replace` leaves an independent,
    // empty buffer behind that can keep accepting values.
    #[test]
    fn test_offset_buffer() {
        let mut buffer = OffsetBuffer::<i32>::with_capacity(0);
        for v in ["hello", "world", "cupcakes", "a", "b", "c"] {
            buffer.try_push(v.as_bytes(), false).unwrap()
        }
        let split = std::mem::replace(&mut buffer, OffsetBuffer::with_capacity(0));

        let array = split.into_array(None, ArrowType::Utf8);
        let strings = array.as_any().downcast_ref::<StringArray>().unwrap();
        assert_eq!(
            strings.iter().map(|x| x.unwrap()).collect::<Vec<_>>(),
            vec!["hello", "world", "cupcakes", "a", "b", "c"]
        );

        buffer.try_push("test".as_bytes(), false).unwrap();
        let array = buffer.into_array(None, ArrowType::Utf8);
        let strings = array.as_any().downcast_ref::<StringArray>().unwrap();
        assert_eq!(
            strings.iter().map(|x| x.unwrap()).collect::<Vec<_>>(),
            vec!["test"]
        );
    }

    // Spreads densely packed values out over a sparse validity mask.
    #[test]
    fn test_offset_buffer_pad_nulls() {
        let mut buffer = OffsetBuffer::<i32>::with_capacity(0);
        let values = ["a", "b", "c", "def", "gh"];
        for v in &values {
            buffer.try_push(v.as_bytes(), false).unwrap()
        }

        let valid = [
            true, false, false, true, false, true, false, true, true, false, false,
        ];
        let valid_mask = Buffer::from_iter(valid.iter().copied());

        // Treat the first value and first level as previously read
        // (read_offset = 1); the remaining 4 values are spread over 10 levels
        buffer
            .pad_nulls(1, values.len() - 1, valid.len() - 1, valid_mask.as_slice())
            .unwrap();

        let array = buffer.into_array(Some(valid_mask), ArrowType::Utf8);
        let strings = array.as_any().downcast_ref::<StringArray>().unwrap();
        assert_eq!(
            strings.iter().collect::<Vec<_>>(),
            vec![
                Some("a"),
                None,
                None,
                Some("b"),
                None,
                Some("c"),
                None,
                Some("def"),
                Some("gh"),
                None,
                None
            ]
        );
    }

    // try_push only checks the leading byte; check_valid_utf8 checks a whole
    // suffix of the value bytes.
    #[test]
    fn test_utf8_validation() {
        let valid_2_byte_utf8 = &[0b11001000, 0b10001000];
        std::str::from_utf8(valid_2_byte_utf8).unwrap();
        let valid_3_byte_utf8 = &[0b11101000, 0b10001000, 0b10001000];
        std::str::from_utf8(valid_3_byte_utf8).unwrap();
        let valid_4_byte_utf8 = &[0b11110010, 0b10101000, 0b10101001, 0b10100101];
        std::str::from_utf8(valid_4_byte_utf8).unwrap();

        let mut buffer = OffsetBuffer::<i32>::with_capacity(0);
        buffer.try_push(valid_2_byte_utf8, true).unwrap();
        buffer.try_push(valid_3_byte_utf8, true).unwrap();
        buffer.try_push(valid_4_byte_utf8, true).unwrap();

        // Values that start with a continuation byte are rejected
        buffer.try_push(&valid_2_byte_utf8[1..], true).unwrap_err();
        buffer.try_push(&valid_3_byte_utf8[1..], true).unwrap_err();
        buffer.try_push(&valid_3_byte_utf8[2..], true).unwrap_err();
        buffer.try_push(&valid_4_byte_utf8[1..], true).unwrap_err();
        buffer.try_push(&valid_4_byte_utf8[2..], true).unwrap_err();
        buffer.try_push(&valid_4_byte_utf8[3..], true).unwrap_err();

        // Only the first byte is checked, so this trailing dangling
        // continuation byte is accepted by try_push
        buffer.try_push(&[0b01111111, 0b10111111], true).unwrap();

        // Rejected pushes appended nothing: 2 + 3 + 4 + 2 bytes in 4 values
        assert_eq!(buffer.len(), 4);
        assert_eq!(buffer.values.len(), 11);

        buffer.try_push(valid_3_byte_utf8, true).unwrap();

        // Validating from the start catches the dangling continuation byte
        buffer.check_valid_utf8(0).unwrap_err();

        // From byte 11 only the final, valid 3-byte character is checked
        buffer.check_valid_utf8(11).unwrap();

        // Byte 12 is in the middle of that character
        buffer.check_valid_utf8(12).unwrap_err();
    }

    // Malformed arguments to pad_nulls are reported as errors, not panics.
    #[test]
    fn test_pad_nulls_corrupt_input_returns_err() {
        let mut buffer = OffsetBuffer::<i32>::with_capacity(0);
        // One value is buffered but three are claimed, so the offsets
        // length does not match
        buffer.try_push("a".as_bytes(), false).unwrap();
        let valid_mask = Buffer::from_iter([true, false, false]);
        let err = buffer
            .pad_nulls(0, 3, 3, valid_mask.as_slice())
            .unwrap_err();
        assert!(
            err.to_string().contains("inconsistent offsets"),
            "unexpected error: {err}"
        );

        let mut buffer = OffsetBuffer::<i32>::with_capacity(0);
        // Three values but only slot 0 is valid: the last value would have to
        // move to an earlier slot, which is reported as corrupt level data
        for v in ["a", "b", "c"] {
            buffer.try_push(v.as_bytes(), false).unwrap();
        }
        let valid_mask = Buffer::from_iter([true, false, false]);
        let err = buffer
            .pad_nulls(0, 3, 3, valid_mask.as_slice())
            .unwrap_err();
        assert!(
            err.to_string().contains("corrupt level data"),
            "unexpected error: {err}"
        );
    }

    // No values and nine all-null levels produce nine null entries.
    #[test]
    fn test_pad_nulls_empty() {
        let mut buffer = OffsetBuffer::<i32>::with_capacity(0);
        let valid_mask = Buffer::from_iter(std::iter::repeat_n(false, 9));
        buffer.pad_nulls(0, 0, 9, valid_mask.as_slice()).unwrap();

        let array = buffer.into_array(Some(valid_mask), ArrowType::Utf8);
        let strings = array.as_any().downcast_ref::<StringArray>().unwrap();

        assert_eq!(strings.len(), 9);
        assert!(strings.iter().all(|x| x.is_none()))
    }
}