1use super::{_MutableArrayData, ArrayData, Extend};
19use arrow_buffer::{ArrowNativeType, Buffer, ToByteSlice};
20use arrow_schema::DataType;
21use num_traits::CheckedAdd;
22
23fn get_last_run_end<T: ArrowNativeType>(run_ends_data: &super::MutableArrayData) -> T {
25 if run_ends_data.data.len == 0 {
26 T::default()
27 } else {
28 let typed_slice: &[T] = run_ends_data.data.buffer1.typed_data();
29 if typed_slice.len() >= run_ends_data.data.len {
30 typed_slice[run_ends_data.data.len - 1]
31 } else {
32 T::default()
33 }
34 }
35}
36
37pub fn extend_nulls(mutable: &mut _MutableArrayData, len: usize) {
42 if len == 0 {
43 return;
44 }
45
46 mutable.child_data[1].extend_nulls(1);
49
50 let run_end_type = if let DataType::RunEndEncoded(run_ends_field, _) = &mutable.data_type {
52 run_ends_field.data_type()
53 } else {
54 panic!("extend_nulls called on non-RunEndEncoded array");
55 };
56
57 macro_rules! extend_nulls_impl {
59 ($run_end_type:ty) => {{
60 let last_run_end = get_last_run_end::<$run_end_type>(&mutable.child_data[0]);
61 let new_value = last_run_end
62 .checked_add(<$run_end_type as ArrowNativeType>::usize_as(len))
63 .expect("run end overflow");
64 mutable.child_data[0]
65 .data
66 .buffer1
67 .extend_from_slice(new_value.to_byte_slice());
68 }};
69 }
70
71 match run_end_type {
73 DataType::Int16 => extend_nulls_impl!(i16),
74 DataType::Int32 => extend_nulls_impl!(i32),
75 DataType::Int64 => extend_nulls_impl!(i64),
76 _ => panic!("Invalid run end type for RunEndEncoded array: {run_end_type}"),
77 };
78
79 mutable.child_data[0].data.len += 1;
80}
81
82fn build_extend_arrays<T: ArrowNativeType + std::ops::Add<Output = T> + CheckedAdd>(
84 buffer: &Buffer,
85 length: usize,
86 start: usize,
87 len: usize,
88 dest_last_run_end: T,
89) -> (Vec<u8>, Option<(usize, usize)>) {
90 let mut run_ends_bytes = Vec::new();
91 let mut values_range: Option<(usize, usize)> = None;
92 let end = start + len;
93 let mut prev_end = 0;
94 let mut current_run_end = dest_last_run_end;
95
96 let typed_slice: &[T] = buffer.typed_data();
98
99 for i in 0..length {
100 if i < typed_slice.len() {
101 let run_end = typed_slice[i].to_usize().unwrap();
102
103 if prev_end <= start && run_end > start {
104 let start_offset = start - prev_end;
105 let end_offset = if run_end >= end {
106 end - prev_end
107 } else {
108 run_end - prev_end
109 };
110 current_run_end = current_run_end
111 .checked_add(&T::usize_as(end_offset - start_offset))
112 .expect("run end overflow");
113 run_ends_bytes.extend_from_slice(current_run_end.to_byte_slice());
114
115 values_range = Some((i, i + 1));
117 } else if prev_end >= start && run_end <= end {
118 current_run_end = current_run_end
119 .checked_add(&T::usize_as(run_end - prev_end))
120 .expect("run end overflow");
121 run_ends_bytes.extend_from_slice(current_run_end.to_byte_slice());
122
123 values_range = Some((values_range.expect("Unreachable: values_range cannot be None when prev_end >= start && run_end <= end. \
125 If prev_end >= start and run_end > prev_end (required for valid runs), then run_end > start, \
126 which means the first condition (prev_end <= start && run_end > start) would have been true \
127 and already set values_range to Some.").0, i + 1));
128 } else if prev_end < end && run_end >= end {
129 current_run_end = current_run_end
130 .checked_add(&T::usize_as(end - prev_end))
131 .expect("run end overflow");
132 run_ends_bytes.extend_from_slice(current_run_end.to_byte_slice());
133
134 values_range = Some((values_range.expect("Unreachable: values_range cannot be None when prev_end < end && run_end >= end. \
136 Due to sequential processing and monotonic prev_end advancement, if we reach a run \
137 that spans beyond the slice end (run_end >= end), at least one previous condition \
138 must have matched first to set values_range. Either the first condition matched when \
139 the slice started (prev_end <= start && run_end > start), or the second condition \
140 matched for runs within the slice (prev_end >= start && run_end <= end).").0, i + 1));
141 break;
142 }
143
144 prev_end = run_end;
145 if prev_end >= end {
146 break;
147 }
148 } else {
149 break;
150 }
151 }
152 (run_ends_bytes, values_range)
153}
154
155fn process_extends_batch<T: ArrowNativeType>(
157 mutable: &mut _MutableArrayData,
158 source_array_idx: usize,
159 run_ends_bytes: Vec<u8>,
160 values_range: Option<(usize, usize)>,
161) {
162 if run_ends_bytes.is_empty() {
163 return;
164 }
165
166 mutable.child_data[0]
168 .data
169 .buffer1
170 .extend_from_slice(&run_ends_bytes);
171 mutable.child_data[0].data.len += run_ends_bytes.len() / std::mem::size_of::<T>();
172
173 let (start_idx, end_idx) =
175 values_range.expect("values_range should be Some if run_ends_bytes is not empty");
176 mutable.child_data[1].extend(source_array_idx, start_idx, end_idx);
177}
178
179pub fn build_extend(array: &ArrayData) -> Extend<'_> {
183 Box::new(
184 move |mutable: &mut _MutableArrayData, array_idx: usize, start: usize, len: usize| {
185 if len == 0 {
186 return;
187 }
188
189 let source_run_ends = &array.child_data()[0];
191 let source_buffer = &source_run_ends.buffers()[0];
192
193 let dest_run_end_type =
195 if let DataType::RunEndEncoded(run_ends_field, _) = &mutable.data_type {
196 run_ends_field.data_type()
197 } else {
198 panic!("extend called on non-RunEndEncoded mutable array");
199 };
200
201 macro_rules! build_and_process_impl {
203 ($run_end_type:ty) => {{
204 let dest_last_run_end =
205 get_last_run_end::<$run_end_type>(&mutable.child_data[0]);
206 let (run_ends_bytes, values_range) = build_extend_arrays::<$run_end_type>(
207 source_buffer,
208 source_run_ends.len(),
209 start,
210 len,
211 dest_last_run_end,
212 );
213 process_extends_batch::<$run_end_type>(
214 mutable,
215 array_idx,
216 run_ends_bytes,
217 values_range,
218 );
219 }};
220 }
221
222 match dest_run_end_type {
223 DataType::Int16 => build_and_process_impl!(i16),
224 DataType::Int32 => build_and_process_impl!(i32),
225 DataType::Int64 => build_and_process_impl!(i64),
226 _ => panic!("Invalid run end type for RunEndEncoded array: {dest_run_end_type}",),
227 }
228 },
229 )
230}
231
#[cfg(test)]
mod tests {
    use super::*;
    use crate::transform::MutableArrayData;
    use crate::{ArrayData, ArrayDataBuilder};
    use arrow_buffer::{ArrowNativeType, Buffer};
    use arrow_schema::{DataType, Field};
    use std::sync::Arc;

    /// Builds run-end encoded `ArrayData` whose run-ends child has type `run_end_type` and
    /// contains `run_ends`, with `values` as the values child.
    ///
    /// The logical length of the result is the last run end (0 when there are no runs).
    fn create_ree_array_data<R: ArrowNativeType>(
        run_end_type: DataType,
        run_ends: Vec<R>,
        values: ArrayData,
    ) -> ArrayData {
        let run_ends_field = Arc::new(Field::new("run_ends", run_end_type.clone(), false));
        let values_field = Arc::new(Field::new("values", values.data_type().clone(), true));
        let data_type = DataType::RunEndEncoded(run_ends_field, values_field);

        let logical_len = run_ends.last().map_or(0, |last| last.as_usize());
        let run_count = run_ends.len();

        let run_ends_data = ArrayDataBuilder::new(run_end_type)
            .len(run_count)
            .add_buffer(Buffer::from_vec(run_ends))
            .build()
            .unwrap();

        ArrayDataBuilder::new(data_type)
            .len(logical_len)
            .add_child_data(run_ends_data)
            .add_child_data(values)
            .build()
            .unwrap()
    }

    /// Run-end encoded data with `Int32` run ends.
    fn create_run_array_data(run_ends: Vec<i32>, values: ArrayData) -> ArrayData {
        create_ree_array_data(DataType::Int32, run_ends, values)
    }

    /// Run-end encoded data with `Int16` run ends.
    fn create_run_array_data_int16(run_ends: Vec<i16>, values: ArrayData) -> ArrayData {
        create_ree_array_data(DataType::Int16, run_ends, values)
    }

    /// Run-end encoded data with `Int64` run ends.
    fn create_run_array_data_int64(run_ends: Vec<i64>, values: ArrayData) -> ArrayData {
        create_ree_array_data(DataType::Int64, run_ends, values)
    }

    /// Non-nullable `Int32` array data holding `values`.
    fn create_int32_array_data(values: Vec<i32>) -> ArrayData {
        let value_count = values.len();
        ArrayDataBuilder::new(DataType::Int32)
            .len(value_count)
            .add_buffer(Buffer::from_vec(values))
            .build()
            .unwrap()
    }

    /// `Dictionary(Int32, Utf8)` array data: `dict_values` is the dictionary and each entry of
    /// `values` is encoded as the index of its first occurrence in `dict_values`.
    fn create_string_dict_array_data(values: Vec<&str>, dict_values: Vec<&str>) -> ArrayData {
        // Utf8 offsets: the start of every string followed by the total byte length.
        let mut dict_offsets: Vec<i32> = Vec::with_capacity(dict_values.len() + 1);
        let mut next_offset = 0i32;
        dict_offsets.push(next_offset);
        for s in &dict_values {
            next_offset += s.len() as i32;
            dict_offsets.push(next_offset);
        }

        let dict_data: Vec<u8> = dict_values.iter().flat_map(|s| s.bytes()).collect();

        let dict_array = ArrayDataBuilder::new(DataType::Utf8)
            .len(dict_values.len())
            .add_buffer(Buffer::from_vec(dict_offsets))
            .add_buffer(Buffer::from_vec(dict_data))
            .build()
            .unwrap();

        let keys: Vec<i32> = values
            .iter()
            .map(|v| dict_values.iter().position(|d| d == v).unwrap() as i32)
            .collect();

        let dict_type = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));

        ArrayDataBuilder::new(dict_type)
            .len(values.len())
            .add_buffer(Buffer::from_vec(keys))
            .add_child_data(dict_array)
            .build()
            .unwrap()
    }

    /// Returns the first `data.len()` entries of `data`'s first buffer, read as `T`.
    fn typed_values<T: ArrowNativeType>(data: &ArrayData) -> Vec<T> {
        data.buffers()[0].typed_data::<T>()[..data.len()].to_vec()
    }

    #[test]
    fn test_extend_nulls_int32() {
        let values = create_int32_array_data(vec![42]);
        let ree_array = create_run_array_data(vec![5], values);

        let mut mutable = MutableArrayData::new(vec![&ree_array], true, 10);

        // 3 nulls, then the 5 source positions, then 3 more nulls.
        mutable.extend_nulls(3);
        mutable.extend(0, 0, 5);
        mutable.extend_nulls(3);

        let result = mutable.freeze();

        let run_ends = &result.child_data()[0];
        assert_eq!(run_ends.len(), 3);
        assert_eq!(typed_values::<i32>(run_ends), [3, 8, 11]);

        let values_array = &result.child_data()[1];
        assert_eq!(values_array.len(), 3);
        assert_eq!(typed_values::<i32>(values_array)[1], 42);

        assert!(values_array.is_null(0));
        assert!(values_array.is_valid(1));
        assert!(values_array.is_null(2));
    }

    #[test]
    fn test_extend_nulls_int16() {
        let values = create_int32_array_data(vec![42]);
        let ree_array = create_run_array_data_int16(vec![5i16], values);

        let mut mutable = MutableArrayData::new(vec![&ree_array], true, 10);

        mutable.extend(0, 0, 5);
        mutable.extend_nulls(3);

        let result = mutable.freeze();

        let run_ends = &result.child_data()[0];
        assert_eq!(run_ends.len(), 2);
        assert_eq!(typed_values::<i16>(run_ends), [5, 8]);
    }

    #[test]
    fn test_extend_nulls_int64() {
        let values = create_int32_array_data(vec![42]);
        let ree_array = create_run_array_data_int64(vec![5i64], values);

        let mut mutable = MutableArrayData::new(vec![&ree_array], true, 10);

        mutable.extend(0, 0, 5);
        mutable.extend_nulls(3);

        let result = mutable.freeze();

        let run_ends = &result.child_data()[0];
        assert_eq!(run_ends.len(), 2);
        assert_eq!(typed_values::<i64>(run_ends), [5, 8]);
    }

    #[test]
    fn test_extend_int32() {
        let values = create_int32_array_data(vec![10, 20]);
        let ree_array = create_run_array_data(vec![2, 5], values);

        let mut mutable = MutableArrayData::new(vec![&ree_array], false, 10);

        mutable.extend(0, 0, 5);

        let result = mutable.freeze();

        assert_eq!(result.len(), 5);
        assert!(!result.child_data()[0].is_empty());
        // One value per run.
        assert_eq!(result.child_data()[0].len(), result.child_data()[1].len());
    }

    #[test]
    fn test_extend_empty() {
        let values = create_int32_array_data(vec![]);
        let ree_array = create_run_array_data(vec![], values);

        let mut mutable = MutableArrayData::new(vec![&ree_array], false, 10);
        mutable.extend(0, 0, 0);

        let result = mutable.freeze();
        assert_eq!(result.len(), 0);
        assert_eq!(result.child_data()[0].len(), 0);
    }

    #[test]
    fn test_build_extend_arrays_int16() {
        // Logical positions 2..6 of runs ending at 3, 5 and 8 touch all three runs.
        let buffer = Buffer::from_vec(vec![3i16, 5i16, 8i16]);
        let (run_ends_bytes, values_range) = build_extend_arrays::<i16>(&buffer, 3, 2, 4, 0i16);

        assert_eq!(run_ends_bytes.len(), 3 * std::mem::size_of::<i16>());
        assert_eq!(values_range, Some((0, 3)));

        let expected_bytes = [1i16, 3i16, 4i16]
            .iter()
            .flat_map(|&val| val.to_ne_bytes())
            .collect::<Vec<u8>>();
        assert_eq!(run_ends_bytes, expected_bytes);
    }

    #[test]
    fn test_build_extend_arrays_int64() {
        // Logical positions 2..6 of runs ending at 3, 5 and 8 touch all three runs.
        let buffer = Buffer::from_vec(vec![3i64, 5i64, 8i64]);
        let (run_ends_bytes, values_range) = build_extend_arrays::<i64>(&buffer, 3, 2, 4, 0i64);

        assert_eq!(run_ends_bytes.len(), 3 * std::mem::size_of::<i64>());
        assert_eq!(values_range, Some((0, 3)));

        let expected_bytes = [1i64, 3i64, 4i64]
            .iter()
            .flat_map(|&val| val.to_ne_bytes())
            .collect::<Vec<u8>>();
        assert_eq!(run_ends_bytes, expected_bytes);
    }

    #[test]
    fn test_extend_string_dict() {
        let dict_values = vec!["hello", "world"];
        let values = create_string_dict_array_data(vec!["hello", "world"], dict_values);
        let ree_array = create_run_array_data(vec![2, 5], values);

        let mut mutable = MutableArrayData::new(vec![&ree_array], false, 10);

        mutable.extend(0, 0, 5);

        let result = mutable.freeze();

        assert_eq!(result.len(), 5);
        assert!(!result.child_data()[0].is_empty());
        assert_eq!(result.child_data()[0].len(), result.child_data()[1].len());
        assert_eq!(result.child_data()[0].len(), 2);
        assert_eq!(result.child_data()[1].len(), 2);
    }

    #[test]
    #[should_panic(expected = "run end overflow")]
    fn test_extend_nulls_overflow_i16() {
        let values = create_int32_array_data(vec![42]);
        let ree_array = create_run_array_data_int16(vec![5], values);
        let mut mutable = MutableArrayData::new(vec![&ree_array], true, 10);

        mutable.extend(0, 0, 5_usize);

        // 5 + i16::MAX does not fit in an i16.
        mutable.extend_nulls(i16::MAX as usize);
    }

    #[test]
    #[should_panic(expected = "run end overflow")]
    fn test_extend_nulls_overflow_i32() {
        let values = create_int32_array_data(vec![42]);
        let ree_array = create_run_array_data(vec![10], values);
        let mut mutable = MutableArrayData::new(vec![&ree_array], true, 10);

        mutable.extend(0, 0, 10_usize);

        // 10 + i32::MAX does not fit in an i32.
        mutable.extend_nulls(i32::MAX as usize);
    }

    #[test]
    #[should_panic(expected = "run end overflow")]
    fn test_build_extend_overflow_i16() {
        let values = create_int32_array_data(vec![10]);
        let source_array = create_run_array_data_int16(vec![20], values);

        let dest_values = create_int32_array_data(vec![42]);
        let dest_array = create_run_array_data_int16(vec![i16::MAX - 5], dest_values);

        let mut mutable = MutableArrayData::new(vec![&source_array, &dest_array], false, 10);

        // Bring the destination to within 5 of i16::MAX ...
        mutable.extend(1, 0, (i16::MAX - 5) as usize);

        // ... so that 20 more positions overflow.
        mutable.extend(0, 0, 20);
    }

    #[test]
    #[should_panic(expected = "run end overflow")]
    fn test_build_extend_overflow_i32() {
        let values = create_int32_array_data(vec![10]);
        let source_array = create_run_array_data(vec![100], values);

        let dest_values = create_int32_array_data(vec![42]);
        let dest_array = create_run_array_data(vec![i32::MAX - 50], dest_values);

        let mut mutable = MutableArrayData::new(vec![&source_array, &dest_array], false, 10);

        // Bring the destination to within 50 of i32::MAX ...
        mutable.extend(1, 0, (i32::MAX - 50) as usize);

        // ... so that 100 more positions overflow.
        mutable.extend(0, 0, 100);
    }
}