1use crate::concat::concat;
21use crate::dictionary::{merge_dictionary_values, should_merge_dictionary_values};
22use arrow_array::builder::{BooleanBufferBuilder, PrimitiveBuilder};
23use arrow_array::cast::AsArray;
24use arrow_array::types::*;
25use arrow_array::*;
26use arrow_buffer::{ArrowNativeType, BooleanBuffer, MutableBuffer, NullBuffer, OffsetBuffer};
27use arrow_data::ByteView;
28use arrow_data::transform::MutableArrayData;
29use arrow_schema::{ArrowError, DataType, FieldRef, Fields};
30use std::sync::Arc;
31
/// Adapter for `downcast_primitive!`: receives the matched primitive type and
/// forwards it, together with the call arguments, to `interleave_primitive`.
macro_rules! primitive_helper {
    ($native:ty, $arrays:ident, $picks:ident, $dtype:ident) => {
        interleave_primitive::<$native>($arrays, $picks, $dtype)
    };
}
37
/// Adapter for `downcast_integer!`: receives the matched dictionary key type
/// and forwards it, together with the call arguments, to
/// `interleave_dictionaries`.
///
/// `interleave_dictionaries` already returns `Result<ArrayRef, ArrowError>`,
/// so its result is passed through unchanged instead of being unwrapped and
/// wrapped in a second `Arc`.
macro_rules! dict_helper {
    ($t:ty, $values:expr, $indices:expr) => {
        interleave_dictionaries::<$t>($values, $indices)
    };
}
43
44pub fn interleave(
72 values: &[&dyn Array],
73 indices: &[(usize, usize)],
74) -> Result<ArrayRef, ArrowError> {
75 if values.is_empty() {
76 return Err(ArrowError::InvalidArgumentError(
77 "interleave requires input of at least one array".to_string(),
78 ));
79 }
80 let data_type = values[0].data_type();
81
82 for array in values.iter().skip(1) {
83 if array.data_type() != data_type {
84 return Err(ArrowError::InvalidArgumentError(format!(
85 "It is not possible to interleave arrays of different data types ({} and {})",
86 data_type,
87 array.data_type()
88 )));
89 }
90 }
91
92 if indices.is_empty() {
93 return Ok(new_empty_array(data_type));
94 }
95
96 downcast_primitive! {
97 data_type => (primitive_helper, values, indices, data_type),
98 DataType::Utf8 => interleave_bytes::<Utf8Type>(values, indices),
99 DataType::LargeUtf8 => interleave_bytes::<LargeUtf8Type>(values, indices),
100 DataType::Binary => interleave_bytes::<BinaryType>(values, indices),
101 DataType::LargeBinary => interleave_bytes::<LargeBinaryType>(values, indices),
102 DataType::BinaryView => interleave_views::<BinaryViewType>(values, indices),
103 DataType::Utf8View => interleave_views::<StringViewType>(values, indices),
104 DataType::Dictionary(k, _) => downcast_integer! {
105 k.as_ref() => (dict_helper, values, indices),
106 _ => unreachable!("illegal dictionary key type {k}")
107 },
108 DataType::Struct(fields) => interleave_struct(fields, values, indices),
109 DataType::List(field) => interleave_list::<i32>(values, indices, field),
110 DataType::LargeList(field) => interleave_list::<i64>(values, indices, field),
111 _ => interleave_fallback(values, indices)
112 }
113}
114
115struct Interleave<'a, T> {
119 arrays: Vec<&'a T>,
121 nulls: Option<NullBuffer>,
123}
124
125impl<'a, T: Array + 'static> Interleave<'a, T> {
126 fn new(values: &[&'a dyn Array], indices: &'a [(usize, usize)]) -> Self {
127 let mut has_nulls = false;
128 let arrays: Vec<&T> = values
129 .iter()
130 .map(|x| {
131 has_nulls = has_nulls || x.null_count() != 0;
132 x.as_any().downcast_ref().unwrap()
133 })
134 .collect();
135
136 let nulls = match has_nulls {
137 true => {
138 let nulls = BooleanBuffer::collect_bool(indices.len(), |i| {
139 let (a, b) = indices[i];
140 arrays[a].is_valid(b)
141 });
142 Some(nulls.into())
143 }
144 false => None,
145 };
146
147 Self { arrays, nulls }
148 }
149}
150
151fn interleave_primitive<T: ArrowPrimitiveType>(
152 values: &[&dyn Array],
153 indices: &[(usize, usize)],
154 data_type: &DataType,
155) -> Result<ArrayRef, ArrowError> {
156 let interleaved = Interleave::<'_, PrimitiveArray<T>>::new(values, indices);
157
158 let values = indices
159 .iter()
160 .map(|(a, b)| interleaved.arrays[*a].value(*b))
161 .collect::<Vec<_>>();
162
163 let array = PrimitiveArray::<T>::try_new(values.into(), interleaved.nulls)?;
164 Ok(Arc::new(array.with_data_type(data_type.clone())))
165}
166
167fn interleave_bytes<T: ByteArrayType>(
168 values: &[&dyn Array],
169 indices: &[(usize, usize)],
170) -> Result<ArrayRef, ArrowError> {
171 let interleaved = Interleave::<'_, GenericByteArray<T>>::new(values, indices);
172
173 let mut capacity = 0;
174 let mut offsets = Vec::with_capacity(indices.len() + 1);
175 offsets.push(T::Offset::from_usize(0).unwrap());
176 offsets.extend(indices.iter().map(|(a, b)| {
177 let o = interleaved.arrays[*a].value_offsets();
178 let element_len = o[*b + 1].as_usize() - o[*b].as_usize();
179 capacity += element_len;
180 T::Offset::from_usize(capacity).expect("overflow")
181 }));
182
183 let mut values = Vec::with_capacity(capacity);
184 for (a, b) in indices {
185 values.extend_from_slice(interleaved.arrays[*a].value(*b).as_ref());
186 }
187
188 let array = unsafe {
190 let offsets = OffsetBuffer::new_unchecked(offsets.into());
191 GenericByteArray::<T>::new_unchecked(offsets, values.into(), interleaved.nulls)
192 };
193 Ok(Arc::new(array))
194}
195
196fn interleave_dictionaries<K: ArrowDictionaryKeyType>(
197 arrays: &[&dyn Array],
198 indices: &[(usize, usize)],
199) -> Result<ArrayRef, ArrowError> {
200 let dictionaries: Vec<_> = arrays.iter().map(|x| x.as_dictionary::<K>()).collect();
201 let (should_merge, has_overflow) =
202 should_merge_dictionary_values::<K>(&dictionaries, indices.len());
203 if !should_merge {
204 return if has_overflow {
205 interleave_fallback(arrays, indices)
206 } else {
207 interleave_fallback_dictionary::<K>(&dictionaries, indices)
208 };
209 }
210
211 let masks: Vec<_> = dictionaries
212 .iter()
213 .enumerate()
214 .map(|(a_idx, dictionary)| {
215 let mut key_mask = BooleanBufferBuilder::new_from_buffer(
216 MutableBuffer::new_null(dictionary.len()),
217 dictionary.len(),
218 );
219
220 for (_, key_idx) in indices.iter().filter(|(a, _)| *a == a_idx) {
221 key_mask.set_bit(*key_idx, true);
222 }
223 key_mask.finish()
224 })
225 .collect();
226
227 let merged = merge_dictionary_values(&dictionaries, Some(&masks))?;
228
229 let mut keys = PrimitiveBuilder::<K>::with_capacity(indices.len());
231 for (a, b) in indices {
232 let old_keys: &PrimitiveArray<K> = dictionaries[*a].keys();
233 match old_keys.is_valid(*b) {
234 true => {
235 let old_key = old_keys.values()[*b];
236 keys.append_value(merged.key_mappings[*a][old_key.as_usize()])
237 }
238 false => keys.append_null(),
239 }
240 }
241 let array = unsafe { DictionaryArray::new_unchecked(keys.finish(), merged.values) };
242 Ok(Arc::new(array))
243}
244
245fn interleave_views<T: ByteViewType>(
246 values: &[&dyn Array],
247 indices: &[(usize, usize)],
248) -> Result<ArrayRef, ArrowError> {
249 let interleaved = Interleave::<'_, GenericByteViewArray<T>>::new(values, indices);
250 let mut buffers = Vec::new();
251
252 let mut offsets = Vec::with_capacity(interleaved.arrays.len() + 1);
254 offsets.push(0);
255 let mut total_buffers = 0;
256 for a in interleaved.arrays.iter() {
257 total_buffers += a.data_buffers().len();
258 offsets.push(total_buffers);
259 }
260
261 let mut buffer_to_new_index = vec![None; total_buffers];
263
264 let views: Vec<u128> = indices
265 .iter()
266 .map(|(array_idx, value_idx)| {
267 let array = interleaved.arrays[*array_idx];
268 let view = array.views().get(*value_idx).unwrap();
269 let view_len = *view as u32;
270 if view_len <= 12 {
271 return *view;
272 }
273 let view = ByteView::from(*view);
275 let buffer_to_new_idx = offsets[*array_idx] + view.buffer_index as usize;
276 let new_buffer_idx: u32 =
277 *buffer_to_new_index[buffer_to_new_idx].get_or_insert_with(|| {
278 buffers.push(array.data_buffers()[view.buffer_index as usize].clone());
279 (buffers.len() - 1) as u32
280 });
281 view.with_buffer_index(new_buffer_idx).as_u128()
282 })
283 .collect();
284
285 let array = unsafe {
286 GenericByteViewArray::<T>::new_unchecked(views.into(), buffers, interleaved.nulls)
287 };
288 Ok(Arc::new(array))
289}
290
291fn interleave_struct(
292 fields: &Fields,
293 values: &[&dyn Array],
294 indices: &[(usize, usize)],
295) -> Result<ArrayRef, ArrowError> {
296 let interleaved = Interleave::<'_, StructArray>::new(values, indices);
297
298 if fields.is_empty() {
299 let array = StructArray::try_new_with_length(
300 fields.clone(),
301 vec![],
302 interleaved.nulls,
303 indices.len(),
304 )?;
305 return Ok(Arc::new(array));
306 }
307
308 let struct_fields_array: Result<Vec<_>, _> = (0..fields.len())
309 .map(|i| {
310 let field_values: Vec<&dyn Array> = interleaved
311 .arrays
312 .iter()
313 .map(|x| x.column(i).as_ref())
314 .collect();
315 interleave(&field_values, indices)
316 })
317 .collect();
318
319 let struct_array =
320 StructArray::try_new(fields.clone(), struct_fields_array?, interleaved.nulls)?;
321 Ok(Arc::new(struct_array))
322}
323
324fn interleave_list<O: OffsetSizeTrait>(
325 values: &[&dyn Array],
326 indices: &[(usize, usize)],
327 field: &FieldRef,
328) -> Result<ArrayRef, ArrowError> {
329 let interleaved = Interleave::<'_, GenericListArray<O>>::new(values, indices);
330
331 let mut capacity = 0usize;
332 let mut offsets = Vec::with_capacity(indices.len() + 1);
333 offsets.push(O::from_usize(0).unwrap());
334 offsets.extend(indices.iter().map(|(array, row)| {
335 let o = interleaved.arrays[*array].value_offsets();
336 let element_len = o[*row + 1].as_usize() - o[*row].as_usize();
337 capacity += element_len;
338 O::from_usize(capacity).expect("offset overflow")
339 }));
340
341 let mut child_indices = Vec::with_capacity(capacity);
342 for (array, row) in indices {
343 let list = interleaved.arrays[*array];
344 let start = list.value_offsets()[*row].as_usize();
345 let end = list.value_offsets()[*row + 1].as_usize();
346 child_indices.extend((start..end).map(|i| (*array, i)));
347 }
348
349 let child_arrays: Vec<&dyn Array> = interleaved
350 .arrays
351 .iter()
352 .map(|list| list.values().as_ref())
353 .collect();
354
355 let interleaved_values = interleave(&child_arrays, &child_indices)?;
356
357 let offsets = OffsetBuffer::new(offsets.into());
358 let list_array = GenericListArray::<O>::new(
359 field.clone(),
360 offsets,
361 interleaved_values,
362 interleaved.nulls,
363 );
364
365 Ok(Arc::new(list_array))
366}
367
368fn interleave_fallback(
370 values: &[&dyn Array],
371 indices: &[(usize, usize)],
372) -> Result<ArrayRef, ArrowError> {
373 let arrays: Vec<_> = values.iter().map(|x| x.to_data()).collect();
374 let arrays: Vec<_> = arrays.iter().collect();
375 let mut array_data = MutableArrayData::new(arrays, false, indices.len());
376
377 let mut cur_array = indices[0].0;
378 let mut start_row_idx = indices[0].1;
379 let mut end_row_idx = start_row_idx + 1;
380
381 for (array, row) in indices.iter().skip(1).copied() {
382 if array == cur_array && row == end_row_idx {
383 end_row_idx += 1;
385 continue;
386 }
387
388 array_data.extend(cur_array, start_row_idx, end_row_idx);
390
391 cur_array = array;
393 start_row_idx = row;
394 end_row_idx = start_row_idx + 1;
395 }
396
397 array_data.extend(cur_array, start_row_idx, end_row_idx);
399 Ok(make_array(array_data.freeze()))
400}
401
402fn interleave_fallback_dictionary<K: ArrowDictionaryKeyType>(
412 dictionaries: &[&DictionaryArray<K>],
413 indices: &[(usize, usize)],
414) -> Result<ArrayRef, ArrowError> {
415 let relative_offsets: Vec<usize> = dictionaries
416 .iter()
417 .scan(0usize, |offset, dict| {
418 let current = *offset;
419 *offset += dict.values().len();
420 Some(current)
421 })
422 .collect();
423 let all_values: Vec<&dyn Array> = dictionaries.iter().map(|d| d.values().as_ref()).collect();
424 let concatenated_values = concat(&all_values)?;
425
426 let any_nulls = dictionaries.iter().any(|d| d.keys().nulls().is_some());
427 let (new_keys, nulls) = if any_nulls {
428 let mut has_nulls = false;
429 let new_keys: Vec<K::Native> = indices
430 .iter()
431 .map(|(array, row)| {
432 let old_keys = dictionaries[*array].keys();
433 if old_keys.is_valid(*row) {
434 let old_key = old_keys.values()[*row].as_usize();
435 K::Native::from_usize(relative_offsets[*array] + old_key)
436 .expect("key overflow should be checked by caller")
437 } else {
438 has_nulls = true;
439 K::Native::ZERO
440 }
441 })
442 .collect();
443
444 let nulls = if has_nulls {
445 let null_buffer = BooleanBuffer::collect_bool(indices.len(), |i| {
446 let (array, row) = indices[i];
447 dictionaries[array].keys().is_valid(row)
448 });
449 Some(NullBuffer::new(null_buffer))
450 } else {
451 None
452 };
453 (new_keys, nulls)
454 } else {
455 let new_keys: Vec<K::Native> = indices
456 .iter()
457 .map(|(array, row)| {
458 let old_key = dictionaries[*array].keys().values()[*row].as_usize();
459 K::Native::from_usize(relative_offsets[*array] + old_key)
460 .expect("key overflow should be checked by caller")
461 })
462 .collect();
463 (new_keys, None)
464 };
465
466 let keys_array = PrimitiveArray::<K>::new(new_keys.into(), nulls);
467 let array = unsafe { DictionaryArray::new_unchecked(keys_array, concatenated_values) };
469 Ok(Arc::new(array))
470}
471
472pub fn interleave_record_batch(
517 record_batches: &[&RecordBatch],
518 indices: &[(usize, usize)],
519) -> Result<RecordBatch, ArrowError> {
520 let schema = record_batches[0].schema();
521 let columns = (0..schema.fields().len())
522 .map(|i| {
523 let column_values: Vec<&dyn Array> = record_batches
524 .iter()
525 .map(|batch| batch.column(i).as_ref())
526 .collect();
527 interleave(&column_values, indices)
528 })
529 .collect::<Result<Vec<_>, _>>()?;
530 RecordBatch::try_new(schema, columns)
531}
532
533#[cfg(test)]
534mod tests {
535 use super::*;
536 use arrow_array::Int32RunArray;
537 use arrow_array::builder::{GenericListBuilder, Int32Builder, PrimitiveRunBuilder};
538 use arrow_array::types::Int8Type;
539 use arrow_schema::Field;
540
541 #[test]
542 fn test_primitive() {
543 let a = Int32Array::from_iter_values([1, 2, 3, 4]);
544 let b = Int32Array::from_iter_values([5, 6, 7]);
545 let c = Int32Array::from_iter_values([8, 9, 10]);
546 let values = interleave(&[&a, &b, &c], &[(0, 3), (0, 3), (2, 2), (2, 0), (1, 1)]).unwrap();
547 let v = values.as_primitive::<Int32Type>();
548 assert_eq!(v.values(), &[4, 4, 10, 8, 6]);
549 }
550
551 #[test]
552 fn test_primitive_nulls() {
553 let a = Int32Array::from_iter_values([1, 2, 3, 4]);
554 let b = Int32Array::from_iter([Some(1), Some(4), None]);
555 let values = interleave(&[&a, &b], &[(0, 1), (1, 2), (1, 2), (0, 3), (0, 2)]).unwrap();
556 let v: Vec<_> = values.as_primitive::<Int32Type>().into_iter().collect();
557 assert_eq!(&v, &[Some(2), None, None, Some(4), Some(3)])
558 }
559
560 #[test]
561 fn test_primitive_empty() {
562 let a = Int32Array::from_iter_values([1, 2, 3, 4]);
563 let v = interleave(&[&a], &[]).unwrap();
564 assert!(v.is_empty());
565 assert_eq!(v.data_type(), &DataType::Int32);
566 }
567
568 #[test]
569 fn test_strings() {
570 let a = StringArray::from_iter_values(["a", "b", "c"]);
571 let b = StringArray::from_iter_values(["hello", "world", "foo"]);
572 let values = interleave(&[&a, &b], &[(0, 2), (0, 2), (1, 0), (1, 1), (0, 1)]).unwrap();
573 let v = values.as_string::<i32>();
574 let values: Vec<_> = v.into_iter().collect();
575 assert_eq!(
576 &values,
577 &[
578 Some("c"),
579 Some("c"),
580 Some("hello"),
581 Some("world"),
582 Some("b")
583 ]
584 )
585 }
586
587 #[test]
588 fn test_interleave_dictionary() {
589 let a = DictionaryArray::<Int32Type>::from_iter(["a", "b", "c", "a", "b"]);
590 let b = DictionaryArray::<Int32Type>::from_iter(["a", "c", "a", "c", "a"]);
591
592 let values =
594 interleave(&[&a, &b], &[(0, 2), (0, 2), (0, 2), (1, 0), (1, 1), (0, 1)]).unwrap();
595 let v = values.as_dictionary::<Int32Type>();
596 assert_eq!(v.values().len(), 5);
597
598 let vc = v.downcast_dict::<StringArray>().unwrap();
599 let collected: Vec<_> = vc.into_iter().map(Option::unwrap).collect();
600 assert_eq!(&collected, &["c", "c", "c", "a", "c", "b"]);
601
602 let values = interleave(&[&a, &b], &[(0, 2), (0, 2), (1, 1)]).unwrap();
604 let v = values.as_dictionary::<Int32Type>();
605 assert_eq!(v.values().len(), 1);
606
607 let vc = v.downcast_dict::<StringArray>().unwrap();
608 let collected: Vec<_> = vc.into_iter().map(Option::unwrap).collect();
609 assert_eq!(&collected, &["c", "c", "c"]);
610 }
611
612 #[test]
613 fn test_interleave_dictionary_nulls() {
614 let input_1_keys = Int32Array::from_iter_values([0, 2, 1, 3]);
615 let input_1_values = StringArray::from(vec![Some("foo"), None, Some("bar"), Some("fiz")]);
616 let input_1 = DictionaryArray::new(input_1_keys, Arc::new(input_1_values));
617 let input_2: DictionaryArray<Int32Type> = vec![None].into_iter().collect();
618
619 let expected = vec![Some("fiz"), None, None, Some("foo")];
620
621 let values = interleave(
622 &[&input_1 as _, &input_2 as _],
623 &[(0, 3), (0, 2), (1, 0), (0, 0)],
624 )
625 .unwrap();
626 let dictionary = values.as_dictionary::<Int32Type>();
627 let actual: Vec<Option<&str>> = dictionary
628 .downcast_dict::<StringArray>()
629 .unwrap()
630 .into_iter()
631 .collect();
632
633 assert_eq!(actual, expected);
634 }
635
636 #[test]
637 fn test_interleave_dictionary_overflow_same_values() {
638 let values: ArrayRef = Arc::new(StringArray::from_iter_values(
639 (0..50).map(|i| format!("v{i}")),
640 ));
641
642 let dict1 = DictionaryArray::<Int8Type>::new(
648 Int8Array::from_iter_values([0, 1, 2]),
649 values.clone(),
650 );
651 let dict2 = DictionaryArray::<Int8Type>::new(
652 Int8Array::from_iter_values([0, 1, 2]),
653 values.clone(),
654 );
655 let dict3 =
656 DictionaryArray::<Int8Type>::new(Int8Array::from_iter_values([49]), values.clone());
657
658 let indices = &[(0, 0), (1, 0), (2, 0)];
659 let result = interleave(&[&dict1, &dict2, &dict3], indices).unwrap();
660
661 let dict_result = result.as_dictionary::<Int8Type>();
662 let string_result: Vec<_> = dict_result
663 .downcast_dict::<StringArray>()
664 .unwrap()
665 .into_iter()
666 .map(|x| x.unwrap())
667 .collect();
668 assert_eq!(string_result, vec!["v0", "v0", "v49"]);
669 }
670
671 fn test_interleave_lists<O: OffsetSizeTrait>() {
672 let mut a = GenericListBuilder::<O, _>::new(Int32Builder::new());
674 a.values().append_value(1);
675 a.values().append_value(2);
676 a.append(true);
677 a.append(false);
678 a.values().append_value(3);
679 a.append(true);
680 let a = a.finish();
681
682 let mut b = GenericListBuilder::<O, _>::new(Int32Builder::new());
684 b.values().append_value(4);
685 b.append(true);
686 b.append(false);
687 b.values().append_value(5);
688 b.values().append_value(6);
689 b.values().append_null();
690 b.append(true);
691 let b = b.finish();
692
693 let values = interleave(&[&a, &b], &[(0, 2), (0, 1), (1, 0), (1, 2), (1, 1)]).unwrap();
694 let v = values
695 .as_any()
696 .downcast_ref::<GenericListArray<O>>()
697 .unwrap();
698
699 let mut expected = GenericListBuilder::<O, _>::new(Int32Builder::new());
701 expected.values().append_value(3);
702 expected.append(true);
703 expected.append(false);
704 expected.values().append_value(4);
705 expected.append(true);
706 expected.values().append_value(5);
707 expected.values().append_value(6);
708 expected.values().append_null();
709 expected.append(true);
710 expected.append(false);
711 let expected = expected.finish();
712
713 assert_eq!(v, &expected);
714 }
715
716 #[test]
717 fn test_lists() {
718 test_interleave_lists::<i32>();
719 }
720
721 #[test]
722 fn test_large_lists() {
723 test_interleave_lists::<i64>();
724 }
725
726 #[test]
727 fn test_struct_without_nulls() {
728 let fields = Fields::from(vec![
729 Field::new("number_col", DataType::Int32, false),
730 Field::new("string_col", DataType::Utf8, false),
731 ]);
732 let a = {
733 let number_col = Int32Array::from_iter_values([1, 2, 3, 4]);
734 let string_col = StringArray::from_iter_values(["a", "b", "c", "d"]);
735
736 StructArray::try_new(
737 fields.clone(),
738 vec![Arc::new(number_col), Arc::new(string_col)],
739 None,
740 )
741 .unwrap()
742 };
743
744 let b = {
745 let number_col = Int32Array::from_iter_values([5, 6, 7]);
746 let string_col = StringArray::from_iter_values(["hello", "world", "foo"]);
747
748 StructArray::try_new(
749 fields.clone(),
750 vec![Arc::new(number_col), Arc::new(string_col)],
751 None,
752 )
753 .unwrap()
754 };
755
756 let c = {
757 let number_col = Int32Array::from_iter_values([8, 9, 10]);
758 let string_col = StringArray::from_iter_values(["x", "y", "z"]);
759
760 StructArray::try_new(
761 fields.clone(),
762 vec![Arc::new(number_col), Arc::new(string_col)],
763 None,
764 )
765 .unwrap()
766 };
767
768 let values = interleave(&[&a, &b, &c], &[(0, 3), (0, 3), (2, 2), (2, 0), (1, 1)]).unwrap();
769 let values_struct = values.as_struct();
770 assert_eq!(values_struct.data_type(), &DataType::Struct(fields));
771 assert_eq!(values_struct.null_count(), 0);
772
773 let values_number = values_struct.column(0).as_primitive::<Int32Type>();
774 assert_eq!(values_number.values(), &[4, 4, 10, 8, 6]);
775 let values_string = values_struct.column(1).as_string::<i32>();
776 let values_string: Vec<_> = values_string.into_iter().collect();
777 assert_eq!(
778 &values_string,
779 &[Some("d"), Some("d"), Some("z"), Some("x"), Some("world")]
780 );
781 }
782
783 #[test]
784 fn test_struct_with_nulls_in_values() {
785 let fields = Fields::from(vec![
786 Field::new("number_col", DataType::Int32, true),
787 Field::new("string_col", DataType::Utf8, true),
788 ]);
789 let a = {
790 let number_col = Int32Array::from_iter_values([1, 2, 3, 4]);
791 let string_col = StringArray::from_iter_values(["a", "b", "c", "d"]);
792
793 StructArray::try_new(
794 fields.clone(),
795 vec![Arc::new(number_col), Arc::new(string_col)],
796 None,
797 )
798 .unwrap()
799 };
800
801 let b = {
802 let number_col = Int32Array::from_iter([Some(1), Some(4), None]);
803 let string_col = StringArray::from(vec![Some("hello"), None, Some("foo")]);
804
805 StructArray::try_new(
806 fields.clone(),
807 vec![Arc::new(number_col), Arc::new(string_col)],
808 None,
809 )
810 .unwrap()
811 };
812
813 let values = interleave(&[&a, &b], &[(0, 1), (1, 2), (1, 2), (0, 3), (1, 1)]).unwrap();
814 let values_struct = values.as_struct();
815 assert_eq!(values_struct.data_type(), &DataType::Struct(fields));
816
817 assert_eq!(values_struct.null_count(), 0);
819
820 let values_number: Vec<_> = values_struct
821 .column(0)
822 .as_primitive::<Int32Type>()
823 .into_iter()
824 .collect();
825 assert_eq!(values_number, &[Some(2), None, None, Some(4), Some(4)]);
826
827 let values_string = values_struct.column(1).as_string::<i32>();
828 let values_string: Vec<_> = values_string.into_iter().collect();
829 assert_eq!(
830 &values_string,
831 &[Some("b"), Some("foo"), Some("foo"), Some("d"), None]
832 );
833 }
834
835 #[test]
836 fn test_struct_with_nulls() {
837 let fields = Fields::from(vec![
838 Field::new("number_col", DataType::Int32, false),
839 Field::new("string_col", DataType::Utf8, false),
840 ]);
841 let a = {
842 let number_col = Int32Array::from_iter_values([1, 2, 3, 4]);
843 let string_col = StringArray::from_iter_values(["a", "b", "c", "d"]);
844
845 StructArray::try_new(
846 fields.clone(),
847 vec![Arc::new(number_col), Arc::new(string_col)],
848 None,
849 )
850 .unwrap()
851 };
852
853 let b = {
854 let number_col = Int32Array::from_iter_values([5, 6, 7]);
855 let string_col = StringArray::from_iter_values(["hello", "world", "foo"]);
856
857 StructArray::try_new(
858 fields.clone(),
859 vec![Arc::new(number_col), Arc::new(string_col)],
860 Some(NullBuffer::from(&[true, false, true])),
861 )
862 .unwrap()
863 };
864
865 let c = {
866 let number_col = Int32Array::from_iter_values([8, 9, 10]);
867 let string_col = StringArray::from_iter_values(["x", "y", "z"]);
868
869 StructArray::try_new(
870 fields.clone(),
871 vec![Arc::new(number_col), Arc::new(string_col)],
872 None,
873 )
874 .unwrap()
875 };
876
877 let values = interleave(&[&a, &b, &c], &[(0, 3), (0, 3), (2, 2), (1, 1), (2, 0)]).unwrap();
878 let values_struct = values.as_struct();
879 assert_eq!(values_struct.data_type(), &DataType::Struct(fields));
880
881 let validity: Vec<bool> = {
882 let null_buffer = values_struct.nulls().expect("should_have_nulls");
883
884 null_buffer.iter().collect()
885 };
886 assert_eq!(validity, &[true, true, true, false, true]);
887 let values_number = values_struct.column(0).as_primitive::<Int32Type>();
888 assert_eq!(values_number.values(), &[4, 4, 10, 6, 8]);
889 let values_string = values_struct.column(1).as_string::<i32>();
890 let values_string: Vec<_> = values_string.into_iter().collect();
891 assert_eq!(
892 &values_string,
893 &[Some("d"), Some("d"), Some("z"), Some("world"), Some("x"),]
894 );
895 }
896
897 #[test]
898 fn test_struct_empty() {
899 let fields = Fields::from(vec![
900 Field::new("number_col", DataType::Int32, false),
901 Field::new("string_col", DataType::Utf8, false),
902 ]);
903 let a = {
904 let number_col = Int32Array::from_iter_values([1, 2, 3, 4]);
905 let string_col = StringArray::from_iter_values(["a", "b", "c", "d"]);
906
907 StructArray::try_new(
908 fields.clone(),
909 vec![Arc::new(number_col), Arc::new(string_col)],
910 None,
911 )
912 .unwrap()
913 };
914 let v = interleave(&[&a], &[]).unwrap();
915 assert!(v.is_empty());
916 assert_eq!(v.data_type(), &DataType::Struct(fields));
917 }
918
919 #[test]
920 fn interleave_sparse_nulls() {
921 let values = StringArray::from_iter_values((0..100).map(|x| x.to_string()));
922 let keys = Int32Array::from_iter_values(0..10);
923 let dict_a = DictionaryArray::new(keys, Arc::new(values));
924 let values = StringArray::new_null(0);
925 let keys = Int32Array::new_null(10);
926 let dict_b = DictionaryArray::new(keys, Arc::new(values));
927
928 let indices = &[(0, 0), (0, 1), (0, 2), (1, 0)];
929 let array = interleave(&[&dict_a, &dict_b], indices).unwrap();
930
931 let expected =
932 DictionaryArray::<Int32Type>::from_iter(vec![Some("0"), Some("1"), Some("2"), None]);
933 assert_eq!(array.as_ref(), &expected)
934 }
935
936 #[test]
937 fn test_interleave_views() {
938 let values = StringArray::from_iter_values([
939 "hello",
940 "world_long_string_not_inlined",
941 "foo",
942 "bar",
943 "baz",
944 ]);
945 let view_a = StringViewArray::from(&values);
946
947 let values = StringArray::from_iter_values([
948 "test",
949 "data",
950 "more_long_string_not_inlined",
951 "views",
952 "here",
953 ]);
954 let view_b = StringViewArray::from(&values);
955
956 let indices = &[
957 (0, 2), (1, 0), (0, 4), (1, 3), (0, 1), ];
963
964 let values = interleave(&[&view_a, &view_b], indices).unwrap();
966 let result = values.as_string_view();
967 assert_eq!(result.data_buffers().len(), 1);
968
969 let fallback = interleave_fallback(&[&view_a, &view_b], indices).unwrap();
970 let fallback_result = fallback.as_string_view();
971 assert_eq!(fallback_result.data_buffers().len(), 2);
973
974 let collected: Vec<_> = result.iter().map(|x| x.map(|s| s.to_string())).collect();
976
977 let fallback_collected: Vec<_> = fallback_result
978 .iter()
979 .map(|x| x.map(|s| s.to_string()))
980 .collect();
981
982 assert_eq!(&collected, &fallback_collected);
983
984 assert_eq!(
985 &collected,
986 &[
987 Some("foo".to_string()),
988 Some("test".to_string()),
989 Some("baz".to_string()),
990 Some("views".to_string()),
991 Some("world_long_string_not_inlined".to_string()),
992 ]
993 );
994 }
995
996 #[test]
997 fn test_interleave_views_with_nulls() {
998 let values = StringArray::from_iter([
999 Some("hello"),
1000 None,
1001 Some("foo_long_string_not_inlined"),
1002 Some("bar"),
1003 None,
1004 ]);
1005 let view_a = StringViewArray::from(&values);
1006
1007 let values = StringArray::from_iter([
1008 Some("test"),
1009 Some("data_long_string_not_inlined"),
1010 None,
1011 None,
1012 Some("here"),
1013 ]);
1014 let view_b = StringViewArray::from(&values);
1015
1016 let indices = &[
1017 (0, 1), (1, 2), (0, 2), (1, 3), (0, 4), ];
1023
1024 let values = interleave(&[&view_a, &view_b], indices).unwrap();
1026 let result = values.as_string_view();
1027 assert_eq!(result.data_buffers().len(), 1);
1028
1029 let fallback = interleave_fallback(&[&view_a, &view_b], indices).unwrap();
1030 let fallback_result = fallback.as_string_view();
1031
1032 let collected: Vec<_> = result.iter().map(|x| x.map(|s| s.to_string())).collect();
1034
1035 let fallback_collected: Vec<_> = fallback_result
1036 .iter()
1037 .map(|x| x.map(|s| s.to_string()))
1038 .collect();
1039
1040 assert_eq!(&collected, &fallback_collected);
1041
1042 assert_eq!(
1043 &collected,
1044 &[
1045 None,
1046 None,
1047 Some("foo_long_string_not_inlined".to_string()),
1048 None,
1049 None,
1050 ]
1051 );
1052 }
1053
1054 #[test]
1055 fn test_interleave_views_multiple_buffers() {
1056 let str1 = "very_long_string_from_first_buffer".as_bytes();
1057 let str2 = "very_long_string_from_second_buffer".as_bytes();
1058 let buffer1 = str1.to_vec().into();
1059 let buffer2 = str2.to_vec().into();
1060
1061 let view1 = ByteView::new(str1.len() as u32, &str1[..4])
1062 .with_buffer_index(0)
1063 .with_offset(0)
1064 .as_u128();
1065 let view2 = ByteView::new(str2.len() as u32, &str2[..4])
1066 .with_buffer_index(1)
1067 .with_offset(0)
1068 .as_u128();
1069 let view_a =
1070 StringViewArray::try_new(vec![view1, view2].into(), vec![buffer1, buffer2], None)
1071 .unwrap();
1072
1073 let str3 = "another_very_long_string_buffer_three".as_bytes();
1074 let str4 = "different_long_string_in_buffer_four".as_bytes();
1075 let buffer3 = str3.to_vec().into();
1076 let buffer4 = str4.to_vec().into();
1077
1078 let view3 = ByteView::new(str3.len() as u32, &str3[..4])
1079 .with_buffer_index(0)
1080 .with_offset(0)
1081 .as_u128();
1082 let view4 = ByteView::new(str4.len() as u32, &str4[..4])
1083 .with_buffer_index(1)
1084 .with_offset(0)
1085 .as_u128();
1086 let view_b =
1087 StringViewArray::try_new(vec![view3, view4].into(), vec![buffer3, buffer4], None)
1088 .unwrap();
1089
1090 let indices = &[
1091 (0, 0), (1, 0), (0, 1), (1, 1), (0, 0), (1, 1), ];
1098
1099 let values = interleave(&[&view_a, &view_b], indices).unwrap();
1101 let result = values.as_string_view();
1102
1103 assert_eq!(
1104 result.data_buffers().len(),
1105 4,
1106 "Expected four buffers (two from each input array)"
1107 );
1108
1109 let result_strings: Vec<_> = result.iter().map(|x| x.map(|s| s.to_string())).collect();
1110 assert_eq!(
1111 result_strings,
1112 vec![
1113 Some("very_long_string_from_first_buffer".to_string()),
1114 Some("another_very_long_string_buffer_three".to_string()),
1115 Some("very_long_string_from_second_buffer".to_string()),
1116 Some("different_long_string_in_buffer_four".to_string()),
1117 Some("very_long_string_from_first_buffer".to_string()),
1118 Some("different_long_string_in_buffer_four".to_string()),
1119 ]
1120 );
1121
1122 let views = result.views();
1123 let buffer_indices: Vec<_> = views
1124 .iter()
1125 .map(|raw_view| ByteView::from(*raw_view).buffer_index)
1126 .collect();
1127
1128 assert_eq!(
1129 buffer_indices,
1130 vec![
1131 0, 1, 2, 3, 0, 3, ]
1138 );
1139 }
1140
1141 #[test]
1142 fn test_interleave_run_end_encoded_primitive() {
1143 let mut builder = PrimitiveRunBuilder::<Int32Type, Int32Type>::new();
1144 builder.extend([1, 1, 2, 2, 2, 3].into_iter().map(Some));
1145 let a = builder.finish();
1146
1147 let mut builder = PrimitiveRunBuilder::<Int32Type, Int32Type>::new();
1148 builder.extend([4, 5, 5, 6, 6, 6].into_iter().map(Some));
1149 let b = builder.finish();
1150
1151 let indices = &[(0, 1), (1, 0), (0, 4), (1, 2), (0, 5)];
1152 let result = interleave(&[&a, &b], indices).unwrap();
1153
1154 assert!(matches!(result.data_type(), DataType::RunEndEncoded(_, _)));
1156
1157 let result_run_array: &Int32RunArray = result.as_any().downcast_ref().unwrap();
1159
1160 let expected = vec![1, 4, 2, 5, 3];
1162 let mut actual = Vec::new();
1163 for i in 0..result_run_array.len() {
1164 let physical_idx = result_run_array.get_physical_index(i);
1165 let value = result_run_array
1166 .values()
1167 .as_primitive::<Int32Type>()
1168 .value(physical_idx);
1169 actual.push(value);
1170 }
1171 assert_eq!(actual, expected);
1172 }
1173
#[test]
fn test_interleave_run_end_encoded_string() {
    // Run-end encoded string inputs collected directly from string slices.
    let left: Int32RunArray = ["hello", "hello", "world", "world", "foo"]
        .into_iter()
        .collect();
    let right: Int32RunArray = ["bar", "baz", "baz", "qux"].into_iter().collect();

    let picks = [(0, 0), (1, 1), (0, 3), (1, 3), (0, 4)];
    let result = interleave(&[&left, &right], &picks).unwrap();

    // The output must keep the run-end encoded layout.
    let output_type = result.data_type();
    assert!(matches!(output_type, DataType::RunEndEncoded(_, _)));

    let run_array = result.as_any().downcast_ref::<Int32RunArray>().unwrap();
    let run_values = run_array.values().as_string::<i32>();

    let expected = vec!["hello", "baz", "world", "qux", "foo"];
    // Resolve every logical row to its physical slot in the values array.
    let actual: Vec<_> = (0..run_array.len())
        .map(|logical| run_values.value(run_array.get_physical_index(logical)))
        .collect();
    assert_eq!(actual, expected);
}
1203
#[test]
fn test_interleave_run_end_encoded_with_nulls() {
    // Both inputs contain null entries; picked nulls must come out as nulls.
    let left: Int32RunArray = [Some("a"), Some("a"), None, None, Some("b")]
        .into_iter()
        .collect();
    let right: Int32RunArray = [None, Some("c"), Some("c"), Some("d")]
        .into_iter()
        .collect();

    let picks = [(0, 1), (1, 0), (0, 2), (1, 3), (0, 4)];
    let result = interleave(&[&left, &right], &picks).unwrap();

    // The output must keep the run-end encoded layout.
    let output_type = result.data_type();
    assert!(matches!(output_type, DataType::RunEndEncoded(_, _)));

    let run_array = result.as_any().downcast_ref::<Int32RunArray>().unwrap();
    let run_values = run_array.values();

    let expected = vec![Some("a"), None, None, Some("d"), Some("b")];
    // Nullness is read from the values array at the resolved physical slot.
    let actual: Vec<_> = (0..run_array.len())
        .map(|logical| {
            let slot = run_array.get_physical_index(logical);
            (!run_values.is_null(slot)).then(|| run_values.as_string::<i32>().value(slot))
        })
        .collect();
    assert_eq!(actual, expected);
}
1239
#[test]
fn test_interleave_run_end_encoded_different_run_types() {
    // Same scenario as the Int32 case, but the run array is parameterised
    // with Int16Type (the result is downcast to RunArray<Int16Type>).
    let mut left_builder = PrimitiveRunBuilder::<Int16Type, Int32Type>::new();
    left_builder.extend([1, 1, 2, 3, 3].map(Some));
    let left = left_builder.finish();

    let mut right_builder = PrimitiveRunBuilder::<Int16Type, Int32Type>::new();
    right_builder.extend([4, 5, 5, 6].map(Some));
    let right = right_builder.finish();

    let picks = [(0, 0), (1, 1), (0, 3), (1, 3)];
    let result = interleave(&[&left, &right], &picks).unwrap();

    // The output must keep the run-end encoded layout.
    let output_type = result.data_type();
    assert!(matches!(output_type, DataType::RunEndEncoded(_, _)));

    let run_array = result
        .as_any()
        .downcast_ref::<RunArray<Int16Type>>()
        .unwrap();
    let run_values = run_array.values().as_primitive::<Int32Type>();

    let expected = vec![1, 5, 3, 6];
    // Resolve every logical row to its physical slot in the values array.
    let actual: Vec<_> = (0..run_array.len())
        .map(|logical| run_values.value(run_array.get_physical_index(logical)))
        .collect();
    assert_eq!(actual, expected);
}
1272
#[test]
fn test_interleave_run_end_encoded_mixed_run_lengths() {
    // Inputs whose runs have varying lengths (1 to 4 repeated values),
    // built with Int64Type as the run array's type parameter.
    let mut left_builder = PrimitiveRunBuilder::<Int64Type, Int32Type>::new();
    left_builder.extend([1, 2, 2, 2, 2, 3, 3, 4].map(Some));
    let left = left_builder.finish();

    let mut right_builder = PrimitiveRunBuilder::<Int64Type, Int32Type>::new();
    right_builder.extend([5, 5, 5, 6, 7, 7, 8, 8].map(Some));
    let right = right_builder.finish();

    let picks = [
        (0, 0),
        (1, 2),
        (0, 3),
        (1, 3),
        (0, 6),
        (1, 6),
        (0, 7),
        (1, 4),
    ];
    let result = interleave(&[&left, &right], &picks).unwrap();

    // The output must keep the run-end encoded layout.
    let output_type = result.data_type();
    assert!(matches!(output_type, DataType::RunEndEncoded(_, _)));

    let run_array = result
        .as_any()
        .downcast_ref::<RunArray<Int64Type>>()
        .unwrap();
    let run_values = run_array.values().as_primitive::<Int32Type>();

    let expected = vec![1, 5, 2, 6, 3, 8, 4, 7];
    // Resolve every logical row to its physical slot in the values array.
    let actual: Vec<_> = (0..run_array.len())
        .map(|logical| run_values.value(run_array.get_physical_index(logical)))
        .collect();
    assert_eq!(actual, expected);
}
1314
#[test]
fn test_interleave_run_end_encoded_empty_runs() {
    // One input holds a single value, the other three copies of one value.
    let mut left_builder = PrimitiveRunBuilder::<Int32Type, Int32Type>::new();
    left_builder.extend([1].map(Some));
    let left = left_builder.finish();

    let mut right_builder = PrimitiveRunBuilder::<Int32Type, Int32Type>::new();
    right_builder.extend([2, 2, 2].map(Some));
    let right = right_builder.finish();

    let picks = [(0, 0), (1, 1), (1, 2)];
    let result = interleave(&[&left, &right], &picks).unwrap();

    // The output must keep the run-end encoded layout.
    let output_type = result.data_type();
    assert!(matches!(output_type, DataType::RunEndEncoded(_, _)));

    let run_array = result.as_any().downcast_ref::<Int32RunArray>().unwrap();
    let run_values = run_array.values().as_primitive::<Int32Type>();

    let expected = vec![1, 2, 2];
    // Resolve every logical row to its physical slot in the values array.
    let actual: Vec<_> = (0..run_array.len())
        .map(|logical| run_values.value(run_array.get_physical_index(logical)))
        .collect();
    assert_eq!(actual, expected);
}
1347
#[test]
fn test_struct_no_fields() {
    // A struct array with zero fields still has a length (10 here);
    // interleaving one row must yield a length-1 struct of the same type.
    let no_fields = Fields::empty();
    let input =
        StructArray::try_new_with_length(no_fields.clone(), Vec::new(), None, 10).unwrap();

    let picked = interleave(&[&input], &[(0, 0)]).unwrap();

    assert_eq!(picked.len(), 1);
    assert_eq!(picked.data_type(), &DataType::Struct(no_fields));
}
1356
#[test]
fn test_interleave_fallback_dictionary_with_nulls() {
    // Logical contents: ["foo", null, "bar"]
    let dict_a = DictionaryArray::new(
        Int32Array::from_iter([Some(0), None, Some(1)]),
        Arc::new(StringArray::from_iter_values(["foo", "bar"])),
    );

    // Logical contents: ["baz", "qux", null]
    let dict_b = DictionaryArray::new(
        Int32Array::from_iter([Some(0), Some(1), None]),
        Arc::new(StringArray::from_iter_values(["baz", "qux"])),
    );

    let picks = vec![(0, 0), (0, 1), (1, 0), (1, 2), (0, 2), (1, 1)];

    let result =
        interleave_fallback_dictionary::<Int32Type>(&[&dict_a, &dict_b], &picks).unwrap();

    // Read the dictionary back as its logical string values.
    let typed = result
        .as_dictionary::<Int32Type>()
        .downcast_dict::<StringArray>()
        .unwrap();
    let collected: Vec<_> = typed.into_iter().collect();

    let expected = vec![
        Some("foo"),
        None,
        Some("baz"),
        None,
        Some("bar"),
        Some("qux"),
    ];
    assert_eq!(collected, expected);
}
1394}