1use crate::dictionary::{merge_dictionary_values, should_merge_dictionary_values};
21use arrow_array::builder::{BooleanBufferBuilder, PrimitiveBuilder};
22use arrow_array::cast::AsArray;
23use arrow_array::types::*;
24use arrow_array::*;
25use arrow_buffer::{ArrowNativeType, BooleanBuffer, MutableBuffer, NullBuffer, OffsetBuffer};
26use arrow_data::ByteView;
27use arrow_data::transform::MutableArrayData;
28use arrow_schema::{ArrowError, DataType, Fields};
29use std::sync::Arc;
30
/// Adapter handed to `downcast_primitive!` in [`interleave`]: receives the
/// resolved primitive type followed by the caller-supplied identifiers and
/// forwards them to [`interleave_primitive`].
macro_rules! primitive_helper {
    ($primitive:ty, $arrays:ident, $picks:ident, $dt:ident) => {
        interleave_primitive::<$primitive>($arrays, $picks, $dt)
    };
}
36
/// Adapter handed to `downcast_integer!` in [`interleave`]: receives the
/// resolved dictionary key type and forwards to [`interleave_dictionaries`].
///
/// `interleave_dictionaries` already returns `Result<ArrayRef, ArrowError>`, so
/// its result is passed through untouched. Wrapping it in another `Arc` would
/// only add a second allocation and an extra pointer hop on every access.
macro_rules! dict_helper {
    ($t:ty, $values:expr, $indices:expr) => {
        interleave_dictionaries::<$t>($values, $indices)
    };
}
42
43pub fn interleave(
71 values: &[&dyn Array],
72 indices: &[(usize, usize)],
73) -> Result<ArrayRef, ArrowError> {
74 if values.is_empty() {
75 return Err(ArrowError::InvalidArgumentError(
76 "interleave requires input of at least one array".to_string(),
77 ));
78 }
79 let data_type = values[0].data_type();
80
81 for array in values.iter().skip(1) {
82 if array.data_type() != data_type {
83 return Err(ArrowError::InvalidArgumentError(format!(
84 "It is not possible to interleave arrays of different data types ({} and {})",
85 data_type,
86 array.data_type()
87 )));
88 }
89 }
90
91 if indices.is_empty() {
92 return Ok(new_empty_array(data_type));
93 }
94
95 downcast_primitive! {
96 data_type => (primitive_helper, values, indices, data_type),
97 DataType::Utf8 => interleave_bytes::<Utf8Type>(values, indices),
98 DataType::LargeUtf8 => interleave_bytes::<LargeUtf8Type>(values, indices),
99 DataType::Binary => interleave_bytes::<BinaryType>(values, indices),
100 DataType::LargeBinary => interleave_bytes::<LargeBinaryType>(values, indices),
101 DataType::BinaryView => interleave_views::<BinaryViewType>(values, indices),
102 DataType::Utf8View => interleave_views::<StringViewType>(values, indices),
103 DataType::Dictionary(k, _) => downcast_integer! {
104 k.as_ref() => (dict_helper, values, indices),
105 _ => unreachable!("illegal dictionary key type {k}")
106 },
107 DataType::Struct(fields) => interleave_struct(fields, values, indices),
108 _ => interleave_fallback(values, indices)
109 }
110}
111
112struct Interleave<'a, T> {
116 arrays: Vec<&'a T>,
118 nulls: Option<NullBuffer>,
120}
121
122impl<'a, T: Array + 'static> Interleave<'a, T> {
123 fn new(values: &[&'a dyn Array], indices: &'a [(usize, usize)]) -> Self {
124 let mut has_nulls = false;
125 let arrays: Vec<&T> = values
126 .iter()
127 .map(|x| {
128 has_nulls = has_nulls || x.null_count() != 0;
129 x.as_any().downcast_ref().unwrap()
130 })
131 .collect();
132
133 let nulls = match has_nulls {
134 true => {
135 let nulls = BooleanBuffer::collect_bool(indices.len(), |i| {
136 let (a, b) = indices[i];
137 arrays[a].is_valid(b)
138 });
139 Some(nulls.into())
140 }
141 false => None,
142 };
143
144 Self { arrays, nulls }
145 }
146}
147
148fn interleave_primitive<T: ArrowPrimitiveType>(
149 values: &[&dyn Array],
150 indices: &[(usize, usize)],
151 data_type: &DataType,
152) -> Result<ArrayRef, ArrowError> {
153 let interleaved = Interleave::<'_, PrimitiveArray<T>>::new(values, indices);
154
155 let values = indices
156 .iter()
157 .map(|(a, b)| interleaved.arrays[*a].value(*b))
158 .collect::<Vec<_>>();
159
160 let array = PrimitiveArray::<T>::try_new(values.into(), interleaved.nulls)?;
161 Ok(Arc::new(array.with_data_type(data_type.clone())))
162}
163
164fn interleave_bytes<T: ByteArrayType>(
165 values: &[&dyn Array],
166 indices: &[(usize, usize)],
167) -> Result<ArrayRef, ArrowError> {
168 let interleaved = Interleave::<'_, GenericByteArray<T>>::new(values, indices);
169
170 let mut capacity = 0;
171 let mut offsets = Vec::with_capacity(indices.len() + 1);
172 offsets.push(T::Offset::from_usize(0).unwrap());
173 offsets.extend(indices.iter().map(|(a, b)| {
174 let o = interleaved.arrays[*a].value_offsets();
175 let element_len = o[*b + 1].as_usize() - o[*b].as_usize();
176 capacity += element_len;
177 T::Offset::from_usize(capacity).expect("overflow")
178 }));
179
180 let mut values = Vec::with_capacity(capacity);
181 for (a, b) in indices {
182 values.extend_from_slice(interleaved.arrays[*a].value(*b).as_ref());
183 }
184
185 let array = unsafe {
187 let offsets = OffsetBuffer::new_unchecked(offsets.into());
188 GenericByteArray::<T>::new_unchecked(offsets, values.into(), interleaved.nulls)
189 };
190 Ok(Arc::new(array))
191}
192
193fn interleave_dictionaries<K: ArrowDictionaryKeyType>(
194 arrays: &[&dyn Array],
195 indices: &[(usize, usize)],
196) -> Result<ArrayRef, ArrowError> {
197 let dictionaries: Vec<_> = arrays.iter().map(|x| x.as_dictionary::<K>()).collect();
198 if !should_merge_dictionary_values::<K>(&dictionaries, indices.len()) {
199 return interleave_fallback(arrays, indices);
200 }
201
202 let masks: Vec<_> = dictionaries
203 .iter()
204 .enumerate()
205 .map(|(a_idx, dictionary)| {
206 let mut key_mask = BooleanBufferBuilder::new_from_buffer(
207 MutableBuffer::new_null(dictionary.len()),
208 dictionary.len(),
209 );
210
211 for (_, key_idx) in indices.iter().filter(|(a, _)| *a == a_idx) {
212 key_mask.set_bit(*key_idx, true);
213 }
214 key_mask.finish()
215 })
216 .collect();
217
218 let merged = merge_dictionary_values(&dictionaries, Some(&masks))?;
219
220 let mut keys = PrimitiveBuilder::<K>::with_capacity(indices.len());
222 for (a, b) in indices {
223 let old_keys: &PrimitiveArray<K> = dictionaries[*a].keys();
224 match old_keys.is_valid(*b) {
225 true => {
226 let old_key = old_keys.values()[*b];
227 keys.append_value(merged.key_mappings[*a][old_key.as_usize()])
228 }
229 false => keys.append_null(),
230 }
231 }
232 let array = unsafe { DictionaryArray::new_unchecked(keys.finish(), merged.values) };
233 Ok(Arc::new(array))
234}
235
236fn interleave_views<T: ByteViewType>(
237 values: &[&dyn Array],
238 indices: &[(usize, usize)],
239) -> Result<ArrayRef, ArrowError> {
240 let interleaved = Interleave::<'_, GenericByteViewArray<T>>::new(values, indices);
241 let mut buffers = Vec::new();
242
243 let mut offsets = Vec::with_capacity(interleaved.arrays.len() + 1);
245 offsets.push(0);
246 let mut total_buffers = 0;
247 for a in interleaved.arrays.iter() {
248 total_buffers += a.data_buffers().len();
249 offsets.push(total_buffers);
250 }
251
252 let mut buffer_to_new_index = vec![None; total_buffers];
254
255 let views: Vec<u128> = indices
256 .iter()
257 .map(|(array_idx, value_idx)| {
258 let array = interleaved.arrays[*array_idx];
259 let view = array.views().get(*value_idx).unwrap();
260 let view_len = *view as u32;
261 if view_len <= 12 {
262 return *view;
263 }
264 let view = ByteView::from(*view);
266 let buffer_to_new_idx = offsets[*array_idx] + view.buffer_index as usize;
267 let new_buffer_idx: u32 =
268 *buffer_to_new_index[buffer_to_new_idx].get_or_insert_with(|| {
269 buffers.push(array.data_buffers()[view.buffer_index as usize].clone());
270 (buffers.len() - 1) as u32
271 });
272 view.with_buffer_index(new_buffer_idx).as_u128()
273 })
274 .collect();
275
276 let array = unsafe {
277 GenericByteViewArray::<T>::new_unchecked(views.into(), buffers, interleaved.nulls)
278 };
279 Ok(Arc::new(array))
280}
281
282fn interleave_struct(
283 fields: &Fields,
284 values: &[&dyn Array],
285 indices: &[(usize, usize)],
286) -> Result<ArrayRef, ArrowError> {
287 let interleaved = Interleave::<'_, StructArray>::new(values, indices);
288
289 if fields.is_empty() {
290 let array = StructArray::try_new_with_length(
291 fields.clone(),
292 vec![],
293 interleaved.nulls,
294 indices.len(),
295 )?;
296 return Ok(Arc::new(array));
297 }
298
299 let struct_fields_array: Result<Vec<_>, _> = (0..fields.len())
300 .map(|i| {
301 let field_values: Vec<&dyn Array> = interleaved
302 .arrays
303 .iter()
304 .map(|x| x.column(i).as_ref())
305 .collect();
306 interleave(&field_values, indices)
307 })
308 .collect();
309
310 let struct_array =
311 StructArray::try_new(fields.clone(), struct_fields_array?, interleaved.nulls)?;
312 Ok(Arc::new(struct_array))
313}
314
315fn interleave_fallback(
317 values: &[&dyn Array],
318 indices: &[(usize, usize)],
319) -> Result<ArrayRef, ArrowError> {
320 let arrays: Vec<_> = values.iter().map(|x| x.to_data()).collect();
321 let arrays: Vec<_> = arrays.iter().collect();
322 let mut array_data = MutableArrayData::new(arrays, false, indices.len());
323
324 let mut cur_array = indices[0].0;
325 let mut start_row_idx = indices[0].1;
326 let mut end_row_idx = start_row_idx + 1;
327
328 for (array, row) in indices.iter().skip(1).copied() {
329 if array == cur_array && row == end_row_idx {
330 end_row_idx += 1;
332 continue;
333 }
334
335 array_data.extend(cur_array, start_row_idx, end_row_idx);
337
338 cur_array = array;
340 start_row_idx = row;
341 end_row_idx = start_row_idx + 1;
342 }
343
344 array_data.extend(cur_array, start_row_idx, end_row_idx);
346 Ok(make_array(array_data.freeze()))
347}
348
349pub fn interleave_record_batch(
394 record_batches: &[&RecordBatch],
395 indices: &[(usize, usize)],
396) -> Result<RecordBatch, ArrowError> {
397 let schema = record_batches[0].schema();
398 let columns = (0..schema.fields().len())
399 .map(|i| {
400 let column_values: Vec<&dyn Array> = record_batches
401 .iter()
402 .map(|batch| batch.column(i).as_ref())
403 .collect();
404 interleave(&column_values, indices)
405 })
406 .collect::<Result<Vec<_>, _>>()?;
407 RecordBatch::try_new(schema, columns)
408}
409
410#[cfg(test)]
411mod tests {
412 use super::*;
413 use arrow_array::Int32RunArray;
414 use arrow_array::builder::{Int32Builder, ListBuilder, PrimitiveRunBuilder};
415 use arrow_schema::Field;
416
417 #[test]
418 fn test_primitive() {
419 let a = Int32Array::from_iter_values([1, 2, 3, 4]);
420 let b = Int32Array::from_iter_values([5, 6, 7]);
421 let c = Int32Array::from_iter_values([8, 9, 10]);
422 let values = interleave(&[&a, &b, &c], &[(0, 3), (0, 3), (2, 2), (2, 0), (1, 1)]).unwrap();
423 let v = values.as_primitive::<Int32Type>();
424 assert_eq!(v.values(), &[4, 4, 10, 8, 6]);
425 }
426
427 #[test]
428 fn test_primitive_nulls() {
429 let a = Int32Array::from_iter_values([1, 2, 3, 4]);
430 let b = Int32Array::from_iter([Some(1), Some(4), None]);
431 let values = interleave(&[&a, &b], &[(0, 1), (1, 2), (1, 2), (0, 3), (0, 2)]).unwrap();
432 let v: Vec<_> = values.as_primitive::<Int32Type>().into_iter().collect();
433 assert_eq!(&v, &[Some(2), None, None, Some(4), Some(3)])
434 }
435
436 #[test]
437 fn test_primitive_empty() {
438 let a = Int32Array::from_iter_values([1, 2, 3, 4]);
439 let v = interleave(&[&a], &[]).unwrap();
440 assert!(v.is_empty());
441 assert_eq!(v.data_type(), &DataType::Int32);
442 }
443
444 #[test]
445 fn test_strings() {
446 let a = StringArray::from_iter_values(["a", "b", "c"]);
447 let b = StringArray::from_iter_values(["hello", "world", "foo"]);
448 let values = interleave(&[&a, &b], &[(0, 2), (0, 2), (1, 0), (1, 1), (0, 1)]).unwrap();
449 let v = values.as_string::<i32>();
450 let values: Vec<_> = v.into_iter().collect();
451 assert_eq!(
452 &values,
453 &[
454 Some("c"),
455 Some("c"),
456 Some("hello"),
457 Some("world"),
458 Some("b")
459 ]
460 )
461 }
462
463 #[test]
464 fn test_interleave_dictionary() {
465 let a = DictionaryArray::<Int32Type>::from_iter(["a", "b", "c", "a", "b"]);
466 let b = DictionaryArray::<Int32Type>::from_iter(["a", "c", "a", "c", "a"]);
467
468 let values =
470 interleave(&[&a, &b], &[(0, 2), (0, 2), (0, 2), (1, 0), (1, 1), (0, 1)]).unwrap();
471 let v = values.as_dictionary::<Int32Type>();
472 assert_eq!(v.values().len(), 5);
473
474 let vc = v.downcast_dict::<StringArray>().unwrap();
475 let collected: Vec<_> = vc.into_iter().map(Option::unwrap).collect();
476 assert_eq!(&collected, &["c", "c", "c", "a", "c", "b"]);
477
478 let values = interleave(&[&a, &b], &[(0, 2), (0, 2), (1, 1)]).unwrap();
480 let v = values.as_dictionary::<Int32Type>();
481 assert_eq!(v.values().len(), 1);
482
483 let vc = v.downcast_dict::<StringArray>().unwrap();
484 let collected: Vec<_> = vc.into_iter().map(Option::unwrap).collect();
485 assert_eq!(&collected, &["c", "c", "c"]);
486 }
487
488 #[test]
489 fn test_interleave_dictionary_nulls() {
490 let input_1_keys = Int32Array::from_iter_values([0, 2, 1, 3]);
491 let input_1_values = StringArray::from(vec![Some("foo"), None, Some("bar"), Some("fiz")]);
492 let input_1 = DictionaryArray::new(input_1_keys, Arc::new(input_1_values));
493 let input_2: DictionaryArray<Int32Type> = vec![None].into_iter().collect();
494
495 let expected = vec![Some("fiz"), None, None, Some("foo")];
496
497 let values = interleave(
498 &[&input_1 as _, &input_2 as _],
499 &[(0, 3), (0, 2), (1, 0), (0, 0)],
500 )
501 .unwrap();
502 let dictionary = values.as_dictionary::<Int32Type>();
503 let actual: Vec<Option<&str>> = dictionary
504 .downcast_dict::<StringArray>()
505 .unwrap()
506 .into_iter()
507 .collect();
508
509 assert_eq!(actual, expected);
510 }
511
    /// Interleaves two `ListArray`s. `List` has no dedicated arm in
    /// `interleave`, so this goes through the catch-all `interleave_fallback`.
    #[test]
    fn test_lists() {
        // a = [[1, 2], null, [3]]
        let mut a = ListBuilder::new(Int32Builder::new());
        a.values().append_value(1);
        a.values().append_value(2);
        a.append(true);
        a.append(false);
        a.values().append_value(3);
        a.append(true);
        let a = a.finish();

        // b = [[4], null, [5, 6, null]]
        let mut b = ListBuilder::new(Int32Builder::new());
        b.values().append_value(4);
        b.append(true);
        b.append(false);
        b.values().append_value(5);
        b.values().append_value(6);
        b.values().append_null();
        b.append(true);
        let b = b.finish();

        let values = interleave(&[&a, &b], &[(0, 2), (0, 1), (1, 0), (1, 2), (1, 1)]).unwrap();
        let v = values.as_any().downcast_ref::<ListArray>().unwrap();

        // expected = [[3], null, [4], [5, 6, null], null]
        let mut expected = ListBuilder::new(Int32Builder::new());
        expected.values().append_value(3);
        expected.append(true);
        expected.append(false);
        expected.values().append_value(4);
        expected.append(true);
        expected.values().append_value(5);
        expected.values().append_value(6);
        expected.values().append_null();
        expected.append(true);
        expected.append(false);
        let expected = expected.finish();

        assert_eq!(v, &expected);
    }
554
555 #[test]
556 fn test_struct_without_nulls() {
557 let fields = Fields::from(vec![
558 Field::new("number_col", DataType::Int32, false),
559 Field::new("string_col", DataType::Utf8, false),
560 ]);
561 let a = {
562 let number_col = Int32Array::from_iter_values([1, 2, 3, 4]);
563 let string_col = StringArray::from_iter_values(["a", "b", "c", "d"]);
564
565 StructArray::try_new(
566 fields.clone(),
567 vec![Arc::new(number_col), Arc::new(string_col)],
568 None,
569 )
570 .unwrap()
571 };
572
573 let b = {
574 let number_col = Int32Array::from_iter_values([5, 6, 7]);
575 let string_col = StringArray::from_iter_values(["hello", "world", "foo"]);
576
577 StructArray::try_new(
578 fields.clone(),
579 vec![Arc::new(number_col), Arc::new(string_col)],
580 None,
581 )
582 .unwrap()
583 };
584
585 let c = {
586 let number_col = Int32Array::from_iter_values([8, 9, 10]);
587 let string_col = StringArray::from_iter_values(["x", "y", "z"]);
588
589 StructArray::try_new(
590 fields.clone(),
591 vec![Arc::new(number_col), Arc::new(string_col)],
592 None,
593 )
594 .unwrap()
595 };
596
597 let values = interleave(&[&a, &b, &c], &[(0, 3), (0, 3), (2, 2), (2, 0), (1, 1)]).unwrap();
598 let values_struct = values.as_struct();
599 assert_eq!(values_struct.data_type(), &DataType::Struct(fields));
600 assert_eq!(values_struct.null_count(), 0);
601
602 let values_number = values_struct.column(0).as_primitive::<Int32Type>();
603 assert_eq!(values_number.values(), &[4, 4, 10, 8, 6]);
604 let values_string = values_struct.column(1).as_string::<i32>();
605 let values_string: Vec<_> = values_string.into_iter().collect();
606 assert_eq!(
607 &values_string,
608 &[Some("d"), Some("d"), Some("z"), Some("x"), Some("world")]
609 );
610 }
611
612 #[test]
613 fn test_struct_with_nulls_in_values() {
614 let fields = Fields::from(vec![
615 Field::new("number_col", DataType::Int32, true),
616 Field::new("string_col", DataType::Utf8, true),
617 ]);
618 let a = {
619 let number_col = Int32Array::from_iter_values([1, 2, 3, 4]);
620 let string_col = StringArray::from_iter_values(["a", "b", "c", "d"]);
621
622 StructArray::try_new(
623 fields.clone(),
624 vec![Arc::new(number_col), Arc::new(string_col)],
625 None,
626 )
627 .unwrap()
628 };
629
630 let b = {
631 let number_col = Int32Array::from_iter([Some(1), Some(4), None]);
632 let string_col = StringArray::from(vec![Some("hello"), None, Some("foo")]);
633
634 StructArray::try_new(
635 fields.clone(),
636 vec![Arc::new(number_col), Arc::new(string_col)],
637 None,
638 )
639 .unwrap()
640 };
641
642 let values = interleave(&[&a, &b], &[(0, 1), (1, 2), (1, 2), (0, 3), (1, 1)]).unwrap();
643 let values_struct = values.as_struct();
644 assert_eq!(values_struct.data_type(), &DataType::Struct(fields));
645
646 assert_eq!(values_struct.null_count(), 0);
648
649 let values_number: Vec<_> = values_struct
650 .column(0)
651 .as_primitive::<Int32Type>()
652 .into_iter()
653 .collect();
654 assert_eq!(values_number, &[Some(2), None, None, Some(4), Some(4)]);
655
656 let values_string = values_struct.column(1).as_string::<i32>();
657 let values_string: Vec<_> = values_string.into_iter().collect();
658 assert_eq!(
659 &values_string,
660 &[Some("b"), Some("foo"), Some("foo"), Some("d"), None]
661 );
662 }
663
664 #[test]
665 fn test_struct_with_nulls() {
666 let fields = Fields::from(vec![
667 Field::new("number_col", DataType::Int32, false),
668 Field::new("string_col", DataType::Utf8, false),
669 ]);
670 let a = {
671 let number_col = Int32Array::from_iter_values([1, 2, 3, 4]);
672 let string_col = StringArray::from_iter_values(["a", "b", "c", "d"]);
673
674 StructArray::try_new(
675 fields.clone(),
676 vec![Arc::new(number_col), Arc::new(string_col)],
677 None,
678 )
679 .unwrap()
680 };
681
682 let b = {
683 let number_col = Int32Array::from_iter_values([5, 6, 7]);
684 let string_col = StringArray::from_iter_values(["hello", "world", "foo"]);
685
686 StructArray::try_new(
687 fields.clone(),
688 vec![Arc::new(number_col), Arc::new(string_col)],
689 Some(NullBuffer::from(&[true, false, true])),
690 )
691 .unwrap()
692 };
693
694 let c = {
695 let number_col = Int32Array::from_iter_values([8, 9, 10]);
696 let string_col = StringArray::from_iter_values(["x", "y", "z"]);
697
698 StructArray::try_new(
699 fields.clone(),
700 vec![Arc::new(number_col), Arc::new(string_col)],
701 None,
702 )
703 .unwrap()
704 };
705
706 let values = interleave(&[&a, &b, &c], &[(0, 3), (0, 3), (2, 2), (1, 1), (2, 0)]).unwrap();
707 let values_struct = values.as_struct();
708 assert_eq!(values_struct.data_type(), &DataType::Struct(fields));
709
710 let validity: Vec<bool> = {
711 let null_buffer = values_struct.nulls().expect("should_have_nulls");
712
713 null_buffer.iter().collect()
714 };
715 assert_eq!(validity, &[true, true, true, false, true]);
716 let values_number = values_struct.column(0).as_primitive::<Int32Type>();
717 assert_eq!(values_number.values(), &[4, 4, 10, 6, 8]);
718 let values_string = values_struct.column(1).as_string::<i32>();
719 let values_string: Vec<_> = values_string.into_iter().collect();
720 assert_eq!(
721 &values_string,
722 &[Some("d"), Some("d"), Some("z"), Some("world"), Some("x"),]
723 );
724 }
725
726 #[test]
727 fn test_struct_empty() {
728 let fields = Fields::from(vec![
729 Field::new("number_col", DataType::Int32, false),
730 Field::new("string_col", DataType::Utf8, false),
731 ]);
732 let a = {
733 let number_col = Int32Array::from_iter_values([1, 2, 3, 4]);
734 let string_col = StringArray::from_iter_values(["a", "b", "c", "d"]);
735
736 StructArray::try_new(
737 fields.clone(),
738 vec![Arc::new(number_col), Arc::new(string_col)],
739 None,
740 )
741 .unwrap()
742 };
743 let v = interleave(&[&a], &[]).unwrap();
744 assert!(v.is_empty());
745 assert_eq!(v.data_type(), &DataType::Struct(fields));
746 }
747
748 #[test]
749 fn interleave_sparse_nulls() {
750 let values = StringArray::from_iter_values((0..100).map(|x| x.to_string()));
751 let keys = Int32Array::from_iter_values(0..10);
752 let dict_a = DictionaryArray::new(keys, Arc::new(values));
753 let values = StringArray::new_null(0);
754 let keys = Int32Array::new_null(10);
755 let dict_b = DictionaryArray::new(keys, Arc::new(values));
756
757 let indices = &[(0, 0), (0, 1), (0, 2), (1, 0)];
758 let array = interleave(&[&dict_a, &dict_b], indices).unwrap();
759
760 let expected =
761 DictionaryArray::<Int32Type>::from_iter(vec![Some("0"), Some("1"), Some("2"), None]);
762 assert_eq!(array.as_ref(), &expected)
763 }
764
765 #[test]
766 fn test_interleave_views() {
767 let values = StringArray::from_iter_values([
768 "hello",
769 "world_long_string_not_inlined",
770 "foo",
771 "bar",
772 "baz",
773 ]);
774 let view_a = StringViewArray::from(&values);
775
776 let values = StringArray::from_iter_values([
777 "test",
778 "data",
779 "more_long_string_not_inlined",
780 "views",
781 "here",
782 ]);
783 let view_b = StringViewArray::from(&values);
784
785 let indices = &[
786 (0, 2), (1, 0), (0, 4), (1, 3), (0, 1), ];
792
793 let values = interleave(&[&view_a, &view_b], indices).unwrap();
795 let result = values.as_string_view();
796 assert_eq!(result.data_buffers().len(), 1);
797
798 let fallback = interleave_fallback(&[&view_a, &view_b], indices).unwrap();
799 let fallback_result = fallback.as_string_view();
800 assert_eq!(fallback_result.data_buffers().len(), 2);
802
803 let collected: Vec<_> = result.iter().map(|x| x.map(|s| s.to_string())).collect();
805
806 let fallback_collected: Vec<_> = fallback_result
807 .iter()
808 .map(|x| x.map(|s| s.to_string()))
809 .collect();
810
811 assert_eq!(&collected, &fallback_collected);
812
813 assert_eq!(
814 &collected,
815 &[
816 Some("foo".to_string()),
817 Some("test".to_string()),
818 Some("baz".to_string()),
819 Some("views".to_string()),
820 Some("world_long_string_not_inlined".to_string()),
821 ]
822 );
823 }
824
825 #[test]
826 fn test_interleave_views_with_nulls() {
827 let values = StringArray::from_iter([
828 Some("hello"),
829 None,
830 Some("foo_long_string_not_inlined"),
831 Some("bar"),
832 None,
833 ]);
834 let view_a = StringViewArray::from(&values);
835
836 let values = StringArray::from_iter([
837 Some("test"),
838 Some("data_long_string_not_inlined"),
839 None,
840 None,
841 Some("here"),
842 ]);
843 let view_b = StringViewArray::from(&values);
844
845 let indices = &[
846 (0, 1), (1, 2), (0, 2), (1, 3), (0, 4), ];
852
853 let values = interleave(&[&view_a, &view_b], indices).unwrap();
855 let result = values.as_string_view();
856 assert_eq!(result.data_buffers().len(), 1);
857
858 let fallback = interleave_fallback(&[&view_a, &view_b], indices).unwrap();
859 let fallback_result = fallback.as_string_view();
860
861 let collected: Vec<_> = result.iter().map(|x| x.map(|s| s.to_string())).collect();
863
864 let fallback_collected: Vec<_> = fallback_result
865 .iter()
866 .map(|x| x.map(|s| s.to_string()))
867 .collect();
868
869 assert_eq!(&collected, &fallback_collected);
870
871 assert_eq!(
872 &collected,
873 &[
874 None,
875 None,
876 Some("foo_long_string_not_inlined".to_string()),
877 None,
878 None,
879 ]
880 );
881 }
882
883 #[test]
884 fn test_interleave_views_multiple_buffers() {
885 let str1 = "very_long_string_from_first_buffer".as_bytes();
886 let str2 = "very_long_string_from_second_buffer".as_bytes();
887 let buffer1 = str1.to_vec().into();
888 let buffer2 = str2.to_vec().into();
889
890 let view1 = ByteView::new(str1.len() as u32, &str1[..4])
891 .with_buffer_index(0)
892 .with_offset(0)
893 .as_u128();
894 let view2 = ByteView::new(str2.len() as u32, &str2[..4])
895 .with_buffer_index(1)
896 .with_offset(0)
897 .as_u128();
898 let view_a =
899 StringViewArray::try_new(vec![view1, view2].into(), vec![buffer1, buffer2], None)
900 .unwrap();
901
902 let str3 = "another_very_long_string_buffer_three".as_bytes();
903 let str4 = "different_long_string_in_buffer_four".as_bytes();
904 let buffer3 = str3.to_vec().into();
905 let buffer4 = str4.to_vec().into();
906
907 let view3 = ByteView::new(str3.len() as u32, &str3[..4])
908 .with_buffer_index(0)
909 .with_offset(0)
910 .as_u128();
911 let view4 = ByteView::new(str4.len() as u32, &str4[..4])
912 .with_buffer_index(1)
913 .with_offset(0)
914 .as_u128();
915 let view_b =
916 StringViewArray::try_new(vec![view3, view4].into(), vec![buffer3, buffer4], None)
917 .unwrap();
918
919 let indices = &[
920 (0, 0), (1, 0), (0, 1), (1, 1), (0, 0), (1, 1), ];
927
928 let values = interleave(&[&view_a, &view_b], indices).unwrap();
930 let result = values.as_string_view();
931
932 assert_eq!(
933 result.data_buffers().len(),
934 4,
935 "Expected four buffers (two from each input array)"
936 );
937
938 let result_strings: Vec<_> = result.iter().map(|x| x.map(|s| s.to_string())).collect();
939 assert_eq!(
940 result_strings,
941 vec![
942 Some("very_long_string_from_first_buffer".to_string()),
943 Some("another_very_long_string_buffer_three".to_string()),
944 Some("very_long_string_from_second_buffer".to_string()),
945 Some("different_long_string_in_buffer_four".to_string()),
946 Some("very_long_string_from_first_buffer".to_string()),
947 Some("different_long_string_in_buffer_four".to_string()),
948 ]
949 );
950
951 let views = result.views();
952 let buffer_indices: Vec<_> = views
953 .iter()
954 .map(|raw_view| ByteView::from(*raw_view).buffer_index)
955 .collect();
956
957 assert_eq!(
958 buffer_indices,
959 vec![
960 0, 1, 2, 3, 0, 3, ]
967 );
968 }
969
970 #[test]
971 fn test_interleave_run_end_encoded_primitive() {
972 let mut builder = PrimitiveRunBuilder::<Int32Type, Int32Type>::new();
973 builder.extend([1, 1, 2, 2, 2, 3].into_iter().map(Some));
974 let a = builder.finish();
975
976 let mut builder = PrimitiveRunBuilder::<Int32Type, Int32Type>::new();
977 builder.extend([4, 5, 5, 6, 6, 6].into_iter().map(Some));
978 let b = builder.finish();
979
980 let indices = &[(0, 1), (1, 0), (0, 4), (1, 2), (0, 5)];
981 let result = interleave(&[&a, &b], indices).unwrap();
982
983 assert!(matches!(result.data_type(), DataType::RunEndEncoded(_, _)));
985
986 let result_run_array: &Int32RunArray = result.as_any().downcast_ref().unwrap();
988
989 let expected = vec![1, 4, 2, 5, 3];
991 let mut actual = Vec::new();
992 for i in 0..result_run_array.len() {
993 let physical_idx = result_run_array.get_physical_index(i);
994 let value = result_run_array
995 .values()
996 .as_primitive::<Int32Type>()
997 .value(physical_idx);
998 actual.push(value);
999 }
1000 assert_eq!(actual, expected);
1001 }
1002
1003 #[test]
1004 fn test_interleave_run_end_encoded_string() {
1005 let a: Int32RunArray = vec!["hello", "hello", "world", "world", "foo"]
1006 .into_iter()
1007 .collect();
1008 let b: Int32RunArray = vec!["bar", "baz", "baz", "qux"].into_iter().collect();
1009
1010 let indices = &[(0, 0), (1, 1), (0, 3), (1, 3), (0, 4)];
1011 let result = interleave(&[&a, &b], indices).unwrap();
1012
1013 assert!(matches!(result.data_type(), DataType::RunEndEncoded(_, _)));
1015
1016 let result_run_array: &Int32RunArray = result.as_any().downcast_ref().unwrap();
1018
1019 let expected = vec!["hello", "baz", "world", "qux", "foo"];
1021 let mut actual = Vec::new();
1022 for i in 0..result_run_array.len() {
1023 let physical_idx = result_run_array.get_physical_index(i);
1024 let value = result_run_array
1025 .values()
1026 .as_string::<i32>()
1027 .value(physical_idx);
1028 actual.push(value);
1029 }
1030 assert_eq!(actual, expected);
1031 }
1032
1033 #[test]
1034 fn test_interleave_run_end_encoded_with_nulls() {
1035 let a: Int32RunArray = vec![Some("a"), Some("a"), None, None, Some("b")]
1036 .into_iter()
1037 .collect();
1038 let b: Int32RunArray = vec![None, Some("c"), Some("c"), Some("d")]
1039 .into_iter()
1040 .collect();
1041
1042 let indices = &[(0, 1), (1, 0), (0, 2), (1, 3), (0, 4)];
1043 let result = interleave(&[&a, &b], indices).unwrap();
1044
1045 assert!(matches!(result.data_type(), DataType::RunEndEncoded(_, _)));
1047
1048 let result_run_array: &Int32RunArray = result.as_any().downcast_ref().unwrap();
1050
1051 let expected = vec![Some("a"), None, None, Some("d"), Some("b")];
1053 let mut actual = Vec::new();
1054 for i in 0..result_run_array.len() {
1055 let physical_idx = result_run_array.get_physical_index(i);
1056 if result_run_array.values().is_null(physical_idx) {
1057 actual.push(None);
1058 } else {
1059 let value = result_run_array
1060 .values()
1061 .as_string::<i32>()
1062 .value(physical_idx);
1063 actual.push(Some(value));
1064 }
1065 }
1066 assert_eq!(actual, expected);
1067 }
1068
1069 #[test]
1070 fn test_interleave_run_end_encoded_different_run_types() {
1071 let mut builder = PrimitiveRunBuilder::<Int16Type, Int32Type>::new();
1072 builder.extend([1, 1, 2, 3, 3].into_iter().map(Some));
1073 let a = builder.finish();
1074
1075 let mut builder = PrimitiveRunBuilder::<Int16Type, Int32Type>::new();
1076 builder.extend([4, 5, 5, 6].into_iter().map(Some));
1077 let b = builder.finish();
1078
1079 let indices = &[(0, 0), (1, 1), (0, 3), (1, 3)];
1080 let result = interleave(&[&a, &b], indices).unwrap();
1081
1082 assert!(matches!(result.data_type(), DataType::RunEndEncoded(_, _)));
1084
1085 let result_run_array: &RunArray<Int16Type> = result.as_any().downcast_ref().unwrap();
1087
1088 let expected = vec![1, 5, 3, 6];
1090 let mut actual = Vec::new();
1091 for i in 0..result_run_array.len() {
1092 let physical_idx = result_run_array.get_physical_index(i);
1093 let value = result_run_array
1094 .values()
1095 .as_primitive::<Int32Type>()
1096 .value(physical_idx);
1097 actual.push(value);
1098 }
1099 assert_eq!(actual, expected);
1100 }
1101
1102 #[test]
1103 fn test_interleave_run_end_encoded_mixed_run_lengths() {
1104 let mut builder = PrimitiveRunBuilder::<Int64Type, Int32Type>::new();
1105 builder.extend([1, 2, 2, 2, 2, 3, 3, 4].into_iter().map(Some));
1106 let a = builder.finish();
1107
1108 let mut builder = PrimitiveRunBuilder::<Int64Type, Int32Type>::new();
1109 builder.extend([5, 5, 5, 6, 7, 7, 8, 8].into_iter().map(Some));
1110 let b = builder.finish();
1111
1112 let indices = &[
1113 (0, 0), (1, 2), (0, 3), (1, 3), (0, 6), (1, 6), (0, 7), (1, 4), ];
1122 let result = interleave(&[&a, &b], indices).unwrap();
1123
1124 assert!(matches!(result.data_type(), DataType::RunEndEncoded(_, _)));
1126
1127 let result_run_array: &RunArray<Int64Type> = result.as_any().downcast_ref().unwrap();
1129
1130 let expected = vec![1, 5, 2, 6, 3, 8, 4, 7];
1132 let mut actual = Vec::new();
1133 for i in 0..result_run_array.len() {
1134 let physical_idx = result_run_array.get_physical_index(i);
1135 let value = result_run_array
1136 .values()
1137 .as_primitive::<Int32Type>()
1138 .value(physical_idx);
1139 actual.push(value);
1140 }
1141 assert_eq!(actual, expected);
1142 }
1143
1144 #[test]
1145 fn test_interleave_run_end_encoded_empty_runs() {
1146 let mut builder = PrimitiveRunBuilder::<Int32Type, Int32Type>::new();
1147 builder.extend([1].into_iter().map(Some));
1148 let a = builder.finish();
1149
1150 let mut builder = PrimitiveRunBuilder::<Int32Type, Int32Type>::new();
1151 builder.extend([2, 2, 2].into_iter().map(Some));
1152 let b = builder.finish();
1153
1154 let indices = &[(0, 0), (1, 1), (1, 2)];
1155 let result = interleave(&[&a, &b], indices).unwrap();
1156
1157 assert!(matches!(result.data_type(), DataType::RunEndEncoded(_, _)));
1159
1160 let result_run_array: &Int32RunArray = result.as_any().downcast_ref().unwrap();
1162
1163 let expected = vec![1, 2, 2];
1165 let mut actual = Vec::new();
1166 for i in 0..result_run_array.len() {
1167 let physical_idx = result_run_array.get_physical_index(i);
1168 let value = result_run_array
1169 .values()
1170 .as_primitive::<Int32Type>()
1171 .value(physical_idx);
1172 actual.push(value);
1173 }
1174 assert_eq!(actual, expected);
1175 }
1176
1177 #[test]
1178 fn test_struct_no_fields() {
1179 let fields = Fields::empty();
1180 let a = StructArray::try_new_with_length(fields.clone(), vec![], None, 10).unwrap();
1181 let v = interleave(&[&a], &[(0, 0)]).unwrap();
1182 assert_eq!(v.len(), 1);
1183 assert_eq!(v.data_type(), &DataType::Struct(fields));
1184 }
1185}