1use crate::arrow::buffer::bit_util::iter_set_bits_rev;
19use crate::arrow::record_reader::buffer::ValuesBuffer;
20use crate::errors::{ParquetError, Result};
21use crate::util::utf8::check_valid_utf8;
22use arrow_array::{make_array, ArrayRef, OffsetSizeTrait};
23use arrow_buffer::{ArrowNativeType, Buffer};
24use arrow_data::ArrayDataBuilder;
25use arrow_schema::DataType as ArrowType;
26
27#[derive(Debug)]
30pub struct OffsetBuffer<I: OffsetSizeTrait> {
31 pub offsets: Vec<I>,
32 pub values: Vec<u8>,
33}
34
35impl<I: OffsetSizeTrait> Default for OffsetBuffer<I> {
36 fn default() -> Self {
37 let mut offsets = Vec::new();
38 offsets.resize(1, I::default());
39 Self {
40 offsets,
41 values: Vec::new(),
42 }
43 }
44}
45
46impl<I: OffsetSizeTrait> OffsetBuffer<I> {
47 pub fn len(&self) -> usize {
49 self.offsets.len() - 1
50 }
51
52 pub fn is_empty(&self) -> bool {
53 self.len() == 0
54 }
55
56 pub fn try_push(&mut self, data: &[u8], validate_utf8: bool) -> Result<()> {
63 if validate_utf8 {
64 if let Some(&b) = data.first() {
65 if (b as i8) < -0x40 {
68 return Err(ParquetError::General(
69 "encountered non UTF-8 data".to_string(),
70 ));
71 }
72 }
73 }
74
75 self.values.extend_from_slice(data);
76
77 let index_offset = I::from_usize(self.values.len())
78 .ok_or_else(|| general_err!("index overflow decoding byte array"))?;
79
80 self.offsets.push(index_offset);
81 Ok(())
82 }
83
84 pub fn extend_from_dictionary<K: ArrowNativeType, V: ArrowNativeType>(
91 &mut self,
92 keys: &[K],
93 dict_offsets: &[V],
94 dict_values: &[u8],
95 ) -> Result<()> {
96 for key in keys {
97 let index = key.as_usize();
98 if index + 1 >= dict_offsets.len() {
99 return Err(general_err!(
100 "dictionary key beyond bounds of dictionary: 0..{}",
101 dict_offsets.len().saturating_sub(1)
102 ));
103 }
104 let start_offset = dict_offsets[index].as_usize();
105 let end_offset = dict_offsets[index + 1].as_usize();
106
107 self.try_push(&dict_values[start_offset..end_offset], false)?;
109 }
110 Ok(())
111 }
112
113 pub fn check_valid_utf8(&self, start_offset: usize) -> Result<()> {
121 check_valid_utf8(&self.values.as_slice()[start_offset..])
122 }
123
124 pub fn into_array(self, null_buffer: Option<Buffer>, data_type: ArrowType) -> ArrayRef {
126 let array_data_builder = ArrayDataBuilder::new(data_type)
127 .len(self.len())
128 .add_buffer(Buffer::from_vec(self.offsets))
129 .add_buffer(Buffer::from_vec(self.values))
130 .null_bit_buffer(null_buffer);
131
132 let data = match cfg!(debug_assertions) {
133 true => array_data_builder.build().unwrap(),
134 false => unsafe { array_data_builder.build_unchecked() },
135 };
136
137 make_array(data)
138 }
139}
140
141impl<I: OffsetSizeTrait> ValuesBuffer for OffsetBuffer<I> {
142 fn pad_nulls(
143 &mut self,
144 read_offset: usize,
145 values_read: usize,
146 levels_read: usize,
147 valid_mask: &[u8],
148 ) {
149 assert_eq!(self.offsets.len(), read_offset + values_read + 1);
150 self.offsets
151 .resize(read_offset + levels_read + 1, I::default());
152
153 let offsets = &mut self.offsets;
154
155 let mut last_pos = read_offset + levels_read + 1;
156 let mut last_start_offset = I::from_usize(self.values.len()).unwrap();
157
158 let values_range = read_offset..read_offset + values_read;
159 for (value_pos, level_pos) in values_range
160 .clone()
161 .rev()
162 .zip(iter_set_bits_rev(valid_mask))
163 {
164 assert!(level_pos >= value_pos);
165 assert!(level_pos < last_pos);
166
167 let end_offset = offsets[value_pos + 1];
168 let start_offset = offsets[value_pos];
169
170 for x in &mut offsets[level_pos + 1..last_pos] {
172 *x = end_offset;
173 }
174
175 if level_pos == value_pos {
176 return;
177 }
178
179 offsets[level_pos] = start_offset;
180 last_pos = level_pos;
181 last_start_offset = start_offset;
182 }
183
184 for x in &mut offsets[values_range.start + 1..last_pos] {
186 *x = last_start_offset
187 }
188 }
189}
190
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::{Array, LargeStringArray, StringArray};

    /// A default buffer converts into a zero-length string array.
    #[test]
    fn test_offset_buffer_empty() {
        let array = OffsetBuffer::<i32>::default().into_array(None, ArrowType::Utf8);
        let strings = array.as_any().downcast_ref::<StringArray>().unwrap();
        assert_eq!(strings.len(), 0);
    }

    /// Direct pushes and dictionary lookups append in order.
    #[test]
    fn test_offset_buffer_append() {
        let mut buffer = OffsetBuffer::<i64>::default();
        buffer.try_push(b"hello", true).unwrap();
        buffer.try_push(b"bar", true).unwrap();

        // Dictionary entries: "ab", "cd", "e", "f"
        let dict_keys: [i32; 4] = [1, 3, 0, 2];
        let dict_offsets: [i32; 5] = [0, 2, 4, 5, 6];
        buffer
            .extend_from_dictionary(&dict_keys, &dict_offsets, b"abcdef")
            .unwrap();

        let array = buffer.into_array(None, ArrowType::LargeUtf8);
        let strings = array.as_any().downcast_ref::<LargeStringArray>().unwrap();
        let actual: Vec<&str> = strings.iter().map(|s| s.unwrap()).collect();
        assert_eq!(actual, vec!["hello", "bar", "cd", "f", "ab", "e"])
    }

    /// Taking the buffer leaves behind a fresh, usable default buffer.
    #[test]
    fn test_offset_buffer() {
        let words = ["hello", "world", "cupcakes", "a", "b", "c"];

        let mut buffer = OffsetBuffer::<i32>::default();
        for word in words {
            buffer.try_push(word.as_bytes(), false).unwrap()
        }
        let taken = std::mem::take(&mut buffer);

        let array = taken.into_array(None, ArrowType::Utf8);
        let strings = array.as_any().downcast_ref::<StringArray>().unwrap();
        let actual: Vec<&str> = strings.iter().map(|s| s.unwrap()).collect();
        assert_eq!(actual, vec!["hello", "world", "cupcakes", "a", "b", "c"]);

        // The buffer left in place by `take` starts from scratch.
        buffer.try_push(b"test", false).unwrap();
        let array = buffer.into_array(None, ArrowType::Utf8);
        let strings = array.as_any().downcast_ref::<StringArray>().unwrap();
        let actual: Vec<&str> = strings.iter().map(|s| s.unwrap()).collect();
        assert_eq!(actual, vec!["test"]);
    }

    /// Padding with nulls between, before and after the newly read values.
    #[test]
    fn test_offset_buffer_pad_nulls() {
        let values = ["a", "b", "c", "def", "gh"];
        let mut buffer = OffsetBuffer::<i32>::default();
        for value in &values {
            buffer.try_push(value.as_bytes(), false).unwrap()
        }

        let valid = [
            true, false, false, true, false, true, false, true, true, false, false,
        ];
        let valid_mask = Buffer::from_iter(valid.iter().copied());

        // The first value / first level are treated as already read; the
        // remaining ones are padded.
        let read_offset = 1;
        let values_read = values.len() - 1;
        let levels_read = valid.len() - 1;
        buffer.pad_nulls(read_offset, values_read, levels_read, valid_mask.as_slice());

        let array = buffer.into_array(Some(valid_mask), ArrowType::Utf8);
        let strings = array.as_any().downcast_ref::<StringArray>().unwrap();
        let actual: Vec<Option<&str>> = strings.iter().collect();
        let expected = vec![
            Some("a"),
            None,
            None,
            Some("b"),
            None,
            Some("c"),
            None,
            Some("def"),
            Some("gh"),
            None,
            None,
        ];
        assert_eq!(actual, expected);
    }

    /// `try_push` only checks the leading byte; `check_valid_utf8` checks the rest.
    #[test]
    fn test_utf8_validation() {
        let valid_2_byte_utf8: &[u8] = &[0b11001000, 0b10001000];
        let valid_3_byte_utf8: &[u8] = &[0b11101000, 0b10001000, 0b10001000];
        let valid_4_byte_utf8: &[u8] = &[0b11110010, 0b10101000, 0b10101001, 0b10100101];

        // Sanity check the fixtures, then push each of them.
        let mut buffer = OffsetBuffer::<i32>::default();
        for sequence in [valid_2_byte_utf8, valid_3_byte_utf8, valid_4_byte_utf8] {
            std::str::from_utf8(sequence).unwrap();
        }
        for sequence in [valid_2_byte_utf8, valid_3_byte_utf8, valid_4_byte_utf8] {
            buffer.try_push(sequence, true).unwrap();
        }

        // Anything starting in the middle of a code point is rejected.
        let truncated = [
            &valid_2_byte_utf8[1..],
            &valid_3_byte_utf8[1..],
            &valid_3_byte_utf8[2..],
            &valid_4_byte_utf8[1..],
            &valid_4_byte_utf8[2..],
            &valid_4_byte_utf8[3..],
        ];
        for bytes in truncated {
            buffer.try_push(bytes, true).unwrap_err();
        }

        // Accepted by the first-byte check even though the second byte is a
        // stray continuation byte.
        buffer.try_push(&[0b01111111, 0b10111111], true).unwrap();

        // Only the four successful pushes were recorded.
        assert_eq!(buffer.len(), 4);
        assert_eq!(buffer.values.len(), 11);

        buffer.try_push(valid_3_byte_utf8, true).unwrap();

        // The whole buffer includes the stray continuation byte.
        buffer.check_valid_utf8(0).unwrap_err();

        // Starting after the broken element succeeds.
        buffer.check_valid_utf8(11).unwrap();

        // Starting in the middle of a code point fails.
        buffer.check_valid_utf8(12).unwrap_err();
    }

    /// Padding an empty buffer yields an all-null array.
    #[test]
    fn test_pad_nulls_empty() {
        let mut buffer = OffsetBuffer::<i32>::default();
        let valid_mask = Buffer::from_iter(vec![false; 9]);
        buffer.pad_nulls(0, 0, 9, valid_mask.as_slice());

        let array = buffer.into_array(Some(valid_mask), ArrowType::Utf8);
        let strings = array.as_any().downcast_ref::<StringArray>().unwrap();

        assert_eq!(strings.len(), 9);
        assert!(strings.iter().all(|s| s.is_none()))
    }
}