1use crate::concat::concat;
21use crate::dictionary::{merge_dictionary_values, should_merge_dictionary_values};
22use arrow_array::builder::{BooleanBufferBuilder, PrimitiveBuilder};
23use arrow_array::cast::AsArray;
24use arrow_array::types::*;
25use arrow_array::*;
26use arrow_buffer::bit_mask::set_bits;
27use arrow_buffer::bit_util;
28use arrow_buffer::{ArrowNativeType, BooleanBuffer, MutableBuffer, NullBuffer, OffsetBuffer};
29use arrow_data::ByteView;
30use arrow_data::transform::MutableArrayData;
31use arrow_schema::{ArrowError, DataType, FieldRef, Fields};
32use std::sync::Arc;
33
/// Callback handed to `downcast_primitive!` by [`interleave`]: receives the resolved
/// primitive type followed by the forwarded arguments and calls the typed kernel.
macro_rules! primitive_helper {
    ($primitive:ty, $arrays:ident, $picks:ident, $dtype:ident) => {
        interleave_primitive::<$primitive>($arrays, $picks, $dtype)
    };
}
39
/// Callback handed to `downcast_integer!` by [`interleave`]: receives the resolved
/// dictionary key type and calls the dictionary kernel with the forwarded arguments.
macro_rules! dict_helper {
    ($key:ty, $arrays:expr, $picks:expr) => {
        interleave_dictionaries::<$key>($arrays, $picks)
    };
}
45
46pub fn interleave(
74 values: &[&dyn Array],
75 indices: &[(usize, usize)],
76) -> Result<ArrayRef, ArrowError> {
77 if values.is_empty() {
78 return Err(ArrowError::InvalidArgumentError(
79 "interleave requires input of at least one array".to_string(),
80 ));
81 }
82 let data_type = values[0].data_type();
83
84 for array in values.iter().skip(1) {
85 if array.data_type() != data_type {
86 return Err(ArrowError::InvalidArgumentError(format!(
87 "It is not possible to interleave arrays of different data types ({} and {})",
88 data_type,
89 array.data_type()
90 )));
91 }
92 }
93
94 if indices.is_empty() {
95 return Ok(new_empty_array(data_type));
96 }
97
98 downcast_primitive! {
99 data_type => (primitive_helper, values, indices, data_type),
100 DataType::Utf8 => interleave_bytes::<Utf8Type>(values, indices),
101 DataType::LargeUtf8 => interleave_bytes::<LargeUtf8Type>(values, indices),
102 DataType::Binary => interleave_bytes::<BinaryType>(values, indices),
103 DataType::LargeBinary => interleave_bytes::<LargeBinaryType>(values, indices),
104 DataType::BinaryView => interleave_views::<BinaryViewType>(values, indices),
105 DataType::Utf8View => interleave_views::<StringViewType>(values, indices),
106 DataType::Dictionary(k, _) => downcast_integer! {
107 k.as_ref() => (dict_helper, values, indices),
108 _ => unreachable!("illegal dictionary key type {k}")
109 },
110 DataType::Struct(fields) => interleave_struct(fields, values, indices),
111 DataType::List(field) => interleave_list::<i32>(values, indices, field),
112 DataType::LargeList(field) => interleave_list::<i64>(values, indices, field),
113 DataType::RunEndEncoded(r, _) => match r.data_type() {
114 DataType::Int16 => interleave_run_end::<Int16Type>(values, indices),
115 DataType::Int32 => interleave_run_end::<Int32Type>(values, indices),
116 DataType::Int64 => interleave_run_end::<Int64Type>(values, indices),
117 t => unreachable!("illegal run-end type {t}"),
118 },
119 DataType::ListView(field) => interleave_list_view::<i32>(values, indices, field),
120 DataType::LargeListView(field) => interleave_list_view::<i64>(values, indices, field),
121 _ => interleave_fallback(values, indices)
122 }
123}
124
125struct Interleave<'a, T> {
129 arrays: Vec<&'a T>,
131 nulls: Option<NullBuffer>,
133}
134
135impl<'a, T: Array + 'static> Interleave<'a, T> {
136 fn new(values: &[&'a dyn Array], indices: &'a [(usize, usize)]) -> Self {
137 let mut has_nulls = false;
138 let arrays: Vec<&T> = values
139 .iter()
140 .map(|x| {
141 has_nulls = has_nulls || x.null_count() != 0;
142 x.as_any().downcast_ref().unwrap()
143 })
144 .collect();
145
146 let nulls = match has_nulls {
147 true => {
148 let nulls = BooleanBuffer::collect_bool(indices.len(), |i| {
149 let (a, b) = indices[i];
150 arrays[a].is_valid(b)
151 });
152 Some(nulls.into())
153 }
154 false => None,
155 };
156
157 Self { arrays, nulls }
158 }
159}
160
161fn interleave_primitive<T: ArrowPrimitiveType>(
162 values: &[&dyn Array],
163 indices: &[(usize, usize)],
164 data_type: &DataType,
165) -> Result<ArrayRef, ArrowError> {
166 let interleaved = Interleave::<'_, PrimitiveArray<T>>::new(values, indices);
167 let arrays = &interleaved.arrays;
168 let len = indices.len();
169
170 let mut output = Vec::with_capacity(len);
171 let dst: *mut T::Native = output.as_mut_ptr();
172 let mut base = 0;
173
174 let chunks = indices.chunks_exact(8);
177 let remainder = chunks.remainder();
178 for chunk in chunks {
179 let v0 = arrays[chunk[0].0].value(chunk[0].1);
180 let v1 = arrays[chunk[1].0].value(chunk[1].1);
181 let v2 = arrays[chunk[2].0].value(chunk[2].1);
182 let v3 = arrays[chunk[3].0].value(chunk[3].1);
183 let v4 = arrays[chunk[4].0].value(chunk[4].1);
184 let v5 = arrays[chunk[5].0].value(chunk[5].1);
185 let v6 = arrays[chunk[6].0].value(chunk[6].1);
186 let v7 = arrays[chunk[7].0].value(chunk[7].1);
187
188 debug_assert!(base + 7 < len);
190 unsafe {
191 dst.add(base).write(v0);
192 dst.add(base + 1).write(v1);
193 dst.add(base + 2).write(v2);
194 dst.add(base + 3).write(v3);
195 dst.add(base + 4).write(v4);
196 dst.add(base + 5).write(v5);
197 dst.add(base + 6).write(v6);
198 dst.add(base + 7).write(v7);
199 }
200 base += 8;
201 }
202
203 for idx in remainder {
204 debug_assert!(base < len);
206 unsafe { dst.add(base).write(arrays[idx.0].value(idx.1)) };
207 base += 1;
208 }
209
210 debug_assert!(base == len);
212 unsafe { output.set_len(len) };
213
214 let array = PrimitiveArray::<T>::try_new(output.into(), interleaved.nulls)?;
215 Ok(Arc::new(array.with_data_type(data_type.clone())))
216}
217
218fn interleave_bytes<T: ByteArrayType>(
219 values: &[&dyn Array],
220 indices: &[(usize, usize)],
221) -> Result<ArrayRef, ArrowError> {
222 let interleaved = Interleave::<'_, GenericByteArray<T>>::new(values, indices);
223
224 let mut capacity = 0;
225 let mut offsets = Vec::with_capacity(indices.len() + 1);
226 offsets.push(T::Offset::from_usize(0).unwrap());
227 for (a, b) in indices {
228 let o = interleaved.arrays[*a].value_offsets();
229 let element_len = o[*b + 1].as_usize() - o[*b].as_usize();
230 capacity += element_len;
231 offsets.push(
232 T::Offset::from_usize(capacity)
233 .ok_or_else(|| ArrowError::OffsetOverflowError(capacity))?,
234 );
235 }
236
237 let mut values = Vec::with_capacity(capacity);
238 for (a, b) in indices {
239 values.extend_from_slice(interleaved.arrays[*a].value(*b).as_ref());
240 }
241
242 let array = unsafe {
244 let offsets = OffsetBuffer::new_unchecked(offsets.into());
245 GenericByteArray::<T>::new_unchecked(offsets, values.into(), interleaved.nulls)
246 };
247 Ok(Arc::new(array))
248}
249
250fn interleave_dictionaries<K: ArrowDictionaryKeyType>(
251 arrays: &[&dyn Array],
252 indices: &[(usize, usize)],
253) -> Result<ArrayRef, ArrowError> {
254 let dictionaries: Vec<_> = arrays.iter().map(|x| x.as_dictionary::<K>()).collect();
255 let (should_merge, has_overflow) =
256 should_merge_dictionary_values::<K>(&dictionaries, indices.len());
257 if !should_merge {
258 return if has_overflow {
259 interleave_fallback(arrays, indices)
260 } else {
261 interleave_fallback_dictionary::<K>(&dictionaries, indices)
262 };
263 }
264
265 let masks: Vec<_> = dictionaries
266 .iter()
267 .enumerate()
268 .map(|(a_idx, dictionary)| {
269 let mut key_mask = BooleanBufferBuilder::new_from_buffer(
270 MutableBuffer::new_null(dictionary.len()),
271 dictionary.len(),
272 );
273
274 for (_, key_idx) in indices.iter().filter(|(a, _)| *a == a_idx) {
275 key_mask.set_bit(*key_idx, true);
276 }
277 key_mask.finish()
278 })
279 .collect();
280
281 let merged = merge_dictionary_values(&dictionaries, Some(&masks))?;
282
283 let mut keys = PrimitiveBuilder::<K>::with_capacity(indices.len());
285 for (a, b) in indices {
286 let old_keys: &PrimitiveArray<K> = dictionaries[*a].keys();
287 match old_keys.is_valid(*b) {
288 true => {
289 let old_key = old_keys.values()[*b];
290 keys.append_value(merged.key_mappings[*a][old_key.as_usize()])
291 }
292 false => keys.append_null(),
293 }
294 }
295 let array = unsafe { DictionaryArray::new_unchecked(keys.finish(), merged.values) };
296 Ok(Arc::new(array))
297}
298
299fn interleave_views<T: ByteViewType>(
300 values: &[&dyn Array],
301 indices: &[(usize, usize)],
302) -> Result<ArrayRef, ArrowError> {
303 let interleaved = Interleave::<'_, GenericByteViewArray<T>>::new(values, indices);
304 let mut buffers = Vec::new();
305
306 let mut offsets = Vec::with_capacity(interleaved.arrays.len() + 1);
308 offsets.push(0);
309 let mut total_buffers = 0;
310 for a in interleaved.arrays.iter() {
311 total_buffers += a.data_buffers().len();
312 offsets.push(total_buffers);
313 }
314
315 let mut buffer_to_new_index = vec![None; total_buffers];
317
318 let views: Vec<u128> = indices
319 .iter()
320 .map(|(array_idx, value_idx)| {
321 let array = interleaved.arrays[*array_idx];
322 let view = array.views().get(*value_idx).unwrap();
323 let view_len = *view as u32;
324 if view_len <= 12 {
325 return *view;
326 }
327 let view = ByteView::from(*view);
329 let buffer_to_new_idx = offsets[*array_idx] + view.buffer_index as usize;
330 let new_buffer_idx: u32 =
331 *buffer_to_new_index[buffer_to_new_idx].get_or_insert_with(|| {
332 buffers.push(array.data_buffers()[view.buffer_index as usize].clone());
333 (buffers.len() - 1) as u32
334 });
335 view.with_buffer_index(new_buffer_idx).as_u128()
336 })
337 .collect();
338
339 let array = unsafe {
340 GenericByteViewArray::<T>::new_unchecked(views.into(), buffers, interleaved.nulls)
341 };
342 Ok(Arc::new(array))
343}
344
345fn interleave_struct(
346 fields: &Fields,
347 values: &[&dyn Array],
348 indices: &[(usize, usize)],
349) -> Result<ArrayRef, ArrowError> {
350 let interleaved = Interleave::<'_, StructArray>::new(values, indices);
351
352 if fields.is_empty() {
353 let array = StructArray::try_new_with_length(
354 fields.clone(),
355 vec![],
356 interleaved.nulls,
357 indices.len(),
358 )?;
359 return Ok(Arc::new(array));
360 }
361
362 let struct_fields_array: Result<Vec<_>, _> = (0..fields.len())
363 .map(|i| {
364 let field_values: Vec<&dyn Array> = interleaved
365 .arrays
366 .iter()
367 .map(|x| x.column(i).as_ref())
368 .collect();
369 interleave(&field_values, indices)
370 })
371 .collect();
372
373 let struct_array =
374 StructArray::try_new(fields.clone(), struct_fields_array?, interleaved.nulls)?;
375 Ok(Arc::new(struct_array))
376}
377
378fn interleave_list_primitive_child<O: OffsetSizeTrait, T: ArrowPrimitiveType>(
379 interleaved: &Interleave<'_, GenericListArray<O>>,
380 indices: &[(usize, usize)],
381 capacity: usize,
382 data_type: &DataType,
383) -> ArrayRef {
384 let child_arrays: Vec<&PrimitiveArray<T>> = interleaved
385 .arrays
386 .iter()
387 .map(|list| list.values().as_primitive::<T>())
388 .collect();
389
390 let has_child_nulls = child_arrays.iter().any(|a| a.null_count() > 0);
391
392 let mut values: Vec<T::Native> = Vec::with_capacity(capacity);
394 for &(array, row) in indices {
395 let o = interleaved.arrays[array].value_offsets();
396 let start = o[row].as_usize();
397 let end = o[row + 1].as_usize();
398 if end > start {
399 values.extend_from_slice(&child_arrays[array].values()[start..end]);
400 }
401 }
402
403 let nulls = if has_child_nulls {
407 let null_byte_len = bit_util::ceil(capacity, 8);
408 let mut output_null_buf = MutableBuffer::from_len_zeroed(null_byte_len);
409
410 let mut offset_write = 0;
411 let mut output_null_count = 0usize;
412 for &(array, row) in indices {
413 let o = interleaved.arrays[array].value_offsets();
414 let start = o[row].as_usize();
415 let end = o[row + 1].as_usize();
416 let len = end - start;
417 if len > 0 {
418 match child_arrays[array].nulls() {
419 Some(null_buffer) => {
420 output_null_count += set_bits(
421 output_null_buf.as_slice_mut(),
422 null_buffer.validity(),
423 offset_write,
424 null_buffer.offset() + start,
425 len,
426 );
427 }
428 None => {
429 let buf = output_null_buf.as_slice_mut();
431 (offset_write..offset_write + len).for_each(|i| bit_util::set_bit(buf, i));
432 }
433 }
434 }
435 offset_write += len;
436 }
437
438 if output_null_count > 0 {
439 let bool_buf = BooleanBuffer::new(output_null_buf.into(), 0, capacity);
440 Some(unsafe { NullBuffer::new_unchecked(bool_buf, output_null_count) })
442 } else {
443 None
444 }
445 } else {
446 None
447 };
448
449 Arc::new(PrimitiveArray::<T>::new(values.into(), nulls).with_data_type(data_type.clone()))
450}
451
452fn interleave_list<O: OffsetSizeTrait>(
453 values: &[&dyn Array],
454 indices: &[(usize, usize)],
455 field: &FieldRef,
456) -> Result<ArrayRef, ArrowError> {
457 let interleaved = Interleave::<'_, GenericListArray<O>>::new(values, indices);
458
459 let mut capacity = 0usize;
461 let mut offsets = Vec::with_capacity(indices.len() + 1);
462 offsets.push(O::from_usize(0).unwrap());
463 for (array, row) in indices {
464 let o = interleaved.arrays[*array].value_offsets();
465 let element_len = o[*row + 1].as_usize() - o[*row].as_usize();
466 capacity += element_len;
467 offsets.push(
468 O::from_usize(capacity).ok_or_else(|| ArrowError::OffsetOverflowError(capacity))?,
469 );
470 }
471
472 macro_rules! list_primitive_helper {
474 ($t:ty) => {
475 interleave_list_primitive_child::<O, $t>(
476 &interleaved,
477 indices,
478 capacity,
479 field.data_type(),
480 )
481 };
482 }
483
484 let child_values = downcast_primitive! {
485 field.data_type() => (list_primitive_helper),
489 _ => {
490 let mut child_indices = Vec::with_capacity(capacity);
493 for (array, row) in indices {
494 let list = interleaved.arrays[*array];
495 let start = list.value_offsets()[*row].as_usize();
496 let end = list.value_offsets()[*row + 1].as_usize();
497 child_indices.extend((start..end).map(|i| (*array, i)));
498 }
499
500 let child_arrays: Vec<&dyn Array> = interleaved
501 .arrays
502 .iter()
503 .map(|list| list.values().as_ref())
504 .collect();
505 interleave(&child_arrays, &child_indices)?
506 }
507 };
508
509 let offsets = OffsetBuffer::new(offsets.into());
510 let list_array =
511 GenericListArray::<O>::new(field.clone(), offsets, child_values, interleaved.nulls);
512
513 Ok(Arc::new(list_array))
514}
515
516fn interleave_run_end<R: RunEndIndexType>(
518 values: &[&dyn Array],
519 indices: &[(usize, usize)],
520) -> Result<ArrayRef, ArrowError> {
521 if indices.is_empty() {
522 return Ok(new_empty_array(values[0].data_type()));
523 }
524
525 let n = indices.len();
526 R::Native::from_usize(n).ok_or_else(|| {
527 ArrowError::ComputeError(format!(
528 "interleave_run_end: output length {n} does not fit run-end type"
529 ))
530 })?;
531
532 let runs: Vec<&RunArray<R>> = values.iter().map(|a| a.as_run::<R>()).collect();
533 let value_arrays: Vec<&dyn Array> = runs.iter().map(|r| r.values().as_ref()).collect();
534
535 let mut phys_pairs: Vec<(usize, usize)> = vec![(0, 0); n];
538 let mut grouped: Vec<(Vec<R::Native>, Vec<usize>)> =
539 (0..runs.len()).map(|_| (Vec::new(), Vec::new())).collect();
540 for (out_pos, &(arr, row)) in indices.iter().enumerate() {
541 let row = R::Native::from_usize(row).ok_or_else(|| {
542 ArrowError::InvalidArgumentError(format!(
543 "interleave_run_end: row index {row} not representable as run-end type {}",
544 R::DATA_TYPE
545 ))
546 })?;
547 grouped[arr].0.push(row);
548 grouped[arr].1.push(out_pos);
549 }
550 for (arr_idx, (logical_rows, out_positions)) in grouped.into_iter().enumerate() {
551 let phys = runs[arr_idx].get_physical_indices(&logical_rows)?;
552 for (p, out_pos) in phys.iter().zip(out_positions.iter()) {
553 phys_pairs[*out_pos] = (arr_idx, *p);
554 }
555 }
556
557 let mut run_ends_buf: Vec<R::Native> = Vec::with_capacity(n);
562 let mut dedup_pairs: Vec<(usize, usize)> = Vec::with_capacity(n);
563 dedup_pairs.push(phys_pairs[0]);
564 for i in 1..n {
565 if phys_pairs[i] != phys_pairs[i - 1] {
566 run_ends_buf.push(R::Native::from_usize(i).unwrap());
567 dedup_pairs.push(phys_pairs[i]);
568 }
569 }
570 run_ends_buf.push(R::Native::from_usize(n).unwrap());
571
572 let taken_values = interleave(&value_arrays, &dedup_pairs)?;
573 let run_ends = PrimitiveArray::<R>::from_iter_values(run_ends_buf);
574
575 Ok(Arc::new(RunArray::<R>::try_new(
576 &run_ends,
577 taken_values.as_ref(),
578 )?))
579}
580
581fn interleave_list_view<O: OffsetSizeTrait>(
582 values: &[&dyn Array],
583 indices: &[(usize, usize)],
584 field: &FieldRef,
585) -> Result<ArrayRef, ArrowError> {
586 let interleaved = Interleave::<'_, GenericListViewArray<O>>::new(values, indices);
587
588 let concat_cost: usize = interleaved.arrays.iter().map(|lv| lv.values().len()).sum();
593 let per_row_cost: usize = indices
594 .iter()
595 .map(|&(a, r)| interleaved.arrays[a].sizes()[r].as_usize())
596 .sum();
597
598 if per_row_cost <= concat_cost {
599 interleave_list_view_copy::<O>(&interleaved, indices, field)
600 } else {
601 interleave_list_view_concat::<O>(&interleaved, indices, field)
602 }
603}
604
605fn interleave_list_view_copy<O: OffsetSizeTrait>(
607 interleaved: &Interleave<'_, GenericListViewArray<O>>,
608 indices: &[(usize, usize)],
609 field: &FieldRef,
610) -> Result<ArrayRef, ArrowError> {
611 let mut capacity = 0usize;
612 let mut offsets = Vec::with_capacity(indices.len());
613 let mut sizes = Vec::with_capacity(indices.len());
614 for &(array_idx, row_idx) in indices {
615 let list = interleaved.arrays[array_idx];
616 let size = list.sizes()[row_idx].as_usize();
617 offsets.push(
618 O::from_usize(capacity).ok_or_else(|| ArrowError::OffsetOverflowError(capacity))?,
619 );
620 sizes.push(O::from_usize(size).ok_or_else(|| ArrowError::OffsetOverflowError(size))?);
621 capacity += size;
622 }
623
624 let child_data: Vec<_> = interleaved
625 .arrays
626 .iter()
627 .map(|list| list.values().to_data())
628 .collect();
629 let child_data_refs: Vec<_> = child_data.iter().collect();
630 let mut mutable_child = MutableArrayData::new(child_data_refs, false, capacity);
631 for &(array_idx, row_idx) in indices {
632 let list = interleaved.arrays[array_idx];
633 let start = list.offsets()[row_idx].as_usize();
634 let size = list.sizes()[row_idx].as_usize();
635 if size > 0 {
636 mutable_child.extend(array_idx, start, start + size);
637 }
638 }
639
640 Ok(Arc::new(GenericListViewArray::<O>::new(
641 field.clone(),
642 offsets.into(),
643 sizes.into(),
644 make_array(mutable_child.freeze()),
645 interleaved.nulls.clone(),
646 )))
647}
648
649fn interleave_list_view_concat<O: OffsetSizeTrait>(
652 interleaved: &Interleave<'_, GenericListViewArray<O>>,
653 indices: &[(usize, usize)],
654 field: &FieldRef,
655) -> Result<ArrayRef, ArrowError> {
656 let child_arrays: Vec<&dyn Array> = interleaved
657 .arrays
658 .iter()
659 .map(|lv| lv.values().as_ref())
660 .collect();
661 let mut base_offsets = Vec::with_capacity(interleaved.arrays.len());
662 let mut running = 0usize;
663 for lv in &interleaved.arrays {
664 base_offsets.push(running);
665 running += lv.values().len();
666 }
667 let combined_values = concat(&child_arrays)?;
668
669 let mut new_offsets = Vec::with_capacity(indices.len());
670 let mut new_sizes = Vec::with_capacity(indices.len());
671 for &(array_idx, row_idx) in indices {
672 let lv = interleaved.arrays[array_idx];
673 let adjusted = lv.offsets()[row_idx].as_usize() + base_offsets[array_idx];
674 new_offsets.push(
675 O::from_usize(adjusted).ok_or_else(|| ArrowError::OffsetOverflowError(adjusted))?,
676 );
677 new_sizes.push(lv.sizes()[row_idx]);
678 }
679
680 Ok(Arc::new(GenericListViewArray::<O>::new(
681 field.clone(),
682 new_offsets.into(),
683 new_sizes.into(),
684 combined_values,
685 interleaved.nulls.clone(),
686 )))
687}
688
689fn interleave_fallback(
691 values: &[&dyn Array],
692 indices: &[(usize, usize)],
693) -> Result<ArrayRef, ArrowError> {
694 let arrays: Vec<_> = values.iter().map(|x| x.to_data()).collect();
695 let arrays: Vec<_> = arrays.iter().collect();
696 let mut array_data = MutableArrayData::new(arrays, false, indices.len());
697
698 let mut cur_array = indices[0].0;
699 let mut start_row_idx = indices[0].1;
700 let mut end_row_idx = start_row_idx + 1;
701
702 for (array, row) in indices.iter().skip(1).copied() {
703 if array == cur_array && row == end_row_idx {
704 end_row_idx += 1;
706 continue;
707 }
708
709 array_data.extend(cur_array, start_row_idx, end_row_idx);
711
712 cur_array = array;
714 start_row_idx = row;
715 end_row_idx = start_row_idx + 1;
716 }
717
718 array_data.extend(cur_array, start_row_idx, end_row_idx);
720 Ok(make_array(array_data.freeze()))
721}
722
723fn interleave_fallback_dictionary<K: ArrowDictionaryKeyType>(
733 dictionaries: &[&DictionaryArray<K>],
734 indices: &[(usize, usize)],
735) -> Result<ArrayRef, ArrowError> {
736 let relative_offsets: Vec<usize> = dictionaries
737 .iter()
738 .scan(0usize, |offset, dict| {
739 let current = *offset;
740 *offset += dict.values().len();
741 Some(current)
742 })
743 .collect();
744 let all_values: Vec<&dyn Array> = dictionaries.iter().map(|d| d.values().as_ref()).collect();
745 let concatenated_values = concat(&all_values)?;
746
747 let any_nulls = dictionaries.iter().any(|d| d.keys().nulls().is_some());
748 let (new_keys, nulls) = if any_nulls {
749 let mut has_nulls = false;
750 let new_keys: Vec<K::Native> = indices
751 .iter()
752 .map(|(array, row)| {
753 let old_keys = dictionaries[*array].keys();
754 if old_keys.is_valid(*row) {
755 let old_key = old_keys.values()[*row].as_usize();
756 K::Native::from_usize(relative_offsets[*array] + old_key)
757 .expect("key overflow should be checked by caller")
758 } else {
759 has_nulls = true;
760 K::Native::ZERO
761 }
762 })
763 .collect();
764
765 let nulls = if has_nulls {
766 let null_buffer = BooleanBuffer::collect_bool(indices.len(), |i| {
767 let (array, row) = indices[i];
768 dictionaries[array].keys().is_valid(row)
769 });
770 Some(NullBuffer::new(null_buffer))
771 } else {
772 None
773 };
774 (new_keys, nulls)
775 } else {
776 let new_keys: Vec<K::Native> = indices
777 .iter()
778 .map(|(array, row)| {
779 let old_key = dictionaries[*array].keys().values()[*row].as_usize();
780 K::Native::from_usize(relative_offsets[*array] + old_key)
781 .expect("key overflow should be checked by caller")
782 })
783 .collect();
784 (new_keys, None)
785 };
786
787 let keys_array = PrimitiveArray::<K>::new(new_keys.into(), nulls);
788 let array = unsafe { DictionaryArray::new_unchecked(keys_array, concatenated_values) };
790 Ok(Arc::new(array))
791}
792
793pub fn interleave_record_batch(
838 record_batches: &[&RecordBatch],
839 indices: &[(usize, usize)],
840) -> Result<RecordBatch, ArrowError> {
841 let schema = record_batches[0].schema();
842 let columns = (0..schema.fields().len())
843 .map(|i| {
844 let column_values: Vec<&dyn Array> = record_batches
845 .iter()
846 .map(|batch| batch.column(i).as_ref())
847 .collect();
848 interleave(&column_values, indices)
849 })
850 .collect::<Result<Vec<_>, _>>()?;
851 RecordBatch::try_new(schema, columns)
852}
853
854#[cfg(test)]
855mod tests {
856 use super::*;
857 use arrow_array::Int32RunArray;
858 use arrow_array::builder::{GenericListBuilder, Int32Builder, PrimitiveRunBuilder};
859 use arrow_array::types::Int8Type;
860 use arrow_buffer::ScalarBuffer;
861 use arrow_schema::Field;
862
863 #[test]
864 fn test_primitive() {
865 let a = Int32Array::from_iter_values([1, 2, 3, 4]);
866 let b = Int32Array::from_iter_values([5, 6, 7]);
867 let c = Int32Array::from_iter_values([8, 9, 10]);
868 let values = interleave(&[&a, &b, &c], &[(0, 3), (0, 3), (2, 2), (2, 0), (1, 1)]).unwrap();
869 let v = values.as_primitive::<Int32Type>();
870 assert_eq!(v.values(), &[4, 4, 10, 8, 6]);
871 }
872
873 #[test]
874 fn test_primitive_nulls() {
875 let a = Int32Array::from_iter_values([1, 2, 3, 4]);
876 let b = Int32Array::from_iter([Some(1), Some(4), None]);
877 let values = interleave(&[&a, &b], &[(0, 1), (1, 2), (1, 2), (0, 3), (0, 2)]).unwrap();
878 let v: Vec<_> = values.as_primitive::<Int32Type>().into_iter().collect();
879 assert_eq!(&v, &[Some(2), None, None, Some(4), Some(3)])
880 }
881
882 #[test]
883 fn test_primitive_empty() {
884 let a = Int32Array::from_iter_values([1, 2, 3, 4]);
885 let v = interleave(&[&a], &[]).unwrap();
886 assert!(v.is_empty());
887 assert_eq!(v.data_type(), &DataType::Int32);
888 }
889
890 #[test]
891 fn test_strings() {
892 let a = StringArray::from_iter_values(["a", "b", "c"]);
893 let b = StringArray::from_iter_values(["hello", "world", "foo"]);
894 let values = interleave(&[&a, &b], &[(0, 2), (0, 2), (1, 0), (1, 1), (0, 1)]).unwrap();
895 let v = values.as_string::<i32>();
896 let values: Vec<_> = v.into_iter().collect();
897 assert_eq!(
898 &values,
899 &[
900 Some("c"),
901 Some("c"),
902 Some("hello"),
903 Some("world"),
904 Some("b")
905 ]
906 )
907 }
908
909 #[test]
910 fn test_interleave_dictionary() {
911 let a = DictionaryArray::<Int32Type>::from_iter(["a", "b", "c", "a", "b"]);
912 let b = DictionaryArray::<Int32Type>::from_iter(["a", "c", "a", "c", "a"]);
913
914 let values =
916 interleave(&[&a, &b], &[(0, 2), (0, 2), (0, 2), (1, 0), (1, 1), (0, 1)]).unwrap();
917 let v = values.as_dictionary::<Int32Type>();
918 assert_eq!(v.values().len(), 5);
919
920 let vc = v.downcast_dict::<StringArray>().unwrap();
921 let collected: Vec<_> = vc.into_iter().map(Option::unwrap).collect();
922 assert_eq!(&collected, &["c", "c", "c", "a", "c", "b"]);
923
924 let values = interleave(&[&a, &b], &[(0, 2), (0, 2), (1, 1)]).unwrap();
926 let v = values.as_dictionary::<Int32Type>();
927 assert_eq!(v.values().len(), 1);
928
929 let vc = v.downcast_dict::<StringArray>().unwrap();
930 let collected: Vec<_> = vc.into_iter().map(Option::unwrap).collect();
931 assert_eq!(&collected, &["c", "c", "c"]);
932 }
933
934 #[test]
935 fn test_interleave_dictionary_nulls() {
936 let input_1_keys = Int32Array::from_iter_values([0, 2, 1, 3]);
937 let input_1_values = StringArray::from(vec![Some("foo"), None, Some("bar"), Some("fiz")]);
938 let input_1 = DictionaryArray::new(input_1_keys, Arc::new(input_1_values));
939 let input_2: DictionaryArray<Int32Type> = vec![None].into_iter().collect();
940
941 let expected = vec![Some("fiz"), None, None, Some("foo")];
942
943 let values = interleave(
944 &[&input_1 as _, &input_2 as _],
945 &[(0, 3), (0, 2), (1, 0), (0, 0)],
946 )
947 .unwrap();
948 let dictionary = values.as_dictionary::<Int32Type>();
949 let actual: Vec<Option<&str>> = dictionary
950 .downcast_dict::<StringArray>()
951 .unwrap()
952 .into_iter()
953 .collect();
954
955 assert_eq!(actual, expected);
956 }
957
958 #[test]
959 fn test_interleave_dictionary_overflow_same_values() {
960 let values: ArrayRef = Arc::new(StringArray::from_iter_values(
961 (0..50).map(|i| format!("v{i}")),
962 ));
963
964 let dict1 = DictionaryArray::<Int8Type>::new(
970 Int8Array::from_iter_values([0, 1, 2]),
971 values.clone(),
972 );
973 let dict2 = DictionaryArray::<Int8Type>::new(
974 Int8Array::from_iter_values([0, 1, 2]),
975 values.clone(),
976 );
977 let dict3 =
978 DictionaryArray::<Int8Type>::new(Int8Array::from_iter_values([49]), values.clone());
979
980 let indices = &[(0, 0), (1, 0), (2, 0)];
981 let result = interleave(&[&dict1, &dict2, &dict3], indices).unwrap();
982
983 let dict_result = result.as_dictionary::<Int8Type>();
984 let string_result: Vec<_> = dict_result
985 .downcast_dict::<StringArray>()
986 .unwrap()
987 .into_iter()
988 .map(|x| x.unwrap())
989 .collect();
990 assert_eq!(string_result, vec!["v0", "v0", "v49"]);
991 }
992
993 fn test_interleave_lists<O: OffsetSizeTrait>() {
994 let mut a = GenericListBuilder::<O, _>::new(Int32Builder::new());
996 a.values().append_value(1);
997 a.values().append_value(2);
998 a.append(true);
999 a.append(false);
1000 a.values().append_value(3);
1001 a.append(true);
1002 let a = a.finish();
1003
1004 let mut b = GenericListBuilder::<O, _>::new(Int32Builder::new());
1006 b.values().append_value(4);
1007 b.append(true);
1008 b.append(false);
1009 b.values().append_value(5);
1010 b.values().append_value(6);
1011 b.values().append_null();
1012 b.append(true);
1013 let b = b.finish();
1014
1015 let values = interleave(&[&a, &b], &[(0, 2), (0, 1), (1, 0), (1, 2), (1, 1)]).unwrap();
1016 let v = values
1017 .as_any()
1018 .downcast_ref::<GenericListArray<O>>()
1019 .unwrap();
1020
1021 let mut expected = GenericListBuilder::<O, _>::new(Int32Builder::new());
1023 expected.values().append_value(3);
1024 expected.append(true);
1025 expected.append(false);
1026 expected.values().append_value(4);
1027 expected.append(true);
1028 expected.values().append_value(5);
1029 expected.values().append_value(6);
1030 expected.values().append_null();
1031 expected.append(true);
1032 expected.append(false);
1033 let expected = expected.finish();
1034
1035 assert_eq!(v, &expected);
1036 }
1037
1038 #[test]
1039 fn test_lists() {
1040 test_interleave_lists::<i32>();
1041 }
1042
1043 #[test]
1044 fn test_large_lists() {
1045 test_interleave_lists::<i64>();
1046 }
1047
1048 fn test_interleave_list_views<O: OffsetSizeTrait>() {
1049 let mut a = GenericListBuilder::<O, _>::new(Int32Builder::new());
1051 a.values().append_value(1);
1052 a.values().append_value(2);
1053 a.append(true);
1054 a.append(false);
1055 a.values().append_value(3);
1056 a.append(true);
1057 let a: GenericListViewArray<O> = a.finish().into();
1058
1059 let mut b = GenericListBuilder::<O, _>::new(Int32Builder::new());
1061 b.values().append_value(4);
1062 b.append(true);
1063 b.append(false);
1064 b.values().append_value(5);
1065 b.values().append_value(6);
1066 b.values().append_null();
1067 b.append(true);
1068 let b: GenericListViewArray<O> = b.finish().into();
1069
1070 let values = interleave(&[&a, &b], &[(0, 2), (0, 1), (1, 0), (1, 2), (1, 1)]).unwrap();
1071 let v = values
1072 .as_any()
1073 .downcast_ref::<GenericListViewArray<O>>()
1074 .unwrap();
1075
1076 let mut expected = GenericListBuilder::<O, _>::new(Int32Builder::new());
1078 expected.values().append_value(3);
1079 expected.append(true);
1080 expected.append(false);
1081 expected.values().append_value(4);
1082 expected.append(true);
1083 expected.values().append_value(5);
1084 expected.values().append_value(6);
1085 expected.values().append_null();
1086 expected.append(true);
1087 expected.append(false);
1088 let expected: GenericListViewArray<O> = expected.finish().into();
1089
1090 assert_eq!(v, &expected);
1091 }
1092
1093 #[test]
1094 fn test_list_views() {
1095 test_interleave_list_views::<i32>();
1096 }
1097
1098 #[test]
1099 fn test_large_list_views() {
1100 test_interleave_list_views::<i64>();
1101 }
1102
/// Interleaves two `ListViewArray`s whose rows share (overlap on) the same
/// ranges of their child values, and verifies every output row.
///
/// The output must pass full validation, each of the eight rows must hold the
/// elements of the source row it was selected from, and the child values of
/// the result must have as many elements as both inputs' children combined.
#[test]
fn test_interleave_list_view_overlapping() {
    let field = Arc::new(Field::new_list_field(DataType::Int64, false));

    // Ten rows over ten child values: rows 0..5 all view values[0..5],
    // rows 5..10 all view values[5..10].
    let lv_a = ListViewArray::new(
        Arc::clone(&field),
        ScalarBuffer::from(vec![0i32, 0, 0, 0, 0, 5, 5, 5, 5, 5]),
        ScalarBuffer::from(vec![5i32; 10]),
        Arc::new(Int64Array::from_iter_values(0..10)),
        None,
    );

    // Eight rows over six child values: rows 0..4 all view values[0..3],
    // rows 4..8 all view values[3..6].
    let lv_b = ListViewArray::new(
        Arc::clone(&field),
        ScalarBuffer::from(vec![0i32, 0, 0, 0, 3, 3, 3, 3]),
        ScalarBuffer::from(vec![3i32; 8]),
        Arc::new(Int64Array::from_iter_values(100..106)),
        None,
    );

    // Rows 4..8 select different source rows than rows 0..4, but those source
    // rows view exactly the same child ranges.
    let indices: Vec<(usize, usize)> = vec![
        (0, 0),
        (1, 0),
        (0, 5),
        (1, 4),
        (0, 1),
        (1, 1),
        (0, 6),
        (1, 5),
    ];
    let result = interleave(&[&lv_a as &dyn Array, &lv_b as &dyn Array], &indices).unwrap();
    result
        .to_data()
        .validate_full()
        .expect("result must be valid");

    let result_lv = result.as_list_view::<i32>();
    assert_eq!(result_lv.len(), 8);

    // Expected contents of every output row, in output order.
    let expected_rows: [&[i64]; 8] = [
        &[0, 1, 2, 3, 4],
        &[100, 101, 102],
        &[5, 6, 7, 8, 9],
        &[103, 104, 105],
        &[0, 1, 2, 3, 4],
        &[100, 101, 102],
        &[5, 6, 7, 8, 9],
        &[103, 104, 105],
    ];
    for (row, expected) in expected_rows.iter().enumerate() {
        let actual = result_lv
            .value(row)
            .as_primitive::<Int64Type>()
            .values()
            .to_vec();
        assert_eq!(actual, *expected, "unexpected contents in output row {row}");
    }

    let total_input_elements = lv_a.values().len() + lv_b.values().len();
    assert_eq!(result_lv.values().len(), total_input_elements);
}
1169
/// Interleaves three struct arrays that carry no nulls at all and checks the
/// resulting data type, null count and both child columns.
#[test]
fn test_struct_without_nulls() {
    // Assembles a struct array without a validity buffer from plain column values.
    fn build(fields: &Fields, numbers: &[i32], strings: &[&str]) -> StructArray {
        let number_col = Int32Array::from_iter_values(numbers.iter().copied());
        let string_col = StringArray::from_iter_values(strings.iter().copied());
        StructArray::try_new(
            fields.clone(),
            vec![Arc::new(number_col), Arc::new(string_col)],
            None,
        )
        .unwrap()
    }

    let fields = Fields::from(vec![
        Field::new("number_col", DataType::Int32, false),
        Field::new("string_col", DataType::Utf8, false),
    ]);

    let a = build(&fields, &[1, 2, 3, 4], &["a", "b", "c", "d"]);
    let b = build(&fields, &[5, 6, 7], &["hello", "world", "foo"]);
    let c = build(&fields, &[8, 9, 10], &["x", "y", "z"]);

    let interleaved =
        interleave(&[&a, &b, &c], &[(0, 3), (0, 3), (2, 2), (2, 0), (1, 1)]).unwrap();
    let structs = interleaved.as_struct();
    assert_eq!(structs.data_type(), &DataType::Struct(fields));
    assert_eq!(structs.null_count(), 0);

    let numbers = structs.column(0).as_primitive::<Int32Type>();
    assert_eq!(numbers.values(), &[4, 4, 10, 8, 6]);

    let strings: Vec<_> = structs.column(1).as_string::<i32>().into_iter().collect();
    assert_eq!(
        &strings,
        &[Some("d"), Some("d"), Some("z"), Some("x"), Some("world")]
    );
}
1226
/// Interleaves struct arrays whose child columns contain nulls while the
/// structs themselves have no validity buffer.
#[test]
fn test_struct_with_nulls_in_values() {
    let fields = Fields::from(vec![
        Field::new("number_col", DataType::Int32, true),
        Field::new("string_col", DataType::Utf8, true),
    ]);

    // First input: fully populated child columns.
    let a_numbers = Int32Array::from_iter_values([1, 2, 3, 4]);
    let a_strings = StringArray::from_iter_values(["a", "b", "c", "d"]);
    let a = StructArray::try_new(
        fields.clone(),
        vec![Arc::new(a_numbers), Arc::new(a_strings)],
        None,
    )
    .unwrap();

    // Second input: each child column has one null, at different rows.
    let b_numbers = Int32Array::from_iter([Some(1), Some(4), None]);
    let b_strings = StringArray::from(vec![Some("hello"), None, Some("foo")]);
    let b = StructArray::try_new(
        fields.clone(),
        vec![Arc::new(b_numbers), Arc::new(b_strings)],
        None,
    )
    .unwrap();

    let interleaved = interleave(&[&a, &b], &[(0, 1), (1, 2), (1, 2), (0, 3), (1, 1)]).unwrap();
    let structs = interleaved.as_struct();
    assert_eq!(structs.data_type(), &DataType::Struct(fields));

    // Nulls live only in the children; the struct level reports none.
    assert_eq!(structs.null_count(), 0);

    let numbers: Vec<_> = structs
        .column(0)
        .as_primitive::<Int32Type>()
        .into_iter()
        .collect();
    assert_eq!(numbers, &[Some(2), None, None, Some(4), Some(4)]);

    let strings: Vec<_> = structs.column(1).as_string::<i32>().into_iter().collect();
    assert_eq!(
        &strings,
        &[Some("b"), Some("foo"), Some("foo"), Some("d"), None]
    );
}
1278
/// Interleaves struct arrays where one input has a struct-level validity
/// buffer and checks that the null row is carried into the output.
#[test]
fn test_struct_with_nulls() {
    let fields = Fields::from(vec![
        Field::new("number_col", DataType::Int32, false),
        Field::new("string_col", DataType::Utf8, false),
    ]);

    let a = StructArray::try_new(
        fields.clone(),
        vec![
            Arc::new(Int32Array::from_iter_values([1, 2, 3, 4])),
            Arc::new(StringArray::from_iter_values(["a", "b", "c", "d"])),
        ],
        None,
    )
    .unwrap();

    // Only this input has a validity buffer: its middle row is null.
    let b = StructArray::try_new(
        fields.clone(),
        vec![
            Arc::new(Int32Array::from_iter_values([5, 6, 7])),
            Arc::new(StringArray::from_iter_values(["hello", "world", "foo"])),
        ],
        Some(NullBuffer::from(&[true, false, true])),
    )
    .unwrap();

    let c = StructArray::try_new(
        fields.clone(),
        vec![
            Arc::new(Int32Array::from_iter_values([8, 9, 10])),
            Arc::new(StringArray::from_iter_values(["x", "y", "z"])),
        ],
        None,
    )
    .unwrap();

    let interleaved =
        interleave(&[&a, &b, &c], &[(0, 3), (0, 3), (2, 2), (1, 1), (2, 0)]).unwrap();
    let structs = interleaved.as_struct();
    assert_eq!(structs.data_type(), &DataType::Struct(fields));

    // The fourth output row comes from the null row of `b`.
    let validity: Vec<bool> = structs
        .nulls()
        .expect("should_have_nulls")
        .iter()
        .collect();
    assert_eq!(validity, &[true, true, true, false, true]);

    let numbers = structs.column(0).as_primitive::<Int32Type>();
    assert_eq!(numbers.values(), &[4, 4, 10, 6, 8]);

    let strings: Vec<_> = structs.column(1).as_string::<i32>().into_iter().collect();
    assert_eq!(
        &strings,
        &[Some("d"), Some("d"), Some("z"), Some("world"), Some("x"),]
    );
}
1340
/// Interleaving with an empty index list yields an empty array that keeps the
/// struct data type of the input.
#[test]
fn test_struct_empty() {
    let fields = Fields::from(vec![
        Field::new("number_col", DataType::Int32, false),
        Field::new("string_col", DataType::Utf8, false),
    ]);

    let numbers = Int32Array::from_iter_values([1, 2, 3, 4]);
    let strings = StringArray::from_iter_values(["a", "b", "c", "d"]);
    let a = StructArray::try_new(
        fields.clone(),
        vec![Arc::new(numbers), Arc::new(strings)],
        None,
    )
    .unwrap();

    let interleaved = interleave(&[&a], &[]).unwrap();
    assert!(interleaved.is_empty());
    assert_eq!(interleaved.data_type(), &DataType::Struct(fields));
}
1362
/// Interleaves a dictionary that uses only a few of its many values with a
/// dictionary whose keys are all null and whose values array is empty.
#[test]
fn interleave_sparse_nulls() {
    // 100 dictionary values, of which the keys reference only the first ten.
    let populated_values = StringArray::from_iter_values((0..100).map(|x| x.to_string()));
    let populated_keys = Int32Array::from_iter_values(0..10);
    let dict_a = DictionaryArray::new(populated_keys, Arc::new(populated_values));

    // Ten null keys over an empty values array.
    let empty_values = StringArray::new_null(0);
    let null_keys = Int32Array::new_null(10);
    let dict_b = DictionaryArray::new(null_keys, Arc::new(empty_values));

    let indices = &[(0, 0), (0, 1), (0, 2), (1, 0)];
    let interleaved = interleave(&[&dict_a, &dict_b], indices).unwrap();

    let expected =
        DictionaryArray::<Int32Type>::from_iter(vec![Some("0"), Some("1"), Some("2"), None]);
    assert_eq!(interleaved.as_ref(), &expected);
}
1379
/// Interleaves two string-view arrays and compares the result against
/// `interleave_fallback`, both in content and in number of data buffers.
#[test]
fn test_interleave_views() {
    // Copies every element of a view array into an owned, comparable form.
    let to_owned_strings = |array: &StringViewArray| -> Vec<Option<String>> {
        array.iter().map(|x| x.map(|s| s.to_string())).collect()
    };

    let source_a = StringArray::from_iter_values([
        "hello",
        "world_long_string_not_inlined",
        "foo",
        "bar",
        "baz",
    ]);
    let view_a = StringViewArray::from(&source_a);

    let source_b = StringArray::from_iter_values([
        "test",
        "data",
        "more_long_string_not_inlined",
        "views",
        "here",
    ]);
    let view_b = StringViewArray::from(&source_b);

    let indices = &[(0, 2), (1, 0), (0, 4), (1, 3), (0, 1)];

    // The view-specific path is expected to end up with a single data buffer...
    let values = interleave(&[&view_a, &view_b], indices).unwrap();
    let result = values.as_string_view();
    assert_eq!(result.data_buffers().len(), 1);

    // ...whereas the generic fallback is expected to end up with two.
    let fallback = interleave_fallback(&[&view_a, &view_b], indices).unwrap();
    let fallback_result = fallback.as_string_view();
    assert_eq!(fallback_result.data_buffers().len(), 2);

    let collected = to_owned_strings(result);
    let fallback_collected = to_owned_strings(fallback_result);
    assert_eq!(&collected, &fallback_collected);

    let expected: Vec<Option<String>> = [
        "foo",
        "test",
        "baz",
        "views",
        "world_long_string_not_inlined",
    ]
    .into_iter()
    .map(|s| Some(s.to_string()))
    .collect();
    assert_eq!(&collected, &expected);
}
1439
/// Interleaves two string-view arrays containing nulls and checks that the
/// result matches `interleave_fallback` and the expected null pattern.
#[test]
fn test_interleave_views_with_nulls() {
    // Copies every element of a view array into an owned, comparable form.
    let to_owned_strings = |array: &StringViewArray| -> Vec<Option<String>> {
        array.iter().map(|x| x.map(|s| s.to_string())).collect()
    };

    let source_a = StringArray::from_iter([
        Some("hello"),
        None,
        Some("foo_long_string_not_inlined"),
        Some("bar"),
        None,
    ]);
    let view_a = StringViewArray::from(&source_a);

    let source_b = StringArray::from_iter([
        Some("test"),
        Some("data_long_string_not_inlined"),
        None,
        None,
        Some("here"),
    ]);
    let view_b = StringViewArray::from(&source_b);

    // Every selected row is null except (0, 2).
    let indices = &[(0, 1), (1, 2), (0, 2), (1, 3), (0, 4)];

    let values = interleave(&[&view_a, &view_b], indices).unwrap();
    let result = values.as_string_view();
    assert_eq!(result.data_buffers().len(), 1);

    let fallback = interleave_fallback(&[&view_a, &view_b], indices).unwrap();
    let fallback_result = fallback.as_string_view();

    let collected = to_owned_strings(result);
    let fallback_collected = to_owned_strings(fallback_result);
    assert_eq!(&collected, &fallback_collected);

    let expected: Vec<Option<String>> = vec![
        None,
        None,
        Some("foo_long_string_not_inlined".to_string()),
        None,
        None,
    ];
    assert_eq!(&collected, &expected);
}
1497
/// Interleaves two hand-built string-view arrays that each reference two data
/// buffers, then checks the strings and the buffer index stored in each
/// output view.
#[test]
fn test_interleave_views_multiple_buffers() {
    // Raw view pointing at offset 0 of data buffer `buffer_index`, using the
    // first four bytes of the string as prefix.
    let view_at = |bytes: &[u8], buffer_index: u32| {
        ByteView::new(bytes.len() as u32, &bytes[..4])
            .with_buffer_index(buffer_index)
            .with_offset(0)
            .as_u128()
    };

    let a_first = "very_long_string_from_first_buffer".as_bytes();
    let a_second = "very_long_string_from_second_buffer".as_bytes();
    let a_buffer0 = a_first.to_vec().into();
    let a_buffer1 = a_second.to_vec().into();
    let a_views = vec![view_at(a_first, 0), view_at(a_second, 1)];
    let view_a =
        StringViewArray::try_new(a_views.into(), vec![a_buffer0, a_buffer1], None).unwrap();

    let b_first = "another_very_long_string_buffer_three".as_bytes();
    let b_second = "different_long_string_in_buffer_four".as_bytes();
    let b_buffer0 = b_first.to_vec().into();
    let b_buffer1 = b_second.to_vec().into();
    let b_views = vec![view_at(b_first, 0), view_at(b_second, 1)];
    let view_b =
        StringViewArray::try_new(b_views.into(), vec![b_buffer0, b_buffer1], None).unwrap();

    // The last two entries re-select rows that were already selected earlier.
    let indices = &[(0, 0), (1, 0), (0, 1), (1, 1), (0, 0), (1, 1)];

    let values = interleave(&[&view_a, &view_b], indices).unwrap();
    let result = values.as_string_view();

    assert_eq!(
        result.data_buffers().len(),
        4,
        "Expected four buffers (two from each input array)"
    );

    let result_strings: Vec<_> = result.iter().map(|x| x.map(|s| s.to_string())).collect();
    let expected_strings: Vec<Option<String>> = [
        "very_long_string_from_first_buffer",
        "another_very_long_string_buffer_three",
        "very_long_string_from_second_buffer",
        "different_long_string_in_buffer_four",
        "very_long_string_from_first_buffer",
        "different_long_string_in_buffer_four",
    ]
    .into_iter()
    .map(|s| Some(s.to_string()))
    .collect();
    assert_eq!(result_strings, expected_strings);

    // Repeated rows are expected to reuse the buffer index assigned on first use.
    let buffer_indices: Vec<_> = result
        .views()
        .iter()
        .map(|raw_view| ByteView::from(*raw_view).buffer_index)
        .collect();
    assert_eq!(buffer_indices, vec![0, 1, 2, 3, 0, 3]);
}
1584
/// Interleaves two run-end encoded `Int32` arrays and checks the logical
/// values of the run-end encoded result.
#[test]
fn test_interleave_run_end_encoded_primitive() {
    let mut builder_a = PrimitiveRunBuilder::<Int32Type, Int32Type>::new();
    builder_a.extend([1, 1, 2, 2, 2, 3].into_iter().map(Some));
    let a = builder_a.finish();

    let mut builder_b = PrimitiveRunBuilder::<Int32Type, Int32Type>::new();
    builder_b.extend([4, 5, 5, 6, 6, 6].into_iter().map(Some));
    let b = builder_b.finish();

    let indices = &[(0, 1), (1, 0), (0, 4), (1, 2), (0, 5)];
    let result = interleave(&[&a, &b], indices).unwrap();

    assert!(matches!(result.data_type(), DataType::RunEndEncoded(_, _)));

    let run_array = result.as_any().downcast_ref::<Int32RunArray>().unwrap();

    // Resolve each logical position to its physical run and read that run's value.
    let physical_values = run_array.values().as_primitive::<Int32Type>();
    let actual: Vec<_> = (0..run_array.len())
        .map(|logical| physical_values.value(run_array.get_physical_index(logical)))
        .collect();
    assert_eq!(actual, vec![1, 4, 2, 5, 3]);
}
1617
/// Interleaves two run-end encoded arrays that were sliced first, so indices
/// are relative to the slices rather than to the original arrays.
#[test]
fn test_interleave_run_end_encoded_sliced() {
    let mut builder_a = PrimitiveRunBuilder::<Int32Type, Int32Type>::new();
    builder_a.extend([1, 1, 2, 2, 2, 3].into_iter().map(Some));
    // Logical contents after slicing: [2, 2, 2].
    let a = builder_a.finish().slice(2, 3);

    let mut builder_b = PrimitiveRunBuilder::<Int32Type, Int32Type>::new();
    builder_b.extend([4, 5, 5, 6, 6, 6].into_iter().map(Some));
    // Logical contents after slicing: [5, 5, 6].
    let b = builder_b.finish().slice(1, 3);

    let indices = &[(0, 1), (1, 0), (0, 2), (1, 1), (1, 2)];
    let interleaved = interleave(&[&a, &b], indices).unwrap();

    let typed_runs = interleaved
        .as_run::<Int32Type>()
        .downcast::<Int32Array>()
        .unwrap();

    let actual: Vec<_> = typed_runs.into_iter().flatten().collect();
    assert_eq!(actual, vec![2, 5, 2, 5, 6]);
}
1640
/// Interleaves two run-end encoded string arrays and checks the logical
/// values of the run-end encoded result.
#[test]
fn test_interleave_run_end_encoded_string() {
    let a: Int32RunArray = vec!["hello", "hello", "world", "world", "foo"]
        .into_iter()
        .collect();
    let b: Int32RunArray = vec!["bar", "baz", "baz", "qux"].into_iter().collect();

    let indices = &[(0, 0), (1, 1), (0, 3), (1, 3), (0, 4)];
    let result = interleave(&[&a, &b], indices).unwrap();

    assert!(matches!(result.data_type(), DataType::RunEndEncoded(_, _)));

    let run_array = result.as_any().downcast_ref::<Int32RunArray>().unwrap();

    // Resolve each logical position to its physical run and read that run's value.
    let physical_values = run_array.values().as_string::<i32>();
    let actual: Vec<_> = (0..run_array.len())
        .map(|logical| physical_values.value(run_array.get_physical_index(logical)))
        .collect();
    assert_eq!(actual, vec!["hello", "baz", "world", "qux", "foo"]);
}
1670
/// Interleaves two run-end encoded string arrays that contain null runs and
/// checks that nulls land at the expected logical positions.
#[test]
fn test_interleave_run_end_encoded_with_nulls() {
    let a: Int32RunArray = vec![Some("a"), Some("a"), None, None, Some("b")]
        .into_iter()
        .collect();
    let b: Int32RunArray = vec![None, Some("c"), Some("c"), Some("d")]
        .into_iter()
        .collect();

    let indices = &[(0, 1), (1, 0), (0, 2), (1, 3), (0, 4)];
    let result = interleave(&[&a, &b], indices).unwrap();

    assert!(matches!(result.data_type(), DataType::RunEndEncoded(_, _)));

    let run_array = result.as_any().downcast_ref::<Int32RunArray>().unwrap();

    // Nullness is read from the physical values array, so check it there
    // before reading the string of a run.
    let physical_values = run_array.values();
    let physical_strings = physical_values.as_string::<i32>();
    let actual: Vec<_> = (0..run_array.len())
        .map(|logical| {
            let physical = run_array.get_physical_index(logical);
            if physical_values.is_null(physical) {
                None
            } else {
                Some(physical_strings.value(physical))
            }
        })
        .collect();
    assert_eq!(actual, vec![Some("a"), None, None, Some("d"), Some("b")]);
}
1706
/// Interleaves run-end encoded arrays whose run ends are `Int16` (values are
/// `Int32`) and checks the logical values of the result.
#[test]
fn test_interleave_run_end_encoded_different_run_types() {
    let mut builder_a = PrimitiveRunBuilder::<Int16Type, Int32Type>::new();
    builder_a.extend([1, 1, 2, 3, 3].into_iter().map(Some));
    let a = builder_a.finish();

    let mut builder_b = PrimitiveRunBuilder::<Int16Type, Int32Type>::new();
    builder_b.extend([4, 5, 5, 6].into_iter().map(Some));
    let b = builder_b.finish();

    let indices = &[(0, 0), (1, 1), (0, 3), (1, 3)];
    let result = interleave(&[&a, &b], indices).unwrap();

    assert!(matches!(result.data_type(), DataType::RunEndEncoded(_, _)));

    let run_array = result
        .as_any()
        .downcast_ref::<RunArray<Int16Type>>()
        .unwrap();

    // Resolve each logical position to its physical run and read that run's value.
    let physical_values = run_array.values().as_primitive::<Int32Type>();
    let actual: Vec<_> = (0..run_array.len())
        .map(|logical| physical_values.value(run_array.get_physical_index(logical)))
        .collect();
    assert_eq!(actual, vec![1, 5, 3, 6]);
}
1739
/// Interleaves run-end encoded arrays (`Int64` run ends) whose runs have
/// varying lengths and checks the logical values of the result.
#[test]
fn test_interleave_run_end_encoded_mixed_run_lengths() {
    let mut builder_a = PrimitiveRunBuilder::<Int64Type, Int32Type>::new();
    builder_a.extend([1, 2, 2, 2, 2, 3, 3, 4].into_iter().map(Some));
    let a = builder_a.finish();

    let mut builder_b = PrimitiveRunBuilder::<Int64Type, Int32Type>::new();
    builder_b.extend([5, 5, 5, 6, 7, 7, 8, 8].into_iter().map(Some));
    let b = builder_b.finish();

    let indices = &[
        (0, 0),
        (1, 2),
        (0, 3),
        (1, 3),
        (0, 6),
        (1, 6),
        (0, 7),
        (1, 4),
    ];
    let result = interleave(&[&a, &b], indices).unwrap();

    assert!(matches!(result.data_type(), DataType::RunEndEncoded(_, _)));

    let run_array = result
        .as_any()
        .downcast_ref::<RunArray<Int64Type>>()
        .unwrap();

    // Resolve each logical position to its physical run and read that run's value.
    let physical_values = run_array.values().as_primitive::<Int32Type>();
    let actual: Vec<_> = (0..run_array.len())
        .map(|logical| physical_values.value(run_array.get_physical_index(logical)))
        .collect();
    assert_eq!(actual, vec![1, 5, 2, 6, 3, 8, 4, 7]);
}
1781
/// Interleaves a one-element run-end encoded array with an array consisting
/// of a single three-element run and checks the logical values of the result.
#[test]
fn test_interleave_run_end_encoded_empty_runs() {
    let mut builder_a = PrimitiveRunBuilder::<Int32Type, Int32Type>::new();
    builder_a.extend([1].into_iter().map(Some));
    let a = builder_a.finish();

    let mut builder_b = PrimitiveRunBuilder::<Int32Type, Int32Type>::new();
    builder_b.extend([2, 2, 2].into_iter().map(Some));
    let b = builder_b.finish();

    let indices = &[(0, 0), (1, 1), (1, 2)];
    let result = interleave(&[&a, &b], indices).unwrap();

    assert!(matches!(result.data_type(), DataType::RunEndEncoded(_, _)));

    let run_array = result.as_any().downcast_ref::<Int32RunArray>().unwrap();

    // Resolve each logical position to its physical run and read that run's value.
    let physical_values = run_array.values().as_primitive::<Int32Type>();
    let actual: Vec<_> = (0..run_array.len())
        .map(|logical| physical_values.value(run_array.get_physical_index(logical)))
        .collect();
    assert_eq!(actual, vec![1, 2, 2]);
}
1814
/// Interleaving a struct array that has no child fields still yields the
/// requested number of rows and keeps the (empty) struct data type.
#[test]
fn test_struct_no_fields() {
    let no_fields = Fields::empty();
    // Without children the length cannot be derived, so it is passed explicitly.
    let input = StructArray::try_new_with_length(no_fields.clone(), vec![], None, 10).unwrap();

    let interleaved = interleave(&[&input], &[(0, 0)]).unwrap();

    assert_eq!(interleaved.len(), 1);
    assert_eq!(interleaved.data_type(), &DataType::Struct(no_fields));
}
1823
/// Calls `interleave_fallback_dictionary` directly on two dictionaries with
/// null keys and checks the decoded values of the result.
#[test]
fn test_interleave_fallback_dictionary_with_nulls() {
    let dict_a = DictionaryArray::new(
        Int32Array::from_iter([Some(0), None, Some(1)]),
        Arc::new(StringArray::from_iter_values(["foo", "bar"])),
    );
    let dict_b = DictionaryArray::new(
        Int32Array::from_iter([Some(0), Some(1), None]),
        Arc::new(StringArray::from_iter_values(["baz", "qux"])),
    );

    // Selects every row of both inputs, including the null-key rows.
    let indices = vec![(0, 0), (0, 1), (1, 0), (1, 2), (0, 2), (1, 1)];

    let interleaved =
        interleave_fallback_dictionary::<Int32Type>(&[&dict_a, &dict_b], &indices).unwrap();

    let decoded: Vec<_> = interleaved
        .as_dictionary::<Int32Type>()
        .downcast_dict::<StringArray>()
        .unwrap()
        .into_iter()
        .collect();
    assert_eq!(
        decoded,
        vec![
            Some("foo"),
            None,
            Some("baz"),
            None,
            Some("bar"),
            Some("qux")
        ]
    );
}
1861
/// Selecting one 26-byte string `i32::MAX >> 4` times needs more bytes than an
/// `i32` offset can address, so `interleave` must report an offset overflow.
#[test]
fn test_interleave_bytes_offset_overflow() {
    let alphabet: String = ('a'..='z').collect();
    let values = StringArray::from(vec![Some(alphabet)]);

    let repetitions = (i32::MAX >> 4) as usize;
    let indices: Vec<(usize, usize)> = vec![(0, 0); repetitions];

    let outcome = interleave(&[&values], &indices);
    assert!(matches!(outcome, Err(ArrowError::OffsetOverflowError(_))));
}
1872
/// Selecting a 32-element list `i32::MAX / 32 + 1` times needs more child
/// elements than an `i32` offset can address, so `interleave` must report an
/// offset overflow.
#[test]
fn test_interleave_list_offset_overflow() {
    // A list array with a single row of 32 elements.
    let mut list_builder = GenericListBuilder::<i32, _>::new(Int32Builder::new());
    (0..32).for_each(|element| list_builder.values().append_value(element));
    list_builder.append(true);
    let list = list_builder.finish();

    let repetitions = (i32::MAX as usize / 32) + 1;
    let indices: Vec<(usize, usize)> = vec![(0, 0); repetitions];

    let outcome = interleave(&[&list], &indices);
    assert!(matches!(outcome, Err(ArrowError::OffsetOverflowError(_))));
}
1890
/// Interleaves two small `ListViewArray`s and checks that the result is
/// internally valid and that each output row holds the expected elements.
#[test]
fn test_interleave_list_view() {
    let item_field = Arc::new(Field::new_list_field(DataType::Int64, false));

    // Two rows over values [1, 2, 3]: [1, 2] and [3].
    let a_offsets = ScalarBuffer::from(vec![0i32, 2]);
    let a_sizes = ScalarBuffer::from(vec![2i32, 1]);
    let lv_a = ListViewArray::new(
        Arc::clone(&item_field),
        a_offsets,
        a_sizes,
        Arc::new(Int64Array::from(vec![1_i64, 2, 3])),
        None,
    );

    // One row over values [4, 5, 6]: [4, 5, 6].
    let b_offsets = ScalarBuffer::from(vec![0i32]);
    let b_sizes = ScalarBuffer::from(vec![3i32]);
    let lv_b = ListViewArray::new(
        item_field,
        b_offsets,
        b_sizes,
        Arc::new(Int64Array::from(vec![4_i64, 5, 6])),
        None,
    );

    let inputs = [&lv_a as &dyn Array, &lv_b as &dyn Array];
    let interleaved = interleave(&inputs, &[(0, 0), (1, 0), (0, 1)]).unwrap();

    interleaved
        .to_data()
        .validate_full()
        .expect("interleaved ListViewArray must be internally consistent");

    let list_views = interleaved.as_list_view::<i32>();
    assert_eq!(list_views.len(), 3);
    assert_eq!(
        list_views.value(0).as_primitive::<Int64Type>().values(),
        &[1, 2]
    );
    assert_eq!(
        list_views.value(1).as_primitive::<Int64Type>().values(),
        &[4, 5, 6]
    );
    assert_eq!(
        list_views.value(2).as_primitive::<Int64Type>().values(),
        &[3]
    );
}
1944}