1use crate::dictionary::{merge_dictionary_values, should_merge_dictionary_values};
21use arrow_array::builder::{BooleanBufferBuilder, BufferBuilder, PrimitiveBuilder};
22use arrow_array::cast::AsArray;
23use arrow_array::types::*;
24use arrow_array::*;
25use arrow_buffer::{ArrowNativeType, BooleanBuffer, MutableBuffer, NullBuffer, OffsetBuffer};
26use arrow_data::transform::MutableArrayData;
27use arrow_data::ByteView;
28use arrow_schema::{ArrowError, DataType};
29use std::collections::HashMap;
30use std::sync::Arc;
31
/// Dispatch helper used by `downcast_primitive!` in `interleave`.
///
/// Forwards the concrete primitive type `$ty` to `interleave_primitive`, passing the
/// logical data type `$dt` through so the output carries exactly the input's `DataType`.
macro_rules! primitive_helper {
    ($ty:ty, $vals:ident, $idx:ident, $dt:ident) => {
        interleave_primitive::<$ty>($vals, $idx, $dt)
    };
}
37
/// Dispatch helper used by `downcast_integer!` in `interleave` for dictionary arrays.
///
/// `$key` is the concrete dictionary key type. `interleave_dictionaries` already
/// returns an `ArrayRef`, so its result is forwarded as-is. Wrapping it in another
/// `Arc` would produce an `Arc<Arc<dyn Array>>`, costing an extra allocation and an
/// extra indirection on every access to the returned array.
macro_rules! dict_helper {
    ($key:ty, $vals:expr, $idx:expr) => {
        interleave_dictionaries::<$key>($vals, $idx)
    };
}
43
44pub fn interleave(
72 values: &[&dyn Array],
73 indices: &[(usize, usize)],
74) -> Result<ArrayRef, ArrowError> {
75 if values.is_empty() {
76 return Err(ArrowError::InvalidArgumentError(
77 "interleave requires input of at least one array".to_string(),
78 ));
79 }
80 let data_type = values[0].data_type();
81
82 for array in values.iter().skip(1) {
83 if array.data_type() != data_type {
84 return Err(ArrowError::InvalidArgumentError(format!(
85 "It is not possible to interleave arrays of different data types ({} and {})",
86 data_type,
87 array.data_type()
88 )));
89 }
90 }
91
92 if indices.is_empty() {
93 return Ok(new_empty_array(data_type));
94 }
95
96 downcast_primitive! {
97 data_type => (primitive_helper, values, indices, data_type),
98 DataType::Utf8 => interleave_bytes::<Utf8Type>(values, indices),
99 DataType::LargeUtf8 => interleave_bytes::<LargeUtf8Type>(values, indices),
100 DataType::Binary => interleave_bytes::<BinaryType>(values, indices),
101 DataType::LargeBinary => interleave_bytes::<LargeBinaryType>(values, indices),
102 DataType::BinaryView => interleave_views::<BinaryViewType>(values, indices),
103 DataType::Utf8View => interleave_views::<StringViewType>(values, indices),
104 DataType::Dictionary(k, _) => downcast_integer! {
105 k.as_ref() => (dict_helper, values, indices),
106 _ => unreachable!("illegal dictionary key type {k}")
107 },
108 _ => interleave_fallback(values, indices)
109 }
110}
111
112struct Interleave<'a, T> {
116 arrays: Vec<&'a T>,
118 nulls: Option<NullBuffer>,
120}
121
122impl<'a, T: Array + 'static> Interleave<'a, T> {
123 fn new(values: &[&'a dyn Array], indices: &'a [(usize, usize)]) -> Self {
124 let mut has_nulls = false;
125 let arrays: Vec<&T> = values
126 .iter()
127 .map(|x| {
128 has_nulls = has_nulls || x.null_count() != 0;
129 x.as_any().downcast_ref().unwrap()
130 })
131 .collect();
132
133 let nulls = match has_nulls {
134 true => {
135 let nulls = BooleanBuffer::collect_bool(indices.len(), |i| {
136 let (a, b) = indices[i];
137 arrays[a].is_valid(b)
138 });
139 Some(nulls.into())
140 }
141 false => None,
142 };
143
144 Self { arrays, nulls }
145 }
146}
147
148fn interleave_primitive<T: ArrowPrimitiveType>(
149 values: &[&dyn Array],
150 indices: &[(usize, usize)],
151 data_type: &DataType,
152) -> Result<ArrayRef, ArrowError> {
153 let interleaved = Interleave::<'_, PrimitiveArray<T>>::new(values, indices);
154
155 let values = indices
156 .iter()
157 .map(|(a, b)| interleaved.arrays[*a].value(*b))
158 .collect::<Vec<_>>();
159
160 let array = PrimitiveArray::<T>::new(values.into(), interleaved.nulls);
161 Ok(Arc::new(array.with_data_type(data_type.clone())))
162}
163
164fn interleave_bytes<T: ByteArrayType>(
165 values: &[&dyn Array],
166 indices: &[(usize, usize)],
167) -> Result<ArrayRef, ArrowError> {
168 let interleaved = Interleave::<'_, GenericByteArray<T>>::new(values, indices);
169
170 let mut capacity = 0;
171 let mut offsets = Vec::with_capacity(indices.len() + 1);
172 offsets.push(T::Offset::from_usize(0).unwrap());
173 offsets.extend(indices.iter().map(|(a, b)| {
174 let o = interleaved.arrays[*a].value_offsets();
175 let element_len = o[*b + 1].as_usize() - o[*b].as_usize();
176 capacity += element_len;
177 T::Offset::from_usize(capacity).expect("overflow")
178 }));
179
180 let mut values = Vec::with_capacity(capacity);
181 for (a, b) in indices {
182 values.extend_from_slice(interleaved.arrays[*a].value(*b).as_ref());
183 }
184
185 let array = unsafe {
187 let offsets = OffsetBuffer::new_unchecked(offsets.into());
188 GenericByteArray::<T>::new_unchecked(offsets, values.into(), interleaved.nulls)
189 };
190 Ok(Arc::new(array))
191}
192
193fn interleave_dictionaries<K: ArrowDictionaryKeyType>(
194 arrays: &[&dyn Array],
195 indices: &[(usize, usize)],
196) -> Result<ArrayRef, ArrowError> {
197 let dictionaries: Vec<_> = arrays.iter().map(|x| x.as_dictionary::<K>()).collect();
198 if !should_merge_dictionary_values::<K>(&dictionaries, indices.len()) {
199 return interleave_fallback(arrays, indices);
200 }
201
202 let masks: Vec<_> = dictionaries
203 .iter()
204 .enumerate()
205 .map(|(a_idx, dictionary)| {
206 let mut key_mask = BooleanBufferBuilder::new_from_buffer(
207 MutableBuffer::new_null(dictionary.len()),
208 dictionary.len(),
209 );
210
211 for (_, key_idx) in indices.iter().filter(|(a, _)| *a == a_idx) {
212 key_mask.set_bit(*key_idx, true);
213 }
214 key_mask.finish()
215 })
216 .collect();
217
218 let merged = merge_dictionary_values(&dictionaries, Some(&masks))?;
219
220 let mut keys = PrimitiveBuilder::<K>::with_capacity(indices.len());
222 for (a, b) in indices {
223 let old_keys: &PrimitiveArray<K> = dictionaries[*a].keys();
224 match old_keys.is_valid(*b) {
225 true => {
226 let old_key = old_keys.values()[*b];
227 keys.append_value(merged.key_mappings[*a][old_key.as_usize()])
228 }
229 false => keys.append_null(),
230 }
231 }
232 let array = unsafe { DictionaryArray::new_unchecked(keys.finish(), merged.values) };
233 Ok(Arc::new(array))
234}
235
236fn interleave_views<T: ByteViewType>(
237 values: &[&dyn Array],
238 indices: &[(usize, usize)],
239) -> Result<ArrayRef, ArrowError> {
240 let interleaved = Interleave::<'_, GenericByteViewArray<T>>::new(values, indices);
241 let mut views_builder = BufferBuilder::new(indices.len());
242 let mut buffers = Vec::new();
243
244 let mut buffer_lookup: HashMap<(usize, u32), u32> = HashMap::new();
246 for (array_idx, value_idx) in indices {
247 let array = interleaved.arrays[*array_idx];
248 let raw_view = array.views().get(*value_idx).unwrap();
249 let view_len = *raw_view as u32;
250 if view_len <= 12 {
251 views_builder.append(*raw_view);
252 continue;
253 }
254 let view = ByteView::from(*raw_view);
256 let new_buffer_idx: &mut u32 = buffer_lookup
257 .entry((*array_idx, view.buffer_index))
258 .or_insert_with(|| {
259 buffers.push(array.data_buffers()[view.buffer_index as usize].clone());
260 (buffers.len() - 1) as u32
261 });
262 views_builder.append(view.with_buffer_index(*new_buffer_idx).into());
263 }
264
265 let array = unsafe {
266 GenericByteViewArray::<T>::new_unchecked(views_builder.into(), buffers, interleaved.nulls)
267 };
268 Ok(Arc::new(array))
269}
270
271fn interleave_fallback(
273 values: &[&dyn Array],
274 indices: &[(usize, usize)],
275) -> Result<ArrayRef, ArrowError> {
276 let arrays: Vec<_> = values.iter().map(|x| x.to_data()).collect();
277 let arrays: Vec<_> = arrays.iter().collect();
278 let mut array_data = MutableArrayData::new(arrays, false, indices.len());
279
280 let mut cur_array = indices[0].0;
281 let mut start_row_idx = indices[0].1;
282 let mut end_row_idx = start_row_idx + 1;
283
284 for (array, row) in indices.iter().skip(1).copied() {
285 if array == cur_array && row == end_row_idx {
286 end_row_idx += 1;
288 continue;
289 }
290
291 array_data.extend(cur_array, start_row_idx, end_row_idx);
293
294 cur_array = array;
296 start_row_idx = row;
297 end_row_idx = start_row_idx + 1;
298 }
299
300 array_data.extend(cur_array, start_row_idx, end_row_idx);
302 Ok(make_array(array_data.freeze()))
303}
304
305pub fn interleave_record_batch(
350 record_batches: &[&RecordBatch],
351 indices: &[(usize, usize)],
352) -> Result<RecordBatch, ArrowError> {
353 let schema = record_batches[0].schema();
354 let columns = (0..schema.fields().len())
355 .map(|i| {
356 let column_values: Vec<&dyn Array> = record_batches
357 .iter()
358 .map(|batch| batch.column(i).as_ref())
359 .collect();
360 interleave(&column_values, indices)
361 })
362 .collect::<Result<Vec<_>, _>>()?;
363 RecordBatch::try_new(schema, columns)
364}
365
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::builder::{Int32Builder, ListBuilder};

    /// Gathering from three null-free Int32 arrays, including a repeated index.
    #[test]
    fn test_primitive() {
        let a = Int32Array::from_iter_values([1, 2, 3, 4]);
        let b = Int32Array::from_iter_values([5, 6, 7]);
        let c = Int32Array::from_iter_values([8, 9, 10]);
        let values = interleave(&[&a, &b, &c], &[(0, 3), (0, 3), (2, 2), (2, 0), (1, 1)]).unwrap();
        let v = values.as_primitive::<Int32Type>();
        assert_eq!(v.values(), &[4, 4, 10, 8, 6]);
    }

    /// Nulls in one input must be carried into the output at the selected positions.
    #[test]
    fn test_primitive_nulls() {
        let a = Int32Array::from_iter_values([1, 2, 3, 4]);
        let b = Int32Array::from_iter([Some(1), Some(4), None]);
        let values = interleave(&[&a, &b], &[(0, 1), (1, 2), (1, 2), (0, 3), (0, 2)]).unwrap();
        let v: Vec<_> = values.as_primitive::<Int32Type>().into_iter().collect();
        assert_eq!(&v, &[Some(2), None, None, Some(4), Some(3)])
    }

    /// Empty `indices` yields an empty array that still has the input's data type.
    #[test]
    fn test_primitive_empty() {
        let a = Int32Array::from_iter_values([1, 2, 3, 4]);
        let v = interleave(&[&a], &[]).unwrap();
        assert!(v.is_empty());
        assert_eq!(v.data_type(), &DataType::Int32);
    }

    /// Offset-based string arrays (the `interleave_bytes` path).
    #[test]
    fn test_strings() {
        let a = StringArray::from_iter_values(["a", "b", "c"]);
        let b = StringArray::from_iter_values(["hello", "world", "foo"]);
        let values = interleave(&[&a, &b], &[(0, 2), (0, 2), (1, 0), (1, 1), (0, 1)]).unwrap();
        let v = values.as_string::<i32>();
        let values: Vec<_> = v.into_iter().collect();
        assert_eq!(
            &values,
            &[
                Some("c"),
                Some("c"),
                Some("hello"),
                Some("world"),
                Some("b")
            ]
        )
    }

    /// Dictionary inputs. The size of the output dictionary depends on whether
    /// `should_merge_dictionary_values` elects to merge for the given output length.
    #[test]
    fn test_interleave_dictionary() {
        let a = DictionaryArray::<Int32Type>::from_iter(["a", "b", "c", "a", "b"]);
        let b = DictionaryArray::<Int32Type>::from_iter(["a", "c", "a", "c", "a"]);

        let values =
            // Expected dictionary size equals the combined distinct values of `a` (3) and `b` (2).
            interleave(&[&a, &b], &[(0, 2), (0, 2), (0, 2), (1, 0), (1, 1), (0, 1)]).unwrap();
        let v = values.as_dictionary::<Int32Type>();
        assert_eq!(v.values().len(), 5);

        let vc = v.downcast_dict::<StringArray>().unwrap();
        let collected: Vec<_> = vc.into_iter().map(Option::unwrap).collect();
        assert_eq!(&collected, &["c", "c", "c", "a", "c", "b"]);

        let values = interleave(&[&a, &b], &[(0, 2), (0, 2), (1, 1)]).unwrap();
        // Only "c" is selected; the expected size of 1 shows unreferenced values are dropped.
        let v = values.as_dictionary::<Int32Type>();
        assert_eq!(v.values().len(), 1);

        let vc = v.downcast_dict::<StringArray>().unwrap();
        let collected: Vec<_> = vc.into_iter().map(Option::unwrap).collect();
        assert_eq!(&collected, &["c", "c", "c"]);
    }

    /// Nulls can come from a null key or from a valid key pointing at a null value.
    #[test]
    fn test_interleave_dictionary_nulls() {
        // Logical values: ["foo", "bar", null (key 1 -> null value), "fiz"]
        let input_1_keys = Int32Array::from_iter_values([0, 2, 1, 3]);
        let input_1_values = StringArray::from(vec![Some("foo"), None, Some("bar"), Some("fiz")]);
        let input_1 = DictionaryArray::new(input_1_keys, Arc::new(input_1_values));
        // A single null key.
        let input_2: DictionaryArray<Int32Type> = vec![None].into_iter().collect();

        let expected = vec![Some("fiz"), None, None, Some("foo")];

        let values = interleave(
            &[&input_1 as _, &input_2 as _],
            &[(0, 3), (0, 2), (1, 0), (0, 0)],
        )
        .unwrap();
        let dictionary = values.as_dictionary::<Int32Type>();
        let actual: Vec<Option<&str>> = dictionary
            .downcast_dict::<StringArray>()
            .unwrap()
            .into_iter()
            .collect();

        assert_eq!(actual, expected);
    }

    /// List arrays take the generic `interleave_fallback` path.
    #[test]
    fn test_lists() {
        // [[1, 2], null, [3]]
        let mut a = ListBuilder::new(Int32Builder::new());
        a.values().append_value(1);
        a.values().append_value(2);
        a.append(true);
        a.append(false);
        a.values().append_value(3);
        a.append(true);
        let a = a.finish();

        // [[4], null, [5, 6, null]]
        let mut b = ListBuilder::new(Int32Builder::new());
        b.values().append_value(4);
        b.append(true);
        b.append(false);
        b.values().append_value(5);
        b.values().append_value(6);
        b.values().append_null();
        b.append(true);
        let b = b.finish();

        let values = interleave(&[&a, &b], &[(0, 2), (0, 1), (1, 0), (1, 2), (1, 1)]).unwrap();
        let v = values.as_any().downcast_ref::<ListArray>().unwrap();

        // [[3], null, [4], [5, 6, null], null]
        let mut expected = ListBuilder::new(Int32Builder::new());
        expected.values().append_value(3);
        expected.append(true);
        expected.append(false);
        expected.values().append_value(4);
        expected.append(true);
        expected.values().append_value(5);
        expected.values().append_value(6);
        expected.values().append_null();
        expected.append(true);
        expected.append(false);
        let expected = expected.finish();

        assert_eq!(v, &expected);
    }

    /// A large dictionary combined with an all-null dictionary whose values are empty.
    #[test]
    fn interleave_sparse_nulls() {
        let values = StringArray::from_iter_values((0..100).map(|x| x.to_string()));
        let keys = Int32Array::from_iter_values(0..10);
        let dict_a = DictionaryArray::new(keys, Arc::new(values));
        let values = StringArray::new_null(0);
        let keys = Int32Array::new_null(10);
        let dict_b = DictionaryArray::new(keys, Arc::new(values));

        let indices = &[(0, 0), (0, 1), (0, 2), (1, 0)];
        let array = interleave(&[&dict_a, &dict_b], indices).unwrap();

        let expected =
            DictionaryArray::<Int32Type>::from_iter(vec![Some("0"), Some("1"), Some("2"), None]);
        assert_eq!(array.as_ref(), &expected)
    }

    /// The view kernel keeps only referenced data buffers; the fallback keeps all of them.
    #[test]
    fn test_interleave_views() {
        let values = StringArray::from_iter_values([
            "hello",
            "world_long_string_not_inlined",
            "foo",
            "bar",
            "baz",
        ]);
        let view_a = StringViewArray::from(&values);

        let values = StringArray::from_iter_values([
            "test",
            "data",
            "more_long_string_not_inlined",
            "views",
            "here",
        ]);
        let view_b = StringViewArray::from(&values);

        let indices = &[
            (0, 2), // "foo"
            (1, 0), // "test"
            (0, 4), // "baz"
            (1, 3), // "views"
            (0, 1), // "world_long_string_not_inlined" (the only non-inlined value)
        ];

        let values = interleave(&[&view_a, &view_b], indices).unwrap();
        // Only view_a's data buffer is referenced.
        let result = values.as_string_view();
        assert_eq!(result.data_buffers().len(), 1);

        let fallback = interleave_fallback(&[&view_a, &view_b], indices).unwrap();
        let fallback_result = fallback.as_string_view();
        assert_eq!(fallback_result.data_buffers().len(), 2);

        let collected: Vec<_> = result.iter().map(|x| x.map(|s| s.to_string())).collect();

        let fallback_collected: Vec<_> = fallback_result
            .iter()
            .map(|x| x.map(|s| s.to_string()))
            .collect();

        assert_eq!(&collected, &fallback_collected);

        assert_eq!(
            &collected,
            &[
                Some("foo".to_string()),
                Some("test".to_string()),
                Some("baz".to_string()),
                Some("views".to_string()),
                Some("world_long_string_not_inlined".to_string()),
            ]
        );
    }

    /// Null slots in view arrays are preserved and agree with the fallback path.
    #[test]
    fn test_interleave_views_with_nulls() {
        let values = StringArray::from_iter([
            Some("hello"),
            None,
            Some("foo_long_string_not_inlined"),
            Some("bar"),
            None,
        ]);
        let view_a = StringViewArray::from(&values);

        let values = StringArray::from_iter([
            Some("test"),
            Some("data_long_string_not_inlined"),
            None,
            None,
            Some("here"),
        ]);
        let view_b = StringViewArray::from(&values);

        let indices = &[
            (0, 1), // null
            (1, 2), // null
            (0, 2), // "foo_long_string_not_inlined"
            (1, 3), // null
            (0, 4), // null
        ];

        let values = interleave(&[&view_a, &view_b], indices).unwrap();
        // Only view_a's data buffer is referenced by a non-inlined value.
        let result = values.as_string_view();
        assert_eq!(result.data_buffers().len(), 1);

        let fallback = interleave_fallback(&[&view_a, &view_b], indices).unwrap();
        let fallback_result = fallback.as_string_view();

        let collected: Vec<_> = result.iter().map(|x| x.map(|s| s.to_string())).collect();

        let fallback_collected: Vec<_> = fallback_result
            .iter()
            .map(|x| x.map(|s| s.to_string()))
            .collect();

        assert_eq!(&collected, &fallback_collected);

        assert_eq!(
            &collected,
            &[
                None,
                None,
                Some("foo_long_string_not_inlined".to_string()),
                None,
                None,
            ]
        );
    }

    /// Each input has two data buffers; buffer indices must be remapped per (array, buffer).
    #[test]
    fn test_interleave_views_multiple_buffers() {
        let str1 = "very_long_string_from_first_buffer".as_bytes();
        let str2 = "very_long_string_from_second_buffer".as_bytes();
        let buffer1 = str1.to_vec().into();
        let buffer2 = str2.to_vec().into();

        let view1 = ByteView::new(str1.len() as u32, &str1[..4])
            .with_buffer_index(0)
            .with_offset(0)
            .as_u128();
        let view2 = ByteView::new(str2.len() as u32, &str2[..4])
            .with_buffer_index(1)
            .with_offset(0)
            .as_u128();
        let view_a =
            StringViewArray::try_new(vec![view1, view2].into(), vec![buffer1, buffer2], None)
                .unwrap();

        let str3 = "another_very_long_string_buffer_three".as_bytes();
        let str4 = "different_long_string_in_buffer_four".as_bytes();
        let buffer3 = str3.to_vec().into();
        let buffer4 = str4.to_vec().into();

        let view3 = ByteView::new(str3.len() as u32, &str3[..4])
            .with_buffer_index(0)
            .with_offset(0)
            .as_u128();
        let view4 = ByteView::new(str4.len() as u32, &str4[..4])
            .with_buffer_index(1)
            .with_offset(0)
            .as_u128();
        let view_b =
            StringViewArray::try_new(vec![view3, view4].into(), vec![buffer3, buffer4], None)
                .unwrap();

        let indices = &[
            (0, 0), // view_a buffer 0 -> output buffer 0
            (1, 0), // view_b buffer 0 -> output buffer 1
            (0, 1), // view_a buffer 1 -> output buffer 2
            (1, 1), // view_b buffer 1 -> output buffer 3
            (0, 0), // reuses output buffer 0
            (1, 1), // reuses output buffer 3
        ];

        let values = interleave(&[&view_a, &view_b], indices).unwrap();
        // All four input buffers are referenced, each exactly once in the output.
        let result = values.as_string_view();

        assert_eq!(
            result.data_buffers().len(),
            4,
            "Expected four buffers (two from each input array)"
        );

        let result_strings: Vec<_> = result.iter().map(|x| x.map(|s| s.to_string())).collect();
        assert_eq!(
            result_strings,
            vec![
                Some("very_long_string_from_first_buffer".to_string()),
                Some("another_very_long_string_buffer_three".to_string()),
                Some("very_long_string_from_second_buffer".to_string()),
                Some("different_long_string_in_buffer_four".to_string()),
                Some("very_long_string_from_first_buffer".to_string()),
                Some("different_long_string_in_buffer_four".to_string()),
            ]
        );

        let views = result.views();
        let buffer_indices: Vec<_> = views
            .iter()
            .map(|raw_view| ByteView::from(*raw_view).buffer_index)
            .collect();

        assert_eq!(
            buffer_indices,
            vec![
                0, // view_a buffer 0
                1, // view_b buffer 0
                2, // view_a buffer 1
                3, // view_b buffer 1
                0, // view_a buffer 0 again
                3, // view_b buffer 1 again
            ]
        );
    }
}