arrow_select/
interleave.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Interleave elements from multiple arrays
19
20use crate::dictionary::{merge_dictionary_values, should_merge_dictionary_values};
21use arrow_array::builder::{BooleanBufferBuilder, PrimitiveBuilder};
22use arrow_array::cast::AsArray;
23use arrow_array::types::*;
24use arrow_array::*;
25use arrow_buffer::{ArrowNativeType, BooleanBuffer, MutableBuffer, NullBuffer, OffsetBuffer};
26use arrow_data::transform::MutableArrayData;
27use arrow_data::ByteView;
28use arrow_schema::{ArrowError, DataType, Fields};
29use std::sync::Arc;
30
// Callback for `downcast_primitive!` in `interleave`: forwards the matched
// primitive arrow type to the generic `interleave_primitive` kernel.
macro_rules! primitive_helper {
    ($prim:ty, $vals:ident, $idx:ident, $dt:ident) => {
        interleave_primitive::<$prim>($vals, $idx, $dt)
    };
}
36
// Callback for `downcast_integer!` in `interleave`: forwards the matched
// dictionary key type to the generic `interleave_dictionaries` kernel.
//
// `interleave_dictionaries` already returns `Result<ArrayRef, ArrowError>`, so its
// result is passed through as-is. Wrapping it in `Arc::new(..)` again would create an
// `Arc<Arc<dyn Array>>` (an extra allocation and indirection on every call).
macro_rules! dict_helper {
    ($t:ty, $values:expr, $indices:expr) => {
        interleave_dictionaries::<$t>($values, $indices)
    };
}
42
43///
44/// Takes elements by index from a list of [`Array`], creating a new [`Array`] from those values.
45///
46/// Each element in `indices` is a pair of `usize` with the first identifying the index
47/// of the [`Array`] in `values`, and the second the index of the value within that [`Array`]
48///
49/// ```text
50/// ┌─────────────────┐      ┌─────────┐                                  ┌─────────────────┐
51/// │        A        │      │ (0, 0)  │        interleave(               │        A        │
52/// ├─────────────────┤      ├─────────┤          [values0, values1],     ├─────────────────┤
53/// │        D        │      │ (1, 0)  │          indices                 │        B        │
54/// └─────────────────┘      ├─────────┤        )                         ├─────────────────┤
55///   values array 0         │ (1, 1)  │      ─────────────────────────▶  │        C        │
56///                          ├─────────┤                                  ├─────────────────┤
57///                          │ (0, 1)  │                                  │        D        │
58///                          └─────────┘                                  └─────────────────┘
59/// ┌─────────────────┐       indices
60/// │        B        │        array
61/// ├─────────────────┤                                                    result
62/// │        C        │
63/// ├─────────────────┤
64/// │        E        │
65/// └─────────────────┘
66///   values array 1
67/// ```
68///
69/// For selecting values by index from a single array see [`crate::take`]
70pub fn interleave(
71    values: &[&dyn Array],
72    indices: &[(usize, usize)],
73) -> Result<ArrayRef, ArrowError> {
74    if values.is_empty() {
75        return Err(ArrowError::InvalidArgumentError(
76            "interleave requires input of at least one array".to_string(),
77        ));
78    }
79    let data_type = values[0].data_type();
80
81    for array in values.iter().skip(1) {
82        if array.data_type() != data_type {
83            return Err(ArrowError::InvalidArgumentError(format!(
84                "It is not possible to interleave arrays of different data types ({} and {})",
85                data_type,
86                array.data_type()
87            )));
88        }
89    }
90
91    if indices.is_empty() {
92        return Ok(new_empty_array(data_type));
93    }
94
95    downcast_primitive! {
96        data_type => (primitive_helper, values, indices, data_type),
97        DataType::Utf8 => interleave_bytes::<Utf8Type>(values, indices),
98        DataType::LargeUtf8 => interleave_bytes::<LargeUtf8Type>(values, indices),
99        DataType::Binary => interleave_bytes::<BinaryType>(values, indices),
100        DataType::LargeBinary => interleave_bytes::<LargeBinaryType>(values, indices),
101        DataType::BinaryView => interleave_views::<BinaryViewType>(values, indices),
102        DataType::Utf8View => interleave_views::<StringViewType>(values, indices),
103        DataType::Dictionary(k, _) => downcast_integer! {
104            k.as_ref() => (dict_helper, values, indices),
105            _ => unreachable!("illegal dictionary key type {k}")
106        },
107        DataType::Struct(fields) => interleave_struct(fields, values, indices),
108        _ => interleave_fallback(values, indices)
109    }
110}
111
112/// Common functionality for interleaving arrays
113///
114/// T is the concrete Array type
115struct Interleave<'a, T> {
116    /// The input arrays downcast to T
117    arrays: Vec<&'a T>,
118    /// The null buffer of the interleaved output
119    nulls: Option<NullBuffer>,
120}
121
122impl<'a, T: Array + 'static> Interleave<'a, T> {
123    fn new(values: &[&'a dyn Array], indices: &'a [(usize, usize)]) -> Self {
124        let mut has_nulls = false;
125        let arrays: Vec<&T> = values
126            .iter()
127            .map(|x| {
128                has_nulls = has_nulls || x.null_count() != 0;
129                x.as_any().downcast_ref().unwrap()
130            })
131            .collect();
132
133        let nulls = match has_nulls {
134            true => {
135                let nulls = BooleanBuffer::collect_bool(indices.len(), |i| {
136                    let (a, b) = indices[i];
137                    arrays[a].is_valid(b)
138                });
139                Some(nulls.into())
140            }
141            false => None,
142        };
143
144        Self { arrays, nulls }
145    }
146}
147
148fn interleave_primitive<T: ArrowPrimitiveType>(
149    values: &[&dyn Array],
150    indices: &[(usize, usize)],
151    data_type: &DataType,
152) -> Result<ArrayRef, ArrowError> {
153    let interleaved = Interleave::<'_, PrimitiveArray<T>>::new(values, indices);
154
155    let values = indices
156        .iter()
157        .map(|(a, b)| interleaved.arrays[*a].value(*b))
158        .collect::<Vec<_>>();
159
160    let array = PrimitiveArray::<T>::new(values.into(), interleaved.nulls);
161    Ok(Arc::new(array.with_data_type(data_type.clone())))
162}
163
164fn interleave_bytes<T: ByteArrayType>(
165    values: &[&dyn Array],
166    indices: &[(usize, usize)],
167) -> Result<ArrayRef, ArrowError> {
168    let interleaved = Interleave::<'_, GenericByteArray<T>>::new(values, indices);
169
170    let mut capacity = 0;
171    let mut offsets = Vec::with_capacity(indices.len() + 1);
172    offsets.push(T::Offset::from_usize(0).unwrap());
173    offsets.extend(indices.iter().map(|(a, b)| {
174        let o = interleaved.arrays[*a].value_offsets();
175        let element_len = o[*b + 1].as_usize() - o[*b].as_usize();
176        capacity += element_len;
177        T::Offset::from_usize(capacity).expect("overflow")
178    }));
179
180    let mut values = Vec::with_capacity(capacity);
181    for (a, b) in indices {
182        values.extend_from_slice(interleaved.arrays[*a].value(*b).as_ref());
183    }
184
185    // Safety: safe by construction
186    let array = unsafe {
187        let offsets = OffsetBuffer::new_unchecked(offsets.into());
188        GenericByteArray::<T>::new_unchecked(offsets, values.into(), interleaved.nulls)
189    };
190    Ok(Arc::new(array))
191}
192
193fn interleave_dictionaries<K: ArrowDictionaryKeyType>(
194    arrays: &[&dyn Array],
195    indices: &[(usize, usize)],
196) -> Result<ArrayRef, ArrowError> {
197    let dictionaries: Vec<_> = arrays.iter().map(|x| x.as_dictionary::<K>()).collect();
198    if !should_merge_dictionary_values::<K>(&dictionaries, indices.len()) {
199        return interleave_fallback(arrays, indices);
200    }
201
202    let masks: Vec<_> = dictionaries
203        .iter()
204        .enumerate()
205        .map(|(a_idx, dictionary)| {
206            let mut key_mask = BooleanBufferBuilder::new_from_buffer(
207                MutableBuffer::new_null(dictionary.len()),
208                dictionary.len(),
209            );
210
211            for (_, key_idx) in indices.iter().filter(|(a, _)| *a == a_idx) {
212                key_mask.set_bit(*key_idx, true);
213            }
214            key_mask.finish()
215        })
216        .collect();
217
218    let merged = merge_dictionary_values(&dictionaries, Some(&masks))?;
219
220    // Recompute keys
221    let mut keys = PrimitiveBuilder::<K>::with_capacity(indices.len());
222    for (a, b) in indices {
223        let old_keys: &PrimitiveArray<K> = dictionaries[*a].keys();
224        match old_keys.is_valid(*b) {
225            true => {
226                let old_key = old_keys.values()[*b];
227                keys.append_value(merged.key_mappings[*a][old_key.as_usize()])
228            }
229            false => keys.append_null(),
230        }
231    }
232    let array = unsafe { DictionaryArray::new_unchecked(keys.finish(), merged.values) };
233    Ok(Arc::new(array))
234}
235
236fn interleave_views<T: ByteViewType>(
237    values: &[&dyn Array],
238    indices: &[(usize, usize)],
239) -> Result<ArrayRef, ArrowError> {
240    let interleaved = Interleave::<'_, GenericByteViewArray<T>>::new(values, indices);
241    let mut buffers = Vec::new();
242
243    // Contains the offsets of start buffer in `buffer_to_new_index`
244    let mut offsets = Vec::with_capacity(interleaved.arrays.len() + 1);
245    offsets.push(0);
246    let mut total_buffers = 0;
247    for a in interleaved.arrays.iter() {
248        total_buffers += a.data_buffers().len();
249        offsets.push(total_buffers);
250    }
251
252    // contains the mapping from old buffer index to new buffer index
253    let mut buffer_to_new_index = vec![None; total_buffers];
254
255    let views: Vec<u128> = indices
256        .iter()
257        .map(|(array_idx, value_idx)| {
258            let array = interleaved.arrays[*array_idx];
259            let view = array.views().get(*value_idx).unwrap();
260            let view_len = *view as u32;
261            if view_len <= 12 {
262                return *view;
263            }
264            // value is big enough to be in a variadic buffer
265            let view = ByteView::from(*view);
266            let buffer_to_new_idx = offsets[*array_idx] + view.buffer_index as usize;
267            let new_buffer_idx: u32 =
268                *buffer_to_new_index[buffer_to_new_idx].get_or_insert_with(|| {
269                    buffers.push(array.data_buffers()[view.buffer_index as usize].clone());
270                    (buffers.len() - 1) as u32
271                });
272            view.with_buffer_index(new_buffer_idx).as_u128()
273        })
274        .collect();
275
276    let array = unsafe {
277        GenericByteViewArray::<T>::new_unchecked(views.into(), buffers, interleaved.nulls)
278    };
279    Ok(Arc::new(array))
280}
281
282fn interleave_struct(
283    fields: &Fields,
284    values: &[&dyn Array],
285    indices: &[(usize, usize)],
286) -> Result<ArrayRef, ArrowError> {
287    let interleaved = Interleave::<'_, StructArray>::new(values, indices);
288
289    let mut struct_fields_array = vec![];
290
291    for i in 0..fields.len() {
292        let field_values: Vec<&dyn Array> = interleaved
293            .arrays
294            .iter()
295            .map(|x| x.column(i).as_ref())
296            .collect();
297        let interleaved = interleave(&field_values, indices)?;
298        struct_fields_array.push(interleaved);
299    }
300
301    let struct_array =
302        StructArray::try_new(fields.clone(), struct_fields_array, interleaved.nulls)?;
303
304    Ok(Arc::new(struct_array))
305}
306
307/// Fallback implementation of interleave using [`MutableArrayData`]
308fn interleave_fallback(
309    values: &[&dyn Array],
310    indices: &[(usize, usize)],
311) -> Result<ArrayRef, ArrowError> {
312    let arrays: Vec<_> = values.iter().map(|x| x.to_data()).collect();
313    let arrays: Vec<_> = arrays.iter().collect();
314    let mut array_data = MutableArrayData::new(arrays, false, indices.len());
315
316    let mut cur_array = indices[0].0;
317    let mut start_row_idx = indices[0].1;
318    let mut end_row_idx = start_row_idx + 1;
319
320    for (array, row) in indices.iter().skip(1).copied() {
321        if array == cur_array && row == end_row_idx {
322            // subsequent row in same batch
323            end_row_idx += 1;
324            continue;
325        }
326
327        // emit current batch of rows for current buffer
328        array_data.extend(cur_array, start_row_idx, end_row_idx);
329
330        // start new batch of rows
331        cur_array = array;
332        start_row_idx = row;
333        end_row_idx = start_row_idx + 1;
334    }
335
336    // emit final batch of rows
337    array_data.extend(cur_array, start_row_idx, end_row_idx);
338    Ok(make_array(array_data.freeze()))
339}
340
341/// Interleave rows by index from multiple [`RecordBatch`] instances and return a new [`RecordBatch`].
342///
343/// This function will call [`interleave`] on each array of the [`RecordBatch`] instances and assemble a new [`RecordBatch`].
344///
345/// # Example
346/// ```
347/// # use std::sync::Arc;
348/// # use arrow_array::{StringArray, Int32Array, RecordBatch, UInt32Array};
349/// # use arrow_schema::{DataType, Field, Schema};
350/// # use arrow_select::interleave::interleave_record_batch;
351///
352/// let schema = Arc::new(Schema::new(vec![
353///     Field::new("a", DataType::Int32, true),
354///     Field::new("b", DataType::Utf8, true),
355/// ]));
356///
357/// let batch1 = RecordBatch::try_new(
358///     schema.clone(),
359///     vec![
360///         Arc::new(Int32Array::from(vec![0, 1, 2])),
361///         Arc::new(StringArray::from(vec!["a", "b", "c"])),
362///     ],
363/// ).unwrap();
364///
365/// let batch2 = RecordBatch::try_new(
366///     schema.clone(),
367///     vec![
368///         Arc::new(Int32Array::from(vec![3, 4, 5])),
369///         Arc::new(StringArray::from(vec!["d", "e", "f"])),
370///     ],
371/// ).unwrap();
372///
373/// let indices = vec![(0, 1), (1, 2), (0, 0), (1, 1)];
374/// let interleaved = interleave_record_batch(&[&batch1, &batch2], &indices).unwrap();
375///
376/// let expected = RecordBatch::try_new(
377///     schema,
378///     vec![
379///         Arc::new(Int32Array::from(vec![1, 5, 0, 4])),
380///         Arc::new(StringArray::from(vec!["b", "f", "a", "e"])),
381///     ],
382/// ).unwrap();
383/// assert_eq!(interleaved, expected);
384/// ```
385pub fn interleave_record_batch(
386    record_batches: &[&RecordBatch],
387    indices: &[(usize, usize)],
388) -> Result<RecordBatch, ArrowError> {
389    let schema = record_batches[0].schema();
390    let columns = (0..schema.fields().len())
391        .map(|i| {
392            let column_values: Vec<&dyn Array> = record_batches
393                .iter()
394                .map(|batch| batch.column(i).as_ref())
395                .collect();
396            interleave(&column_values, indices)
397        })
398        .collect::<Result<Vec<_>, _>>()?;
399    RecordBatch::try_new(schema, columns)
400}
401
402#[cfg(test)]
403mod tests {
404    use super::*;
405    use arrow_array::builder::{Int32Builder, ListBuilder, PrimitiveRunBuilder};
406    use arrow_array::Int32RunArray;
407    use arrow_schema::Field;
408
409    #[test]
410    fn test_primitive() {
411        let a = Int32Array::from_iter_values([1, 2, 3, 4]);
412        let b = Int32Array::from_iter_values([5, 6, 7]);
413        let c = Int32Array::from_iter_values([8, 9, 10]);
414        let values = interleave(&[&a, &b, &c], &[(0, 3), (0, 3), (2, 2), (2, 0), (1, 1)]).unwrap();
415        let v = values.as_primitive::<Int32Type>();
416        assert_eq!(v.values(), &[4, 4, 10, 8, 6]);
417    }
418
419    #[test]
420    fn test_primitive_nulls() {
421        let a = Int32Array::from_iter_values([1, 2, 3, 4]);
422        let b = Int32Array::from_iter([Some(1), Some(4), None]);
423        let values = interleave(&[&a, &b], &[(0, 1), (1, 2), (1, 2), (0, 3), (0, 2)]).unwrap();
424        let v: Vec<_> = values.as_primitive::<Int32Type>().into_iter().collect();
425        assert_eq!(&v, &[Some(2), None, None, Some(4), Some(3)])
426    }
427
428    #[test]
429    fn test_primitive_empty() {
430        let a = Int32Array::from_iter_values([1, 2, 3, 4]);
431        let v = interleave(&[&a], &[]).unwrap();
432        assert!(v.is_empty());
433        assert_eq!(v.data_type(), &DataType::Int32);
434    }
435
436    #[test]
437    fn test_strings() {
438        let a = StringArray::from_iter_values(["a", "b", "c"]);
439        let b = StringArray::from_iter_values(["hello", "world", "foo"]);
440        let values = interleave(&[&a, &b], &[(0, 2), (0, 2), (1, 0), (1, 1), (0, 1)]).unwrap();
441        let v = values.as_string::<i32>();
442        let values: Vec<_> = v.into_iter().collect();
443        assert_eq!(
444            &values,
445            &[
446                Some("c"),
447                Some("c"),
448                Some("hello"),
449                Some("world"),
450                Some("b")
451            ]
452        )
453    }
454
455    #[test]
456    fn test_interleave_dictionary() {
457        let a = DictionaryArray::<Int32Type>::from_iter(["a", "b", "c", "a", "b"]);
458        let b = DictionaryArray::<Int32Type>::from_iter(["a", "c", "a", "c", "a"]);
459
460        // Should not recompute dictionary
461        let values =
462            interleave(&[&a, &b], &[(0, 2), (0, 2), (0, 2), (1, 0), (1, 1), (0, 1)]).unwrap();
463        let v = values.as_dictionary::<Int32Type>();
464        assert_eq!(v.values().len(), 5);
465
466        let vc = v.downcast_dict::<StringArray>().unwrap();
467        let collected: Vec<_> = vc.into_iter().map(Option::unwrap).collect();
468        assert_eq!(&collected, &["c", "c", "c", "a", "c", "b"]);
469
470        // Should recompute dictionary
471        let values = interleave(&[&a, &b], &[(0, 2), (0, 2), (1, 1)]).unwrap();
472        let v = values.as_dictionary::<Int32Type>();
473        assert_eq!(v.values().len(), 1);
474
475        let vc = v.downcast_dict::<StringArray>().unwrap();
476        let collected: Vec<_> = vc.into_iter().map(Option::unwrap).collect();
477        assert_eq!(&collected, &["c", "c", "c"]);
478    }
479
480    #[test]
481    fn test_interleave_dictionary_nulls() {
482        let input_1_keys = Int32Array::from_iter_values([0, 2, 1, 3]);
483        let input_1_values = StringArray::from(vec![Some("foo"), None, Some("bar"), Some("fiz")]);
484        let input_1 = DictionaryArray::new(input_1_keys, Arc::new(input_1_values));
485        let input_2: DictionaryArray<Int32Type> = vec![None].into_iter().collect();
486
487        let expected = vec![Some("fiz"), None, None, Some("foo")];
488
489        let values = interleave(
490            &[&input_1 as _, &input_2 as _],
491            &[(0, 3), (0, 2), (1, 0), (0, 0)],
492        )
493        .unwrap();
494        let dictionary = values.as_dictionary::<Int32Type>();
495        let actual: Vec<Option<&str>> = dictionary
496            .downcast_dict::<StringArray>()
497            .unwrap()
498            .into_iter()
499            .collect();
500
501        assert_eq!(actual, expected);
502    }
503
504    #[test]
505    fn test_lists() {
506        // [[1, 2], null, [3]]
507        let mut a = ListBuilder::new(Int32Builder::new());
508        a.values().append_value(1);
509        a.values().append_value(2);
510        a.append(true);
511        a.append(false);
512        a.values().append_value(3);
513        a.append(true);
514        let a = a.finish();
515
516        // [[4], null, [5, 6, null]]
517        let mut b = ListBuilder::new(Int32Builder::new());
518        b.values().append_value(4);
519        b.append(true);
520        b.append(false);
521        b.values().append_value(5);
522        b.values().append_value(6);
523        b.values().append_null();
524        b.append(true);
525        let b = b.finish();
526
527        let values = interleave(&[&a, &b], &[(0, 2), (0, 1), (1, 0), (1, 2), (1, 1)]).unwrap();
528        let v = values.as_any().downcast_ref::<ListArray>().unwrap();
529
530        // [[3], null, [4], [5, 6, null], null]
531        let mut expected = ListBuilder::new(Int32Builder::new());
532        expected.values().append_value(3);
533        expected.append(true);
534        expected.append(false);
535        expected.values().append_value(4);
536        expected.append(true);
537        expected.values().append_value(5);
538        expected.values().append_value(6);
539        expected.values().append_null();
540        expected.append(true);
541        expected.append(false);
542        let expected = expected.finish();
543
544        assert_eq!(v, &expected);
545    }
546
547    #[test]
548    fn test_struct_without_nulls() {
549        let fields = Fields::from(vec![
550            Field::new("number_col", DataType::Int32, false),
551            Field::new("string_col", DataType::Utf8, false),
552        ]);
553        let a = {
554            let number_col = Int32Array::from_iter_values([1, 2, 3, 4]);
555            let string_col = StringArray::from_iter_values(["a", "b", "c", "d"]);
556
557            StructArray::try_new(
558                fields.clone(),
559                vec![Arc::new(number_col), Arc::new(string_col)],
560                None,
561            )
562            .unwrap()
563        };
564
565        let b = {
566            let number_col = Int32Array::from_iter_values([5, 6, 7]);
567            let string_col = StringArray::from_iter_values(["hello", "world", "foo"]);
568
569            StructArray::try_new(
570                fields.clone(),
571                vec![Arc::new(number_col), Arc::new(string_col)],
572                None,
573            )
574            .unwrap()
575        };
576
577        let c = {
578            let number_col = Int32Array::from_iter_values([8, 9, 10]);
579            let string_col = StringArray::from_iter_values(["x", "y", "z"]);
580
581            StructArray::try_new(
582                fields.clone(),
583                vec![Arc::new(number_col), Arc::new(string_col)],
584                None,
585            )
586            .unwrap()
587        };
588
589        let values = interleave(&[&a, &b, &c], &[(0, 3), (0, 3), (2, 2), (2, 0), (1, 1)]).unwrap();
590        let values_struct = values.as_struct();
591        assert_eq!(values_struct.data_type(), &DataType::Struct(fields));
592        assert_eq!(values_struct.null_count(), 0);
593
594        let values_number = values_struct.column(0).as_primitive::<Int32Type>();
595        assert_eq!(values_number.values(), &[4, 4, 10, 8, 6]);
596        let values_string = values_struct.column(1).as_string::<i32>();
597        let values_string: Vec<_> = values_string.into_iter().collect();
598        assert_eq!(
599            &values_string,
600            &[Some("d"), Some("d"), Some("z"), Some("x"), Some("world")]
601        );
602    }
603
604    #[test]
605    fn test_struct_with_nulls_in_values() {
606        let fields = Fields::from(vec![
607            Field::new("number_col", DataType::Int32, true),
608            Field::new("string_col", DataType::Utf8, true),
609        ]);
610        let a = {
611            let number_col = Int32Array::from_iter_values([1, 2, 3, 4]);
612            let string_col = StringArray::from_iter_values(["a", "b", "c", "d"]);
613
614            StructArray::try_new(
615                fields.clone(),
616                vec![Arc::new(number_col), Arc::new(string_col)],
617                None,
618            )
619            .unwrap()
620        };
621
622        let b = {
623            let number_col = Int32Array::from_iter([Some(1), Some(4), None]);
624            let string_col = StringArray::from(vec![Some("hello"), None, Some("foo")]);
625
626            StructArray::try_new(
627                fields.clone(),
628                vec![Arc::new(number_col), Arc::new(string_col)],
629                None,
630            )
631            .unwrap()
632        };
633
634        let values = interleave(&[&a, &b], &[(0, 1), (1, 2), (1, 2), (0, 3), (1, 1)]).unwrap();
635        let values_struct = values.as_struct();
636        assert_eq!(values_struct.data_type(), &DataType::Struct(fields));
637
638        // The struct itself has no nulls, but the values do
639        assert_eq!(values_struct.null_count(), 0);
640
641        let values_number: Vec<_> = values_struct
642            .column(0)
643            .as_primitive::<Int32Type>()
644            .into_iter()
645            .collect();
646        assert_eq!(values_number, &[Some(2), None, None, Some(4), Some(4)]);
647
648        let values_string = values_struct.column(1).as_string::<i32>();
649        let values_string: Vec<_> = values_string.into_iter().collect();
650        assert_eq!(
651            &values_string,
652            &[Some("b"), Some("foo"), Some("foo"), Some("d"), None]
653        );
654    }
655
656    #[test]
657    fn test_struct_with_nulls() {
658        let fields = Fields::from(vec![
659            Field::new("number_col", DataType::Int32, false),
660            Field::new("string_col", DataType::Utf8, false),
661        ]);
662        let a = {
663            let number_col = Int32Array::from_iter_values([1, 2, 3, 4]);
664            let string_col = StringArray::from_iter_values(["a", "b", "c", "d"]);
665
666            StructArray::try_new(
667                fields.clone(),
668                vec![Arc::new(number_col), Arc::new(string_col)],
669                None,
670            )
671            .unwrap()
672        };
673
674        let b = {
675            let number_col = Int32Array::from_iter_values([5, 6, 7]);
676            let string_col = StringArray::from_iter_values(["hello", "world", "foo"]);
677
678            StructArray::try_new(
679                fields.clone(),
680                vec![Arc::new(number_col), Arc::new(string_col)],
681                Some(NullBuffer::from(&[true, false, true])),
682            )
683            .unwrap()
684        };
685
686        let c = {
687            let number_col = Int32Array::from_iter_values([8, 9, 10]);
688            let string_col = StringArray::from_iter_values(["x", "y", "z"]);
689
690            StructArray::try_new(
691                fields.clone(),
692                vec![Arc::new(number_col), Arc::new(string_col)],
693                None,
694            )
695            .unwrap()
696        };
697
698        let values = interleave(&[&a, &b, &c], &[(0, 3), (0, 3), (2, 2), (1, 1), (2, 0)]).unwrap();
699        let values_struct = values.as_struct();
700        assert_eq!(values_struct.data_type(), &DataType::Struct(fields));
701
702        let validity: Vec<bool> = {
703            let null_buffer = values_struct.nulls().expect("should_have_nulls");
704
705            null_buffer.iter().collect()
706        };
707        assert_eq!(validity, &[true, true, true, false, true]);
708        let values_number = values_struct.column(0).as_primitive::<Int32Type>();
709        assert_eq!(values_number.values(), &[4, 4, 10, 6, 8]);
710        let values_string = values_struct.column(1).as_string::<i32>();
711        let values_string: Vec<_> = values_string.into_iter().collect();
712        assert_eq!(
713            &values_string,
714            &[Some("d"), Some("d"), Some("z"), Some("world"), Some("x"),]
715        );
716    }
717
718    #[test]
719    fn test_struct_empty() {
720        let fields = Fields::from(vec![
721            Field::new("number_col", DataType::Int32, false),
722            Field::new("string_col", DataType::Utf8, false),
723        ]);
724        let a = {
725            let number_col = Int32Array::from_iter_values([1, 2, 3, 4]);
726            let string_col = StringArray::from_iter_values(["a", "b", "c", "d"]);
727
728            StructArray::try_new(
729                fields.clone(),
730                vec![Arc::new(number_col), Arc::new(string_col)],
731                None,
732            )
733            .unwrap()
734        };
735        let v = interleave(&[&a], &[]).unwrap();
736        assert!(v.is_empty());
737        assert_eq!(v.data_type(), &DataType::Struct(fields));
738    }
739
740    #[test]
741    fn interleave_sparse_nulls() {
742        let values = StringArray::from_iter_values((0..100).map(|x| x.to_string()));
743        let keys = Int32Array::from_iter_values(0..10);
744        let dict_a = DictionaryArray::new(keys, Arc::new(values));
745        let values = StringArray::new_null(0);
746        let keys = Int32Array::new_null(10);
747        let dict_b = DictionaryArray::new(keys, Arc::new(values));
748
749        let indices = &[(0, 0), (0, 1), (0, 2), (1, 0)];
750        let array = interleave(&[&dict_a, &dict_b], indices).unwrap();
751
752        let expected =
753            DictionaryArray::<Int32Type>::from_iter(vec![Some("0"), Some("1"), Some("2"), None]);
754        assert_eq!(array.as_ref(), &expected)
755    }
756
757    #[test]
758    fn test_interleave_views() {
759        let values = StringArray::from_iter_values([
760            "hello",
761            "world_long_string_not_inlined",
762            "foo",
763            "bar",
764            "baz",
765        ]);
766        let view_a = StringViewArray::from(&values);
767
768        let values = StringArray::from_iter_values([
769            "test",
770            "data",
771            "more_long_string_not_inlined",
772            "views",
773            "here",
774        ]);
775        let view_b = StringViewArray::from(&values);
776
777        let indices = &[
778            (0, 2), // "foo"
779            (1, 0), // "test"
780            (0, 4), // "baz"
781            (1, 3), // "views"
782            (0, 1), // "world_long_string_not_inlined"
783        ];
784
785        // Test specialized implementation
786        let values = interleave(&[&view_a, &view_b], indices).unwrap();
787        let result = values.as_string_view();
788        assert_eq!(result.data_buffers().len(), 1);
789
790        let fallback = interleave_fallback(&[&view_a, &view_b], indices).unwrap();
791        let fallback_result = fallback.as_string_view();
792        // note that fallback_result has 2 buffers, but only one long enough string to warrant a buffer
793        assert_eq!(fallback_result.data_buffers().len(), 2);
794
795        // Convert to strings for easier assertion
796        let collected: Vec<_> = result.iter().map(|x| x.map(|s| s.to_string())).collect();
797
798        let fallback_collected: Vec<_> = fallback_result
799            .iter()
800            .map(|x| x.map(|s| s.to_string()))
801            .collect();
802
803        assert_eq!(&collected, &fallback_collected);
804
805        assert_eq!(
806            &collected,
807            &[
808                Some("foo".to_string()),
809                Some("test".to_string()),
810                Some("baz".to_string()),
811                Some("views".to_string()),
812                Some("world_long_string_not_inlined".to_string()),
813            ]
814        );
815    }
816
817    #[test]
818    fn test_interleave_views_with_nulls() {
819        let values = StringArray::from_iter([
820            Some("hello"),
821            None,
822            Some("foo_long_string_not_inlined"),
823            Some("bar"),
824            None,
825        ]);
826        let view_a = StringViewArray::from(&values);
827
828        let values = StringArray::from_iter([
829            Some("test"),
830            Some("data_long_string_not_inlined"),
831            None,
832            None,
833            Some("here"),
834        ]);
835        let view_b = StringViewArray::from(&values);
836
837        let indices = &[
838            (0, 1), // null
839            (1, 2), // null
840            (0, 2), // "foo_long_string_not_inlined"
841            (1, 3), // null
842            (0, 4), // null
843        ];
844
845        // Test specialized implementation
846        let values = interleave(&[&view_a, &view_b], indices).unwrap();
847        let result = values.as_string_view();
848        assert_eq!(result.data_buffers().len(), 1);
849
850        let fallback = interleave_fallback(&[&view_a, &view_b], indices).unwrap();
851        let fallback_result = fallback.as_string_view();
852
853        // Convert to strings for easier assertion
854        let collected: Vec<_> = result.iter().map(|x| x.map(|s| s.to_string())).collect();
855
856        let fallback_collected: Vec<_> = fallback_result
857            .iter()
858            .map(|x| x.map(|s| s.to_string()))
859            .collect();
860
861        assert_eq!(&collected, &fallback_collected);
862
863        assert_eq!(
864            &collected,
865            &[
866                None,
867                None,
868                Some("foo_long_string_not_inlined".to_string()),
869                None,
870                None,
871            ]
872        );
873    }
874
875    #[test]
876    fn test_interleave_views_multiple_buffers() {
877        let str1 = "very_long_string_from_first_buffer".as_bytes();
878        let str2 = "very_long_string_from_second_buffer".as_bytes();
879        let buffer1 = str1.to_vec().into();
880        let buffer2 = str2.to_vec().into();
881
882        let view1 = ByteView::new(str1.len() as u32, &str1[..4])
883            .with_buffer_index(0)
884            .with_offset(0)
885            .as_u128();
886        let view2 = ByteView::new(str2.len() as u32, &str2[..4])
887            .with_buffer_index(1)
888            .with_offset(0)
889            .as_u128();
890        let view_a =
891            StringViewArray::try_new(vec![view1, view2].into(), vec![buffer1, buffer2], None)
892                .unwrap();
893
894        let str3 = "another_very_long_string_buffer_three".as_bytes();
895        let str4 = "different_long_string_in_buffer_four".as_bytes();
896        let buffer3 = str3.to_vec().into();
897        let buffer4 = str4.to_vec().into();
898
899        let view3 = ByteView::new(str3.len() as u32, &str3[..4])
900            .with_buffer_index(0)
901            .with_offset(0)
902            .as_u128();
903        let view4 = ByteView::new(str4.len() as u32, &str4[..4])
904            .with_buffer_index(1)
905            .with_offset(0)
906            .as_u128();
907        let view_b =
908            StringViewArray::try_new(vec![view3, view4].into(), vec![buffer3, buffer4], None)
909                .unwrap();
910
911        let indices = &[
912            (0, 0), // String from first buffer of array A
913            (1, 0), // String from first buffer of array B
914            (0, 1), // String from second buffer of array A
915            (1, 1), // String from second buffer of array B
916            (0, 0), // String from first buffer of array A again
917            (1, 1), // String from second buffer of array B again
918        ];
919
920        // Test interleave
921        let values = interleave(&[&view_a, &view_b], indices).unwrap();
922        let result = values.as_string_view();
923
924        assert_eq!(
925            result.data_buffers().len(),
926            4,
927            "Expected four buffers (two from each input array)"
928        );
929
930        let result_strings: Vec<_> = result.iter().map(|x| x.map(|s| s.to_string())).collect();
931        assert_eq!(
932            result_strings,
933            vec![
934                Some("very_long_string_from_first_buffer".to_string()),
935                Some("another_very_long_string_buffer_three".to_string()),
936                Some("very_long_string_from_second_buffer".to_string()),
937                Some("different_long_string_in_buffer_four".to_string()),
938                Some("very_long_string_from_first_buffer".to_string()),
939                Some("different_long_string_in_buffer_four".to_string()),
940            ]
941        );
942
943        let views = result.views();
944        let buffer_indices: Vec<_> = views
945            .iter()
946            .map(|raw_view| ByteView::from(*raw_view).buffer_index)
947            .collect();
948
949        assert_eq!(
950            buffer_indices,
951            vec![
952                0, // First buffer from array A
953                1, // First buffer from array B
954                2, // Second buffer from array A
955                3, // Second buffer from array B
956                0, // First buffer from array A (reused)
957                3, // Second buffer from array B (reused)
958            ]
959        );
960    }
961
962    #[test]
963    fn test_interleave_run_end_encoded_primitive() {
964        let mut builder = PrimitiveRunBuilder::<Int32Type, Int32Type>::new();
965        builder.extend([1, 1, 2, 2, 2, 3].into_iter().map(Some));
966        let a = builder.finish();
967
968        let mut builder = PrimitiveRunBuilder::<Int32Type, Int32Type>::new();
969        builder.extend([4, 5, 5, 6, 6, 6].into_iter().map(Some));
970        let b = builder.finish();
971
972        let indices = &[(0, 1), (1, 0), (0, 4), (1, 2), (0, 5)];
973        let result = interleave(&[&a, &b], indices).unwrap();
974
975        // The result should be a RunEndEncoded array
976        assert!(matches!(result.data_type(), DataType::RunEndEncoded(_, _)));
977
978        // Cast to RunArray to access values
979        let result_run_array: &Int32RunArray = result.as_any().downcast_ref().unwrap();
980
981        // Verify the logical values by accessing the logical array directly
982        let expected = vec![1, 4, 2, 5, 3];
983        let mut actual = Vec::new();
984        for i in 0..result_run_array.len() {
985            let physical_idx = result_run_array.get_physical_index(i);
986            let value = result_run_array
987                .values()
988                .as_primitive::<Int32Type>()
989                .value(physical_idx);
990            actual.push(value);
991        }
992        assert_eq!(actual, expected);
993    }
994
995    #[test]
996    fn test_interleave_run_end_encoded_string() {
997        let a: Int32RunArray = vec!["hello", "hello", "world", "world", "foo"]
998            .into_iter()
999            .collect();
1000        let b: Int32RunArray = vec!["bar", "baz", "baz", "qux"].into_iter().collect();
1001
1002        let indices = &[(0, 0), (1, 1), (0, 3), (1, 3), (0, 4)];
1003        let result = interleave(&[&a, &b], indices).unwrap();
1004
1005        // The result should be a RunEndEncoded array
1006        assert!(matches!(result.data_type(), DataType::RunEndEncoded(_, _)));
1007
1008        // Cast to RunArray to access values
1009        let result_run_array: &Int32RunArray = result.as_any().downcast_ref().unwrap();
1010
1011        // Verify the logical values by accessing the logical array directly
1012        let expected = vec!["hello", "baz", "world", "qux", "foo"];
1013        let mut actual = Vec::new();
1014        for i in 0..result_run_array.len() {
1015            let physical_idx = result_run_array.get_physical_index(i);
1016            let value = result_run_array
1017                .values()
1018                .as_string::<i32>()
1019                .value(physical_idx);
1020            actual.push(value);
1021        }
1022        assert_eq!(actual, expected);
1023    }
1024
1025    #[test]
1026    fn test_interleave_run_end_encoded_with_nulls() {
1027        let a: Int32RunArray = vec![Some("a"), Some("a"), None, None, Some("b")]
1028            .into_iter()
1029            .collect();
1030        let b: Int32RunArray = vec![None, Some("c"), Some("c"), Some("d")]
1031            .into_iter()
1032            .collect();
1033
1034        let indices = &[(0, 1), (1, 0), (0, 2), (1, 3), (0, 4)];
1035        let result = interleave(&[&a, &b], indices).unwrap();
1036
1037        // The result should be a RunEndEncoded array
1038        assert!(matches!(result.data_type(), DataType::RunEndEncoded(_, _)));
1039
1040        // Cast to RunArray to access values
1041        let result_run_array: &Int32RunArray = result.as_any().downcast_ref().unwrap();
1042
1043        // Verify the logical values by accessing the logical array directly
1044        let expected = vec![Some("a"), None, None, Some("d"), Some("b")];
1045        let mut actual = Vec::new();
1046        for i in 0..result_run_array.len() {
1047            let physical_idx = result_run_array.get_physical_index(i);
1048            if result_run_array.values().is_null(physical_idx) {
1049                actual.push(None);
1050            } else {
1051                let value = result_run_array
1052                    .values()
1053                    .as_string::<i32>()
1054                    .value(physical_idx);
1055                actual.push(Some(value));
1056            }
1057        }
1058        assert_eq!(actual, expected);
1059    }
1060
1061    #[test]
1062    fn test_interleave_run_end_encoded_different_run_types() {
1063        let mut builder = PrimitiveRunBuilder::<Int16Type, Int32Type>::new();
1064        builder.extend([1, 1, 2, 3, 3].into_iter().map(Some));
1065        let a = builder.finish();
1066
1067        let mut builder = PrimitiveRunBuilder::<Int16Type, Int32Type>::new();
1068        builder.extend([4, 5, 5, 6].into_iter().map(Some));
1069        let b = builder.finish();
1070
1071        let indices = &[(0, 0), (1, 1), (0, 3), (1, 3)];
1072        let result = interleave(&[&a, &b], indices).unwrap();
1073
1074        // The result should be a RunEndEncoded array
1075        assert!(matches!(result.data_type(), DataType::RunEndEncoded(_, _)));
1076
1077        // Cast to RunArray to access values
1078        let result_run_array: &RunArray<Int16Type> = result.as_any().downcast_ref().unwrap();
1079
1080        // Verify the logical values by accessing the logical array directly
1081        let expected = vec![1, 5, 3, 6];
1082        let mut actual = Vec::new();
1083        for i in 0..result_run_array.len() {
1084            let physical_idx = result_run_array.get_physical_index(i);
1085            let value = result_run_array
1086                .values()
1087                .as_primitive::<Int32Type>()
1088                .value(physical_idx);
1089            actual.push(value);
1090        }
1091        assert_eq!(actual, expected);
1092    }
1093
1094    #[test]
1095    fn test_interleave_run_end_encoded_mixed_run_lengths() {
1096        let mut builder = PrimitiveRunBuilder::<Int64Type, Int32Type>::new();
1097        builder.extend([1, 2, 2, 2, 2, 3, 3, 4].into_iter().map(Some));
1098        let a = builder.finish();
1099
1100        let mut builder = PrimitiveRunBuilder::<Int64Type, Int32Type>::new();
1101        builder.extend([5, 5, 5, 6, 7, 7, 8, 8].into_iter().map(Some));
1102        let b = builder.finish();
1103
1104        let indices = &[
1105            (0, 0), // 1
1106            (1, 2), // 5
1107            (0, 3), // 2
1108            (1, 3), // 6
1109            (0, 6), // 3
1110            (1, 6), // 8
1111            (0, 7), // 4
1112            (1, 4), // 7
1113        ];
1114        let result = interleave(&[&a, &b], indices).unwrap();
1115
1116        // The result should be a RunEndEncoded array
1117        assert!(matches!(result.data_type(), DataType::RunEndEncoded(_, _)));
1118
1119        // Cast to RunArray to access values
1120        let result_run_array: &RunArray<Int64Type> = result.as_any().downcast_ref().unwrap();
1121
1122        // Verify the logical values by accessing the logical array directly
1123        let expected = vec![1, 5, 2, 6, 3, 8, 4, 7];
1124        let mut actual = Vec::new();
1125        for i in 0..result_run_array.len() {
1126            let physical_idx = result_run_array.get_physical_index(i);
1127            let value = result_run_array
1128                .values()
1129                .as_primitive::<Int32Type>()
1130                .value(physical_idx);
1131            actual.push(value);
1132        }
1133        assert_eq!(actual, expected);
1134    }
1135
1136    #[test]
1137    fn test_interleave_run_end_encoded_empty_runs() {
1138        let mut builder = PrimitiveRunBuilder::<Int32Type, Int32Type>::new();
1139        builder.extend([1].into_iter().map(Some));
1140        let a = builder.finish();
1141
1142        let mut builder = PrimitiveRunBuilder::<Int32Type, Int32Type>::new();
1143        builder.extend([2, 2, 2].into_iter().map(Some));
1144        let b = builder.finish();
1145
1146        let indices = &[(0, 0), (1, 1), (1, 2)];
1147        let result = interleave(&[&a, &b], indices).unwrap();
1148
1149        // The result should be a RunEndEncoded array
1150        assert!(matches!(result.data_type(), DataType::RunEndEncoded(_, _)));
1151
1152        // Cast to RunArray to access values
1153        let result_run_array: &Int32RunArray = result.as_any().downcast_ref().unwrap();
1154
1155        // Verify the logical values by accessing the logical array directly
1156        let expected = vec![1, 2, 2];
1157        let mut actual = Vec::new();
1158        for i in 0..result_run_array.len() {
1159            let physical_idx = result_run_array.get_physical_index(i);
1160            let value = result_run_array
1161                .values()
1162                .as_primitive::<Int32Type>()
1163                .value(physical_idx);
1164            actual.push(value);
1165        }
1166        assert_eq!(actual, expected);
1167    }
1168}