1use crate::dictionary::{merge_dictionary_values, should_merge_dictionary_values};
21use arrow_array::builder::{BooleanBufferBuilder, PrimitiveBuilder};
22use arrow_array::cast::AsArray;
23use arrow_array::types::*;
24use arrow_array::*;
25use arrow_buffer::{ArrowNativeType, BooleanBuffer, MutableBuffer, NullBuffer, OffsetBuffer};
26use arrow_data::transform::MutableArrayData;
27use arrow_data::ByteView;
28use arrow_schema::{ArrowError, DataType};
29use std::sync::Arc;
30
/// Dispatch target for `downcast_primitive!`: forwards a concrete primitive type to
/// [`interleave_primitive`], passing through the input arrays, indices and output data type.
macro_rules! primitive_helper {
    ($native:ty, $arrays:ident, $picks:ident, $dt:ident) => {
        interleave_primitive::<$native>($arrays, $picks, $dt)
    };
}
36
/// Dispatch target for `downcast_integer!` on dictionary key types: forwards the concrete key
/// type to [`interleave_dictionaries`].
///
/// `interleave_dictionaries` already returns a `Result<ArrayRef, ArrowError>`, so its result is
/// passed through unchanged. Wrapping it again in `Arc::new(..)` would produce an
/// `Arc<Arc<dyn Array>>` (an extra allocation plus an extra indirection on every call).
macro_rules! dict_helper {
    ($t:ty, $values:expr, $indices:expr) => {
        interleave_dictionaries::<$t>($values, $indices)
    };
}
42
43pub fn interleave(
71 values: &[&dyn Array],
72 indices: &[(usize, usize)],
73) -> Result<ArrayRef, ArrowError> {
74 if values.is_empty() {
75 return Err(ArrowError::InvalidArgumentError(
76 "interleave requires input of at least one array".to_string(),
77 ));
78 }
79 let data_type = values[0].data_type();
80
81 for array in values.iter().skip(1) {
82 if array.data_type() != data_type {
83 return Err(ArrowError::InvalidArgumentError(format!(
84 "It is not possible to interleave arrays of different data types ({} and {})",
85 data_type,
86 array.data_type()
87 )));
88 }
89 }
90
91 if indices.is_empty() {
92 return Ok(new_empty_array(data_type));
93 }
94
95 downcast_primitive! {
96 data_type => (primitive_helper, values, indices, data_type),
97 DataType::Utf8 => interleave_bytes::<Utf8Type>(values, indices),
98 DataType::LargeUtf8 => interleave_bytes::<LargeUtf8Type>(values, indices),
99 DataType::Binary => interleave_bytes::<BinaryType>(values, indices),
100 DataType::LargeBinary => interleave_bytes::<LargeBinaryType>(values, indices),
101 DataType::BinaryView => interleave_views::<BinaryViewType>(values, indices),
102 DataType::Utf8View => interleave_views::<StringViewType>(values, indices),
103 DataType::Dictionary(k, _) => downcast_integer! {
104 k.as_ref() => (dict_helper, values, indices),
105 _ => unreachable!("illegal dictionary key type {k}")
106 },
107 _ => interleave_fallback(values, indices)
108 }
109}
110
111struct Interleave<'a, T> {
115 arrays: Vec<&'a T>,
117 nulls: Option<NullBuffer>,
119}
120
121impl<'a, T: Array + 'static> Interleave<'a, T> {
122 fn new(values: &[&'a dyn Array], indices: &'a [(usize, usize)]) -> Self {
123 let mut has_nulls = false;
124 let arrays: Vec<&T> = values
125 .iter()
126 .map(|x| {
127 has_nulls = has_nulls || x.null_count() != 0;
128 x.as_any().downcast_ref().unwrap()
129 })
130 .collect();
131
132 let nulls = match has_nulls {
133 true => {
134 let nulls = BooleanBuffer::collect_bool(indices.len(), |i| {
135 let (a, b) = indices[i];
136 arrays[a].is_valid(b)
137 });
138 Some(nulls.into())
139 }
140 false => None,
141 };
142
143 Self { arrays, nulls }
144 }
145}
146
147fn interleave_primitive<T: ArrowPrimitiveType>(
148 values: &[&dyn Array],
149 indices: &[(usize, usize)],
150 data_type: &DataType,
151) -> Result<ArrayRef, ArrowError> {
152 let interleaved = Interleave::<'_, PrimitiveArray<T>>::new(values, indices);
153
154 let values = indices
155 .iter()
156 .map(|(a, b)| interleaved.arrays[*a].value(*b))
157 .collect::<Vec<_>>();
158
159 let array = PrimitiveArray::<T>::new(values.into(), interleaved.nulls);
160 Ok(Arc::new(array.with_data_type(data_type.clone())))
161}
162
163fn interleave_bytes<T: ByteArrayType>(
164 values: &[&dyn Array],
165 indices: &[(usize, usize)],
166) -> Result<ArrayRef, ArrowError> {
167 let interleaved = Interleave::<'_, GenericByteArray<T>>::new(values, indices);
168
169 let mut capacity = 0;
170 let mut offsets = Vec::with_capacity(indices.len() + 1);
171 offsets.push(T::Offset::from_usize(0).unwrap());
172 offsets.extend(indices.iter().map(|(a, b)| {
173 let o = interleaved.arrays[*a].value_offsets();
174 let element_len = o[*b + 1].as_usize() - o[*b].as_usize();
175 capacity += element_len;
176 T::Offset::from_usize(capacity).expect("overflow")
177 }));
178
179 let mut values = Vec::with_capacity(capacity);
180 for (a, b) in indices {
181 values.extend_from_slice(interleaved.arrays[*a].value(*b).as_ref());
182 }
183
184 let array = unsafe {
186 let offsets = OffsetBuffer::new_unchecked(offsets.into());
187 GenericByteArray::<T>::new_unchecked(offsets, values.into(), interleaved.nulls)
188 };
189 Ok(Arc::new(array))
190}
191
192fn interleave_dictionaries<K: ArrowDictionaryKeyType>(
193 arrays: &[&dyn Array],
194 indices: &[(usize, usize)],
195) -> Result<ArrayRef, ArrowError> {
196 let dictionaries: Vec<_> = arrays.iter().map(|x| x.as_dictionary::<K>()).collect();
197 if !should_merge_dictionary_values::<K>(&dictionaries, indices.len()) {
198 return interleave_fallback(arrays, indices);
199 }
200
201 let masks: Vec<_> = dictionaries
202 .iter()
203 .enumerate()
204 .map(|(a_idx, dictionary)| {
205 let mut key_mask = BooleanBufferBuilder::new_from_buffer(
206 MutableBuffer::new_null(dictionary.len()),
207 dictionary.len(),
208 );
209
210 for (_, key_idx) in indices.iter().filter(|(a, _)| *a == a_idx) {
211 key_mask.set_bit(*key_idx, true);
212 }
213 key_mask.finish()
214 })
215 .collect();
216
217 let merged = merge_dictionary_values(&dictionaries, Some(&masks))?;
218
219 let mut keys = PrimitiveBuilder::<K>::with_capacity(indices.len());
221 for (a, b) in indices {
222 let old_keys: &PrimitiveArray<K> = dictionaries[*a].keys();
223 match old_keys.is_valid(*b) {
224 true => {
225 let old_key = old_keys.values()[*b];
226 keys.append_value(merged.key_mappings[*a][old_key.as_usize()])
227 }
228 false => keys.append_null(),
229 }
230 }
231 let array = unsafe { DictionaryArray::new_unchecked(keys.finish(), merged.values) };
232 Ok(Arc::new(array))
233}
234
235fn interleave_views<T: ByteViewType>(
236 values: &[&dyn Array],
237 indices: &[(usize, usize)],
238) -> Result<ArrayRef, ArrowError> {
239 let interleaved = Interleave::<'_, GenericByteViewArray<T>>::new(values, indices);
240 let mut buffers = Vec::new();
241
242 let mut offsets = Vec::with_capacity(interleaved.arrays.len() + 1);
244 offsets.push(0);
245 let mut total_buffers = 0;
246 for a in interleaved.arrays.iter() {
247 total_buffers += a.data_buffers().len();
248 offsets.push(total_buffers);
249 }
250
251 let mut buffer_to_new_index = vec![None; total_buffers];
253
254 let views: Vec<u128> = indices
255 .iter()
256 .map(|(array_idx, value_idx)| {
257 let array = interleaved.arrays[*array_idx];
258 let view = array.views().get(*value_idx).unwrap();
259 let view_len = *view as u32;
260 if view_len <= 12 {
261 return *view;
262 }
263 let view = ByteView::from(*view);
265 let buffer_to_new_idx = offsets[*array_idx] + view.buffer_index as usize;
266 let new_buffer_idx: u32 =
267 *buffer_to_new_index[buffer_to_new_idx].get_or_insert_with(|| {
268 buffers.push(array.data_buffers()[view.buffer_index as usize].clone());
269 (buffers.len() - 1) as u32
270 });
271 view.with_buffer_index(new_buffer_idx).as_u128()
272 })
273 .collect();
274
275 let array = unsafe {
276 GenericByteViewArray::<T>::new_unchecked(views.into(), buffers, interleaved.nulls)
277 };
278 Ok(Arc::new(array))
279}
280
281fn interleave_fallback(
283 values: &[&dyn Array],
284 indices: &[(usize, usize)],
285) -> Result<ArrayRef, ArrowError> {
286 let arrays: Vec<_> = values.iter().map(|x| x.to_data()).collect();
287 let arrays: Vec<_> = arrays.iter().collect();
288 let mut array_data = MutableArrayData::new(arrays, false, indices.len());
289
290 let mut cur_array = indices[0].0;
291 let mut start_row_idx = indices[0].1;
292 let mut end_row_idx = start_row_idx + 1;
293
294 for (array, row) in indices.iter().skip(1).copied() {
295 if array == cur_array && row == end_row_idx {
296 end_row_idx += 1;
298 continue;
299 }
300
301 array_data.extend(cur_array, start_row_idx, end_row_idx);
303
304 cur_array = array;
306 start_row_idx = row;
307 end_row_idx = start_row_idx + 1;
308 }
309
310 array_data.extend(cur_array, start_row_idx, end_row_idx);
312 Ok(make_array(array_data.freeze()))
313}
314
315pub fn interleave_record_batch(
360 record_batches: &[&RecordBatch],
361 indices: &[(usize, usize)],
362) -> Result<RecordBatch, ArrowError> {
363 let schema = record_batches[0].schema();
364 let columns = (0..schema.fields().len())
365 .map(|i| {
366 let column_values: Vec<&dyn Array> = record_batches
367 .iter()
368 .map(|batch| batch.column(i).as_ref())
369 .collect();
370 interleave(&column_values, indices)
371 })
372 .collect::<Result<Vec<_>, _>>()?;
373 RecordBatch::try_new(schema, columns)
374}
375
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::builder::{Int32Builder, ListBuilder, PrimitiveRunBuilder};
    use arrow_array::Int32RunArray;

    #[test]
    fn test_primitive() {
        let a = Int32Array::from_iter_values([1, 2, 3, 4]);
        let b = Int32Array::from_iter_values([5, 6, 7]);
        let c = Int32Array::from_iter_values([8, 9, 10]);
        let values = interleave(&[&a, &b, &c], &[(0, 3), (0, 3), (2, 2), (2, 0), (1, 1)]).unwrap();
        let v = values.as_primitive::<Int32Type>();
        assert_eq!(v.values(), &[4, 4, 10, 8, 6]);
    }

    #[test]
    fn test_primitive_nulls() {
        let a = Int32Array::from_iter_values([1, 2, 3, 4]);
        let b = Int32Array::from_iter([Some(1), Some(4), None]);
        let values = interleave(&[&a, &b], &[(0, 1), (1, 2), (1, 2), (0, 3), (0, 2)]).unwrap();
        let v: Vec<_> = values.as_primitive::<Int32Type>().into_iter().collect();
        assert_eq!(&v, &[Some(2), None, None, Some(4), Some(3)])
    }

    #[test]
    fn test_primitive_empty() {
        let a = Int32Array::from_iter_values([1, 2, 3, 4]);
        let v = interleave(&[&a], &[]).unwrap();
        assert!(v.is_empty());
        assert_eq!(v.data_type(), &DataType::Int32);
    }

    #[test]
    fn test_interleave_no_arrays_is_error() {
        assert!(interleave(&[], &[]).is_err());
    }

    #[test]
    fn test_interleave_mismatched_types_is_error() {
        let a = Int32Array::from_iter_values([1, 2]);
        let b = Int64Array::from_iter_values([3, 4]);
        let err = interleave(&[&a as &dyn Array, &b as &dyn Array], &[(0, 0), (1, 1)]).unwrap_err();
        assert!(err
            .to_string()
            .contains("It is not possible to interleave arrays of different data types"));
    }

    #[test]
    fn test_strings() {
        let a = StringArray::from_iter_values(["a", "b", "c"]);
        let b = StringArray::from_iter_values(["hello", "world", "foo"]);
        let values = interleave(&[&a, &b], &[(0, 2), (0, 2), (1, 0), (1, 1), (0, 1)]).unwrap();
        let v = values.as_string::<i32>();
        let values: Vec<_> = v.into_iter().collect();
        assert_eq!(
            &values,
            &[
                Some("c"),
                Some("c"),
                Some("hello"),
                Some("world"),
                Some("b")
            ]
        )
    }

    #[test]
    fn test_interleave_dictionary() {
        let a = DictionaryArray::<Int32Type>::from_iter(["a", "b", "c", "a", "b"]);
        let b = DictionaryArray::<Int32Type>::from_iter(["a", "c", "a", "c", "a"]);

        let values =
            interleave(&[&a, &b], &[(0, 2), (0, 2), (0, 2), (1, 0), (1, 1), (0, 1)]).unwrap();
        let v = values.as_dictionary::<Int32Type>();
        assert_eq!(v.values().len(), 5);

        let vc = v.downcast_dict::<StringArray>().unwrap();
        let collected: Vec<_> = vc.into_iter().map(Option::unwrap).collect();
        assert_eq!(&collected, &["c", "c", "c", "a", "c", "b"]);

        let values = interleave(&[&a, &b], &[(0, 2), (0, 2), (1, 1)]).unwrap();
        let v = values.as_dictionary::<Int32Type>();
        assert_eq!(v.values().len(), 1);

        let vc = v.downcast_dict::<StringArray>().unwrap();
        let collected: Vec<_> = vc.into_iter().map(Option::unwrap).collect();
        assert_eq!(&collected, &["c", "c", "c"]);
    }

    #[test]
    fn test_interleave_dictionary_nulls() {
        let input_1_keys = Int32Array::from_iter_values([0, 2, 1, 3]);
        let input_1_values = StringArray::from(vec![Some("foo"), None, Some("bar"), Some("fiz")]);
        let input_1 = DictionaryArray::new(input_1_keys, Arc::new(input_1_values));
        let input_2: DictionaryArray<Int32Type> = vec![None].into_iter().collect();

        let expected = vec![Some("fiz"), None, None, Some("foo")];

        let values = interleave(
            &[&input_1 as _, &input_2 as _],
            &[(0, 3), (0, 2), (1, 0), (0, 0)],
        )
        .unwrap();
        let dictionary = values.as_dictionary::<Int32Type>();
        let actual: Vec<Option<&str>> = dictionary
            .downcast_dict::<StringArray>()
            .unwrap()
            .into_iter()
            .collect();

        assert_eq!(actual, expected);
    }

    #[test]
    fn test_lists() {
        // [[1, 2], null, [3]]
        let mut a = ListBuilder::new(Int32Builder::new());
        a.values().append_value(1);
        a.values().append_value(2);
        a.append(true);
        a.append(false);
        a.values().append_value(3);
        a.append(true);
        let a = a.finish();

        // [[4], null, [5, 6, null]]
        let mut b = ListBuilder::new(Int32Builder::new());
        b.values().append_value(4);
        b.append(true);
        b.append(false);
        b.values().append_value(5);
        b.values().append_value(6);
        b.values().append_null();
        b.append(true);
        let b = b.finish();

        let values = interleave(&[&a, &b], &[(0, 2), (0, 1), (1, 0), (1, 2), (1, 1)]).unwrap();
        let v = values.as_any().downcast_ref::<ListArray>().unwrap();

        // [[3], null, [4], [5, 6, null], null]
        let mut expected = ListBuilder::new(Int32Builder::new());
        expected.values().append_value(3);
        expected.append(true);
        expected.append(false);
        expected.values().append_value(4);
        expected.append(true);
        expected.values().append_value(5);
        expected.values().append_value(6);
        expected.values().append_null();
        expected.append(true);
        expected.append(false);
        let expected = expected.finish();

        assert_eq!(v, &expected);
    }

    #[test]
    fn interleave_sparse_nulls() {
        let values = StringArray::from_iter_values((0..100).map(|x| x.to_string()));
        let keys = Int32Array::from_iter_values(0..10);
        let dict_a = DictionaryArray::new(keys, Arc::new(values));
        let values = StringArray::new_null(0);
        let keys = Int32Array::new_null(10);
        let dict_b = DictionaryArray::new(keys, Arc::new(values));

        let indices = &[(0, 0), (0, 1), (0, 2), (1, 0)];
        let array = interleave(&[&dict_a, &dict_b], indices).unwrap();

        let expected =
            DictionaryArray::<Int32Type>::from_iter(vec![Some("0"), Some("1"), Some("2"), None]);
        assert_eq!(array.as_ref(), &expected)
    }

    #[test]
    fn test_interleave_views() {
        let values = StringArray::from_iter_values([
            "hello",
            "world_long_string_not_inlined",
            "foo",
            "bar",
            "baz",
        ]);
        let view_a = StringViewArray::from(&values);

        let values = StringArray::from_iter_values([
            "test",
            "data",
            "more_long_string_not_inlined",
            "views",
            "here",
        ]);
        let view_b = StringViewArray::from(&values);

        let indices = &[
            (0, 2), // "foo"
            (1, 0), // "test"
            (0, 4), // "baz"
            (1, 3), // "views"
            (0, 1), // "world_long_string_not_inlined"
        ];

        // Only one long (non-inlined) value is selected, so only its buffer is kept.
        let values = interleave(&[&view_a, &view_b], indices).unwrap();
        let result = values.as_string_view();
        assert_eq!(result.data_buffers().len(), 1);

        // The generic fallback keeps the buffers of both inputs.
        let fallback = interleave_fallback(&[&view_a, &view_b], indices).unwrap();
        let fallback_result = fallback.as_string_view();
        assert_eq!(fallback_result.data_buffers().len(), 2);

        let collected: Vec<_> = result.iter().map(|x| x.map(|s| s.to_string())).collect();

        let fallback_collected: Vec<_> = fallback_result
            .iter()
            .map(|x| x.map(|s| s.to_string()))
            .collect();

        assert_eq!(&collected, &fallback_collected);

        assert_eq!(
            &collected,
            &[
                Some("foo".to_string()),
                Some("test".to_string()),
                Some("baz".to_string()),
                Some("views".to_string()),
                Some("world_long_string_not_inlined".to_string()),
            ]
        );
    }

    #[test]
    fn test_interleave_views_with_nulls() {
        let values = StringArray::from_iter([
            Some("hello"),
            None,
            Some("foo_long_string_not_inlined"),
            Some("bar"),
            None,
        ]);
        let view_a = StringViewArray::from(&values);

        let values = StringArray::from_iter([
            Some("test"),
            Some("data_long_string_not_inlined"),
            None,
            None,
            Some("here"),
        ]);
        let view_b = StringViewArray::from(&values);

        let indices = &[
            (0, 1), // null
            (1, 2), // null
            (0, 2), // "foo_long_string_not_inlined"
            (1, 3), // null
            (0, 4), // null
        ];

        let values = interleave(&[&view_a, &view_b], indices).unwrap();
        let result = values.as_string_view();
        assert_eq!(result.data_buffers().len(), 1);

        let fallback = interleave_fallback(&[&view_a, &view_b], indices).unwrap();
        let fallback_result = fallback.as_string_view();

        let collected: Vec<_> = result.iter().map(|x| x.map(|s| s.to_string())).collect();

        let fallback_collected: Vec<_> = fallback_result
            .iter()
            .map(|x| x.map(|s| s.to_string()))
            .collect();

        assert_eq!(&collected, &fallback_collected);

        assert_eq!(
            &collected,
            &[
                None,
                None,
                Some("foo_long_string_not_inlined".to_string()),
                None,
                None,
            ]
        );
    }

    #[test]
    fn test_interleave_views_multiple_buffers() {
        let str1 = "very_long_string_from_first_buffer".as_bytes();
        let str2 = "very_long_string_from_second_buffer".as_bytes();
        let buffer1 = str1.to_vec().into();
        let buffer2 = str2.to_vec().into();

        let view1 = ByteView::new(str1.len() as u32, &str1[..4])
            .with_buffer_index(0)
            .with_offset(0)
            .as_u128();
        let view2 = ByteView::new(str2.len() as u32, &str2[..4])
            .with_buffer_index(1)
            .with_offset(0)
            .as_u128();
        let view_a =
            StringViewArray::try_new(vec![view1, view2].into(), vec![buffer1, buffer2], None)
                .unwrap();

        let str3 = "another_very_long_string_buffer_three".as_bytes();
        let str4 = "different_long_string_in_buffer_four".as_bytes();
        let buffer3 = str3.to_vec().into();
        let buffer4 = str4.to_vec().into();

        let view3 = ByteView::new(str3.len() as u32, &str3[..4])
            .with_buffer_index(0)
            .with_offset(0)
            .as_u128();
        let view4 = ByteView::new(str4.len() as u32, &str4[..4])
            .with_buffer_index(1)
            .with_offset(0)
            .as_u128();
        let view_b =
            StringViewArray::try_new(vec![view3, view4].into(), vec![buffer3, buffer4], None)
                .unwrap();

        let indices = &[
            (0, 0), // first buffer of view_a
            (1, 0), // first buffer of view_b
            (0, 1), // second buffer of view_a
            (1, 1), // second buffer of view_b
            (0, 0), // first buffer of view_a again
            (1, 1), // second buffer of view_b again
        ];

        let values = interleave(&[&view_a, &view_b], indices).unwrap();
        let result = values.as_string_view();

        assert_eq!(
            result.data_buffers().len(),
            4,
            "Expected four buffers (two from each input array)"
        );

        let result_strings: Vec<_> = result.iter().map(|x| x.map(|s| s.to_string())).collect();
        assert_eq!(
            result_strings,
            vec![
                Some("very_long_string_from_first_buffer".to_string()),
                Some("another_very_long_string_buffer_three".to_string()),
                Some("very_long_string_from_second_buffer".to_string()),
                Some("different_long_string_in_buffer_four".to_string()),
                Some("very_long_string_from_first_buffer".to_string()),
                Some("different_long_string_in_buffer_four".to_string()),
            ]
        );

        // Buffers are numbered in order of first use; repeated values reuse the same buffer.
        let views = result.views();
        let buffer_indices: Vec<_> = views
            .iter()
            .map(|raw_view| ByteView::from(*raw_view).buffer_index)
            .collect();

        assert_eq!(buffer_indices, vec![0, 1, 2, 3, 0, 3]);
    }

    #[test]
    fn test_interleave_run_end_encoded_primitive() {
        let mut builder = PrimitiveRunBuilder::<Int32Type, Int32Type>::new();
        builder.extend([1, 1, 2, 2, 2, 3].into_iter().map(Some));
        let a = builder.finish();

        let mut builder = PrimitiveRunBuilder::<Int32Type, Int32Type>::new();
        builder.extend([4, 5, 5, 6, 6, 6].into_iter().map(Some));
        let b = builder.finish();

        let indices = &[(0, 1), (1, 0), (0, 4), (1, 2), (0, 5)];
        let result = interleave(&[&a, &b], indices).unwrap();

        assert!(matches!(result.data_type(), DataType::RunEndEncoded(_, _)));

        let result_run_array: &Int32RunArray = result.as_any().downcast_ref().unwrap();

        let expected = vec![1, 4, 2, 5, 3];
        let mut actual = Vec::new();
        for i in 0..result_run_array.len() {
            let physical_idx = result_run_array.get_physical_index(i);
            let value = result_run_array
                .values()
                .as_primitive::<Int32Type>()
                .value(physical_idx);
            actual.push(value);
        }
        assert_eq!(actual, expected);
    }

    #[test]
    fn test_interleave_run_end_encoded_string() {
        let a: Int32RunArray = vec!["hello", "hello", "world", "world", "foo"]
            .into_iter()
            .collect();
        let b: Int32RunArray = vec!["bar", "baz", "baz", "qux"].into_iter().collect();

        let indices = &[(0, 0), (1, 1), (0, 3), (1, 3), (0, 4)];
        let result = interleave(&[&a, &b], indices).unwrap();

        assert!(matches!(result.data_type(), DataType::RunEndEncoded(_, _)));

        let result_run_array: &Int32RunArray = result.as_any().downcast_ref().unwrap();

        let expected = vec!["hello", "baz", "world", "qux", "foo"];
        let mut actual = Vec::new();
        for i in 0..result_run_array.len() {
            let physical_idx = result_run_array.get_physical_index(i);
            let value = result_run_array
                .values()
                .as_string::<i32>()
                .value(physical_idx);
            actual.push(value);
        }
        assert_eq!(actual, expected);
    }

    #[test]
    fn test_interleave_run_end_encoded_with_nulls() {
        let a: Int32RunArray = vec![Some("a"), Some("a"), None, None, Some("b")]
            .into_iter()
            .collect();
        let b: Int32RunArray = vec![None, Some("c"), Some("c"), Some("d")]
            .into_iter()
            .collect();

        let indices = &[(0, 1), (1, 0), (0, 2), (1, 3), (0, 4)];
        let result = interleave(&[&a, &b], indices).unwrap();

        assert!(matches!(result.data_type(), DataType::RunEndEncoded(_, _)));

        let result_run_array: &Int32RunArray = result.as_any().downcast_ref().unwrap();

        let expected = vec![Some("a"), None, None, Some("d"), Some("b")];
        let mut actual = Vec::new();
        for i in 0..result_run_array.len() {
            let physical_idx = result_run_array.get_physical_index(i);
            if result_run_array.values().is_null(physical_idx) {
                actual.push(None);
            } else {
                let value = result_run_array
                    .values()
                    .as_string::<i32>()
                    .value(physical_idx);
                actual.push(Some(value));
            }
        }
        assert_eq!(actual, expected);
    }

    #[test]
    fn test_interleave_run_end_encoded_different_run_types() {
        let mut builder = PrimitiveRunBuilder::<Int16Type, Int32Type>::new();
        builder.extend([1, 1, 2, 3, 3].into_iter().map(Some));
        let a = builder.finish();

        let mut builder = PrimitiveRunBuilder::<Int16Type, Int32Type>::new();
        builder.extend([4, 5, 5, 6].into_iter().map(Some));
        let b = builder.finish();

        let indices = &[(0, 0), (1, 1), (0, 3), (1, 3)];
        let result = interleave(&[&a, &b], indices).unwrap();

        assert!(matches!(result.data_type(), DataType::RunEndEncoded(_, _)));

        let result_run_array: &RunArray<Int16Type> = result.as_any().downcast_ref().unwrap();

        let expected = vec![1, 5, 3, 6];
        let mut actual = Vec::new();
        for i in 0..result_run_array.len() {
            let physical_idx = result_run_array.get_physical_index(i);
            let value = result_run_array
                .values()
                .as_primitive::<Int32Type>()
                .value(physical_idx);
            actual.push(value);
        }
        assert_eq!(actual, expected);
    }

    #[test]
    fn test_interleave_run_end_encoded_mixed_run_lengths() {
        let mut builder = PrimitiveRunBuilder::<Int64Type, Int32Type>::new();
        builder.extend([1, 2, 2, 2, 2, 3, 3, 4].into_iter().map(Some));
        let a = builder.finish();

        let mut builder = PrimitiveRunBuilder::<Int64Type, Int32Type>::new();
        builder.extend([5, 5, 5, 6, 7, 7, 8, 8].into_iter().map(Some));
        let b = builder.finish();

        let indices = &[
            (0, 0), // 1
            (1, 2), // 5
            (0, 3), // 2
            (1, 3), // 6
            (0, 6), // 3
            (1, 6), // 8
            (0, 7), // 4
            (1, 4), // 7
        ];
        let result = interleave(&[&a, &b], indices).unwrap();

        assert!(matches!(result.data_type(), DataType::RunEndEncoded(_, _)));

        let result_run_array: &RunArray<Int64Type> = result.as_any().downcast_ref().unwrap();

        let expected = vec![1, 5, 2, 6, 3, 8, 4, 7];
        let mut actual = Vec::new();
        for i in 0..result_run_array.len() {
            let physical_idx = result_run_array.get_physical_index(i);
            let value = result_run_array
                .values()
                .as_primitive::<Int32Type>()
                .value(physical_idx);
            actual.push(value);
        }
        assert_eq!(actual, expected);
    }

    #[test]
    fn test_interleave_run_end_encoded_empty_runs() {
        let mut builder = PrimitiveRunBuilder::<Int32Type, Int32Type>::new();
        builder.extend([1].into_iter().map(Some));
        let a = builder.finish();

        let mut builder = PrimitiveRunBuilder::<Int32Type, Int32Type>::new();
        builder.extend([2, 2, 2].into_iter().map(Some));
        let b = builder.finish();

        let indices = &[(0, 0), (1, 1), (1, 2)];
        let result = interleave(&[&a, &b], indices).unwrap();

        assert!(matches!(result.data_type(), DataType::RunEndEncoded(_, _)));

        let result_run_array: &Int32RunArray = result.as_any().downcast_ref().unwrap();

        let expected = vec![1, 2, 2];
        let mut actual = Vec::new();
        for i in 0..result_run_array.len() {
            let physical_idx = result_run_array.get_physical_index(i);
            let value = result_run_array
                .values()
                .as_primitive::<Int32Type>()
                .value(physical_idx);
            actual.push(value);
        }
        assert_eq!(actual, expected);
    }

    #[test]
    fn test_interleave_record_batch() {
        let a = RecordBatch::try_from_iter(vec![
            ("x", Arc::new(Int32Array::from_iter_values([1, 2, 3])) as ArrayRef),
            (
                "y",
                Arc::new(StringArray::from_iter_values(["a", "b", "c"])) as ArrayRef,
            ),
        ])
        .unwrap();
        let b = RecordBatch::try_from_iter(vec![
            ("x", Arc::new(Int32Array::from_iter_values([4, 5])) as ArrayRef),
            (
                "y",
                Arc::new(StringArray::from_iter_values(["d", "e"])) as ArrayRef,
            ),
        ])
        .unwrap();

        let result =
            interleave_record_batch(&[&a, &b], &[(1, 0), (0, 2), (1, 1), (0, 0)]).unwrap();

        assert_eq!(result.num_rows(), 4);
        assert_eq!(result.schema(), a.schema());
        assert_eq!(
            result.column(0).as_primitive::<Int32Type>().values(),
            &[4, 3, 5, 1]
        );
        let y: Vec<_> = result.column(1).as_string::<i32>().iter().flatten().collect();
        assert_eq!(y, ["d", "c", "e", "a"]);
    }
}