1use crate::arrow::buffer::bit_util::iter_set_bits_rev;
19use crate::arrow::record_reader::buffer::ValuesBuffer;
20use crate::errors::{ParquetError, Result};
21use crate::util::utf8::check_valid_utf8;
22use arrow_array::{ArrayRef, OffsetSizeTrait, make_array};
23use arrow_buffer::{ArrowNativeType, Buffer};
24use arrow_data::ArrayDataBuilder;
25use arrow_schema::DataType as ArrowType;
26
/// A growable buffer of variable-length byte values stored in Arrow's
/// offsets + values layout, convertible into an [`ArrayRef`] via `into_array`.
///
/// Value `i` occupies `values[offsets[i].as_usize()..offsets[i + 1].as_usize()]`,
/// so `offsets` always holds one more entry than there are values.
#[derive(Debug)]
pub struct OffsetBuffer<I: OffsetSizeTrait> {
    /// Offsets into `values`; `offsets[i]..offsets[i + 1]` delimits value `i`.
    /// `with_capacity` seeds this with a single leading `I::default()` entry.
    pub offsets: Vec<I>,
    /// Concatenated bytes of all values.
    pub values: Vec<u8>,
}
34
35impl<I: OffsetSizeTrait> OffsetBuffer<I> {
36 pub fn with_capacity(capacity: usize) -> Self {
41 let mut offsets = Vec::with_capacity(capacity + 1);
42 offsets.push(I::default());
43 Self {
44 offsets,
45 values: Vec::new(),
46 }
47 }
48
49 pub fn len(&self) -> usize {
51 self.offsets.len() - 1
52 }
53
54 pub fn is_empty(&self) -> bool {
55 self.len() == 0
56 }
57
58 pub fn try_push(&mut self, data: &[u8], validate_utf8: bool) -> Result<()> {
65 if validate_utf8 {
66 if let Some(&b) = data.first() {
67 if (b as i8) < -0x40 {
70 return Err(ParquetError::General(
71 "encountered non UTF-8 data".to_string(),
72 ));
73 }
74 }
75 }
76
77 self.values.extend_from_slice(data);
78
79 let index_offset = I::from_usize(self.values.len())
80 .ok_or_else(|| general_err!("index overflow decoding byte array"))?;
81
82 self.offsets.push(index_offset);
83 Ok(())
84 }
85
86 pub fn extend_from_dictionary<K: ArrowNativeType, V: ArrowNativeType>(
93 &mut self,
94 keys: &[K],
95 dict_offsets: &[V],
96 dict_values: &[u8],
97 ) -> Result<()> {
98 self.offsets.reserve(keys.len());
99
100 for key in keys {
101 let index = key.as_usize();
102 if index + 1 >= dict_offsets.len() {
103 return Err(general_err!(
104 "dictionary key beyond bounds of dictionary: 0..{}",
105 dict_offsets.len().saturating_sub(1)
106 ));
107 }
108 let start_offset = dict_offsets[index].as_usize();
109 let end_offset = dict_offsets[index + 1].as_usize();
110
111 self.values
113 .extend_from_slice(&dict_values[start_offset..end_offset]);
114 let index_offset = I::from_usize(self.values.len())
115 .ok_or_else(|| general_err!("index overflow decoding byte array"))?;
116 self.offsets.push(index_offset);
117 }
118 Ok(())
119 }
120
121 pub fn check_valid_utf8(&self, start_offset: usize) -> Result<()> {
129 check_valid_utf8(&self.values.as_slice()[start_offset..])
130 }
131
132 pub fn into_array(self, null_buffer: Option<Buffer>, data_type: ArrowType) -> ArrayRef {
134 let array_data_builder = ArrayDataBuilder::new(data_type)
135 .len(self.len())
136 .add_buffer(Buffer::from_vec(self.offsets))
137 .add_buffer(Buffer::from_vec(self.values))
138 .null_bit_buffer(null_buffer);
139
140 let data = match cfg!(debug_assertions) {
141 true => array_data_builder.build().unwrap(),
142 false => unsafe { array_data_builder.build_unchecked() },
143 };
144
145 make_array(data)
146 }
147}
148
149impl<I: OffsetSizeTrait> ValuesBuffer for OffsetBuffer<I> {
150 fn with_capacity(capacity: usize) -> Self {
151 Self::with_capacity(capacity)
152 }
153
154 fn pad_nulls(
155 &mut self,
156 read_offset: usize,
157 values_read: usize,
158 levels_read: usize,
159 valid_mask: &[u8],
160 ) {
161 assert_eq!(self.offsets.len(), read_offset + values_read + 1);
162 self.offsets
163 .resize(read_offset + levels_read + 1, I::default());
164
165 let offsets = &mut self.offsets;
166
167 let mut last_pos = read_offset + levels_read + 1;
168 let mut last_start_offset = I::from_usize(self.values.len()).unwrap();
169
170 let values_range = read_offset..read_offset + values_read;
171 for (value_pos, level_pos) in values_range
172 .clone()
173 .rev()
174 .zip(iter_set_bits_rev(valid_mask))
175 {
176 assert!(level_pos >= value_pos);
177 assert!(level_pos < last_pos);
178
179 let end_offset = offsets[value_pos + 1];
180 let start_offset = offsets[value_pos];
181
182 for x in &mut offsets[level_pos + 1..last_pos] {
184 *x = end_offset;
185 }
186
187 if level_pos == value_pos {
188 return;
189 }
190
191 offsets[level_pos] = start_offset;
192 last_pos = level_pos;
193 last_start_offset = start_offset;
194 }
195
196 for x in &mut offsets[values_range.start + 1..last_pos] {
198 *x = last_start_offset
199 }
200 }
201}
202
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::{Array, LargeStringArray, StringArray};

    /// An empty buffer converts into an empty string array.
    #[test]
    fn test_offset_buffer_empty() {
        let buffer = OffsetBuffer::<i32>::with_capacity(0);
        let array = buffer.into_array(None, ArrowType::Utf8);
        let strings = array.as_any().downcast_ref::<StringArray>().unwrap();
        assert_eq!(strings.len(), 0);
    }

    /// Direct pushes and dictionary lookups can be mixed on one buffer.
    #[test]
    fn test_offset_buffer_append() {
        let mut buffer = OffsetBuffer::<i64>::with_capacity(0);
        buffer.try_push("hello".as_bytes(), true).unwrap();
        buffer.try_push("bar".as_bytes(), true).unwrap();
        // Dictionary entries: 0 => "ab", 1 => "cd", 2 => "e", 3 => "f".
        buffer
            .extend_from_dictionary(&[1, 3, 0, 2], &[0, 2, 4, 5, 6], "abcdef".as_bytes())
            .unwrap();

        let array = buffer.into_array(None, ArrowType::LargeUtf8);
        let strings = array.as_any().downcast_ref::<LargeStringArray>().unwrap();
        assert_eq!(
            strings.iter().map(|x| x.unwrap()).collect::<Vec<_>>(),
            vec!["hello", "bar", "cd", "f", "ab", "e"]
        )
    }

    /// A buffer swapped out for a fresh one yields an independent array, and
    /// the fresh buffer starts empty.
    #[test]
    fn test_offset_buffer() {
        let mut buffer = OffsetBuffer::<i32>::with_capacity(0);
        for v in ["hello", "world", "cupcakes", "a", "b", "c"] {
            buffer.try_push(v.as_bytes(), false).unwrap()
        }
        let split = std::mem::replace(&mut buffer, OffsetBuffer::with_capacity(0));

        let array = split.into_array(None, ArrowType::Utf8);
        let strings = array.as_any().downcast_ref::<StringArray>().unwrap();
        assert_eq!(
            strings.iter().map(|x| x.unwrap()).collect::<Vec<_>>(),
            vec!["hello", "world", "cupcakes", "a", "b", "c"]
        );

        buffer.try_push("test".as_bytes(), false).unwrap();
        let array = buffer.into_array(None, ArrowType::Utf8);
        let strings = array.as_any().downcast_ref::<StringArray>().unwrap();
        assert_eq!(
            strings.iter().map(|x| x.unwrap()).collect::<Vec<_>>(),
            vec!["test"]
        );
    }

    /// Padding spreads values across their validity positions, producing nulls
    /// both before the first padded value and after the last one.
    #[test]
    fn test_offset_buffer_pad_nulls() {
        let mut buffer = OffsetBuffer::<i32>::with_capacity(0);
        let values = ["a", "b", "c", "def", "gh"];
        for v in &values {
            buffer.try_push(v.as_bytes(), false).unwrap()
        }

        let valid = [
            true, false, false, true, false, true, false, true, true, false, false,
        ];
        let valid_mask = Buffer::from_iter(valid.iter().copied());

        // "a" is already in place at slot 0; pad the remaining four values,
        // read from offset 1, across the remaining ten slots.
        buffer.pad_nulls(1, values.len() - 1, valid.len() - 1, valid_mask.as_slice());

        let array = buffer.into_array(Some(valid_mask), ArrowType::Utf8);
        let strings = array.as_any().downcast_ref::<StringArray>().unwrap();
        assert_eq!(
            strings.iter().collect::<Vec<_>>(),
            vec![
                Some("a"),
                None,
                None,
                Some("b"),
                None,
                Some("c"),
                None,
                Some("def"),
                Some("gh"),
                None,
                None
            ]
        );
    }

    /// `try_push` rejects values starting mid-code-point, while
    /// `check_valid_utf8` performs full validation from a byte offset.
    #[test]
    fn test_utf8_validation() {
        let valid_2_byte_utf8 = &[0b11001000, 0b10001000];
        std::str::from_utf8(valid_2_byte_utf8).unwrap();
        let valid_3_byte_utf8 = &[0b11101000, 0b10001000, 0b10001000];
        std::str::from_utf8(valid_3_byte_utf8).unwrap();
        let valid_4_byte_utf8 = &[0b11110010, 0b10101000, 0b10101001, 0b10100101];
        std::str::from_utf8(valid_4_byte_utf8).unwrap();

        let mut buffer = OffsetBuffer::<i32>::with_capacity(0);
        buffer.try_push(valid_2_byte_utf8, true).unwrap();
        buffer.try_push(valid_3_byte_utf8, true).unwrap();
        buffer.try_push(valid_4_byte_utf8, true).unwrap();

        // Cannot append a value that starts with a continuation byte.
        buffer.try_push(&valid_2_byte_utf8[1..], true).unwrap_err();
        buffer.try_push(&valid_3_byte_utf8[1..], true).unwrap_err();
        buffer.try_push(&valid_3_byte_utf8[2..], true).unwrap_err();
        buffer.try_push(&valid_4_byte_utf8[1..], true).unwrap_err();
        buffer.try_push(&valid_4_byte_utf8[2..], true).unwrap_err();
        buffer.try_push(&valid_4_byte_utf8[3..], true).unwrap_err();

        // A stray continuation byte after the first byte is not caught by
        // `try_push`'s partial check.
        buffer.try_push(&[0b01111111, 0b10111111], true).unwrap();

        assert_eq!(buffer.len(), 4);
        assert_eq!(buffer.values.len(), 11);

        buffer.try_push(valid_3_byte_utf8, true).unwrap();

        // Full validation from the start catches the stray byte at index 10.
        buffer.check_valid_utf8(0).unwrap_err();

        // Starting just after the broken byte succeeds.
        buffer.check_valid_utf8(11).unwrap();

        // Starting in the middle of a code point fails.
        buffer.check_valid_utf8(12).unwrap_err();
    }

    /// Padding zero values with an all-false mask yields only nulls.
    #[test]
    fn test_pad_nulls_empty() {
        let mut buffer = OffsetBuffer::<i32>::with_capacity(0);
        let valid_mask = Buffer::from_iter(std::iter::repeat_n(false, 9));
        buffer.pad_nulls(0, 0, 9, valid_mask.as_slice());

        let array = buffer.into_array(Some(valid_mask), ArrowType::Utf8);
        let strings = array.as_any().downcast_ref::<StringArray>().unwrap();

        assert_eq!(strings.len(), 9);
        assert!(strings.iter().all(|x| x.is_none()))
    }
}