arrow_select/
take.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Defines take kernel for [Array]
19
20use std::fmt::Display;
21use std::sync::Arc;
22
23use arrow_array::builder::{BufferBuilder, UInt32Builder};
24use arrow_array::cast::AsArray;
25use arrow_array::types::*;
26use arrow_array::*;
27use arrow_buffer::{
28    ArrowNativeType, BooleanBuffer, Buffer, MutableBuffer, NullBuffer, OffsetBuffer, ScalarBuffer,
29    bit_util,
30};
31use arrow_data::ArrayDataBuilder;
32use arrow_schema::{ArrowError, DataType, FieldRef, UnionMode};
33
34use num_traits::{One, Zero};
35
36/// Take elements by index from [Array], creating a new [Array] from those indexes.
37///
38/// ```text
39/// ┌─────────────────┐      ┌─────────┐                              ┌─────────────────┐
40/// │        A        │      │    0    │                              │        A        │
41/// ├─────────────────┤      ├─────────┤                              ├─────────────────┤
42/// │        D        │      │    2    │                              │        B        │
43/// ├─────────────────┤      ├─────────┤   take(values, indices)      ├─────────────────┤
44/// │        B        │      │    3    │ ─────────────────────────▶   │        C        │
45/// ├─────────────────┤      ├─────────┤                              ├─────────────────┤
46/// │        C        │      │    1    │                              │        D        │
47/// ├─────────────────┤      └─────────┘                              └─────────────────┘
48/// │        E        │
49/// └─────────────────┘
50///    values array          indices array                              result
51/// ```
52///
53/// For selecting values by index from multiple arrays see [`crate::interleave`]
54///
55/// Note that this kernel, similar to other kernels in this crate,
56/// will avoid allocating where not necessary. Consequently
57/// the returned array may share buffers with the inputs
58///
59/// # Errors
60/// This function errors whenever:
61/// * An index cannot be casted to `usize` (typically 32 bit architectures)
62/// * An index is out of bounds and `options` is set to check bounds.
63///
64/// # Safety
65///
66/// When `options` is not set to check bounds, taking indexes after `len` will panic.
67///
68/// # See also
69/// * [`BatchCoalescer`]: to filter multiple [`RecordBatch`] and coalesce
70///   the results into a single array.
71///
72/// [`BatchCoalescer`]: crate::coalesce::BatchCoalescer
73///
74/// # Examples
75/// ```
76/// # use arrow_array::{StringArray, UInt32Array, cast::AsArray};
77/// # use arrow_select::take::take;
78/// let values = StringArray::from(vec!["zero", "one", "two"]);
79///
80/// // Take items at index 2, and 1:
81/// let indices = UInt32Array::from(vec![2, 1]);
82/// let taken = take(&values, &indices, None).unwrap();
83/// let taken = taken.as_string::<i32>();
84///
85/// assert_eq!(*taken, StringArray::from(vec!["two", "one"]));
86/// ```
87pub fn take(
88    values: &dyn Array,
89    indices: &dyn Array,
90    options: Option<TakeOptions>,
91) -> Result<ArrayRef, ArrowError> {
92    let options = options.unwrap_or_default();
93    downcast_integer_array!(
94        indices => {
95            if options.check_bounds {
96                check_bounds(values.len(), indices)?;
97            }
98            let indices = indices.to_indices();
99            take_impl(values, &indices)
100        },
101        d => Err(ArrowError::InvalidArgumentError(format!("Take only supported for integers, got {d:?}")))
102    )
103}
104
105/// For each [ArrayRef] in the [`Vec<ArrayRef>`], take elements by index and create a new
106/// [`Vec<ArrayRef>`] from those indices.
107///
108/// ```text
109/// ┌────────┬────────┐
110/// │        │        │           ┌────────┐                                ┌────────┬────────┐
111/// │   A    │   1    │           │        │                                │        │        │
112/// ├────────┼────────┤           │   0    │                                │   A    │   1    │
113/// │        │        │           ├────────┤                                ├────────┼────────┤
114/// │   D    │   4    │           │        │                                │        │        │
115/// ├────────┼────────┤           │   2    │  take_arrays(values,indices)   │   B    │   2    │
116/// │        │        │           ├────────┤                                ├────────┼────────┤
117/// │   B    │   2    │           │        │  ───────────────────────────►  │        │        │
118/// ├────────┼────────┤           │   3    │                                │   C    │   3    │
119/// │        │        │           ├────────┤                                ├────────┼────────┤
120/// │   C    │   3    │           │        │                                │        │        │
121/// ├────────┼────────┤           │   1    │                                │   D    │   4    │
122/// │        │        │           └────────┘                                └────────┼────────┘
123/// │   E    │   5    │
124/// └────────┴────────┘
125///    values arrays             indices array                                      result
126/// ```
127///
128/// # Errors
129/// This function errors whenever:
130/// * An index cannot be casted to `usize` (typically 32 bit architectures)
131/// * An index is out of bounds and `options` is set to check bounds.
132///
133/// # Safety
134///
135/// When `options` is not set to check bounds, taking indexes after `len` will panic.
136///
137/// # Examples
138/// ```
139/// # use std::sync::Arc;
140/// # use arrow_array::{StringArray, UInt32Array, cast::AsArray};
141/// # use arrow_select::take::{take, take_arrays};
142/// let string_values = Arc::new(StringArray::from(vec!["zero", "one", "two"]));
143/// let values = Arc::new(UInt32Array::from(vec![0, 1, 2]));
144///
145/// // Take items at index 2, and 1:
146/// let indices = UInt32Array::from(vec![2, 1]);
147/// let taken_arrays = take_arrays(&[string_values, values], &indices, None).unwrap();
148/// let taken_string = taken_arrays[0].as_string::<i32>();
149/// assert_eq!(*taken_string, StringArray::from(vec!["two", "one"]));
150/// let taken_values = taken_arrays[1].as_primitive();
151/// assert_eq!(*taken_values, UInt32Array::from(vec![2, 1]));
152/// ```
153pub fn take_arrays(
154    arrays: &[ArrayRef],
155    indices: &dyn Array,
156    options: Option<TakeOptions>,
157) -> Result<Vec<ArrayRef>, ArrowError> {
158    arrays
159        .iter()
160        .map(|array| take(array.as_ref(), indices, options.clone()))
161        .collect()
162}
163
164/// Verifies that the non-null values of `indices` are all `< len`
165fn check_bounds<T: ArrowPrimitiveType>(
166    len: usize,
167    indices: &PrimitiveArray<T>,
168) -> Result<(), ArrowError>
169where
170    T::Native: Display,
171{
172    let len = match T::Native::from_usize(len) {
173        Some(len) => len,
174        None => {
175            if T::DATA_TYPE.is_integer() {
176                // the biggest representable value for T::Native is lower than len, e.g: u8::MAX < 512, no need to check bounds
177                return Ok(());
178            } else {
179                return Err(ArrowError::ComputeError("Cast to usize failed".to_string()));
180            }
181        }
182    };
183
184    if indices.null_count() > 0 {
185        indices.iter().flatten().try_for_each(|index| {
186            if index >= len {
187                return Err(ArrowError::ComputeError(format!(
188                    "Array index out of bounds, cannot get item at index {index} from {len} entries"
189                )));
190            }
191            Ok(())
192        })
193    } else {
194        let in_bounds = indices.values().iter().fold(true, |in_bounds, &i| {
195            in_bounds & (i >= T::Native::ZERO) & (i < len)
196        });
197
198        if !in_bounds {
199            for &index in indices.values() {
200                if index < T::Native::ZERO || index >= len {
201                    return Err(ArrowError::ComputeError(format!(
202                        "Array index out of bounds, cannot get item at index {index} from {len} entries"
203                    )));
204                }
205            }
206        }
207
208        Ok(())
209    }
210}
211
212#[inline(never)]
213fn take_impl<IndexType: ArrowPrimitiveType>(
214    values: &dyn Array,
215    indices: &PrimitiveArray<IndexType>,
216) -> Result<ArrayRef, ArrowError> {
217    if indices.is_empty() {
218        return Ok(new_empty_array(values.data_type()));
219    }
220    downcast_primitive_array! {
221        values => Ok(Arc::new(take_primitive(values, indices)?)),
222        DataType::Boolean => {
223            let values = values.as_any().downcast_ref::<BooleanArray>().unwrap();
224            Ok(Arc::new(take_boolean(values, indices)))
225        }
226        DataType::Utf8 => {
227            Ok(Arc::new(take_bytes(values.as_string::<i32>(), indices)?))
228        }
229        DataType::LargeUtf8 => {
230            Ok(Arc::new(take_bytes(values.as_string::<i64>(), indices)?))
231        }
232        DataType::Utf8View => {
233            Ok(Arc::new(take_byte_view(values.as_string_view(), indices)?))
234        }
235        DataType::List(_) => {
236            Ok(Arc::new(take_list::<_, Int32Type>(values.as_list(), indices)?))
237        }
238        DataType::LargeList(_) => {
239            Ok(Arc::new(take_list::<_, Int64Type>(values.as_list(), indices)?))
240        }
241        DataType::ListView(_) => {
242            Ok(Arc::new(take_list_view::<_, Int32Type>(values.as_list_view(), indices)?))
243        }
244        DataType::LargeListView(_) => {
245            Ok(Arc::new(take_list_view::<_, Int64Type>(values.as_list_view(), indices)?))
246        }
247        DataType::FixedSizeList(_, length) => {
248            let values = values
249                .as_any()
250                .downcast_ref::<FixedSizeListArray>()
251                .unwrap();
252            Ok(Arc::new(take_fixed_size_list(
253                values,
254                indices,
255                *length as u32,
256            )?))
257        }
258        DataType::Map(_, _) => {
259            let list_arr = ListArray::from(values.as_map().clone());
260            let list_data = take_list::<_, Int32Type>(&list_arr, indices)?;
261            let builder = list_data.into_data().into_builder().data_type(values.data_type().clone());
262            Ok(Arc::new(MapArray::from(unsafe { builder.build_unchecked() })))
263        }
264        DataType::Struct(fields) => {
265            let array: &StructArray = values.as_struct();
266            let arrays  = array
267                .columns()
268                .iter()
269                .map(|a| take_impl(a.as_ref(), indices))
270                .collect::<Result<Vec<ArrayRef>, _>>()?;
271            let fields: Vec<(FieldRef, ArrayRef)> =
272                fields.iter().cloned().zip(arrays).collect();
273
274            // Create the null bit buffer.
275            let is_valid: Buffer = indices
276                .iter()
277                .map(|index| {
278                    if let Some(index) = index {
279                        array.is_valid(index.to_usize().unwrap())
280                    } else {
281                        false
282                    }
283                })
284                .collect();
285
286            if fields.is_empty() {
287                let nulls = NullBuffer::new(BooleanBuffer::new(is_valid, 0, indices.len()));
288                Ok(Arc::new(StructArray::new_empty_fields(indices.len(), Some(nulls))))
289            } else {
290                Ok(Arc::new(StructArray::from((fields, is_valid))) as ArrayRef)
291            }
292        }
293        DataType::Dictionary(_, _) => downcast_dictionary_array! {
294            values => Ok(Arc::new(take_dict(values, indices)?)),
295            t => unimplemented!("Take not supported for dictionary type {:?}", t)
296        }
297        DataType::RunEndEncoded(_, _) => downcast_run_array! {
298            values => Ok(Arc::new(take_run(values, indices)?)),
299            t => unimplemented!("Take not supported for run type {:?}", t)
300        }
301        DataType::Binary => {
302            Ok(Arc::new(take_bytes(values.as_binary::<i32>(), indices)?))
303        }
304        DataType::LargeBinary => {
305            Ok(Arc::new(take_bytes(values.as_binary::<i64>(), indices)?))
306        }
307        DataType::BinaryView => {
308            Ok(Arc::new(take_byte_view(values.as_binary_view(), indices)?))
309        }
310        DataType::FixedSizeBinary(size) => {
311            let values = values
312                .as_any()
313                .downcast_ref::<FixedSizeBinaryArray>()
314                .unwrap();
315            Ok(Arc::new(take_fixed_size_binary(values, indices, *size)?))
316        }
317        DataType::Null => {
318            // Take applied to a null array produces a null array.
319            if values.len() >= indices.len() {
320                // If the existing null array is as big as the indices, we can use a slice of it
321                // to avoid allocating a new null array.
322                Ok(values.slice(0, indices.len()))
323            } else {
324                // If the existing null array isn't big enough, create a new one.
325                Ok(new_null_array(&DataType::Null, indices.len()))
326            }
327        }
328        DataType::Union(fields, UnionMode::Sparse) => {
329            let mut children = Vec::with_capacity(fields.len());
330            let values = values.as_any().downcast_ref::<UnionArray>().unwrap();
331            let type_ids = take_native(values.type_ids(), indices);
332            for (type_id, _field) in fields.iter() {
333                let values = values.child(type_id);
334                let values = take_impl(values, indices)?;
335                children.push(values);
336            }
337            let array = UnionArray::try_new(fields.clone(), type_ids, None, children)?;
338            Ok(Arc::new(array))
339        }
340        DataType::Union(fields, UnionMode::Dense) => {
341            let values = values.as_any().downcast_ref::<UnionArray>().unwrap();
342
343            let type_ids = <PrimitiveArray<Int8Type>>::try_new(take_native(values.type_ids(), indices), None)?;
344            let offsets = <PrimitiveArray<Int32Type>>::try_new(take_native(values.offsets().unwrap(), indices), None)?;
345
346            let children = fields.iter()
347                .map(|(field_type_id, _)| {
348                    let mask = BooleanArray::from_unary(&type_ids, |value_type_id| value_type_id == field_type_id);
349
350                    let indices = crate::filter::filter(&offsets, &mask)?;
351
352                    let values = values.child(field_type_id);
353
354                    take_impl(values, indices.as_primitive::<Int32Type>())
355                })
356                .collect::<Result<_, _>>()?;
357
358            let mut child_offsets = [0; 128];
359
360            let offsets = type_ids.values()
361                .iter()
362                .map(|&i| {
363                    let offset = child_offsets[i as usize];
364
365                    child_offsets[i as usize] += 1;
366
367                    offset
368                })
369                .collect();
370
371            let (_, type_ids, _) = type_ids.into_parts();
372
373            let array = UnionArray::try_new(fields.clone(), type_ids, Some(offsets), children)?;
374
375            Ok(Arc::new(array))
376        }
377        t => unimplemented!("Take not supported for data type {:?}", t)
378    }
379}
380
381/// Options that define how `take` should behave
382#[derive(Clone, Debug, Default)]
383pub struct TakeOptions {
384    /// Perform bounds check before taking indices from values.
385    /// If enabled, an `ArrowError` is returned if the indices are out of bounds.
386    /// If not enabled, and indices exceed bounds, the kernel will panic.
387    pub check_bounds: bool,
388}
389
390/// `take` implementation for all primitive arrays
391///
392/// This checks if an `indices` slot is populated, and gets the value from `values`
393///  as the populated index.
394/// If the `indices` slot is null, a null value is returned.
395/// For example, given:
396///     values:  [1, 2, 3, null, 5]
397///     indices: [0, null, 4, 3]
398/// The result is: [1 (slot 0), null (null slot), 5 (slot 4), null (slot 3)]
399fn take_primitive<T, I>(
400    values: &PrimitiveArray<T>,
401    indices: &PrimitiveArray<I>,
402) -> Result<PrimitiveArray<T>, ArrowError>
403where
404    T: ArrowPrimitiveType,
405    I: ArrowPrimitiveType,
406{
407    let values_buf = take_native(values.values(), indices);
408    let nulls = take_nulls(values.nulls(), indices);
409    Ok(PrimitiveArray::try_new(values_buf, nulls)?.with_data_type(values.data_type().clone()))
410}
411
412#[inline(never)]
413fn take_nulls<I: ArrowPrimitiveType>(
414    values: Option<&NullBuffer>,
415    indices: &PrimitiveArray<I>,
416) -> Option<NullBuffer> {
417    match values.filter(|n| n.null_count() > 0) {
418        Some(n) => {
419            let buffer = take_bits(n.inner(), indices);
420            Some(NullBuffer::new(buffer)).filter(|n| n.null_count() > 0)
421        }
422        None => indices.nulls().cloned(),
423    }
424}
425
426#[inline(never)]
427fn take_native<T: ArrowNativeType, I: ArrowPrimitiveType>(
428    values: &[T],
429    indices: &PrimitiveArray<I>,
430) -> ScalarBuffer<T> {
431    match indices.nulls().filter(|n| n.null_count() > 0) {
432        Some(n) => indices
433            .values()
434            .iter()
435            .enumerate()
436            .map(|(idx, index)| match values.get(index.as_usize()) {
437                Some(v) => *v,
438                // SAFETY: idx<indices.len()
439                None => match unsafe { n.inner().value_unchecked(idx) } {
440                    false => T::default(),
441                    true => panic!("Out-of-bounds index {index:?}"),
442                },
443            })
444            .collect(),
445        None => indices
446            .values()
447            .iter()
448            .map(|index| values[index.as_usize()])
449            .collect(),
450    }
451}
452
453#[inline(never)]
454fn take_bits<I: ArrowPrimitiveType>(
455    values: &BooleanBuffer,
456    indices: &PrimitiveArray<I>,
457) -> BooleanBuffer {
458    let len = indices.len();
459
460    match indices.nulls().filter(|n| n.null_count() > 0) {
461        Some(nulls) => {
462            let mut output_buffer = MutableBuffer::new_null(len);
463            let output_slice = output_buffer.as_slice_mut();
464            nulls.valid_indices().for_each(|idx| {
465                // SAFETY: idx is a valid index in indices.nulls() --> idx<indices.len()
466                if values.value(unsafe { indices.value_unchecked(idx).as_usize() }) {
467                    // SAFETY: MutableBuffer was created with space for indices.len() bit, and idx < indices.len()
468                    unsafe { bit_util::set_bit_raw(output_slice.as_mut_ptr(), idx) };
469                }
470            });
471            BooleanBuffer::new(output_buffer.into(), 0, len)
472        }
473        None => {
474            BooleanBuffer::collect_bool(len, |idx: usize| {
475                // SAFETY: idx<indices.len()
476                values.value(unsafe { indices.value_unchecked(idx).as_usize() })
477            })
478        }
479    }
480}
481
482/// `take` implementation for boolean arrays
483fn take_boolean<IndexType: ArrowPrimitiveType>(
484    values: &BooleanArray,
485    indices: &PrimitiveArray<IndexType>,
486) -> BooleanArray {
487    let val_buf = take_bits(values.values(), indices);
488    let null_buf = take_nulls(values.nulls(), indices);
489    BooleanArray::new(val_buf, null_buf)
490}
491
492/// `take` implementation for string arrays
493fn take_bytes<T: ByteArrayType, IndexType: ArrowPrimitiveType>(
494    array: &GenericByteArray<T>,
495    indices: &PrimitiveArray<IndexType>,
496) -> Result<GenericByteArray<T>, ArrowError> {
497    let mut offsets = Vec::with_capacity(indices.len() + 1);
498    offsets.push(T::Offset::default());
499
500    let input_offsets = array.value_offsets();
501    let mut capacity = 0;
502    let nulls = take_nulls(array.nulls(), indices);
503
504    let (offsets, values) = if array.null_count() == 0 && indices.null_count() == 0 {
505        offsets.reserve(indices.len());
506        for index in indices.values() {
507            let index = index.as_usize();
508            capacity += input_offsets[index + 1].as_usize() - input_offsets[index].as_usize();
509            offsets.push(
510                T::Offset::from_usize(capacity)
511                    .ok_or_else(|| ArrowError::OffsetOverflowError(capacity))?,
512            );
513        }
514        let mut values = Vec::with_capacity(capacity);
515
516        for index in indices.values() {
517            values.extend_from_slice(array.value(index.as_usize()).as_ref());
518        }
519        (offsets, values)
520    } else if indices.null_count() == 0 {
521        offsets.reserve(indices.len());
522        for index in indices.values() {
523            let index = index.as_usize();
524            if array.is_valid(index) {
525                capacity += input_offsets[index + 1].as_usize() - input_offsets[index].as_usize();
526            }
527            offsets.push(
528                T::Offset::from_usize(capacity)
529                    .ok_or_else(|| ArrowError::OffsetOverflowError(capacity))?,
530            );
531        }
532        let mut values = Vec::with_capacity(capacity);
533
534        for index in indices.values() {
535            let index = index.as_usize();
536            if array.is_valid(index) {
537                values.extend_from_slice(array.value(index).as_ref());
538            }
539        }
540        (offsets, values)
541    } else if array.null_count() == 0 {
542        offsets.reserve(indices.len());
543        for (i, index) in indices.values().iter().enumerate() {
544            let index = index.as_usize();
545            if indices.is_valid(i) {
546                capacity += input_offsets[index + 1].as_usize() - input_offsets[index].as_usize();
547            }
548            offsets.push(
549                T::Offset::from_usize(capacity)
550                    .ok_or_else(|| ArrowError::OffsetOverflowError(capacity))?,
551            );
552        }
553        let mut values = Vec::with_capacity(capacity);
554
555        for (i, index) in indices.values().iter().enumerate() {
556            if indices.is_valid(i) {
557                values.extend_from_slice(array.value(index.as_usize()).as_ref());
558            }
559        }
560        (offsets, values)
561    } else {
562        let nulls = nulls.as_ref().unwrap();
563        offsets.reserve(indices.len());
564        for (i, index) in indices.values().iter().enumerate() {
565            let index = index.as_usize();
566            if nulls.is_valid(i) {
567                capacity += input_offsets[index + 1].as_usize() - input_offsets[index].as_usize();
568            }
569            offsets.push(
570                T::Offset::from_usize(capacity)
571                    .ok_or_else(|| ArrowError::OffsetOverflowError(capacity))?,
572            );
573        }
574        let mut values = Vec::with_capacity(capacity);
575
576        for (i, index) in indices.values().iter().enumerate() {
577            // check index is valid before using index. The value in
578            // NULL index slots may not be within bounds of array
579            let index = index.as_usize();
580            if nulls.is_valid(i) {
581                values.extend_from_slice(array.value(index).as_ref());
582            }
583        }
584        (offsets, values)
585    };
586
587    T::Offset::from_usize(values.len())
588        .ok_or_else(|| ArrowError::OffsetOverflowError(values.len()))?;
589
590    let array = unsafe {
591        let offsets = OffsetBuffer::new_unchecked(offsets.into());
592        GenericByteArray::<T>::new_unchecked(offsets, values.into(), nulls)
593    };
594
595    Ok(array)
596}
597
598/// `take` implementation for byte view arrays
599fn take_byte_view<T: ByteViewType, IndexType: ArrowPrimitiveType>(
600    array: &GenericByteViewArray<T>,
601    indices: &PrimitiveArray<IndexType>,
602) -> Result<GenericByteViewArray<T>, ArrowError> {
603    let new_views = take_native(array.views(), indices);
604    let new_nulls = take_nulls(array.nulls(), indices);
605    // Safety:  array.views was valid, and take_native copies only valid values, and verifies bounds
606    Ok(unsafe {
607        GenericByteViewArray::new_unchecked(new_views, array.data_buffers().to_vec(), new_nulls)
608    })
609}
610
611/// `take` implementation for list arrays
612///
613/// Calculates the index and indexed offset for the inner array,
614/// applying `take` on the inner array, then reconstructing a list array
615/// with the indexed offsets
616fn take_list<IndexType, OffsetType>(
617    values: &GenericListArray<OffsetType::Native>,
618    indices: &PrimitiveArray<IndexType>,
619) -> Result<GenericListArray<OffsetType::Native>, ArrowError>
620where
621    IndexType: ArrowPrimitiveType,
622    OffsetType: ArrowPrimitiveType,
623    OffsetType::Native: OffsetSizeTrait,
624    PrimitiveArray<OffsetType>: From<Vec<OffsetType::Native>>,
625{
626    // TODO: Some optimizations can be done here such as if it is
627    // taking the whole list or a contiguous sublist
628    let (list_indices, offsets, null_buf) =
629        take_value_indices_from_list::<IndexType, OffsetType>(values, indices)?;
630
631    let taken = take_impl::<OffsetType>(values.values().as_ref(), &list_indices)?;
632    let value_offsets = Buffer::from_vec(offsets);
633    // create a new list with taken data and computed null information
634    let list_data = ArrayDataBuilder::new(values.data_type().clone())
635        .len(indices.len())
636        .null_bit_buffer(Some(null_buf.into()))
637        .offset(0)
638        .add_child_data(taken.into_data())
639        .add_buffer(value_offsets);
640
641    let list_data = unsafe { list_data.build_unchecked() };
642
643    Ok(GenericListArray::<OffsetType::Native>::from(list_data))
644}
645
646fn take_list_view<IndexType, OffsetType>(
647    values: &GenericListViewArray<OffsetType::Native>,
648    indices: &PrimitiveArray<IndexType>,
649) -> Result<GenericListViewArray<OffsetType::Native>, ArrowError>
650where
651    IndexType: ArrowPrimitiveType,
652    OffsetType: ArrowPrimitiveType,
653    OffsetType::Native: OffsetSizeTrait,
654{
655    let taken_offsets = take_native(values.offsets(), indices);
656    let taken_sizes = take_native(values.sizes(), indices);
657    let nulls = take_nulls(values.nulls(), indices);
658
659    let list_view_data = ArrayDataBuilder::new(values.data_type().clone())
660        .len(indices.len())
661        .nulls(nulls)
662        .buffers(vec![taken_offsets.into(), taken_sizes.into()])
663        .child_data(vec![values.values().to_data()]);
664
665    // SAFETY: all buffers and child nodes for ListView added in constructor
666    let list_view_data = unsafe { list_view_data.build_unchecked() };
667
668    Ok(GenericListViewArray::<OffsetType::Native>::from(
669        list_view_data,
670    ))
671}
672
673/// `take` implementation for `FixedSizeListArray`
674///
675/// Calculates the index and indexed offset for the inner array,
676/// applying `take` on the inner array, then reconstructing a list array
677/// with the indexed offsets
678fn take_fixed_size_list<IndexType: ArrowPrimitiveType>(
679    values: &FixedSizeListArray,
680    indices: &PrimitiveArray<IndexType>,
681    length: <UInt32Type as ArrowPrimitiveType>::Native,
682) -> Result<FixedSizeListArray, ArrowError> {
683    let list_indices = take_value_indices_from_fixed_size_list(values, indices, length)?;
684    let taken = take_impl::<UInt32Type>(values.values().as_ref(), &list_indices)?;
685
686    // determine null count and null buffer, which are a function of `values` and `indices`
687    let num_bytes = bit_util::ceil(indices.len(), 8);
688    let mut null_buf = MutableBuffer::new(num_bytes).with_bitset(num_bytes, true);
689    let null_slice = null_buf.as_slice_mut();
690
691    for i in 0..indices.len() {
692        let index = indices
693            .value(i)
694            .to_usize()
695            .ok_or_else(|| ArrowError::ComputeError("Cast to usize failed".to_string()))?;
696        if !indices.is_valid(i) || values.is_null(index) {
697            bit_util::unset_bit(null_slice, i);
698        }
699    }
700
701    let list_data = ArrayDataBuilder::new(values.data_type().clone())
702        .len(indices.len())
703        .null_bit_buffer(Some(null_buf.into()))
704        .offset(0)
705        .add_child_data(taken.into_data());
706
707    let list_data = unsafe { list_data.build_unchecked() };
708
709    Ok(FixedSizeListArray::from(list_data))
710}
711
712/// The take kernel implementation for `FixedSizeBinaryArray`.
713///
714/// The computation is done in two steps:
715/// - Compute the values buffer
716/// - Compute the null buffer
717fn take_fixed_size_binary<IndexType: ArrowPrimitiveType>(
718    values: &FixedSizeBinaryArray,
719    indices: &PrimitiveArray<IndexType>,
720    size: i32,
721) -> Result<FixedSizeBinaryArray, ArrowError> {
722    let size_usize = usize::try_from(size).map_err(|_| {
723        ArrowError::InvalidArgumentError(format!("Cannot convert size '{}' to usize", size))
724    })?;
725
726    let values_buffer = values.values().as_slice();
727    let mut values_buffer_builder = BufferBuilder::new(indices.len() * size_usize);
728
729    if indices.null_count() == 0 {
730        let array_iter = indices.values().iter().map(|idx| {
731            let offset = idx.as_usize() * size_usize;
732            &values_buffer[offset..offset + size_usize]
733        });
734        for slice in array_iter {
735            values_buffer_builder.append_slice(slice);
736        }
737    } else {
738        // The indices nullability cannot be ignored here because the values buffer may contain
739        // nulls which should not cause a panic.
740        let array_iter = indices.iter().map(|idx| {
741            idx.map(|idx| {
742                let offset = idx.as_usize() * size_usize;
743                &values_buffer[offset..offset + size_usize]
744            })
745        });
746        for slice in array_iter {
747            match slice {
748                None => values_buffer_builder.append_n(size_usize, 0),
749                Some(slice) => values_buffer_builder.append_slice(slice),
750            }
751        }
752    }
753
754    let values_buffer = values_buffer_builder.finish();
755    let value_nulls = take_nulls(values.nulls(), indices);
756    let final_nulls = NullBuffer::union(value_nulls.as_ref(), indices.nulls());
757
758    let array_data = ArrayDataBuilder::new(DataType::FixedSizeBinary(size))
759        .len(indices.len())
760        .nulls(final_nulls)
761        .offset(0)
762        .add_buffer(values_buffer)
763        .build()?;
764
765    Ok(FixedSizeBinaryArray::from(array_data))
766}
767
768/// `take` implementation for dictionary arrays
769///
770/// applies `take` to the keys of the dictionary array and returns a new dictionary array
771/// with the same dictionary values and reordered keys
772fn take_dict<T: ArrowDictionaryKeyType, I: ArrowPrimitiveType>(
773    values: &DictionaryArray<T>,
774    indices: &PrimitiveArray<I>,
775) -> Result<DictionaryArray<T>, ArrowError> {
776    let new_keys = take_primitive(values.keys(), indices)?;
777    Ok(unsafe { DictionaryArray::new_unchecked(new_keys, values.values().clone()) })
778}
779
780/// `take` implementation for run arrays
781///
782/// Finds physical indices for the given logical indices and builds output run array
783/// by taking values in the input run_array.values at the physical indices.
784/// The output run array will be run encoded on the physical indices and not on output values.
785/// For e.g. an input `RunArray{ run_ends = [2,4,6,8], values=[1,2,1,2] }` and `logical_indices=[2,3,6,7]`
786/// would be converted to `physical_indices=[1,1,3,3]` which will be used to build
787/// output `RunArray{ run_ends=[2,4], values=[2,2] }`.
788fn take_run<T: RunEndIndexType, I: ArrowPrimitiveType>(
789    run_array: &RunArray<T>,
790    logical_indices: &PrimitiveArray<I>,
791) -> Result<RunArray<T>, ArrowError> {
792    // get physical indices for the input logical indices
793    let physical_indices = run_array.get_physical_indices(logical_indices.values())?;
794
795    // Run encode the physical indices into new_run_ends_builder
796    // Keep track of the physical indices to take in take_value_indices
797    // `unwrap` is used in this function because the unwrapped values are bounded by the corresponding `::Native`.
798    let mut new_run_ends_builder = BufferBuilder::<T::Native>::new(1);
799    let mut take_value_indices = BufferBuilder::<I::Native>::new(1);
800    let mut new_physical_len = 1;
801    for ix in 1..physical_indices.len() {
802        if physical_indices[ix] != physical_indices[ix - 1] {
803            take_value_indices.append(I::Native::from_usize(physical_indices[ix - 1]).unwrap());
804            new_run_ends_builder.append(T::Native::from_usize(ix).unwrap());
805            new_physical_len += 1;
806        }
807    }
808    take_value_indices
809        .append(I::Native::from_usize(physical_indices[physical_indices.len() - 1]).unwrap());
810    new_run_ends_builder.append(T::Native::from_usize(physical_indices.len()).unwrap());
811    let new_run_ends = unsafe {
812        // Safety:
813        // The function builds a valid run_ends array and hence need not be validated.
814        ArrayDataBuilder::new(T::DATA_TYPE)
815            .len(new_physical_len)
816            .null_count(0)
817            .add_buffer(new_run_ends_builder.finish())
818            .build_unchecked()
819    };
820
821    let take_value_indices: PrimitiveArray<I> = unsafe {
822        // Safety:
823        // The function builds a valid take_value_indices array and hence need not be validated.
824        ArrayDataBuilder::new(I::DATA_TYPE)
825            .len(new_physical_len)
826            .null_count(0)
827            .add_buffer(take_value_indices.finish())
828            .build_unchecked()
829            .into()
830    };
831
832    let new_values = take(run_array.values(), &take_value_indices, None)?;
833
834    let builder = ArrayDataBuilder::new(run_array.data_type().clone())
835        .len(physical_indices.len())
836        .add_child_data(new_run_ends)
837        .add_child_data(new_values.into_data());
838    let array_data = unsafe {
839        // Safety:
840        //  This function builds a valid run array and hence can skip validation.
841        builder.build_unchecked()
842    };
843    Ok(array_data.into())
844}
845
846/// Takes/filters a list array's inner data using the offsets of the list array.
847///
848/// Where a list array has indices `[0,2,5,10]`, taking indices of `[2,0]` returns
849/// an array of the indices `[5..10, 0..2]` and offsets `[0,5,7]` (5 elements and 2
850/// elements)
851#[allow(clippy::type_complexity)]
852fn take_value_indices_from_list<IndexType, OffsetType>(
853    list: &GenericListArray<OffsetType::Native>,
854    indices: &PrimitiveArray<IndexType>,
855) -> Result<
856    (
857        PrimitiveArray<OffsetType>,
858        Vec<OffsetType::Native>,
859        MutableBuffer,
860    ),
861    ArrowError,
862>
863where
864    IndexType: ArrowPrimitiveType,
865    OffsetType: ArrowPrimitiveType,
866    OffsetType::Native: OffsetSizeTrait + std::ops::Add + Zero + One,
867    PrimitiveArray<OffsetType>: From<Vec<OffsetType::Native>>,
868{
869    // TODO: benchmark this function, there might be a faster unsafe alternative
870    let offsets: &[OffsetType::Native] = list.value_offsets();
871
872    let mut new_offsets = Vec::with_capacity(indices.len());
873    let mut values = Vec::new();
874    let mut current_offset = OffsetType::Native::zero();
875    // add first offset
876    new_offsets.push(OffsetType::Native::zero());
877
878    // Initialize null buffer
879    let num_bytes = bit_util::ceil(indices.len(), 8);
880    let mut null_buf = MutableBuffer::new(num_bytes).with_bitset(num_bytes, true);
881    let null_slice = null_buf.as_slice_mut();
882
883    // compute the value indices, and set offsets accordingly
884    for i in 0..indices.len() {
885        if indices.is_valid(i) {
886            let ix = indices
887                .value(i)
888                .to_usize()
889                .ok_or_else(|| ArrowError::ComputeError("Cast to usize failed".to_string()))?;
890            let start = offsets[ix];
891            let end = offsets[ix + 1];
892            current_offset += end - start;
893            new_offsets.push(current_offset);
894
895            let mut curr = start;
896
897            // if start == end, this slot is empty
898            while curr < end {
899                values.push(curr);
900                curr += One::one();
901            }
902            if !list.is_valid(ix) {
903                bit_util::unset_bit(null_slice, i);
904            }
905        } else {
906            bit_util::unset_bit(null_slice, i);
907            new_offsets.push(current_offset);
908        }
909    }
910
911    Ok((
912        PrimitiveArray::<OffsetType>::from(values),
913        new_offsets,
914        null_buf,
915    ))
916}
917
918/// Takes/filters a fixed size list array's inner data using the offsets of the list array.
919fn take_value_indices_from_fixed_size_list<IndexType>(
920    list: &FixedSizeListArray,
921    indices: &PrimitiveArray<IndexType>,
922    length: <UInt32Type as ArrowPrimitiveType>::Native,
923) -> Result<PrimitiveArray<UInt32Type>, ArrowError>
924where
925    IndexType: ArrowPrimitiveType,
926{
927    let mut values = UInt32Builder::with_capacity(length as usize * indices.len());
928
929    for i in 0..indices.len() {
930        if indices.is_valid(i) {
931            let index = indices
932                .value(i)
933                .to_usize()
934                .ok_or_else(|| ArrowError::ComputeError("Cast to usize failed".to_string()))?;
935            let start = list.value_offset(index) as <UInt32Type as ArrowPrimitiveType>::Native;
936
937            // Safety: Range always has known length.
938            unsafe {
939                values.append_trusted_len_iter(start..start + length);
940            }
941        } else {
942            values.append_nulls(length as usize);
943        }
944    }
945
946    Ok(values.finish())
947}
948
949/// To avoid generating take implementations for every index type, instead we
950/// only generate for UInt32 and UInt64 and coerce inputs to these types
951trait ToIndices {
952    type T: ArrowPrimitiveType;
953
954    fn to_indices(&self) -> PrimitiveArray<Self::T>;
955}
956
957macro_rules! to_indices_reinterpret {
958    ($t:ty, $o:ty) => {
959        impl ToIndices for PrimitiveArray<$t> {
960            type T = $o;
961
962            fn to_indices(&self) -> PrimitiveArray<$o> {
963                let cast = ScalarBuffer::new(self.values().inner().clone(), 0, self.len());
964                PrimitiveArray::new(cast, self.nulls().cloned())
965            }
966        }
967    };
968}
969
970macro_rules! to_indices_identity {
971    ($t:ty) => {
972        impl ToIndices for PrimitiveArray<$t> {
973            type T = $t;
974
975            fn to_indices(&self) -> PrimitiveArray<$t> {
976                self.clone()
977            }
978        }
979    };
980}
981
982macro_rules! to_indices_widening {
983    ($t:ty, $o:ty) => {
984        impl ToIndices for PrimitiveArray<$t> {
985            type T = UInt32Type;
986
987            fn to_indices(&self) -> PrimitiveArray<$o> {
988                let cast = self.values().iter().copied().map(|x| x as _).collect();
989                PrimitiveArray::new(cast, self.nulls().cloned())
990            }
991        }
992    };
993}
994
995to_indices_widening!(UInt8Type, UInt32Type);
996to_indices_widening!(Int8Type, UInt32Type);
997
998to_indices_widening!(UInt16Type, UInt32Type);
999to_indices_widening!(Int16Type, UInt32Type);
1000
1001to_indices_identity!(UInt32Type);
1002to_indices_reinterpret!(Int32Type, UInt32Type);
1003
1004to_indices_identity!(UInt64Type);
1005to_indices_reinterpret!(Int64Type, UInt64Type);
1006
1007/// Take rows by index from [`RecordBatch`] and returns a new [`RecordBatch`] from those indexes.
1008///
1009/// This function will call [`take`] on each array of the [`RecordBatch`] and assemble a new [`RecordBatch`].
1010///
1011/// # Example
1012/// ```
1013/// # use std::sync::Arc;
1014/// # use arrow_array::{StringArray, Int32Array, UInt32Array, RecordBatch};
1015/// # use arrow_schema::{DataType, Field, Schema};
1016/// # use arrow_select::take::take_record_batch;
1017/// let schema = Arc::new(Schema::new(vec![
1018///     Field::new("a", DataType::Int32, true),
1019///     Field::new("b", DataType::Utf8, true),
1020/// ]));
1021/// let batch = RecordBatch::try_new(
1022///     schema.clone(),
1023///     vec![
1024///         Arc::new(Int32Array::from_iter_values(0..20)),
1025///         Arc::new(StringArray::from_iter_values(
1026///             (0..20).map(|i| format!("str-{}", i)),
1027///         )),
1028///     ],
1029/// )
1030/// .unwrap();
1031///
1032/// let indices = UInt32Array::from(vec![1, 5, 10]);
1033/// let taken = take_record_batch(&batch, &indices).unwrap();
1034///
1035/// let expected = RecordBatch::try_new(
1036///     schema,
1037///     vec![
1038///         Arc::new(Int32Array::from(vec![1, 5, 10])),
1039///         Arc::new(StringArray::from(vec!["str-1", "str-5", "str-10"])),
1040///     ],
1041/// )
1042/// .unwrap();
1043/// assert_eq!(taken, expected);
1044/// ```
1045pub fn take_record_batch(
1046    record_batch: &RecordBatch,
1047    indices: &dyn Array,
1048) -> Result<RecordBatch, ArrowError> {
1049    let columns = record_batch
1050        .columns()
1051        .iter()
1052        .map(|c| take(c, indices, None))
1053        .collect::<Result<Vec<_>, _>>()?;
1054    RecordBatch::try_new(record_batch.schema(), columns)
1055}
1056
1057#[cfg(test)]
1058mod tests {
1059    use super::*;
1060    use arrow_array::builder::*;
1061    use arrow_buffer::{IntervalDayTime, IntervalMonthDayNano};
1062    use arrow_data::ArrayData;
1063    use arrow_schema::{Field, Fields, TimeUnit, UnionFields};
1064    use num_traits::ToPrimitive;
1065
1066    fn test_take_decimal_arrays(
1067        data: Vec<Option<i128>>,
1068        index: &UInt32Array,
1069        options: Option<TakeOptions>,
1070        expected_data: Vec<Option<i128>>,
1071        precision: &u8,
1072        scale: &i8,
1073    ) -> Result<(), ArrowError> {
1074        let output = data
1075            .into_iter()
1076            .collect::<Decimal128Array>()
1077            .with_precision_and_scale(*precision, *scale)
1078            .unwrap();
1079
1080        let expected = expected_data
1081            .into_iter()
1082            .collect::<Decimal128Array>()
1083            .with_precision_and_scale(*precision, *scale)
1084            .unwrap();
1085
1086        let expected = Arc::new(expected) as ArrayRef;
1087        let output = take(&output, index, options).unwrap();
1088        assert_eq!(&output, &expected);
1089        Ok(())
1090    }
1091
1092    fn test_take_boolean_arrays(
1093        data: Vec<Option<bool>>,
1094        index: &UInt32Array,
1095        options: Option<TakeOptions>,
1096        expected_data: Vec<Option<bool>>,
1097    ) {
1098        let output = BooleanArray::from(data);
1099        let expected = Arc::new(BooleanArray::from(expected_data)) as ArrayRef;
1100        let output = take(&output, index, options).unwrap();
1101        assert_eq!(&output, &expected)
1102    }
1103
1104    fn test_take_primitive_arrays<T>(
1105        data: Vec<Option<T::Native>>,
1106        index: &UInt32Array,
1107        options: Option<TakeOptions>,
1108        expected_data: Vec<Option<T::Native>>,
1109    ) -> Result<(), ArrowError>
1110    where
1111        T: ArrowPrimitiveType,
1112        PrimitiveArray<T>: From<Vec<Option<T::Native>>>,
1113    {
1114        let output = PrimitiveArray::<T>::from(data);
1115        let expected = Arc::new(PrimitiveArray::<T>::from(expected_data)) as ArrayRef;
1116        let output = take(&output, index, options)?;
1117        assert_eq!(&output, &expected);
1118        Ok(())
1119    }
1120
1121    fn test_take_primitive_arrays_non_null<T>(
1122        data: Vec<T::Native>,
1123        index: &UInt32Array,
1124        options: Option<TakeOptions>,
1125        expected_data: Vec<Option<T::Native>>,
1126    ) -> Result<(), ArrowError>
1127    where
1128        T: ArrowPrimitiveType,
1129        PrimitiveArray<T>: From<Vec<T::Native>>,
1130        PrimitiveArray<T>: From<Vec<Option<T::Native>>>,
1131    {
1132        let output = PrimitiveArray::<T>::from(data);
1133        let expected = Arc::new(PrimitiveArray::<T>::from(expected_data)) as ArrayRef;
1134        let output = take(&output, index, options)?;
1135        assert_eq!(&output, &expected);
1136        Ok(())
1137    }
1138
1139    fn test_take_impl_primitive_arrays<T, I>(
1140        data: Vec<Option<T::Native>>,
1141        index: &PrimitiveArray<I>,
1142        options: Option<TakeOptions>,
1143        expected_data: Vec<Option<T::Native>>,
1144    ) where
1145        T: ArrowPrimitiveType,
1146        PrimitiveArray<T>: From<Vec<Option<T::Native>>>,
1147        I: ArrowPrimitiveType,
1148    {
1149        let output = PrimitiveArray::<T>::from(data);
1150        let expected = PrimitiveArray::<T>::from(expected_data);
1151        let output = take(&output, index, options).unwrap();
1152        let output = output.as_any().downcast_ref::<PrimitiveArray<T>>().unwrap();
1153        assert_eq!(output, &expected)
1154    }
1155
1156    // create a simple struct for testing purposes
1157    fn create_test_struct(values: Vec<Option<(Option<bool>, Option<i32>)>>) -> StructArray {
1158        let mut struct_builder = StructBuilder::new(
1159            Fields::from(vec![
1160                Field::new("a", DataType::Boolean, true),
1161                Field::new("b", DataType::Int32, true),
1162            ]),
1163            vec![
1164                Box::new(BooleanBuilder::with_capacity(values.len())),
1165                Box::new(Int32Builder::with_capacity(values.len())),
1166            ],
1167        );
1168
1169        for value in values {
1170            struct_builder
1171                .field_builder::<BooleanBuilder>(0)
1172                .unwrap()
1173                .append_option(value.and_then(|v| v.0));
1174            struct_builder
1175                .field_builder::<Int32Builder>(1)
1176                .unwrap()
1177                .append_option(value.and_then(|v| v.1));
1178            struct_builder.append(value.is_some());
1179        }
1180        struct_builder.finish()
1181    }
1182
1183    #[test]
1184    fn test_take_decimal128_non_null_indices() {
1185        let index = UInt32Array::from(vec![0, 5, 3, 1, 4, 2]);
1186        let precision: u8 = 10;
1187        let scale: i8 = 5;
1188        test_take_decimal_arrays(
1189            vec![None, Some(3), Some(5), Some(2), Some(3), None],
1190            &index,
1191            None,
1192            vec![None, None, Some(2), Some(3), Some(3), Some(5)],
1193            &precision,
1194            &scale,
1195        )
1196        .unwrap();
1197    }
1198
1199    #[test]
1200    fn test_take_decimal128() {
1201        let index = UInt32Array::from(vec![Some(3), None, Some(1), Some(3), Some(2)]);
1202        let precision: u8 = 10;
1203        let scale: i8 = 5;
1204        test_take_decimal_arrays(
1205            vec![Some(0), Some(1), Some(2), Some(3), Some(4)],
1206            &index,
1207            None,
1208            vec![Some(3), None, Some(1), Some(3), Some(2)],
1209            &precision,
1210            &scale,
1211        )
1212        .unwrap();
1213    }
1214
1215    #[test]
1216    fn test_take_primitive_non_null_indices() {
1217        let index = UInt32Array::from(vec![0, 5, 3, 1, 4, 2]);
1218        test_take_primitive_arrays::<Int8Type>(
1219            vec![None, Some(3), Some(5), Some(2), Some(3), None],
1220            &index,
1221            None,
1222            vec![None, None, Some(2), Some(3), Some(3), Some(5)],
1223        )
1224        .unwrap();
1225    }
1226
1227    #[test]
1228    fn test_take_primitive_non_null_values() {
1229        let index = UInt32Array::from(vec![Some(3), None, Some(1), Some(3), Some(2)]);
1230        test_take_primitive_arrays::<Int8Type>(
1231            vec![Some(0), Some(1), Some(2), Some(3), Some(4)],
1232            &index,
1233            None,
1234            vec![Some(3), None, Some(1), Some(3), Some(2)],
1235        )
1236        .unwrap();
1237    }
1238
1239    #[test]
1240    fn test_take_primitive_non_null() {
1241        let index = UInt32Array::from(vec![0, 5, 3, 1, 4, 2]);
1242        test_take_primitive_arrays::<Int8Type>(
1243            vec![Some(0), Some(3), Some(5), Some(2), Some(3), Some(1)],
1244            &index,
1245            None,
1246            vec![Some(0), Some(1), Some(2), Some(3), Some(3), Some(5)],
1247        )
1248        .unwrap();
1249    }
1250
1251    #[test]
1252    fn test_take_primitive_nullable_indices_non_null_values_with_offset() {
1253        let index = UInt32Array::from(vec![Some(0), Some(1), Some(2), Some(3), None, None]);
1254        let index = index.slice(2, 4);
1255        let index = index.as_any().downcast_ref::<UInt32Array>().unwrap();
1256
1257        assert_eq!(
1258            index,
1259            &UInt32Array::from(vec![Some(2), Some(3), None, None])
1260        );
1261
1262        test_take_primitive_arrays_non_null::<Int64Type>(
1263            vec![0, 10, 20, 30, 40, 50],
1264            index,
1265            None,
1266            vec![Some(20), Some(30), None, None],
1267        )
1268        .unwrap();
1269    }
1270
1271    #[test]
1272    fn test_take_primitive_nullable_indices_nullable_values_with_offset() {
1273        let index = UInt32Array::from(vec![Some(0), Some(1), Some(2), Some(3), None, None]);
1274        let index = index.slice(2, 4);
1275        let index = index.as_any().downcast_ref::<UInt32Array>().unwrap();
1276
1277        assert_eq!(
1278            index,
1279            &UInt32Array::from(vec![Some(2), Some(3), None, None])
1280        );
1281
1282        test_take_primitive_arrays::<Int64Type>(
1283            vec![None, None, Some(20), Some(30), Some(40), Some(50)],
1284            index,
1285            None,
1286            vec![Some(20), Some(30), None, None],
1287        )
1288        .unwrap();
1289    }
1290
1291    #[test]
1292    fn test_take_primitive() {
1293        let index = UInt32Array::from(vec![Some(3), None, Some(1), Some(3), Some(2)]);
1294
1295        // int8
1296        test_take_primitive_arrays::<Int8Type>(
1297            vec![Some(0), None, Some(2), Some(3), None],
1298            &index,
1299            None,
1300            vec![Some(3), None, None, Some(3), Some(2)],
1301        )
1302        .unwrap();
1303
1304        // int16
1305        test_take_primitive_arrays::<Int16Type>(
1306            vec![Some(0), None, Some(2), Some(3), None],
1307            &index,
1308            None,
1309            vec![Some(3), None, None, Some(3), Some(2)],
1310        )
1311        .unwrap();
1312
1313        // int32
1314        test_take_primitive_arrays::<Int32Type>(
1315            vec![Some(0), None, Some(2), Some(3), None],
1316            &index,
1317            None,
1318            vec![Some(3), None, None, Some(3), Some(2)],
1319        )
1320        .unwrap();
1321
1322        // int64
1323        test_take_primitive_arrays::<Int64Type>(
1324            vec![Some(0), None, Some(2), Some(3), None],
1325            &index,
1326            None,
1327            vec![Some(3), None, None, Some(3), Some(2)],
1328        )
1329        .unwrap();
1330
1331        // uint8
1332        test_take_primitive_arrays::<UInt8Type>(
1333            vec![Some(0), None, Some(2), Some(3), None],
1334            &index,
1335            None,
1336            vec![Some(3), None, None, Some(3), Some(2)],
1337        )
1338        .unwrap();
1339
1340        // uint16
1341        test_take_primitive_arrays::<UInt16Type>(
1342            vec![Some(0), None, Some(2), Some(3), None],
1343            &index,
1344            None,
1345            vec![Some(3), None, None, Some(3), Some(2)],
1346        )
1347        .unwrap();
1348
1349        // uint32
1350        test_take_primitive_arrays::<UInt32Type>(
1351            vec![Some(0), None, Some(2), Some(3), None],
1352            &index,
1353            None,
1354            vec![Some(3), None, None, Some(3), Some(2)],
1355        )
1356        .unwrap();
1357
1358        // int64
1359        test_take_primitive_arrays::<Int64Type>(
1360            vec![Some(0), None, Some(2), Some(-15), None],
1361            &index,
1362            None,
1363            vec![Some(-15), None, None, Some(-15), Some(2)],
1364        )
1365        .unwrap();
1366
1367        // interval_year_month
1368        test_take_primitive_arrays::<IntervalYearMonthType>(
1369            vec![Some(0), None, Some(2), Some(-15), None],
1370            &index,
1371            None,
1372            vec![Some(-15), None, None, Some(-15), Some(2)],
1373        )
1374        .unwrap();
1375
1376        // interval_day_time
1377        let v1 = IntervalDayTime::new(0, 0);
1378        let v2 = IntervalDayTime::new(2, 0);
1379        let v3 = IntervalDayTime::new(-15, 0);
1380        test_take_primitive_arrays::<IntervalDayTimeType>(
1381            vec![Some(v1), None, Some(v2), Some(v3), None],
1382            &index,
1383            None,
1384            vec![Some(v3), None, None, Some(v3), Some(v2)],
1385        )
1386        .unwrap();
1387
1388        // interval_month_day_nano
1389        let v1 = IntervalMonthDayNano::new(0, 0, 0);
1390        let v2 = IntervalMonthDayNano::new(2, 0, 0);
1391        let v3 = IntervalMonthDayNano::new(-15, 0, 0);
1392        test_take_primitive_arrays::<IntervalMonthDayNanoType>(
1393            vec![Some(v1), None, Some(v2), Some(v3), None],
1394            &index,
1395            None,
1396            vec![Some(v3), None, None, Some(v3), Some(v2)],
1397        )
1398        .unwrap();
1399
1400        // duration_second
1401        test_take_primitive_arrays::<DurationSecondType>(
1402            vec![Some(0), None, Some(2), Some(-15), None],
1403            &index,
1404            None,
1405            vec![Some(-15), None, None, Some(-15), Some(2)],
1406        )
1407        .unwrap();
1408
1409        // duration_millisecond
1410        test_take_primitive_arrays::<DurationMillisecondType>(
1411            vec![Some(0), None, Some(2), Some(-15), None],
1412            &index,
1413            None,
1414            vec![Some(-15), None, None, Some(-15), Some(2)],
1415        )
1416        .unwrap();
1417
1418        // duration_microsecond
1419        test_take_primitive_arrays::<DurationMicrosecondType>(
1420            vec![Some(0), None, Some(2), Some(-15), None],
1421            &index,
1422            None,
1423            vec![Some(-15), None, None, Some(-15), Some(2)],
1424        )
1425        .unwrap();
1426
1427        // duration_nanosecond
1428        test_take_primitive_arrays::<DurationNanosecondType>(
1429            vec![Some(0), None, Some(2), Some(-15), None],
1430            &index,
1431            None,
1432            vec![Some(-15), None, None, Some(-15), Some(2)],
1433        )
1434        .unwrap();
1435
1436        // float32
1437        test_take_primitive_arrays::<Float32Type>(
1438            vec![Some(0.0), None, Some(2.21), Some(-3.1), None],
1439            &index,
1440            None,
1441            vec![Some(-3.1), None, None, Some(-3.1), Some(2.21)],
1442        )
1443        .unwrap();
1444
1445        // float64
1446        test_take_primitive_arrays::<Float64Type>(
1447            vec![Some(0.0), None, Some(2.21), Some(-3.1), None],
1448            &index,
1449            None,
1450            vec![Some(-3.1), None, None, Some(-3.1), Some(2.21)],
1451        )
1452        .unwrap();
1453    }
1454
1455    #[test]
1456    fn test_take_preserve_timezone() {
1457        let index = Int64Array::from(vec![Some(0), None]);
1458
1459        let input = TimestampNanosecondArray::from(vec![
1460            1_639_715_368_000_000_000,
1461            1_639_715_368_000_000_000,
1462        ])
1463        .with_timezone("UTC".to_string());
1464        let result = take(&input, &index, None).unwrap();
1465        match result.data_type() {
1466            DataType::Timestamp(TimeUnit::Nanosecond, tz) => {
1467                assert_eq!(tz.clone(), Some("UTC".into()))
1468            }
1469            _ => panic!(),
1470        }
1471    }
1472
1473    #[test]
1474    fn test_take_impl_primitive_with_int64_indices() {
1475        let index = Int64Array::from(vec![Some(3), None, Some(1), Some(3), Some(2)]);
1476
1477        // int16
1478        test_take_impl_primitive_arrays::<Int16Type, Int64Type>(
1479            vec![Some(0), None, Some(2), Some(3), None],
1480            &index,
1481            None,
1482            vec![Some(3), None, None, Some(3), Some(2)],
1483        );
1484
1485        // int64
1486        test_take_impl_primitive_arrays::<Int64Type, Int64Type>(
1487            vec![Some(0), None, Some(2), Some(-15), None],
1488            &index,
1489            None,
1490            vec![Some(-15), None, None, Some(-15), Some(2)],
1491        );
1492
1493        // uint64
1494        test_take_impl_primitive_arrays::<UInt64Type, Int64Type>(
1495            vec![Some(0), None, Some(2), Some(3), None],
1496            &index,
1497            None,
1498            vec![Some(3), None, None, Some(3), Some(2)],
1499        );
1500
1501        // duration_millisecond
1502        test_take_impl_primitive_arrays::<DurationMillisecondType, Int64Type>(
1503            vec![Some(0), None, Some(2), Some(-15), None],
1504            &index,
1505            None,
1506            vec![Some(-15), None, None, Some(-15), Some(2)],
1507        );
1508
1509        // float32
1510        test_take_impl_primitive_arrays::<Float32Type, Int64Type>(
1511            vec![Some(0.0), None, Some(2.21), Some(-3.1), None],
1512            &index,
1513            None,
1514            vec![Some(-3.1), None, None, Some(-3.1), Some(2.21)],
1515        );
1516    }
1517
1518    #[test]
1519    fn test_take_impl_primitive_with_uint8_indices() {
1520        let index = UInt8Array::from(vec![Some(3), None, Some(1), Some(3), Some(2)]);
1521
1522        // int16
1523        test_take_impl_primitive_arrays::<Int16Type, UInt8Type>(
1524            vec![Some(0), None, Some(2), Some(3), None],
1525            &index,
1526            None,
1527            vec![Some(3), None, None, Some(3), Some(2)],
1528        );
1529
1530        // duration_millisecond
1531        test_take_impl_primitive_arrays::<DurationMillisecondType, UInt8Type>(
1532            vec![Some(0), None, Some(2), Some(-15), None],
1533            &index,
1534            None,
1535            vec![Some(-15), None, None, Some(-15), Some(2)],
1536        );
1537
1538        // float32
1539        test_take_impl_primitive_arrays::<Float32Type, UInt8Type>(
1540            vec![Some(0.0), None, Some(2.21), Some(-3.1), None],
1541            &index,
1542            None,
1543            vec![Some(-3.1), None, None, Some(-3.1), Some(2.21)],
1544        );
1545    }
1546
1547    #[test]
1548    fn test_take_bool() {
1549        let index = UInt32Array::from(vec![Some(3), None, Some(1), Some(3), Some(2)]);
1550        // boolean
1551        test_take_boolean_arrays(
1552            vec![Some(false), None, Some(true), Some(false), None],
1553            &index,
1554            None,
1555            vec![Some(false), None, None, Some(false), Some(true)],
1556        );
1557    }
1558
1559    #[test]
1560    fn test_take_bool_nullable_index() {
1561        // indices where the masked invalid elements would be out of bounds
1562        let index_data = ArrayData::try_new(
1563            DataType::UInt32,
1564            6,
1565            Some(Buffer::from_iter(vec![
1566                false, true, false, true, false, true,
1567            ])),
1568            0,
1569            vec![Buffer::from_iter(vec![99, 0, 999, 1, 9999, 2])],
1570            vec![],
1571        )
1572        .unwrap();
1573        let index = UInt32Array::from(index_data);
1574        test_take_boolean_arrays(
1575            vec![Some(true), None, Some(false)],
1576            &index,
1577            None,
1578            vec![None, Some(true), None, None, None, Some(false)],
1579        );
1580    }
1581
1582    #[test]
1583    fn test_take_bool_nullable_index_nonnull_values() {
1584        // indices where the masked invalid elements would be out of bounds
1585        let index_data = ArrayData::try_new(
1586            DataType::UInt32,
1587            6,
1588            Some(Buffer::from_iter(vec![
1589                false, true, false, true, false, true,
1590            ])),
1591            0,
1592            vec![Buffer::from_iter(vec![99, 0, 999, 1, 9999, 2])],
1593            vec![],
1594        )
1595        .unwrap();
1596        let index = UInt32Array::from(index_data);
1597        test_take_boolean_arrays(
1598            vec![Some(true), Some(true), Some(false)],
1599            &index,
1600            None,
1601            vec![None, Some(true), None, Some(true), None, Some(false)],
1602        );
1603    }
1604
1605    #[test]
1606    fn test_take_bool_with_offset() {
1607        let index = UInt32Array::from(vec![Some(3), None, Some(1), Some(3), Some(2), None]);
1608        let index = index.slice(2, 4);
1609        let index = index
1610            .as_any()
1611            .downcast_ref::<PrimitiveArray<UInt32Type>>()
1612            .unwrap();
1613
1614        // boolean
1615        test_take_boolean_arrays(
1616            vec![Some(false), None, Some(true), Some(false), None],
1617            index,
1618            None,
1619            vec![None, Some(false), Some(true), None],
1620        );
1621    }
1622
1623    fn _test_take_string<'a, K>()
1624    where
1625        K: Array + PartialEq + From<Vec<Option<&'a str>>> + 'static,
1626    {
1627        let index = UInt32Array::from(vec![Some(3), None, Some(1), Some(3), Some(4)]);
1628
1629        let array = K::from(vec![
1630            Some("one"),
1631            None,
1632            Some("three"),
1633            Some("four"),
1634            Some("five"),
1635        ]);
1636        let actual = take(&array, &index, None).unwrap();
1637        assert_eq!(actual.len(), index.len());
1638
1639        let actual = actual.as_any().downcast_ref::<K>().unwrap();
1640
1641        let expected = K::from(vec![Some("four"), None, None, Some("four"), Some("five")]);
1642
1643        assert_eq!(actual, &expected);
1644    }
1645
1646    #[test]
1647    fn test_take_string() {
1648        _test_take_string::<StringArray>()
1649    }
1650
1651    #[test]
1652    fn test_take_large_string() {
1653        _test_take_string::<LargeStringArray>()
1654    }
1655
1656    #[test]
1657    fn test_take_slice_string() {
1658        let strings = StringArray::from(vec![Some("hello"), None, Some("world"), None, Some("hi")]);
1659        let indices = Int32Array::from(vec![Some(0), Some(1), None, Some(0), Some(2)]);
1660        let indices_slice = indices.slice(1, 4);
1661        let expected = StringArray::from(vec![None, None, Some("hello"), Some("world")]);
1662        let result = take(&strings, &indices_slice, None).unwrap();
1663        assert_eq!(result.as_ref(), &expected);
1664    }
1665
1666    fn _test_byte_view<T>()
1667    where
1668        T: ByteViewType,
1669        str: AsRef<T::Native>,
1670        T::Native: PartialEq,
1671    {
1672        let index = UInt32Array::from(vec![Some(3), None, Some(1), Some(3), Some(4), Some(2)]);
1673        let array = {
1674            // ["hello", "world", null, "large payload over 12 bytes", "lulu"]
1675            let mut builder = GenericByteViewBuilder::<T>::new();
1676            builder.append_value("hello");
1677            builder.append_value("world");
1678            builder.append_null();
1679            builder.append_value("large payload over 12 bytes");
1680            builder.append_value("lulu");
1681            builder.finish()
1682        };
1683
1684        let actual = take(&array, &index, None).unwrap();
1685
1686        assert_eq!(actual.len(), index.len());
1687
1688        let expected = {
1689            // ["large payload over 12 bytes", null, "world", "large payload over 12 bytes", "lulu", null]
1690            let mut builder = GenericByteViewBuilder::<T>::new();
1691            builder.append_value("large payload over 12 bytes");
1692            builder.append_null();
1693            builder.append_value("world");
1694            builder.append_value("large payload over 12 bytes");
1695            builder.append_value("lulu");
1696            builder.append_null();
1697            builder.finish()
1698        };
1699
1700        assert_eq!(actual.as_ref(), &expected);
1701    }
1702
1703    #[test]
1704    fn test_take_string_view() {
1705        _test_byte_view::<StringViewType>()
1706    }
1707
1708    #[test]
1709    fn test_take_binary_view() {
1710        _test_byte_view::<BinaryViewType>()
1711    }
1712
1713    macro_rules! test_take_list {
1714        ($offset_type:ty, $list_data_type:ident, $list_array_type:ident) => {{
1715            // Construct a value array, [[0,0,0], [-1,-2,-1], [], [2,3]]
1716            let value_data = Int32Array::from(vec![0, 0, 0, -1, -2, -1, 2, 3]).into_data();
1717            // Construct offsets
1718            let value_offsets: [$offset_type; 5] = [0, 3, 6, 6, 8];
1719            let value_offsets = Buffer::from_slice_ref(&value_offsets);
1720            // Construct a list array from the above two
1721            let list_data_type =
1722                DataType::$list_data_type(Arc::new(Field::new_list_field(DataType::Int32, false)));
1723            let list_data = ArrayData::builder(list_data_type.clone())
1724                .len(4)
1725                .add_buffer(value_offsets)
1726                .add_child_data(value_data)
1727                .build()
1728                .unwrap();
1729            let list_array = $list_array_type::from(list_data);
1730
1731            // index returns: [[2,3], null, [-1,-2,-1], [], [0,0,0]]
1732            let index = UInt32Array::from(vec![Some(3), None, Some(1), Some(2), Some(0)]);
1733
1734            let a = take(&list_array, &index, None).unwrap();
1735            let a: &$list_array_type = a.as_any().downcast_ref::<$list_array_type>().unwrap();
1736
1737            // construct a value array with expected results:
1738            // [[2,3], null, [-1,-2,-1], [], [0,0,0]]
1739            let expected_data = Int32Array::from(vec![
1740                Some(2),
1741                Some(3),
1742                Some(-1),
1743                Some(-2),
1744                Some(-1),
1745                Some(0),
1746                Some(0),
1747                Some(0),
1748            ])
1749            .into_data();
1750            // construct offsets
1751            let expected_offsets: [$offset_type; 6] = [0, 2, 2, 5, 5, 8];
1752            let expected_offsets = Buffer::from_slice_ref(&expected_offsets);
1753            // construct list array from the two
1754            let expected_list_data = ArrayData::builder(list_data_type)
1755                .len(5)
1756                // null buffer remains the same as only the indices have nulls
1757                .nulls(index.nulls().cloned())
1758                .add_buffer(expected_offsets)
1759                .add_child_data(expected_data)
1760                .build()
1761                .unwrap();
1762            let expected_list_array = $list_array_type::from(expected_list_data);
1763
1764            assert_eq!(a, &expected_list_array);
1765        }};
1766    }
1767
1768    macro_rules! test_take_list_with_value_nulls {
1769        ($offset_type:ty, $list_data_type:ident, $list_array_type:ident) => {{
1770            // Construct a value array, [[0,null,0], [-1,-2,3], [null], [5,null]]
1771            let value_data = Int32Array::from(vec![
1772                Some(0),
1773                None,
1774                Some(0),
1775                Some(-1),
1776                Some(-2),
1777                Some(3),
1778                None,
1779                Some(5),
1780                None,
1781            ])
1782            .into_data();
1783            // Construct offsets
1784            let value_offsets: [$offset_type; 5] = [0, 3, 6, 7, 9];
1785            let value_offsets = Buffer::from_slice_ref(&value_offsets);
1786            // Construct a list array from the above two
1787            let list_data_type =
1788                DataType::$list_data_type(Arc::new(Field::new_list_field(DataType::Int32, true)));
1789            let list_data = ArrayData::builder(list_data_type.clone())
1790                .len(4)
1791                .add_buffer(value_offsets)
1792                .null_bit_buffer(Some(Buffer::from([0b11111111])))
1793                .add_child_data(value_data)
1794                .build()
1795                .unwrap();
1796            let list_array = $list_array_type::from(list_data);
1797
1798            // index returns: [[null], null, [-1,-2,3], [2,null], [0,null,0]]
1799            let index = UInt32Array::from(vec![Some(2), None, Some(1), Some(3), Some(0)]);
1800
1801            let a = take(&list_array, &index, None).unwrap();
1802            let a: &$list_array_type = a.as_any().downcast_ref::<$list_array_type>().unwrap();
1803
1804            // construct a value array with expected results:
1805            // [[null], null, [-1,-2,3], [5,null], [0,null,0]]
1806            let expected_data = Int32Array::from(vec![
1807                None,
1808                Some(-1),
1809                Some(-2),
1810                Some(3),
1811                Some(5),
1812                None,
1813                Some(0),
1814                None,
1815                Some(0),
1816            ])
1817            .into_data();
1818            // construct offsets
1819            let expected_offsets: [$offset_type; 6] = [0, 1, 1, 4, 6, 9];
1820            let expected_offsets = Buffer::from_slice_ref(&expected_offsets);
1821            // construct list array from the two
1822            let expected_list_data = ArrayData::builder(list_data_type)
1823                .len(5)
1824                // null buffer remains the same as only the indices have nulls
1825                .nulls(index.nulls().cloned())
1826                .add_buffer(expected_offsets)
1827                .add_child_data(expected_data)
1828                .build()
1829                .unwrap();
1830            let expected_list_array = $list_array_type::from(expected_list_data);
1831
1832            assert_eq!(a, &expected_list_array);
1833        }};
1834    }
1835
1836    macro_rules! test_take_list_with_nulls {
1837        ($offset_type:ty, $list_data_type:ident, $list_array_type:ident) => {{
1838            // Construct a value array, [[0,null,0], [-1,-2,3], null, [5,null]]
1839            let value_data = Int32Array::from(vec![
1840                Some(0),
1841                None,
1842                Some(0),
1843                Some(-1),
1844                Some(-2),
1845                Some(3),
1846                Some(5),
1847                None,
1848            ])
1849            .into_data();
1850            // Construct offsets
1851            let value_offsets: [$offset_type; 5] = [0, 3, 6, 6, 8];
1852            let value_offsets = Buffer::from_slice_ref(&value_offsets);
1853            // Construct a list array from the above two
1854            let list_data_type =
1855                DataType::$list_data_type(Arc::new(Field::new_list_field(DataType::Int32, true)));
1856            let list_data = ArrayData::builder(list_data_type.clone())
1857                .len(4)
1858                .add_buffer(value_offsets)
1859                .null_bit_buffer(Some(Buffer::from([0b11111011])))
1860                .add_child_data(value_data)
1861                .build()
1862                .unwrap();
1863            let list_array = $list_array_type::from(list_data);
1864
1865            // index returns: [null, null, [-1,-2,3], [5,null], [0,null,0]]
1866            let index = UInt32Array::from(vec![Some(2), None, Some(1), Some(3), Some(0)]);
1867
1868            let a = take(&list_array, &index, None).unwrap();
1869            let a: &$list_array_type = a.as_any().downcast_ref::<$list_array_type>().unwrap();
1870
1871            // construct a value array with expected results:
1872            // [null, null, [-1,-2,3], [5,null], [0,null,0]]
1873            let expected_data = Int32Array::from(vec![
1874                Some(-1),
1875                Some(-2),
1876                Some(3),
1877                Some(5),
1878                None,
1879                Some(0),
1880                None,
1881                Some(0),
1882            ])
1883            .into_data();
1884            // construct offsets
1885            let expected_offsets: [$offset_type; 6] = [0, 0, 0, 3, 5, 8];
1886            let expected_offsets = Buffer::from_slice_ref(&expected_offsets);
1887            // construct list array from the two
1888            let mut null_bits: [u8; 1] = [0; 1];
1889            bit_util::set_bit(&mut null_bits, 2);
1890            bit_util::set_bit(&mut null_bits, 3);
1891            bit_util::set_bit(&mut null_bits, 4);
1892            let expected_list_data = ArrayData::builder(list_data_type)
1893                .len(5)
1894                // null buffer must be recalculated as both values and indices have nulls
1895                .null_bit_buffer(Some(Buffer::from(null_bits)))
1896                .add_buffer(expected_offsets)
1897                .add_child_data(expected_data)
1898                .build()
1899                .unwrap();
1900            let expected_list_array = $list_array_type::from(expected_list_data);
1901
1902            assert_eq!(a, &expected_list_array);
1903        }};
1904    }
1905
1906    fn test_take_list_view_generic<OffsetType: OffsetSizeTrait, ValuesType: ArrowPrimitiveType, F>(
1907        values: Vec<Option<Vec<Option<ValuesType::Native>>>>,
1908        take_indices: Vec<Option<usize>>,
1909        expected: Vec<Option<Vec<Option<ValuesType::Native>>>>,
1910        mapper: F,
1911    ) where
1912        F: Fn(GenericListViewArray<OffsetType>) -> GenericListViewArray<OffsetType>,
1913    {
1914        let mut list_view_array =
1915            GenericListViewBuilder::<OffsetType, _>::new(PrimitiveBuilder::<ValuesType>::new());
1916
1917        for value in values {
1918            list_view_array.append_option(value);
1919        }
1920        let list_view_array = list_view_array.finish();
1921        let list_view_array = mapper(list_view_array);
1922
1923        let mut indices = UInt64Builder::new();
1924        for idx in take_indices {
1925            indices.append_option(idx.map(|i| i.to_u64().unwrap()));
1926        }
1927        let indices = indices.finish();
1928
1929        let taken = take(&list_view_array, &indices, None)
1930            .unwrap()
1931            .as_list_view()
1932            .clone();
1933
1934        let mut expected_array =
1935            GenericListViewBuilder::<OffsetType, _>::new(PrimitiveBuilder::<ValuesType>::new());
1936        for value in expected {
1937            expected_array.append_option(value);
1938        }
1939        let expected_array = expected_array.finish();
1940
1941        assert_eq!(taken, expected_array);
1942    }
1943
1944    macro_rules! list_view_test_case {
1945        (values: $values:expr, indices: $indices:expr, expected: $expected: expr) => {{
1946            test_take_list_view_generic::<i32, Int8Type, _>($values, $indices, $expected, |x| x);
1947            test_take_list_view_generic::<i64, Int8Type, _>($values, $indices, $expected, |x| x);
1948        }};
1949        (values: $values:expr, transform: $fn:expr, indices: $indices:expr, expected: $expected: expr) => {{
1950            test_take_list_view_generic::<i32, Int8Type, _>($values, $indices, $expected, $fn);
1951            test_take_list_view_generic::<i64, Int8Type, _>($values, $indices, $expected, $fn);
1952        }};
1953    }
1954
1955    fn do_take_fixed_size_list_test<T>(
1956        length: <Int32Type as ArrowPrimitiveType>::Native,
1957        input_data: Vec<Option<Vec<Option<T::Native>>>>,
1958        indices: Vec<<UInt32Type as ArrowPrimitiveType>::Native>,
1959        expected_data: Vec<Option<Vec<Option<T::Native>>>>,
1960    ) where
1961        T: ArrowPrimitiveType,
1962        PrimitiveArray<T>: From<Vec<Option<T::Native>>>,
1963    {
1964        let indices = UInt32Array::from(indices);
1965
1966        let input_array = FixedSizeListArray::from_iter_primitive::<T, _, _>(input_data, length);
1967
1968        let output = take_fixed_size_list(&input_array, &indices, length as u32).unwrap();
1969
1970        let expected = FixedSizeListArray::from_iter_primitive::<T, _, _>(expected_data, length);
1971
1972        assert_eq!(&output, &expected)
1973    }
1974
1975    #[test]
1976    fn test_take_list() {
1977        test_take_list!(i32, List, ListArray);
1978    }
1979
1980    #[test]
1981    fn test_take_large_list() {
1982        test_take_list!(i64, LargeList, LargeListArray);
1983    }
1984
1985    #[test]
1986    fn test_take_list_with_value_nulls() {
1987        test_take_list_with_value_nulls!(i32, List, ListArray);
1988    }
1989
1990    #[test]
1991    fn test_take_large_list_with_value_nulls() {
1992        test_take_list_with_value_nulls!(i64, LargeList, LargeListArray);
1993    }
1994
1995    #[test]
1996    fn test_test_take_list_with_nulls() {
1997        test_take_list_with_nulls!(i32, List, ListArray);
1998    }
1999
2000    #[test]
2001    fn test_test_take_large_list_with_nulls() {
2002        test_take_list_with_nulls!(i64, LargeList, LargeListArray);
2003    }
2004
2005    #[test]
2006    fn test_test_take_list_view_reversed() {
2007        // Take reversed indices
2008        list_view_test_case! {
2009            values: vec![
2010                Some(vec![Some(1), None, Some(3)]),
2011                None,
2012                Some(vec![Some(7), Some(8), None]),
2013            ],
2014            indices: vec![Some(2), Some(1), Some(0)],
2015            expected: vec![
2016                Some(vec![Some(7), Some(8), None]),
2017                None,
2018                Some(vec![Some(1), None, Some(3)]),
2019            ]
2020        }
2021    }
2022
2023    #[test]
2024    fn test_take_list_view_null_indices() {
2025        // Take with null indices
2026        list_view_test_case! {
2027            values: vec![
2028                Some(vec![Some(1), None, Some(3)]),
2029                None,
2030                Some(vec![Some(7), Some(8), None]),
2031            ],
2032            indices: vec![None, Some(0), None],
2033            expected: vec![None, Some(vec![Some(1), None, Some(3)]), None]
2034        }
2035    }
2036
2037    #[test]
2038    fn test_take_list_view_null_values() {
2039        // Take at null values
2040        list_view_test_case! {
2041            values: vec![
2042                Some(vec![Some(1), None, Some(3)]),
2043                None,
2044                Some(vec![Some(7), Some(8), None]),
2045            ],
2046            indices: vec![Some(1), Some(1), Some(1), None, None],
2047            expected: vec![None; 5]
2048        }
2049    }
2050
2051    #[test]
2052    fn test_take_list_view_sliced() {
2053        // Take null indices/values, with slicing.
2054        list_view_test_case! {
2055            values: vec![
2056                Some(vec![Some(1)]),
2057                None,
2058                None,
2059                Some(vec![Some(2), Some(3)]),
2060                Some(vec![Some(4), Some(5)]),
2061                None,
2062            ],
2063            transform: |l| l.slice(2, 4),
2064            indices: vec![Some(0), Some(3), None, Some(1), Some(2)],
2065            expected: vec![
2066                None, None, None, Some(vec![Some(2), Some(3)]), Some(vec![Some(4), Some(5)])
2067            ]
2068        }
2069    }
2070
2071    #[test]
2072    fn test_take_fixed_size_list() {
2073        do_take_fixed_size_list_test::<Int32Type>(
2074            3,
2075            vec![
2076                Some(vec![None, Some(1), Some(2)]),
2077                Some(vec![Some(3), Some(4), None]),
2078                Some(vec![Some(6), Some(7), Some(8)]),
2079            ],
2080            vec![2, 1, 0],
2081            vec![
2082                Some(vec![Some(6), Some(7), Some(8)]),
2083                Some(vec![Some(3), Some(4), None]),
2084                Some(vec![None, Some(1), Some(2)]),
2085            ],
2086        );
2087
2088        do_take_fixed_size_list_test::<UInt8Type>(
2089            1,
2090            vec![
2091                Some(vec![Some(1)]),
2092                Some(vec![Some(2)]),
2093                Some(vec![Some(3)]),
2094                Some(vec![Some(4)]),
2095                Some(vec![Some(5)]),
2096                Some(vec![Some(6)]),
2097                Some(vec![Some(7)]),
2098                Some(vec![Some(8)]),
2099            ],
2100            vec![2, 7, 0],
2101            vec![
2102                Some(vec![Some(3)]),
2103                Some(vec![Some(8)]),
2104                Some(vec![Some(1)]),
2105            ],
2106        );
2107
2108        do_take_fixed_size_list_test::<UInt64Type>(
2109            3,
2110            vec![
2111                Some(vec![Some(10), Some(11), Some(12)]),
2112                Some(vec![Some(13), Some(14), Some(15)]),
2113                None,
2114                Some(vec![Some(16), Some(17), Some(18)]),
2115            ],
2116            vec![3, 2, 1, 2, 0],
2117            vec![
2118                Some(vec![Some(16), Some(17), Some(18)]),
2119                None,
2120                Some(vec![Some(13), Some(14), Some(15)]),
2121                None,
2122                Some(vec![Some(10), Some(11), Some(12)]),
2123            ],
2124        );
2125    }
2126
2127    #[test]
2128    fn test_take_fixed_size_binary_with_nulls_indices() {
2129        let fsb = FixedSizeBinaryArray::try_from_sparse_iter_with_size(
2130            [
2131                Some(vec![0x01, 0x01, 0x01, 0x01]),
2132                Some(vec![0x02, 0x02, 0x02, 0x02]),
2133                Some(vec![0x03, 0x03, 0x03, 0x03]),
2134                Some(vec![0x04, 0x04, 0x04, 0x04]),
2135            ]
2136            .into_iter(),
2137            4,
2138        )
2139        .unwrap();
2140
2141        // The two middle indices are null -> Should be null in the output.
2142        let indices = UInt32Array::from(vec![Some(0), None, None, Some(3)]);
2143
2144        let result = take_fixed_size_binary(&fsb, &indices, 4).unwrap();
2145        assert_eq!(result.len(), 4);
2146        assert_eq!(result.null_count(), 2);
2147        assert_eq!(
2148            result.nulls().unwrap().iter().collect::<Vec<_>>(),
2149            vec![true, false, false, true]
2150        );
2151    }
2152
2153    #[test]
2154    #[should_panic(expected = "index out of bounds: the len is 4 but the index is 1000")]
2155    fn test_take_list_out_of_bounds() {
2156        // Construct a value array, [[0,0,0], [-1,-2,-1], [2,3]]
2157        let value_data = Int32Array::from(vec![0, 0, 0, -1, -2, -1, 2, 3]).into_data();
2158        // Construct offsets
2159        let value_offsets = Buffer::from_slice_ref([0, 3, 6, 8]);
2160        // Construct a list array from the above two
2161        let list_data_type =
2162            DataType::List(Arc::new(Field::new_list_field(DataType::Int32, false)));
2163        let list_data = ArrayData::builder(list_data_type)
2164            .len(3)
2165            .add_buffer(value_offsets)
2166            .add_child_data(value_data)
2167            .build()
2168            .unwrap();
2169        let list_array = ListArray::from(list_data);
2170
2171        let index = UInt32Array::from(vec![1000]);
2172
2173        // A panic is expected here since we have not supplied the check_bounds
2174        // option.
2175        take(&list_array, &index, None).unwrap();
2176    }
2177
2178    #[test]
2179    fn test_take_map() {
2180        let values = Int32Array::from(vec![1, 2, 3, 4]);
2181        let array =
2182            MapArray::new_from_strings(vec!["a", "b", "c", "a"].into_iter(), &values, &[0, 3, 4])
2183                .unwrap();
2184
2185        let index = UInt32Array::from(vec![0]);
2186
2187        let result = take(&array, &index, None).unwrap();
2188        let expected: ArrayRef = Arc::new(
2189            MapArray::new_from_strings(
2190                vec!["a", "b", "c"].into_iter(),
2191                &values.slice(0, 3),
2192                &[0, 3],
2193            )
2194            .unwrap(),
2195        );
2196        assert_eq!(&expected, &result);
2197    }
2198
2199    #[test]
2200    fn test_take_struct() {
2201        let array = create_test_struct(vec![
2202            Some((Some(true), Some(42))),
2203            Some((Some(false), Some(28))),
2204            Some((Some(false), Some(19))),
2205            Some((Some(true), Some(31))),
2206            None,
2207        ]);
2208
2209        let index = UInt32Array::from(vec![0, 3, 1, 0, 2, 4]);
2210        let actual = take(&array, &index, None).unwrap();
2211        let actual: &StructArray = actual.as_any().downcast_ref::<StructArray>().unwrap();
2212        assert_eq!(index.len(), actual.len());
2213        assert_eq!(1, actual.null_count());
2214
2215        let expected = create_test_struct(vec![
2216            Some((Some(true), Some(42))),
2217            Some((Some(true), Some(31))),
2218            Some((Some(false), Some(28))),
2219            Some((Some(true), Some(42))),
2220            Some((Some(false), Some(19))),
2221            None,
2222        ]);
2223
2224        assert_eq!(&expected, actual);
2225
2226        let nulls = NullBuffer::from(&[false, true, false, true, false, true]);
2227        let empty_struct_arr = StructArray::new_empty_fields(6, Some(nulls));
2228        let index = UInt32Array::from(vec![0, 2, 1, 4]);
2229        let actual = take(&empty_struct_arr, &index, None).unwrap();
2230
2231        let expected_nulls = NullBuffer::from(&[false, false, true, false]);
2232        let expected_struct_arr = StructArray::new_empty_fields(4, Some(expected_nulls));
2233        assert_eq!(&expected_struct_arr, actual.as_struct());
2234    }
2235
2236    #[test]
2237    fn test_take_struct_with_null_indices() {
2238        let array = create_test_struct(vec![
2239            Some((Some(true), Some(42))),
2240            Some((Some(false), Some(28))),
2241            Some((Some(false), Some(19))),
2242            Some((Some(true), Some(31))),
2243            None,
2244        ]);
2245
2246        let index = UInt32Array::from(vec![None, Some(3), Some(1), None, Some(0), Some(4)]);
2247        let actual = take(&array, &index, None).unwrap();
2248        let actual: &StructArray = actual.as_any().downcast_ref::<StructArray>().unwrap();
2249        assert_eq!(index.len(), actual.len());
2250        assert_eq!(3, actual.null_count()); // 2 because of indices, 1 because of struct array
2251
2252        let expected = create_test_struct(vec![
2253            None,
2254            Some((Some(true), Some(31))),
2255            Some((Some(false), Some(28))),
2256            None,
2257            Some((Some(true), Some(42))),
2258            None,
2259        ]);
2260
2261        assert_eq!(&expected, actual);
2262    }
2263
2264    #[test]
2265    fn test_take_out_of_bounds() {
2266        let index = UInt32Array::from(vec![Some(3), None, Some(1), Some(3), Some(6)]);
2267        let take_opt = TakeOptions { check_bounds: true };
2268
2269        // int64
2270        let result = test_take_primitive_arrays::<Int64Type>(
2271            vec![Some(0), None, Some(2), Some(3), None],
2272            &index,
2273            Some(take_opt),
2274            vec![None],
2275        );
2276        assert!(result.is_err());
2277    }
2278
2279    #[test]
2280    #[should_panic(expected = "index out of bounds: the len is 4 but the index is 1000")]
2281    fn test_take_out_of_bounds_panic() {
2282        let index = UInt32Array::from(vec![Some(1000)]);
2283
2284        test_take_primitive_arrays::<Int64Type>(
2285            vec![Some(0), Some(1), Some(2), Some(3)],
2286            &index,
2287            None,
2288            vec![None],
2289        )
2290        .unwrap();
2291    }
2292
2293    #[test]
2294    fn test_null_array_smaller_than_indices() {
2295        let values = NullArray::new(2);
2296        let indices = UInt32Array::from(vec![Some(0), None, Some(15)]);
2297
2298        let result = take(&values, &indices, None).unwrap();
2299        let expected: ArrayRef = Arc::new(NullArray::new(3));
2300        assert_eq!(&result, &expected);
2301    }
2302
2303    #[test]
2304    fn test_null_array_larger_than_indices() {
2305        let values = NullArray::new(5);
2306        let indices = UInt32Array::from(vec![Some(0), None, Some(15)]);
2307
2308        let result = take(&values, &indices, None).unwrap();
2309        let expected: ArrayRef = Arc::new(NullArray::new(3));
2310        assert_eq!(&result, &expected);
2311    }
2312
2313    #[test]
2314    fn test_null_array_indices_out_of_bounds() {
2315        let values = NullArray::new(5);
2316        let indices = UInt32Array::from(vec![Some(0), None, Some(15)]);
2317
2318        let result = take(&values, &indices, Some(TakeOptions { check_bounds: true }));
2319        assert_eq!(
2320            result.unwrap_err().to_string(),
2321            "Compute error: Array index out of bounds, cannot get item at index 15 from 5 entries"
2322        );
2323    }
2324
2325    #[test]
2326    fn test_take_dict() {
2327        let mut dict_builder = StringDictionaryBuilder::<Int16Type>::new();
2328
2329        dict_builder.append("foo").unwrap();
2330        dict_builder.append("bar").unwrap();
2331        dict_builder.append("").unwrap();
2332        dict_builder.append_null();
2333        dict_builder.append("foo").unwrap();
2334        dict_builder.append("bar").unwrap();
2335        dict_builder.append("bar").unwrap();
2336        dict_builder.append("foo").unwrap();
2337
2338        let array = dict_builder.finish();
2339        let dict_values = array.values().clone();
2340        let dict_values = dict_values.as_any().downcast_ref::<StringArray>().unwrap();
2341
2342        let indices = UInt32Array::from(vec![
2343            Some(0), // first "foo"
2344            Some(7), // last "foo"
2345            None,    // null index should return null
2346            Some(5), // second "bar"
2347            Some(6), // another "bar"
2348            Some(2), // empty string
2349            Some(3), // input is null at this index
2350        ]);
2351
2352        let result = take(&array, &indices, None).unwrap();
2353        let result = result
2354            .as_any()
2355            .downcast_ref::<DictionaryArray<Int16Type>>()
2356            .unwrap();
2357
2358        let result_values: StringArray = result.values().to_data().into();
2359
2360        // dictionary values should stay the same
2361        let expected_values = StringArray::from(vec!["foo", "bar", ""]);
2362        assert_eq!(&expected_values, dict_values);
2363        assert_eq!(&expected_values, &result_values);
2364
2365        let expected_keys = Int16Array::from(vec![
2366            Some(0),
2367            Some(0),
2368            None,
2369            Some(1),
2370            Some(1),
2371            Some(2),
2372            None,
2373        ]);
2374        assert_eq!(result.keys(), &expected_keys);
2375    }
2376
2377    fn build_generic_list<S, T>(data: Vec<Option<Vec<T::Native>>>) -> GenericListArray<S>
2378    where
2379        S: OffsetSizeTrait + 'static,
2380        T: ArrowPrimitiveType,
2381        PrimitiveArray<T>: From<Vec<Option<T::Native>>>,
2382    {
2383        GenericListArray::from_iter_primitive::<T, _, _>(
2384            data.iter()
2385                .map(|x| x.as_ref().map(|x| x.iter().map(|x| Some(*x)))),
2386        )
2387    }
2388
2389    #[test]
2390    fn test_take_value_index_from_list() {
2391        let list = build_generic_list::<i32, Int32Type>(vec![
2392            Some(vec![0, 1]),
2393            Some(vec![2, 3, 4]),
2394            Some(vec![5, 6, 7, 8, 9]),
2395        ]);
2396        let indices = UInt32Array::from(vec![2, 0]);
2397
2398        let (indexed, offsets, null_buf) = take_value_indices_from_list(&list, &indices).unwrap();
2399
2400        assert_eq!(indexed, Int32Array::from(vec![5, 6, 7, 8, 9, 0, 1]));
2401        assert_eq!(offsets, vec![0, 5, 7]);
2402        assert_eq!(null_buf.as_slice(), &[0b11111111]);
2403    }
2404
2405    #[test]
2406    fn test_take_value_index_from_large_list() {
2407        let list = build_generic_list::<i64, Int32Type>(vec![
2408            Some(vec![0, 1]),
2409            Some(vec![2, 3, 4]),
2410            Some(vec![5, 6, 7, 8, 9]),
2411        ]);
2412        let indices = UInt32Array::from(vec![2, 0]);
2413
2414        let (indexed, offsets, null_buf) =
2415            take_value_indices_from_list::<_, Int64Type>(&list, &indices).unwrap();
2416
2417        assert_eq!(indexed, Int64Array::from(vec![5, 6, 7, 8, 9, 0, 1]));
2418        assert_eq!(offsets, vec![0, 5, 7]);
2419        assert_eq!(null_buf.as_slice(), &[0b11111111]);
2420    }
2421
2422    #[test]
2423    fn test_take_runs() {
2424        let logical_array: Vec<i32> = vec![1_i32, 1, 2, 2, 1, 1, 1, 2, 2, 1, 1, 2, 2];
2425
2426        let mut builder = PrimitiveRunBuilder::<Int32Type, Int32Type>::new();
2427        builder.extend(logical_array.into_iter().map(Some));
2428        let run_array = builder.finish();
2429
2430        let take_indices: PrimitiveArray<Int32Type> =
2431            vec![7, 2, 3, 7, 11, 4, 6].into_iter().collect();
2432
2433        let take_out = take_run(&run_array, &take_indices).unwrap();
2434
2435        assert_eq!(take_out.len(), 7);
2436        assert_eq!(take_out.run_ends().len(), 7);
2437        assert_eq!(take_out.run_ends().values(), &[1_i32, 3, 4, 5, 7]);
2438
2439        let take_out_values = take_out.values().as_primitive::<Int32Type>();
2440        assert_eq!(take_out_values.values(), &[2, 2, 2, 2, 1]);
2441    }
2442
2443    #[test]
2444    fn test_take_runs_sliced() {
2445        let logical_array: Vec<i32> = vec![1, 1, 2, 2, 3, 3, 3, 4, 4, 5, 5, 6, 6];
2446
2447        let mut builder = PrimitiveRunBuilder::<Int32Type, Int32Type>::new();
2448        builder.extend(logical_array.into_iter().map(Some));
2449        let run_array = builder.finish();
2450
2451        let run_array = run_array.slice(4, 6); // [3, 3, 3, 4, 4, 5]
2452
2453        let take_indices: PrimitiveArray<Int32Type> = vec![0, 5, 5, 1, 4].into_iter().collect();
2454
2455        let result = take_run(&run_array, &take_indices).unwrap();
2456        let result = result.downcast::<Int32Array>().unwrap();
2457
2458        let expected = vec![3, 5, 5, 3, 4];
2459        let actual = result.into_iter().flatten().collect::<Vec<_>>();
2460
2461        assert_eq!(expected, actual);
2462    }
2463
2464    #[test]
2465    fn test_take_value_index_from_fixed_list() {
2466        let list = FixedSizeListArray::from_iter_primitive::<Int32Type, _, _>(
2467            vec![
2468                Some(vec![Some(1), Some(2), None]),
2469                Some(vec![Some(4), None, Some(6)]),
2470                None,
2471                Some(vec![None, Some(8), Some(9)]),
2472            ],
2473            3,
2474        );
2475
2476        let indices = UInt32Array::from(vec![2, 1, 0]);
2477        let indexed = take_value_indices_from_fixed_size_list(&list, &indices, 3).unwrap();
2478
2479        assert_eq!(indexed, UInt32Array::from(vec![6, 7, 8, 3, 4, 5, 0, 1, 2]));
2480
2481        let indices = UInt32Array::from(vec![3, 2, 1, 2, 0]);
2482        let indexed = take_value_indices_from_fixed_size_list(&list, &indices, 3).unwrap();
2483
2484        assert_eq!(
2485            indexed,
2486            UInt32Array::from(vec![9, 10, 11, 6, 7, 8, 3, 4, 5, 6, 7, 8, 0, 1, 2])
2487        );
2488    }
2489
2490    #[test]
2491    fn test_take_null_indices() {
2492        // Build indices with values that are out of bounds, but masked by null mask
2493        let indices = Int32Array::new(
2494            vec![1, 2, 400, 400].into(),
2495            Some(NullBuffer::from(vec![true, true, false, false])),
2496        );
2497        let values = Int32Array::from(vec![1, 23, 4, 5]);
2498        let r = take(&values, &indices, None).unwrap();
2499        let values = r
2500            .as_primitive::<Int32Type>()
2501            .into_iter()
2502            .collect::<Vec<_>>();
2503        assert_eq!(&values, &[Some(23), Some(4), None, None])
2504    }
2505
2506    #[test]
2507    fn test_take_fixed_size_list_null_indices() {
2508        let indices = Int32Array::from_iter([Some(0), None]);
2509        let values = Arc::new(Int32Array::from(vec![0, 1, 2, 3]));
2510        let arr_field = Arc::new(Field::new_list_field(values.data_type().clone(), true));
2511        let values = FixedSizeListArray::try_new(arr_field, 2, values, None).unwrap();
2512
2513        let r = take(&values, &indices, None).unwrap();
2514        let values = r
2515            .as_fixed_size_list()
2516            .values()
2517            .as_primitive::<Int32Type>()
2518            .into_iter()
2519            .collect::<Vec<_>>();
2520        assert_eq!(values, &[Some(0), Some(1), None, None])
2521    }
2522
2523    #[test]
2524    fn test_take_bytes_null_indices() {
2525        let indices = Int32Array::new(
2526            vec![0, 1, 400, 400].into(),
2527            Some(NullBuffer::from_iter(vec![true, true, false, false])),
2528        );
2529        let values = StringArray::from(vec![Some("foo"), None]);
2530        let r = take(&values, &indices, None).unwrap();
2531        let values = r.as_string::<i32>().iter().collect::<Vec<_>>();
2532        assert_eq!(&values, &[Some("foo"), None, None, None])
2533    }
2534
2535    #[test]
2536    fn test_take_union_sparse() {
2537        let structs = create_test_struct(vec![
2538            Some((Some(true), Some(42))),
2539            Some((Some(false), Some(28))),
2540            Some((Some(false), Some(19))),
2541            Some((Some(true), Some(31))),
2542            None,
2543        ]);
2544        let strings = StringArray::from(vec![Some("a"), None, Some("c"), None, Some("d")]);
2545        let type_ids = [1; 5].into_iter().collect::<ScalarBuffer<i8>>();
2546
2547        let union_fields = [
2548            (
2549                0,
2550                Arc::new(Field::new("f1", structs.data_type().clone(), true)),
2551            ),
2552            (
2553                1,
2554                Arc::new(Field::new("f2", strings.data_type().clone(), true)),
2555            ),
2556        ]
2557        .into_iter()
2558        .collect();
2559        let children = vec![Arc::new(structs) as Arc<dyn Array>, Arc::new(strings)];
2560        let array = UnionArray::try_new(union_fields, type_ids, None, children).unwrap();
2561
2562        let indices = vec![0, 3, 1, 0, 2, 4];
2563        let index = UInt32Array::from(indices.clone());
2564        let actual = take(&array, &index, None).unwrap();
2565        let actual = actual.as_any().downcast_ref::<UnionArray>().unwrap();
2566        let strings = actual.child(1);
2567        let strings = strings.as_any().downcast_ref::<StringArray>().unwrap();
2568
2569        let actual = strings.iter().collect::<Vec<_>>();
2570        let expected = vec![Some("a"), None, None, Some("a"), Some("c"), Some("d")];
2571        assert_eq!(expected, actual);
2572    }
2573
2574    #[test]
2575    fn test_take_union_dense() {
2576        let type_ids = vec![0, 1, 1, 0, 0, 1, 0];
2577        let offsets = vec![0, 0, 1, 1, 2, 2, 3];
2578        let ints = vec![10, 20, 30, 40];
2579        let strings = vec![Some("a"), None, Some("c"), Some("d")];
2580
2581        let indices = vec![0, 3, 1, 0, 2, 4];
2582
2583        let taken_type_ids = vec![0, 0, 1, 0, 1, 0];
2584        let taken_offsets = vec![0, 1, 0, 2, 1, 3];
2585        let taken_ints = vec![10, 20, 10, 30];
2586        let taken_strings = vec![Some("a"), None];
2587
2588        let type_ids = <ScalarBuffer<i8>>::from(type_ids);
2589        let offsets = <ScalarBuffer<i32>>::from(offsets);
2590        let ints = UInt32Array::from(ints);
2591        let strings = StringArray::from(strings);
2592
2593        let union_fields = [
2594            (
2595                0,
2596                Arc::new(Field::new("f1", ints.data_type().clone(), true)),
2597            ),
2598            (
2599                1,
2600                Arc::new(Field::new("f2", strings.data_type().clone(), true)),
2601            ),
2602        ]
2603        .into_iter()
2604        .collect();
2605
2606        let array = UnionArray::try_new(
2607            union_fields,
2608            type_ids,
2609            Some(offsets),
2610            vec![Arc::new(ints), Arc::new(strings)],
2611        )
2612        .unwrap();
2613
2614        let index = UInt32Array::from(indices);
2615
2616        let actual = take(&array, &index, None).unwrap();
2617        let actual = actual.as_any().downcast_ref::<UnionArray>().unwrap();
2618
2619        assert_eq!(actual.offsets(), Some(&ScalarBuffer::from(taken_offsets)));
2620        assert_eq!(actual.type_ids(), &ScalarBuffer::from(taken_type_ids));
2621        assert_eq!(
2622            UInt32Array::from(actual.child(0).to_data()),
2623            UInt32Array::from(taken_ints)
2624        );
2625        assert_eq!(
2626            StringArray::from(actual.child(1).to_data()),
2627            StringArray::from(taken_strings)
2628        );
2629    }
2630
2631    #[test]
2632    fn test_take_union_dense_using_builder() {
2633        let mut builder = UnionBuilder::new_dense();
2634
2635        builder.append::<Int32Type>("a", 1).unwrap();
2636        builder.append::<Float64Type>("b", 3.0).unwrap();
2637        builder.append::<Int32Type>("a", 4).unwrap();
2638        builder.append::<Int32Type>("a", 5).unwrap();
2639        builder.append::<Float64Type>("b", 2.0).unwrap();
2640
2641        let union = builder.build().unwrap();
2642
2643        let indices = UInt32Array::from(vec![2, 0, 1, 2]);
2644
2645        let mut builder = UnionBuilder::new_dense();
2646
2647        builder.append::<Int32Type>("a", 4).unwrap();
2648        builder.append::<Int32Type>("a", 1).unwrap();
2649        builder.append::<Float64Type>("b", 3.0).unwrap();
2650        builder.append::<Int32Type>("a", 4).unwrap();
2651
2652        let taken = builder.build().unwrap();
2653
2654        assert_eq!(
2655            taken.to_data(),
2656            take(&union, &indices, None).unwrap().to_data()
2657        );
2658    }
2659
2660    #[test]
2661    fn test_take_union_dense_all_match_issue_6206() {
2662        let fields = UnionFields::from_fields(vec![Field::new("a", DataType::Int64, false)]);
2663        let ints = Arc::new(Int64Array::from(vec![1, 2, 3, 4, 5]));
2664
2665        let array = UnionArray::try_new(
2666            fields,
2667            ScalarBuffer::from(vec![0_i8, 0, 0, 0, 0]),
2668            Some(ScalarBuffer::from_iter(0_i32..5)),
2669            vec![ints],
2670        )
2671        .unwrap();
2672
2673        let indicies = Int64Array::from(vec![0, 2, 4]);
2674        let array = take(&array, &indicies, None).unwrap();
2675        assert_eq!(array.len(), 3);
2676    }
2677
2678    #[test]
2679    fn test_take_bytes_offset_overflow() {
2680        let indices = Int32Array::from(vec![0; (i32::MAX >> 4) as usize]);
2681        let text = ('a'..='z').collect::<String>();
2682        let values = StringArray::from(vec![Some(text.clone())]);
2683        assert!(matches!(
2684            take(&values, &indices, None),
2685            Err(ArrowError::OffsetOverflowError(_))
2686        ));
2687    }
2688
2689    #[test]
2690    fn test_take_run_empty_indices() {
2691        let mut builder = PrimitiveRunBuilder::<Int32Type, Int32Type>::new();
2692        builder.extend([Some(1), Some(1), Some(2), Some(2)]);
2693        let run_array = builder.finish();
2694
2695        let logical_indices: PrimitiveArray<Int32Type> = PrimitiveArray::from(Vec::<i32>::new());
2696
2697        let result = take_impl(&run_array, &logical_indices).expect("take_run with empty indices");
2698
2699        // Verify the result is a valid empty RunArray
2700        assert_eq!(result.len(), 0);
2701        assert_eq!(result.null_count(), 0);
2702
2703        // Verify that the result can be downcast and used without validation errors
2704        // This specifically tests that "The values in run_ends array should be strictly positive" is not triggered
2705        let run_result = result
2706            .as_any()
2707            .downcast_ref::<RunArray<Int32Type>>()
2708            .expect("result should be a RunArray");
2709        assert_eq!(run_result.run_ends().len(), 0);
2710        assert_eq!(run_result.values().len(), 0);
2711    }
2712}