1use crate::dictionary::{merge_dictionary_values, should_merge_dictionary_values};
21use arrow_array::builder::{BooleanBufferBuilder, BufferBuilder, PrimitiveBuilder};
22use arrow_array::cast::AsArray;
23use arrow_array::types::*;
24use arrow_array::*;
25use arrow_buffer::{ArrowNativeType, MutableBuffer, NullBuffer, NullBufferBuilder, OffsetBuffer};
26use arrow_data::transform::MutableArrayData;
27use arrow_data::ByteView;
28use arrow_schema::{ArrowError, DataType};
29use std::collections::HashMap;
30use std::sync::Arc;
31
/// Callback passed to `downcast_primitive!` by [`interleave`].
///
/// Receives the concrete primitive type selected by the dispatch macro plus
/// the caller's local bindings, and forwards them to [`interleave_primitive`].
macro_rules! primitive_helper {
    ($native:ty, $arrays:ident, $picks:ident, $dtype:ident) => {
        interleave_primitive::<$native>($arrays, $picks, $dtype)
    };
}
37
/// Callback passed to `downcast_integer!` by [`interleave`] for dictionary
/// arrays: dispatches to [`interleave_dictionaries`] with the resolved key type.
///
/// `interleave_dictionaries` already returns `Result<ArrayRef, ArrowError>`, so
/// its result is forwarded unchanged. Re-wrapping the returned `ArrayRef` in a
/// second `Arc` would cost an extra allocation and add another level of
/// dynamic dispatch to every access of the output array.
macro_rules! dict_helper {
    ($t:ty, $values:expr, $indices:expr) => {
        interleave_dictionaries::<$t>($values, $indices)
    };
}
43
44pub fn interleave(
72 values: &[&dyn Array],
73 indices: &[(usize, usize)],
74) -> Result<ArrayRef, ArrowError> {
75 if values.is_empty() {
76 return Err(ArrowError::InvalidArgumentError(
77 "interleave requires input of at least one array".to_string(),
78 ));
79 }
80 let data_type = values[0].data_type();
81
82 for array in values.iter().skip(1) {
83 if array.data_type() != data_type {
84 return Err(ArrowError::InvalidArgumentError(format!(
85 "It is not possible to interleave arrays of different data types ({} and {})",
86 data_type,
87 array.data_type()
88 )));
89 }
90 }
91
92 if indices.is_empty() {
93 return Ok(new_empty_array(data_type));
94 }
95
96 downcast_primitive! {
97 data_type => (primitive_helper, values, indices, data_type),
98 DataType::Utf8 => interleave_bytes::<Utf8Type>(values, indices),
99 DataType::LargeUtf8 => interleave_bytes::<LargeUtf8Type>(values, indices),
100 DataType::Binary => interleave_bytes::<BinaryType>(values, indices),
101 DataType::LargeBinary => interleave_bytes::<LargeBinaryType>(values, indices),
102 DataType::BinaryView => interleave_views::<BinaryViewType>(values, indices),
103 DataType::Utf8View => interleave_views::<StringViewType>(values, indices),
104 DataType::Dictionary(k, _) => downcast_integer! {
105 k.as_ref() => (dict_helper, values, indices),
106 _ => unreachable!("illegal dictionary key type {k}")
107 },
108 _ => interleave_fallback(values, indices)
109 }
110}
111
112struct Interleave<'a, T> {
116 arrays: Vec<&'a T>,
118 nulls: Option<NullBuffer>,
120}
121
122impl<'a, T: Array + 'static> Interleave<'a, T> {
123 fn new(values: &[&'a dyn Array], indices: &'a [(usize, usize)]) -> Self {
124 let mut has_nulls = false;
125 let arrays: Vec<&T> = values
126 .iter()
127 .map(|x| {
128 has_nulls = has_nulls || x.null_count() != 0;
129 x.as_any().downcast_ref().unwrap()
130 })
131 .collect();
132
133 let nulls = match has_nulls {
134 true => {
135 let mut builder = NullBufferBuilder::new(indices.len());
136 for (a, b) in indices {
137 let v = arrays[*a].is_valid(*b);
138 builder.append(v)
139 }
140 builder.finish()
141 }
142 false => None,
143 };
144
145 Self { arrays, nulls }
146 }
147}
148
149fn interleave_primitive<T: ArrowPrimitiveType>(
150 values: &[&dyn Array],
151 indices: &[(usize, usize)],
152 data_type: &DataType,
153) -> Result<ArrayRef, ArrowError> {
154 let interleaved = Interleave::<'_, PrimitiveArray<T>>::new(values, indices);
155
156 let mut values = Vec::with_capacity(indices.len());
157 for (a, b) in indices {
158 let v = interleaved.arrays[*a].value(*b);
159 values.push(v)
160 }
161
162 let array = PrimitiveArray::<T>::new(values.into(), interleaved.nulls);
163 Ok(Arc::new(array.with_data_type(data_type.clone())))
164}
165
166fn interleave_bytes<T: ByteArrayType>(
167 values: &[&dyn Array],
168 indices: &[(usize, usize)],
169) -> Result<ArrayRef, ArrowError> {
170 let interleaved = Interleave::<'_, GenericByteArray<T>>::new(values, indices);
171
172 let mut capacity = 0;
173 let mut offsets = BufferBuilder::<T::Offset>::new(indices.len() + 1);
174 offsets.append(T::Offset::from_usize(0).unwrap());
175 for (a, b) in indices {
176 let o = interleaved.arrays[*a].value_offsets();
177 let element_len = o[*b + 1].as_usize() - o[*b].as_usize();
178 capacity += element_len;
179 offsets.append(T::Offset::from_usize(capacity).expect("overflow"));
180 }
181
182 let mut values = MutableBuffer::new(capacity);
183 for (a, b) in indices {
184 values.extend_from_slice(interleaved.arrays[*a].value(*b).as_ref());
185 }
186
187 let array = unsafe {
189 let offsets = OffsetBuffer::new_unchecked(offsets.finish().into());
190 GenericByteArray::<T>::new_unchecked(offsets, values.into(), interleaved.nulls)
191 };
192 Ok(Arc::new(array))
193}
194
195fn interleave_dictionaries<K: ArrowDictionaryKeyType>(
196 arrays: &[&dyn Array],
197 indices: &[(usize, usize)],
198) -> Result<ArrayRef, ArrowError> {
199 let dictionaries: Vec<_> = arrays.iter().map(|x| x.as_dictionary::<K>()).collect();
200 if !should_merge_dictionary_values::<K>(&dictionaries, indices.len()) {
201 return interleave_fallback(arrays, indices);
202 }
203
204 let masks: Vec<_> = dictionaries
205 .iter()
206 .enumerate()
207 .map(|(a_idx, dictionary)| {
208 let mut key_mask = BooleanBufferBuilder::new_from_buffer(
209 MutableBuffer::new_null(dictionary.len()),
210 dictionary.len(),
211 );
212
213 for (_, key_idx) in indices.iter().filter(|(a, _)| *a == a_idx) {
214 key_mask.set_bit(*key_idx, true);
215 }
216 key_mask.finish()
217 })
218 .collect();
219
220 let merged = merge_dictionary_values(&dictionaries, Some(&masks))?;
221
222 let mut keys = PrimitiveBuilder::<K>::with_capacity(indices.len());
224 for (a, b) in indices {
225 let old_keys: &PrimitiveArray<K> = dictionaries[*a].keys();
226 match old_keys.is_valid(*b) {
227 true => {
228 let old_key = old_keys.values()[*b];
229 keys.append_value(merged.key_mappings[*a][old_key.as_usize()])
230 }
231 false => keys.append_null(),
232 }
233 }
234 let array = unsafe { DictionaryArray::new_unchecked(keys.finish(), merged.values) };
235 Ok(Arc::new(array))
236}
237
238fn interleave_views<T: ByteViewType>(
239 values: &[&dyn Array],
240 indices: &[(usize, usize)],
241) -> Result<ArrayRef, ArrowError> {
242 let interleaved = Interleave::<'_, GenericByteViewArray<T>>::new(values, indices);
243 let mut views_builder = BufferBuilder::new(indices.len());
244 let mut buffers = Vec::new();
245
246 let mut buffer_lookup: HashMap<(usize, u32), u32> = HashMap::new();
248 for (array_idx, value_idx) in indices {
249 let array = interleaved.arrays[*array_idx];
250 let raw_view = array.views().get(*value_idx).unwrap();
251 let view_len = *raw_view as u32;
252 if view_len <= 12 {
253 views_builder.append(*raw_view);
254 continue;
255 }
256 let view = ByteView::from(*raw_view);
258 let new_buffer_idx: &mut u32 = buffer_lookup
259 .entry((*array_idx, view.buffer_index))
260 .or_insert_with(|| {
261 buffers.push(array.data_buffers()[view.buffer_index as usize].clone());
262 (buffers.len() - 1) as u32
263 });
264 views_builder.append(view.with_buffer_index(*new_buffer_idx).into());
265 }
266
267 let array = unsafe {
268 GenericByteViewArray::<T>::new_unchecked(views_builder.into(), buffers, interleaved.nulls)
269 };
270 Ok(Arc::new(array))
271}
272
273fn interleave_fallback(
275 values: &[&dyn Array],
276 indices: &[(usize, usize)],
277) -> Result<ArrayRef, ArrowError> {
278 let arrays: Vec<_> = values.iter().map(|x| x.to_data()).collect();
279 let arrays: Vec<_> = arrays.iter().collect();
280 let mut array_data = MutableArrayData::new(arrays, false, indices.len());
281
282 let mut cur_array = indices[0].0;
283 let mut start_row_idx = indices[0].1;
284 let mut end_row_idx = start_row_idx + 1;
285
286 for (array, row) in indices.iter().skip(1).copied() {
287 if array == cur_array && row == end_row_idx {
288 end_row_idx += 1;
290 continue;
291 }
292
293 array_data.extend(cur_array, start_row_idx, end_row_idx);
295
296 cur_array = array;
298 start_row_idx = row;
299 end_row_idx = start_row_idx + 1;
300 }
301
302 array_data.extend(cur_array, start_row_idx, end_row_idx);
304 Ok(make_array(array_data.freeze()))
305}
306
307pub fn interleave_record_batch(
352 record_batches: &[&RecordBatch],
353 indices: &[(usize, usize)],
354) -> Result<RecordBatch, ArrowError> {
355 let schema = record_batches[0].schema();
356 let columns = (0..schema.fields().len())
357 .map(|i| {
358 let column_values: Vec<&dyn Array> = record_batches
359 .iter()
360 .map(|batch| batch.column(i).as_ref())
361 .collect();
362 interleave(&column_values, indices)
363 })
364 .collect::<Result<Vec<_>, _>>()?;
365 RecordBatch::try_new(schema, columns)
366}
367
// Unit tests for the interleave kernels: primitives, strings, dictionaries,
// lists (generic fallback) and view arrays.
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::builder::{Int32Builder, ListBuilder};

    // Rows may be picked in any order and more than once.
    #[test]
    fn test_primitive() {
        let a = Int32Array::from_iter_values([1, 2, 3, 4]);
        let b = Int32Array::from_iter_values([5, 6, 7]);
        let c = Int32Array::from_iter_values([8, 9, 10]);
        let values = interleave(&[&a, &b, &c], &[(0, 3), (0, 3), (2, 2), (2, 0), (1, 1)]).unwrap();
        let v = values.as_primitive::<Int32Type>();
        assert_eq!(v.values(), &[4, 4, 10, 8, 6]);
    }

    // Nulls of the selected rows must carry over to the output.
    #[test]
    fn test_primitive_nulls() {
        let a = Int32Array::from_iter_values([1, 2, 3, 4]);
        let b = Int32Array::from_iter([Some(1), Some(4), None]);
        let values = interleave(&[&a, &b], &[(0, 1), (1, 2), (1, 2), (0, 3), (0, 2)]).unwrap();
        let v: Vec<_> = values.as_primitive::<Int32Type>().into_iter().collect();
        assert_eq!(&v, &[Some(2), None, None, Some(4), Some(3)])
    }

    // No indices: the result is an empty array that keeps the input data type.
    #[test]
    fn test_primitive_empty() {
        let a = Int32Array::from_iter_values([1, 2, 3, 4]);
        let v = interleave(&[&a], &[]).unwrap();
        assert!(v.is_empty());
        assert_eq!(v.data_type(), &DataType::Int32);
    }

    // Variable-length values of different lengths are copied intact.
    #[test]
    fn test_strings() {
        let a = StringArray::from_iter_values(["a", "b", "c"]);
        let b = StringArray::from_iter_values(["hello", "world", "foo"]);
        let values = interleave(&[&a, &b], &[(0, 2), (0, 2), (1, 0), (1, 1), (0, 1)]).unwrap();
        let v = values.as_string::<i32>();
        let values: Vec<_> = v.into_iter().collect();
        assert_eq!(
            &values,
            &[
                Some("c"),
                Some("c"),
                Some("hello"),
                Some("world"),
                Some("b")
            ]
        )
    }

    #[test]
    fn test_interleave_dictionary() {
        let a = DictionaryArray::<Int32Type>::from_iter(["a", "b", "c", "a", "b"]);
        let b = DictionaryArray::<Int32Type>::from_iter(["a", "c", "a", "c", "a"]);

        // Expected to keep all input dictionary values: the output dictionary
        // holds 5 entries, i.e. the values of `a` and `b` combined.
        let values =
            interleave(&[&a, &b], &[(0, 2), (0, 2), (0, 2), (1, 0), (1, 1), (0, 1)]).unwrap();
        let v = values.as_dictionary::<Int32Type>();
        assert_eq!(v.values().len(), 5);

        let vc = v.downcast_dict::<StringArray>().unwrap();
        let collected: Vec<_> = vc.into_iter().map(Option::unwrap).collect();
        assert_eq!(&collected, &["c", "c", "c", "a", "c", "b"]);

        // Only "c" is selected here, and the output dictionary is expected to
        // shrink to that single value.
        let values = interleave(&[&a, &b], &[(0, 2), (0, 2), (1, 1)]).unwrap();
        let v = values.as_dictionary::<Int32Type>();
        assert_eq!(v.values().len(), 1);

        let vc = v.downcast_dict::<StringArray>().unwrap();
        let collected: Vec<_> = vc.into_iter().map(Option::unwrap).collect();
        assert_eq!(&collected, &["c", "c", "c"]);
    }

    // Covers both kinds of dictionary null: a valid key pointing at a null
    // dictionary value (input_1, row 2) and a null key (input_2, row 0).
    #[test]
    fn test_interleave_dictionary_nulls() {
        let input_1_keys = Int32Array::from_iter_values([0, 2, 1, 3]);
        let input_1_values = StringArray::from(vec![Some("foo"), None, Some("bar"), Some("fiz")]);
        let input_1 = DictionaryArray::new(input_1_keys, Arc::new(input_1_values));
        let input_2: DictionaryArray<Int32Type> = vec![None].into_iter().collect();

        let expected = vec![Some("fiz"), None, None, Some("foo")];

        let values = interleave(
            &[&input_1 as _, &input_2 as _],
            &[(0, 3), (0, 2), (1, 0), (0, 0)],
        )
        .unwrap();
        let dictionary = values.as_dictionary::<Int32Type>();
        let actual: Vec<Option<&str>> = dictionary
            .downcast_dict::<StringArray>()
            .unwrap()
            .into_iter()
            .collect();

        assert_eq!(actual, expected);
    }

    // List arrays have no specialised kernel in `interleave`, so this goes
    // through the generic fallback.
    #[test]
    fn test_lists() {
        // a = [[1, 2], null, [3]]
        let mut a = ListBuilder::new(Int32Builder::new());
        a.values().append_value(1);
        a.values().append_value(2);
        a.append(true);
        a.append(false);
        a.values().append_value(3);
        a.append(true);
        let a = a.finish();

        // b = [[4], null, [5, 6, null]]
        let mut b = ListBuilder::new(Int32Builder::new());
        b.values().append_value(4);
        b.append(true);
        b.append(false);
        b.values().append_value(5);
        b.values().append_value(6);
        b.values().append_null();
        b.append(true);
        let b = b.finish();

        let values = interleave(&[&a, &b], &[(0, 2), (0, 1), (1, 0), (1, 2), (1, 1)]).unwrap();
        let v = values.as_any().downcast_ref::<ListArray>().unwrap();

        // expected = [[3], null, [4], [5, 6, null], null]
        let mut expected = ListBuilder::new(Int32Builder::new());
        expected.values().append_value(3);
        expected.append(true);
        expected.append(false);
        expected.values().append_value(4);
        expected.append(true);
        expected.values().append_value(5);
        expected.values().append_value(6);
        expected.values().append_null();
        expected.append(true);
        expected.append(false);
        let expected = expected.finish();

        assert_eq!(v, &expected);
    }

    // dict_a references only 10 of its 100 dictionary values; dict_b has only
    // null keys and an empty dictionary.
    #[test]
    fn interleave_sparse_nulls() {
        let values = StringArray::from_iter_values((0..100).map(|x| x.to_string()));
        let keys = Int32Array::from_iter_values(0..10);
        let dict_a = DictionaryArray::new(keys, Arc::new(values));
        let values = StringArray::new_null(0);
        let keys = Int32Array::new_null(10);
        let dict_b = DictionaryArray::new(keys, Arc::new(values));

        let indices = &[(0, 0), (0, 1), (0, 2), (1, 0)];
        let array = interleave(&[&dict_a, &dict_b], indices).unwrap();

        let expected =
            DictionaryArray::<Int32Type>::from_iter(vec![Some("0"), Some("1"), Some("2"), None]);
        assert_eq!(array.as_ref(), &expected)
    }

    #[test]
    fn test_interleave_views() {
        let values = StringArray::from_iter_values([
            "hello",
            "world_long_string_not_inlined",
            "foo",
            "bar",
            "baz",
        ]);
        let view_a = StringViewArray::from(&values);

        let values = StringArray::from_iter_values([
            "test",
            "data",
            "more_long_string_not_inlined",
            "views",
            "here",
        ]);
        let view_b = StringViewArray::from(&values);

        let indices = &[
            (0, 2), // "foo"
            (1, 0), // "test"
            (0, 4), // "baz"
            (1, 3), // "views"
            (0, 1), // "world_long_string_not_inlined"
        ];

        // Only one selected value is a long string, so the specialised kernel
        // is expected to carry over just the one buffer it references.
        let values = interleave(&[&view_a, &view_b], indices).unwrap();
        let result = values.as_string_view();
        assert_eq!(result.data_buffers().len(), 1);

        // The generic fallback is expected to end up with two buffers.
        let fallback = interleave_fallback(&[&view_a, &view_b], indices).unwrap();
        let fallback_result = fallback.as_string_view();
        assert_eq!(fallback_result.data_buffers().len(), 2);

        // Both paths must agree on the logical contents.
        let collected: Vec<_> = result.iter().map(|x| x.map(|s| s.to_string())).collect();

        let fallback_collected: Vec<_> = fallback_result
            .iter()
            .map(|x| x.map(|s| s.to_string()))
            .collect();

        assert_eq!(&collected, &fallback_collected);

        assert_eq!(
            &collected,
            &[
                Some("foo".to_string()),
                Some("test".to_string()),
                Some("baz".to_string()),
                Some("views".to_string()),
                Some("world_long_string_not_inlined".to_string()),
            ]
        );
    }

    #[test]
    fn test_interleave_views_with_nulls() {
        let values = StringArray::from_iter([
            Some("hello"),
            None,
            Some("foo_long_string_not_inlined"),
            Some("bar"),
            None,
        ]);
        let view_a = StringViewArray::from(&values);

        let values = StringArray::from_iter([
            Some("test"),
            Some("data_long_string_not_inlined"),
            None,
            None,
            Some("here"),
        ]);
        let view_b = StringViewArray::from(&values);

        let indices = &[
            (0, 1), // null
            (1, 2), // null
            (0, 2), // "foo_long_string_not_inlined"
            (1, 3), // null
            (0, 4), // null
        ];

        // Only one non-null long string is selected, hence one buffer.
        let values = interleave(&[&view_a, &view_b], indices).unwrap();
        let result = values.as_string_view();
        assert_eq!(result.data_buffers().len(), 1);

        let fallback = interleave_fallback(&[&view_a, &view_b], indices).unwrap();
        let fallback_result = fallback.as_string_view();

        // Both paths must agree on the logical contents, nulls included.
        let collected: Vec<_> = result.iter().map(|x| x.map(|s| s.to_string())).collect();

        let fallback_collected: Vec<_> = fallback_result
            .iter()
            .map(|x| x.map(|s| s.to_string()))
            .collect();

        assert_eq!(&collected, &fallback_collected);

        assert_eq!(
            &collected,
            &[
                None,
                None,
                Some("foo_long_string_not_inlined".to_string()),
                None,
                None,
            ]
        );
    }

    // Each input array owns two data buffers; checks that buffer indices are
    // remapped per (input array, buffer) pair and reused on repeated picks.
    #[test]
    fn test_interleave_views_multiple_buffers() {
        let str1 = "very_long_string_from_first_buffer".as_bytes();
        let str2 = "very_long_string_from_second_buffer".as_bytes();
        let buffer1 = str1.to_vec().into();
        let buffer2 = str2.to_vec().into();

        let view1 = ByteView::new(str1.len() as u32, &str1[..4])
            .with_buffer_index(0)
            .with_offset(0)
            .as_u128();
        let view2 = ByteView::new(str2.len() as u32, &str2[..4])
            .with_buffer_index(1)
            .with_offset(0)
            .as_u128();
        let view_a =
            StringViewArray::try_new(vec![view1, view2].into(), vec![buffer1, buffer2], None)
                .unwrap();

        let str3 = "another_very_long_string_buffer_three".as_bytes();
        let str4 = "different_long_string_in_buffer_four".as_bytes();
        let buffer3 = str3.to_vec().into();
        let buffer4 = str4.to_vec().into();

        let view3 = ByteView::new(str3.len() as u32, &str3[..4])
            .with_buffer_index(0)
            .with_offset(0)
            .as_u128();
        let view4 = ByteView::new(str4.len() as u32, &str4[..4])
            .with_buffer_index(1)
            .with_offset(0)
            .as_u128();
        let view_b =
            StringViewArray::try_new(vec![view3, view4].into(), vec![buffer3, buffer4], None)
                .unwrap();

        let indices = &[
            (0, 0), // view_a, buffer 0
            (1, 0), // view_b, buffer 0
            (0, 1), // view_a, buffer 1
            (1, 1), // view_b, buffer 1
            (0, 0), // view_a, buffer 0 again
            (1, 1), // view_b, buffer 1 again
        ];

        let values = interleave(&[&view_a, &view_b], indices).unwrap();
        let result = values.as_string_view();

        assert_eq!(
            result.data_buffers().len(),
            4,
            "Expected four buffers (two from each input array)"
        );

        let result_strings: Vec<_> = result.iter().map(|x| x.map(|s| s.to_string())).collect();
        assert_eq!(
            result_strings,
            vec![
                Some("very_long_string_from_first_buffer".to_string()),
                Some("another_very_long_string_buffer_three".to_string()),
                Some("very_long_string_from_second_buffer".to_string()),
                Some("different_long_string_in_buffer_four".to_string()),
                Some("very_long_string_from_first_buffer".to_string()),
                Some("different_long_string_in_buffer_four".to_string()),
            ]
        );

        let views = result.views();
        let buffer_indices: Vec<_> = views
            .iter()
            .map(|raw_view| ByteView::from(*raw_view).buffer_index)
            .collect();

        // Output buffers are numbered in order of first use; repeated picks
        // reuse the index assigned the first time.
        assert_eq!(
            buffer_indices,
            vec![
                0, // view_a, buffer 0
                1, // view_b, buffer 0
                2, // view_a, buffer 1
                3, // view_b, buffer 1
                0, // view_a, buffer 0 (reused)
                3, // view_b, buffer 1 (reused)
            ]
        );
    }
}