1use super::{_MutableArrayData, ArrayData, Extend};
19use arrow_buffer::{ArrowNativeType, Buffer, ToByteSlice};
20use arrow_schema::{ArrowError, DataType};
21use num_traits::CheckedAdd;
22
23fn get_last_run_end<T: ArrowNativeType>(run_ends_data: &super::MutableArrayData) -> T {
25 if run_ends_data.data.len == 0 {
26 T::default()
27 } else {
28 let typed_slice: &[T] = run_ends_data.data.buffer1.typed_data();
29 if typed_slice.len() >= run_ends_data.data.len {
30 typed_slice[run_ends_data.data.len - 1]
31 } else {
32 T::default()
33 }
34 }
35}
36
37pub fn extend_nulls(mutable: &mut _MutableArrayData, len: usize) -> Result<(), ArrowError> {
42 if len == 0 {
43 return Ok(());
44 }
45
46 mutable.child_data[1].try_extend_nulls(1)?;
49
50 let run_end_type = if let DataType::RunEndEncoded(run_ends_field, _) = &mutable.data_type {
52 run_ends_field.data_type()
53 } else {
54 panic!("extend_nulls called on non-RunEndEncoded array");
55 };
56
57 macro_rules! extend_nulls_impl {
59 ($run_end_type:ty) => {{
60 let last_run_end = get_last_run_end::<$run_end_type>(&mutable.child_data[0]);
61 let new_value = last_run_end
62 .checked_add(<$run_end_type as ArrowNativeType>::usize_as(len))
63 .ok_or_else(|| {
64 ArrowError::InvalidArgumentError(
65 "run end overflow when extending RunEndEncoded array: \
66 use a larger run-end type (e.g. Int64 instead of Int32)"
67 .to_string(),
68 )
69 })?;
70 mutable.child_data[0]
71 .data
72 .buffer1
73 .extend_from_slice(new_value.to_byte_slice());
74 }};
75 }
76
77 match run_end_type {
79 DataType::Int16 => extend_nulls_impl!(i16),
80 DataType::Int32 => extend_nulls_impl!(i32),
81 DataType::Int64 => extend_nulls_impl!(i64),
82 _ => panic!("Invalid run end type for RunEndEncoded array: {run_end_type}"),
83 };
84
85 mutable.child_data[0].data.len += 1;
86 Ok(())
87}
88
89type ExtendArrays = (Vec<u8>, Option<(usize, usize)>);
91
92fn build_extend_arrays<T: ArrowNativeType + std::ops::Add<Output = T> + CheckedAdd>(
94 buffer: &Buffer,
95 length: usize,
96 start: usize,
97 len: usize,
98 dest_last_run_end: T,
99) -> Result<ExtendArrays, ArrowError> {
100 let mut run_ends_bytes = Vec::new();
101 let mut values_range: Option<(usize, usize)> = None;
102 let end = start + len;
103 let mut prev_end = 0;
104 let mut current_run_end = dest_last_run_end;
105
106 let typed_slice: &[T] = buffer.typed_data();
108
109 for i in 0..length {
110 if i < typed_slice.len() {
111 let run_end = typed_slice[i].to_usize().unwrap();
112
113 if prev_end <= start && run_end > start {
114 let start_offset = start - prev_end;
115 let end_offset = if run_end >= end {
116 end - prev_end
117 } else {
118 run_end - prev_end
119 };
120 current_run_end = current_run_end
121 .checked_add(&T::usize_as(end_offset - start_offset))
122 .ok_or_else(|| {
123 ArrowError::InvalidArgumentError(
124 "run end overflow when extending RunEndEncoded array: \
125 use a larger run-end type (e.g. Int64 instead of Int32)"
126 .to_string(),
127 )
128 })?;
129 run_ends_bytes.extend_from_slice(current_run_end.to_byte_slice());
130
131 values_range = Some((i, i + 1));
133 } else if prev_end >= start && run_end <= end {
134 current_run_end = current_run_end
135 .checked_add(&T::usize_as(run_end - prev_end))
136 .ok_or_else(|| {
137 ArrowError::InvalidArgumentError(
138 "run end overflow when extending RunEndEncoded array: \
139 use a larger run-end type (e.g. Int64 instead of Int32)"
140 .to_string(),
141 )
142 })?;
143 run_ends_bytes.extend_from_slice(current_run_end.to_byte_slice());
144
145 values_range = Some((values_range.expect("Unreachable: values_range cannot be None when prev_end >= start && run_end <= end. \
147 If prev_end >= start and run_end > prev_end (required for valid runs), then run_end > start, \
148 which means the first condition (prev_end <= start && run_end > start) would have been true \
149 and already set values_range to Some.").0, i + 1));
150 } else if prev_end < end && run_end >= end {
151 current_run_end = current_run_end
152 .checked_add(&T::usize_as(end - prev_end))
153 .ok_or_else(|| {
154 ArrowError::InvalidArgumentError(
155 "run end overflow when extending RunEndEncoded array: \
156 use a larger run-end type (e.g. Int64 instead of Int32)"
157 .to_string(),
158 )
159 })?;
160 run_ends_bytes.extend_from_slice(current_run_end.to_byte_slice());
161
162 values_range = Some((values_range.expect("Unreachable: values_range cannot be None when prev_end < end && run_end >= end. \
164 Due to sequential processing and monotonic prev_end advancement, if we reach a run \
165 that spans beyond the slice end (run_end >= end), at least one previous condition \
166 must have matched first to set values_range. Either the first condition matched when \
167 the slice started (prev_end <= start && run_end > start), or the second condition \
168 matched for runs within the slice (prev_end >= start && run_end <= end).").0, i + 1));
169 break;
170 }
171
172 prev_end = run_end;
173 if prev_end >= end {
174 break;
175 }
176 } else {
177 break;
178 }
179 }
180 Ok((run_ends_bytes, values_range))
181}
182
183fn process_extends_batch<T: ArrowNativeType>(
185 mutable: &mut _MutableArrayData,
186 source_array_idx: usize,
187 run_ends_bytes: Vec<u8>,
188 values_range: Option<(usize, usize)>,
189) -> Result<(), ArrowError> {
190 if run_ends_bytes.is_empty() {
191 return Ok(());
192 }
193
194 mutable.child_data[0]
196 .data
197 .buffer1
198 .extend_from_slice(&run_ends_bytes);
199 mutable.child_data[0].data.len += run_ends_bytes.len() / std::mem::size_of::<T>();
200
201 let (start_idx, end_idx) =
203 values_range.expect("values_range should be Some if run_ends_bytes is not empty");
204 mutable.child_data[1].try_extend(source_array_idx, start_idx, end_idx)
205}
206
207pub fn build_extend(array: &ArrayData) -> Extend<'_> {
211 Box::new(
212 move |mutable: &mut _MutableArrayData, array_idx: usize, start: usize, len: usize| {
213 if len == 0 {
214 return Ok(());
215 }
216
217 let source_run_ends = &array.child_data()[0];
219 let source_buffer = &source_run_ends.buffers()[0];
220
221 let dest_run_end_type =
223 if let DataType::RunEndEncoded(run_ends_field, _) = &mutable.data_type {
224 run_ends_field.data_type()
225 } else {
226 panic!("extend called on non-RunEndEncoded mutable array");
227 };
228
229 macro_rules! build_and_process_impl {
231 ($run_end_type:ty) => {{
232 let dest_last_run_end =
233 get_last_run_end::<$run_end_type>(&mutable.child_data[0]);
234 let (run_ends_bytes, values_range) = build_extend_arrays::<$run_end_type>(
235 source_buffer,
236 source_run_ends.len(),
237 start + array.offset(),
238 len,
239 dest_last_run_end,
240 )?;
241 process_extends_batch::<$run_end_type>(
242 mutable,
243 array_idx,
244 run_ends_bytes,
245 values_range,
246 )?;
247 }};
248 }
249
250 match dest_run_end_type {
251 DataType::Int16 => build_and_process_impl!(i16),
252 DataType::Int32 => build_and_process_impl!(i32),
253 DataType::Int64 => build_and_process_impl!(i64),
254 _ => panic!("Invalid run end type for RunEndEncoded array: {dest_run_end_type}",),
255 }
256 Ok(())
257 },
258 )
259}
260
#[cfg(test)]
mod tests {
    use super::*;
    use crate::transform::MutableArrayData;
    use crate::{ArrayData, ArrayDataBuilder};
    use arrow_buffer::{ArrowNativeType, Buffer};
    use arrow_schema::{DataType, Field};
    use std::sync::Arc;

    /// Builds a `RunEndEncoded` [`ArrayData`] over `values`.
    ///
    /// The run ends have native type `T` and are declared as `run_end_type`. The logical length
    /// is the last run end, or 0 when there are no runs.
    fn create_run_array_data_typed<T: ArrowNativeType>(
        run_ends: Vec<T>,
        run_end_type: DataType,
        values: ArrayData,
    ) -> ArrayData {
        let run_ends_field = Arc::new(Field::new("run_ends", run_end_type.clone(), false));
        let values_field = Arc::new(Field::new("values", values.data_type().clone(), true));
        let data_type = DataType::RunEndEncoded(run_ends_field, values_field);

        let num_runs = run_ends.len();
        let logical_len = run_ends.last().map_or(0, |run_end| run_end.as_usize());

        let run_ends_data = ArrayDataBuilder::new(run_end_type)
            .len(num_runs)
            .add_buffer(Buffer::from_vec(run_ends))
            .build()
            .unwrap();

        ArrayDataBuilder::new(data_type)
            .len(logical_len)
            .add_child_data(run_ends_data)
            .add_child_data(values)
            .build()
            .unwrap()
    }

    /// `RunEndEncoded` test array with `Int32` run ends.
    fn create_run_array_data(run_ends: Vec<i32>, values: ArrayData) -> ArrayData {
        create_run_array_data_typed(run_ends, DataType::Int32, values)
    }

    /// `RunEndEncoded` test array with `Int16` run ends.
    fn create_run_array_data_int16(run_ends: Vec<i16>, values: ArrayData) -> ArrayData {
        create_run_array_data_typed(run_ends, DataType::Int16, values)
    }

    /// `RunEndEncoded` test array with `Int64` run ends.
    fn create_run_array_data_int64(run_ends: Vec<i64>, values: ArrayData) -> ArrayData {
        create_run_array_data_typed(run_ends, DataType::Int64, values)
    }

    /// Non-null `Int32` array holding `values`.
    fn create_int32_array_data(values: Vec<i32>) -> ArrayData {
        let buffer = Buffer::from_vec(values);
        ArrayDataBuilder::new(DataType::Int32)
            .len(buffer.len() / std::mem::size_of::<i32>())
            .add_buffer(buffer)
            .build()
            .unwrap()
    }

    /// `Dictionary(Int32, Utf8)` array whose keys index `values` into `dict_values`.
    /// Every entry of `values` must appear in `dict_values`.
    fn create_string_dict_array_data(values: Vec<&str>, dict_values: Vec<&str>) -> ArrayData {
        let dict_offsets: Vec<i32> = dict_values
            .iter()
            .scan(0i32, |acc, s| {
                let offset = *acc;
                *acc += s.len() as i32;
                Some(offset)
            })
            .chain(std::iter::once(
                dict_values.iter().map(|s| s.len()).sum::<usize>() as i32,
            ))
            .collect();

        let dict_data: Vec<u8> = dict_values.iter().flat_map(|s| s.bytes()).collect();

        let dict_array = ArrayDataBuilder::new(DataType::Utf8)
            .len(dict_values.len())
            .add_buffer(Buffer::from_vec(dict_offsets))
            .add_buffer(Buffer::from_vec(dict_data))
            .build()
            .unwrap();

        let keys: Vec<i32> = values
            .iter()
            .map(|v| dict_values.iter().position(|d| d == v).unwrap() as i32)
            .collect();

        let dict_type = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));

        ArrayDataBuilder::new(dict_type)
            .len(values.len())
            .add_buffer(Buffer::from_vec(keys))
            .add_child_data(dict_array)
            .build()
            .unwrap()
    }

    #[test]
    fn test_extend_nulls_int32() {
        let values = create_int32_array_data(vec![42]);
        let ree_array = create_run_array_data(vec![5], values);

        let mut mutable = MutableArrayData::new(vec![&ree_array], true, 10);

        // Null run of 3, copied run of 5, null run of 3.
        mutable.try_extend_nulls(3).unwrap();
        mutable.try_extend(0, 0, 5).unwrap();
        mutable.try_extend_nulls(3).unwrap();

        let result = mutable.freeze();
        let run_ends_buffer = &result.child_data()[0].buffers()[0];
        let run_ends_slice = run_ends_buffer.as_slice();

        assert_eq!(result.child_data()[0].len(), 3);
        let first_run_end = i32::from_ne_bytes(run_ends_slice[0..4].try_into().unwrap());
        let second_run_end = i32::from_ne_bytes(run_ends_slice[4..8].try_into().unwrap());
        let third_run_end = i32::from_ne_bytes(run_ends_slice[8..12].try_into().unwrap());
        assert_eq!(first_run_end, 3);
        assert_eq!(second_run_end, 8);
        assert_eq!(third_run_end, 11);

        assert_eq!(result.child_data()[1].len(), 3);
        let values_buffer = &result.child_data()[1].buffers()[0];
        let values_slice = values_buffer.as_slice();

        let second_value = i32::from_ne_bytes(values_slice[4..8].try_into().unwrap());
        assert_eq!(second_value, 42);

        let values_array = &result.child_data()[1];
        assert!(values_array.is_null(0));
        assert!(values_array.is_valid(1));
        assert!(values_array.is_null(2));
    }

    #[test]
    fn test_extend_nulls_int16() {
        let values = create_int32_array_data(vec![42]);
        let ree_array = create_run_array_data_int16(vec![5i16], values);

        let mut mutable = MutableArrayData::new(vec![&ree_array], true, 10);

        mutable.try_extend(0, 0, 5).unwrap();
        mutable.try_extend_nulls(3).unwrap();

        let result = mutable.freeze();
        let run_ends_buffer = &result.child_data()[0].buffers()[0];
        let run_ends_slice = run_ends_buffer.as_slice();

        assert_eq!(result.child_data()[0].len(), 2);
        let first_run_end = i16::from_ne_bytes(run_ends_slice[0..2].try_into().unwrap());
        let second_run_end = i16::from_ne_bytes(run_ends_slice[2..4].try_into().unwrap());
        assert_eq!(first_run_end, 5);
        assert_eq!(second_run_end, 8);
    }

    #[test]
    fn test_extend_nulls_int64() {
        let values = create_int32_array_data(vec![42]);
        let ree_array = create_run_array_data_int64(vec![5i64], values);

        let mut mutable = MutableArrayData::new(vec![&ree_array], true, 10);

        mutable.try_extend(0, 0, 5).unwrap();
        mutable.try_extend_nulls(3).unwrap();

        let result = mutable.freeze();
        let run_ends_buffer = &result.child_data()[0].buffers()[0];
        let run_ends_slice = run_ends_buffer.as_slice();

        assert_eq!(result.child_data()[0].len(), 2);
        let first_run_end = i64::from_ne_bytes(run_ends_slice[0..8].try_into().unwrap());
        let second_run_end = i64::from_ne_bytes(run_ends_slice[8..16].try_into().unwrap());
        assert_eq!(first_run_end, 5);
        assert_eq!(second_run_end, 8);
    }

    #[test]
    fn test_extend_int32() {
        let values = create_int32_array_data(vec![10, 20]);
        let ree_array = create_run_array_data(vec![2, 5], values);

        let mut mutable = MutableArrayData::new(vec![&ree_array], false, 10);

        mutable.try_extend(0, 0, 5).unwrap();

        let result = mutable.freeze();

        assert_eq!(result.len(), 5);
        assert!(!result.child_data()[0].is_empty());
        assert_eq!(result.child_data()[0].len(), result.child_data()[1].len());
    }

    #[test]
    fn test_extend_partial_runs() {
        // Logical layout: [10, 10, 20, 20, 20, 30, 30, 30, 30]
        let values = create_int32_array_data(vec![10, 20, 30]);
        let ree_array = create_run_array_data(vec![2, 5, 9], values);

        let mut mutable = MutableArrayData::new(vec![&ree_array], false, 10);
        // Slots 3..7 straddle the second and third runs.
        mutable.try_extend(0, 3, 7).unwrap();
        // Slot 0 comes from the first run and is appended after a non-zero last run end.
        mutable.try_extend(0, 0, 1).unwrap();

        let result = mutable.freeze();
        assert_eq!(result.len(), 5);

        let run_ends = &result.child_data()[0];
        assert_eq!(
            &run_ends.buffers()[0].typed_data::<i32>()[..run_ends.len()],
            &[2, 4, 5]
        );

        let values = &result.child_data()[1];
        assert_eq!(
            &values.buffers()[0].typed_data::<i32>()[..values.len()],
            &[20, 30, 10]
        );
    }

    #[test]
    fn test_extend_empty() {
        let values = create_int32_array_data(vec![]);
        let ree_array = create_run_array_data(vec![], values);

        let mut mutable = MutableArrayData::new(vec![&ree_array], false, 10);
        mutable.try_extend(0, 0, 0).unwrap();

        let result = mutable.freeze();
        assert_eq!(result.len(), 0);
        assert_eq!(result.child_data()[0].len(), 0);
    }

    #[test]
    fn test_build_extend_arrays_int16() {
        let buffer = Buffer::from_vec(vec![3i16, 5i16, 8i16]);
        let (run_ends_bytes, values_range) =
            build_extend_arrays::<i16>(&buffer, 3, 2, 4, 0i16).unwrap();

        assert_eq!(run_ends_bytes.len(), 3 * std::mem::size_of::<i16>());
        assert_eq!(values_range, Some((0, 3)));

        let expected_bytes = [1i16, 3i16, 4i16]
            .iter()
            .flat_map(|&val| val.to_ne_bytes())
            .collect::<Vec<u8>>();
        assert_eq!(run_ends_bytes, expected_bytes);
    }

    #[test]
    fn test_build_extend_arrays_int64() {
        let buffer = Buffer::from_vec(vec![3i64, 5i64, 8i64]);
        let (run_ends_bytes, values_range) =
            build_extend_arrays::<i64>(&buffer, 3, 2, 4, 0i64).unwrap();

        assert_eq!(run_ends_bytes.len(), 3 * std::mem::size_of::<i64>());
        assert_eq!(values_range, Some((0, 3)));

        let expected_bytes = [1i64, 3i64, 4i64]
            .iter()
            .flat_map(|&val| val.to_ne_bytes())
            .collect::<Vec<u8>>();
        assert_eq!(run_ends_bytes, expected_bytes);
    }

    #[test]
    fn test_extend_string_dict() {
        let dict_values = vec!["hello", "world"];
        let values = create_string_dict_array_data(vec!["hello", "world"], dict_values);

        let ree_array = create_run_array_data(vec![2, 5], values);

        let mut mutable = MutableArrayData::new(vec![&ree_array], false, 10);

        mutable.try_extend(0, 0, 5).unwrap();

        let result = mutable.freeze();

        assert_eq!(result.len(), 5);
        assert!(!result.child_data()[0].is_empty());
        assert_eq!(result.child_data()[0].len(), result.child_data()[1].len());
        assert_eq!(result.child_data()[0].len(), 2);
        assert_eq!(result.child_data()[1].len(), 2);
    }

    #[test]
    fn test_extend_nulls_overflow_i16() {
        let values = create_int32_array_data(vec![42]);
        let ree_array = create_run_array_data_int16(vec![5], values);
        let mut mutable = MutableArrayData::new(vec![&ree_array], true, 10);

        mutable.try_extend(0, 0, 5_usize).unwrap();

        let err = mutable.try_extend_nulls(i16::MAX as usize).unwrap_err();
        assert!(
            err.to_string().contains("run end overflow"),
            "unexpected error: {err}"
        );
    }

    #[test]
    fn test_extend_nulls_overflow_i32() {
        let values = create_int32_array_data(vec![42]);
        let ree_array = create_run_array_data(vec![10], values);
        let mut mutable = MutableArrayData::new(vec![&ree_array], true, 10);

        mutable.try_extend(0, 0, 10_usize).unwrap();

        let err = mutable.try_extend_nulls(i32::MAX as usize).unwrap_err();
        assert!(
            err.to_string().contains("run end overflow"),
            "unexpected error: {err}"
        );
    }

    #[test]
    fn test_build_extend_overflow_i16() {
        let values = create_int32_array_data(vec![10]);
        let source_array = create_run_array_data_int16(vec![20], values);

        let dest_values = create_int32_array_data(vec![42]);
        let dest_array = create_run_array_data_int16(vec![i16::MAX - 5], dest_values);

        let mut mutable = MutableArrayData::new(vec![&source_array, &dest_array], false, 10);

        mutable.try_extend(1, 0, (i16::MAX - 5) as usize).unwrap();

        let err = mutable.try_extend(0, 0, 20).unwrap_err();
        assert!(
            err.to_string().contains("run end overflow"),
            "unexpected error: {err}"
        );
    }

    #[test]
    fn test_build_extend_overflow_i32() {
        let values = create_int32_array_data(vec![10]);
        let source_array = create_run_array_data(vec![100], values);

        let dest_values = create_int32_array_data(vec![42]);
        let dest_array = create_run_array_data(vec![i32::MAX - 50], dest_values);

        let mut mutable = MutableArrayData::new(vec![&source_array, &dest_array], false, 10);

        mutable.try_extend(1, 0, (i32::MAX - 50) as usize).unwrap();

        let err = mutable.try_extend(0, 0, 100).unwrap_err();
        assert!(
            err.to_string().contains("run end overflow"),
            "unexpected error: {err}"
        );
    }
}