arrow_select/
interleave.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
17
//! Interleave elements from multiple arrays
19
20use crate::dictionary::{merge_dictionary_values, should_merge_dictionary_values};
21use arrow_array::builder::{BooleanBufferBuilder, PrimitiveBuilder};
22use arrow_array::cast::AsArray;
23use arrow_array::types::*;
24use arrow_array::*;
25use arrow_buffer::{ArrowNativeType, BooleanBuffer, MutableBuffer, NullBuffer, OffsetBuffer};
26use arrow_data::ByteView;
27use arrow_data::transform::MutableArrayData;
28use arrow_schema::{ArrowError, DataType, Fields};
29use std::sync::Arc;
30
// Dispatch helper for `downcast_primitive!` in `interleave`: forwards the matched
// primitive type together with the inputs (and the exact input `DataType`) to
// `interleave_primitive`.
macro_rules! primitive_helper {
    ($prim:ty, $arrays:ident, $idx:ident, $dt:ident) => {{
        interleave_primitive::<$prim>($arrays, $idx, $dt)
    }};
}
36
// Dispatch helper for `downcast_integer!` on dictionary key types.
//
// `interleave_dictionaries` already returns `Result<ArrayRef, ArrowError>`, so its
// result is passed through unchanged. Wrapping it again in `Arc::new(..) as _` would
// produce a needless `Arc<Arc<dyn Array>>`, costing an extra allocation and an extra
// pointer hop on every access to the returned array.
macro_rules! dict_helper {
    ($t:ty, $values:expr, $indices:expr) => {
        interleave_dictionaries::<$t>($values, $indices)
    };
}
42
43///
44/// Takes elements by index from a list of [`Array`], creating a new [`Array`] from those values.
45///
46/// Each element in `indices` is a pair of `usize` with the first identifying the index
47/// of the [`Array`] in `values`, and the second the index of the value within that [`Array`]
48///
49/// ```text
50/// ┌─────────────────┐      ┌─────────┐                                  ┌─────────────────┐
51/// │        A        │      │ (0, 0)  │        interleave(               │        A        │
52/// ├─────────────────┤      ├─────────┤          [values0, values1],     ├─────────────────┤
53/// │        D        │      │ (1, 0)  │          indices                 │        B        │
54/// └─────────────────┘      ├─────────┤        )                         ├─────────────────┤
55///   values array 0         │ (1, 1)  │      ─────────────────────────▶  │        C        │
56///                          ├─────────┤                                  ├─────────────────┤
57///                          │ (0, 1)  │                                  │        D        │
58///                          └─────────┘                                  └─────────────────┘
59/// ┌─────────────────┐       indices
60/// │        B        │        array
61/// ├─────────────────┤                                                    result
62/// │        C        │
63/// ├─────────────────┤
64/// │        E        │
65/// └─────────────────┘
66///   values array 1
67/// ```
68///
69/// For selecting values by index from a single array see [`crate::take`]
70pub fn interleave(
71    values: &[&dyn Array],
72    indices: &[(usize, usize)],
73) -> Result<ArrayRef, ArrowError> {
74    if values.is_empty() {
75        return Err(ArrowError::InvalidArgumentError(
76            "interleave requires input of at least one array".to_string(),
77        ));
78    }
79    let data_type = values[0].data_type();
80
81    for array in values.iter().skip(1) {
82        if array.data_type() != data_type {
83            return Err(ArrowError::InvalidArgumentError(format!(
84                "It is not possible to interleave arrays of different data types ({} and {})",
85                data_type,
86                array.data_type()
87            )));
88        }
89    }
90
91    if indices.is_empty() {
92        return Ok(new_empty_array(data_type));
93    }
94
95    downcast_primitive! {
96        data_type => (primitive_helper, values, indices, data_type),
97        DataType::Utf8 => interleave_bytes::<Utf8Type>(values, indices),
98        DataType::LargeUtf8 => interleave_bytes::<LargeUtf8Type>(values, indices),
99        DataType::Binary => interleave_bytes::<BinaryType>(values, indices),
100        DataType::LargeBinary => interleave_bytes::<LargeBinaryType>(values, indices),
101        DataType::BinaryView => interleave_views::<BinaryViewType>(values, indices),
102        DataType::Utf8View => interleave_views::<StringViewType>(values, indices),
103        DataType::Dictionary(k, _) => downcast_integer! {
104            k.as_ref() => (dict_helper, values, indices),
105            _ => unreachable!("illegal dictionary key type {k}")
106        },
107        DataType::Struct(fields) => interleave_struct(fields, values, indices),
108        _ => interleave_fallback(values, indices)
109    }
110}
111
112/// Common functionality for interleaving arrays
113///
114/// T is the concrete Array type
115struct Interleave<'a, T> {
116    /// The input arrays downcast to T
117    arrays: Vec<&'a T>,
118    /// The null buffer of the interleaved output
119    nulls: Option<NullBuffer>,
120}
121
122impl<'a, T: Array + 'static> Interleave<'a, T> {
123    fn new(values: &[&'a dyn Array], indices: &'a [(usize, usize)]) -> Self {
124        let mut has_nulls = false;
125        let arrays: Vec<&T> = values
126            .iter()
127            .map(|x| {
128                has_nulls = has_nulls || x.null_count() != 0;
129                x.as_any().downcast_ref().unwrap()
130            })
131            .collect();
132
133        let nulls = match has_nulls {
134            true => {
135                let nulls = BooleanBuffer::collect_bool(indices.len(), |i| {
136                    let (a, b) = indices[i];
137                    arrays[a].is_valid(b)
138                });
139                Some(nulls.into())
140            }
141            false => None,
142        };
143
144        Self { arrays, nulls }
145    }
146}
147
148fn interleave_primitive<T: ArrowPrimitiveType>(
149    values: &[&dyn Array],
150    indices: &[(usize, usize)],
151    data_type: &DataType,
152) -> Result<ArrayRef, ArrowError> {
153    let interleaved = Interleave::<'_, PrimitiveArray<T>>::new(values, indices);
154
155    let values = indices
156        .iter()
157        .map(|(a, b)| interleaved.arrays[*a].value(*b))
158        .collect::<Vec<_>>();
159
160    let array = PrimitiveArray::<T>::try_new(values.into(), interleaved.nulls)?;
161    Ok(Arc::new(array.with_data_type(data_type.clone())))
162}
163
164fn interleave_bytes<T: ByteArrayType>(
165    values: &[&dyn Array],
166    indices: &[(usize, usize)],
167) -> Result<ArrayRef, ArrowError> {
168    let interleaved = Interleave::<'_, GenericByteArray<T>>::new(values, indices);
169
170    let mut capacity = 0;
171    let mut offsets = Vec::with_capacity(indices.len() + 1);
172    offsets.push(T::Offset::from_usize(0).unwrap());
173    offsets.extend(indices.iter().map(|(a, b)| {
174        let o = interleaved.arrays[*a].value_offsets();
175        let element_len = o[*b + 1].as_usize() - o[*b].as_usize();
176        capacity += element_len;
177        T::Offset::from_usize(capacity).expect("overflow")
178    }));
179
180    let mut values = Vec::with_capacity(capacity);
181    for (a, b) in indices {
182        values.extend_from_slice(interleaved.arrays[*a].value(*b).as_ref());
183    }
184
185    // Safety: safe by construction
186    let array = unsafe {
187        let offsets = OffsetBuffer::new_unchecked(offsets.into());
188        GenericByteArray::<T>::new_unchecked(offsets, values.into(), interleaved.nulls)
189    };
190    Ok(Arc::new(array))
191}
192
193fn interleave_dictionaries<K: ArrowDictionaryKeyType>(
194    arrays: &[&dyn Array],
195    indices: &[(usize, usize)],
196) -> Result<ArrayRef, ArrowError> {
197    let dictionaries: Vec<_> = arrays.iter().map(|x| x.as_dictionary::<K>()).collect();
198    if !should_merge_dictionary_values::<K>(&dictionaries, indices.len()) {
199        return interleave_fallback(arrays, indices);
200    }
201
202    let masks: Vec<_> = dictionaries
203        .iter()
204        .enumerate()
205        .map(|(a_idx, dictionary)| {
206            let mut key_mask = BooleanBufferBuilder::new_from_buffer(
207                MutableBuffer::new_null(dictionary.len()),
208                dictionary.len(),
209            );
210
211            for (_, key_idx) in indices.iter().filter(|(a, _)| *a == a_idx) {
212                key_mask.set_bit(*key_idx, true);
213            }
214            key_mask.finish()
215        })
216        .collect();
217
218    let merged = merge_dictionary_values(&dictionaries, Some(&masks))?;
219
220    // Recompute keys
221    let mut keys = PrimitiveBuilder::<K>::with_capacity(indices.len());
222    for (a, b) in indices {
223        let old_keys: &PrimitiveArray<K> = dictionaries[*a].keys();
224        match old_keys.is_valid(*b) {
225            true => {
226                let old_key = old_keys.values()[*b];
227                keys.append_value(merged.key_mappings[*a][old_key.as_usize()])
228            }
229            false => keys.append_null(),
230        }
231    }
232    let array = unsafe { DictionaryArray::new_unchecked(keys.finish(), merged.values) };
233    Ok(Arc::new(array))
234}
235
236fn interleave_views<T: ByteViewType>(
237    values: &[&dyn Array],
238    indices: &[(usize, usize)],
239) -> Result<ArrayRef, ArrowError> {
240    let interleaved = Interleave::<'_, GenericByteViewArray<T>>::new(values, indices);
241    let mut buffers = Vec::new();
242
243    // Contains the offsets of start buffer in `buffer_to_new_index`
244    let mut offsets = Vec::with_capacity(interleaved.arrays.len() + 1);
245    offsets.push(0);
246    let mut total_buffers = 0;
247    for a in interleaved.arrays.iter() {
248        total_buffers += a.data_buffers().len();
249        offsets.push(total_buffers);
250    }
251
252    // contains the mapping from old buffer index to new buffer index
253    let mut buffer_to_new_index = vec![None; total_buffers];
254
255    let views: Vec<u128> = indices
256        .iter()
257        .map(|(array_idx, value_idx)| {
258            let array = interleaved.arrays[*array_idx];
259            let view = array.views().get(*value_idx).unwrap();
260            let view_len = *view as u32;
261            if view_len <= 12 {
262                return *view;
263            }
264            // value is big enough to be in a variadic buffer
265            let view = ByteView::from(*view);
266            let buffer_to_new_idx = offsets[*array_idx] + view.buffer_index as usize;
267            let new_buffer_idx: u32 =
268                *buffer_to_new_index[buffer_to_new_idx].get_or_insert_with(|| {
269                    buffers.push(array.data_buffers()[view.buffer_index as usize].clone());
270                    (buffers.len() - 1) as u32
271                });
272            view.with_buffer_index(new_buffer_idx).as_u128()
273        })
274        .collect();
275
276    let array = unsafe {
277        GenericByteViewArray::<T>::new_unchecked(views.into(), buffers, interleaved.nulls)
278    };
279    Ok(Arc::new(array))
280}
281
282fn interleave_struct(
283    fields: &Fields,
284    values: &[&dyn Array],
285    indices: &[(usize, usize)],
286) -> Result<ArrayRef, ArrowError> {
287    let interleaved = Interleave::<'_, StructArray>::new(values, indices);
288
289    if fields.is_empty() {
290        let array = StructArray::try_new_with_length(
291            fields.clone(),
292            vec![],
293            interleaved.nulls,
294            indices.len(),
295        )?;
296        return Ok(Arc::new(array));
297    }
298
299    let struct_fields_array: Result<Vec<_>, _> = (0..fields.len())
300        .map(|i| {
301            let field_values: Vec<&dyn Array> = interleaved
302                .arrays
303                .iter()
304                .map(|x| x.column(i).as_ref())
305                .collect();
306            interleave(&field_values, indices)
307        })
308        .collect();
309
310    let struct_array =
311        StructArray::try_new(fields.clone(), struct_fields_array?, interleaved.nulls)?;
312    Ok(Arc::new(struct_array))
313}
314
315/// Fallback implementation of interleave using [`MutableArrayData`]
316fn interleave_fallback(
317    values: &[&dyn Array],
318    indices: &[(usize, usize)],
319) -> Result<ArrayRef, ArrowError> {
320    let arrays: Vec<_> = values.iter().map(|x| x.to_data()).collect();
321    let arrays: Vec<_> = arrays.iter().collect();
322    let mut array_data = MutableArrayData::new(arrays, false, indices.len());
323
324    let mut cur_array = indices[0].0;
325    let mut start_row_idx = indices[0].1;
326    let mut end_row_idx = start_row_idx + 1;
327
328    for (array, row) in indices.iter().skip(1).copied() {
329        if array == cur_array && row == end_row_idx {
330            // subsequent row in same batch
331            end_row_idx += 1;
332            continue;
333        }
334
335        // emit current batch of rows for current buffer
336        array_data.extend(cur_array, start_row_idx, end_row_idx);
337
338        // start new batch of rows
339        cur_array = array;
340        start_row_idx = row;
341        end_row_idx = start_row_idx + 1;
342    }
343
344    // emit final batch of rows
345    array_data.extend(cur_array, start_row_idx, end_row_idx);
346    Ok(make_array(array_data.freeze()))
347}
348
349/// Interleave rows by index from multiple [`RecordBatch`] instances and return a new [`RecordBatch`].
350///
351/// This function will call [`interleave`] on each array of the [`RecordBatch`] instances and assemble a new [`RecordBatch`].
352///
353/// # Example
354/// ```
355/// # use std::sync::Arc;
356/// # use arrow_array::{StringArray, Int32Array, RecordBatch, UInt32Array};
357/// # use arrow_schema::{DataType, Field, Schema};
358/// # use arrow_select::interleave::interleave_record_batch;
359///
360/// let schema = Arc::new(Schema::new(vec![
361///     Field::new("a", DataType::Int32, true),
362///     Field::new("b", DataType::Utf8, true),
363/// ]));
364///
365/// let batch1 = RecordBatch::try_new(
366///     schema.clone(),
367///     vec![
368///         Arc::new(Int32Array::from(vec![0, 1, 2])),
369///         Arc::new(StringArray::from(vec!["a", "b", "c"])),
370///     ],
371/// ).unwrap();
372///
373/// let batch2 = RecordBatch::try_new(
374///     schema.clone(),
375///     vec![
376///         Arc::new(Int32Array::from(vec![3, 4, 5])),
377///         Arc::new(StringArray::from(vec!["d", "e", "f"])),
378///     ],
379/// ).unwrap();
380///
381/// let indices = vec![(0, 1), (1, 2), (0, 0), (1, 1)];
382/// let interleaved = interleave_record_batch(&[&batch1, &batch2], &indices).unwrap();
383///
384/// let expected = RecordBatch::try_new(
385///     schema,
386///     vec![
387///         Arc::new(Int32Array::from(vec![1, 5, 0, 4])),
388///         Arc::new(StringArray::from(vec!["b", "f", "a", "e"])),
389///     ],
390/// ).unwrap();
391/// assert_eq!(interleaved, expected);
392/// ```
393pub fn interleave_record_batch(
394    record_batches: &[&RecordBatch],
395    indices: &[(usize, usize)],
396) -> Result<RecordBatch, ArrowError> {
397    let schema = record_batches[0].schema();
398    let columns = (0..schema.fields().len())
399        .map(|i| {
400            let column_values: Vec<&dyn Array> = record_batches
401                .iter()
402                .map(|batch| batch.column(i).as_ref())
403                .collect();
404            interleave(&column_values, indices)
405        })
406        .collect::<Result<Vec<_>, _>>()?;
407    RecordBatch::try_new(schema, columns)
408}
409
410#[cfg(test)]
411mod tests {
412    use super::*;
413    use arrow_array::Int32RunArray;
414    use arrow_array::builder::{Int32Builder, ListBuilder, PrimitiveRunBuilder};
415    use arrow_schema::Field;
416
417    #[test]
418    fn test_primitive() {
419        let a = Int32Array::from_iter_values([1, 2, 3, 4]);
420        let b = Int32Array::from_iter_values([5, 6, 7]);
421        let c = Int32Array::from_iter_values([8, 9, 10]);
422        let values = interleave(&[&a, &b, &c], &[(0, 3), (0, 3), (2, 2), (2, 0), (1, 1)]).unwrap();
423        let v = values.as_primitive::<Int32Type>();
424        assert_eq!(v.values(), &[4, 4, 10, 8, 6]);
425    }
426
427    #[test]
428    fn test_primitive_nulls() {
429        let a = Int32Array::from_iter_values([1, 2, 3, 4]);
430        let b = Int32Array::from_iter([Some(1), Some(4), None]);
431        let values = interleave(&[&a, &b], &[(0, 1), (1, 2), (1, 2), (0, 3), (0, 2)]).unwrap();
432        let v: Vec<_> = values.as_primitive::<Int32Type>().into_iter().collect();
433        assert_eq!(&v, &[Some(2), None, None, Some(4), Some(3)])
434    }
435
436    #[test]
437    fn test_primitive_empty() {
438        let a = Int32Array::from_iter_values([1, 2, 3, 4]);
439        let v = interleave(&[&a], &[]).unwrap();
440        assert!(v.is_empty());
441        assert_eq!(v.data_type(), &DataType::Int32);
442    }
443
444    #[test]
445    fn test_strings() {
446        let a = StringArray::from_iter_values(["a", "b", "c"]);
447        let b = StringArray::from_iter_values(["hello", "world", "foo"]);
448        let values = interleave(&[&a, &b], &[(0, 2), (0, 2), (1, 0), (1, 1), (0, 1)]).unwrap();
449        let v = values.as_string::<i32>();
450        let values: Vec<_> = v.into_iter().collect();
451        assert_eq!(
452            &values,
453            &[
454                Some("c"),
455                Some("c"),
456                Some("hello"),
457                Some("world"),
458                Some("b")
459            ]
460        )
461    }
462
463    #[test]
464    fn test_interleave_dictionary() {
465        let a = DictionaryArray::<Int32Type>::from_iter(["a", "b", "c", "a", "b"]);
466        let b = DictionaryArray::<Int32Type>::from_iter(["a", "c", "a", "c", "a"]);
467
468        // Should not recompute dictionary
469        let values =
470            interleave(&[&a, &b], &[(0, 2), (0, 2), (0, 2), (1, 0), (1, 1), (0, 1)]).unwrap();
471        let v = values.as_dictionary::<Int32Type>();
472        assert_eq!(v.values().len(), 5);
473
474        let vc = v.downcast_dict::<StringArray>().unwrap();
475        let collected: Vec<_> = vc.into_iter().map(Option::unwrap).collect();
476        assert_eq!(&collected, &["c", "c", "c", "a", "c", "b"]);
477
478        // Should recompute dictionary
479        let values = interleave(&[&a, &b], &[(0, 2), (0, 2), (1, 1)]).unwrap();
480        let v = values.as_dictionary::<Int32Type>();
481        assert_eq!(v.values().len(), 1);
482
483        let vc = v.downcast_dict::<StringArray>().unwrap();
484        let collected: Vec<_> = vc.into_iter().map(Option::unwrap).collect();
485        assert_eq!(&collected, &["c", "c", "c"]);
486    }
487
488    #[test]
489    fn test_interleave_dictionary_nulls() {
490        let input_1_keys = Int32Array::from_iter_values([0, 2, 1, 3]);
491        let input_1_values = StringArray::from(vec![Some("foo"), None, Some("bar"), Some("fiz")]);
492        let input_1 = DictionaryArray::new(input_1_keys, Arc::new(input_1_values));
493        let input_2: DictionaryArray<Int32Type> = vec![None].into_iter().collect();
494
495        let expected = vec![Some("fiz"), None, None, Some("foo")];
496
497        let values = interleave(
498            &[&input_1 as _, &input_2 as _],
499            &[(0, 3), (0, 2), (1, 0), (0, 0)],
500        )
501        .unwrap();
502        let dictionary = values.as_dictionary::<Int32Type>();
503        let actual: Vec<Option<&str>> = dictionary
504            .downcast_dict::<StringArray>()
505            .unwrap()
506            .into_iter()
507            .collect();
508
509        assert_eq!(actual, expected);
510    }
511
512    #[test]
513    fn test_lists() {
514        // [[1, 2], null, [3]]
515        let mut a = ListBuilder::new(Int32Builder::new());
516        a.values().append_value(1);
517        a.values().append_value(2);
518        a.append(true);
519        a.append(false);
520        a.values().append_value(3);
521        a.append(true);
522        let a = a.finish();
523
524        // [[4], null, [5, 6, null]]
525        let mut b = ListBuilder::new(Int32Builder::new());
526        b.values().append_value(4);
527        b.append(true);
528        b.append(false);
529        b.values().append_value(5);
530        b.values().append_value(6);
531        b.values().append_null();
532        b.append(true);
533        let b = b.finish();
534
535        let values = interleave(&[&a, &b], &[(0, 2), (0, 1), (1, 0), (1, 2), (1, 1)]).unwrap();
536        let v = values.as_any().downcast_ref::<ListArray>().unwrap();
537
538        // [[3], null, [4], [5, 6, null], null]
539        let mut expected = ListBuilder::new(Int32Builder::new());
540        expected.values().append_value(3);
541        expected.append(true);
542        expected.append(false);
543        expected.values().append_value(4);
544        expected.append(true);
545        expected.values().append_value(5);
546        expected.values().append_value(6);
547        expected.values().append_null();
548        expected.append(true);
549        expected.append(false);
550        let expected = expected.finish();
551
552        assert_eq!(v, &expected);
553    }
554
555    #[test]
556    fn test_struct_without_nulls() {
557        let fields = Fields::from(vec![
558            Field::new("number_col", DataType::Int32, false),
559            Field::new("string_col", DataType::Utf8, false),
560        ]);
561        let a = {
562            let number_col = Int32Array::from_iter_values([1, 2, 3, 4]);
563            let string_col = StringArray::from_iter_values(["a", "b", "c", "d"]);
564
565            StructArray::try_new(
566                fields.clone(),
567                vec![Arc::new(number_col), Arc::new(string_col)],
568                None,
569            )
570            .unwrap()
571        };
572
573        let b = {
574            let number_col = Int32Array::from_iter_values([5, 6, 7]);
575            let string_col = StringArray::from_iter_values(["hello", "world", "foo"]);
576
577            StructArray::try_new(
578                fields.clone(),
579                vec![Arc::new(number_col), Arc::new(string_col)],
580                None,
581            )
582            .unwrap()
583        };
584
585        let c = {
586            let number_col = Int32Array::from_iter_values([8, 9, 10]);
587            let string_col = StringArray::from_iter_values(["x", "y", "z"]);
588
589            StructArray::try_new(
590                fields.clone(),
591                vec![Arc::new(number_col), Arc::new(string_col)],
592                None,
593            )
594            .unwrap()
595        };
596
597        let values = interleave(&[&a, &b, &c], &[(0, 3), (0, 3), (2, 2), (2, 0), (1, 1)]).unwrap();
598        let values_struct = values.as_struct();
599        assert_eq!(values_struct.data_type(), &DataType::Struct(fields));
600        assert_eq!(values_struct.null_count(), 0);
601
602        let values_number = values_struct.column(0).as_primitive::<Int32Type>();
603        assert_eq!(values_number.values(), &[4, 4, 10, 8, 6]);
604        let values_string = values_struct.column(1).as_string::<i32>();
605        let values_string: Vec<_> = values_string.into_iter().collect();
606        assert_eq!(
607            &values_string,
608            &[Some("d"), Some("d"), Some("z"), Some("x"), Some("world")]
609        );
610    }
611
612    #[test]
613    fn test_struct_with_nulls_in_values() {
614        let fields = Fields::from(vec![
615            Field::new("number_col", DataType::Int32, true),
616            Field::new("string_col", DataType::Utf8, true),
617        ]);
618        let a = {
619            let number_col = Int32Array::from_iter_values([1, 2, 3, 4]);
620            let string_col = StringArray::from_iter_values(["a", "b", "c", "d"]);
621
622            StructArray::try_new(
623                fields.clone(),
624                vec![Arc::new(number_col), Arc::new(string_col)],
625                None,
626            )
627            .unwrap()
628        };
629
630        let b = {
631            let number_col = Int32Array::from_iter([Some(1), Some(4), None]);
632            let string_col = StringArray::from(vec![Some("hello"), None, Some("foo")]);
633
634            StructArray::try_new(
635                fields.clone(),
636                vec![Arc::new(number_col), Arc::new(string_col)],
637                None,
638            )
639            .unwrap()
640        };
641
642        let values = interleave(&[&a, &b], &[(0, 1), (1, 2), (1, 2), (0, 3), (1, 1)]).unwrap();
643        let values_struct = values.as_struct();
644        assert_eq!(values_struct.data_type(), &DataType::Struct(fields));
645
646        // The struct itself has no nulls, but the values do
647        assert_eq!(values_struct.null_count(), 0);
648
649        let values_number: Vec<_> = values_struct
650            .column(0)
651            .as_primitive::<Int32Type>()
652            .into_iter()
653            .collect();
654        assert_eq!(values_number, &[Some(2), None, None, Some(4), Some(4)]);
655
656        let values_string = values_struct.column(1).as_string::<i32>();
657        let values_string: Vec<_> = values_string.into_iter().collect();
658        assert_eq!(
659            &values_string,
660            &[Some("b"), Some("foo"), Some("foo"), Some("d"), None]
661        );
662    }
663
664    #[test]
665    fn test_struct_with_nulls() {
666        let fields = Fields::from(vec![
667            Field::new("number_col", DataType::Int32, false),
668            Field::new("string_col", DataType::Utf8, false),
669        ]);
670        let a = {
671            let number_col = Int32Array::from_iter_values([1, 2, 3, 4]);
672            let string_col = StringArray::from_iter_values(["a", "b", "c", "d"]);
673
674            StructArray::try_new(
675                fields.clone(),
676                vec![Arc::new(number_col), Arc::new(string_col)],
677                None,
678            )
679            .unwrap()
680        };
681
682        let b = {
683            let number_col = Int32Array::from_iter_values([5, 6, 7]);
684            let string_col = StringArray::from_iter_values(["hello", "world", "foo"]);
685
686            StructArray::try_new(
687                fields.clone(),
688                vec![Arc::new(number_col), Arc::new(string_col)],
689                Some(NullBuffer::from(&[true, false, true])),
690            )
691            .unwrap()
692        };
693
694        let c = {
695            let number_col = Int32Array::from_iter_values([8, 9, 10]);
696            let string_col = StringArray::from_iter_values(["x", "y", "z"]);
697
698            StructArray::try_new(
699                fields.clone(),
700                vec![Arc::new(number_col), Arc::new(string_col)],
701                None,
702            )
703            .unwrap()
704        };
705
706        let values = interleave(&[&a, &b, &c], &[(0, 3), (0, 3), (2, 2), (1, 1), (2, 0)]).unwrap();
707        let values_struct = values.as_struct();
708        assert_eq!(values_struct.data_type(), &DataType::Struct(fields));
709
710        let validity: Vec<bool> = {
711            let null_buffer = values_struct.nulls().expect("should_have_nulls");
712
713            null_buffer.iter().collect()
714        };
715        assert_eq!(validity, &[true, true, true, false, true]);
716        let values_number = values_struct.column(0).as_primitive::<Int32Type>();
717        assert_eq!(values_number.values(), &[4, 4, 10, 6, 8]);
718        let values_string = values_struct.column(1).as_string::<i32>();
719        let values_string: Vec<_> = values_string.into_iter().collect();
720        assert_eq!(
721            &values_string,
722            &[Some("d"), Some("d"), Some("z"), Some("world"), Some("x"),]
723        );
724    }
725
726    #[test]
727    fn test_struct_empty() {
728        let fields = Fields::from(vec![
729            Field::new("number_col", DataType::Int32, false),
730            Field::new("string_col", DataType::Utf8, false),
731        ]);
732        let a = {
733            let number_col = Int32Array::from_iter_values([1, 2, 3, 4]);
734            let string_col = StringArray::from_iter_values(["a", "b", "c", "d"]);
735
736            StructArray::try_new(
737                fields.clone(),
738                vec![Arc::new(number_col), Arc::new(string_col)],
739                None,
740            )
741            .unwrap()
742        };
743        let v = interleave(&[&a], &[]).unwrap();
744        assert!(v.is_empty());
745        assert_eq!(v.data_type(), &DataType::Struct(fields));
746    }
747
748    #[test]
749    fn interleave_sparse_nulls() {
750        let values = StringArray::from_iter_values((0..100).map(|x| x.to_string()));
751        let keys = Int32Array::from_iter_values(0..10);
752        let dict_a = DictionaryArray::new(keys, Arc::new(values));
753        let values = StringArray::new_null(0);
754        let keys = Int32Array::new_null(10);
755        let dict_b = DictionaryArray::new(keys, Arc::new(values));
756
757        let indices = &[(0, 0), (0, 1), (0, 2), (1, 0)];
758        let array = interleave(&[&dict_a, &dict_b], indices).unwrap();
759
760        let expected =
761            DictionaryArray::<Int32Type>::from_iter(vec![Some("0"), Some("1"), Some("2"), None]);
762        assert_eq!(array.as_ref(), &expected)
763    }
764
765    #[test]
766    fn test_interleave_views() {
767        let values = StringArray::from_iter_values([
768            "hello",
769            "world_long_string_not_inlined",
770            "foo",
771            "bar",
772            "baz",
773        ]);
774        let view_a = StringViewArray::from(&values);
775
776        let values = StringArray::from_iter_values([
777            "test",
778            "data",
779            "more_long_string_not_inlined",
780            "views",
781            "here",
782        ]);
783        let view_b = StringViewArray::from(&values);
784
785        let indices = &[
786            (0, 2), // "foo"
787            (1, 0), // "test"
788            (0, 4), // "baz"
789            (1, 3), // "views"
790            (0, 1), // "world_long_string_not_inlined"
791        ];
792
793        // Test specialized implementation
794        let values = interleave(&[&view_a, &view_b], indices).unwrap();
795        let result = values.as_string_view();
796        assert_eq!(result.data_buffers().len(), 1);
797
798        let fallback = interleave_fallback(&[&view_a, &view_b], indices).unwrap();
799        let fallback_result = fallback.as_string_view();
800        // note that fallback_result has 2 buffers, but only one long enough string to warrant a buffer
801        assert_eq!(fallback_result.data_buffers().len(), 2);
802
803        // Convert to strings for easier assertion
804        let collected: Vec<_> = result.iter().map(|x| x.map(|s| s.to_string())).collect();
805
806        let fallback_collected: Vec<_> = fallback_result
807            .iter()
808            .map(|x| x.map(|s| s.to_string()))
809            .collect();
810
811        assert_eq!(&collected, &fallback_collected);
812
813        assert_eq!(
814            &collected,
815            &[
816                Some("foo".to_string()),
817                Some("test".to_string()),
818                Some("baz".to_string()),
819                Some("views".to_string()),
820                Some("world_long_string_not_inlined".to_string()),
821            ]
822        );
823    }
824
825    #[test]
826    fn test_interleave_views_with_nulls() {
827        let values = StringArray::from_iter([
828            Some("hello"),
829            None,
830            Some("foo_long_string_not_inlined"),
831            Some("bar"),
832            None,
833        ]);
834        let view_a = StringViewArray::from(&values);
835
836        let values = StringArray::from_iter([
837            Some("test"),
838            Some("data_long_string_not_inlined"),
839            None,
840            None,
841            Some("here"),
842        ]);
843        let view_b = StringViewArray::from(&values);
844
845        let indices = &[
846            (0, 1), // null
847            (1, 2), // null
848            (0, 2), // "foo_long_string_not_inlined"
849            (1, 3), // null
850            (0, 4), // null
851        ];
852
853        // Test specialized implementation
854        let values = interleave(&[&view_a, &view_b], indices).unwrap();
855        let result = values.as_string_view();
856        assert_eq!(result.data_buffers().len(), 1);
857
858        let fallback = interleave_fallback(&[&view_a, &view_b], indices).unwrap();
859        let fallback_result = fallback.as_string_view();
860
861        // Convert to strings for easier assertion
862        let collected: Vec<_> = result.iter().map(|x| x.map(|s| s.to_string())).collect();
863
864        let fallback_collected: Vec<_> = fallback_result
865            .iter()
866            .map(|x| x.map(|s| s.to_string()))
867            .collect();
868
869        assert_eq!(&collected, &fallback_collected);
870
871        assert_eq!(
872            &collected,
873            &[
874                None,
875                None,
876                Some("foo_long_string_not_inlined".to_string()),
877                None,
878                None,
879            ]
880        );
881    }
882
883    #[test]
884    fn test_interleave_views_multiple_buffers() {
885        let str1 = "very_long_string_from_first_buffer".as_bytes();
886        let str2 = "very_long_string_from_second_buffer".as_bytes();
887        let buffer1 = str1.to_vec().into();
888        let buffer2 = str2.to_vec().into();
889
890        let view1 = ByteView::new(str1.len() as u32, &str1[..4])
891            .with_buffer_index(0)
892            .with_offset(0)
893            .as_u128();
894        let view2 = ByteView::new(str2.len() as u32, &str2[..4])
895            .with_buffer_index(1)
896            .with_offset(0)
897            .as_u128();
898        let view_a =
899            StringViewArray::try_new(vec![view1, view2].into(), vec![buffer1, buffer2], None)
900                .unwrap();
901
902        let str3 = "another_very_long_string_buffer_three".as_bytes();
903        let str4 = "different_long_string_in_buffer_four".as_bytes();
904        let buffer3 = str3.to_vec().into();
905        let buffer4 = str4.to_vec().into();
906
907        let view3 = ByteView::new(str3.len() as u32, &str3[..4])
908            .with_buffer_index(0)
909            .with_offset(0)
910            .as_u128();
911        let view4 = ByteView::new(str4.len() as u32, &str4[..4])
912            .with_buffer_index(1)
913            .with_offset(0)
914            .as_u128();
915        let view_b =
916            StringViewArray::try_new(vec![view3, view4].into(), vec![buffer3, buffer4], None)
917                .unwrap();
918
919        let indices = &[
920            (0, 0), // String from first buffer of array A
921            (1, 0), // String from first buffer of array B
922            (0, 1), // String from second buffer of array A
923            (1, 1), // String from second buffer of array B
924            (0, 0), // String from first buffer of array A again
925            (1, 1), // String from second buffer of array B again
926        ];
927
928        // Test interleave
929        let values = interleave(&[&view_a, &view_b], indices).unwrap();
930        let result = values.as_string_view();
931
932        assert_eq!(
933            result.data_buffers().len(),
934            4,
935            "Expected four buffers (two from each input array)"
936        );
937
938        let result_strings: Vec<_> = result.iter().map(|x| x.map(|s| s.to_string())).collect();
939        assert_eq!(
940            result_strings,
941            vec![
942                Some("very_long_string_from_first_buffer".to_string()),
943                Some("another_very_long_string_buffer_three".to_string()),
944                Some("very_long_string_from_second_buffer".to_string()),
945                Some("different_long_string_in_buffer_four".to_string()),
946                Some("very_long_string_from_first_buffer".to_string()),
947                Some("different_long_string_in_buffer_four".to_string()),
948            ]
949        );
950
951        let views = result.views();
952        let buffer_indices: Vec<_> = views
953            .iter()
954            .map(|raw_view| ByteView::from(*raw_view).buffer_index)
955            .collect();
956
957        assert_eq!(
958            buffer_indices,
959            vec![
960                0, // First buffer from array A
961                1, // First buffer from array B
962                2, // Second buffer from array A
963                3, // Second buffer from array B
964                0, // First buffer from array A (reused)
965                3, // Second buffer from array B (reused)
966            ]
967        );
968    }
969
970    #[test]
971    fn test_interleave_run_end_encoded_primitive() {
972        let mut builder = PrimitiveRunBuilder::<Int32Type, Int32Type>::new();
973        builder.extend([1, 1, 2, 2, 2, 3].into_iter().map(Some));
974        let a = builder.finish();
975
976        let mut builder = PrimitiveRunBuilder::<Int32Type, Int32Type>::new();
977        builder.extend([4, 5, 5, 6, 6, 6].into_iter().map(Some));
978        let b = builder.finish();
979
980        let indices = &[(0, 1), (1, 0), (0, 4), (1, 2), (0, 5)];
981        let result = interleave(&[&a, &b], indices).unwrap();
982
983        // The result should be a RunEndEncoded array
984        assert!(matches!(result.data_type(), DataType::RunEndEncoded(_, _)));
985
986        // Cast to RunArray to access values
987        let result_run_array: &Int32RunArray = result.as_any().downcast_ref().unwrap();
988
989        // Verify the logical values by accessing the logical array directly
990        let expected = vec![1, 4, 2, 5, 3];
991        let mut actual = Vec::new();
992        for i in 0..result_run_array.len() {
993            let physical_idx = result_run_array.get_physical_index(i);
994            let value = result_run_array
995                .values()
996                .as_primitive::<Int32Type>()
997                .value(physical_idx);
998            actual.push(value);
999        }
1000        assert_eq!(actual, expected);
1001    }
1002
1003    #[test]
1004    fn test_interleave_run_end_encoded_string() {
1005        let a: Int32RunArray = vec!["hello", "hello", "world", "world", "foo"]
1006            .into_iter()
1007            .collect();
1008        let b: Int32RunArray = vec!["bar", "baz", "baz", "qux"].into_iter().collect();
1009
1010        let indices = &[(0, 0), (1, 1), (0, 3), (1, 3), (0, 4)];
1011        let result = interleave(&[&a, &b], indices).unwrap();
1012
1013        // The result should be a RunEndEncoded array
1014        assert!(matches!(result.data_type(), DataType::RunEndEncoded(_, _)));
1015
1016        // Cast to RunArray to access values
1017        let result_run_array: &Int32RunArray = result.as_any().downcast_ref().unwrap();
1018
1019        // Verify the logical values by accessing the logical array directly
1020        let expected = vec!["hello", "baz", "world", "qux", "foo"];
1021        let mut actual = Vec::new();
1022        for i in 0..result_run_array.len() {
1023            let physical_idx = result_run_array.get_physical_index(i);
1024            let value = result_run_array
1025                .values()
1026                .as_string::<i32>()
1027                .value(physical_idx);
1028            actual.push(value);
1029        }
1030        assert_eq!(actual, expected);
1031    }
1032
1033    #[test]
1034    fn test_interleave_run_end_encoded_with_nulls() {
1035        let a: Int32RunArray = vec![Some("a"), Some("a"), None, None, Some("b")]
1036            .into_iter()
1037            .collect();
1038        let b: Int32RunArray = vec![None, Some("c"), Some("c"), Some("d")]
1039            .into_iter()
1040            .collect();
1041
1042        let indices = &[(0, 1), (1, 0), (0, 2), (1, 3), (0, 4)];
1043        let result = interleave(&[&a, &b], indices).unwrap();
1044
1045        // The result should be a RunEndEncoded array
1046        assert!(matches!(result.data_type(), DataType::RunEndEncoded(_, _)));
1047
1048        // Cast to RunArray to access values
1049        let result_run_array: &Int32RunArray = result.as_any().downcast_ref().unwrap();
1050
1051        // Verify the logical values by accessing the logical array directly
1052        let expected = vec![Some("a"), None, None, Some("d"), Some("b")];
1053        let mut actual = Vec::new();
1054        for i in 0..result_run_array.len() {
1055            let physical_idx = result_run_array.get_physical_index(i);
1056            if result_run_array.values().is_null(physical_idx) {
1057                actual.push(None);
1058            } else {
1059                let value = result_run_array
1060                    .values()
1061                    .as_string::<i32>()
1062                    .value(physical_idx);
1063                actual.push(Some(value));
1064            }
1065        }
1066        assert_eq!(actual, expected);
1067    }
1068
1069    #[test]
1070    fn test_interleave_run_end_encoded_different_run_types() {
1071        let mut builder = PrimitiveRunBuilder::<Int16Type, Int32Type>::new();
1072        builder.extend([1, 1, 2, 3, 3].into_iter().map(Some));
1073        let a = builder.finish();
1074
1075        let mut builder = PrimitiveRunBuilder::<Int16Type, Int32Type>::new();
1076        builder.extend([4, 5, 5, 6].into_iter().map(Some));
1077        let b = builder.finish();
1078
1079        let indices = &[(0, 0), (1, 1), (0, 3), (1, 3)];
1080        let result = interleave(&[&a, &b], indices).unwrap();
1081
1082        // The result should be a RunEndEncoded array
1083        assert!(matches!(result.data_type(), DataType::RunEndEncoded(_, _)));
1084
1085        // Cast to RunArray to access values
1086        let result_run_array: &RunArray<Int16Type> = result.as_any().downcast_ref().unwrap();
1087
1088        // Verify the logical values by accessing the logical array directly
1089        let expected = vec![1, 5, 3, 6];
1090        let mut actual = Vec::new();
1091        for i in 0..result_run_array.len() {
1092            let physical_idx = result_run_array.get_physical_index(i);
1093            let value = result_run_array
1094                .values()
1095                .as_primitive::<Int32Type>()
1096                .value(physical_idx);
1097            actual.push(value);
1098        }
1099        assert_eq!(actual, expected);
1100    }
1101
1102    #[test]
1103    fn test_interleave_run_end_encoded_mixed_run_lengths() {
1104        let mut builder = PrimitiveRunBuilder::<Int64Type, Int32Type>::new();
1105        builder.extend([1, 2, 2, 2, 2, 3, 3, 4].into_iter().map(Some));
1106        let a = builder.finish();
1107
1108        let mut builder = PrimitiveRunBuilder::<Int64Type, Int32Type>::new();
1109        builder.extend([5, 5, 5, 6, 7, 7, 8, 8].into_iter().map(Some));
1110        let b = builder.finish();
1111
1112        let indices = &[
1113            (0, 0), // 1
1114            (1, 2), // 5
1115            (0, 3), // 2
1116            (1, 3), // 6
1117            (0, 6), // 3
1118            (1, 6), // 8
1119            (0, 7), // 4
1120            (1, 4), // 7
1121        ];
1122        let result = interleave(&[&a, &b], indices).unwrap();
1123
1124        // The result should be a RunEndEncoded array
1125        assert!(matches!(result.data_type(), DataType::RunEndEncoded(_, _)));
1126
1127        // Cast to RunArray to access values
1128        let result_run_array: &RunArray<Int64Type> = result.as_any().downcast_ref().unwrap();
1129
1130        // Verify the logical values by accessing the logical array directly
1131        let expected = vec![1, 5, 2, 6, 3, 8, 4, 7];
1132        let mut actual = Vec::new();
1133        for i in 0..result_run_array.len() {
1134            let physical_idx = result_run_array.get_physical_index(i);
1135            let value = result_run_array
1136                .values()
1137                .as_primitive::<Int32Type>()
1138                .value(physical_idx);
1139            actual.push(value);
1140        }
1141        assert_eq!(actual, expected);
1142    }
1143
1144    #[test]
1145    fn test_interleave_run_end_encoded_empty_runs() {
1146        let mut builder = PrimitiveRunBuilder::<Int32Type, Int32Type>::new();
1147        builder.extend([1].into_iter().map(Some));
1148        let a = builder.finish();
1149
1150        let mut builder = PrimitiveRunBuilder::<Int32Type, Int32Type>::new();
1151        builder.extend([2, 2, 2].into_iter().map(Some));
1152        let b = builder.finish();
1153
1154        let indices = &[(0, 0), (1, 1), (1, 2)];
1155        let result = interleave(&[&a, &b], indices).unwrap();
1156
1157        // The result should be a RunEndEncoded array
1158        assert!(matches!(result.data_type(), DataType::RunEndEncoded(_, _)));
1159
1160        // Cast to RunArray to access values
1161        let result_run_array: &Int32RunArray = result.as_any().downcast_ref().unwrap();
1162
1163        // Verify the logical values by accessing the logical array directly
1164        let expected = vec![1, 2, 2];
1165        let mut actual = Vec::new();
1166        for i in 0..result_run_array.len() {
1167            let physical_idx = result_run_array.get_physical_index(i);
1168            let value = result_run_array
1169                .values()
1170                .as_primitive::<Int32Type>()
1171                .value(physical_idx);
1172            actual.push(value);
1173        }
1174        assert_eq!(actual, expected);
1175    }
1176
1177    #[test]
1178    fn test_struct_no_fields() {
1179        let fields = Fields::empty();
1180        let a = StructArray::try_new_with_length(fields.clone(), vec![], None, 10).unwrap();
1181        let v = interleave(&[&a], &[(0, 0)]).unwrap();
1182        assert_eq!(v.len(), 1);
1183        assert_eq!(v.data_type(), &DataType::Struct(fields));
1184    }
1185}