arrow_select/
interleave.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Interleave elements from multiple arrays
19
20use crate::dictionary::{merge_dictionary_values, should_merge_dictionary_values};
21use arrow_array::builder::{BooleanBufferBuilder, PrimitiveBuilder};
22use arrow_array::cast::AsArray;
23use arrow_array::types::*;
24use arrow_array::*;
25use arrow_buffer::{ArrowNativeType, BooleanBuffer, MutableBuffer, NullBuffer, OffsetBuffer};
26use arrow_data::transform::MutableArrayData;
27use arrow_data::ByteView;
28use arrow_schema::{ArrowError, DataType};
29use std::sync::Arc;
30
31macro_rules! primitive_helper {
32    ($t:ty, $values:ident, $indices:ident, $data_type:ident) => {
33        interleave_primitive::<$t>($values, $indices, $data_type)
34    };
35}
36
37macro_rules! dict_helper {
38    ($t:ty, $values:expr, $indices:expr) => {
39        Ok(Arc::new(interleave_dictionaries::<$t>($values, $indices)?) as _)
40    };
41}
42
43///
44/// Takes elements by index from a list of [`Array`], creating a new [`Array`] from those values.
45///
46/// Each element in `indices` is a pair of `usize` with the first identifying the index
47/// of the [`Array`] in `values`, and the second the index of the value within that [`Array`]
48///
49/// ```text
50/// ┌─────────────────┐      ┌─────────┐                                  ┌─────────────────┐
51/// │        A        │      │ (0, 0)  │        interleave(               │        A        │
52/// ├─────────────────┤      ├─────────┤          [values0, values1],     ├─────────────────┤
53/// │        D        │      │ (1, 0)  │          indices                 │        B        │
54/// └─────────────────┘      ├─────────┤        )                         ├─────────────────┤
55///   values array 0         │ (1, 1)  │      ─────────────────────────▶  │        C        │
56///                          ├─────────┤                                  ├─────────────────┤
57///                          │ (0, 1)  │                                  │        D        │
58///                          └─────────┘                                  └─────────────────┘
59/// ┌─────────────────┐       indices
60/// │        B        │        array
61/// ├─────────────────┤                                                    result
62/// │        C        │
63/// ├─────────────────┤
64/// │        E        │
65/// └─────────────────┘
66///   values array 1
67/// ```
68///
69/// For selecting values by index from a single array see [`crate::take`]
70pub fn interleave(
71    values: &[&dyn Array],
72    indices: &[(usize, usize)],
73) -> Result<ArrayRef, ArrowError> {
74    if values.is_empty() {
75        return Err(ArrowError::InvalidArgumentError(
76            "interleave requires input of at least one array".to_string(),
77        ));
78    }
79    let data_type = values[0].data_type();
80
81    for array in values.iter().skip(1) {
82        if array.data_type() != data_type {
83            return Err(ArrowError::InvalidArgumentError(format!(
84                "It is not possible to interleave arrays of different data types ({} and {})",
85                data_type,
86                array.data_type()
87            )));
88        }
89    }
90
91    if indices.is_empty() {
92        return Ok(new_empty_array(data_type));
93    }
94
95    downcast_primitive! {
96        data_type => (primitive_helper, values, indices, data_type),
97        DataType::Utf8 => interleave_bytes::<Utf8Type>(values, indices),
98        DataType::LargeUtf8 => interleave_bytes::<LargeUtf8Type>(values, indices),
99        DataType::Binary => interleave_bytes::<BinaryType>(values, indices),
100        DataType::LargeBinary => interleave_bytes::<LargeBinaryType>(values, indices),
101        DataType::BinaryView => interleave_views::<BinaryViewType>(values, indices),
102        DataType::Utf8View => interleave_views::<StringViewType>(values, indices),
103        DataType::Dictionary(k, _) => downcast_integer! {
104            k.as_ref() => (dict_helper, values, indices),
105            _ => unreachable!("illegal dictionary key type {k}")
106        },
107        _ => interleave_fallback(values, indices)
108    }
109}
110
111/// Common functionality for interleaving arrays
112///
113/// T is the concrete Array type
114struct Interleave<'a, T> {
115    /// The input arrays downcast to T
116    arrays: Vec<&'a T>,
117    /// The null buffer of the interleaved output
118    nulls: Option<NullBuffer>,
119}
120
121impl<'a, T: Array + 'static> Interleave<'a, T> {
122    fn new(values: &[&'a dyn Array], indices: &'a [(usize, usize)]) -> Self {
123        let mut has_nulls = false;
124        let arrays: Vec<&T> = values
125            .iter()
126            .map(|x| {
127                has_nulls = has_nulls || x.null_count() != 0;
128                x.as_any().downcast_ref().unwrap()
129            })
130            .collect();
131
132        let nulls = match has_nulls {
133            true => {
134                let nulls = BooleanBuffer::collect_bool(indices.len(), |i| {
135                    let (a, b) = indices[i];
136                    arrays[a].is_valid(b)
137                });
138                Some(nulls.into())
139            }
140            false => None,
141        };
142
143        Self { arrays, nulls }
144    }
145}
146
147fn interleave_primitive<T: ArrowPrimitiveType>(
148    values: &[&dyn Array],
149    indices: &[(usize, usize)],
150    data_type: &DataType,
151) -> Result<ArrayRef, ArrowError> {
152    let interleaved = Interleave::<'_, PrimitiveArray<T>>::new(values, indices);
153
154    let values = indices
155        .iter()
156        .map(|(a, b)| interleaved.arrays[*a].value(*b))
157        .collect::<Vec<_>>();
158
159    let array = PrimitiveArray::<T>::new(values.into(), interleaved.nulls);
160    Ok(Arc::new(array.with_data_type(data_type.clone())))
161}
162
163fn interleave_bytes<T: ByteArrayType>(
164    values: &[&dyn Array],
165    indices: &[(usize, usize)],
166) -> Result<ArrayRef, ArrowError> {
167    let interleaved = Interleave::<'_, GenericByteArray<T>>::new(values, indices);
168
169    let mut capacity = 0;
170    let mut offsets = Vec::with_capacity(indices.len() + 1);
171    offsets.push(T::Offset::from_usize(0).unwrap());
172    offsets.extend(indices.iter().map(|(a, b)| {
173        let o = interleaved.arrays[*a].value_offsets();
174        let element_len = o[*b + 1].as_usize() - o[*b].as_usize();
175        capacity += element_len;
176        T::Offset::from_usize(capacity).expect("overflow")
177    }));
178
179    let mut values = Vec::with_capacity(capacity);
180    for (a, b) in indices {
181        values.extend_from_slice(interleaved.arrays[*a].value(*b).as_ref());
182    }
183
184    // Safety: safe by construction
185    let array = unsafe {
186        let offsets = OffsetBuffer::new_unchecked(offsets.into());
187        GenericByteArray::<T>::new_unchecked(offsets, values.into(), interleaved.nulls)
188    };
189    Ok(Arc::new(array))
190}
191
192fn interleave_dictionaries<K: ArrowDictionaryKeyType>(
193    arrays: &[&dyn Array],
194    indices: &[(usize, usize)],
195) -> Result<ArrayRef, ArrowError> {
196    let dictionaries: Vec<_> = arrays.iter().map(|x| x.as_dictionary::<K>()).collect();
197    if !should_merge_dictionary_values::<K>(&dictionaries, indices.len()) {
198        return interleave_fallback(arrays, indices);
199    }
200
201    let masks: Vec<_> = dictionaries
202        .iter()
203        .enumerate()
204        .map(|(a_idx, dictionary)| {
205            let mut key_mask = BooleanBufferBuilder::new_from_buffer(
206                MutableBuffer::new_null(dictionary.len()),
207                dictionary.len(),
208            );
209
210            for (_, key_idx) in indices.iter().filter(|(a, _)| *a == a_idx) {
211                key_mask.set_bit(*key_idx, true);
212            }
213            key_mask.finish()
214        })
215        .collect();
216
217    let merged = merge_dictionary_values(&dictionaries, Some(&masks))?;
218
219    // Recompute keys
220    let mut keys = PrimitiveBuilder::<K>::with_capacity(indices.len());
221    for (a, b) in indices {
222        let old_keys: &PrimitiveArray<K> = dictionaries[*a].keys();
223        match old_keys.is_valid(*b) {
224            true => {
225                let old_key = old_keys.values()[*b];
226                keys.append_value(merged.key_mappings[*a][old_key.as_usize()])
227            }
228            false => keys.append_null(),
229        }
230    }
231    let array = unsafe { DictionaryArray::new_unchecked(keys.finish(), merged.values) };
232    Ok(Arc::new(array))
233}
234
235fn interleave_views<T: ByteViewType>(
236    values: &[&dyn Array],
237    indices: &[(usize, usize)],
238) -> Result<ArrayRef, ArrowError> {
239    let interleaved = Interleave::<'_, GenericByteViewArray<T>>::new(values, indices);
240    let mut buffers = Vec::new();
241
242    // Contains the offsets of start buffer in `buffer_to_new_index`
243    let mut offsets = Vec::with_capacity(interleaved.arrays.len() + 1);
244    offsets.push(0);
245    let mut total_buffers = 0;
246    for a in interleaved.arrays.iter() {
247        total_buffers += a.data_buffers().len();
248        offsets.push(total_buffers);
249    }
250
251    // contains the mapping from old buffer index to new buffer index
252    let mut buffer_to_new_index = vec![None; total_buffers];
253
254    let views: Vec<u128> = indices
255        .iter()
256        .map(|(array_idx, value_idx)| {
257            let array = interleaved.arrays[*array_idx];
258            let view = array.views().get(*value_idx).unwrap();
259            let view_len = *view as u32;
260            if view_len <= 12 {
261                return *view;
262            }
263            // value is big enough to be in a variadic buffer
264            let view = ByteView::from(*view);
265            let buffer_to_new_idx = offsets[*array_idx] + view.buffer_index as usize;
266            let new_buffer_idx: u32 =
267                *buffer_to_new_index[buffer_to_new_idx].get_or_insert_with(|| {
268                    buffers.push(array.data_buffers()[view.buffer_index as usize].clone());
269                    (buffers.len() - 1) as u32
270                });
271            view.with_buffer_index(new_buffer_idx).as_u128()
272        })
273        .collect();
274
275    let array = unsafe {
276        GenericByteViewArray::<T>::new_unchecked(views.into(), buffers, interleaved.nulls)
277    };
278    Ok(Arc::new(array))
279}
280
281/// Fallback implementation of interleave using [`MutableArrayData`]
282fn interleave_fallback(
283    values: &[&dyn Array],
284    indices: &[(usize, usize)],
285) -> Result<ArrayRef, ArrowError> {
286    let arrays: Vec<_> = values.iter().map(|x| x.to_data()).collect();
287    let arrays: Vec<_> = arrays.iter().collect();
288    let mut array_data = MutableArrayData::new(arrays, false, indices.len());
289
290    let mut cur_array = indices[0].0;
291    let mut start_row_idx = indices[0].1;
292    let mut end_row_idx = start_row_idx + 1;
293
294    for (array, row) in indices.iter().skip(1).copied() {
295        if array == cur_array && row == end_row_idx {
296            // subsequent row in same batch
297            end_row_idx += 1;
298            continue;
299        }
300
301        // emit current batch of rows for current buffer
302        array_data.extend(cur_array, start_row_idx, end_row_idx);
303
304        // start new batch of rows
305        cur_array = array;
306        start_row_idx = row;
307        end_row_idx = start_row_idx + 1;
308    }
309
310    // emit final batch of rows
311    array_data.extend(cur_array, start_row_idx, end_row_idx);
312    Ok(make_array(array_data.freeze()))
313}
314
315/// Interleave rows by index from multiple [`RecordBatch`] instances and return a new [`RecordBatch`].
316///
317/// This function will call [`interleave`] on each array of the [`RecordBatch`] instances and assemble a new [`RecordBatch`].
318///
319/// # Example
320/// ```
321/// # use std::sync::Arc;
322/// # use arrow_array::{StringArray, Int32Array, RecordBatch, UInt32Array};
323/// # use arrow_schema::{DataType, Field, Schema};
324/// # use arrow_select::interleave::interleave_record_batch;
325///
326/// let schema = Arc::new(Schema::new(vec![
327///     Field::new("a", DataType::Int32, true),
328///     Field::new("b", DataType::Utf8, true),
329/// ]));
330///
331/// let batch1 = RecordBatch::try_new(
332///     schema.clone(),
333///     vec![
334///         Arc::new(Int32Array::from(vec![0, 1, 2])),
335///         Arc::new(StringArray::from(vec!["a", "b", "c"])),
336///     ],
337/// ).unwrap();
338///
339/// let batch2 = RecordBatch::try_new(
340///     schema.clone(),
341///     vec![
342///         Arc::new(Int32Array::from(vec![3, 4, 5])),
343///         Arc::new(StringArray::from(vec!["d", "e", "f"])),
344///     ],
345/// ).unwrap();
346///
347/// let indices = vec![(0, 1), (1, 2), (0, 0), (1, 1)];
348/// let interleaved = interleave_record_batch(&[&batch1, &batch2], &indices).unwrap();
349///
350/// let expected = RecordBatch::try_new(
351///     schema,
352///     vec![
353///         Arc::new(Int32Array::from(vec![1, 5, 0, 4])),
354///         Arc::new(StringArray::from(vec!["b", "f", "a", "e"])),
355///     ],
356/// ).unwrap();
357/// assert_eq!(interleaved, expected);
358/// ```
359pub fn interleave_record_batch(
360    record_batches: &[&RecordBatch],
361    indices: &[(usize, usize)],
362) -> Result<RecordBatch, ArrowError> {
363    let schema = record_batches[0].schema();
364    let columns = (0..schema.fields().len())
365        .map(|i| {
366            let column_values: Vec<&dyn Array> = record_batches
367                .iter()
368                .map(|batch| batch.column(i).as_ref())
369                .collect();
370            interleave(&column_values, indices)
371        })
372        .collect::<Result<Vec<_>, _>>()?;
373    RecordBatch::try_new(schema, columns)
374}
375
376#[cfg(test)]
377mod tests {
378    use super::*;
379    use arrow_array::builder::{Int32Builder, ListBuilder, PrimitiveRunBuilder};
380    use arrow_array::Int32RunArray;
381
382    #[test]
383    fn test_primitive() {
384        let a = Int32Array::from_iter_values([1, 2, 3, 4]);
385        let b = Int32Array::from_iter_values([5, 6, 7]);
386        let c = Int32Array::from_iter_values([8, 9, 10]);
387        let values = interleave(&[&a, &b, &c], &[(0, 3), (0, 3), (2, 2), (2, 0), (1, 1)]).unwrap();
388        let v = values.as_primitive::<Int32Type>();
389        assert_eq!(v.values(), &[4, 4, 10, 8, 6]);
390    }
391
392    #[test]
393    fn test_primitive_nulls() {
394        let a = Int32Array::from_iter_values([1, 2, 3, 4]);
395        let b = Int32Array::from_iter([Some(1), Some(4), None]);
396        let values = interleave(&[&a, &b], &[(0, 1), (1, 2), (1, 2), (0, 3), (0, 2)]).unwrap();
397        let v: Vec<_> = values.as_primitive::<Int32Type>().into_iter().collect();
398        assert_eq!(&v, &[Some(2), None, None, Some(4), Some(3)])
399    }
400
401    #[test]
402    fn test_primitive_empty() {
403        let a = Int32Array::from_iter_values([1, 2, 3, 4]);
404        let v = interleave(&[&a], &[]).unwrap();
405        assert!(v.is_empty());
406        assert_eq!(v.data_type(), &DataType::Int32);
407    }
408
409    #[test]
410    fn test_strings() {
411        let a = StringArray::from_iter_values(["a", "b", "c"]);
412        let b = StringArray::from_iter_values(["hello", "world", "foo"]);
413        let values = interleave(&[&a, &b], &[(0, 2), (0, 2), (1, 0), (1, 1), (0, 1)]).unwrap();
414        let v = values.as_string::<i32>();
415        let values: Vec<_> = v.into_iter().collect();
416        assert_eq!(
417            &values,
418            &[
419                Some("c"),
420                Some("c"),
421                Some("hello"),
422                Some("world"),
423                Some("b")
424            ]
425        )
426    }
427
428    #[test]
429    fn test_interleave_dictionary() {
430        let a = DictionaryArray::<Int32Type>::from_iter(["a", "b", "c", "a", "b"]);
431        let b = DictionaryArray::<Int32Type>::from_iter(["a", "c", "a", "c", "a"]);
432
433        // Should not recompute dictionary
434        let values =
435            interleave(&[&a, &b], &[(0, 2), (0, 2), (0, 2), (1, 0), (1, 1), (0, 1)]).unwrap();
436        let v = values.as_dictionary::<Int32Type>();
437        assert_eq!(v.values().len(), 5);
438
439        let vc = v.downcast_dict::<StringArray>().unwrap();
440        let collected: Vec<_> = vc.into_iter().map(Option::unwrap).collect();
441        assert_eq!(&collected, &["c", "c", "c", "a", "c", "b"]);
442
443        // Should recompute dictionary
444        let values = interleave(&[&a, &b], &[(0, 2), (0, 2), (1, 1)]).unwrap();
445        let v = values.as_dictionary::<Int32Type>();
446        assert_eq!(v.values().len(), 1);
447
448        let vc = v.downcast_dict::<StringArray>().unwrap();
449        let collected: Vec<_> = vc.into_iter().map(Option::unwrap).collect();
450        assert_eq!(&collected, &["c", "c", "c"]);
451    }
452
453    #[test]
454    fn test_interleave_dictionary_nulls() {
455        let input_1_keys = Int32Array::from_iter_values([0, 2, 1, 3]);
456        let input_1_values = StringArray::from(vec![Some("foo"), None, Some("bar"), Some("fiz")]);
457        let input_1 = DictionaryArray::new(input_1_keys, Arc::new(input_1_values));
458        let input_2: DictionaryArray<Int32Type> = vec![None].into_iter().collect();
459
460        let expected = vec![Some("fiz"), None, None, Some("foo")];
461
462        let values = interleave(
463            &[&input_1 as _, &input_2 as _],
464            &[(0, 3), (0, 2), (1, 0), (0, 0)],
465        )
466        .unwrap();
467        let dictionary = values.as_dictionary::<Int32Type>();
468        let actual: Vec<Option<&str>> = dictionary
469            .downcast_dict::<StringArray>()
470            .unwrap()
471            .into_iter()
472            .collect();
473
474        assert_eq!(actual, expected);
475    }
476
477    #[test]
478    fn test_lists() {
479        // [[1, 2], null, [3]]
480        let mut a = ListBuilder::new(Int32Builder::new());
481        a.values().append_value(1);
482        a.values().append_value(2);
483        a.append(true);
484        a.append(false);
485        a.values().append_value(3);
486        a.append(true);
487        let a = a.finish();
488
489        // [[4], null, [5, 6, null]]
490        let mut b = ListBuilder::new(Int32Builder::new());
491        b.values().append_value(4);
492        b.append(true);
493        b.append(false);
494        b.values().append_value(5);
495        b.values().append_value(6);
496        b.values().append_null();
497        b.append(true);
498        let b = b.finish();
499
500        let values = interleave(&[&a, &b], &[(0, 2), (0, 1), (1, 0), (1, 2), (1, 1)]).unwrap();
501        let v = values.as_any().downcast_ref::<ListArray>().unwrap();
502
503        // [[3], null, [4], [5, 6, null], null]
504        let mut expected = ListBuilder::new(Int32Builder::new());
505        expected.values().append_value(3);
506        expected.append(true);
507        expected.append(false);
508        expected.values().append_value(4);
509        expected.append(true);
510        expected.values().append_value(5);
511        expected.values().append_value(6);
512        expected.values().append_null();
513        expected.append(true);
514        expected.append(false);
515        let expected = expected.finish();
516
517        assert_eq!(v, &expected);
518    }
519
520    #[test]
521    fn interleave_sparse_nulls() {
522        let values = StringArray::from_iter_values((0..100).map(|x| x.to_string()));
523        let keys = Int32Array::from_iter_values(0..10);
524        let dict_a = DictionaryArray::new(keys, Arc::new(values));
525        let values = StringArray::new_null(0);
526        let keys = Int32Array::new_null(10);
527        let dict_b = DictionaryArray::new(keys, Arc::new(values));
528
529        let indices = &[(0, 0), (0, 1), (0, 2), (1, 0)];
530        let array = interleave(&[&dict_a, &dict_b], indices).unwrap();
531
532        let expected =
533            DictionaryArray::<Int32Type>::from_iter(vec![Some("0"), Some("1"), Some("2"), None]);
534        assert_eq!(array.as_ref(), &expected)
535    }
536
537    #[test]
538    fn test_interleave_views() {
539        let values = StringArray::from_iter_values([
540            "hello",
541            "world_long_string_not_inlined",
542            "foo",
543            "bar",
544            "baz",
545        ]);
546        let view_a = StringViewArray::from(&values);
547
548        let values = StringArray::from_iter_values([
549            "test",
550            "data",
551            "more_long_string_not_inlined",
552            "views",
553            "here",
554        ]);
555        let view_b = StringViewArray::from(&values);
556
557        let indices = &[
558            (0, 2), // "foo"
559            (1, 0), // "test"
560            (0, 4), // "baz"
561            (1, 3), // "views"
562            (0, 1), // "world_long_string_not_inlined"
563        ];
564
565        // Test specialized implementation
566        let values = interleave(&[&view_a, &view_b], indices).unwrap();
567        let result = values.as_string_view();
568        assert_eq!(result.data_buffers().len(), 1);
569
570        let fallback = interleave_fallback(&[&view_a, &view_b], indices).unwrap();
571        let fallback_result = fallback.as_string_view();
572        // note that fallback_result has 2 buffers, but only one long enough string to warrant a buffer
573        assert_eq!(fallback_result.data_buffers().len(), 2);
574
575        // Convert to strings for easier assertion
576        let collected: Vec<_> = result.iter().map(|x| x.map(|s| s.to_string())).collect();
577
578        let fallback_collected: Vec<_> = fallback_result
579            .iter()
580            .map(|x| x.map(|s| s.to_string()))
581            .collect();
582
583        assert_eq!(&collected, &fallback_collected);
584
585        assert_eq!(
586            &collected,
587            &[
588                Some("foo".to_string()),
589                Some("test".to_string()),
590                Some("baz".to_string()),
591                Some("views".to_string()),
592                Some("world_long_string_not_inlined".to_string()),
593            ]
594        );
595    }
596
597    #[test]
598    fn test_interleave_views_with_nulls() {
599        let values = StringArray::from_iter([
600            Some("hello"),
601            None,
602            Some("foo_long_string_not_inlined"),
603            Some("bar"),
604            None,
605        ]);
606        let view_a = StringViewArray::from(&values);
607
608        let values = StringArray::from_iter([
609            Some("test"),
610            Some("data_long_string_not_inlined"),
611            None,
612            None,
613            Some("here"),
614        ]);
615        let view_b = StringViewArray::from(&values);
616
617        let indices = &[
618            (0, 1), // null
619            (1, 2), // null
620            (0, 2), // "foo_long_string_not_inlined"
621            (1, 3), // null
622            (0, 4), // null
623        ];
624
625        // Test specialized implementation
626        let values = interleave(&[&view_a, &view_b], indices).unwrap();
627        let result = values.as_string_view();
628        assert_eq!(result.data_buffers().len(), 1);
629
630        let fallback = interleave_fallback(&[&view_a, &view_b], indices).unwrap();
631        let fallback_result = fallback.as_string_view();
632
633        // Convert to strings for easier assertion
634        let collected: Vec<_> = result.iter().map(|x| x.map(|s| s.to_string())).collect();
635
636        let fallback_collected: Vec<_> = fallback_result
637            .iter()
638            .map(|x| x.map(|s| s.to_string()))
639            .collect();
640
641        assert_eq!(&collected, &fallback_collected);
642
643        assert_eq!(
644            &collected,
645            &[
646                None,
647                None,
648                Some("foo_long_string_not_inlined".to_string()),
649                None,
650                None,
651            ]
652        );
653    }
654
655    #[test]
656    fn test_interleave_views_multiple_buffers() {
657        let str1 = "very_long_string_from_first_buffer".as_bytes();
658        let str2 = "very_long_string_from_second_buffer".as_bytes();
659        let buffer1 = str1.to_vec().into();
660        let buffer2 = str2.to_vec().into();
661
662        let view1 = ByteView::new(str1.len() as u32, &str1[..4])
663            .with_buffer_index(0)
664            .with_offset(0)
665            .as_u128();
666        let view2 = ByteView::new(str2.len() as u32, &str2[..4])
667            .with_buffer_index(1)
668            .with_offset(0)
669            .as_u128();
670        let view_a =
671            StringViewArray::try_new(vec![view1, view2].into(), vec![buffer1, buffer2], None)
672                .unwrap();
673
674        let str3 = "another_very_long_string_buffer_three".as_bytes();
675        let str4 = "different_long_string_in_buffer_four".as_bytes();
676        let buffer3 = str3.to_vec().into();
677        let buffer4 = str4.to_vec().into();
678
679        let view3 = ByteView::new(str3.len() as u32, &str3[..4])
680            .with_buffer_index(0)
681            .with_offset(0)
682            .as_u128();
683        let view4 = ByteView::new(str4.len() as u32, &str4[..4])
684            .with_buffer_index(1)
685            .with_offset(0)
686            .as_u128();
687        let view_b =
688            StringViewArray::try_new(vec![view3, view4].into(), vec![buffer3, buffer4], None)
689                .unwrap();
690
691        let indices = &[
692            (0, 0), // String from first buffer of array A
693            (1, 0), // String from first buffer of array B
694            (0, 1), // String from second buffer of array A
695            (1, 1), // String from second buffer of array B
696            (0, 0), // String from first buffer of array A again
697            (1, 1), // String from second buffer of array B again
698        ];
699
700        // Test interleave
701        let values = interleave(&[&view_a, &view_b], indices).unwrap();
702        let result = values.as_string_view();
703
704        assert_eq!(
705            result.data_buffers().len(),
706            4,
707            "Expected four buffers (two from each input array)"
708        );
709
710        let result_strings: Vec<_> = result.iter().map(|x| x.map(|s| s.to_string())).collect();
711        assert_eq!(
712            result_strings,
713            vec![
714                Some("very_long_string_from_first_buffer".to_string()),
715                Some("another_very_long_string_buffer_three".to_string()),
716                Some("very_long_string_from_second_buffer".to_string()),
717                Some("different_long_string_in_buffer_four".to_string()),
718                Some("very_long_string_from_first_buffer".to_string()),
719                Some("different_long_string_in_buffer_four".to_string()),
720            ]
721        );
722
723        let views = result.views();
724        let buffer_indices: Vec<_> = views
725            .iter()
726            .map(|raw_view| ByteView::from(*raw_view).buffer_index)
727            .collect();
728
729        assert_eq!(
730            buffer_indices,
731            vec![
732                0, // First buffer from array A
733                1, // First buffer from array B
734                2, // Second buffer from array A
735                3, // Second buffer from array B
736                0, // First buffer from array A (reused)
737                3, // Second buffer from array B (reused)
738            ]
739        );
740    }
741
742    #[test]
743    fn test_interleave_run_end_encoded_primitive() {
744        let mut builder = PrimitiveRunBuilder::<Int32Type, Int32Type>::new();
745        builder.extend([1, 1, 2, 2, 2, 3].into_iter().map(Some));
746        let a = builder.finish();
747
748        let mut builder = PrimitiveRunBuilder::<Int32Type, Int32Type>::new();
749        builder.extend([4, 5, 5, 6, 6, 6].into_iter().map(Some));
750        let b = builder.finish();
751
752        let indices = &[(0, 1), (1, 0), (0, 4), (1, 2), (0, 5)];
753        let result = interleave(&[&a, &b], indices).unwrap();
754
755        // The result should be a RunEndEncoded array
756        assert!(matches!(result.data_type(), DataType::RunEndEncoded(_, _)));
757
758        // Cast to RunArray to access values
759        let result_run_array: &Int32RunArray = result.as_any().downcast_ref().unwrap();
760
761        // Verify the logical values by accessing the logical array directly
762        let expected = vec![1, 4, 2, 5, 3];
763        let mut actual = Vec::new();
764        for i in 0..result_run_array.len() {
765            let physical_idx = result_run_array.get_physical_index(i);
766            let value = result_run_array
767                .values()
768                .as_primitive::<Int32Type>()
769                .value(physical_idx);
770            actual.push(value);
771        }
772        assert_eq!(actual, expected);
773    }
774
775    #[test]
776    fn test_interleave_run_end_encoded_string() {
777        let a: Int32RunArray = vec!["hello", "hello", "world", "world", "foo"]
778            .into_iter()
779            .collect();
780        let b: Int32RunArray = vec!["bar", "baz", "baz", "qux"].into_iter().collect();
781
782        let indices = &[(0, 0), (1, 1), (0, 3), (1, 3), (0, 4)];
783        let result = interleave(&[&a, &b], indices).unwrap();
784
785        // The result should be a RunEndEncoded array
786        assert!(matches!(result.data_type(), DataType::RunEndEncoded(_, _)));
787
788        // Cast to RunArray to access values
789        let result_run_array: &Int32RunArray = result.as_any().downcast_ref().unwrap();
790
791        // Verify the logical values by accessing the logical array directly
792        let expected = vec!["hello", "baz", "world", "qux", "foo"];
793        let mut actual = Vec::new();
794        for i in 0..result_run_array.len() {
795            let physical_idx = result_run_array.get_physical_index(i);
796            let value = result_run_array
797                .values()
798                .as_string::<i32>()
799                .value(physical_idx);
800            actual.push(value);
801        }
802        assert_eq!(actual, expected);
803    }
804
805    #[test]
806    fn test_interleave_run_end_encoded_with_nulls() {
807        let a: Int32RunArray = vec![Some("a"), Some("a"), None, None, Some("b")]
808            .into_iter()
809            .collect();
810        let b: Int32RunArray = vec![None, Some("c"), Some("c"), Some("d")]
811            .into_iter()
812            .collect();
813
814        let indices = &[(0, 1), (1, 0), (0, 2), (1, 3), (0, 4)];
815        let result = interleave(&[&a, &b], indices).unwrap();
816
817        // The result should be a RunEndEncoded array
818        assert!(matches!(result.data_type(), DataType::RunEndEncoded(_, _)));
819
820        // Cast to RunArray to access values
821        let result_run_array: &Int32RunArray = result.as_any().downcast_ref().unwrap();
822
823        // Verify the logical values by accessing the logical array directly
824        let expected = vec![Some("a"), None, None, Some("d"), Some("b")];
825        let mut actual = Vec::new();
826        for i in 0..result_run_array.len() {
827            let physical_idx = result_run_array.get_physical_index(i);
828            if result_run_array.values().is_null(physical_idx) {
829                actual.push(None);
830            } else {
831                let value = result_run_array
832                    .values()
833                    .as_string::<i32>()
834                    .value(physical_idx);
835                actual.push(Some(value));
836            }
837        }
838        assert_eq!(actual, expected);
839    }
840
841    #[test]
842    fn test_interleave_run_end_encoded_different_run_types() {
843        let mut builder = PrimitiveRunBuilder::<Int16Type, Int32Type>::new();
844        builder.extend([1, 1, 2, 3, 3].into_iter().map(Some));
845        let a = builder.finish();
846
847        let mut builder = PrimitiveRunBuilder::<Int16Type, Int32Type>::new();
848        builder.extend([4, 5, 5, 6].into_iter().map(Some));
849        let b = builder.finish();
850
851        let indices = &[(0, 0), (1, 1), (0, 3), (1, 3)];
852        let result = interleave(&[&a, &b], indices).unwrap();
853
854        // The result should be a RunEndEncoded array
855        assert!(matches!(result.data_type(), DataType::RunEndEncoded(_, _)));
856
857        // Cast to RunArray to access values
858        let result_run_array: &RunArray<Int16Type> = result.as_any().downcast_ref().unwrap();
859
860        // Verify the logical values by accessing the logical array directly
861        let expected = vec![1, 5, 3, 6];
862        let mut actual = Vec::new();
863        for i in 0..result_run_array.len() {
864            let physical_idx = result_run_array.get_physical_index(i);
865            let value = result_run_array
866                .values()
867                .as_primitive::<Int32Type>()
868                .value(physical_idx);
869            actual.push(value);
870        }
871        assert_eq!(actual, expected);
872    }
873
874    #[test]
875    fn test_interleave_run_end_encoded_mixed_run_lengths() {
876        let mut builder = PrimitiveRunBuilder::<Int64Type, Int32Type>::new();
877        builder.extend([1, 2, 2, 2, 2, 3, 3, 4].into_iter().map(Some));
878        let a = builder.finish();
879
880        let mut builder = PrimitiveRunBuilder::<Int64Type, Int32Type>::new();
881        builder.extend([5, 5, 5, 6, 7, 7, 8, 8].into_iter().map(Some));
882        let b = builder.finish();
883
884        let indices = &[
885            (0, 0), // 1
886            (1, 2), // 5
887            (0, 3), // 2
888            (1, 3), // 6
889            (0, 6), // 3
890            (1, 6), // 8
891            (0, 7), // 4
892            (1, 4), // 7
893        ];
894        let result = interleave(&[&a, &b], indices).unwrap();
895
896        // The result should be a RunEndEncoded array
897        assert!(matches!(result.data_type(), DataType::RunEndEncoded(_, _)));
898
899        // Cast to RunArray to access values
900        let result_run_array: &RunArray<Int64Type> = result.as_any().downcast_ref().unwrap();
901
902        // Verify the logical values by accessing the logical array directly
903        let expected = vec![1, 5, 2, 6, 3, 8, 4, 7];
904        let mut actual = Vec::new();
905        for i in 0..result_run_array.len() {
906            let physical_idx = result_run_array.get_physical_index(i);
907            let value = result_run_array
908                .values()
909                .as_primitive::<Int32Type>()
910                .value(physical_idx);
911            actual.push(value);
912        }
913        assert_eq!(actual, expected);
914    }
915
916    #[test]
917    fn test_interleave_run_end_encoded_empty_runs() {
918        let mut builder = PrimitiveRunBuilder::<Int32Type, Int32Type>::new();
919        builder.extend([1].into_iter().map(Some));
920        let a = builder.finish();
921
922        let mut builder = PrimitiveRunBuilder::<Int32Type, Int32Type>::new();
923        builder.extend([2, 2, 2].into_iter().map(Some));
924        let b = builder.finish();
925
926        let indices = &[(0, 0), (1, 1), (1, 2)];
927        let result = interleave(&[&a, &b], indices).unwrap();
928
929        // The result should be a RunEndEncoded array
930        assert!(matches!(result.data_type(), DataType::RunEndEncoded(_, _)));
931
932        // Cast to RunArray to access values
933        let result_run_array: &Int32RunArray = result.as_any().downcast_ref().unwrap();
934
935        // Verify the logical values by accessing the logical array directly
936        let expected = vec![1, 2, 2];
937        let mut actual = Vec::new();
938        for i in 0..result_run_array.len() {
939            let physical_idx = result_run_array.get_physical_index(i);
940            let value = result_run_array
941                .values()
942                .as_primitive::<Int32Type>()
943                .value(physical_idx);
944            actual.push(value);
945        }
946        assert_eq!(actual, expected);
947    }
948}