Skip to main content

arrow_select/
interleave.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Interleave elements from multiple arrays
19
20use crate::concat::concat;
21use crate::dictionary::{merge_dictionary_values, should_merge_dictionary_values};
22use arrow_array::builder::{BooleanBufferBuilder, PrimitiveBuilder};
23use arrow_array::cast::AsArray;
24use arrow_array::types::*;
25use arrow_array::*;
26use arrow_buffer::bit_mask::set_bits;
27use arrow_buffer::bit_util;
28use arrow_buffer::{ArrowNativeType, BooleanBuffer, MutableBuffer, NullBuffer, OffsetBuffer};
29use arrow_data::ByteView;
30use arrow_data::transform::MutableArrayData;
31use arrow_schema::{ArrowError, DataType, FieldRef, Fields};
32use std::sync::Arc;
33
/// Callback handed to `downcast_primitive!` by [`interleave`]: forwards the selected
/// primitive type, together with the caller's arguments, to [`interleave_primitive`].
macro_rules! primitive_helper {
    ($primitive:ty, $arrays:ident, $picks:ident, $dtype:ident) => {
        interleave_primitive::<$primitive>($arrays, $picks, $dtype)
    };
}
39
/// Callback handed to `downcast_integer!` by [`interleave`]: forwards the selected
/// dictionary key type, together with the caller's arguments, to
/// [`interleave_dictionaries`].
macro_rules! dict_helper {
    ($key:ty, $arrays:expr, $picks:expr) => {
        interleave_dictionaries::<$key>($arrays, $picks)
    };
}
45
46///
47/// Takes elements by index from a list of [`Array`], creating a new [`Array`] from those values.
48///
49/// Each element in `indices` is a pair of `usize` with the first identifying the index
50/// of the [`Array`] in `values`, and the second the index of the value within that [`Array`]
51///
52/// ```text
53/// ┌─────────────────┐      ┌─────────┐                                  ┌─────────────────┐
54/// │        A        │      │ (0, 0)  │        interleave(               │        A        │
55/// ├─────────────────┤      ├─────────┤          [values0, values1],     ├─────────────────┤
56/// │        D        │      │ (1, 0)  │          indices                 │        B        │
57/// └─────────────────┘      ├─────────┤        )                         ├─────────────────┤
58///   values array 0         │ (1, 1)  │      ─────────────────────────▶  │        C        │
59///                          ├─────────┤                                  ├─────────────────┤
60///                          │ (0, 1)  │                                  │        D        │
61///                          └─────────┘                                  └─────────────────┘
62/// ┌─────────────────┐       indices
63/// │        B        │        array
64/// ├─────────────────┤                                                    result
65/// │        C        │
66/// ├─────────────────┤
67/// │        E        │
68/// └─────────────────┘
69///   values array 1
70/// ```
71///
72/// For selecting values by index from a single array see [`crate::take`]
73pub fn interleave(
74    values: &[&dyn Array],
75    indices: &[(usize, usize)],
76) -> Result<ArrayRef, ArrowError> {
77    if values.is_empty() {
78        return Err(ArrowError::InvalidArgumentError(
79            "interleave requires input of at least one array".to_string(),
80        ));
81    }
82    let data_type = values[0].data_type();
83
84    for array in values.iter().skip(1) {
85        if array.data_type() != data_type {
86            return Err(ArrowError::InvalidArgumentError(format!(
87                "It is not possible to interleave arrays of different data types ({} and {})",
88                data_type,
89                array.data_type()
90            )));
91        }
92    }
93
94    if indices.is_empty() {
95        return Ok(new_empty_array(data_type));
96    }
97
98    downcast_primitive! {
99        data_type => (primitive_helper, values, indices, data_type),
100        DataType::Utf8 => interleave_bytes::<Utf8Type>(values, indices),
101        DataType::LargeUtf8 => interleave_bytes::<LargeUtf8Type>(values, indices),
102        DataType::Binary => interleave_bytes::<BinaryType>(values, indices),
103        DataType::LargeBinary => interleave_bytes::<LargeBinaryType>(values, indices),
104        DataType::BinaryView => interleave_views::<BinaryViewType>(values, indices),
105        DataType::Utf8View => interleave_views::<StringViewType>(values, indices),
106        DataType::Dictionary(k, _) => downcast_integer! {
107            k.as_ref() => (dict_helper, values, indices),
108            _ => unreachable!("illegal dictionary key type {k}")
109        },
110        DataType::Struct(fields) => interleave_struct(fields, values, indices),
111        DataType::List(field) => interleave_list::<i32>(values, indices, field),
112        DataType::LargeList(field) => interleave_list::<i64>(values, indices, field),
113        DataType::RunEndEncoded(r, _) => match r.data_type() {
114            DataType::Int16 => interleave_run_end::<Int16Type>(values, indices),
115            DataType::Int32 => interleave_run_end::<Int32Type>(values, indices),
116            DataType::Int64 => interleave_run_end::<Int64Type>(values, indices),
117            t => unreachable!("illegal run-end type {t}"),
118        },
119        DataType::ListView(field) => interleave_list_view::<i32>(values, indices, field),
120        DataType::LargeListView(field) => interleave_list_view::<i64>(values, indices, field),
121        _ => interleave_fallback(values, indices)
122    }
123}
124
125/// Common functionality for interleaving arrays
126///
127/// T is the concrete Array type
128struct Interleave<'a, T> {
129    /// The input arrays downcast to T
130    arrays: Vec<&'a T>,
131    /// The null buffer of the interleaved output
132    nulls: Option<NullBuffer>,
133}
134
135impl<'a, T: Array + 'static> Interleave<'a, T> {
136    fn new(values: &[&'a dyn Array], indices: &'a [(usize, usize)]) -> Self {
137        let mut has_nulls = false;
138        let arrays: Vec<&T> = values
139            .iter()
140            .map(|x| {
141                has_nulls = has_nulls || x.null_count() != 0;
142                x.as_any().downcast_ref().unwrap()
143            })
144            .collect();
145
146        let nulls = match has_nulls {
147            true => {
148                let nulls = BooleanBuffer::collect_bool(indices.len(), |i| {
149                    let (a, b) = indices[i];
150                    arrays[a].is_valid(b)
151                });
152                Some(nulls.into())
153            }
154            false => None,
155        };
156
157        Self { arrays, nulls }
158    }
159}
160
161fn interleave_primitive<T: ArrowPrimitiveType>(
162    values: &[&dyn Array],
163    indices: &[(usize, usize)],
164    data_type: &DataType,
165) -> Result<ArrayRef, ArrowError> {
166    let interleaved = Interleave::<'_, PrimitiveArray<T>>::new(values, indices);
167    let arrays = &interleaved.arrays;
168    let len = indices.len();
169
170    let mut output = Vec::with_capacity(len);
171    let dst: *mut T::Native = output.as_mut_ptr();
172    let mut base = 0;
173
174    // Process 8 elements at a time to issue multiple independent loads
175    // and increase memory-level parallelism for random access patterns.
176    let chunks = indices.chunks_exact(8);
177    let remainder = chunks.remainder();
178    for chunk in chunks {
179        let v0 = arrays[chunk[0].0].value(chunk[0].1);
180        let v1 = arrays[chunk[1].0].value(chunk[1].1);
181        let v2 = arrays[chunk[2].0].value(chunk[2].1);
182        let v3 = arrays[chunk[3].0].value(chunk[3].1);
183        let v4 = arrays[chunk[4].0].value(chunk[4].1);
184        let v5 = arrays[chunk[5].0].value(chunk[5].1);
185        let v6 = arrays[chunk[6].0].value(chunk[6].1);
186        let v7 = arrays[chunk[7].0].value(chunk[7].1);
187
188        // SAFETY: base+7 < len == output capacity
189        debug_assert!(base + 7 < len);
190        unsafe {
191            dst.add(base).write(v0);
192            dst.add(base + 1).write(v1);
193            dst.add(base + 2).write(v2);
194            dst.add(base + 3).write(v3);
195            dst.add(base + 4).write(v4);
196            dst.add(base + 5).write(v5);
197            dst.add(base + 6).write(v6);
198            dst.add(base + 7).write(v7);
199        }
200        base += 8;
201    }
202
203    for idx in remainder {
204        // SAFETY: base < len == output capacity
205        debug_assert!(base < len);
206        unsafe { dst.add(base).write(arrays[idx.0].value(idx.1)) };
207        base += 1;
208    }
209
210    // SAFETY: all `len` elements have been initialized
211    debug_assert!(base == len);
212    unsafe { output.set_len(len) };
213
214    let array = PrimitiveArray::<T>::try_new(output.into(), interleaved.nulls)?;
215    Ok(Arc::new(array.with_data_type(data_type.clone())))
216}
217
218fn interleave_bytes<T: ByteArrayType>(
219    values: &[&dyn Array],
220    indices: &[(usize, usize)],
221) -> Result<ArrayRef, ArrowError> {
222    let interleaved = Interleave::<'_, GenericByteArray<T>>::new(values, indices);
223
224    let mut capacity = 0;
225    let mut offsets = Vec::with_capacity(indices.len() + 1);
226    offsets.push(T::Offset::from_usize(0).unwrap());
227    for (a, b) in indices {
228        let o = interleaved.arrays[*a].value_offsets();
229        let element_len = o[*b + 1].as_usize() - o[*b].as_usize();
230        capacity += element_len;
231        offsets.push(
232            T::Offset::from_usize(capacity)
233                .ok_or_else(|| ArrowError::OffsetOverflowError(capacity))?,
234        );
235    }
236
237    let mut values = Vec::with_capacity(capacity);
238    for (a, b) in indices {
239        values.extend_from_slice(interleaved.arrays[*a].value(*b).as_ref());
240    }
241
242    // Safety: safe by construction
243    let array = unsafe {
244        let offsets = OffsetBuffer::new_unchecked(offsets.into());
245        GenericByteArray::<T>::new_unchecked(offsets, values.into(), interleaved.nulls)
246    };
247    Ok(Arc::new(array))
248}
249
250fn interleave_dictionaries<K: ArrowDictionaryKeyType>(
251    arrays: &[&dyn Array],
252    indices: &[(usize, usize)],
253) -> Result<ArrayRef, ArrowError> {
254    let dictionaries: Vec<_> = arrays.iter().map(|x| x.as_dictionary::<K>()).collect();
255    let (should_merge, has_overflow) =
256        should_merge_dictionary_values::<K>(&dictionaries, indices.len());
257    if !should_merge {
258        return if has_overflow {
259            interleave_fallback(arrays, indices)
260        } else {
261            interleave_fallback_dictionary::<K>(&dictionaries, indices)
262        };
263    }
264
265    let masks: Vec<_> = dictionaries
266        .iter()
267        .enumerate()
268        .map(|(a_idx, dictionary)| {
269            let mut key_mask = BooleanBufferBuilder::new_from_buffer(
270                MutableBuffer::new_null(dictionary.len()),
271                dictionary.len(),
272            );
273
274            for (_, key_idx) in indices.iter().filter(|(a, _)| *a == a_idx) {
275                key_mask.set_bit(*key_idx, true);
276            }
277            key_mask.finish()
278        })
279        .collect();
280
281    let merged = merge_dictionary_values(&dictionaries, Some(&masks))?;
282
283    // Recompute keys
284    let mut keys = PrimitiveBuilder::<K>::with_capacity(indices.len());
285    for (a, b) in indices {
286        let old_keys: &PrimitiveArray<K> = dictionaries[*a].keys();
287        match old_keys.is_valid(*b) {
288            true => {
289                let old_key = old_keys.values()[*b];
290                keys.append_value(merged.key_mappings[*a][old_key.as_usize()])
291            }
292            false => keys.append_null(),
293        }
294    }
295    let array = unsafe { DictionaryArray::new_unchecked(keys.finish(), merged.values) };
296    Ok(Arc::new(array))
297}
298
299fn interleave_views<T: ByteViewType>(
300    values: &[&dyn Array],
301    indices: &[(usize, usize)],
302) -> Result<ArrayRef, ArrowError> {
303    let interleaved = Interleave::<'_, GenericByteViewArray<T>>::new(values, indices);
304    let mut buffers = Vec::new();
305
306    // Contains the offsets of start buffer in `buffer_to_new_index`
307    let mut offsets = Vec::with_capacity(interleaved.arrays.len() + 1);
308    offsets.push(0);
309    let mut total_buffers = 0;
310    for a in interleaved.arrays.iter() {
311        total_buffers += a.data_buffers().len();
312        offsets.push(total_buffers);
313    }
314
315    // contains the mapping from old buffer index to new buffer index
316    let mut buffer_to_new_index = vec![None; total_buffers];
317
318    let views: Vec<u128> = indices
319        .iter()
320        .map(|(array_idx, value_idx)| {
321            let array = interleaved.arrays[*array_idx];
322            let view = array.views().get(*value_idx).unwrap();
323            let view_len = *view as u32;
324            if view_len <= 12 {
325                return *view;
326            }
327            // value is big enough to be in a variadic buffer
328            let view = ByteView::from(*view);
329            let buffer_to_new_idx = offsets[*array_idx] + view.buffer_index as usize;
330            let new_buffer_idx: u32 =
331                *buffer_to_new_index[buffer_to_new_idx].get_or_insert_with(|| {
332                    buffers.push(array.data_buffers()[view.buffer_index as usize].clone());
333                    (buffers.len() - 1) as u32
334                });
335            view.with_buffer_index(new_buffer_idx).as_u128()
336        })
337        .collect();
338
339    let array = unsafe {
340        GenericByteViewArray::<T>::new_unchecked(views.into(), buffers, interleaved.nulls)
341    };
342    Ok(Arc::new(array))
343}
344
345fn interleave_struct(
346    fields: &Fields,
347    values: &[&dyn Array],
348    indices: &[(usize, usize)],
349) -> Result<ArrayRef, ArrowError> {
350    let interleaved = Interleave::<'_, StructArray>::new(values, indices);
351
352    if fields.is_empty() {
353        let array = StructArray::try_new_with_length(
354            fields.clone(),
355            vec![],
356            interleaved.nulls,
357            indices.len(),
358        )?;
359        return Ok(Arc::new(array));
360    }
361
362    let struct_fields_array: Result<Vec<_>, _> = (0..fields.len())
363        .map(|i| {
364            let field_values: Vec<&dyn Array> = interleaved
365                .arrays
366                .iter()
367                .map(|x| x.column(i).as_ref())
368                .collect();
369            interleave(&field_values, indices)
370        })
371        .collect();
372
373    let struct_array =
374        StructArray::try_new(fields.clone(), struct_fields_array?, interleaved.nulls)?;
375    Ok(Arc::new(struct_array))
376}
377
378fn interleave_list_primitive_child<O: OffsetSizeTrait, T: ArrowPrimitiveType>(
379    interleaved: &Interleave<'_, GenericListArray<O>>,
380    indices: &[(usize, usize)],
381    capacity: usize,
382    data_type: &DataType,
383) -> ArrayRef {
384    let child_arrays: Vec<&PrimitiveArray<T>> = interleaved
385        .arrays
386        .iter()
387        .map(|list| list.values().as_primitive::<T>())
388        .collect();
389
390    let has_child_nulls = child_arrays.iter().any(|a| a.null_count() > 0);
391
392    // Build values buffer by copying contiguous slices
393    let mut values: Vec<T::Native> = Vec::with_capacity(capacity);
394    for &(array, row) in indices {
395        let o = interleaved.arrays[array].value_offsets();
396        let start = o[row].as_usize();
397        let end = o[row + 1].as_usize();
398        if end > start {
399            values.extend_from_slice(&child_arrays[array].values()[start..end]);
400        }
401    }
402
403    // Build null buffer. Pre-allocate with 0x00 (all null), then:
404    // - Sources with nulls: set_bits copies the source validity bits into the destination range.
405    // - Sources without nulls: set the bit range to all 1s directly.
406    let nulls = if has_child_nulls {
407        let null_byte_len = bit_util::ceil(capacity, 8);
408        let mut output_null_buf = MutableBuffer::from_len_zeroed(null_byte_len);
409
410        let mut offset_write = 0;
411        let mut output_null_count = 0usize;
412        for &(array, row) in indices {
413            let o = interleaved.arrays[array].value_offsets();
414            let start = o[row].as_usize();
415            let end = o[row + 1].as_usize();
416            let len = end - start;
417            if len > 0 {
418                match child_arrays[array].nulls() {
419                    Some(null_buffer) => {
420                        output_null_count += set_bits(
421                            output_null_buf.as_slice_mut(),
422                            null_buffer.validity(),
423                            offset_write,
424                            null_buffer.offset() + start,
425                            len,
426                        );
427                    }
428                    None => {
429                        // For a non-nullable source, set the bit range to all 1s directly.
430                        let buf = output_null_buf.as_slice_mut();
431                        (offset_write..offset_write + len).for_each(|i| bit_util::set_bit(buf, i));
432                    }
433                }
434            }
435            offset_write += len;
436        }
437
438        if output_null_count > 0 {
439            let bool_buf = BooleanBuffer::new(output_null_buf.into(), 0, capacity);
440            // SAFETY: null_count is accumulated from set_bits which correctly counts unset bits
441            Some(unsafe { NullBuffer::new_unchecked(bool_buf, output_null_count) })
442        } else {
443            None
444        }
445    } else {
446        None
447    };
448
449    Arc::new(PrimitiveArray::<T>::new(values.into(), nulls).with_data_type(data_type.clone()))
450}
451
452fn interleave_list<O: OffsetSizeTrait>(
453    values: &[&dyn Array],
454    indices: &[(usize, usize)],
455    field: &FieldRef,
456) -> Result<ArrayRef, ArrowError> {
457    let interleaved = Interleave::<'_, GenericListArray<O>>::new(values, indices);
458
459    // Step 1: compute output offsets and total child capacity
460    let mut capacity = 0usize;
461    let mut offsets = Vec::with_capacity(indices.len() + 1);
462    offsets.push(O::from_usize(0).unwrap());
463    for (array, row) in indices {
464        let o = interleaved.arrays[*array].value_offsets();
465        let element_len = o[*row + 1].as_usize() - o[*row].as_usize();
466        capacity += element_len;
467        offsets.push(
468            O::from_usize(capacity).ok_or_else(|| ArrowError::OffsetOverflowError(capacity))?,
469        );
470    }
471
472    // Step 2: build child values.
473    macro_rules! list_primitive_helper {
474        ($t:ty) => {
475            interleave_list_primitive_child::<O, $t>(
476                &interleaved,
477                indices,
478                capacity,
479                field.data_type(),
480            )
481        };
482    }
483
484    let child_values = downcast_primitive! {
485        // For primitive child types, directly copy typed value slices and null bit
486        // ranges, avoiding both the intermediate child_indices Vec allocation and
487        // MutableArrayData's function pointer indirection.
488        field.data_type() => (list_primitive_helper),
489        _ => {
490            // For complex child types (nested lists, structs, views, dictionaries, etc.),
491            // use recursive interleave to benefit from type-specific optimizations.
492            let mut child_indices = Vec::with_capacity(capacity);
493            for (array, row) in indices {
494                let list = interleaved.arrays[*array];
495                let start = list.value_offsets()[*row].as_usize();
496                let end = list.value_offsets()[*row + 1].as_usize();
497                child_indices.extend((start..end).map(|i| (*array, i)));
498            }
499
500            let child_arrays: Vec<&dyn Array> = interleaved
501                .arrays
502                .iter()
503                .map(|list| list.values().as_ref())
504                .collect();
505            interleave(&child_arrays, &child_indices)?
506        }
507    };
508
509    let offsets = OffsetBuffer::new(offsets.into());
510    let list_array =
511        GenericListArray::<O>::new(field.clone(), offsets, child_values, interleaved.nulls);
512
513    Ok(Arc::new(list_array))
514}
515
516/// Specialized [`interleave`] for [`RunArray`].
517fn interleave_run_end<R: RunEndIndexType>(
518    values: &[&dyn Array],
519    indices: &[(usize, usize)],
520) -> Result<ArrayRef, ArrowError> {
521    if indices.is_empty() {
522        return Ok(new_empty_array(values[0].data_type()));
523    }
524
525    let n = indices.len();
526    R::Native::from_usize(n).ok_or_else(|| {
527        ArrowError::ComputeError(format!(
528            "interleave_run_end: output length {n} does not fit run-end type"
529        ))
530    })?;
531
532    let runs: Vec<&RunArray<R>> = values.iter().map(|a| a.as_run::<R>()).collect();
533    let value_arrays: Vec<&dyn Array> = runs.iter().map(|r| r.values().as_ref()).collect();
534
535    // Resolve each (array, logical_row) to (array, physical_row), so we can
536    // lookup physical indices by batch.
537    let mut phys_pairs: Vec<(usize, usize)> = vec![(0, 0); n];
538    let mut grouped: Vec<(Vec<R::Native>, Vec<usize>)> =
539        (0..runs.len()).map(|_| (Vec::new(), Vec::new())).collect();
540    for (out_pos, &(arr, row)) in indices.iter().enumerate() {
541        let row = R::Native::from_usize(row).ok_or_else(|| {
542            ArrowError::InvalidArgumentError(format!(
543                "interleave_run_end: row index {row} not representable as run-end type {}",
544                R::DATA_TYPE
545            ))
546        })?;
547        grouped[arr].0.push(row);
548        grouped[arr].1.push(out_pos);
549    }
550    for (arr_idx, (logical_rows, out_positions)) in grouped.into_iter().enumerate() {
551        let phys = runs[arr_idx].get_physical_indices(&logical_rows)?;
552        for (p, out_pos) in phys.iter().zip(out_positions.iter()) {
553            phys_pairs[*out_pos] = (arr_idx, *p);
554        }
555    }
556
557    // Coalesce by physical-pair equality only: emit a new run when the
558    // (array_idx, physical_idx) pair changes between adjacent output rows.
559    // TODO: We could perform an equality check across sources to extend the
560    // output run, but we can't call make_comparator from this crate.
561    let mut run_ends_buf: Vec<R::Native> = Vec::with_capacity(n);
562    let mut dedup_pairs: Vec<(usize, usize)> = Vec::with_capacity(n);
563    dedup_pairs.push(phys_pairs[0]);
564    for i in 1..n {
565        if phys_pairs[i] != phys_pairs[i - 1] {
566            run_ends_buf.push(R::Native::from_usize(i).unwrap());
567            dedup_pairs.push(phys_pairs[i]);
568        }
569    }
570    run_ends_buf.push(R::Native::from_usize(n).unwrap());
571
572    let taken_values = interleave(&value_arrays, &dedup_pairs)?;
573    let run_ends = PrimitiveArray::<R>::from_iter_values(run_ends_buf);
574
575    Ok(Arc::new(RunArray::<R>::try_new(
576        &run_ends,
577        taken_values.as_ref(),
578    )?))
579}
580
581fn interleave_list_view<O: OffsetSizeTrait>(
582    values: &[&dyn Array],
583    indices: &[(usize, usize)],
584    field: &FieldRef,
585) -> Result<ArrayRef, ArrowError> {
586    let interleaved = Interleave::<'_, GenericListViewArray<O>>::new(values, indices);
587
588    // Pick whichever strategy produces fewer child elements:
589    // - Per-row copy: total = sum of selected sizes. Better for sparse selections.
590    // - Concat + offset adjustment: total = sum of source backing array lengths.
591    //   Better when rows share backing elements via overlapping offset/size ranges.
592    let concat_cost: usize = interleaved.arrays.iter().map(|lv| lv.values().len()).sum();
593    let per_row_cost: usize = indices
594        .iter()
595        .map(|&(a, r)| interleaved.arrays[a].sizes()[r].as_usize())
596        .sum();
597
598    if per_row_cost <= concat_cost {
599        interleave_list_view_copy::<O>(&interleaved, indices, field)
600    } else {
601        interleave_list_view_concat::<O>(&interleaved, indices, field)
602    }
603}
604
605/// Per-row copy: copies each selected row's child elements into a new flat array.
606fn interleave_list_view_copy<O: OffsetSizeTrait>(
607    interleaved: &Interleave<'_, GenericListViewArray<O>>,
608    indices: &[(usize, usize)],
609    field: &FieldRef,
610) -> Result<ArrayRef, ArrowError> {
611    let mut capacity = 0usize;
612    let mut offsets = Vec::with_capacity(indices.len());
613    let mut sizes = Vec::with_capacity(indices.len());
614    for &(array_idx, row_idx) in indices {
615        let list = interleaved.arrays[array_idx];
616        let size = list.sizes()[row_idx].as_usize();
617        offsets.push(
618            O::from_usize(capacity).ok_or_else(|| ArrowError::OffsetOverflowError(capacity))?,
619        );
620        sizes.push(O::from_usize(size).ok_or_else(|| ArrowError::OffsetOverflowError(size))?);
621        capacity += size;
622    }
623
624    let child_data: Vec<_> = interleaved
625        .arrays
626        .iter()
627        .map(|list| list.values().to_data())
628        .collect();
629    let child_data_refs: Vec<_> = child_data.iter().collect();
630    let mut mutable_child = MutableArrayData::new(child_data_refs, false, capacity);
631    for &(array_idx, row_idx) in indices {
632        let list = interleaved.arrays[array_idx];
633        let start = list.offsets()[row_idx].as_usize();
634        let size = list.sizes()[row_idx].as_usize();
635        if size > 0 {
636            mutable_child.extend(array_idx, start, start + size);
637        }
638    }
639
640    Ok(Arc::new(GenericListViewArray::<O>::new(
641        field.clone(),
642        offsets.into(),
643        sizes.into(),
644        make_array(mutable_child.freeze()),
645        interleaved.nulls.clone(),
646    )))
647}
648
649/// Concat backing arrays: concatenates all source value arrays and adjusts offsets.
650/// Preserves within-source element sharing.
651fn interleave_list_view_concat<O: OffsetSizeTrait>(
652    interleaved: &Interleave<'_, GenericListViewArray<O>>,
653    indices: &[(usize, usize)],
654    field: &FieldRef,
655) -> Result<ArrayRef, ArrowError> {
656    let child_arrays: Vec<&dyn Array> = interleaved
657        .arrays
658        .iter()
659        .map(|lv| lv.values().as_ref())
660        .collect();
661    let mut base_offsets = Vec::with_capacity(interleaved.arrays.len());
662    let mut running = 0usize;
663    for lv in &interleaved.arrays {
664        base_offsets.push(running);
665        running += lv.values().len();
666    }
667    let combined_values = concat(&child_arrays)?;
668
669    let mut new_offsets = Vec::with_capacity(indices.len());
670    let mut new_sizes = Vec::with_capacity(indices.len());
671    for &(array_idx, row_idx) in indices {
672        let lv = interleaved.arrays[array_idx];
673        let adjusted = lv.offsets()[row_idx].as_usize() + base_offsets[array_idx];
674        new_offsets.push(
675            O::from_usize(adjusted).ok_or_else(|| ArrowError::OffsetOverflowError(adjusted))?,
676        );
677        new_sizes.push(lv.sizes()[row_idx]);
678    }
679
680    Ok(Arc::new(GenericListViewArray::<O>::new(
681        field.clone(),
682        new_offsets.into(),
683        new_sizes.into(),
684        combined_values,
685        interleaved.nulls.clone(),
686    )))
687}
688
689/// Fallback implementation of interleave using [`MutableArrayData`]
690fn interleave_fallback(
691    values: &[&dyn Array],
692    indices: &[(usize, usize)],
693) -> Result<ArrayRef, ArrowError> {
694    let arrays: Vec<_> = values.iter().map(|x| x.to_data()).collect();
695    let arrays: Vec<_> = arrays.iter().collect();
696    let mut array_data = MutableArrayData::new(arrays, false, indices.len());
697
698    let mut cur_array = indices[0].0;
699    let mut start_row_idx = indices[0].1;
700    let mut end_row_idx = start_row_idx + 1;
701
702    for (array, row) in indices.iter().skip(1).copied() {
703        if array == cur_array && row == end_row_idx {
704            // subsequent row in same batch
705            end_row_idx += 1;
706            continue;
707        }
708
709        // emit current batch of rows for current buffer
710        array_data.extend(cur_array, start_row_idx, end_row_idx);
711
712        // start new batch of rows
713        cur_array = array;
714        start_row_idx = row;
715        end_row_idx = start_row_idx + 1;
716    }
717
718    // emit final batch of rows
719    array_data.extend(cur_array, start_row_idx, end_row_idx);
720    Ok(make_array(array_data.freeze()))
721}
722
723/// Fallback implementation for interleaving dictionaries when it was determined
724/// that the dictionary values should not be merged. This implementation concatenates
725/// the value slices and recomputes the resulting dictionary keys.
726///
727/// # Panics
728///
729/// This function assumes that the combined dictionary values will not overflow the
730/// key type. Callers must verify this condition [`should_merge_dictionary_values`]
731/// before calling this function.
732fn interleave_fallback_dictionary<K: ArrowDictionaryKeyType>(
733    dictionaries: &[&DictionaryArray<K>],
734    indices: &[(usize, usize)],
735) -> Result<ArrayRef, ArrowError> {
736    let relative_offsets: Vec<usize> = dictionaries
737        .iter()
738        .scan(0usize, |offset, dict| {
739            let current = *offset;
740            *offset += dict.values().len();
741            Some(current)
742        })
743        .collect();
744    let all_values: Vec<&dyn Array> = dictionaries.iter().map(|d| d.values().as_ref()).collect();
745    let concatenated_values = concat(&all_values)?;
746
747    let any_nulls = dictionaries.iter().any(|d| d.keys().nulls().is_some());
748    let (new_keys, nulls) = if any_nulls {
749        let mut has_nulls = false;
750        let new_keys: Vec<K::Native> = indices
751            .iter()
752            .map(|(array, row)| {
753                let old_keys = dictionaries[*array].keys();
754                if old_keys.is_valid(*row) {
755                    let old_key = old_keys.values()[*row].as_usize();
756                    K::Native::from_usize(relative_offsets[*array] + old_key)
757                        .expect("key overflow should be checked by caller")
758                } else {
759                    has_nulls = true;
760                    K::Native::ZERO
761                }
762            })
763            .collect();
764
765        let nulls = if has_nulls {
766            let null_buffer = BooleanBuffer::collect_bool(indices.len(), |i| {
767                let (array, row) = indices[i];
768                dictionaries[array].keys().is_valid(row)
769            });
770            Some(NullBuffer::new(null_buffer))
771        } else {
772            None
773        };
774        (new_keys, nulls)
775    } else {
776        let new_keys: Vec<K::Native> = indices
777            .iter()
778            .map(|(array, row)| {
779                let old_key = dictionaries[*array].keys().values()[*row].as_usize();
780                K::Native::from_usize(relative_offsets[*array] + old_key)
781                    .expect("key overflow should be checked by caller")
782            })
783            .collect();
784        (new_keys, None)
785    };
786
787    let keys_array = PrimitiveArray::<K>::new(new_keys.into(), nulls);
788    // SAFETY: keys_array is constructed from a valid set of keys.
789    let array = unsafe { DictionaryArray::new_unchecked(keys_array, concatenated_values) };
790    Ok(Arc::new(array))
791}
792
793/// Interleave rows by index from multiple [`RecordBatch`] instances and return a new [`RecordBatch`].
794///
795/// This function will call [`interleave`] on each array of the [`RecordBatch`] instances and assemble a new [`RecordBatch`].
796///
797/// # Example
798/// ```
799/// # use std::sync::Arc;
800/// # use arrow_array::{StringArray, Int32Array, RecordBatch, UInt32Array};
801/// # use arrow_schema::{DataType, Field, Schema};
802/// # use arrow_select::interleave::interleave_record_batch;
803///
804/// let schema = Arc::new(Schema::new(vec![
805///     Field::new("a", DataType::Int32, true),
806///     Field::new("b", DataType::Utf8, true),
807/// ]));
808///
809/// let batch1 = RecordBatch::try_new(
810///     schema.clone(),
811///     vec![
812///         Arc::new(Int32Array::from(vec![0, 1, 2])),
813///         Arc::new(StringArray::from(vec!["a", "b", "c"])),
814///     ],
815/// ).unwrap();
816///
817/// let batch2 = RecordBatch::try_new(
818///     schema.clone(),
819///     vec![
820///         Arc::new(Int32Array::from(vec![3, 4, 5])),
821///         Arc::new(StringArray::from(vec!["d", "e", "f"])),
822///     ],
823/// ).unwrap();
824///
825/// let indices = vec![(0, 1), (1, 2), (0, 0), (1, 1)];
826/// let interleaved = interleave_record_batch(&[&batch1, &batch2], &indices).unwrap();
827///
828/// let expected = RecordBatch::try_new(
829///     schema,
830///     vec![
831///         Arc::new(Int32Array::from(vec![1, 5, 0, 4])),
832///         Arc::new(StringArray::from(vec!["b", "f", "a", "e"])),
833///     ],
834/// ).unwrap();
835/// assert_eq!(interleaved, expected);
836/// ```
837pub fn interleave_record_batch(
838    record_batches: &[&RecordBatch],
839    indices: &[(usize, usize)],
840) -> Result<RecordBatch, ArrowError> {
841    let schema = record_batches[0].schema();
842    let columns = (0..schema.fields().len())
843        .map(|i| {
844            let column_values: Vec<&dyn Array> = record_batches
845                .iter()
846                .map(|batch| batch.column(i).as_ref())
847                .collect();
848            interleave(&column_values, indices)
849        })
850        .collect::<Result<Vec<_>, _>>()?;
851    RecordBatch::try_new(schema, columns)
852}
853
854#[cfg(test)]
855mod tests {
856    use super::*;
857    use arrow_array::Int32RunArray;
858    use arrow_array::builder::{GenericListBuilder, Int32Builder, PrimitiveRunBuilder};
859    use arrow_array::types::Int8Type;
860    use arrow_buffer::ScalarBuffer;
861    use arrow_schema::Field;
862
863    #[test]
864    fn test_primitive() {
865        let a = Int32Array::from_iter_values([1, 2, 3, 4]);
866        let b = Int32Array::from_iter_values([5, 6, 7]);
867        let c = Int32Array::from_iter_values([8, 9, 10]);
868        let values = interleave(&[&a, &b, &c], &[(0, 3), (0, 3), (2, 2), (2, 0), (1, 1)]).unwrap();
869        let v = values.as_primitive::<Int32Type>();
870        assert_eq!(v.values(), &[4, 4, 10, 8, 6]);
871    }
872
873    #[test]
874    fn test_primitive_nulls() {
875        let a = Int32Array::from_iter_values([1, 2, 3, 4]);
876        let b = Int32Array::from_iter([Some(1), Some(4), None]);
877        let values = interleave(&[&a, &b], &[(0, 1), (1, 2), (1, 2), (0, 3), (0, 2)]).unwrap();
878        let v: Vec<_> = values.as_primitive::<Int32Type>().into_iter().collect();
879        assert_eq!(&v, &[Some(2), None, None, Some(4), Some(3)])
880    }
881
882    #[test]
883    fn test_primitive_empty() {
884        let a = Int32Array::from_iter_values([1, 2, 3, 4]);
885        let v = interleave(&[&a], &[]).unwrap();
886        assert!(v.is_empty());
887        assert_eq!(v.data_type(), &DataType::Int32);
888    }
889
890    #[test]
891    fn test_strings() {
892        let a = StringArray::from_iter_values(["a", "b", "c"]);
893        let b = StringArray::from_iter_values(["hello", "world", "foo"]);
894        let values = interleave(&[&a, &b], &[(0, 2), (0, 2), (1, 0), (1, 1), (0, 1)]).unwrap();
895        let v = values.as_string::<i32>();
896        let values: Vec<_> = v.into_iter().collect();
897        assert_eq!(
898            &values,
899            &[
900                Some("c"),
901                Some("c"),
902                Some("hello"),
903                Some("world"),
904                Some("b")
905            ]
906        )
907    }
908
909    #[test]
910    fn test_interleave_dictionary() {
911        let a = DictionaryArray::<Int32Type>::from_iter(["a", "b", "c", "a", "b"]);
912        let b = DictionaryArray::<Int32Type>::from_iter(["a", "c", "a", "c", "a"]);
913
914        // Should not recompute dictionary
915        let values =
916            interleave(&[&a, &b], &[(0, 2), (0, 2), (0, 2), (1, 0), (1, 1), (0, 1)]).unwrap();
917        let v = values.as_dictionary::<Int32Type>();
918        assert_eq!(v.values().len(), 5);
919
920        let vc = v.downcast_dict::<StringArray>().unwrap();
921        let collected: Vec<_> = vc.into_iter().map(Option::unwrap).collect();
922        assert_eq!(&collected, &["c", "c", "c", "a", "c", "b"]);
923
924        // Should recompute dictionary
925        let values = interleave(&[&a, &b], &[(0, 2), (0, 2), (1, 1)]).unwrap();
926        let v = values.as_dictionary::<Int32Type>();
927        assert_eq!(v.values().len(), 1);
928
929        let vc = v.downcast_dict::<StringArray>().unwrap();
930        let collected: Vec<_> = vc.into_iter().map(Option::unwrap).collect();
931        assert_eq!(&collected, &["c", "c", "c"]);
932    }
933
934    #[test]
935    fn test_interleave_dictionary_nulls() {
936        let input_1_keys = Int32Array::from_iter_values([0, 2, 1, 3]);
937        let input_1_values = StringArray::from(vec![Some("foo"), None, Some("bar"), Some("fiz")]);
938        let input_1 = DictionaryArray::new(input_1_keys, Arc::new(input_1_values));
939        let input_2: DictionaryArray<Int32Type> = vec![None].into_iter().collect();
940
941        let expected = vec![Some("fiz"), None, None, Some("foo")];
942
943        let values = interleave(
944            &[&input_1 as _, &input_2 as _],
945            &[(0, 3), (0, 2), (1, 0), (0, 0)],
946        )
947        .unwrap();
948        let dictionary = values.as_dictionary::<Int32Type>();
949        let actual: Vec<Option<&str>> = dictionary
950            .downcast_dict::<StringArray>()
951            .unwrap()
952            .into_iter()
953            .collect();
954
955        assert_eq!(actual, expected);
956    }
957
958    #[test]
959    fn test_interleave_dictionary_overflow_same_values() {
960        let values: ArrayRef = Arc::new(StringArray::from_iter_values(
961            (0..50).map(|i| format!("v{i}")),
962        ));
963
964        // With 3 dictionaries of 50 values each, relative_offsets = [0, 50, 100]
965        // Accessing key 49 from dict3 gives 100 + 49 = 149 which overflows Int8
966        // (max 127).
967        // This test case falls back to interleave_fallback because the
968        // dictionaries share the same underlying values slice.
969        let dict1 = DictionaryArray::<Int8Type>::new(
970            Int8Array::from_iter_values([0, 1, 2]),
971            values.clone(),
972        );
973        let dict2 = DictionaryArray::<Int8Type>::new(
974            Int8Array::from_iter_values([0, 1, 2]),
975            values.clone(),
976        );
977        let dict3 =
978            DictionaryArray::<Int8Type>::new(Int8Array::from_iter_values([49]), values.clone());
979
980        let indices = &[(0, 0), (1, 0), (2, 0)];
981        let result = interleave(&[&dict1, &dict2, &dict3], indices).unwrap();
982
983        let dict_result = result.as_dictionary::<Int8Type>();
984        let string_result: Vec<_> = dict_result
985            .downcast_dict::<StringArray>()
986            .unwrap()
987            .into_iter()
988            .map(|x| x.unwrap())
989            .collect();
990        assert_eq!(string_result, vec!["v0", "v0", "v49"]);
991    }
992
    /// Interleaves two list arrays with offset type `O` and compares the result
    /// with a list array built directly in the expected row order.
    fn test_interleave_lists<O: OffsetSizeTrait>() {
        // [[1, 2], null, [3]]
        let mut a = GenericListBuilder::<O, _>::new(Int32Builder::new());
        a.values().append_value(1);
        a.values().append_value(2);
        a.append(true);
        a.append(false);
        a.values().append_value(3);
        a.append(true);
        let a = a.finish();

        // [[4], null, [5, 6, null]]
        let mut b = GenericListBuilder::<O, _>::new(Int32Builder::new());
        b.values().append_value(4);
        b.append(true);
        b.append(false);
        b.values().append_value(5);
        b.values().append_value(6);
        b.values().append_null();
        b.append(true);
        let b = b.finish();

        // Select a[2], a[1], b[0], b[2], b[1] in that order.
        let values = interleave(&[&a, &b], &[(0, 2), (0, 1), (1, 0), (1, 2), (1, 1)]).unwrap();
        let v = values
            .as_any()
            .downcast_ref::<GenericListArray<O>>()
            .unwrap();

        // [[3], null, [4], [5, 6, null], null]
        let mut expected = GenericListBuilder::<O, _>::new(Int32Builder::new());
        expected.values().append_value(3);
        expected.append(true);
        expected.append(false);
        expected.values().append_value(4);
        expected.append(true);
        expected.values().append_value(5);
        expected.values().append_value(6);
        expected.values().append_null();
        expected.append(true);
        expected.append(false);
        let expected = expected.finish();

        assert_eq!(v, &expected);
    }
1037
    /// Runs the list interleave check with `i32` offsets.
    #[test]
    fn test_lists() {
        test_interleave_lists::<i32>();
    }
1042
    /// Runs the list interleave check with `i64` offsets.
    #[test]
    fn test_large_lists() {
        test_interleave_lists::<i64>();
    }
1047
    /// Interleaves two list-view arrays with offset type `O` and compares the result
    /// with a list-view array built directly in the expected row order.
    ///
    /// Inputs and the expected value are built with a list builder and then
    /// converted into `GenericListViewArray<O>` via `into()`.
    fn test_interleave_list_views<O: OffsetSizeTrait>() {
        // [[1, 2], null, [3]]
        let mut a = GenericListBuilder::<O, _>::new(Int32Builder::new());
        a.values().append_value(1);
        a.values().append_value(2);
        a.append(true);
        a.append(false);
        a.values().append_value(3);
        a.append(true);
        let a: GenericListViewArray<O> = a.finish().into();

        // [[4], null, [5, 6, null]]
        let mut b = GenericListBuilder::<O, _>::new(Int32Builder::new());
        b.values().append_value(4);
        b.append(true);
        b.append(false);
        b.values().append_value(5);
        b.values().append_value(6);
        b.values().append_null();
        b.append(true);
        let b: GenericListViewArray<O> = b.finish().into();

        // Select a[2], a[1], b[0], b[2], b[1] in that order.
        let values = interleave(&[&a, &b], &[(0, 2), (0, 1), (1, 0), (1, 2), (1, 1)]).unwrap();
        let v = values
            .as_any()
            .downcast_ref::<GenericListViewArray<O>>()
            .unwrap();

        // [[3], null, [4], [5, 6, null], null]
        let mut expected = GenericListBuilder::<O, _>::new(Int32Builder::new());
        expected.values().append_value(3);
        expected.append(true);
        expected.append(false);
        expected.values().append_value(4);
        expected.append(true);
        expected.values().append_value(5);
        expected.values().append_value(6);
        expected.values().append_null();
        expected.append(true);
        expected.append(false);
        let expected: GenericListViewArray<O> = expected.finish().into();

        assert_eq!(v, &expected);
    }
1092
    /// Runs the list-view interleave check with `i32` offsets.
    #[test]
    fn test_list_views() {
        test_interleave_list_views::<i32>();
    }
1097
    /// Runs the list-view interleave check with `i64` offsets.
    #[test]
    fn test_large_list_views() {
        test_interleave_list_views::<i64>();
    }
1102
1103    #[test]
1104    fn test_interleave_list_view_overlapping() {
1105        let field = Arc::new(Field::new_list_field(DataType::Int64, false));
1106
1107        // lv_a: 10 rows, two groups of 5 sharing the same backing elements.
1108        //   rows 0-4 → offset 0, size 5 → [0,1,2,3,4]
1109        //   rows 5-9 → offset 5, size 5 → [5,6,7,8,9]
1110        let lv_a = ListViewArray::new(
1111            Arc::clone(&field),
1112            ScalarBuffer::from(vec![0i32, 0, 0, 0, 0, 5, 5, 5, 5, 5]),
1113            ScalarBuffer::from(vec![5i32; 10]),
1114            Arc::new(Int64Array::from_iter_values(0..10)),
1115            None,
1116        );
1117
1118        // lv_b: 8 rows, two groups of 4 sharing the same backing elements.
1119        //   rows 0-3 → offset 0, size 3 → [100,101,102]
1120        //   rows 4-7 → offset 3, size 3 → [103,104,105]
1121        let lv_b = ListViewArray::new(
1122            Arc::clone(&field),
1123            ScalarBuffer::from(vec![0i32, 0, 0, 0, 3, 3, 3, 3]),
1124            ScalarBuffer::from(vec![3i32; 8]),
1125            Arc::new(Int64Array::from_iter_values(100..106)),
1126            None,
1127        );
1128
1129        let indices: Vec<(usize, usize)> = vec![
1130            (0, 0),
1131            (1, 0),
1132            (0, 5),
1133            (1, 4),
1134            (0, 1),
1135            (1, 1),
1136            (0, 6),
1137            (1, 5),
1138        ];
1139        let result = interleave(&[&lv_a as &dyn Array, &lv_b as &dyn Array], &indices).unwrap();
1140        result
1141            .to_data()
1142            .validate_full()
1143            .expect("result must be valid");
1144
1145        let result_lv = result.as_list_view::<i32>();
1146        assert_eq!(result_lv.len(), 8);
1147        assert_eq!(
1148            result_lv.value(0).as_primitive::<Int64Type>().values(),
1149            &[0, 1, 2, 3, 4]
1150        );
1151        assert_eq!(
1152            result_lv.value(1).as_primitive::<Int64Type>().values(),
1153            &[100, 101, 102]
1154        );
1155        assert_eq!(
1156            result_lv.value(2).as_primitive::<Int64Type>().values(),
1157            &[5, 6, 7, 8, 9]
1158        );
1159        assert_eq!(
1160            result_lv.value(3).as_primitive::<Int64Type>().values(),
1161            &[103, 104, 105]
1162        );
1163
1164        // Backing elements = sum of source arrays (10 + 6 = 16), not per-row
1165        // expansion (8 rows × avg ~4 = 32). Overlapping sharing is preserved.
1166        let total_input_elements = lv_a.values().len() + lv_b.values().len();
1167        assert_eq!(result_lv.values().len(), total_input_elements);
1168    }
1169
1170    #[test]
1171    fn test_struct_without_nulls() {
1172        let fields = Fields::from(vec![
1173            Field::new("number_col", DataType::Int32, false),
1174            Field::new("string_col", DataType::Utf8, false),
1175        ]);
1176        let a = {
1177            let number_col = Int32Array::from_iter_values([1, 2, 3, 4]);
1178            let string_col = StringArray::from_iter_values(["a", "b", "c", "d"]);
1179
1180            StructArray::try_new(
1181                fields.clone(),
1182                vec![Arc::new(number_col), Arc::new(string_col)],
1183                None,
1184            )
1185            .unwrap()
1186        };
1187
1188        let b = {
1189            let number_col = Int32Array::from_iter_values([5, 6, 7]);
1190            let string_col = StringArray::from_iter_values(["hello", "world", "foo"]);
1191
1192            StructArray::try_new(
1193                fields.clone(),
1194                vec![Arc::new(number_col), Arc::new(string_col)],
1195                None,
1196            )
1197            .unwrap()
1198        };
1199
1200        let c = {
1201            let number_col = Int32Array::from_iter_values([8, 9, 10]);
1202            let string_col = StringArray::from_iter_values(["x", "y", "z"]);
1203
1204            StructArray::try_new(
1205                fields.clone(),
1206                vec![Arc::new(number_col), Arc::new(string_col)],
1207                None,
1208            )
1209            .unwrap()
1210        };
1211
1212        let values = interleave(&[&a, &b, &c], &[(0, 3), (0, 3), (2, 2), (2, 0), (1, 1)]).unwrap();
1213        let values_struct = values.as_struct();
1214        assert_eq!(values_struct.data_type(), &DataType::Struct(fields));
1215        assert_eq!(values_struct.null_count(), 0);
1216
1217        let values_number = values_struct.column(0).as_primitive::<Int32Type>();
1218        assert_eq!(values_number.values(), &[4, 4, 10, 8, 6]);
1219        let values_string = values_struct.column(1).as_string::<i32>();
1220        let values_string: Vec<_> = values_string.into_iter().collect();
1221        assert_eq!(
1222            &values_string,
1223            &[Some("d"), Some("d"), Some("z"), Some("x"), Some("world")]
1224        );
1225    }
1226
1227    #[test]
1228    fn test_struct_with_nulls_in_values() {
1229        let fields = Fields::from(vec![
1230            Field::new("number_col", DataType::Int32, true),
1231            Field::new("string_col", DataType::Utf8, true),
1232        ]);
1233        let a = {
1234            let number_col = Int32Array::from_iter_values([1, 2, 3, 4]);
1235            let string_col = StringArray::from_iter_values(["a", "b", "c", "d"]);
1236
1237            StructArray::try_new(
1238                fields.clone(),
1239                vec![Arc::new(number_col), Arc::new(string_col)],
1240                None,
1241            )
1242            .unwrap()
1243        };
1244
1245        let b = {
1246            let number_col = Int32Array::from_iter([Some(1), Some(4), None]);
1247            let string_col = StringArray::from(vec![Some("hello"), None, Some("foo")]);
1248
1249            StructArray::try_new(
1250                fields.clone(),
1251                vec![Arc::new(number_col), Arc::new(string_col)],
1252                None,
1253            )
1254            .unwrap()
1255        };
1256
1257        let values = interleave(&[&a, &b], &[(0, 1), (1, 2), (1, 2), (0, 3), (1, 1)]).unwrap();
1258        let values_struct = values.as_struct();
1259        assert_eq!(values_struct.data_type(), &DataType::Struct(fields));
1260
1261        // The struct itself has no nulls, but the values do
1262        assert_eq!(values_struct.null_count(), 0);
1263
1264        let values_number: Vec<_> = values_struct
1265            .column(0)
1266            .as_primitive::<Int32Type>()
1267            .into_iter()
1268            .collect();
1269        assert_eq!(values_number, &[Some(2), None, None, Some(4), Some(4)]);
1270
1271        let values_string = values_struct.column(1).as_string::<i32>();
1272        let values_string: Vec<_> = values_string.into_iter().collect();
1273        assert_eq!(
1274            &values_string,
1275            &[Some("b"), Some("foo"), Some("foo"), Some("d"), None]
1276        );
1277    }
1278
1279    #[test]
1280    fn test_struct_with_nulls() {
1281        let fields = Fields::from(vec![
1282            Field::new("number_col", DataType::Int32, false),
1283            Field::new("string_col", DataType::Utf8, false),
1284        ]);
1285        let a = {
1286            let number_col = Int32Array::from_iter_values([1, 2, 3, 4]);
1287            let string_col = StringArray::from_iter_values(["a", "b", "c", "d"]);
1288
1289            StructArray::try_new(
1290                fields.clone(),
1291                vec![Arc::new(number_col), Arc::new(string_col)],
1292                None,
1293            )
1294            .unwrap()
1295        };
1296
1297        let b = {
1298            let number_col = Int32Array::from_iter_values([5, 6, 7]);
1299            let string_col = StringArray::from_iter_values(["hello", "world", "foo"]);
1300
1301            StructArray::try_new(
1302                fields.clone(),
1303                vec![Arc::new(number_col), Arc::new(string_col)],
1304                Some(NullBuffer::from(&[true, false, true])),
1305            )
1306            .unwrap()
1307        };
1308
1309        let c = {
1310            let number_col = Int32Array::from_iter_values([8, 9, 10]);
1311            let string_col = StringArray::from_iter_values(["x", "y", "z"]);
1312
1313            StructArray::try_new(
1314                fields.clone(),
1315                vec![Arc::new(number_col), Arc::new(string_col)],
1316                None,
1317            )
1318            .unwrap()
1319        };
1320
1321        let values = interleave(&[&a, &b, &c], &[(0, 3), (0, 3), (2, 2), (1, 1), (2, 0)]).unwrap();
1322        let values_struct = values.as_struct();
1323        assert_eq!(values_struct.data_type(), &DataType::Struct(fields));
1324
1325        let validity: Vec<bool> = {
1326            let null_buffer = values_struct.nulls().expect("should_have_nulls");
1327
1328            null_buffer.iter().collect()
1329        };
1330        assert_eq!(validity, &[true, true, true, false, true]);
1331        let values_number = values_struct.column(0).as_primitive::<Int32Type>();
1332        assert_eq!(values_number.values(), &[4, 4, 10, 6, 8]);
1333        let values_string = values_struct.column(1).as_string::<i32>();
1334        let values_string: Vec<_> = values_string.into_iter().collect();
1335        assert_eq!(
1336            &values_string,
1337            &[Some("d"), Some("d"), Some("z"), Some("world"), Some("x"),]
1338        );
1339    }
1340
1341    #[test]
1342    fn test_struct_empty() {
1343        let fields = Fields::from(vec![
1344            Field::new("number_col", DataType::Int32, false),
1345            Field::new("string_col", DataType::Utf8, false),
1346        ]);
1347        let a = {
1348            let number_col = Int32Array::from_iter_values([1, 2, 3, 4]);
1349            let string_col = StringArray::from_iter_values(["a", "b", "c", "d"]);
1350
1351            StructArray::try_new(
1352                fields.clone(),
1353                vec![Arc::new(number_col), Arc::new(string_col)],
1354                None,
1355            )
1356            .unwrap()
1357        };
1358        let v = interleave(&[&a], &[]).unwrap();
1359        assert!(v.is_empty());
1360        assert_eq!(v.data_type(), &DataType::Struct(fields));
1361    }
1362
1363    #[test]
1364    fn interleave_sparse_nulls() {
1365        let values = StringArray::from_iter_values((0..100).map(|x| x.to_string()));
1366        let keys = Int32Array::from_iter_values(0..10);
1367        let dict_a = DictionaryArray::new(keys, Arc::new(values));
1368        let values = StringArray::new_null(0);
1369        let keys = Int32Array::new_null(10);
1370        let dict_b = DictionaryArray::new(keys, Arc::new(values));
1371
1372        let indices = &[(0, 0), (0, 1), (0, 2), (1, 0)];
1373        let array = interleave(&[&dict_a, &dict_b], indices).unwrap();
1374
1375        let expected =
1376            DictionaryArray::<Int32Type>::from_iter(vec![Some("0"), Some("1"), Some("2"), None]);
1377        assert_eq!(array.as_ref(), &expected)
1378    }
1379
1380    #[test]
1381    fn test_interleave_views() {
1382        let values = StringArray::from_iter_values([
1383            "hello",
1384            "world_long_string_not_inlined",
1385            "foo",
1386            "bar",
1387            "baz",
1388        ]);
1389        let view_a = StringViewArray::from(&values);
1390
1391        let values = StringArray::from_iter_values([
1392            "test",
1393            "data",
1394            "more_long_string_not_inlined",
1395            "views",
1396            "here",
1397        ]);
1398        let view_b = StringViewArray::from(&values);
1399
1400        let indices = &[
1401            (0, 2), // "foo"
1402            (1, 0), // "test"
1403            (0, 4), // "baz"
1404            (1, 3), // "views"
1405            (0, 1), // "world_long_string_not_inlined"
1406        ];
1407
1408        // Test specialized implementation
1409        let values = interleave(&[&view_a, &view_b], indices).unwrap();
1410        let result = values.as_string_view();
1411        assert_eq!(result.data_buffers().len(), 1);
1412
1413        let fallback = interleave_fallback(&[&view_a, &view_b], indices).unwrap();
1414        let fallback_result = fallback.as_string_view();
1415        // note that fallback_result has 2 buffers, but only one long enough string to warrant a buffer
1416        assert_eq!(fallback_result.data_buffers().len(), 2);
1417
1418        // Convert to strings for easier assertion
1419        let collected: Vec<_> = result.iter().map(|x| x.map(|s| s.to_string())).collect();
1420
1421        let fallback_collected: Vec<_> = fallback_result
1422            .iter()
1423            .map(|x| x.map(|s| s.to_string()))
1424            .collect();
1425
1426        assert_eq!(&collected, &fallback_collected);
1427
1428        assert_eq!(
1429            &collected,
1430            &[
1431                Some("foo".to_string()),
1432                Some("test".to_string()),
1433                Some("baz".to_string()),
1434                Some("views".to_string()),
1435                Some("world_long_string_not_inlined".to_string()),
1436            ]
1437        );
1438    }
1439
1440    #[test]
1441    fn test_interleave_views_with_nulls() {
1442        let values = StringArray::from_iter([
1443            Some("hello"),
1444            None,
1445            Some("foo_long_string_not_inlined"),
1446            Some("bar"),
1447            None,
1448        ]);
1449        let view_a = StringViewArray::from(&values);
1450
1451        let values = StringArray::from_iter([
1452            Some("test"),
1453            Some("data_long_string_not_inlined"),
1454            None,
1455            None,
1456            Some("here"),
1457        ]);
1458        let view_b = StringViewArray::from(&values);
1459
1460        let indices = &[
1461            (0, 1), // null
1462            (1, 2), // null
1463            (0, 2), // "foo_long_string_not_inlined"
1464            (1, 3), // null
1465            (0, 4), // null
1466        ];
1467
1468        // Test specialized implementation
1469        let values = interleave(&[&view_a, &view_b], indices).unwrap();
1470        let result = values.as_string_view();
1471        assert_eq!(result.data_buffers().len(), 1);
1472
1473        let fallback = interleave_fallback(&[&view_a, &view_b], indices).unwrap();
1474        let fallback_result = fallback.as_string_view();
1475
1476        // Convert to strings for easier assertion
1477        let collected: Vec<_> = result.iter().map(|x| x.map(|s| s.to_string())).collect();
1478
1479        let fallback_collected: Vec<_> = fallback_result
1480            .iter()
1481            .map(|x| x.map(|s| s.to_string()))
1482            .collect();
1483
1484        assert_eq!(&collected, &fallback_collected);
1485
1486        assert_eq!(
1487            &collected,
1488            &[
1489                None,
1490                None,
1491                Some("foo_long_string_not_inlined".to_string()),
1492                None,
1493                None,
1494            ]
1495        );
1496    }
1497
1498    #[test]
1499    fn test_interleave_views_multiple_buffers() {
1500        let str1 = "very_long_string_from_first_buffer".as_bytes();
1501        let str2 = "very_long_string_from_second_buffer".as_bytes();
1502        let buffer1 = str1.to_vec().into();
1503        let buffer2 = str2.to_vec().into();
1504
1505        let view1 = ByteView::new(str1.len() as u32, &str1[..4])
1506            .with_buffer_index(0)
1507            .with_offset(0)
1508            .as_u128();
1509        let view2 = ByteView::new(str2.len() as u32, &str2[..4])
1510            .with_buffer_index(1)
1511            .with_offset(0)
1512            .as_u128();
1513        let view_a =
1514            StringViewArray::try_new(vec![view1, view2].into(), vec![buffer1, buffer2], None)
1515                .unwrap();
1516
1517        let str3 = "another_very_long_string_buffer_three".as_bytes();
1518        let str4 = "different_long_string_in_buffer_four".as_bytes();
1519        let buffer3 = str3.to_vec().into();
1520        let buffer4 = str4.to_vec().into();
1521
1522        let view3 = ByteView::new(str3.len() as u32, &str3[..4])
1523            .with_buffer_index(0)
1524            .with_offset(0)
1525            .as_u128();
1526        let view4 = ByteView::new(str4.len() as u32, &str4[..4])
1527            .with_buffer_index(1)
1528            .with_offset(0)
1529            .as_u128();
1530        let view_b =
1531            StringViewArray::try_new(vec![view3, view4].into(), vec![buffer3, buffer4], None)
1532                .unwrap();
1533
1534        let indices = &[
1535            (0, 0), // String from first buffer of array A
1536            (1, 0), // String from first buffer of array B
1537            (0, 1), // String from second buffer of array A
1538            (1, 1), // String from second buffer of array B
1539            (0, 0), // String from first buffer of array A again
1540            (1, 1), // String from second buffer of array B again
1541        ];
1542
1543        // Test interleave
1544        let values = interleave(&[&view_a, &view_b], indices).unwrap();
1545        let result = values.as_string_view();
1546
1547        assert_eq!(
1548            result.data_buffers().len(),
1549            4,
1550            "Expected four buffers (two from each input array)"
1551        );
1552
1553        let result_strings: Vec<_> = result.iter().map(|x| x.map(|s| s.to_string())).collect();
1554        assert_eq!(
1555            result_strings,
1556            vec![
1557                Some("very_long_string_from_first_buffer".to_string()),
1558                Some("another_very_long_string_buffer_three".to_string()),
1559                Some("very_long_string_from_second_buffer".to_string()),
1560                Some("different_long_string_in_buffer_four".to_string()),
1561                Some("very_long_string_from_first_buffer".to_string()),
1562                Some("different_long_string_in_buffer_four".to_string()),
1563            ]
1564        );
1565
1566        let views = result.views();
1567        let buffer_indices: Vec<_> = views
1568            .iter()
1569            .map(|raw_view| ByteView::from(*raw_view).buffer_index)
1570            .collect();
1571
1572        assert_eq!(
1573            buffer_indices,
1574            vec![
1575                0, // First buffer from array A
1576                1, // First buffer from array B
1577                2, // Second buffer from array A
1578                3, // Second buffer from array B
1579                0, // First buffer from array A (reused)
1580                3, // Second buffer from array B (reused)
1581            ]
1582        );
1583    }
1584
1585    #[test]
1586    fn test_interleave_run_end_encoded_primitive() {
1587        let mut builder = PrimitiveRunBuilder::<Int32Type, Int32Type>::new();
1588        builder.extend([1, 1, 2, 2, 2, 3].into_iter().map(Some));
1589        let a = builder.finish();
1590
1591        let mut builder = PrimitiveRunBuilder::<Int32Type, Int32Type>::new();
1592        builder.extend([4, 5, 5, 6, 6, 6].into_iter().map(Some));
1593        let b = builder.finish();
1594
1595        let indices = &[(0, 1), (1, 0), (0, 4), (1, 2), (0, 5)];
1596        let result = interleave(&[&a, &b], indices).unwrap();
1597
1598        // The result should be a RunEndEncoded array
1599        assert!(matches!(result.data_type(), DataType::RunEndEncoded(_, _)));
1600
1601        // Cast to RunArray to access values
1602        let result_run_array: &Int32RunArray = result.as_any().downcast_ref().unwrap();
1603
1604        // Verify the logical values by accessing the logical array directly
1605        let expected = vec![1, 4, 2, 5, 3];
1606        let mut actual = Vec::new();
1607        for i in 0..result_run_array.len() {
1608            let physical_idx = result_run_array.get_physical_index(i);
1609            let value = result_run_array
1610                .values()
1611                .as_primitive::<Int32Type>()
1612                .value(physical_idx);
1613            actual.push(value);
1614        }
1615        assert_eq!(actual, expected);
1616    }
1617
1618    #[test]
1619    fn test_interleave_run_end_encoded_sliced() {
1620        let mut builder = PrimitiveRunBuilder::<Int32Type, Int32Type>::new();
1621        builder.extend([1, 1, 2, 2, 2, 3].into_iter().map(Some));
1622        let a = builder.finish();
1623        let a = a.slice(2, 3); // [2, 2, 2]
1624
1625        let mut builder = PrimitiveRunBuilder::<Int32Type, Int32Type>::new();
1626        builder.extend([4, 5, 5, 6, 6, 6].into_iter().map(Some));
1627        let b = builder.finish();
1628        let b = b.slice(1, 3); // [5, 5, 6]
1629
1630        let indices = &[(0, 1), (1, 0), (0, 2), (1, 1), (1, 2)];
1631        let result = interleave(&[&a, &b], indices).unwrap();
1632
1633        let result = result.as_run::<Int32Type>();
1634        let result = result.downcast::<Int32Array>().unwrap();
1635
1636        let expected = vec![2, 5, 2, 5, 6];
1637        let actual = result.into_iter().flatten().collect::<Vec<_>>();
1638        assert_eq!(actual, expected);
1639    }
1640
1641    #[test]
1642    fn test_interleave_run_end_encoded_string() {
1643        let a: Int32RunArray = vec!["hello", "hello", "world", "world", "foo"]
1644            .into_iter()
1645            .collect();
1646        let b: Int32RunArray = vec!["bar", "baz", "baz", "qux"].into_iter().collect();
1647
1648        let indices = &[(0, 0), (1, 1), (0, 3), (1, 3), (0, 4)];
1649        let result = interleave(&[&a, &b], indices).unwrap();
1650
1651        // The result should be a RunEndEncoded array
1652        assert!(matches!(result.data_type(), DataType::RunEndEncoded(_, _)));
1653
1654        // Cast to RunArray to access values
1655        let result_run_array: &Int32RunArray = result.as_any().downcast_ref().unwrap();
1656
1657        // Verify the logical values by accessing the logical array directly
1658        let expected = vec!["hello", "baz", "world", "qux", "foo"];
1659        let mut actual = Vec::new();
1660        for i in 0..result_run_array.len() {
1661            let physical_idx = result_run_array.get_physical_index(i);
1662            let value = result_run_array
1663                .values()
1664                .as_string::<i32>()
1665                .value(physical_idx);
1666            actual.push(value);
1667        }
1668        assert_eq!(actual, expected);
1669    }
1670
1671    #[test]
1672    fn test_interleave_run_end_encoded_with_nulls() {
1673        let a: Int32RunArray = vec![Some("a"), Some("a"), None, None, Some("b")]
1674            .into_iter()
1675            .collect();
1676        let b: Int32RunArray = vec![None, Some("c"), Some("c"), Some("d")]
1677            .into_iter()
1678            .collect();
1679
1680        let indices = &[(0, 1), (1, 0), (0, 2), (1, 3), (0, 4)];
1681        let result = interleave(&[&a, &b], indices).unwrap();
1682
1683        // The result should be a RunEndEncoded array
1684        assert!(matches!(result.data_type(), DataType::RunEndEncoded(_, _)));
1685
1686        // Cast to RunArray to access values
1687        let result_run_array: &Int32RunArray = result.as_any().downcast_ref().unwrap();
1688
1689        // Verify the logical values by accessing the logical array directly
1690        let expected = vec![Some("a"), None, None, Some("d"), Some("b")];
1691        let mut actual = Vec::new();
1692        for i in 0..result_run_array.len() {
1693            let physical_idx = result_run_array.get_physical_index(i);
1694            if result_run_array.values().is_null(physical_idx) {
1695                actual.push(None);
1696            } else {
1697                let value = result_run_array
1698                    .values()
1699                    .as_string::<i32>()
1700                    .value(physical_idx);
1701                actual.push(Some(value));
1702            }
1703        }
1704        assert_eq!(actual, expected);
1705    }
1706
1707    #[test]
1708    fn test_interleave_run_end_encoded_different_run_types() {
1709        let mut builder = PrimitiveRunBuilder::<Int16Type, Int32Type>::new();
1710        builder.extend([1, 1, 2, 3, 3].into_iter().map(Some));
1711        let a = builder.finish();
1712
1713        let mut builder = PrimitiveRunBuilder::<Int16Type, Int32Type>::new();
1714        builder.extend([4, 5, 5, 6].into_iter().map(Some));
1715        let b = builder.finish();
1716
1717        let indices = &[(0, 0), (1, 1), (0, 3), (1, 3)];
1718        let result = interleave(&[&a, &b], indices).unwrap();
1719
1720        // The result should be a RunEndEncoded array
1721        assert!(matches!(result.data_type(), DataType::RunEndEncoded(_, _)));
1722
1723        // Cast to RunArray to access values
1724        let result_run_array: &RunArray<Int16Type> = result.as_any().downcast_ref().unwrap();
1725
1726        // Verify the logical values by accessing the logical array directly
1727        let expected = vec![1, 5, 3, 6];
1728        let mut actual = Vec::new();
1729        for i in 0..result_run_array.len() {
1730            let physical_idx = result_run_array.get_physical_index(i);
1731            let value = result_run_array
1732                .values()
1733                .as_primitive::<Int32Type>()
1734                .value(physical_idx);
1735            actual.push(value);
1736        }
1737        assert_eq!(actual, expected);
1738    }
1739
1740    #[test]
1741    fn test_interleave_run_end_encoded_mixed_run_lengths() {
1742        let mut builder = PrimitiveRunBuilder::<Int64Type, Int32Type>::new();
1743        builder.extend([1, 2, 2, 2, 2, 3, 3, 4].into_iter().map(Some));
1744        let a = builder.finish();
1745
1746        let mut builder = PrimitiveRunBuilder::<Int64Type, Int32Type>::new();
1747        builder.extend([5, 5, 5, 6, 7, 7, 8, 8].into_iter().map(Some));
1748        let b = builder.finish();
1749
1750        let indices = &[
1751            (0, 0), // 1
1752            (1, 2), // 5
1753            (0, 3), // 2
1754            (1, 3), // 6
1755            (0, 6), // 3
1756            (1, 6), // 8
1757            (0, 7), // 4
1758            (1, 4), // 7
1759        ];
1760        let result = interleave(&[&a, &b], indices).unwrap();
1761
1762        // The result should be a RunEndEncoded array
1763        assert!(matches!(result.data_type(), DataType::RunEndEncoded(_, _)));
1764
1765        // Cast to RunArray to access values
1766        let result_run_array: &RunArray<Int64Type> = result.as_any().downcast_ref().unwrap();
1767
1768        // Verify the logical values by accessing the logical array directly
1769        let expected = vec![1, 5, 2, 6, 3, 8, 4, 7];
1770        let mut actual = Vec::new();
1771        for i in 0..result_run_array.len() {
1772            let physical_idx = result_run_array.get_physical_index(i);
1773            let value = result_run_array
1774                .values()
1775                .as_primitive::<Int32Type>()
1776                .value(physical_idx);
1777            actual.push(value);
1778        }
1779        assert_eq!(actual, expected);
1780    }
1781
1782    #[test]
1783    fn test_interleave_run_end_encoded_empty_runs() {
1784        let mut builder = PrimitiveRunBuilder::<Int32Type, Int32Type>::new();
1785        builder.extend([1].into_iter().map(Some));
1786        let a = builder.finish();
1787
1788        let mut builder = PrimitiveRunBuilder::<Int32Type, Int32Type>::new();
1789        builder.extend([2, 2, 2].into_iter().map(Some));
1790        let b = builder.finish();
1791
1792        let indices = &[(0, 0), (1, 1), (1, 2)];
1793        let result = interleave(&[&a, &b], indices).unwrap();
1794
1795        // The result should be a RunEndEncoded array
1796        assert!(matches!(result.data_type(), DataType::RunEndEncoded(_, _)));
1797
1798        // Cast to RunArray to access values
1799        let result_run_array: &Int32RunArray = result.as_any().downcast_ref().unwrap();
1800
1801        // Verify the logical values by accessing the logical array directly
1802        let expected = vec![1, 2, 2];
1803        let mut actual = Vec::new();
1804        for i in 0..result_run_array.len() {
1805            let physical_idx = result_run_array.get_physical_index(i);
1806            let value = result_run_array
1807                .values()
1808                .as_primitive::<Int32Type>()
1809                .value(physical_idx);
1810            actual.push(value);
1811        }
1812        assert_eq!(actual, expected);
1813    }
1814
1815    #[test]
1816    fn test_struct_no_fields() {
1817        let fields = Fields::empty();
1818        let a = StructArray::try_new_with_length(fields.clone(), vec![], None, 10).unwrap();
1819        let v = interleave(&[&a], &[(0, 0)]).unwrap();
1820        assert_eq!(v.len(), 1);
1821        assert_eq!(v.data_type(), &DataType::Struct(fields));
1822    }
1823
1824    #[test]
1825    fn test_interleave_fallback_dictionary_with_nulls() {
1826        let input_1_keys = Int32Array::from_iter([Some(0), None, Some(1)]);
1827        let input_1_values = StringArray::from_iter_values(["foo", "bar"]);
1828        let dict_a = DictionaryArray::new(input_1_keys, Arc::new(input_1_values));
1829
1830        let input_2_keys = Int32Array::from_iter([Some(0), Some(1), None]);
1831        let input_2_values = StringArray::from_iter_values(["baz", "qux"]);
1832        let dict_b = DictionaryArray::new(input_2_keys, Arc::new(input_2_values));
1833
1834        let indices = vec![
1835            (0, 0), // "foo"
1836            (0, 1), // null
1837            (1, 0), // "baz"
1838            (1, 2), // null
1839            (0, 2), // "bar"
1840            (1, 1), // "qux"
1841        ];
1842
1843        let result =
1844            interleave_fallback_dictionary::<Int32Type>(&[&dict_a, &dict_b], &indices).unwrap();
1845        let dict_result = result.as_dictionary::<Int32Type>();
1846
1847        let string_result = dict_result.downcast_dict::<StringArray>().unwrap();
1848        let collected: Vec<_> = string_result.into_iter().collect();
1849        assert_eq!(
1850            collected,
1851            vec![
1852                Some("foo"),
1853                None,
1854                Some("baz"),
1855                None,
1856                Some("bar"),
1857                Some("qux")
1858            ]
1859        );
1860    }
1861
1862    #[test]
1863    fn test_interleave_bytes_offset_overflow() {
1864        let indices: Vec<(usize, usize)> = vec![(0, 0); (i32::MAX >> 4) as usize];
1865        let text = ('a'..='z').collect::<String>();
1866        let values = StringArray::from(vec![Some(text)]);
1867        assert!(matches!(
1868            interleave(&[&values], &indices),
1869            Err(ArrowError::OffsetOverflowError(_))
1870        ));
1871    }
1872
1873    #[test]
1874    fn test_interleave_list_offset_overflow() {
1875        // Build a ListArray<i32> with a single row containing many elements
1876        let mut builder = GenericListBuilder::<i32, _>::new(Int32Builder::new());
1877        for i in 0..32 {
1878            builder.values().append_value(i);
1879        }
1880        builder.append(true);
1881        let list = builder.finish();
1882
1883        // Interleave enough copies to overflow i32 offsets
1884        let indices: Vec<(usize, usize)> = vec![(0, 0); (i32::MAX as usize / 32) + 1];
1885        assert!(matches!(
1886            interleave(&[&list], &indices),
1887            Err(ArrowError::OffsetOverflowError(_))
1888        ));
1889    }
1890
1891    #[test]
1892    fn test_interleave_list_view() {
1893        // `interleave` for ListView falls through to `interleave_fallback`, which uses
1894        // `MutableArrayData`. `list_view::build_extend` copies offsets/sizes but never
1895        // extends the child array, so the result contains offsets/sizes that reference
1896        // positions in the now-absent original child arrays while the child is empty.
1897        //
1898        // lv_a: [[1, 2], [3]]   (values=[1,2,3], offsets=[0,2], sizes=[2,1])
1899        // lv_b: [[4, 5, 6]]     (values=[4,5,6], offsets=[0],   sizes=[3])
1900        // interleave at [(0,0), (1,0), (0,1)] should produce [[1, 2], [4, 5, 6], [3]]
1901        let field = Arc::new(Field::new_list_field(DataType::Int64, false));
1902
1903        let lv_a = ListViewArray::new(
1904            Arc::clone(&field),
1905            ScalarBuffer::from(vec![0i32, 2]),
1906            ScalarBuffer::from(vec![2i32, 1]),
1907            Arc::new(Int64Array::from(vec![1_i64, 2, 3])),
1908            None,
1909        );
1910        let lv_b = ListViewArray::new(
1911            field,
1912            ScalarBuffer::from(vec![0i32]),
1913            ScalarBuffer::from(vec![3i32]),
1914            Arc::new(Int64Array::from(vec![4_i64, 5, 6])),
1915            None,
1916        );
1917
1918        let result = interleave(
1919            &[&lv_a as &dyn Array, &lv_b as &dyn Array],
1920            &[(0, 0), (1, 0), (0, 1)],
1921        )
1922        .unwrap();
1923
1924        result
1925            .to_data()
1926            .validate_full()
1927            .expect("interleaved ListViewArray must be internally consistent");
1928
1929        let result_lv = result.as_list_view::<i32>();
1930        assert_eq!(result_lv.len(), 3);
1931        assert_eq!(
1932            result_lv.value(0).as_primitive::<Int64Type>().values(),
1933            &[1, 2]
1934        );
1935        assert_eq!(
1936            result_lv.value(1).as_primitive::<Int64Type>().values(),
1937            &[4, 5, 6]
1938        );
1939        assert_eq!(
1940            result_lv.value(2).as_primitive::<Int64Type>().values(),
1941            &[3]
1942        );
1943    }
1944}