arrow_select/
interleave.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Interleave elements from multiple arrays
19
20use crate::concat::concat;
21use crate::dictionary::{merge_dictionary_values, should_merge_dictionary_values};
22use arrow_array::builder::{BooleanBufferBuilder, PrimitiveBuilder};
23use arrow_array::cast::AsArray;
24use arrow_array::types::*;
25use arrow_array::*;
26use arrow_buffer::{ArrowNativeType, BooleanBuffer, MutableBuffer, NullBuffer, OffsetBuffer};
27use arrow_data::ByteView;
28use arrow_data::transform::MutableArrayData;
29use arrow_schema::{ArrowError, DataType, FieldRef, Fields};
30use std::sync::Arc;
31
32macro_rules! primitive_helper {
33    ($t:ty, $values:ident, $indices:ident, $data_type:ident) => {
34        interleave_primitive::<$t>($values, $indices, $data_type)
35    };
36}
37
38macro_rules! dict_helper {
39    ($t:ty, $values:expr, $indices:expr) => {
40        Ok(Arc::new(interleave_dictionaries::<$t>($values, $indices)?) as _)
41    };
42}
43
44///
45/// Takes elements by index from a list of [`Array`], creating a new [`Array`] from those values.
46///
47/// Each element in `indices` is a pair of `usize` with the first identifying the index
48/// of the [`Array`] in `values`, and the second the index of the value within that [`Array`]
49///
50/// ```text
51/// ┌─────────────────┐      ┌─────────┐                                  ┌─────────────────┐
52/// │        A        │      │ (0, 0)  │        interleave(               │        A        │
53/// ├─────────────────┤      ├─────────┤          [values0, values1],     ├─────────────────┤
54/// │        D        │      │ (1, 0)  │          indices                 │        B        │
55/// └─────────────────┘      ├─────────┤        )                         ├─────────────────┤
56///   values array 0         │ (1, 1)  │      ─────────────────────────▶  │        C        │
57///                          ├─────────┤                                  ├─────────────────┤
58///                          │ (0, 1)  │                                  │        D        │
59///                          └─────────┘                                  └─────────────────┘
60/// ┌─────────────────┐       indices
61/// │        B        │        array
62/// ├─────────────────┤                                                    result
63/// │        C        │
64/// ├─────────────────┤
65/// │        E        │
66/// └─────────────────┘
67///   values array 1
68/// ```
69///
70/// For selecting values by index from a single array see [`crate::take`]
71pub fn interleave(
72    values: &[&dyn Array],
73    indices: &[(usize, usize)],
74) -> Result<ArrayRef, ArrowError> {
75    if values.is_empty() {
76        return Err(ArrowError::InvalidArgumentError(
77            "interleave requires input of at least one array".to_string(),
78        ));
79    }
80    let data_type = values[0].data_type();
81
82    for array in values.iter().skip(1) {
83        if array.data_type() != data_type {
84            return Err(ArrowError::InvalidArgumentError(format!(
85                "It is not possible to interleave arrays of different data types ({} and {})",
86                data_type,
87                array.data_type()
88            )));
89        }
90    }
91
92    if indices.is_empty() {
93        return Ok(new_empty_array(data_type));
94    }
95
96    downcast_primitive! {
97        data_type => (primitive_helper, values, indices, data_type),
98        DataType::Utf8 => interleave_bytes::<Utf8Type>(values, indices),
99        DataType::LargeUtf8 => interleave_bytes::<LargeUtf8Type>(values, indices),
100        DataType::Binary => interleave_bytes::<BinaryType>(values, indices),
101        DataType::LargeBinary => interleave_bytes::<LargeBinaryType>(values, indices),
102        DataType::BinaryView => interleave_views::<BinaryViewType>(values, indices),
103        DataType::Utf8View => interleave_views::<StringViewType>(values, indices),
104        DataType::Dictionary(k, _) => downcast_integer! {
105            k.as_ref() => (dict_helper, values, indices),
106            _ => unreachable!("illegal dictionary key type {k}")
107        },
108        DataType::Struct(fields) => interleave_struct(fields, values, indices),
109        DataType::List(field) => interleave_list::<i32>(values, indices, field),
110        DataType::LargeList(field) => interleave_list::<i64>(values, indices, field),
111        _ => interleave_fallback(values, indices)
112    }
113}
114
115/// Common functionality for interleaving arrays
116///
117/// T is the concrete Array type
118struct Interleave<'a, T> {
119    /// The input arrays downcast to T
120    arrays: Vec<&'a T>,
121    /// The null buffer of the interleaved output
122    nulls: Option<NullBuffer>,
123}
124
125impl<'a, T: Array + 'static> Interleave<'a, T> {
126    fn new(values: &[&'a dyn Array], indices: &'a [(usize, usize)]) -> Self {
127        let mut has_nulls = false;
128        let arrays: Vec<&T> = values
129            .iter()
130            .map(|x| {
131                has_nulls = has_nulls || x.null_count() != 0;
132                x.as_any().downcast_ref().unwrap()
133            })
134            .collect();
135
136        let nulls = match has_nulls {
137            true => {
138                let nulls = BooleanBuffer::collect_bool(indices.len(), |i| {
139                    let (a, b) = indices[i];
140                    arrays[a].is_valid(b)
141                });
142                Some(nulls.into())
143            }
144            false => None,
145        };
146
147        Self { arrays, nulls }
148    }
149}
150
151fn interleave_primitive<T: ArrowPrimitiveType>(
152    values: &[&dyn Array],
153    indices: &[(usize, usize)],
154    data_type: &DataType,
155) -> Result<ArrayRef, ArrowError> {
156    let interleaved = Interleave::<'_, PrimitiveArray<T>>::new(values, indices);
157
158    let values = indices
159        .iter()
160        .map(|(a, b)| interleaved.arrays[*a].value(*b))
161        .collect::<Vec<_>>();
162
163    let array = PrimitiveArray::<T>::try_new(values.into(), interleaved.nulls)?;
164    Ok(Arc::new(array.with_data_type(data_type.clone())))
165}
166
167fn interleave_bytes<T: ByteArrayType>(
168    values: &[&dyn Array],
169    indices: &[(usize, usize)],
170) -> Result<ArrayRef, ArrowError> {
171    let interleaved = Interleave::<'_, GenericByteArray<T>>::new(values, indices);
172
173    let mut capacity = 0;
174    let mut offsets = Vec::with_capacity(indices.len() + 1);
175    offsets.push(T::Offset::from_usize(0).unwrap());
176    offsets.extend(indices.iter().map(|(a, b)| {
177        let o = interleaved.arrays[*a].value_offsets();
178        let element_len = o[*b + 1].as_usize() - o[*b].as_usize();
179        capacity += element_len;
180        T::Offset::from_usize(capacity).expect("overflow")
181    }));
182
183    let mut values = Vec::with_capacity(capacity);
184    for (a, b) in indices {
185        values.extend_from_slice(interleaved.arrays[*a].value(*b).as_ref());
186    }
187
188    // Safety: safe by construction
189    let array = unsafe {
190        let offsets = OffsetBuffer::new_unchecked(offsets.into());
191        GenericByteArray::<T>::new_unchecked(offsets, values.into(), interleaved.nulls)
192    };
193    Ok(Arc::new(array))
194}
195
196fn interleave_dictionaries<K: ArrowDictionaryKeyType>(
197    arrays: &[&dyn Array],
198    indices: &[(usize, usize)],
199) -> Result<ArrayRef, ArrowError> {
200    let dictionaries: Vec<_> = arrays.iter().map(|x| x.as_dictionary::<K>()).collect();
201    let (should_merge, has_overflow) =
202        should_merge_dictionary_values::<K>(&dictionaries, indices.len());
203    if !should_merge {
204        return if has_overflow {
205            interleave_fallback(arrays, indices)
206        } else {
207            interleave_fallback_dictionary::<K>(&dictionaries, indices)
208        };
209    }
210
211    let masks: Vec<_> = dictionaries
212        .iter()
213        .enumerate()
214        .map(|(a_idx, dictionary)| {
215            let mut key_mask = BooleanBufferBuilder::new_from_buffer(
216                MutableBuffer::new_null(dictionary.len()),
217                dictionary.len(),
218            );
219
220            for (_, key_idx) in indices.iter().filter(|(a, _)| *a == a_idx) {
221                key_mask.set_bit(*key_idx, true);
222            }
223            key_mask.finish()
224        })
225        .collect();
226
227    let merged = merge_dictionary_values(&dictionaries, Some(&masks))?;
228
229    // Recompute keys
230    let mut keys = PrimitiveBuilder::<K>::with_capacity(indices.len());
231    for (a, b) in indices {
232        let old_keys: &PrimitiveArray<K> = dictionaries[*a].keys();
233        match old_keys.is_valid(*b) {
234            true => {
235                let old_key = old_keys.values()[*b];
236                keys.append_value(merged.key_mappings[*a][old_key.as_usize()])
237            }
238            false => keys.append_null(),
239        }
240    }
241    let array = unsafe { DictionaryArray::new_unchecked(keys.finish(), merged.values) };
242    Ok(Arc::new(array))
243}
244
245fn interleave_views<T: ByteViewType>(
246    values: &[&dyn Array],
247    indices: &[(usize, usize)],
248) -> Result<ArrayRef, ArrowError> {
249    let interleaved = Interleave::<'_, GenericByteViewArray<T>>::new(values, indices);
250    let mut buffers = Vec::new();
251
252    // Contains the offsets of start buffer in `buffer_to_new_index`
253    let mut offsets = Vec::with_capacity(interleaved.arrays.len() + 1);
254    offsets.push(0);
255    let mut total_buffers = 0;
256    for a in interleaved.arrays.iter() {
257        total_buffers += a.data_buffers().len();
258        offsets.push(total_buffers);
259    }
260
261    // contains the mapping from old buffer index to new buffer index
262    let mut buffer_to_new_index = vec![None; total_buffers];
263
264    let views: Vec<u128> = indices
265        .iter()
266        .map(|(array_idx, value_idx)| {
267            let array = interleaved.arrays[*array_idx];
268            let view = array.views().get(*value_idx).unwrap();
269            let view_len = *view as u32;
270            if view_len <= 12 {
271                return *view;
272            }
273            // value is big enough to be in a variadic buffer
274            let view = ByteView::from(*view);
275            let buffer_to_new_idx = offsets[*array_idx] + view.buffer_index as usize;
276            let new_buffer_idx: u32 =
277                *buffer_to_new_index[buffer_to_new_idx].get_or_insert_with(|| {
278                    buffers.push(array.data_buffers()[view.buffer_index as usize].clone());
279                    (buffers.len() - 1) as u32
280                });
281            view.with_buffer_index(new_buffer_idx).as_u128()
282        })
283        .collect();
284
285    let array = unsafe {
286        GenericByteViewArray::<T>::new_unchecked(views.into(), buffers, interleaved.nulls)
287    };
288    Ok(Arc::new(array))
289}
290
291fn interleave_struct(
292    fields: &Fields,
293    values: &[&dyn Array],
294    indices: &[(usize, usize)],
295) -> Result<ArrayRef, ArrowError> {
296    let interleaved = Interleave::<'_, StructArray>::new(values, indices);
297
298    if fields.is_empty() {
299        let array = StructArray::try_new_with_length(
300            fields.clone(),
301            vec![],
302            interleaved.nulls,
303            indices.len(),
304        )?;
305        return Ok(Arc::new(array));
306    }
307
308    let struct_fields_array: Result<Vec<_>, _> = (0..fields.len())
309        .map(|i| {
310            let field_values: Vec<&dyn Array> = interleaved
311                .arrays
312                .iter()
313                .map(|x| x.column(i).as_ref())
314                .collect();
315            interleave(&field_values, indices)
316        })
317        .collect();
318
319    let struct_array =
320        StructArray::try_new(fields.clone(), struct_fields_array?, interleaved.nulls)?;
321    Ok(Arc::new(struct_array))
322}
323
324fn interleave_list<O: OffsetSizeTrait>(
325    values: &[&dyn Array],
326    indices: &[(usize, usize)],
327    field: &FieldRef,
328) -> Result<ArrayRef, ArrowError> {
329    let interleaved = Interleave::<'_, GenericListArray<O>>::new(values, indices);
330
331    let mut capacity = 0usize;
332    let mut offsets = Vec::with_capacity(indices.len() + 1);
333    offsets.push(O::from_usize(0).unwrap());
334    offsets.extend(indices.iter().map(|(array, row)| {
335        let o = interleaved.arrays[*array].value_offsets();
336        let element_len = o[*row + 1].as_usize() - o[*row].as_usize();
337        capacity += element_len;
338        O::from_usize(capacity).expect("offset overflow")
339    }));
340
341    let mut child_indices = Vec::with_capacity(capacity);
342    for (array, row) in indices {
343        let list = interleaved.arrays[*array];
344        let start = list.value_offsets()[*row].as_usize();
345        let end = list.value_offsets()[*row + 1].as_usize();
346        child_indices.extend((start..end).map(|i| (*array, i)));
347    }
348
349    let child_arrays: Vec<&dyn Array> = interleaved
350        .arrays
351        .iter()
352        .map(|list| list.values().as_ref())
353        .collect();
354
355    let interleaved_values = interleave(&child_arrays, &child_indices)?;
356
357    let offsets = OffsetBuffer::new(offsets.into());
358    let list_array = GenericListArray::<O>::new(
359        field.clone(),
360        offsets,
361        interleaved_values,
362        interleaved.nulls,
363    );
364
365    Ok(Arc::new(list_array))
366}
367
368/// Fallback implementation of interleave using [`MutableArrayData`]
369fn interleave_fallback(
370    values: &[&dyn Array],
371    indices: &[(usize, usize)],
372) -> Result<ArrayRef, ArrowError> {
373    let arrays: Vec<_> = values.iter().map(|x| x.to_data()).collect();
374    let arrays: Vec<_> = arrays.iter().collect();
375    let mut array_data = MutableArrayData::new(arrays, false, indices.len());
376
377    let mut cur_array = indices[0].0;
378    let mut start_row_idx = indices[0].1;
379    let mut end_row_idx = start_row_idx + 1;
380
381    for (array, row) in indices.iter().skip(1).copied() {
382        if array == cur_array && row == end_row_idx {
383            // subsequent row in same batch
384            end_row_idx += 1;
385            continue;
386        }
387
388        // emit current batch of rows for current buffer
389        array_data.extend(cur_array, start_row_idx, end_row_idx);
390
391        // start new batch of rows
392        cur_array = array;
393        start_row_idx = row;
394        end_row_idx = start_row_idx + 1;
395    }
396
397    // emit final batch of rows
398    array_data.extend(cur_array, start_row_idx, end_row_idx);
399    Ok(make_array(array_data.freeze()))
400}
401
402/// Fallback implementation for interleaving dictionaries when it was determined
403/// that the dictionary values should not be merged. This implementation concatenates
404/// the value slices and recomputes the resulting dictionary keys.
405///
406/// # Panics
407///
408/// This function assumes that the combined dictionary values will not overflow the
409/// key type. Callers must verify this condition [`should_merge_dictionary_values`]
410/// before calling this function.
411fn interleave_fallback_dictionary<K: ArrowDictionaryKeyType>(
412    dictionaries: &[&DictionaryArray<K>],
413    indices: &[(usize, usize)],
414) -> Result<ArrayRef, ArrowError> {
415    let relative_offsets: Vec<usize> = dictionaries
416        .iter()
417        .scan(0usize, |offset, dict| {
418            let current = *offset;
419            *offset += dict.values().len();
420            Some(current)
421        })
422        .collect();
423    let all_values: Vec<&dyn Array> = dictionaries.iter().map(|d| d.values().as_ref()).collect();
424    let concatenated_values = concat(&all_values)?;
425
426    let any_nulls = dictionaries.iter().any(|d| d.keys().nulls().is_some());
427    let (new_keys, nulls) = if any_nulls {
428        let mut has_nulls = false;
429        let new_keys: Vec<K::Native> = indices
430            .iter()
431            .map(|(array, row)| {
432                let old_keys = dictionaries[*array].keys();
433                if old_keys.is_valid(*row) {
434                    let old_key = old_keys.values()[*row].as_usize();
435                    K::Native::from_usize(relative_offsets[*array] + old_key)
436                        .expect("key overflow should be checked by caller")
437                } else {
438                    has_nulls = true;
439                    K::Native::ZERO
440                }
441            })
442            .collect();
443
444        let nulls = if has_nulls {
445            let null_buffer = BooleanBuffer::collect_bool(indices.len(), |i| {
446                let (array, row) = indices[i];
447                dictionaries[array].keys().is_valid(row)
448            });
449            Some(NullBuffer::new(null_buffer))
450        } else {
451            None
452        };
453        (new_keys, nulls)
454    } else {
455        let new_keys: Vec<K::Native> = indices
456            .iter()
457            .map(|(array, row)| {
458                let old_key = dictionaries[*array].keys().values()[*row].as_usize();
459                K::Native::from_usize(relative_offsets[*array] + old_key)
460                    .expect("key overflow should be checked by caller")
461            })
462            .collect();
463        (new_keys, None)
464    };
465
466    let keys_array = PrimitiveArray::<K>::new(new_keys.into(), nulls);
467    // SAFETY: keys_array is constructed from a valid set of keys.
468    let array = unsafe { DictionaryArray::new_unchecked(keys_array, concatenated_values) };
469    Ok(Arc::new(array))
470}
471
472/// Interleave rows by index from multiple [`RecordBatch`] instances and return a new [`RecordBatch`].
473///
474/// This function will call [`interleave`] on each array of the [`RecordBatch`] instances and assemble a new [`RecordBatch`].
475///
476/// # Example
477/// ```
478/// # use std::sync::Arc;
479/// # use arrow_array::{StringArray, Int32Array, RecordBatch, UInt32Array};
480/// # use arrow_schema::{DataType, Field, Schema};
481/// # use arrow_select::interleave::interleave_record_batch;
482///
483/// let schema = Arc::new(Schema::new(vec![
484///     Field::new("a", DataType::Int32, true),
485///     Field::new("b", DataType::Utf8, true),
486/// ]));
487///
488/// let batch1 = RecordBatch::try_new(
489///     schema.clone(),
490///     vec![
491///         Arc::new(Int32Array::from(vec![0, 1, 2])),
492///         Arc::new(StringArray::from(vec!["a", "b", "c"])),
493///     ],
494/// ).unwrap();
495///
496/// let batch2 = RecordBatch::try_new(
497///     schema.clone(),
498///     vec![
499///         Arc::new(Int32Array::from(vec![3, 4, 5])),
500///         Arc::new(StringArray::from(vec!["d", "e", "f"])),
501///     ],
502/// ).unwrap();
503///
504/// let indices = vec![(0, 1), (1, 2), (0, 0), (1, 1)];
505/// let interleaved = interleave_record_batch(&[&batch1, &batch2], &indices).unwrap();
506///
507/// let expected = RecordBatch::try_new(
508///     schema,
509///     vec![
510///         Arc::new(Int32Array::from(vec![1, 5, 0, 4])),
511///         Arc::new(StringArray::from(vec!["b", "f", "a", "e"])),
512///     ],
513/// ).unwrap();
514/// assert_eq!(interleaved, expected);
515/// ```
516pub fn interleave_record_batch(
517    record_batches: &[&RecordBatch],
518    indices: &[(usize, usize)],
519) -> Result<RecordBatch, ArrowError> {
520    let schema = record_batches[0].schema();
521    let columns = (0..schema.fields().len())
522        .map(|i| {
523            let column_values: Vec<&dyn Array> = record_batches
524                .iter()
525                .map(|batch| batch.column(i).as_ref())
526                .collect();
527            interleave(&column_values, indices)
528        })
529        .collect::<Result<Vec<_>, _>>()?;
530    RecordBatch::try_new(schema, columns)
531}
532
533#[cfg(test)]
534mod tests {
535    use super::*;
536    use arrow_array::Int32RunArray;
537    use arrow_array::builder::{GenericListBuilder, Int32Builder, PrimitiveRunBuilder};
538    use arrow_array::types::Int8Type;
539    use arrow_schema::Field;
540
541    #[test]
542    fn test_primitive() {
543        let a = Int32Array::from_iter_values([1, 2, 3, 4]);
544        let b = Int32Array::from_iter_values([5, 6, 7]);
545        let c = Int32Array::from_iter_values([8, 9, 10]);
546        let values = interleave(&[&a, &b, &c], &[(0, 3), (0, 3), (2, 2), (2, 0), (1, 1)]).unwrap();
547        let v = values.as_primitive::<Int32Type>();
548        assert_eq!(v.values(), &[4, 4, 10, 8, 6]);
549    }
550
551    #[test]
552    fn test_primitive_nulls() {
553        let a = Int32Array::from_iter_values([1, 2, 3, 4]);
554        let b = Int32Array::from_iter([Some(1), Some(4), None]);
555        let values = interleave(&[&a, &b], &[(0, 1), (1, 2), (1, 2), (0, 3), (0, 2)]).unwrap();
556        let v: Vec<_> = values.as_primitive::<Int32Type>().into_iter().collect();
557        assert_eq!(&v, &[Some(2), None, None, Some(4), Some(3)])
558    }
559
560    #[test]
561    fn test_primitive_empty() {
562        let a = Int32Array::from_iter_values([1, 2, 3, 4]);
563        let v = interleave(&[&a], &[]).unwrap();
564        assert!(v.is_empty());
565        assert_eq!(v.data_type(), &DataType::Int32);
566    }
567
568    #[test]
569    fn test_strings() {
570        let a = StringArray::from_iter_values(["a", "b", "c"]);
571        let b = StringArray::from_iter_values(["hello", "world", "foo"]);
572        let values = interleave(&[&a, &b], &[(0, 2), (0, 2), (1, 0), (1, 1), (0, 1)]).unwrap();
573        let v = values.as_string::<i32>();
574        let values: Vec<_> = v.into_iter().collect();
575        assert_eq!(
576            &values,
577            &[
578                Some("c"),
579                Some("c"),
580                Some("hello"),
581                Some("world"),
582                Some("b")
583            ]
584        )
585    }
586
587    #[test]
588    fn test_interleave_dictionary() {
589        let a = DictionaryArray::<Int32Type>::from_iter(["a", "b", "c", "a", "b"]);
590        let b = DictionaryArray::<Int32Type>::from_iter(["a", "c", "a", "c", "a"]);
591
592        // Should not recompute dictionary
593        let values =
594            interleave(&[&a, &b], &[(0, 2), (0, 2), (0, 2), (1, 0), (1, 1), (0, 1)]).unwrap();
595        let v = values.as_dictionary::<Int32Type>();
596        assert_eq!(v.values().len(), 5);
597
598        let vc = v.downcast_dict::<StringArray>().unwrap();
599        let collected: Vec<_> = vc.into_iter().map(Option::unwrap).collect();
600        assert_eq!(&collected, &["c", "c", "c", "a", "c", "b"]);
601
602        // Should recompute dictionary
603        let values = interleave(&[&a, &b], &[(0, 2), (0, 2), (1, 1)]).unwrap();
604        let v = values.as_dictionary::<Int32Type>();
605        assert_eq!(v.values().len(), 1);
606
607        let vc = v.downcast_dict::<StringArray>().unwrap();
608        let collected: Vec<_> = vc.into_iter().map(Option::unwrap).collect();
609        assert_eq!(&collected, &["c", "c", "c"]);
610    }
611
612    #[test]
613    fn test_interleave_dictionary_nulls() {
614        let input_1_keys = Int32Array::from_iter_values([0, 2, 1, 3]);
615        let input_1_values = StringArray::from(vec![Some("foo"), None, Some("bar"), Some("fiz")]);
616        let input_1 = DictionaryArray::new(input_1_keys, Arc::new(input_1_values));
617        let input_2: DictionaryArray<Int32Type> = vec![None].into_iter().collect();
618
619        let expected = vec![Some("fiz"), None, None, Some("foo")];
620
621        let values = interleave(
622            &[&input_1 as _, &input_2 as _],
623            &[(0, 3), (0, 2), (1, 0), (0, 0)],
624        )
625        .unwrap();
626        let dictionary = values.as_dictionary::<Int32Type>();
627        let actual: Vec<Option<&str>> = dictionary
628            .downcast_dict::<StringArray>()
629            .unwrap()
630            .into_iter()
631            .collect();
632
633        assert_eq!(actual, expected);
634    }
635
636    #[test]
637    fn test_interleave_dictionary_overflow_same_values() {
638        let values: ArrayRef = Arc::new(StringArray::from_iter_values(
639            (0..50).map(|i| format!("v{i}")),
640        ));
641
642        // With 3 dictionaries of 50 values each, relative_offsets = [0, 50, 100]
643        // Accessing key 49 from dict3 gives 100 + 49 = 149 which overflows Int8
644        // (max 127).
645        // This test case falls back to interleave_fallback because the
646        // dictionaries share the same underlying values slice.
647        let dict1 = DictionaryArray::<Int8Type>::new(
648            Int8Array::from_iter_values([0, 1, 2]),
649            values.clone(),
650        );
651        let dict2 = DictionaryArray::<Int8Type>::new(
652            Int8Array::from_iter_values([0, 1, 2]),
653            values.clone(),
654        );
655        let dict3 =
656            DictionaryArray::<Int8Type>::new(Int8Array::from_iter_values([49]), values.clone());
657
658        let indices = &[(0, 0), (1, 0), (2, 0)];
659        let result = interleave(&[&dict1, &dict2, &dict3], indices).unwrap();
660
661        let dict_result = result.as_dictionary::<Int8Type>();
662        let string_result: Vec<_> = dict_result
663            .downcast_dict::<StringArray>()
664            .unwrap()
665            .into_iter()
666            .map(|x| x.unwrap())
667            .collect();
668        assert_eq!(string_result, vec!["v0", "v0", "v49"]);
669    }
670
671    fn test_interleave_lists<O: OffsetSizeTrait>() {
672        // [[1, 2], null, [3]]
673        let mut a = GenericListBuilder::<O, _>::new(Int32Builder::new());
674        a.values().append_value(1);
675        a.values().append_value(2);
676        a.append(true);
677        a.append(false);
678        a.values().append_value(3);
679        a.append(true);
680        let a = a.finish();
681
682        // [[4], null, [5, 6, null]]
683        let mut b = GenericListBuilder::<O, _>::new(Int32Builder::new());
684        b.values().append_value(4);
685        b.append(true);
686        b.append(false);
687        b.values().append_value(5);
688        b.values().append_value(6);
689        b.values().append_null();
690        b.append(true);
691        let b = b.finish();
692
693        let values = interleave(&[&a, &b], &[(0, 2), (0, 1), (1, 0), (1, 2), (1, 1)]).unwrap();
694        let v = values
695            .as_any()
696            .downcast_ref::<GenericListArray<O>>()
697            .unwrap();
698
699        // [[3], null, [4], [5, 6, null], null]
700        let mut expected = GenericListBuilder::<O, _>::new(Int32Builder::new());
701        expected.values().append_value(3);
702        expected.append(true);
703        expected.append(false);
704        expected.values().append_value(4);
705        expected.append(true);
706        expected.values().append_value(5);
707        expected.values().append_value(6);
708        expected.values().append_null();
709        expected.append(true);
710        expected.append(false);
711        let expected = expected.finish();
712
713        assert_eq!(v, &expected);
714    }
715
716    #[test]
717    fn test_lists() {
718        test_interleave_lists::<i32>();
719    }
720
721    #[test]
722    fn test_large_lists() {
723        test_interleave_lists::<i64>();
724    }
725
726    #[test]
727    fn test_struct_without_nulls() {
728        let fields = Fields::from(vec![
729            Field::new("number_col", DataType::Int32, false),
730            Field::new("string_col", DataType::Utf8, false),
731        ]);
732        let a = {
733            let number_col = Int32Array::from_iter_values([1, 2, 3, 4]);
734            let string_col = StringArray::from_iter_values(["a", "b", "c", "d"]);
735
736            StructArray::try_new(
737                fields.clone(),
738                vec![Arc::new(number_col), Arc::new(string_col)],
739                None,
740            )
741            .unwrap()
742        };
743
744        let b = {
745            let number_col = Int32Array::from_iter_values([5, 6, 7]);
746            let string_col = StringArray::from_iter_values(["hello", "world", "foo"]);
747
748            StructArray::try_new(
749                fields.clone(),
750                vec![Arc::new(number_col), Arc::new(string_col)],
751                None,
752            )
753            .unwrap()
754        };
755
756        let c = {
757            let number_col = Int32Array::from_iter_values([8, 9, 10]);
758            let string_col = StringArray::from_iter_values(["x", "y", "z"]);
759
760            StructArray::try_new(
761                fields.clone(),
762                vec![Arc::new(number_col), Arc::new(string_col)],
763                None,
764            )
765            .unwrap()
766        };
767
768        let values = interleave(&[&a, &b, &c], &[(0, 3), (0, 3), (2, 2), (2, 0), (1, 1)]).unwrap();
769        let values_struct = values.as_struct();
770        assert_eq!(values_struct.data_type(), &DataType::Struct(fields));
771        assert_eq!(values_struct.null_count(), 0);
772
773        let values_number = values_struct.column(0).as_primitive::<Int32Type>();
774        assert_eq!(values_number.values(), &[4, 4, 10, 8, 6]);
775        let values_string = values_struct.column(1).as_string::<i32>();
776        let values_string: Vec<_> = values_string.into_iter().collect();
777        assert_eq!(
778            &values_string,
779            &[Some("d"), Some("d"), Some("z"), Some("x"), Some("world")]
780        );
781    }
782
783    #[test]
784    fn test_struct_with_nulls_in_values() {
785        let fields = Fields::from(vec![
786            Field::new("number_col", DataType::Int32, true),
787            Field::new("string_col", DataType::Utf8, true),
788        ]);
789        let a = {
790            let number_col = Int32Array::from_iter_values([1, 2, 3, 4]);
791            let string_col = StringArray::from_iter_values(["a", "b", "c", "d"]);
792
793            StructArray::try_new(
794                fields.clone(),
795                vec![Arc::new(number_col), Arc::new(string_col)],
796                None,
797            )
798            .unwrap()
799        };
800
801        let b = {
802            let number_col = Int32Array::from_iter([Some(1), Some(4), None]);
803            let string_col = StringArray::from(vec![Some("hello"), None, Some("foo")]);
804
805            StructArray::try_new(
806                fields.clone(),
807                vec![Arc::new(number_col), Arc::new(string_col)],
808                None,
809            )
810            .unwrap()
811        };
812
813        let values = interleave(&[&a, &b], &[(0, 1), (1, 2), (1, 2), (0, 3), (1, 1)]).unwrap();
814        let values_struct = values.as_struct();
815        assert_eq!(values_struct.data_type(), &DataType::Struct(fields));
816
817        // The struct itself has no nulls, but the values do
818        assert_eq!(values_struct.null_count(), 0);
819
820        let values_number: Vec<_> = values_struct
821            .column(0)
822            .as_primitive::<Int32Type>()
823            .into_iter()
824            .collect();
825        assert_eq!(values_number, &[Some(2), None, None, Some(4), Some(4)]);
826
827        let values_string = values_struct.column(1).as_string::<i32>();
828        let values_string: Vec<_> = values_string.into_iter().collect();
829        assert_eq!(
830            &values_string,
831            &[Some("b"), Some("foo"), Some("foo"), Some("d"), None]
832        );
833    }
834
835    #[test]
836    fn test_struct_with_nulls() {
837        let fields = Fields::from(vec![
838            Field::new("number_col", DataType::Int32, false),
839            Field::new("string_col", DataType::Utf8, false),
840        ]);
841        let a = {
842            let number_col = Int32Array::from_iter_values([1, 2, 3, 4]);
843            let string_col = StringArray::from_iter_values(["a", "b", "c", "d"]);
844
845            StructArray::try_new(
846                fields.clone(),
847                vec![Arc::new(number_col), Arc::new(string_col)],
848                None,
849            )
850            .unwrap()
851        };
852
853        let b = {
854            let number_col = Int32Array::from_iter_values([5, 6, 7]);
855            let string_col = StringArray::from_iter_values(["hello", "world", "foo"]);
856
857            StructArray::try_new(
858                fields.clone(),
859                vec![Arc::new(number_col), Arc::new(string_col)],
860                Some(NullBuffer::from(&[true, false, true])),
861            )
862            .unwrap()
863        };
864
865        let c = {
866            let number_col = Int32Array::from_iter_values([8, 9, 10]);
867            let string_col = StringArray::from_iter_values(["x", "y", "z"]);
868
869            StructArray::try_new(
870                fields.clone(),
871                vec![Arc::new(number_col), Arc::new(string_col)],
872                None,
873            )
874            .unwrap()
875        };
876
877        let values = interleave(&[&a, &b, &c], &[(0, 3), (0, 3), (2, 2), (1, 1), (2, 0)]).unwrap();
878        let values_struct = values.as_struct();
879        assert_eq!(values_struct.data_type(), &DataType::Struct(fields));
880
881        let validity: Vec<bool> = {
882            let null_buffer = values_struct.nulls().expect("should_have_nulls");
883
884            null_buffer.iter().collect()
885        };
886        assert_eq!(validity, &[true, true, true, false, true]);
887        let values_number = values_struct.column(0).as_primitive::<Int32Type>();
888        assert_eq!(values_number.values(), &[4, 4, 10, 6, 8]);
889        let values_string = values_struct.column(1).as_string::<i32>();
890        let values_string: Vec<_> = values_string.into_iter().collect();
891        assert_eq!(
892            &values_string,
893            &[Some("d"), Some("d"), Some("z"), Some("world"), Some("x"),]
894        );
895    }
896
897    #[test]
898    fn test_struct_empty() {
899        let fields = Fields::from(vec![
900            Field::new("number_col", DataType::Int32, false),
901            Field::new("string_col", DataType::Utf8, false),
902        ]);
903        let a = {
904            let number_col = Int32Array::from_iter_values([1, 2, 3, 4]);
905            let string_col = StringArray::from_iter_values(["a", "b", "c", "d"]);
906
907            StructArray::try_new(
908                fields.clone(),
909                vec![Arc::new(number_col), Arc::new(string_col)],
910                None,
911            )
912            .unwrap()
913        };
914        let v = interleave(&[&a], &[]).unwrap();
915        assert!(v.is_empty());
916        assert_eq!(v.data_type(), &DataType::Struct(fields));
917    }
918
919    #[test]
920    fn interleave_sparse_nulls() {
921        let values = StringArray::from_iter_values((0..100).map(|x| x.to_string()));
922        let keys = Int32Array::from_iter_values(0..10);
923        let dict_a = DictionaryArray::new(keys, Arc::new(values));
924        let values = StringArray::new_null(0);
925        let keys = Int32Array::new_null(10);
926        let dict_b = DictionaryArray::new(keys, Arc::new(values));
927
928        let indices = &[(0, 0), (0, 1), (0, 2), (1, 0)];
929        let array = interleave(&[&dict_a, &dict_b], indices).unwrap();
930
931        let expected =
932            DictionaryArray::<Int32Type>::from_iter(vec![Some("0"), Some("1"), Some("2"), None]);
933        assert_eq!(array.as_ref(), &expected)
934    }
935
936    #[test]
937    fn test_interleave_views() {
938        let values = StringArray::from_iter_values([
939            "hello",
940            "world_long_string_not_inlined",
941            "foo",
942            "bar",
943            "baz",
944        ]);
945        let view_a = StringViewArray::from(&values);
946
947        let values = StringArray::from_iter_values([
948            "test",
949            "data",
950            "more_long_string_not_inlined",
951            "views",
952            "here",
953        ]);
954        let view_b = StringViewArray::from(&values);
955
956        let indices = &[
957            (0, 2), // "foo"
958            (1, 0), // "test"
959            (0, 4), // "baz"
960            (1, 3), // "views"
961            (0, 1), // "world_long_string_not_inlined"
962        ];
963
964        // Test specialized implementation
965        let values = interleave(&[&view_a, &view_b], indices).unwrap();
966        let result = values.as_string_view();
967        assert_eq!(result.data_buffers().len(), 1);
968
969        let fallback = interleave_fallback(&[&view_a, &view_b], indices).unwrap();
970        let fallback_result = fallback.as_string_view();
971        // note that fallback_result has 2 buffers, but only one long enough string to warrant a buffer
972        assert_eq!(fallback_result.data_buffers().len(), 2);
973
974        // Convert to strings for easier assertion
975        let collected: Vec<_> = result.iter().map(|x| x.map(|s| s.to_string())).collect();
976
977        let fallback_collected: Vec<_> = fallback_result
978            .iter()
979            .map(|x| x.map(|s| s.to_string()))
980            .collect();
981
982        assert_eq!(&collected, &fallback_collected);
983
984        assert_eq!(
985            &collected,
986            &[
987                Some("foo".to_string()),
988                Some("test".to_string()),
989                Some("baz".to_string()),
990                Some("views".to_string()),
991                Some("world_long_string_not_inlined".to_string()),
992            ]
993        );
994    }
995
996    #[test]
997    fn test_interleave_views_with_nulls() {
998        let values = StringArray::from_iter([
999            Some("hello"),
1000            None,
1001            Some("foo_long_string_not_inlined"),
1002            Some("bar"),
1003            None,
1004        ]);
1005        let view_a = StringViewArray::from(&values);
1006
1007        let values = StringArray::from_iter([
1008            Some("test"),
1009            Some("data_long_string_not_inlined"),
1010            None,
1011            None,
1012            Some("here"),
1013        ]);
1014        let view_b = StringViewArray::from(&values);
1015
1016        let indices = &[
1017            (0, 1), // null
1018            (1, 2), // null
1019            (0, 2), // "foo_long_string_not_inlined"
1020            (1, 3), // null
1021            (0, 4), // null
1022        ];
1023
1024        // Test specialized implementation
1025        let values = interleave(&[&view_a, &view_b], indices).unwrap();
1026        let result = values.as_string_view();
1027        assert_eq!(result.data_buffers().len(), 1);
1028
1029        let fallback = interleave_fallback(&[&view_a, &view_b], indices).unwrap();
1030        let fallback_result = fallback.as_string_view();
1031
1032        // Convert to strings for easier assertion
1033        let collected: Vec<_> = result.iter().map(|x| x.map(|s| s.to_string())).collect();
1034
1035        let fallback_collected: Vec<_> = fallback_result
1036            .iter()
1037            .map(|x| x.map(|s| s.to_string()))
1038            .collect();
1039
1040        assert_eq!(&collected, &fallback_collected);
1041
1042        assert_eq!(
1043            &collected,
1044            &[
1045                None,
1046                None,
1047                Some("foo_long_string_not_inlined".to_string()),
1048                None,
1049                None,
1050            ]
1051        );
1052    }
1053
1054    #[test]
1055    fn test_interleave_views_multiple_buffers() {
1056        let str1 = "very_long_string_from_first_buffer".as_bytes();
1057        let str2 = "very_long_string_from_second_buffer".as_bytes();
1058        let buffer1 = str1.to_vec().into();
1059        let buffer2 = str2.to_vec().into();
1060
1061        let view1 = ByteView::new(str1.len() as u32, &str1[..4])
1062            .with_buffer_index(0)
1063            .with_offset(0)
1064            .as_u128();
1065        let view2 = ByteView::new(str2.len() as u32, &str2[..4])
1066            .with_buffer_index(1)
1067            .with_offset(0)
1068            .as_u128();
1069        let view_a =
1070            StringViewArray::try_new(vec![view1, view2].into(), vec![buffer1, buffer2], None)
1071                .unwrap();
1072
1073        let str3 = "another_very_long_string_buffer_three".as_bytes();
1074        let str4 = "different_long_string_in_buffer_four".as_bytes();
1075        let buffer3 = str3.to_vec().into();
1076        let buffer4 = str4.to_vec().into();
1077
1078        let view3 = ByteView::new(str3.len() as u32, &str3[..4])
1079            .with_buffer_index(0)
1080            .with_offset(0)
1081            .as_u128();
1082        let view4 = ByteView::new(str4.len() as u32, &str4[..4])
1083            .with_buffer_index(1)
1084            .with_offset(0)
1085            .as_u128();
1086        let view_b =
1087            StringViewArray::try_new(vec![view3, view4].into(), vec![buffer3, buffer4], None)
1088                .unwrap();
1089
1090        let indices = &[
1091            (0, 0), // String from first buffer of array A
1092            (1, 0), // String from first buffer of array B
1093            (0, 1), // String from second buffer of array A
1094            (1, 1), // String from second buffer of array B
1095            (0, 0), // String from first buffer of array A again
1096            (1, 1), // String from second buffer of array B again
1097        ];
1098
1099        // Test interleave
1100        let values = interleave(&[&view_a, &view_b], indices).unwrap();
1101        let result = values.as_string_view();
1102
1103        assert_eq!(
1104            result.data_buffers().len(),
1105            4,
1106            "Expected four buffers (two from each input array)"
1107        );
1108
1109        let result_strings: Vec<_> = result.iter().map(|x| x.map(|s| s.to_string())).collect();
1110        assert_eq!(
1111            result_strings,
1112            vec![
1113                Some("very_long_string_from_first_buffer".to_string()),
1114                Some("another_very_long_string_buffer_three".to_string()),
1115                Some("very_long_string_from_second_buffer".to_string()),
1116                Some("different_long_string_in_buffer_four".to_string()),
1117                Some("very_long_string_from_first_buffer".to_string()),
1118                Some("different_long_string_in_buffer_four".to_string()),
1119            ]
1120        );
1121
1122        let views = result.views();
1123        let buffer_indices: Vec<_> = views
1124            .iter()
1125            .map(|raw_view| ByteView::from(*raw_view).buffer_index)
1126            .collect();
1127
1128        assert_eq!(
1129            buffer_indices,
1130            vec![
1131                0, // First buffer from array A
1132                1, // First buffer from array B
1133                2, // Second buffer from array A
1134                3, // Second buffer from array B
1135                0, // First buffer from array A (reused)
1136                3, // Second buffer from array B (reused)
1137            ]
1138        );
1139    }
1140
1141    #[test]
1142    fn test_interleave_run_end_encoded_primitive() {
1143        let mut builder = PrimitiveRunBuilder::<Int32Type, Int32Type>::new();
1144        builder.extend([1, 1, 2, 2, 2, 3].into_iter().map(Some));
1145        let a = builder.finish();
1146
1147        let mut builder = PrimitiveRunBuilder::<Int32Type, Int32Type>::new();
1148        builder.extend([4, 5, 5, 6, 6, 6].into_iter().map(Some));
1149        let b = builder.finish();
1150
1151        let indices = &[(0, 1), (1, 0), (0, 4), (1, 2), (0, 5)];
1152        let result = interleave(&[&a, &b], indices).unwrap();
1153
1154        // The result should be a RunEndEncoded array
1155        assert!(matches!(result.data_type(), DataType::RunEndEncoded(_, _)));
1156
1157        // Cast to RunArray to access values
1158        let result_run_array: &Int32RunArray = result.as_any().downcast_ref().unwrap();
1159
1160        // Verify the logical values by accessing the logical array directly
1161        let expected = vec![1, 4, 2, 5, 3];
1162        let mut actual = Vec::new();
1163        for i in 0..result_run_array.len() {
1164            let physical_idx = result_run_array.get_physical_index(i);
1165            let value = result_run_array
1166                .values()
1167                .as_primitive::<Int32Type>()
1168                .value(physical_idx);
1169            actual.push(value);
1170        }
1171        assert_eq!(actual, expected);
1172    }
1173
1174    #[test]
1175    fn test_interleave_run_end_encoded_string() {
1176        let a: Int32RunArray = vec!["hello", "hello", "world", "world", "foo"]
1177            .into_iter()
1178            .collect();
1179        let b: Int32RunArray = vec!["bar", "baz", "baz", "qux"].into_iter().collect();
1180
1181        let indices = &[(0, 0), (1, 1), (0, 3), (1, 3), (0, 4)];
1182        let result = interleave(&[&a, &b], indices).unwrap();
1183
1184        // The result should be a RunEndEncoded array
1185        assert!(matches!(result.data_type(), DataType::RunEndEncoded(_, _)));
1186
1187        // Cast to RunArray to access values
1188        let result_run_array: &Int32RunArray = result.as_any().downcast_ref().unwrap();
1189
1190        // Verify the logical values by accessing the logical array directly
1191        let expected = vec!["hello", "baz", "world", "qux", "foo"];
1192        let mut actual = Vec::new();
1193        for i in 0..result_run_array.len() {
1194            let physical_idx = result_run_array.get_physical_index(i);
1195            let value = result_run_array
1196                .values()
1197                .as_string::<i32>()
1198                .value(physical_idx);
1199            actual.push(value);
1200        }
1201        assert_eq!(actual, expected);
1202    }
1203
1204    #[test]
1205    fn test_interleave_run_end_encoded_with_nulls() {
1206        let a: Int32RunArray = vec![Some("a"), Some("a"), None, None, Some("b")]
1207            .into_iter()
1208            .collect();
1209        let b: Int32RunArray = vec![None, Some("c"), Some("c"), Some("d")]
1210            .into_iter()
1211            .collect();
1212
1213        let indices = &[(0, 1), (1, 0), (0, 2), (1, 3), (0, 4)];
1214        let result = interleave(&[&a, &b], indices).unwrap();
1215
1216        // The result should be a RunEndEncoded array
1217        assert!(matches!(result.data_type(), DataType::RunEndEncoded(_, _)));
1218
1219        // Cast to RunArray to access values
1220        let result_run_array: &Int32RunArray = result.as_any().downcast_ref().unwrap();
1221
1222        // Verify the logical values by accessing the logical array directly
1223        let expected = vec![Some("a"), None, None, Some("d"), Some("b")];
1224        let mut actual = Vec::new();
1225        for i in 0..result_run_array.len() {
1226            let physical_idx = result_run_array.get_physical_index(i);
1227            if result_run_array.values().is_null(physical_idx) {
1228                actual.push(None);
1229            } else {
1230                let value = result_run_array
1231                    .values()
1232                    .as_string::<i32>()
1233                    .value(physical_idx);
1234                actual.push(Some(value));
1235            }
1236        }
1237        assert_eq!(actual, expected);
1238    }
1239
1240    #[test]
1241    fn test_interleave_run_end_encoded_different_run_types() {
1242        let mut builder = PrimitiveRunBuilder::<Int16Type, Int32Type>::new();
1243        builder.extend([1, 1, 2, 3, 3].into_iter().map(Some));
1244        let a = builder.finish();
1245
1246        let mut builder = PrimitiveRunBuilder::<Int16Type, Int32Type>::new();
1247        builder.extend([4, 5, 5, 6].into_iter().map(Some));
1248        let b = builder.finish();
1249
1250        let indices = &[(0, 0), (1, 1), (0, 3), (1, 3)];
1251        let result = interleave(&[&a, &b], indices).unwrap();
1252
1253        // The result should be a RunEndEncoded array
1254        assert!(matches!(result.data_type(), DataType::RunEndEncoded(_, _)));
1255
1256        // Cast to RunArray to access values
1257        let result_run_array: &RunArray<Int16Type> = result.as_any().downcast_ref().unwrap();
1258
1259        // Verify the logical values by accessing the logical array directly
1260        let expected = vec![1, 5, 3, 6];
1261        let mut actual = Vec::new();
1262        for i in 0..result_run_array.len() {
1263            let physical_idx = result_run_array.get_physical_index(i);
1264            let value = result_run_array
1265                .values()
1266                .as_primitive::<Int32Type>()
1267                .value(physical_idx);
1268            actual.push(value);
1269        }
1270        assert_eq!(actual, expected);
1271    }
1272
1273    #[test]
1274    fn test_interleave_run_end_encoded_mixed_run_lengths() {
1275        let mut builder = PrimitiveRunBuilder::<Int64Type, Int32Type>::new();
1276        builder.extend([1, 2, 2, 2, 2, 3, 3, 4].into_iter().map(Some));
1277        let a = builder.finish();
1278
1279        let mut builder = PrimitiveRunBuilder::<Int64Type, Int32Type>::new();
1280        builder.extend([5, 5, 5, 6, 7, 7, 8, 8].into_iter().map(Some));
1281        let b = builder.finish();
1282
1283        let indices = &[
1284            (0, 0), // 1
1285            (1, 2), // 5
1286            (0, 3), // 2
1287            (1, 3), // 6
1288            (0, 6), // 3
1289            (1, 6), // 8
1290            (0, 7), // 4
1291            (1, 4), // 7
1292        ];
1293        let result = interleave(&[&a, &b], indices).unwrap();
1294
1295        // The result should be a RunEndEncoded array
1296        assert!(matches!(result.data_type(), DataType::RunEndEncoded(_, _)));
1297
1298        // Cast to RunArray to access values
1299        let result_run_array: &RunArray<Int64Type> = result.as_any().downcast_ref().unwrap();
1300
1301        // Verify the logical values by accessing the logical array directly
1302        let expected = vec![1, 5, 2, 6, 3, 8, 4, 7];
1303        let mut actual = Vec::new();
1304        for i in 0..result_run_array.len() {
1305            let physical_idx = result_run_array.get_physical_index(i);
1306            let value = result_run_array
1307                .values()
1308                .as_primitive::<Int32Type>()
1309                .value(physical_idx);
1310            actual.push(value);
1311        }
1312        assert_eq!(actual, expected);
1313    }
1314
1315    #[test]
1316    fn test_interleave_run_end_encoded_empty_runs() {
1317        let mut builder = PrimitiveRunBuilder::<Int32Type, Int32Type>::new();
1318        builder.extend([1].into_iter().map(Some));
1319        let a = builder.finish();
1320
1321        let mut builder = PrimitiveRunBuilder::<Int32Type, Int32Type>::new();
1322        builder.extend([2, 2, 2].into_iter().map(Some));
1323        let b = builder.finish();
1324
1325        let indices = &[(0, 0), (1, 1), (1, 2)];
1326        let result = interleave(&[&a, &b], indices).unwrap();
1327
1328        // The result should be a RunEndEncoded array
1329        assert!(matches!(result.data_type(), DataType::RunEndEncoded(_, _)));
1330
1331        // Cast to RunArray to access values
1332        let result_run_array: &Int32RunArray = result.as_any().downcast_ref().unwrap();
1333
1334        // Verify the logical values by accessing the logical array directly
1335        let expected = vec![1, 2, 2];
1336        let mut actual = Vec::new();
1337        for i in 0..result_run_array.len() {
1338            let physical_idx = result_run_array.get_physical_index(i);
1339            let value = result_run_array
1340                .values()
1341                .as_primitive::<Int32Type>()
1342                .value(physical_idx);
1343            actual.push(value);
1344        }
1345        assert_eq!(actual, expected);
1346    }
1347
1348    #[test]
1349    fn test_struct_no_fields() {
1350        let fields = Fields::empty();
1351        let a = StructArray::try_new_with_length(fields.clone(), vec![], None, 10).unwrap();
1352        let v = interleave(&[&a], &[(0, 0)]).unwrap();
1353        assert_eq!(v.len(), 1);
1354        assert_eq!(v.data_type(), &DataType::Struct(fields));
1355    }
1356
1357    #[test]
1358    fn test_interleave_fallback_dictionary_with_nulls() {
1359        let input_1_keys = Int32Array::from_iter([Some(0), None, Some(1)]);
1360        let input_1_values = StringArray::from_iter_values(["foo", "bar"]);
1361        let dict_a = DictionaryArray::new(input_1_keys, Arc::new(input_1_values));
1362
1363        let input_2_keys = Int32Array::from_iter([Some(0), Some(1), None]);
1364        let input_2_values = StringArray::from_iter_values(["baz", "qux"]);
1365        let dict_b = DictionaryArray::new(input_2_keys, Arc::new(input_2_values));
1366
1367        let indices = vec![
1368            (0, 0), // "foo"
1369            (0, 1), // null
1370            (1, 0), // "baz"
1371            (1, 2), // null
1372            (0, 2), // "bar"
1373            (1, 1), // "qux"
1374        ];
1375
1376        let result =
1377            interleave_fallback_dictionary::<Int32Type>(&[&dict_a, &dict_b], &indices).unwrap();
1378        let dict_result = result.as_dictionary::<Int32Type>();
1379
1380        let string_result = dict_result.downcast_dict::<StringArray>().unwrap();
1381        let collected: Vec<_> = string_result.into_iter().collect();
1382        assert_eq!(
1383            collected,
1384            vec![
1385                Some("foo"),
1386                None,
1387                Some("baz"),
1388                None,
1389                Some("bar"),
1390                Some("qux")
1391            ]
1392        );
1393    }
1394}