1use super::{ArrayData, Extend, _MutableArrayData};
19use arrow_buffer::{ArrowNativeType, Buffer, ToByteSlice};
20use arrow_schema::DataType;
21use num::CheckedAdd;
22
23fn get_last_run_end<T: ArrowNativeType>(run_ends_data: &super::MutableArrayData) -> T {
25 if run_ends_data.data.len == 0 {
26 T::default()
27 } else {
28 let buffer = Buffer::from(run_ends_data.data.buffer1.as_slice());
30 let typed_slice: &[T] = buffer.typed_data();
31 if typed_slice.len() >= run_ends_data.data.len {
32 typed_slice[run_ends_data.data.len - 1]
33 } else {
34 T::default()
35 }
36 }
37}
38
39pub fn extend_nulls(mutable: &mut _MutableArrayData, len: usize) {
44 if len == 0 {
45 return;
46 }
47
48 mutable.child_data[1].extend_nulls(1);
51
52 let run_end_type = if let DataType::RunEndEncoded(run_ends_field, _) = &mutable.data_type {
54 run_ends_field.data_type()
55 } else {
56 panic!("extend_nulls called on non-RunEndEncoded array");
57 };
58
59 macro_rules! extend_nulls_impl {
61 ($run_end_type:ty) => {{
62 let last_run_end = get_last_run_end::<$run_end_type>(&mutable.child_data[0]);
63 let new_value = last_run_end
64 .checked_add(<$run_end_type as ArrowNativeType>::usize_as(len))
65 .expect("run end overflow");
66 mutable.child_data[0]
67 .data
68 .buffer1
69 .extend_from_slice(new_value.to_byte_slice());
70 }};
71 }
72
73 match run_end_type {
75 DataType::Int16 => extend_nulls_impl!(i16),
76 DataType::Int32 => extend_nulls_impl!(i32),
77 DataType::Int64 => extend_nulls_impl!(i64),
78 _ => panic!(
79 "Invalid run end type for RunEndEncoded array: {:?}",
80 run_end_type
81 ),
82 };
83
84 mutable.child_data[0].data.len += 1;
85}
86
87fn build_extend_arrays<T: ArrowNativeType + std::ops::Add<Output = T> + CheckedAdd>(
89 buffer: &Buffer,
90 length: usize,
91 start: usize,
92 len: usize,
93 dest_last_run_end: T,
94) -> (Vec<u8>, Option<(usize, usize)>) {
95 let mut run_ends_bytes = Vec::new();
96 let mut values_range: Option<(usize, usize)> = None;
97 let end = start + len;
98 let mut prev_end = 0;
99 let mut current_run_end = dest_last_run_end;
100
101 let typed_slice: &[T] = buffer.typed_data();
103
104 for i in 0..length {
105 if i < typed_slice.len() {
106 let run_end = typed_slice[i].to_usize().unwrap();
107
108 if prev_end <= start && run_end > start {
109 let start_offset = start - prev_end;
110 let end_offset = if run_end >= end {
111 end - prev_end
112 } else {
113 run_end - prev_end
114 };
115 current_run_end = current_run_end
116 .checked_add(&T::usize_as(end_offset - start_offset))
117 .expect("run end overflow");
118 run_ends_bytes.extend_from_slice(current_run_end.to_byte_slice());
119
120 values_range = Some((i, i + 1));
122 } else if prev_end >= start && run_end <= end {
123 current_run_end = current_run_end
124 .checked_add(&T::usize_as(run_end - prev_end))
125 .expect("run end overflow");
126 run_ends_bytes.extend_from_slice(current_run_end.to_byte_slice());
127
128 values_range = Some((values_range.expect("Unreachable: values_range cannot be None when prev_end >= start && run_end <= end. \
130 If prev_end >= start and run_end > prev_end (required for valid runs), then run_end > start, \
131 which means the first condition (prev_end <= start && run_end > start) would have been true \
132 and already set values_range to Some.").0, i + 1));
133 } else if prev_end < end && run_end >= end {
134 current_run_end = current_run_end
135 .checked_add(&T::usize_as(end - prev_end))
136 .expect("run end overflow");
137 run_ends_bytes.extend_from_slice(current_run_end.to_byte_slice());
138
139 values_range = Some((values_range.expect("Unreachable: values_range cannot be None when prev_end < end && run_end >= end. \
141 Due to sequential processing and monotonic prev_end advancement, if we reach a run \
142 that spans beyond the slice end (run_end >= end), at least one previous condition \
143 must have matched first to set values_range. Either the first condition matched when \
144 the slice started (prev_end <= start && run_end > start), or the second condition \
145 matched for runs within the slice (prev_end >= start && run_end <= end).").0, i + 1));
146 break;
147 }
148
149 prev_end = run_end;
150 if prev_end >= end {
151 break;
152 }
153 } else {
154 break;
155 }
156 }
157 (run_ends_bytes, values_range)
158}
159
160fn process_extends_batch<T: ArrowNativeType>(
162 mutable: &mut _MutableArrayData,
163 source_array_idx: usize,
164 run_ends_bytes: Vec<u8>,
165 values_range: Option<(usize, usize)>,
166) {
167 if run_ends_bytes.is_empty() {
168 return;
169 }
170
171 mutable.child_data[0]
173 .data
174 .buffer1
175 .extend_from_slice(&run_ends_bytes);
176 mutable.child_data[0].data.len += run_ends_bytes.len() / std::mem::size_of::<T>();
177
178 let (start_idx, end_idx) =
180 values_range.expect("values_range should be Some if run_ends_bytes is not empty");
181 mutable.child_data[1].extend(source_array_idx, start_idx, end_idx);
182}
183
184pub fn build_extend(array: &ArrayData) -> Extend {
188 Box::new(
189 move |mutable: &mut _MutableArrayData, array_idx: usize, start: usize, len: usize| {
190 if len == 0 {
191 return;
192 }
193
194 let source_run_ends = &array.child_data()[0];
196 let source_buffer = &source_run_ends.buffers()[0];
197
198 let dest_run_end_type =
200 if let DataType::RunEndEncoded(run_ends_field, _) = &mutable.data_type {
201 run_ends_field.data_type()
202 } else {
203 panic!("extend called on non-RunEndEncoded mutable array");
204 };
205
206 macro_rules! build_and_process_impl {
208 ($run_end_type:ty) => {{
209 let dest_last_run_end =
210 get_last_run_end::<$run_end_type>(&mutable.child_data[0]);
211 let (run_ends_bytes, values_range) = build_extend_arrays::<$run_end_type>(
212 source_buffer,
213 source_run_ends.len(),
214 start,
215 len,
216 dest_last_run_end,
217 );
218 process_extends_batch::<$run_end_type>(
219 mutable,
220 array_idx,
221 run_ends_bytes,
222 values_range,
223 );
224 }};
225 }
226
227 match dest_run_end_type {
228 DataType::Int16 => build_and_process_impl!(i16),
229 DataType::Int32 => build_and_process_impl!(i32),
230 DataType::Int64 => build_and_process_impl!(i64),
231 _ => panic!(
232 "Invalid run end type for RunEndEncoded array: {:?}",
233 dest_run_end_type
234 ),
235 }
236 },
237 )
238}
239
#[cfg(test)]
mod tests {
    use super::*;
    use crate::transform::MutableArrayData;
    use crate::{ArrayData, ArrayDataBuilder};
    use arrow_buffer::Buffer;
    use arrow_schema::{DataType, Field};
    use std::sync::Arc;

    /// Shared builder behind the width-specific `create_run_array_data*` helpers.
    ///
    /// The logical length of the resulting array is the last run end (0 when
    /// `run_ends` is empty); the run-ends child has one entry per run end.
    fn assemble_run_array_data<R: ArrowNativeType>(
        run_end_type: DataType,
        run_ends: Vec<R>,
        values: ArrayData,
    ) -> ArrayData {
        let ree_type = DataType::RunEndEncoded(
            Arc::new(Field::new("run_ends", run_end_type.clone(), false)),
            Arc::new(Field::new("values", values.data_type().clone(), true)),
        );

        let logical_len = run_ends.last().map_or(0, |&last| last.as_usize());
        let physical_len = run_ends.len();

        let run_ends_data = ArrayDataBuilder::new(run_end_type)
            .len(physical_len)
            .add_buffer(Buffer::from_vec(run_ends))
            .build()
            .unwrap();

        ArrayDataBuilder::new(ree_type)
            .len(logical_len)
            .add_child_data(run_ends_data)
            .add_child_data(values)
            .build()
            .unwrap()
    }

    /// Builds a RunEndEncoded array with `Int32` run ends.
    fn create_run_array_data(run_ends: Vec<i32>, values: ArrayData) -> ArrayData {
        assemble_run_array_data(DataType::Int32, run_ends, values)
    }

    /// Builds a RunEndEncoded array with `Int16` run ends.
    fn create_run_array_data_int16(run_ends: Vec<i16>, values: ArrayData) -> ArrayData {
        assemble_run_array_data(DataType::Int16, run_ends, values)
    }

    /// Builds a RunEndEncoded array with `Int64` run ends.
    fn create_run_array_data_int64(run_ends: Vec<i64>, values: ArrayData) -> ArrayData {
        assemble_run_array_data(DataType::Int64, run_ends, values)
    }

    /// Builds a non-nullable `Int32` array from `values`.
    fn create_int32_array_data(values: Vec<i32>) -> ArrayData {
        let value_count = values.len();
        ArrayDataBuilder::new(DataType::Int32)
            .len(value_count)
            .add_buffer(Buffer::from_vec(values))
            .build()
            .unwrap()
    }

    /// Builds a `Dictionary(Int32, Utf8)` array whose dictionary is
    /// `dict_values` and whose keys spell out `values`.
    ///
    /// Panics if an entry of `values` is missing from `dict_values`.
    fn create_string_dict_array_data(values: Vec<&str>, dict_values: Vec<&str>) -> ArrayData {
        // Utf8 layout: n + 1 offsets (starting at 0) plus the concatenated bytes.
        let mut dict_offsets: Vec<i32> = Vec::with_capacity(dict_values.len() + 1);
        let mut dict_bytes: Vec<u8> = Vec::new();
        dict_offsets.push(0);
        for entry in &dict_values {
            dict_bytes.extend_from_slice(entry.as_bytes());
            dict_offsets.push(dict_bytes.len() as i32);
        }

        let dict_array = ArrayDataBuilder::new(DataType::Utf8)
            .len(dict_values.len())
            .add_buffer(Buffer::from_vec(dict_offsets))
            .add_buffer(Buffer::from_vec(dict_bytes))
            .build()
            .unwrap();

        let keys: Vec<i32> = values
            .iter()
            .map(|needle| {
                dict_values
                    .iter()
                    .position(|entry| entry == needle)
                    .unwrap() as i32
            })
            .collect();

        let dict_type = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));

        ArrayDataBuilder::new(dict_type)
            .len(values.len())
            .add_buffer(Buffer::from_vec(keys))
            .add_child_data(dict_array)
            .build()
            .unwrap()
    }

    /// Reads the run ends of a frozen RunEndEncoded array as values of `R`.
    fn frozen_run_ends<R: ArrowNativeType>(frozen: &ArrayData) -> Vec<R> {
        let run_ends_child = &frozen.child_data()[0];
        run_ends_child.buffers()[0].typed_data::<R>()[..run_ends_child.len()].to_vec()
    }

    #[test]
    fn test_extend_nulls_int32() {
        let ree_array = create_run_array_data(vec![5], create_int32_array_data(vec![42]));

        let mut mutable = MutableArrayData::new(vec![&ree_array], true, 10);
        mutable.extend_nulls(3);
        mutable.extend(0, 0, 5);
        mutable.extend_nulls(3);
        let result = mutable.freeze();

        // Three physical runs: nulls, the copied run, nulls.
        assert_eq!(result.child_data()[0].len(), 3);
        assert_eq!(frozen_run_ends::<i32>(&result), vec![3, 8, 11]);

        let values_child = &result.child_data()[1];
        assert_eq!(values_child.len(), 3);

        let value_slots = values_child.buffers()[0].typed_data::<i32>();
        assert_eq!(value_slots[1], 42);

        assert!(values_child.is_null(0));
        assert!(values_child.is_valid(1));
        assert!(values_child.is_null(2));
    }

    #[test]
    fn test_extend_nulls_int16() {
        let ree_array = create_run_array_data_int16(vec![5i16], create_int32_array_data(vec![42]));

        let mut mutable = MutableArrayData::new(vec![&ree_array], true, 10);
        mutable.extend(0, 0, 5);
        mutable.extend_nulls(3);
        let result = mutable.freeze();

        assert_eq!(result.child_data()[0].len(), 2);
        assert_eq!(frozen_run_ends::<i16>(&result), vec![5i16, 8i16]);
    }

    #[test]
    fn test_extend_nulls_int64() {
        let ree_array = create_run_array_data_int64(vec![5i64], create_int32_array_data(vec![42]));

        let mut mutable = MutableArrayData::new(vec![&ree_array], true, 10);
        mutable.extend(0, 0, 5);
        mutable.extend_nulls(3);
        let result = mutable.freeze();

        assert_eq!(result.child_data()[0].len(), 2);
        assert_eq!(frozen_run_ends::<i64>(&result), vec![5i64, 8i64]);
    }

    #[test]
    fn test_extend_int32() {
        let ree_array = create_run_array_data(vec![2, 5], create_int32_array_data(vec![10, 20]));

        let mut mutable = MutableArrayData::new(vec![&ree_array], false, 10);
        mutable.extend(0, 0, 5);
        let result = mutable.freeze();

        assert_eq!(result.len(), 5);
        let run_ends_child = &result.child_data()[0];
        let values_child = &result.child_data()[1];
        assert!(!run_ends_child.is_empty());
        assert_eq!(run_ends_child.len(), values_child.len());
    }

    #[test]
    fn test_extend_empty() {
        let ree_array = create_run_array_data(vec![], create_int32_array_data(vec![]));

        let mut mutable = MutableArrayData::new(vec![&ree_array], false, 10);
        mutable.extend(0, 0, 0);
        let result = mutable.freeze();

        assert_eq!(result.len(), 0);
        assert_eq!(result.child_data()[0].len(), 0);
    }

    #[test]
    fn test_build_extend_arrays_int16() {
        let source = Buffer::from_vec(vec![3i16, 5i16, 8i16]);
        let (run_ends_bytes, values_range) = build_extend_arrays::<i16>(&source, 3, 2, 4, 0i16);

        // Logical range [2, 6) touches all three source runs.
        assert_eq!(run_ends_bytes.len(), 3 * std::mem::size_of::<i16>());
        assert_eq!(values_range, Some((0, 3)));

        let expected_bytes: Vec<u8> = [1i16, 3i16, 4i16]
            .iter()
            .flat_map(|run_end| run_end.to_ne_bytes())
            .collect();
        assert_eq!(run_ends_bytes, expected_bytes);
    }

    #[test]
    fn test_build_extend_arrays_int64() {
        let source = Buffer::from_vec(vec![3i64, 5i64, 8i64]);
        let (run_ends_bytes, values_range) = build_extend_arrays::<i64>(&source, 3, 2, 4, 0i64);

        // Logical range [2, 6) touches all three source runs.
        assert_eq!(run_ends_bytes.len(), 3 * std::mem::size_of::<i64>());
        assert_eq!(values_range, Some((0, 3)));

        let expected_bytes: Vec<u8> = [1i64, 3i64, 4i64]
            .iter()
            .flat_map(|run_end| run_end.to_ne_bytes())
            .collect();
        assert_eq!(run_ends_bytes, expected_bytes);
    }

    #[test]
    fn test_extend_string_dict() {
        let values = create_string_dict_array_data(vec!["hello", "world"], vec!["hello", "world"]);
        let ree_array = create_run_array_data(vec![2, 5], values);

        let mut mutable = MutableArrayData::new(vec![&ree_array], false, 10);
        mutable.extend(0, 0, 5);
        let result = mutable.freeze();

        assert_eq!(result.len(), 5);
        let run_ends_child = &result.child_data()[0];
        let values_child = &result.child_data()[1];
        assert!(!run_ends_child.is_empty());
        assert_eq!(run_ends_child.len(), values_child.len());
        assert_eq!(run_ends_child.len(), 2);
        assert_eq!(values_child.len(), 2);
    }

    #[test]
    #[should_panic(expected = "run end overflow")]
    fn test_extend_nulls_overflow_i16() {
        let ree_array = create_run_array_data_int16(vec![5], create_int32_array_data(vec![42]));
        let mut mutable = MutableArrayData::new(vec![&ree_array], true, 10);

        mutable.extend(0, 0, 5_usize);

        // 5 + i16::MAX does not fit in an i16.
        mutable.extend_nulls(i16::MAX as usize);
    }

    #[test]
    #[should_panic(expected = "run end overflow")]
    fn test_extend_nulls_overflow_i32() {
        let ree_array = create_run_array_data(vec![10], create_int32_array_data(vec![42]));
        let mut mutable = MutableArrayData::new(vec![&ree_array], true, 10);

        mutable.extend(0, 0, 10_usize);

        // 10 + i32::MAX does not fit in an i32.
        mutable.extend_nulls(i32::MAX as usize);
    }

    #[test]
    #[should_panic(expected = "run end overflow")]
    fn test_build_extend_overflow_i16() {
        let source_array = create_run_array_data_int16(vec![20], create_int32_array_data(vec![10]));
        let dest_array =
            create_run_array_data_int16(vec![i16::MAX - 5], create_int32_array_data(vec![42]));

        let mut mutable = MutableArrayData::new(vec![&source_array, &dest_array], false, 10);

        // Fill the destination to 5 below the i16 limit ...
        mutable.extend(1, 0, (i16::MAX - 5) as usize);

        // ... so that 20 more logical values overflow the run end.
        mutable.extend(0, 0, 20);
    }

    #[test]
    #[should_panic(expected = "run end overflow")]
    fn test_build_extend_overflow_i32() {
        let source_array = create_run_array_data(vec![100], create_int32_array_data(vec![10]));
        let dest_array =
            create_run_array_data(vec![i32::MAX - 50], create_int32_array_data(vec![42]));

        let mut mutable = MutableArrayData::new(vec![&source_array, &dest_array], false, 10);

        // Fill the destination to 50 below the i32 limit ...
        mutable.extend(1, 0, (i32::MAX - 50) as usize);

        // ... so that 100 more logical values overflow the run end.
        mutable.extend(0, 0, 100);
    }
}