1use super::{_MutableArrayData, ArrayData, Extend};
19use arrow_buffer::{ArrowNativeType, Buffer, ToByteSlice};
20use arrow_schema::DataType;
21use num_traits::CheckedAdd;
22
23fn get_last_run_end<T: ArrowNativeType>(run_ends_data: &super::MutableArrayData) -> T {
25 if run_ends_data.data.len == 0 {
26 T::default()
27 } else {
28 let buffer = Buffer::from(run_ends_data.data.buffer1.as_slice());
30 let typed_slice: &[T] = buffer.typed_data();
31 if typed_slice.len() >= run_ends_data.data.len {
32 typed_slice[run_ends_data.data.len - 1]
33 } else {
34 T::default()
35 }
36 }
37}
38
39pub fn extend_nulls(mutable: &mut _MutableArrayData, len: usize) {
44 if len == 0 {
45 return;
46 }
47
48 mutable.child_data[1].extend_nulls(1);
51
52 let run_end_type = if let DataType::RunEndEncoded(run_ends_field, _) = &mutable.data_type {
54 run_ends_field.data_type()
55 } else {
56 panic!("extend_nulls called on non-RunEndEncoded array");
57 };
58
59 macro_rules! extend_nulls_impl {
61 ($run_end_type:ty) => {{
62 let last_run_end = get_last_run_end::<$run_end_type>(&mutable.child_data[0]);
63 let new_value = last_run_end
64 .checked_add(<$run_end_type as ArrowNativeType>::usize_as(len))
65 .expect("run end overflow");
66 mutable.child_data[0]
67 .data
68 .buffer1
69 .extend_from_slice(new_value.to_byte_slice());
70 }};
71 }
72
73 match run_end_type {
75 DataType::Int16 => extend_nulls_impl!(i16),
76 DataType::Int32 => extend_nulls_impl!(i32),
77 DataType::Int64 => extend_nulls_impl!(i64),
78 _ => panic!("Invalid run end type for RunEndEncoded array: {run_end_type}"),
79 };
80
81 mutable.child_data[0].data.len += 1;
82}
83
84fn build_extend_arrays<T: ArrowNativeType + std::ops::Add<Output = T> + CheckedAdd>(
86 buffer: &Buffer,
87 length: usize,
88 start: usize,
89 len: usize,
90 dest_last_run_end: T,
91) -> (Vec<u8>, Option<(usize, usize)>) {
92 let mut run_ends_bytes = Vec::new();
93 let mut values_range: Option<(usize, usize)> = None;
94 let end = start + len;
95 let mut prev_end = 0;
96 let mut current_run_end = dest_last_run_end;
97
98 let typed_slice: &[T] = buffer.typed_data();
100
101 for i in 0..length {
102 if i < typed_slice.len() {
103 let run_end = typed_slice[i].to_usize().unwrap();
104
105 if prev_end <= start && run_end > start {
106 let start_offset = start - prev_end;
107 let end_offset = if run_end >= end {
108 end - prev_end
109 } else {
110 run_end - prev_end
111 };
112 current_run_end = current_run_end
113 .checked_add(&T::usize_as(end_offset - start_offset))
114 .expect("run end overflow");
115 run_ends_bytes.extend_from_slice(current_run_end.to_byte_slice());
116
117 values_range = Some((i, i + 1));
119 } else if prev_end >= start && run_end <= end {
120 current_run_end = current_run_end
121 .checked_add(&T::usize_as(run_end - prev_end))
122 .expect("run end overflow");
123 run_ends_bytes.extend_from_slice(current_run_end.to_byte_slice());
124
125 values_range = Some((values_range.expect("Unreachable: values_range cannot be None when prev_end >= start && run_end <= end. \
127 If prev_end >= start and run_end > prev_end (required for valid runs), then run_end > start, \
128 which means the first condition (prev_end <= start && run_end > start) would have been true \
129 and already set values_range to Some.").0, i + 1));
130 } else if prev_end < end && run_end >= end {
131 current_run_end = current_run_end
132 .checked_add(&T::usize_as(end - prev_end))
133 .expect("run end overflow");
134 run_ends_bytes.extend_from_slice(current_run_end.to_byte_slice());
135
136 values_range = Some((values_range.expect("Unreachable: values_range cannot be None when prev_end < end && run_end >= end. \
138 Due to sequential processing and monotonic prev_end advancement, if we reach a run \
139 that spans beyond the slice end (run_end >= end), at least one previous condition \
140 must have matched first to set values_range. Either the first condition matched when \
141 the slice started (prev_end <= start && run_end > start), or the second condition \
142 matched for runs within the slice (prev_end >= start && run_end <= end).").0, i + 1));
143 break;
144 }
145
146 prev_end = run_end;
147 if prev_end >= end {
148 break;
149 }
150 } else {
151 break;
152 }
153 }
154 (run_ends_bytes, values_range)
155}
156
157fn process_extends_batch<T: ArrowNativeType>(
159 mutable: &mut _MutableArrayData,
160 source_array_idx: usize,
161 run_ends_bytes: Vec<u8>,
162 values_range: Option<(usize, usize)>,
163) {
164 if run_ends_bytes.is_empty() {
165 return;
166 }
167
168 mutable.child_data[0]
170 .data
171 .buffer1
172 .extend_from_slice(&run_ends_bytes);
173 mutable.child_data[0].data.len += run_ends_bytes.len() / std::mem::size_of::<T>();
174
175 let (start_idx, end_idx) =
177 values_range.expect("values_range should be Some if run_ends_bytes is not empty");
178 mutable.child_data[1].extend(source_array_idx, start_idx, end_idx);
179}
180
181pub fn build_extend(array: &ArrayData) -> Extend<'_> {
185 Box::new(
186 move |mutable: &mut _MutableArrayData, array_idx: usize, start: usize, len: usize| {
187 if len == 0 {
188 return;
189 }
190
191 let source_run_ends = &array.child_data()[0];
193 let source_buffer = &source_run_ends.buffers()[0];
194
195 let dest_run_end_type =
197 if let DataType::RunEndEncoded(run_ends_field, _) = &mutable.data_type {
198 run_ends_field.data_type()
199 } else {
200 panic!("extend called on non-RunEndEncoded mutable array");
201 };
202
203 macro_rules! build_and_process_impl {
205 ($run_end_type:ty) => {{
206 let dest_last_run_end =
207 get_last_run_end::<$run_end_type>(&mutable.child_data[0]);
208 let (run_ends_bytes, values_range) = build_extend_arrays::<$run_end_type>(
209 source_buffer,
210 source_run_ends.len(),
211 start,
212 len,
213 dest_last_run_end,
214 );
215 process_extends_batch::<$run_end_type>(
216 mutable,
217 array_idx,
218 run_ends_bytes,
219 values_range,
220 );
221 }};
222 }
223
224 match dest_run_end_type {
225 DataType::Int16 => build_and_process_impl!(i16),
226 DataType::Int32 => build_and_process_impl!(i32),
227 DataType::Int64 => build_and_process_impl!(i64),
228 _ => panic!("Invalid run end type for RunEndEncoded array: {dest_run_end_type}",),
229 }
230 },
231 )
232}
233
#[cfg(test)]
mod tests {
    use super::*;
    use crate::transform::MutableArrayData;
    use crate::{ArrayData, ArrayDataBuilder};
    use arrow_buffer::{ArrowNativeType, Buffer};
    use arrow_schema::{DataType, Field};
    use std::sync::Arc;

    /// Builds a `RunEndEncoded` array with the given run ends and values child.
    ///
    /// `run_end_type` must be the Arrow type corresponding to `T`. The logical length is
    /// the last run end, or 0 when `run_ends` is empty.
    fn create_run_array_data_of<T: ArrowNativeType>(
        run_end_type: DataType,
        run_ends: Vec<T>,
        values: ArrayData,
    ) -> ArrayData {
        let run_ends_field = Arc::new(Field::new("run_ends", run_end_type.clone(), false));
        let values_field = Arc::new(Field::new("values", values.data_type().clone(), true));
        let data_type = DataType::RunEndEncoded(run_ends_field, values_field);

        let last_run_end = run_ends.last().map_or(0, |&v| v.as_usize());
        let num_runs = run_ends.len();

        let run_ends_data = ArrayDataBuilder::new(run_end_type)
            .len(num_runs)
            .add_buffer(Buffer::from_vec(run_ends))
            .build()
            .unwrap();

        ArrayDataBuilder::new(data_type)
            .len(last_run_end)
            .add_child_data(run_ends_data)
            .add_child_data(values)
            .build()
            .unwrap()
    }

    /// `RunEndEncoded` array with `Int32` run ends.
    fn create_run_array_data(run_ends: Vec<i32>, values: ArrayData) -> ArrayData {
        create_run_array_data_of(DataType::Int32, run_ends, values)
    }

    /// `RunEndEncoded` array with `Int16` run ends.
    fn create_run_array_data_int16(run_ends: Vec<i16>, values: ArrayData) -> ArrayData {
        create_run_array_data_of(DataType::Int16, run_ends, values)
    }

    /// `RunEndEncoded` array with `Int64` run ends.
    fn create_run_array_data_int64(run_ends: Vec<i64>, values: ArrayData) -> ArrayData {
        create_run_array_data_of(DataType::Int64, run_ends, values)
    }

    /// Non-null `Int32` array holding `values`.
    fn create_int32_array_data(values: Vec<i32>) -> ArrayData {
        let buffer = Buffer::from_vec(values);
        ArrayDataBuilder::new(DataType::Int32)
            .len(buffer.len() / std::mem::size_of::<i32>())
            .add_buffer(buffer)
            .build()
            .unwrap()
    }

    /// `Dictionary(Int32, Utf8)` array whose keys select `values` out of `dict_values`.
    fn create_string_dict_array_data(values: Vec<&str>, dict_values: Vec<&str>) -> ArrayData {
        // Utf8 offsets: 0 followed by the running total of the dictionary string lengths.
        let dict_offsets: Vec<i32> = std::iter::once(0i32)
            .chain(dict_values.iter().scan(0i32, |acc, s| {
                *acc += s.len() as i32;
                Some(*acc)
            }))
            .collect();

        let dict_data: Vec<u8> = dict_values.iter().flat_map(|s| s.bytes()).collect();

        let dict_array = ArrayDataBuilder::new(DataType::Utf8)
            .len(dict_values.len())
            .add_buffer(Buffer::from_vec(dict_offsets))
            .add_buffer(Buffer::from_vec(dict_data))
            .build()
            .unwrap();

        let keys: Vec<i32> = values
            .iter()
            .map(|v| dict_values.iter().position(|d| d == v).unwrap() as i32)
            .collect();

        let dict_type = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));

        ArrayDataBuilder::new(dict_type)
            .len(values.len())
            .add_buffer(Buffer::from_vec(keys))
            .add_child_data(dict_array)
            .build()
            .unwrap()
    }

    #[test]
    fn test_extend_nulls_int32() {
        let values = create_int32_array_data(vec![42]);
        let ree_array = create_run_array_data(vec![5], values);

        let mut mutable = MutableArrayData::new(vec![&ree_array], true, 10);

        mutable.extend_nulls(3);
        mutable.extend(0, 0, 5);
        mutable.extend_nulls(3);

        let result = mutable.freeze();
        let run_ends_buffer = &result.child_data()[0].buffers()[0];
        let run_ends_slice = run_ends_buffer.as_slice();

        assert_eq!(result.child_data()[0].len(), 3);
        let first_run_end = i32::from_ne_bytes(run_ends_slice[0..4].try_into().unwrap());
        let second_run_end = i32::from_ne_bytes(run_ends_slice[4..8].try_into().unwrap());
        let third_run_end = i32::from_ne_bytes(run_ends_slice[8..12].try_into().unwrap());
        assert_eq!(first_run_end, 3);
        assert_eq!(second_run_end, 8);
        assert_eq!(third_run_end, 11);

        assert_eq!(result.child_data()[1].len(), 3);
        let values_buffer = &result.child_data()[1].buffers()[0];
        let values_slice = values_buffer.as_slice();

        let second_value = i32::from_ne_bytes(values_slice[4..8].try_into().unwrap());
        assert_eq!(second_value, 42);

        let values_array = &result.child_data()[1];
        assert!(values_array.is_null(0));
        assert!(values_array.is_valid(1));
        assert!(values_array.is_null(2));
    }

    #[test]
    fn test_extend_nulls_int16() {
        let values = create_int32_array_data(vec![42]);
        let ree_array = create_run_array_data_int16(vec![5i16], values);

        let mut mutable = MutableArrayData::new(vec![&ree_array], true, 10);

        mutable.extend(0, 0, 5);
        mutable.extend_nulls(3);

        let result = mutable.freeze();
        let run_ends_buffer = &result.child_data()[0].buffers()[0];
        let run_ends_slice = run_ends_buffer.as_slice();

        assert_eq!(result.child_data()[0].len(), 2);
        let first_run_end = i16::from_ne_bytes(run_ends_slice[0..2].try_into().unwrap());
        let second_run_end = i16::from_ne_bytes(run_ends_slice[2..4].try_into().unwrap());
        assert_eq!(first_run_end, 5);
        assert_eq!(second_run_end, 8);
    }

    #[test]
    fn test_extend_nulls_int64() {
        let values = create_int32_array_data(vec![42]);
        let ree_array = create_run_array_data_int64(vec![5i64], values);

        let mut mutable = MutableArrayData::new(vec![&ree_array], true, 10);

        mutable.extend(0, 0, 5);
        mutable.extend_nulls(3);

        let result = mutable.freeze();
        let run_ends_buffer = &result.child_data()[0].buffers()[0];
        let run_ends_slice = run_ends_buffer.as_slice();

        assert_eq!(result.child_data()[0].len(), 2);
        let first_run_end = i64::from_ne_bytes(run_ends_slice[0..8].try_into().unwrap());
        let second_run_end = i64::from_ne_bytes(run_ends_slice[8..16].try_into().unwrap());
        assert_eq!(first_run_end, 5);
        assert_eq!(second_run_end, 8);
    }

    #[test]
    fn test_extend_int32() {
        let values = create_int32_array_data(vec![10, 20]);
        let ree_array = create_run_array_data(vec![2, 5], values);

        let mut mutable = MutableArrayData::new(vec![&ree_array], false, 10);

        mutable.extend(0, 0, 5);

        let result = mutable.freeze();

        assert_eq!(result.len(), 5);
        assert!(!result.child_data()[0].is_empty());
        assert_eq!(result.child_data()[0].len(), result.child_data()[1].len());
    }

    #[test]
    fn test_extend_empty() {
        let values = create_int32_array_data(vec![]);
        let ree_array = create_run_array_data(vec![], values);

        let mut mutable = MutableArrayData::new(vec![&ree_array], false, 10);
        mutable.extend(0, 0, 0);

        let result = mutable.freeze();
        assert_eq!(result.len(), 0);
        assert_eq!(result.child_data()[0].len(), 0);
    }

    #[test]
    fn test_build_extend_arrays_int16() {
        let buffer = Buffer::from_vec(vec![3i16, 5i16, 8i16]);
        let (run_ends_bytes, values_range) = build_extend_arrays::<i16>(&buffer, 3, 2, 4, 0i16);

        assert_eq!(run_ends_bytes.len(), 3 * std::mem::size_of::<i16>());
        assert_eq!(values_range, Some((0, 3)));

        let expected_bytes = [1i16, 3i16, 4i16]
            .iter()
            .flat_map(|&val| val.to_ne_bytes())
            .collect::<Vec<u8>>();
        assert_eq!(run_ends_bytes, expected_bytes);
    }

    #[test]
    fn test_build_extend_arrays_int64() {
        let buffer = Buffer::from_vec(vec![3i64, 5i64, 8i64]);
        let (run_ends_bytes, values_range) = build_extend_arrays::<i64>(&buffer, 3, 2, 4, 0i64);

        assert_eq!(run_ends_bytes.len(), 3 * std::mem::size_of::<i64>());
        assert_eq!(values_range, Some((0, 3)));

        let expected_bytes = [1i64, 3i64, 4i64]
            .iter()
            .flat_map(|&val| val.to_ne_bytes())
            .collect::<Vec<u8>>();
        assert_eq!(run_ends_bytes, expected_bytes);
    }

    #[test]
    fn test_extend_string_dict() {
        let dict_values = vec!["hello", "world"];
        let values = create_string_dict_array_data(vec!["hello", "world"], dict_values);
        let ree_array = create_run_array_data(vec![2, 5], values);

        let mut mutable = MutableArrayData::new(vec![&ree_array], false, 10);

        mutable.extend(0, 0, 5);

        let result = mutable.freeze();

        assert_eq!(result.len(), 5);
        assert!(!result.child_data()[0].is_empty());
        assert_eq!(result.child_data()[0].len(), result.child_data()[1].len());
        assert_eq!(result.child_data()[0].len(), 2);
        assert_eq!(result.child_data()[1].len(), 2);
    }

    #[test]
    #[should_panic(expected = "run end overflow")]
    fn test_extend_nulls_overflow_i16() {
        let values = create_int32_array_data(vec![42]);
        let ree_array = create_run_array_data_int16(vec![5], values);
        let mut mutable = MutableArrayData::new(vec![&ree_array], true, 10);

        mutable.extend(0, 0, 5_usize);
        mutable.extend_nulls(i16::MAX as usize);
    }

    #[test]
    #[should_panic(expected = "run end overflow")]
    fn test_extend_nulls_overflow_i32() {
        let values = create_int32_array_data(vec![42]);
        let ree_array = create_run_array_data(vec![10], values);
        let mut mutable = MutableArrayData::new(vec![&ree_array], true, 10);

        mutable.extend(0, 0, 10_usize);
        mutable.extend_nulls(i32::MAX as usize);
    }

    #[test]
    #[should_panic(expected = "run end overflow")]
    fn test_build_extend_overflow_i16() {
        let values = create_int32_array_data(vec![10]);
        let source_array = create_run_array_data_int16(vec![20], values);

        let dest_values = create_int32_array_data(vec![42]);
        let dest_array = create_run_array_data_int16(vec![i16::MAX - 5], dest_values);

        let mut mutable = MutableArrayData::new(vec![&source_array, &dest_array], false, 10);

        mutable.extend(1, 0, (i16::MAX - 5) as usize);
        mutable.extend(0, 0, 20);
    }

    #[test]
    #[should_panic(expected = "run end overflow")]
    fn test_build_extend_overflow_i32() {
        let values = create_int32_array_data(vec![10]);
        let source_array = create_run_array_data(vec![100], values);

        let dest_values = create_int32_array_data(vec![42]);
        let dest_array = create_run_array_data(vec![i32::MAX - 50], dest_values);

        let mut mutable = MutableArrayData::new(vec![&source_array, &dest_array], false, 10);

        mutable.extend(1, 0, (i32::MAX - 50) as usize);
        mutable.extend(0, 0, 100);
    }
}