Skip to main content

arrow_select/
interleave.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Interleave elements from multiple arrays
19
20use crate::concat::concat;
21use crate::dictionary::{merge_dictionary_values, should_merge_dictionary_values};
22use arrow_array::builder::{BooleanBufferBuilder, PrimitiveBuilder};
23use arrow_array::cast::AsArray;
24use arrow_array::types::*;
25use arrow_array::*;
26use arrow_buffer::{ArrowNativeType, BooleanBuffer, MutableBuffer, NullBuffer, OffsetBuffer};
27use arrow_data::ByteView;
28use arrow_data::transform::MutableArrayData;
29use arrow_schema::{ArrowError, DataType, FieldRef, Fields};
30use std::sync::Arc;
31
/// Adapter handed to `downcast_primitive!`: receives the resolved primitive
/// type and forwards it, with the caller's arguments, to `interleave_primitive`.
macro_rules! primitive_helper {
    ($prim:ty, $arrays:ident, $picks:ident, $dtype:ident) => {
        interleave_primitive::<$prim>($arrays, $picks, $dtype)
    };
}
37
/// Adapter handed to `downcast_integer!`: receives the resolved dictionary key
/// type and forwards it, with the caller's arguments, to `interleave_dictionaries`.
macro_rules! dict_helper {
    ($key:ty, $arrays:expr, $picks:expr) => {
        interleave_dictionaries::<$key>($arrays, $picks)
    };
}
43
44///
45/// Takes elements by index from a list of [`Array`], creating a new [`Array`] from those values.
46///
47/// Each element in `indices` is a pair of `usize` with the first identifying the index
48/// of the [`Array`] in `values`, and the second the index of the value within that [`Array`]
49///
50/// ```text
51/// ┌─────────────────┐      ┌─────────┐                                  ┌─────────────────┐
52/// │        A        │      │ (0, 0)  │        interleave(               │        A        │
53/// ├─────────────────┤      ├─────────┤          [values0, values1],     ├─────────────────┤
54/// │        D        │      │ (1, 0)  │          indices                 │        B        │
55/// └─────────────────┘      ├─────────┤        )                         ├─────────────────┤
56///   values array 0         │ (1, 1)  │      ─────────────────────────▶  │        C        │
57///                          ├─────────┤                                  ├─────────────────┤
58///                          │ (0, 1)  │                                  │        D        │
59///                          └─────────┘                                  └─────────────────┘
60/// ┌─────────────────┐       indices
61/// │        B        │        array
62/// ├─────────────────┤                                                    result
63/// │        C        │
64/// ├─────────────────┤
65/// │        E        │
66/// └─────────────────┘
67///   values array 1
68/// ```
69///
70/// For selecting values by index from a single array see [`crate::take`]
71pub fn interleave(
72    values: &[&dyn Array],
73    indices: &[(usize, usize)],
74) -> Result<ArrayRef, ArrowError> {
75    if values.is_empty() {
76        return Err(ArrowError::InvalidArgumentError(
77            "interleave requires input of at least one array".to_string(),
78        ));
79    }
80    let data_type = values[0].data_type();
81
82    for array in values.iter().skip(1) {
83        if array.data_type() != data_type {
84            return Err(ArrowError::InvalidArgumentError(format!(
85                "It is not possible to interleave arrays of different data types ({} and {})",
86                data_type,
87                array.data_type()
88            )));
89        }
90    }
91
92    if indices.is_empty() {
93        return Ok(new_empty_array(data_type));
94    }
95
96    downcast_primitive! {
97        data_type => (primitive_helper, values, indices, data_type),
98        DataType::Utf8 => interleave_bytes::<Utf8Type>(values, indices),
99        DataType::LargeUtf8 => interleave_bytes::<LargeUtf8Type>(values, indices),
100        DataType::Binary => interleave_bytes::<BinaryType>(values, indices),
101        DataType::LargeBinary => interleave_bytes::<LargeBinaryType>(values, indices),
102        DataType::BinaryView => interleave_views::<BinaryViewType>(values, indices),
103        DataType::Utf8View => interleave_views::<StringViewType>(values, indices),
104        DataType::Dictionary(k, _) => downcast_integer! {
105            k.as_ref() => (dict_helper, values, indices),
106            _ => unreachable!("illegal dictionary key type {k}")
107        },
108        DataType::Struct(fields) => interleave_struct(fields, values, indices),
109        DataType::List(field) => interleave_list::<i32>(values, indices, field),
110        DataType::LargeList(field) => interleave_list::<i64>(values, indices, field),
111        _ => interleave_fallback(values, indices)
112    }
113}
114
115/// Common functionality for interleaving arrays
116///
117/// T is the concrete Array type
118struct Interleave<'a, T> {
119    /// The input arrays downcast to T
120    arrays: Vec<&'a T>,
121    /// The null buffer of the interleaved output
122    nulls: Option<NullBuffer>,
123}
124
125impl<'a, T: Array + 'static> Interleave<'a, T> {
126    fn new(values: &[&'a dyn Array], indices: &'a [(usize, usize)]) -> Self {
127        let mut has_nulls = false;
128        let arrays: Vec<&T> = values
129            .iter()
130            .map(|x| {
131                has_nulls = has_nulls || x.null_count() != 0;
132                x.as_any().downcast_ref().unwrap()
133            })
134            .collect();
135
136        let nulls = match has_nulls {
137            true => {
138                let nulls = BooleanBuffer::collect_bool(indices.len(), |i| {
139                    let (a, b) = indices[i];
140                    arrays[a].is_valid(b)
141                });
142                Some(nulls.into())
143            }
144            false => None,
145        };
146
147        Self { arrays, nulls }
148    }
149}
150
151fn interleave_primitive<T: ArrowPrimitiveType>(
152    values: &[&dyn Array],
153    indices: &[(usize, usize)],
154    data_type: &DataType,
155) -> Result<ArrayRef, ArrowError> {
156    let interleaved = Interleave::<'_, PrimitiveArray<T>>::new(values, indices);
157    let arrays = &interleaved.arrays;
158    let len = indices.len();
159
160    let mut output = Vec::with_capacity(len);
161    let dst: *mut T::Native = output.as_mut_ptr();
162    let mut base = 0;
163
164    // Process 8 elements at a time to issue multiple independent loads
165    // and increase memory-level parallelism for random access patterns.
166    let chunks = indices.chunks_exact(8);
167    let remainder = chunks.remainder();
168    for chunk in chunks {
169        let v0 = arrays[chunk[0].0].value(chunk[0].1);
170        let v1 = arrays[chunk[1].0].value(chunk[1].1);
171        let v2 = arrays[chunk[2].0].value(chunk[2].1);
172        let v3 = arrays[chunk[3].0].value(chunk[3].1);
173        let v4 = arrays[chunk[4].0].value(chunk[4].1);
174        let v5 = arrays[chunk[5].0].value(chunk[5].1);
175        let v6 = arrays[chunk[6].0].value(chunk[6].1);
176        let v7 = arrays[chunk[7].0].value(chunk[7].1);
177
178        // SAFETY: base+7 < len == output capacity
179        debug_assert!(base + 7 < len);
180        unsafe {
181            dst.add(base).write(v0);
182            dst.add(base + 1).write(v1);
183            dst.add(base + 2).write(v2);
184            dst.add(base + 3).write(v3);
185            dst.add(base + 4).write(v4);
186            dst.add(base + 5).write(v5);
187            dst.add(base + 6).write(v6);
188            dst.add(base + 7).write(v7);
189        }
190        base += 8;
191    }
192
193    for idx in remainder {
194        // SAFETY: base < len == output capacity
195        debug_assert!(base < len);
196        unsafe { dst.add(base).write(arrays[idx.0].value(idx.1)) };
197        base += 1;
198    }
199
200    // SAFETY: all `len` elements have been initialized
201    debug_assert!(base == len);
202    unsafe { output.set_len(len) };
203
204    let array = PrimitiveArray::<T>::try_new(output.into(), interleaved.nulls)?;
205    Ok(Arc::new(array.with_data_type(data_type.clone())))
206}
207
208fn interleave_bytes<T: ByteArrayType>(
209    values: &[&dyn Array],
210    indices: &[(usize, usize)],
211) -> Result<ArrayRef, ArrowError> {
212    let interleaved = Interleave::<'_, GenericByteArray<T>>::new(values, indices);
213
214    let mut capacity = 0;
215    let mut offsets = Vec::with_capacity(indices.len() + 1);
216    offsets.push(T::Offset::from_usize(0).unwrap());
217    for (a, b) in indices {
218        let o = interleaved.arrays[*a].value_offsets();
219        let element_len = o[*b + 1].as_usize() - o[*b].as_usize();
220        capacity += element_len;
221        offsets.push(
222            T::Offset::from_usize(capacity)
223                .ok_or_else(|| ArrowError::OffsetOverflowError(capacity))?,
224        );
225    }
226
227    let mut values = Vec::with_capacity(capacity);
228    for (a, b) in indices {
229        values.extend_from_slice(interleaved.arrays[*a].value(*b).as_ref());
230    }
231
232    // Safety: safe by construction
233    let array = unsafe {
234        let offsets = OffsetBuffer::new_unchecked(offsets.into());
235        GenericByteArray::<T>::new_unchecked(offsets, values.into(), interleaved.nulls)
236    };
237    Ok(Arc::new(array))
238}
239
240fn interleave_dictionaries<K: ArrowDictionaryKeyType>(
241    arrays: &[&dyn Array],
242    indices: &[(usize, usize)],
243) -> Result<ArrayRef, ArrowError> {
244    let dictionaries: Vec<_> = arrays.iter().map(|x| x.as_dictionary::<K>()).collect();
245    let (should_merge, has_overflow) =
246        should_merge_dictionary_values::<K>(&dictionaries, indices.len());
247    if !should_merge {
248        return if has_overflow {
249            interleave_fallback(arrays, indices)
250        } else {
251            interleave_fallback_dictionary::<K>(&dictionaries, indices)
252        };
253    }
254
255    let masks: Vec<_> = dictionaries
256        .iter()
257        .enumerate()
258        .map(|(a_idx, dictionary)| {
259            let mut key_mask = BooleanBufferBuilder::new_from_buffer(
260                MutableBuffer::new_null(dictionary.len()),
261                dictionary.len(),
262            );
263
264            for (_, key_idx) in indices.iter().filter(|(a, _)| *a == a_idx) {
265                key_mask.set_bit(*key_idx, true);
266            }
267            key_mask.finish()
268        })
269        .collect();
270
271    let merged = merge_dictionary_values(&dictionaries, Some(&masks))?;
272
273    // Recompute keys
274    let mut keys = PrimitiveBuilder::<K>::with_capacity(indices.len());
275    for (a, b) in indices {
276        let old_keys: &PrimitiveArray<K> = dictionaries[*a].keys();
277        match old_keys.is_valid(*b) {
278            true => {
279                let old_key = old_keys.values()[*b];
280                keys.append_value(merged.key_mappings[*a][old_key.as_usize()])
281            }
282            false => keys.append_null(),
283        }
284    }
285    let array = unsafe { DictionaryArray::new_unchecked(keys.finish(), merged.values) };
286    Ok(Arc::new(array))
287}
288
289fn interleave_views<T: ByteViewType>(
290    values: &[&dyn Array],
291    indices: &[(usize, usize)],
292) -> Result<ArrayRef, ArrowError> {
293    let interleaved = Interleave::<'_, GenericByteViewArray<T>>::new(values, indices);
294    let mut buffers = Vec::new();
295
296    // Contains the offsets of start buffer in `buffer_to_new_index`
297    let mut offsets = Vec::with_capacity(interleaved.arrays.len() + 1);
298    offsets.push(0);
299    let mut total_buffers = 0;
300    for a in interleaved.arrays.iter() {
301        total_buffers += a.data_buffers().len();
302        offsets.push(total_buffers);
303    }
304
305    // contains the mapping from old buffer index to new buffer index
306    let mut buffer_to_new_index = vec![None; total_buffers];
307
308    let views: Vec<u128> = indices
309        .iter()
310        .map(|(array_idx, value_idx)| {
311            let array = interleaved.arrays[*array_idx];
312            let view = array.views().get(*value_idx).unwrap();
313            let view_len = *view as u32;
314            if view_len <= 12 {
315                return *view;
316            }
317            // value is big enough to be in a variadic buffer
318            let view = ByteView::from(*view);
319            let buffer_to_new_idx = offsets[*array_idx] + view.buffer_index as usize;
320            let new_buffer_idx: u32 =
321                *buffer_to_new_index[buffer_to_new_idx].get_or_insert_with(|| {
322                    buffers.push(array.data_buffers()[view.buffer_index as usize].clone());
323                    (buffers.len() - 1) as u32
324                });
325            view.with_buffer_index(new_buffer_idx).as_u128()
326        })
327        .collect();
328
329    let array = unsafe {
330        GenericByteViewArray::<T>::new_unchecked(views.into(), buffers, interleaved.nulls)
331    };
332    Ok(Arc::new(array))
333}
334
335fn interleave_struct(
336    fields: &Fields,
337    values: &[&dyn Array],
338    indices: &[(usize, usize)],
339) -> Result<ArrayRef, ArrowError> {
340    let interleaved = Interleave::<'_, StructArray>::new(values, indices);
341
342    if fields.is_empty() {
343        let array = StructArray::try_new_with_length(
344            fields.clone(),
345            vec![],
346            interleaved.nulls,
347            indices.len(),
348        )?;
349        return Ok(Arc::new(array));
350    }
351
352    let struct_fields_array: Result<Vec<_>, _> = (0..fields.len())
353        .map(|i| {
354            let field_values: Vec<&dyn Array> = interleaved
355                .arrays
356                .iter()
357                .map(|x| x.column(i).as_ref())
358                .collect();
359            interleave(&field_values, indices)
360        })
361        .collect();
362
363    let struct_array =
364        StructArray::try_new(fields.clone(), struct_fields_array?, interleaved.nulls)?;
365    Ok(Arc::new(struct_array))
366}
367
368fn interleave_list<O: OffsetSizeTrait>(
369    values: &[&dyn Array],
370    indices: &[(usize, usize)],
371    field: &FieldRef,
372) -> Result<ArrayRef, ArrowError> {
373    let interleaved = Interleave::<'_, GenericListArray<O>>::new(values, indices);
374
375    let mut capacity = 0usize;
376    let mut offsets = Vec::with_capacity(indices.len() + 1);
377    offsets.push(O::from_usize(0).unwrap());
378    for (array, row) in indices {
379        let o = interleaved.arrays[*array].value_offsets();
380        let element_len = o[*row + 1].as_usize() - o[*row].as_usize();
381        capacity += element_len;
382        offsets.push(
383            O::from_usize(capacity).ok_or_else(|| ArrowError::OffsetOverflowError(capacity))?,
384        );
385    }
386
387    let mut child_indices = Vec::with_capacity(capacity);
388    for (array, row) in indices {
389        let list = interleaved.arrays[*array];
390        let start = list.value_offsets()[*row].as_usize();
391        let end = list.value_offsets()[*row + 1].as_usize();
392        child_indices.extend((start..end).map(|i| (*array, i)));
393    }
394
395    let child_arrays: Vec<&dyn Array> = interleaved
396        .arrays
397        .iter()
398        .map(|list| list.values().as_ref())
399        .collect();
400
401    let interleaved_values = interleave(&child_arrays, &child_indices)?;
402
403    let offsets = OffsetBuffer::new(offsets.into());
404    let list_array = GenericListArray::<O>::new(
405        field.clone(),
406        offsets,
407        interleaved_values,
408        interleaved.nulls,
409    );
410
411    Ok(Arc::new(list_array))
412}
413
414/// Fallback implementation of interleave using [`MutableArrayData`]
415fn interleave_fallback(
416    values: &[&dyn Array],
417    indices: &[(usize, usize)],
418) -> Result<ArrayRef, ArrowError> {
419    let arrays: Vec<_> = values.iter().map(|x| x.to_data()).collect();
420    let arrays: Vec<_> = arrays.iter().collect();
421    let mut array_data = MutableArrayData::new(arrays, false, indices.len());
422
423    let mut cur_array = indices[0].0;
424    let mut start_row_idx = indices[0].1;
425    let mut end_row_idx = start_row_idx + 1;
426
427    for (array, row) in indices.iter().skip(1).copied() {
428        if array == cur_array && row == end_row_idx {
429            // subsequent row in same batch
430            end_row_idx += 1;
431            continue;
432        }
433
434        // emit current batch of rows for current buffer
435        array_data.extend(cur_array, start_row_idx, end_row_idx);
436
437        // start new batch of rows
438        cur_array = array;
439        start_row_idx = row;
440        end_row_idx = start_row_idx + 1;
441    }
442
443    // emit final batch of rows
444    array_data.extend(cur_array, start_row_idx, end_row_idx);
445    Ok(make_array(array_data.freeze()))
446}
447
448/// Fallback implementation for interleaving dictionaries when it was determined
449/// that the dictionary values should not be merged. This implementation concatenates
450/// the value slices and recomputes the resulting dictionary keys.
451///
452/// # Panics
453///
454/// This function assumes that the combined dictionary values will not overflow the
455/// key type. Callers must verify this condition [`should_merge_dictionary_values`]
456/// before calling this function.
457fn interleave_fallback_dictionary<K: ArrowDictionaryKeyType>(
458    dictionaries: &[&DictionaryArray<K>],
459    indices: &[(usize, usize)],
460) -> Result<ArrayRef, ArrowError> {
461    let relative_offsets: Vec<usize> = dictionaries
462        .iter()
463        .scan(0usize, |offset, dict| {
464            let current = *offset;
465            *offset += dict.values().len();
466            Some(current)
467        })
468        .collect();
469    let all_values: Vec<&dyn Array> = dictionaries.iter().map(|d| d.values().as_ref()).collect();
470    let concatenated_values = concat(&all_values)?;
471
472    let any_nulls = dictionaries.iter().any(|d| d.keys().nulls().is_some());
473    let (new_keys, nulls) = if any_nulls {
474        let mut has_nulls = false;
475        let new_keys: Vec<K::Native> = indices
476            .iter()
477            .map(|(array, row)| {
478                let old_keys = dictionaries[*array].keys();
479                if old_keys.is_valid(*row) {
480                    let old_key = old_keys.values()[*row].as_usize();
481                    K::Native::from_usize(relative_offsets[*array] + old_key)
482                        .expect("key overflow should be checked by caller")
483                } else {
484                    has_nulls = true;
485                    K::Native::ZERO
486                }
487            })
488            .collect();
489
490        let nulls = if has_nulls {
491            let null_buffer = BooleanBuffer::collect_bool(indices.len(), |i| {
492                let (array, row) = indices[i];
493                dictionaries[array].keys().is_valid(row)
494            });
495            Some(NullBuffer::new(null_buffer))
496        } else {
497            None
498        };
499        (new_keys, nulls)
500    } else {
501        let new_keys: Vec<K::Native> = indices
502            .iter()
503            .map(|(array, row)| {
504                let old_key = dictionaries[*array].keys().values()[*row].as_usize();
505                K::Native::from_usize(relative_offsets[*array] + old_key)
506                    .expect("key overflow should be checked by caller")
507            })
508            .collect();
509        (new_keys, None)
510    };
511
512    let keys_array = PrimitiveArray::<K>::new(new_keys.into(), nulls);
513    // SAFETY: keys_array is constructed from a valid set of keys.
514    let array = unsafe { DictionaryArray::new_unchecked(keys_array, concatenated_values) };
515    Ok(Arc::new(array))
516}
517
518/// Interleave rows by index from multiple [`RecordBatch`] instances and return a new [`RecordBatch`].
519///
520/// This function will call [`interleave`] on each array of the [`RecordBatch`] instances and assemble a new [`RecordBatch`].
521///
522/// # Example
523/// ```
524/// # use std::sync::Arc;
525/// # use arrow_array::{StringArray, Int32Array, RecordBatch, UInt32Array};
526/// # use arrow_schema::{DataType, Field, Schema};
527/// # use arrow_select::interleave::interleave_record_batch;
528///
529/// let schema = Arc::new(Schema::new(vec![
530///     Field::new("a", DataType::Int32, true),
531///     Field::new("b", DataType::Utf8, true),
532/// ]));
533///
534/// let batch1 = RecordBatch::try_new(
535///     schema.clone(),
536///     vec![
537///         Arc::new(Int32Array::from(vec![0, 1, 2])),
538///         Arc::new(StringArray::from(vec!["a", "b", "c"])),
539///     ],
540/// ).unwrap();
541///
542/// let batch2 = RecordBatch::try_new(
543///     schema.clone(),
544///     vec![
545///         Arc::new(Int32Array::from(vec![3, 4, 5])),
546///         Arc::new(StringArray::from(vec!["d", "e", "f"])),
547///     ],
548/// ).unwrap();
549///
550/// let indices = vec![(0, 1), (1, 2), (0, 0), (1, 1)];
551/// let interleaved = interleave_record_batch(&[&batch1, &batch2], &indices).unwrap();
552///
553/// let expected = RecordBatch::try_new(
554///     schema,
555///     vec![
556///         Arc::new(Int32Array::from(vec![1, 5, 0, 4])),
557///         Arc::new(StringArray::from(vec!["b", "f", "a", "e"])),
558///     ],
559/// ).unwrap();
560/// assert_eq!(interleaved, expected);
561/// ```
562pub fn interleave_record_batch(
563    record_batches: &[&RecordBatch],
564    indices: &[(usize, usize)],
565) -> Result<RecordBatch, ArrowError> {
566    let schema = record_batches[0].schema();
567    let columns = (0..schema.fields().len())
568        .map(|i| {
569            let column_values: Vec<&dyn Array> = record_batches
570                .iter()
571                .map(|batch| batch.column(i).as_ref())
572                .collect();
573            interleave(&column_values, indices)
574        })
575        .collect::<Result<Vec<_>, _>>()?;
576    RecordBatch::try_new(schema, columns)
577}
578
579#[cfg(test)]
580mod tests {
581    use super::*;
582    use arrow_array::Int32RunArray;
583    use arrow_array::builder::{GenericListBuilder, Int32Builder, PrimitiveRunBuilder};
584    use arrow_array::types::Int8Type;
585    use arrow_buffer::ScalarBuffer;
586    use arrow_schema::Field;
587
588    #[test]
589    fn test_primitive() {
590        let a = Int32Array::from_iter_values([1, 2, 3, 4]);
591        let b = Int32Array::from_iter_values([5, 6, 7]);
592        let c = Int32Array::from_iter_values([8, 9, 10]);
593        let values = interleave(&[&a, &b, &c], &[(0, 3), (0, 3), (2, 2), (2, 0), (1, 1)]).unwrap();
594        let v = values.as_primitive::<Int32Type>();
595        assert_eq!(v.values(), &[4, 4, 10, 8, 6]);
596    }
597
598    #[test]
599    fn test_primitive_nulls() {
600        let a = Int32Array::from_iter_values([1, 2, 3, 4]);
601        let b = Int32Array::from_iter([Some(1), Some(4), None]);
602        let values = interleave(&[&a, &b], &[(0, 1), (1, 2), (1, 2), (0, 3), (0, 2)]).unwrap();
603        let v: Vec<_> = values.as_primitive::<Int32Type>().into_iter().collect();
604        assert_eq!(&v, &[Some(2), None, None, Some(4), Some(3)])
605    }
606
607    #[test]
608    fn test_primitive_empty() {
609        let a = Int32Array::from_iter_values([1, 2, 3, 4]);
610        let v = interleave(&[&a], &[]).unwrap();
611        assert!(v.is_empty());
612        assert_eq!(v.data_type(), &DataType::Int32);
613    }
614
615    #[test]
616    fn test_strings() {
617        let a = StringArray::from_iter_values(["a", "b", "c"]);
618        let b = StringArray::from_iter_values(["hello", "world", "foo"]);
619        let values = interleave(&[&a, &b], &[(0, 2), (0, 2), (1, 0), (1, 1), (0, 1)]).unwrap();
620        let v = values.as_string::<i32>();
621        let values: Vec<_> = v.into_iter().collect();
622        assert_eq!(
623            &values,
624            &[
625                Some("c"),
626                Some("c"),
627                Some("hello"),
628                Some("world"),
629                Some("b")
630            ]
631        )
632    }
633
634    #[test]
635    fn test_interleave_dictionary() {
636        let a = DictionaryArray::<Int32Type>::from_iter(["a", "b", "c", "a", "b"]);
637        let b = DictionaryArray::<Int32Type>::from_iter(["a", "c", "a", "c", "a"]);
638
639        // Should not recompute dictionary
640        let values =
641            interleave(&[&a, &b], &[(0, 2), (0, 2), (0, 2), (1, 0), (1, 1), (0, 1)]).unwrap();
642        let v = values.as_dictionary::<Int32Type>();
643        assert_eq!(v.values().len(), 5);
644
645        let vc = v.downcast_dict::<StringArray>().unwrap();
646        let collected: Vec<_> = vc.into_iter().map(Option::unwrap).collect();
647        assert_eq!(&collected, &["c", "c", "c", "a", "c", "b"]);
648
649        // Should recompute dictionary
650        let values = interleave(&[&a, &b], &[(0, 2), (0, 2), (1, 1)]).unwrap();
651        let v = values.as_dictionary::<Int32Type>();
652        assert_eq!(v.values().len(), 1);
653
654        let vc = v.downcast_dict::<StringArray>().unwrap();
655        let collected: Vec<_> = vc.into_iter().map(Option::unwrap).collect();
656        assert_eq!(&collected, &["c", "c", "c"]);
657    }
658
659    #[test]
660    fn test_interleave_dictionary_nulls() {
661        let input_1_keys = Int32Array::from_iter_values([0, 2, 1, 3]);
662        let input_1_values = StringArray::from(vec![Some("foo"), None, Some("bar"), Some("fiz")]);
663        let input_1 = DictionaryArray::new(input_1_keys, Arc::new(input_1_values));
664        let input_2: DictionaryArray<Int32Type> = vec![None].into_iter().collect();
665
666        let expected = vec![Some("fiz"), None, None, Some("foo")];
667
668        let values = interleave(
669            &[&input_1 as _, &input_2 as _],
670            &[(0, 3), (0, 2), (1, 0), (0, 0)],
671        )
672        .unwrap();
673        let dictionary = values.as_dictionary::<Int32Type>();
674        let actual: Vec<Option<&str>> = dictionary
675            .downcast_dict::<StringArray>()
676            .unwrap()
677            .into_iter()
678            .collect();
679
680        assert_eq!(actual, expected);
681    }
682
683    #[test]
684    fn test_interleave_dictionary_overflow_same_values() {
685        let values: ArrayRef = Arc::new(StringArray::from_iter_values(
686            (0..50).map(|i| format!("v{i}")),
687        ));
688
689        // With 3 dictionaries of 50 values each, relative_offsets = [0, 50, 100]
690        // Accessing key 49 from dict3 gives 100 + 49 = 149 which overflows Int8
691        // (max 127).
692        // This test case falls back to interleave_fallback because the
693        // dictionaries share the same underlying values slice.
694        let dict1 = DictionaryArray::<Int8Type>::new(
695            Int8Array::from_iter_values([0, 1, 2]),
696            values.clone(),
697        );
698        let dict2 = DictionaryArray::<Int8Type>::new(
699            Int8Array::from_iter_values([0, 1, 2]),
700            values.clone(),
701        );
702        let dict3 =
703            DictionaryArray::<Int8Type>::new(Int8Array::from_iter_values([49]), values.clone());
704
705        let indices = &[(0, 0), (1, 0), (2, 0)];
706        let result = interleave(&[&dict1, &dict2, &dict3], indices).unwrap();
707
708        let dict_result = result.as_dictionary::<Int8Type>();
709        let string_result: Vec<_> = dict_result
710            .downcast_dict::<StringArray>()
711            .unwrap()
712            .into_iter()
713            .map(|x| x.unwrap())
714            .collect();
715        assert_eq!(string_result, vec!["v0", "v0", "v49"]);
716    }
717
718    fn test_interleave_lists<O: OffsetSizeTrait>() {
719        // [[1, 2], null, [3]]
720        let mut a = GenericListBuilder::<O, _>::new(Int32Builder::new());
721        a.values().append_value(1);
722        a.values().append_value(2);
723        a.append(true);
724        a.append(false);
725        a.values().append_value(3);
726        a.append(true);
727        let a = a.finish();
728
729        // [[4], null, [5, 6, null]]
730        let mut b = GenericListBuilder::<O, _>::new(Int32Builder::new());
731        b.values().append_value(4);
732        b.append(true);
733        b.append(false);
734        b.values().append_value(5);
735        b.values().append_value(6);
736        b.values().append_null();
737        b.append(true);
738        let b = b.finish();
739
740        let values = interleave(&[&a, &b], &[(0, 2), (0, 1), (1, 0), (1, 2), (1, 1)]).unwrap();
741        let v = values
742            .as_any()
743            .downcast_ref::<GenericListArray<O>>()
744            .unwrap();
745
746        // [[3], null, [4], [5, 6, null], null]
747        let mut expected = GenericListBuilder::<O, _>::new(Int32Builder::new());
748        expected.values().append_value(3);
749        expected.append(true);
750        expected.append(false);
751        expected.values().append_value(4);
752        expected.append(true);
753        expected.values().append_value(5);
754        expected.values().append_value(6);
755        expected.values().append_null();
756        expected.append(true);
757        expected.append(false);
758        let expected = expected.finish();
759
760        assert_eq!(v, &expected);
761    }
762
763    #[test]
764    fn test_lists() {
765        test_interleave_lists::<i32>();
766    }
767
768    #[test]
769    fn test_large_lists() {
770        test_interleave_lists::<i64>();
771    }
772
/// Interleaving struct arrays that carry no validity buffer must produce a
/// null-free struct whose child columns follow the requested row order.
#[test]
fn test_struct_without_nulls() {
    /// Assembles a struct array (no validity buffer) from two parallel columns.
    fn struct_of(fields: &Fields, numbers: &[i32], strings: &[&str]) -> StructArray {
        let numbers = Int32Array::from_iter_values(numbers.iter().copied());
        let strings = StringArray::from_iter_values(strings.iter().copied());
        StructArray::try_new(
            fields.clone(),
            vec![Arc::new(numbers), Arc::new(strings)],
            None,
        )
        .unwrap()
    }

    let schema = Fields::from(vec![
        Field::new("number_col", DataType::Int32, false),
        Field::new("string_col", DataType::Utf8, false),
    ]);

    let a = struct_of(&schema, &[1, 2, 3, 4], &["a", "b", "c", "d"]);
    let b = struct_of(&schema, &[5, 6, 7], &["hello", "world", "foo"]);
    let c = struct_of(&schema, &[8, 9, 10], &["x", "y", "z"]);

    // (array index, row index) pairs, including a repeated row.
    let picks = [(0, 3), (0, 3), (2, 2), (2, 0), (1, 1)];
    let interleaved = interleave(&[&a, &b, &c], &picks).unwrap();

    let out = interleaved.as_struct();
    assert_eq!(out.data_type(), &DataType::Struct(schema));
    assert_eq!(out.null_count(), 0);

    let numbers = out.column(0).as_primitive::<Int32Type>();
    assert_eq!(numbers.values(), &[4, 4, 10, 8, 6]);

    let strings: Vec<Option<&str>> = out.column(1).as_string::<i32>().iter().collect();
    assert_eq!(
        strings,
        vec![Some("d"), Some("d"), Some("z"), Some("x"), Some("world")]
    );
}
829
/// Child columns may contain nulls even though the struct rows themselves are
/// all valid; interleave must carry the child nulls through unchanged.
#[test]
fn test_struct_with_nulls_in_values() {
    let nullable_fields = Fields::from(vec![
        Field::new("number_col", DataType::Int32, true),
        Field::new("string_col", DataType::Utf8, true),
    ]);

    // First input: fully populated children.
    let dense = StructArray::try_new(
        nullable_fields.clone(),
        vec![
            Arc::new(Int32Array::from_iter_values([1, 2, 3, 4])),
            Arc::new(StringArray::from_iter_values(["a", "b", "c", "d"])),
        ],
        None,
    )
    .unwrap();

    // Second input: each child has one null slot, at different rows.
    let sparse = StructArray::try_new(
        nullable_fields.clone(),
        vec![
            Arc::new(Int32Array::from_iter([Some(1), Some(4), None])),
            Arc::new(StringArray::from(vec![Some("hello"), None, Some("foo")])),
        ],
        None,
    )
    .unwrap();

    let picks = [(0, 1), (1, 2), (1, 2), (0, 3), (1, 1)];
    let interleaved = interleave(&[&dense, &sparse], &picks).unwrap();
    let out = interleaved.as_struct();
    assert_eq!(out.data_type(), &DataType::Struct(nullable_fields));

    // No struct-level nulls are expected; only the children hold nulls.
    assert_eq!(out.null_count(), 0);

    let numbers: Vec<Option<i32>> = out.column(0).as_primitive::<Int32Type>().iter().collect();
    assert_eq!(numbers, vec![Some(2), None, None, Some(4), Some(4)]);

    let strings: Vec<Option<&str>> = out.column(1).as_string::<i32>().iter().collect();
    assert_eq!(
        strings,
        vec![Some("b"), Some("foo"), Some("foo"), Some("d"), None]
    );
}
881
/// A struct-level validity buffer on one input must be reflected in the
/// interleaved output, while child values are still gathered for every row.
#[test]
fn test_struct_with_nulls() {
    /// Assembles a struct array from two parallel columns and optional row validity.
    fn struct_of(
        fields: &Fields,
        numbers: &[i32],
        strings: &[&str],
        nulls: Option<NullBuffer>,
    ) -> StructArray {
        let numbers = Int32Array::from_iter_values(numbers.iter().copied());
        let strings = StringArray::from_iter_values(strings.iter().copied());
        StructArray::try_new(
            fields.clone(),
            vec![Arc::new(numbers), Arc::new(strings)],
            nulls,
        )
        .unwrap()
    }

    let schema = Fields::from(vec![
        Field::new("number_col", DataType::Int32, false),
        Field::new("string_col", DataType::Utf8, false),
    ]);

    let a = struct_of(&schema, &[1, 2, 3, 4], &["a", "b", "c", "d"], None);
    // Only `b` has a validity buffer; its middle row is null.
    let b = struct_of(
        &schema,
        &[5, 6, 7],
        &["hello", "world", "foo"],
        Some(NullBuffer::from(&[true, false, true])),
    );
    let c = struct_of(&schema, &[8, 9, 10], &["x", "y", "z"], None);

    // The fourth pick, (1, 1), selects the null row of `b`.
    let picks = [(0, 3), (0, 3), (2, 2), (1, 1), (2, 0)];
    let interleaved = interleave(&[&a, &b, &c], &picks).unwrap();
    let out = interleaved.as_struct();
    assert_eq!(out.data_type(), &DataType::Struct(schema));

    let validity: Vec<bool> = out
        .nulls()
        .expect("should_have_nulls")
        .iter()
        .collect();
    assert_eq!(validity, vec![true, true, true, false, true]);

    let numbers = out.column(0).as_primitive::<Int32Type>();
    assert_eq!(numbers.values(), &[4, 4, 10, 6, 8]);

    let strings: Vec<Option<&str>> = out.column(1).as_string::<i32>().iter().collect();
    assert_eq!(
        strings,
        vec![Some("d"), Some("d"), Some("z"), Some("world"), Some("x")]
    );
}
943
/// An empty index list must yield an empty array that keeps the struct data type.
#[test]
fn test_struct_empty() {
    let schema = Fields::from(vec![
        Field::new("number_col", DataType::Int32, false),
        Field::new("string_col", DataType::Utf8, false),
    ]);

    let source = StructArray::try_new(
        schema.clone(),
        vec![
            Arc::new(Int32Array::from_iter_values([1, 2, 3, 4])),
            Arc::new(StringArray::from_iter_values(["a", "b", "c", "d"])),
        ],
        None,
    )
    .unwrap();

    let interleaved = interleave(&[&source], &[]).unwrap();
    assert!(interleaved.is_empty());
    assert_eq!(interleaved.data_type(), &DataType::Struct(schema));
}
965
/// Mixes a populated dictionary with an all-null dictionary whose values
/// array is empty, and checks that the null row comes through as a null.
#[test]
fn interleave_sparse_nulls() {
    // 100 dictionary values ("0".."99"), of which keys 0..10 are referenced.
    let populated = DictionaryArray::new(
        Int32Array::from_iter_values(0..10),
        Arc::new(StringArray::from_iter_values(
            (0..100).map(|n| n.to_string()),
        )),
    );
    // Ten null keys on top of an empty values array.
    let all_null = DictionaryArray::new(
        Int32Array::new_null(10),
        Arc::new(StringArray::new_null(0)),
    );

    let picks = [(0, 0), (0, 1), (0, 2), (1, 0)];
    let interleaved = interleave(&[&populated, &all_null], &picks).unwrap();

    let expected =
        DictionaryArray::<Int32Type>::from_iter(vec![Some("0"), Some("1"), Some("2"), None]);
    assert_eq!(interleaved.as_ref(), &expected);
}
982
/// Compares `interleave` on string view arrays with `interleave_fallback`:
/// both must yield the same strings, but with different data buffer counts.
#[test]
fn test_interleave_views() {
    let view_a = StringViewArray::from(&StringArray::from_iter_values([
        "hello",
        "world_long_string_not_inlined",
        "foo",
        "bar",
        "baz",
    ]));
    let view_b = StringViewArray::from(&StringArray::from_iter_values([
        "test",
        "data",
        "more_long_string_not_inlined",
        "views",
        "here",
    ]));

    let picks = [
        (0, 2), // "foo"
        (1, 0), // "test"
        (0, 4), // "baz"
        (1, 3), // "views"
        (0, 1), // "world_long_string_not_inlined"
    ];

    // Copies a view array into owned strings so the two results can be compared.
    let to_owned = |array: &StringViewArray| -> Vec<Option<String>> {
        array.iter().map(|v| v.map(String::from)).collect()
    };

    // `interleave` result: a single data buffer is expected.
    let interleaved = interleave(&[&view_a, &view_b], &picks).unwrap();
    let specialized = interleaved.as_string_view();
    assert_eq!(specialized.data_buffers().len(), 1);

    // `interleave_fallback` result: two buffers are expected even though only
    // one selected string is long enough to need one.
    let fallback = interleave_fallback(&[&view_a, &view_b], &picks).unwrap();
    let generic = fallback.as_string_view();
    assert_eq!(generic.data_buffers().len(), 2);

    let specialized_strings = to_owned(specialized);
    assert_eq!(specialized_strings, to_owned(generic));

    let expected: Vec<Option<String>> = [
        "foo",
        "test",
        "baz",
        "views",
        "world_long_string_not_inlined",
    ]
    .iter()
    .map(|s| Some(s.to_string()))
    .collect();
    assert_eq!(specialized_strings, expected);
}
1042
/// Same comparison as the null-free view test, but most selected rows are
/// null; `interleave` and `interleave_fallback` must agree on the output.
#[test]
fn test_interleave_views_with_nulls() {
    let view_a = StringViewArray::from(&StringArray::from_iter([
        Some("hello"),
        None,
        Some("foo_long_string_not_inlined"),
        Some("bar"),
        None,
    ]));
    let view_b = StringViewArray::from(&StringArray::from_iter([
        Some("test"),
        Some("data_long_string_not_inlined"),
        None,
        None,
        Some("here"),
    ]));

    let picks = [
        (0, 1), // null
        (1, 2), // null
        (0, 2), // "foo_long_string_not_inlined"
        (1, 3), // null
        (0, 4), // null
    ];

    // Copies a view array into owned strings so the two results can be compared.
    let to_owned = |array: &StringViewArray| -> Vec<Option<String>> {
        array.iter().map(|v| v.map(String::from)).collect()
    };

    // `interleave` result: a single data buffer is expected.
    let interleaved = interleave(&[&view_a, &view_b], &picks).unwrap();
    let specialized = interleaved.as_string_view();
    assert_eq!(specialized.data_buffers().len(), 1);

    let fallback = interleave_fallback(&[&view_a, &view_b], &picks).unwrap();
    let generic = fallback.as_string_view();

    let specialized_strings = to_owned(specialized);
    assert_eq!(specialized_strings, to_owned(generic));

    let expected: Vec<Option<String>> = vec![
        None,
        None,
        Some("foo_long_string_not_inlined".to_string()),
        None,
        None,
    ];
    assert_eq!(specialized_strings, expected);
}
1100
/// Each input view array owns two data buffers; the interleaved result is
/// expected to keep all four and renumber the buffer index stored in each view.
#[test]
fn test_interleave_views_multiple_buffers() {
    /// Encodes a view covering all of `bytes`, stored at offset 0 of buffer `buffer_index`.
    fn whole_buffer_view(bytes: &[u8], buffer_index: u32) -> u128 {
        ByteView::new(bytes.len() as u32, &bytes[..4])
            .with_buffer_index(buffer_index)
            .with_offset(0)
            .as_u128()
    }

    /// Builds a two-row view array where every string sits in its own data buffer.
    fn two_buffer_array(first: &str, second: &str) -> StringViewArray {
        let (first, second) = (first.as_bytes(), second.as_bytes());
        let views = vec![whole_buffer_view(first, 0), whole_buffer_view(second, 1)];
        StringViewArray::try_new(
            views.into(),
            vec![first.to_vec().into(), second.to_vec().into()],
            None,
        )
        .unwrap()
    }

    const A_FIRST: &str = "very_long_string_from_first_buffer";
    const A_SECOND: &str = "very_long_string_from_second_buffer";
    const B_FIRST: &str = "another_very_long_string_buffer_three";
    const B_SECOND: &str = "different_long_string_in_buffer_four";

    let view_a = two_buffer_array(A_FIRST, A_SECOND);
    let view_b = two_buffer_array(B_FIRST, B_SECOND);

    let picks = [
        (0, 0), // array A, first buffer
        (1, 0), // array B, first buffer
        (0, 1), // array A, second buffer
        (1, 1), // array B, second buffer
        (0, 0), // array A, first buffer (again)
        (1, 1), // array B, second buffer (again)
    ];

    let interleaved = interleave(&[&view_a, &view_b], &picks).unwrap();
    let out = interleaved.as_string_view();

    assert_eq!(
        out.data_buffers().len(),
        4,
        "Expected four buffers (two from each input array)"
    );

    let strings: Vec<Option<String>> = out.iter().map(|v| v.map(String::from)).collect();
    let expected_strings: Vec<Option<String>> =
        [A_FIRST, B_FIRST, A_SECOND, B_SECOND, A_FIRST, B_SECOND]
            .iter()
            .map(|s| Some(s.to_string()))
            .collect();
    assert_eq!(strings, expected_strings);

    // Decode each output view and check which output buffer it points to.
    let buffer_indices: Vec<_> = out
        .views()
        .iter()
        .map(|raw| ByteView::from(*raw).buffer_index)
        .collect();
    assert_eq!(
        buffer_indices,
        vec![
            0, // A's first buffer
            1, // B's first buffer
            2, // A's second buffer
            3, // B's second buffer
            0, // A's first buffer, reused
            3, // B's second buffer, reused
        ]
    );
}
1187
/// Interleaves two run-end encoded `Int32` arrays and checks the logical values.
#[test]
fn test_interleave_run_end_encoded_primitive() {
    /// Run-end encodes `values` with `Int32` run ends.
    fn encode(values: &[i32]) -> Int32RunArray {
        let mut builder = PrimitiveRunBuilder::<Int32Type, Int32Type>::new();
        builder.extend(values.iter().copied().map(Some));
        builder.finish()
    }

    let a = encode(&[1, 1, 2, 2, 2, 3]);
    let b = encode(&[4, 5, 5, 6, 6, 6]);

    let picks = [(0, 1), (1, 0), (0, 4), (1, 2), (0, 5)];
    let interleaved = interleave(&[&a, &b], &picks).unwrap();

    // The output must keep the run-end encoded data type.
    assert!(matches!(
        interleaved.data_type(),
        DataType::RunEndEncoded(_, _)
    ));

    let runs = interleaved
        .as_any()
        .downcast_ref::<Int32RunArray>()
        .unwrap();

    // Look up each logical row through its physical run index.
    let physical_values = runs.values().as_primitive::<Int32Type>();
    let actual: Vec<i32> = (0..runs.len())
        .map(|row| physical_values.value(runs.get_physical_index(row)))
        .collect();
    assert_eq!(actual, vec![1, 4, 2, 5, 3]);
}
1220
/// Interleaves sliced run-end encoded arrays; row indices in the request are
/// relative to each slice, not to the array it was cut from.
#[test]
fn test_interleave_run_end_encoded_sliced() {
    /// Run-end encodes `values` with `Int32` run ends.
    fn encode(values: &[i32]) -> Int32RunArray {
        let mut builder = PrimitiveRunBuilder::<Int32Type, Int32Type>::new();
        builder.extend(values.iter().copied().map(Some));
        builder.finish()
    }

    let a = encode(&[1, 1, 2, 2, 2, 3]).slice(2, 3); // logical [2, 2, 2]
    let b = encode(&[4, 5, 5, 6, 6, 6]).slice(1, 3); // logical [5, 5, 6]

    let picks = [(0, 1), (1, 0), (0, 2), (1, 1), (1, 2)];
    let interleaved = interleave(&[&a, &b], &picks).unwrap();

    let typed = interleaved
        .as_run::<Int32Type>()
        .downcast::<Int32Array>()
        .unwrap();

    let actual: Vec<i32> = typed.into_iter().flatten().collect();
    assert_eq!(actual, vec![2, 5, 2, 5, 6]);
}
1243
/// Interleaves two run-end encoded string arrays and checks the logical values.
#[test]
fn test_interleave_run_end_encoded_string() {
    let a = Int32RunArray::from_iter(["hello", "hello", "world", "world", "foo"]);
    let b = Int32RunArray::from_iter(["bar", "baz", "baz", "qux"]);

    let picks = [(0, 0), (1, 1), (0, 3), (1, 3), (0, 4)];
    let interleaved = interleave(&[&a, &b], &picks).unwrap();

    // The output must keep the run-end encoded data type.
    assert!(matches!(
        interleaved.data_type(),
        DataType::RunEndEncoded(_, _)
    ));

    let runs = interleaved
        .as_any()
        .downcast_ref::<Int32RunArray>()
        .unwrap();

    // Look up each logical row through its physical run index.
    let physical_values = runs.values().as_string::<i32>();
    let actual: Vec<&str> = (0..runs.len())
        .map(|row| physical_values.value(runs.get_physical_index(row)))
        .collect();
    assert_eq!(actual, vec!["hello", "baz", "world", "qux", "foo"]);
}
1273
/// Interleaves run-end encoded string arrays that contain null entries and
/// checks that those nulls show up at the right logical positions.
#[test]
fn test_interleave_run_end_encoded_with_nulls() {
    let a = Int32RunArray::from_iter([Some("a"), Some("a"), None, None, Some("b")]);
    let b = Int32RunArray::from_iter([None, Some("c"), Some("c"), Some("d")]);

    let picks = [(0, 1), (1, 0), (0, 2), (1, 3), (0, 4)];
    let interleaved = interleave(&[&a, &b], &picks).unwrap();

    // The output must keep the run-end encoded data type.
    assert!(matches!(
        interleaved.data_type(),
        DataType::RunEndEncoded(_, _)
    ));

    let runs = interleaved
        .as_any()
        .downcast_ref::<Int32RunArray>()
        .unwrap();

    // Nullness is read from the physical values array, row by logical row.
    let physical_values = runs.values().as_string::<i32>();
    let actual: Vec<Option<&str>> = (0..runs.len())
        .map(|row| {
            let physical = runs.get_physical_index(row);
            if physical_values.is_null(physical) {
                None
            } else {
                Some(physical_values.value(physical))
            }
        })
        .collect();
    assert_eq!(actual, vec![Some("a"), None, None, Some("d"), Some("b")]);
}
1309
/// Same scenario as the primitive run-end test, but with `Int16` run ends.
#[test]
fn test_interleave_run_end_encoded_different_run_types() {
    /// Run-end encodes `values` with `Int16` run ends.
    fn encode(values: &[i32]) -> RunArray<Int16Type> {
        let mut builder = PrimitiveRunBuilder::<Int16Type, Int32Type>::new();
        builder.extend(values.iter().copied().map(Some));
        builder.finish()
    }

    let a = encode(&[1, 1, 2, 3, 3]);
    let b = encode(&[4, 5, 5, 6]);

    let picks = [(0, 0), (1, 1), (0, 3), (1, 3)];
    let interleaved = interleave(&[&a, &b], &picks).unwrap();

    // The output must keep the run-end encoded data type.
    assert!(matches!(
        interleaved.data_type(),
        DataType::RunEndEncoded(_, _)
    ));

    let runs = interleaved
        .as_any()
        .downcast_ref::<RunArray<Int16Type>>()
        .unwrap();

    // Look up each logical row through its physical run index.
    let physical_values = runs.values().as_primitive::<Int32Type>();
    let actual: Vec<i32> = (0..runs.len())
        .map(|row| physical_values.value(runs.get_physical_index(row)))
        .collect();
    assert_eq!(actual, vec![1, 5, 3, 6]);
}
1342
/// Uses `Int64` run ends and inputs whose runs have different lengths, picking
/// rows from the start, middle and end of runs.
#[test]
fn test_interleave_run_end_encoded_mixed_run_lengths() {
    /// Run-end encodes `values` with `Int64` run ends.
    fn encode(values: &[i32]) -> RunArray<Int64Type> {
        let mut builder = PrimitiveRunBuilder::<Int64Type, Int32Type>::new();
        builder.extend(values.iter().copied().map(Some));
        builder.finish()
    }

    let a = encode(&[1, 2, 2, 2, 2, 3, 3, 4]);
    let b = encode(&[5, 5, 5, 6, 7, 7, 8, 8]);

    // Each pick is annotated with the logical value it selects.
    let picks = [
        (0, 0), // 1
        (1, 2), // 5
        (0, 3), // 2
        (1, 3), // 6
        (0, 6), // 3
        (1, 6), // 8
        (0, 7), // 4
        (1, 4), // 7
    ];
    let interleaved = interleave(&[&a, &b], &picks).unwrap();

    // The output must keep the run-end encoded data type.
    assert!(matches!(
        interleaved.data_type(),
        DataType::RunEndEncoded(_, _)
    ));

    let runs = interleaved
        .as_any()
        .downcast_ref::<RunArray<Int64Type>>()
        .unwrap();

    // Look up each logical row through its physical run index.
    let physical_values = runs.values().as_primitive::<Int32Type>();
    let actual: Vec<i32> = (0..runs.len())
        .map(|row| physical_values.value(runs.get_physical_index(row)))
        .collect();
    assert_eq!(actual, vec![1, 5, 2, 6, 3, 8, 4, 7]);
}
1384
/// Interleaves a one-element run array with an array made of one three-element run.
#[test]
fn test_interleave_run_end_encoded_empty_runs() {
    /// Run-end encodes `values` with `Int32` run ends.
    fn encode(values: &[i32]) -> Int32RunArray {
        let mut builder = PrimitiveRunBuilder::<Int32Type, Int32Type>::new();
        builder.extend(values.iter().copied().map(Some));
        builder.finish()
    }

    let a = encode(&[1]);
    let b = encode(&[2, 2, 2]);

    let picks = [(0, 0), (1, 1), (1, 2)];
    let interleaved = interleave(&[&a, &b], &picks).unwrap();

    // The output must keep the run-end encoded data type.
    assert!(matches!(
        interleaved.data_type(),
        DataType::RunEndEncoded(_, _)
    ));

    let runs = interleaved
        .as_any()
        .downcast_ref::<Int32RunArray>()
        .unwrap();

    // Look up each logical row through its physical run index.
    let physical_values = runs.values().as_primitive::<Int32Type>();
    let actual: Vec<i32> = (0..runs.len())
        .map(|row| physical_values.value(runs.get_physical_index(row)))
        .collect();
    assert_eq!(actual, vec![1, 2, 2]);
}
1417
/// A struct array with zero child fields still has a length; interleaving one
/// row from it must give a one-row struct of the same (empty) type.
#[test]
fn test_struct_no_fields() {
    let no_fields = Fields::empty();
    let source =
        StructArray::try_new_with_length(no_fields.clone(), vec![], None, 10).unwrap();

    let interleaved = interleave(&[&source], &[(0, 0)]).unwrap();

    assert_eq!(interleaved.len(), 1);
    assert_eq!(interleaved.data_type(), &DataType::Struct(no_fields));
}
1426
/// Calls `interleave_fallback_dictionary` directly with two dictionaries that
/// each have one null key, and checks the decoded strings and nulls.
#[test]
fn test_interleave_fallback_dictionary_with_nulls() {
    // Keys [0, null, 1] over values ["foo", "bar"].
    let dict_a = DictionaryArray::new(
        Int32Array::from(vec![Some(0), None, Some(1)]),
        Arc::new(StringArray::from_iter_values(["foo", "bar"])),
    );
    // Keys [0, 1, null] over values ["baz", "qux"].
    let dict_b = DictionaryArray::new(
        Int32Array::from(vec![Some(0), Some(1), None]),
        Arc::new(StringArray::from_iter_values(["baz", "qux"])),
    );

    let picks = vec![
        (0, 0), // "foo"
        (0, 1), // null
        (1, 0), // "baz"
        (1, 2), // null
        (0, 2), // "bar"
        (1, 1), // "qux"
    ];

    let interleaved =
        interleave_fallback_dictionary::<Int32Type>(&[&dict_a, &dict_b], &picks).unwrap();

    // Decode keys into strings through a typed dictionary view.
    let decoded: Vec<Option<&str>> = interleaved
        .as_dictionary::<Int32Type>()
        .downcast_dict::<StringArray>()
        .unwrap()
        .into_iter()
        .collect();

    assert_eq!(
        decoded,
        vec![
            Some("foo"),
            None,
            Some("baz"),
            None,
            Some("bar"),
            Some("qux"),
        ]
    );
}
1464
/// Repeating a 26-byte string `i32::MAX >> 4` times needs more bytes than an
/// `i32` offset can address, so `interleave` must report an offset overflow.
#[test]
fn test_interleave_bytes_offset_overflow() {
    let alphabet: String = ('a'..='z').collect();
    let source = StringArray::from(vec![Some(alphabet)]);

    // Every pick refers to the single row of the single input.
    let repeat_count = (i32::MAX >> 4) as usize;
    let picks: Vec<(usize, usize)> = vec![(0, 0); repeat_count];

    let outcome = interleave(&[&source], &picks);
    assert!(matches!(
        outcome,
        Err(ArrowError::OffsetOverflowError(_))
    ));
}
1475
/// Repeating a 32-element list row `i32::MAX / 32 + 1` times needs one more
/// child element than `i32::MAX`, so `interleave` must report an offset overflow.
#[test]
fn test_interleave_list_offset_overflow() {
    // One list row holding the values 0..32.
    let mut builder = GenericListBuilder::<i32, _>::new(Int32Builder::new());
    (0..32).for_each(|item| builder.values().append_value(item));
    builder.append(true);
    let source = builder.finish();

    // Every pick refers to that single row.
    let repeat_count = (i32::MAX as usize / 32) + 1;
    let picks: Vec<(usize, usize)> = vec![(0, 0); repeat_count];

    let outcome = interleave(&[&source], &picks);
    assert!(matches!(
        outcome,
        Err(ArrowError::OffsetOverflowError(_))
    ));
}
1493
/// Interleaves two `ListViewArray`s and requires the result to pass full
/// validation and to contain the expected rows.
#[test]
fn test_interleave_list_view() {
    // Background, as recorded when this test was written: `interleave` sends
    // ListView inputs to `interleave_fallback`, which is built on
    // `MutableArrayData`. Its `list_view::build_extend` copies offsets and
    // sizes without extending the child array, so the output ends up with
    // offsets/sizes that point into the original children while its own child
    // is empty.
    //
    // lv_a = [[1, 2], [3]]   values=[1,2,3]  offsets=[0,2]  sizes=[2,1]
    // lv_b = [[4, 5, 6]]     values=[4,5,6]  offsets=[0]    sizes=[3]
    // Picking (0,0), (1,0), (0,1) should give [[1, 2], [4, 5, 6], [3]].
    let item_field = Arc::new(Field::new_list_field(DataType::Int64, false));

    let lv_a = ListViewArray::new(
        Arc::clone(&item_field),
        ScalarBuffer::from(vec![0i32, 2]),
        ScalarBuffer::from(vec![2i32, 1]),
        Arc::new(Int64Array::from(vec![1_i64, 2, 3])),
        None,
    );
    let lv_b = ListViewArray::new(
        item_field,
        ScalarBuffer::from(vec![0i32]),
        ScalarBuffer::from(vec![3i32]),
        Arc::new(Int64Array::from(vec![4_i64, 5, 6])),
        None,
    );

    let sources: [&dyn Array; 2] = [&lv_a, &lv_b];
    let interleaved = interleave(&sources, &[(0, 0), (1, 0), (0, 1)]).unwrap();

    // Full validation checks offsets/sizes against the actual child length.
    interleaved
        .to_data()
        .validate_full()
        .expect("interleaved ListViewArray must be internally consistent");

    let out = interleaved.as_list_view::<i32>();
    assert_eq!(out.len(), 3);

    // Copies the child values of one output row.
    let row = |index: usize| -> Vec<i64> {
        out.value(index)
            .as_primitive::<Int64Type>()
            .values()
            .to_vec()
    };
    assert_eq!(row(0), vec![1, 2]);
    assert_eq!(row(1), vec![4, 5, 6]);
    assert_eq!(row(2), vec![3]);
}
1547}