Skip to main content

arrow_select/
zip.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`zip`]: Combine values from two arrays based on boolean mask
19
20use crate::filter::{SlicesIterator, prep_null_mask_filter};
21use arrow_array::cast::AsArray;
22use arrow_array::types::{
23    BinaryType, BinaryViewType, ByteArrayType, ByteViewType, LargeBinaryType, LargeUtf8Type,
24    StringViewType, Utf8Type,
25};
26use arrow_array::*;
27use arrow_buffer::{
28    BooleanBuffer, Buffer, MutableBuffer, NullBuffer, OffsetBuffer, OffsetBufferBuilder,
29    ScalarBuffer, ToByteSlice,
30};
31use arrow_data::transform::MutableArrayData;
32use arrow_data::{ArrayData, ByteView};
33use arrow_schema::{ArrowError, DataType};
34use std::fmt::{Debug, Formatter};
35use std::hash::Hash;
36use std::marker::PhantomData;
37use std::ops::Not;
38use std::sync::{Arc, OnceLock};
39
40/// Zip two arrays by some boolean mask.
41///
42/// - Where `mask` is `true`, values of `truthy` are taken
43/// - Where `mask` is `false` or `NULL`, values of `falsy` are taken
44///
45/// # Example: `zip` two arrays
46/// ```
47/// # use std::sync::Arc;
48/// # use arrow_array::{ArrayRef, BooleanArray, Int32Array};
49/// # use arrow_select::zip::zip;
50/// // mask: [true, true, false, NULL, true]
51/// let mask = BooleanArray::from(vec![
52///   Some(true), Some(true), Some(false), None, Some(true)
53/// ]);
54/// // truthy array: [1, NULL, 3, 4, 5]
55/// let truthy = Int32Array::from(vec![
56///   Some(1), None, Some(3), Some(4), Some(5)
57/// ]);
58/// // falsy array: [10, 20, 30, 40, 50]
59/// let falsy = Int32Array::from(vec![
60///   Some(10), Some(20), Some(30), Some(40), Some(50)
61/// ]);
/// // zipping with this mask selects the first, second and last value from `truthy`
63/// // and the third and fourth value from `falsy`
64/// let result = zip(&mask, &truthy, &falsy).unwrap();
65/// // Expected: [1, NULL, 30, 40, 5]
66/// let expected: ArrayRef = Arc::new(Int32Array::from(vec![
67///   Some(1), None, Some(30), Some(40), Some(5)
68/// ]));
69/// assert_eq!(&result, &expected);
70/// ```
71///
/// # Example: `zip` an array with a scalar
73///
74/// Use `zip` to replace certain values in an array with a scalar
75///
76/// ```
77/// # use std::sync::Arc;
78/// # use arrow_array::{ArrayRef, BooleanArray, Int32Array};
79/// # use arrow_select::zip::zip;
80/// // mask: [true, true, false, NULL, true]
81/// let mask = BooleanArray::from(vec![
82///   Some(true), Some(true), Some(false), None, Some(true)
83/// ]);
84/// //  array: [1, NULL, 3, 4, 5]
85/// let arr = Int32Array::from(vec![
86///   Some(1), None, Some(3), Some(4), Some(5)
87/// ]);
88/// // scalar: 42
89/// let scalar = Int32Array::new_scalar(42);
/// // zipping the array with the mask selects the first, second and last value from `arr`
91/// // and fill the third and fourth value with the scalar 42
92/// let result = zip(&mask, &arr, &scalar).unwrap();
93/// // Expected: [1, NULL, 42, 42, 5]
94/// let expected: ArrayRef = Arc::new(Int32Array::from(vec![
95///   Some(1), None, Some(42), Some(42), Some(5)
96/// ]));
97/// assert_eq!(&result, &expected);
98/// ```
99pub fn zip(
100    mask: &BooleanArray,
101    truthy: &dyn Datum,
102    falsy: &dyn Datum,
103) -> Result<ArrayRef, ArrowError> {
104    let (truthy_array, truthy_is_scalar) = truthy.get();
105    let (falsy_array, falsy_is_scalar) = falsy.get();
106
107    if falsy_is_scalar && truthy_is_scalar {
108        let zipper = ScalarZipper::try_new(truthy, falsy)?;
109        return zipper.zip_impl.create_output(mask);
110    }
111
112    let truthy = truthy_array;
113    let falsy = falsy_array;
114
115    if truthy.data_type() != falsy.data_type() {
116        return Err(ArrowError::InvalidArgumentError(
117            "arguments need to have the same data type".into(),
118        ));
119    }
120
121    if truthy_is_scalar && truthy.len() != 1 {
122        return Err(ArrowError::InvalidArgumentError(
123            "scalar arrays must have 1 element".into(),
124        ));
125    }
126    if !truthy_is_scalar && truthy.len() != mask.len() {
127        return Err(ArrowError::InvalidArgumentError(
128            "all arrays should have the same length".into(),
129        ));
130    }
131    if falsy_is_scalar && falsy.len() != 1 {
132        return Err(ArrowError::InvalidArgumentError(
133            "scalar arrays must have 1 element".into(),
134        ));
135    }
136    if !falsy_is_scalar && falsy.len() != mask.len() {
137        return Err(ArrowError::InvalidArgumentError(
138            "all arrays should have the same length".into(),
139        ));
140    }
141
142    let falsy = falsy.to_data();
143    let truthy = truthy.to_data();
144
145    zip_impl(mask, &truthy, truthy_is_scalar, &falsy, falsy_is_scalar)
146}
147
148fn zip_impl(
149    mask: &BooleanArray,
150    truthy: &ArrayData,
151    truthy_is_scalar: bool,
152    falsy: &ArrayData,
153    falsy_is_scalar: bool,
154) -> Result<ArrayRef, ArrowError> {
155    let mut mutable = MutableArrayData::new(vec![truthy, falsy], false, truthy.len());
156
157    // the SlicesIterator slices only the true values. So the gaps left by this iterator we need to
158    // fill with falsy values
159
160    // keep track of how much is filled
161    let mut filled = 0;
162
163    let mask_buffer = maybe_prep_null_mask_filter(mask);
164    SlicesIterator::from(&mask_buffer).for_each(|(start, end)| {
165        // the gap needs to be filled with falsy values
166        if start > filled {
167            if falsy_is_scalar {
168                for _ in filled..start {
169                    // Copy the first item from the 'falsy' array into the output buffer.
170                    mutable.extend(1, 0, 1);
171                }
172            } else {
173                mutable.extend(1, filled, start);
174            }
175        }
176        // fill with truthy values
177        if truthy_is_scalar {
178            for _ in start..end {
179                // Copy the first item from the 'truthy' array into the output buffer.
180                mutable.extend(0, 0, 1);
181            }
182        } else {
183            mutable.extend(0, start, end);
184        }
185        filled = end;
186    });
187    // the remaining part is falsy
188    if filled < mask.len() {
189        if falsy_is_scalar {
190            for _ in filled..mask.len() {
191                // Copy the first item from the 'falsy' array into the output buffer.
192                mutable.extend(1, 0, 1);
193            }
194        } else {
195            mutable.extend(1, filled, mask.len());
196        }
197    }
198
199    let data = mutable.freeze();
200    Ok(make_array(data))
201}
202
/// Zipper for 2 scalars
///
/// Useful for using in `IF <expr> THEN <scalar> ELSE <scalar> END` expressions
///
/// # Example
/// ```
/// # use std::sync::Arc;
/// # use arrow_array::{ArrayRef, BooleanArray, Int32Array, Scalar, cast::AsArray, types::Int32Type};
///
/// # use arrow_select::zip::ScalarZipper;
/// let scalar_truthy = Scalar::new(Int32Array::from_value(42, 1));
/// let scalar_falsy = Scalar::new(Int32Array::from_value(123, 1));
/// let zipper = ScalarZipper::try_new(&scalar_truthy, &scalar_falsy).unwrap();
///
/// // Later when we have a boolean mask
/// let mask = BooleanArray::from(vec![true, false, true, false, true]);
/// let result = zipper.zip(&mask).unwrap();
/// let actual = result.as_primitive::<Int32Type>();
/// let expected = Int32Array::from(vec![Some(42), Some(123), Some(42), Some(123), Some(42)]);
/// assert_eq!(actual, &expected);
/// ```
///
#[derive(Debug, Clone)]
pub struct ScalarZipper {
    // Type-specific implementation chosen in `try_new` from the scalars' data type.
    zip_impl: Arc<dyn ZipImpl>,
}
228
229impl ScalarZipper {
230    /// Try to create a new ScalarZipper from two scalar Datum
231    ///
232    /// # Errors
233    /// returns error if:
234    /// - the two Datum have different data types
235    /// - either Datum is not a scalar (or has more than 1 element)
236    ///
237    pub fn try_new(truthy: &dyn Datum, falsy: &dyn Datum) -> Result<Self, ArrowError> {
238        let (truthy, truthy_is_scalar) = truthy.get();
239        let (falsy, falsy_is_scalar) = falsy.get();
240
241        if truthy.data_type() != falsy.data_type() {
242            return Err(ArrowError::InvalidArgumentError(
243                "arguments need to have the same data type".into(),
244            ));
245        }
246
247        if !truthy_is_scalar {
248            return Err(ArrowError::InvalidArgumentError(
249                "only scalar arrays are supported".into(),
250            ));
251        }
252
253        if !falsy_is_scalar {
254            return Err(ArrowError::InvalidArgumentError(
255                "only scalar arrays are supported".into(),
256            ));
257        }
258
259        if truthy.len() != 1 {
260            return Err(ArrowError::InvalidArgumentError(
261                "scalar arrays must have 1 element".into(),
262            ));
263        }
264        if falsy.len() != 1 {
265            return Err(ArrowError::InvalidArgumentError(
266                "scalar arrays must have 1 element".into(),
267            ));
268        }
269
270        macro_rules! primitive_size_helper {
271            ($t:ty) => {
272                Arc::new(PrimitiveScalarImpl::<$t>::new(truthy, falsy)) as Arc<dyn ZipImpl>
273            };
274        }
275
276        let zip_impl = downcast_primitive! {
277            truthy.data_type() => (primitive_size_helper),
278            DataType::Utf8 => {
279                Arc::new(BytesScalarImpl::<Utf8Type>::new(truthy, falsy)) as Arc<dyn ZipImpl>
280            },
281            DataType::LargeUtf8 => {
282                Arc::new(BytesScalarImpl::<LargeUtf8Type>::new(truthy, falsy)) as Arc<dyn ZipImpl>
283            },
284            DataType::Binary => {
285                Arc::new(BytesScalarImpl::<BinaryType>::new(truthy, falsy)) as Arc<dyn ZipImpl>
286            },
287            DataType::LargeBinary => {
288                Arc::new(BytesScalarImpl::<LargeBinaryType>::new(truthy, falsy)) as Arc<dyn ZipImpl>
289            },
290            DataType::Utf8View => {
291                Arc::new(ByteViewScalarImpl::<StringViewType>::new(truthy, falsy)) as Arc<dyn ZipImpl>
292            },
293            DataType::BinaryView => {
294                Arc::new(ByteViewScalarImpl::<BinaryViewType>::new(truthy, falsy)) as Arc<dyn ZipImpl>
295            },
296            _ => {
297                Arc::new(FallbackImpl::new(truthy, falsy)) as Arc<dyn ZipImpl>
298            },
299        };
300
301        Ok(Self { zip_impl })
302    }
303
304    /// Creating output array based on input boolean array and the two scalar values the zipper was created with
305    /// See struct level documentation for examples.
306    pub fn zip(&self, mask: &BooleanArray) -> Result<ArrayRef, ArrowError> {
307        self.zip_impl.create_output(mask)
308    }
309}
310
/// Impl for creating output array based on a mask
///
/// Implementors hold the truthy and falsy scalar values; `Debug + Send + Sync` are required so
/// that [`ScalarZipper`] can store the implementation behind an `Arc<dyn ZipImpl>`.
trait ZipImpl: Debug + Send + Sync {
    /// Creating output array based on input boolean array
    ///
    /// The returned array has one element per entry of `input`.
    fn create_output(&self, input: &BooleanArray) -> Result<ArrayRef, ArrowError>;
}
316
/// Generic scalar implementation used for data types without a specialised implementation.
///
/// Keeps both scalars as [`ArrayData`] and defers to `zip_impl`.
#[derive(Debug, PartialEq)]
struct FallbackImpl {
    // Data of the scalar used where the mask is true.
    truthy: ArrayData,
    // Data of the scalar used where the mask is false or null.
    falsy: ArrayData,
}
322
323impl FallbackImpl {
324    fn new(left: &dyn Array, right: &dyn Array) -> Self {
325        Self {
326            truthy: left.to_data(),
327            falsy: right.to_data(),
328        }
329    }
330}
331
impl ZipImpl for FallbackImpl {
    /// Zips the two stored scalars through the generic `zip_impl`, flagging both as scalars.
    fn create_output(&self, predicate: &BooleanArray) -> Result<ArrayRef, ArrowError> {
        zip_impl(predicate, &self.truthy, true, &self.falsy, true)
    }
}
337
/// Scalar zip implementation for primitive types.
struct PrimitiveScalarImpl<T: ArrowPrimitiveType> {
    // Data type of the truthy scalar; re-applied to the output array.
    data_type: DataType,
    // Value used where the mask is true, `None` when the truthy scalar is null.
    truthy: Option<T::Native>,
    // Value used where the mask is false or null, `None` when the falsy scalar is null.
    falsy: Option<T::Native>,
}
343
344impl<T: ArrowPrimitiveType> Debug for PrimitiveScalarImpl<T> {
345    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
346        f.debug_struct("PrimitiveScalarImpl")
347            .field("data_type", &self.data_type)
348            .field("truthy", &self.truthy)
349            .field("falsy", &self.falsy)
350            .finish()
351    }
352}
353
354impl<T: ArrowPrimitiveType> PrimitiveScalarImpl<T> {
355    fn new(truthy: &dyn Array, falsy: &dyn Array) -> Self {
356        Self {
357            data_type: truthy.data_type().clone(),
358            truthy: Self::get_value_from_scalar(truthy),
359            falsy: Self::get_value_from_scalar(falsy),
360        }
361    }
362
363    fn get_value_from_scalar(scalar: &dyn Array) -> Option<T::Native> {
364        if scalar.is_null(0) {
365            None
366        } else {
367            let value = scalar.as_primitive::<T>().value(0);
368
369            Some(value)
370        }
371    }
372
373    /// return an output array that has
374    /// `value` in all locations where predicate is true
375    /// `null` otherwise
376    fn get_scalar_and_null_buffer_for_single_non_nullable(
377        predicate: BooleanBuffer,
378        value: T::Native,
379    ) -> (Vec<T::Native>, Option<NullBuffer>) {
380        let result_len = predicate.len();
381        let nulls = NullBuffer::new(predicate);
382        let scalars = vec![value; result_len];
383
384        (scalars, Some(nulls))
385    }
386}
387
388impl<T: ArrowPrimitiveType> ZipImpl for PrimitiveScalarImpl<T> {
389    fn create_output(&self, predicate: &BooleanArray) -> Result<ArrayRef, ArrowError> {
390        let result_len = predicate.len();
391        // Nulls are treated as false
392        let predicate = maybe_prep_null_mask_filter(predicate);
393
394        let (scalars, nulls): (Vec<T::Native>, Option<NullBuffer>) = match (self.truthy, self.falsy)
395        {
396            (Some(truthy_val), Some(falsy_val)) => {
397                let scalars: Vec<T::Native> = predicate
398                    .iter()
399                    .map(|b| if b { truthy_val } else { falsy_val })
400                    .collect();
401
402                (scalars, None)
403            }
404            (Some(truthy_val), None) => {
405                // If a value is true we need the TRUTHY and the null buffer will have 1 (meaning not null)
406                // If a value is false we need the FALSY and the null buffer will have 0 (meaning null)
407
408                Self::get_scalar_and_null_buffer_for_single_non_nullable(predicate, truthy_val)
409            }
410            (None, Some(falsy_val)) => {
411                // Flipping the boolean buffer as we want the opposite of the TRUE case
412                //
413                // if the condition is true we want null so we need to NOT the value so we get 0 (meaning null)
414                // if the condition is false we want the FALSY value so we need to NOT the value so we get 1 (meaning not null)
415                let predicate = predicate.not();
416
417                Self::get_scalar_and_null_buffer_for_single_non_nullable(predicate, falsy_val)
418            }
419            (None, None) => {
420                // All values are null
421                let nulls = NullBuffer::new_null(result_len);
422                let scalars = vec![T::default_value(); result_len];
423
424                (scalars, Some(nulls))
425            }
426        };
427
428        let scalars = ScalarBuffer::<T::Native>::from(scalars);
429        let output = PrimitiveArray::<T>::try_new(scalars, nulls)?;
430
431        // Keep decimal precisions, scales or timestamps timezones
432        let output = output.with_data_type(self.data_type.clone());
433
434        Ok(Arc::new(output))
435    }
436}
437
/// Scalar zip implementation for the offset-based byte array types (`T` selects the type).
#[derive(PartialEq, Hash)]
struct BytesScalarImpl<T: ByteArrayType> {
    // Bytes used where the mask is true, `None` when the truthy scalar is null.
    truthy: Option<Vec<u8>>,
    // Bytes used where the mask is false or null, `None` when the falsy scalar is null.
    falsy: Option<Vec<u8>>,
    // Ties the implementation to the byte array type without storing a value of it.
    phantom: PhantomData<T>,
}
444
445impl<T: ByteArrayType> Debug for BytesScalarImpl<T> {
446    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
447        f.debug_struct("BytesScalarImpl")
448            .field("truthy", &self.truthy)
449            .field("falsy", &self.falsy)
450            .finish()
451    }
452}
453
454impl<T: ByteArrayType> BytesScalarImpl<T> {
455    fn new(truthy_value: &dyn Array, falsy_value: &dyn Array) -> Self {
456        Self {
457            truthy: Self::get_value_from_scalar(truthy_value),
458            falsy: Self::get_value_from_scalar(falsy_value),
459            phantom: PhantomData,
460        }
461    }
462
463    fn get_value_from_scalar(scalar: &dyn Array) -> Option<Vec<u8>> {
464        if scalar.is_null(0) {
465            None
466        } else {
467            let bytes: &[u8] = scalar.as_bytes::<T>().value(0).as_ref();
468
469            Some(bytes.to_vec())
470        }
471    }
472
473    /// return an output array that has
474    /// `value` in all locations where predicate is true
475    /// `null` otherwise
476    fn get_scalar_and_null_buffer_for_single_non_nullable(
477        predicate: BooleanBuffer,
478        value: &[u8],
479    ) -> (Buffer, OffsetBuffer<T::Offset>, Option<NullBuffer>) {
480        let value_length = value.len();
481
482        let number_of_true = predicate.count_set_bits();
483
484        // Fast path for all nulls
485        if number_of_true == 0 {
486            // All values are null
487            let nulls = NullBuffer::new_null(predicate.len());
488
489            return (
490                // Empty bytes
491                Buffer::from(&[]),
492                // All nulls so all lengths are 0
493                OffsetBuffer::<T::Offset>::new_zeroed(predicate.len()),
494                Some(nulls),
495            );
496        }
497
498        let offsets = OffsetBuffer::<T::Offset>::from_lengths(
499            predicate.iter().map(|b| if b { value_length } else { 0 }),
500        );
501
502        let mut bytes = MutableBuffer::with_capacity(0);
503        bytes.repeat_slice_n_times(value, number_of_true);
504
505        let bytes = Buffer::from(bytes);
506
507        // If a value is true we need the TRUTHY and the null buffer will have 1 (meaning not null)
508        // If a value is false we need the FALSY and the null buffer will have 0 (meaning null)
509        let nulls = NullBuffer::new(predicate);
510
511        (bytes, offsets, Some(nulls))
512    }
513
514    /// Create a [`Buffer`] where `value` slice is repeated `number_of_values` times
515    /// and [`OffsetBuffer`] where there are `number_of_values` lengths, and all equals to `value` length
516    fn get_bytes_and_offset_for_all_same_value(
517        number_of_values: usize,
518        value: &[u8],
519    ) -> (Buffer, OffsetBuffer<T::Offset>) {
520        let value_length = value.len();
521
522        let offsets =
523            OffsetBuffer::<T::Offset>::from_repeated_length(value_length, number_of_values);
524
525        let mut bytes = MutableBuffer::with_capacity(0);
526        bytes.repeat_slice_n_times(value, number_of_values);
527        let bytes = Buffer::from(bytes);
528
529        (bytes, offsets)
530    }
531
532    fn create_output_on_non_nulls(
533        predicate: &BooleanBuffer,
534        truthy_val: &[u8],
535        falsy_val: &[u8],
536    ) -> (Buffer, OffsetBuffer<<T as ByteArrayType>::Offset>) {
537        let true_count = predicate.count_set_bits();
538
539        match true_count {
540            0 => {
541                // All values are falsy
542
543                let (bytes, offsets) =
544                    Self::get_bytes_and_offset_for_all_same_value(predicate.len(), falsy_val);
545
546                return (bytes, offsets);
547            }
548            n if n == predicate.len() => {
549                // All values are truthy
550                let (bytes, offsets) =
551                    Self::get_bytes_and_offset_for_all_same_value(predicate.len(), truthy_val);
552
553                return (bytes, offsets);
554            }
555
556            _ => {
557                // Fallback
558            }
559        }
560
561        let total_number_of_bytes =
562            true_count * truthy_val.len() + (predicate.len() - true_count) * falsy_val.len();
563        let mut mutable = MutableBuffer::with_capacity(total_number_of_bytes);
564        let mut offset_buffer_builder = OffsetBufferBuilder::<T::Offset>::new(predicate.len());
565
566        // keep track of how much is filled
567        let mut filled = 0;
568
569        let truthy_len = truthy_val.len();
570        let falsy_len = falsy_val.len();
571
572        SlicesIterator::from(predicate).for_each(|(start, end)| {
573            // the gap needs to be filled with falsy values
574            if start > filled {
575                let false_repeat_count = start - filled;
576                // Push false value `repeat_count` times
577                mutable.repeat_slice_n_times(falsy_val, false_repeat_count);
578
579                for _ in 0..false_repeat_count {
580                    offset_buffer_builder.push_length(falsy_len)
581                }
582            }
583
584            let true_repeat_count = end - start;
585            // fill with truthy values
586            mutable.repeat_slice_n_times(truthy_val, true_repeat_count);
587
588            for _ in 0..true_repeat_count {
589                offset_buffer_builder.push_length(truthy_len)
590            }
591            filled = end;
592        });
593        // the remaining part is falsy
594        if filled < predicate.len() {
595            let false_repeat_count = predicate.len() - filled;
596            // Copy the first item from the 'falsy' array into the output buffer.
597            mutable.repeat_slice_n_times(falsy_val, false_repeat_count);
598
599            for _ in 0..false_repeat_count {
600                offset_buffer_builder.push_length(falsy_len)
601            }
602        }
603
604        (mutable.into(), offset_buffer_builder.finish())
605    }
606}
607
608impl<T: ByteArrayType> ZipImpl for BytesScalarImpl<T> {
609    fn create_output(&self, predicate: &BooleanArray) -> Result<ArrayRef, ArrowError> {
610        let result_len = predicate.len();
611        // Nulls are treated as false
612        let predicate = maybe_prep_null_mask_filter(predicate);
613
614        let (bytes, offsets, nulls): (Buffer, OffsetBuffer<T::Offset>, Option<NullBuffer>) =
615            match (self.truthy.as_deref(), self.falsy.as_deref()) {
616                (Some(truthy_val), Some(falsy_val)) => {
617                    let (bytes, offsets) =
618                        Self::create_output_on_non_nulls(&predicate, truthy_val, falsy_val);
619
620                    (bytes, offsets, None)
621                }
622                (Some(truthy_val), None) => {
623                    Self::get_scalar_and_null_buffer_for_single_non_nullable(predicate, truthy_val)
624                }
625                (None, Some(falsy_val)) => {
626                    // Flipping the boolean buffer as we want the opposite of the TRUE case
627                    //
628                    // if the condition is true we want null so we need to NOT the value so we get 0 (meaning null)
629                    // if the condition is false we want the FALSE value so we need to NOT the value so we get 1 (meaning not null)
630                    let predicate = predicate.not();
631                    Self::get_scalar_and_null_buffer_for_single_non_nullable(predicate, falsy_val)
632                }
633                (None, None) => {
634                    // All values are null
635                    let nulls = NullBuffer::new_null(result_len);
636
637                    (
638                        // Empty bytes
639                        Buffer::from(&[]),
640                        // All nulls so all lengths are 0
641                        OffsetBuffer::<T::Offset>::new_zeroed(predicate.len()),
642                        Some(nulls),
643                    )
644                }
645            };
646
647        let output = unsafe {
648            // Safety: the values are based on valid inputs
649            // and `try_new` is expensive for strings as it validate that the input is valid utf8
650            GenericByteArray::<T>::new_unchecked(offsets, bytes, nulls)
651        };
652
653        Ok(Arc::new(output))
654    }
655}
656
657fn maybe_prep_null_mask_filter(predicate: &BooleanArray) -> BooleanBuffer {
658    // Nulls are treated as false
659    if predicate.null_count() == 0 {
660        predicate.values().clone()
661    } else {
662        let cleaned = prep_null_mask_filter(predicate);
663        let (boolean_buffer, _) = cleaned.into_parts();
664        boolean_buffer
665    }
666}
667
/// Scalar zip implementation for the byte view types (`T` selects the type).
struct ByteViewScalarImpl<T: ByteViewType> {
    // View of the truthy scalar, `None` when that scalar is null.
    truthy_view: Option<u128>,
    // Data buffers taken from the truthy scalar's array.
    truthy_buffers: Arc<[Buffer]>,
    // View of the falsy scalar, `None` when that scalar is null.
    falsy_view: Option<u128>,
    // Data buffers taken from the falsy scalar's array.
    falsy_buffers: Arc<[Buffer]>,
    // Ties the implementation to the view type without storing a value of it.
    phantom: PhantomData<T>,
}
675
676static EMPTY_ARC: OnceLock<Arc<[Buffer]>> = OnceLock::new();
677fn empty_arc_buffers() -> Arc<[Buffer]> {
678    Arc::clone(EMPTY_ARC.get_or_init(|| Arc::new([])))
679}
680
681impl<T: ByteViewType> ByteViewScalarImpl<T> {
682    fn new(truthy: &dyn Array, falsy: &dyn Array) -> Self {
683        let (truthy_view, truthy_buffers) = Self::get_value_from_scalar(truthy);
684        let (falsy_view, falsy_buffers) = Self::get_value_from_scalar(falsy);
685        Self {
686            truthy_view,
687            truthy_buffers,
688            falsy_view,
689            falsy_buffers,
690            phantom: PhantomData,
691        }
692    }
693
694    fn get_value_from_scalar(scalar: &dyn Array) -> (Option<u128>, Arc<[Buffer]>) {
695        if scalar.is_null(0) {
696            (None, empty_arc_buffers())
697        } else {
698            let (views, buffers, _) = scalar.as_byte_view::<T>().clone().into_parts();
699            (views.first().copied(), buffers)
700        }
701    }
702
703    fn get_views_for_single_non_nullable(
704        predicate: BooleanBuffer,
705        value: u128,
706        buffers: Arc<[Buffer]>,
707    ) -> (ScalarBuffer<u128>, Arc<[Buffer]>, Option<NullBuffer>) {
708        let number_of_true = predicate.count_set_bits();
709        let number_of_values = predicate.len();
710
711        // Fast path for all nulls
712        if number_of_true == 0 {
713            // All values are null
714            return (
715                vec![0; number_of_values].into(),
716                empty_arc_buffers(),
717                Some(NullBuffer::new_null(number_of_values)),
718            );
719        }
720        let bytes = vec![value; number_of_values];
721
722        // If value is true and we want to handle the TRUTHY case, the null buffer will have 1 (meaning not null)
723        // If value is false and we want to handle the FALSY case, the null buffer will have 0 (meaning null)
724        let nulls = NullBuffer::new(predicate);
725        (bytes.into(), buffers, Some(nulls))
726    }
727
728    fn get_views_for_non_nullable(
729        predicate: BooleanBuffer,
730        result_len: usize,
731        truthy_view: u128,
732        truthy_buffers: Arc<[Buffer]>,
733        falsy_view: u128,
734        falsy_buffers: Arc<[Buffer]>,
735    ) -> (ScalarBuffer<u128>, Arc<[Buffer]>, Option<NullBuffer>) {
736        let true_count = predicate.count_set_bits();
737        match true_count {
738            0 => {
739                // all values are falsy
740                (vec![falsy_view; result_len].into(), falsy_buffers, None)
741            }
742            n if n == predicate.len() => {
743                // all values are truthy
744                (vec![truthy_view; result_len].into(), truthy_buffers, None)
745            }
746            _ => {
747                let true_count = predicate.count_set_bits();
748                let mut buffers: Vec<Buffer> = truthy_buffers.to_vec();
749
750                // If the falsy buffers are empty, we can use the falsy view as it is, because the value
751                // is completely inlined. Otherwise, we have non-inlined values in the buffer, and we need
752                // to recalculate the falsy view
753                let view_falsy = if falsy_buffers.is_empty() {
754                    falsy_view
755                } else {
756                    let byte_view_falsy = ByteView::from(falsy_view);
757                    let new_index_falsy_buffers =
758                        buffers.len() as u32 + byte_view_falsy.buffer_index;
759                    buffers.extend(falsy_buffers.iter().cloned());
760                    let byte_view_falsy =
761                        byte_view_falsy.with_buffer_index(new_index_falsy_buffers);
762                    byte_view_falsy.as_u128()
763                };
764
765                let total_number_of_bytes = true_count * 16 + (predicate.len() - true_count) * 16;
766                let mut mutable = MutableBuffer::new(total_number_of_bytes);
767                let mut filled = 0;
768
769                SlicesIterator::from(&predicate).for_each(|(start, end)| {
770                    if start > filled {
771                        let false_repeat_count = start - filled;
772                        mutable
773                            .repeat_slice_n_times(view_falsy.to_byte_slice(), false_repeat_count);
774                    }
775                    let true_repeat_count = end - start;
776                    mutable.repeat_slice_n_times(truthy_view.to_byte_slice(), true_repeat_count);
777                    filled = end;
778                });
779
780                if filled < predicate.len() {
781                    let false_repeat_count = predicate.len() - filled;
782                    mutable.repeat_slice_n_times(view_falsy.to_byte_slice(), false_repeat_count);
783                }
784
785                let bytes = Buffer::from(mutable);
786                (bytes.into(), buffers.into(), None)
787            }
788        }
789    }
790}
791
792impl<T: ByteViewType> Debug for ByteViewScalarImpl<T> {
793    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
794        f.debug_struct("ByteViewScalarImpl")
795            .field("truthy", &self.truthy_view)
796            .field("falsy", &self.falsy_view)
797            .finish()
798    }
799}
800
801impl<T: ByteViewType> ZipImpl for ByteViewScalarImpl<T> {
802    fn create_output(&self, predicate: &BooleanArray) -> Result<ArrayRef, ArrowError> {
803        let result_len = predicate.len();
804        // Nulls are treated as false
805        let predicate = maybe_prep_null_mask_filter(predicate);
806
807        let (views, buffers, nulls) = match (self.truthy_view, self.falsy_view) {
808            (Some(truthy), Some(falsy)) => Self::get_views_for_non_nullable(
809                predicate,
810                result_len,
811                truthy,
812                Arc::clone(&self.truthy_buffers),
813                falsy,
814                Arc::clone(&self.falsy_buffers),
815            ),
816            (Some(truthy), None) => Self::get_views_for_single_non_nullable(
817                predicate,
818                truthy,
819                Arc::clone(&self.truthy_buffers),
820            ),
821            (None, Some(falsy)) => {
822                let predicate = predicate.not();
823                Self::get_views_for_single_non_nullable(
824                    predicate,
825                    falsy,
826                    Arc::clone(&self.falsy_buffers),
827                )
828            }
829            (None, None) => {
830                // All values are null
831                (
832                    vec![0; result_len].into(),
833                    empty_arc_buffers(),
834                    Some(NullBuffer::new_null(result_len)),
835                )
836            }
837        };
838
839        let result = unsafe { GenericByteViewArray::<T>::new_unchecked(views, buffers, nulls) };
840        Ok(Arc::new(result))
841    }
842}
843
844#[cfg(test)]
845mod test {
846    use super::*;
847    use arrow_array::types::Int32Type;
848
849    #[test]
850    fn test_zip_kernel_one() {
851        let a = Int32Array::from(vec![Some(5), None, Some(7), None, Some(1)]);
852        let b = Int32Array::from(vec![None, Some(3), Some(6), Some(7), Some(3)]);
853        let mask = BooleanArray::from(vec![true, true, false, false, true]);
854        let out = zip(&mask, &a, &b).unwrap();
855        let actual = out.as_any().downcast_ref::<Int32Array>().unwrap();
856        let expected = Int32Array::from(vec![Some(5), None, Some(6), Some(7), Some(1)]);
857        assert_eq!(actual, &expected);
858    }
859
860    #[test]
861    fn test_zip_kernel_two() {
862        let a = Int32Array::from(vec![Some(5), None, Some(7), None, Some(1)]);
863        let b = Int32Array::from(vec![None, Some(3), Some(6), Some(7), Some(3)]);
864        let mask = BooleanArray::from(vec![false, false, true, true, false]);
865        let out = zip(&mask, &a, &b).unwrap();
866        let actual = out.as_any().downcast_ref::<Int32Array>().unwrap();
867        let expected = Int32Array::from(vec![None, Some(3), Some(7), None, Some(3)]);
868        assert_eq!(actual, &expected);
869    }
870
871    #[test]
872    fn test_zip_kernel_scalar_falsy_1() {
873        let a = Int32Array::from(vec![Some(5), None, Some(7), None, Some(1)]);
874
875        let fallback = Scalar::new(Int32Array::from_value(42, 1));
876
877        let mask = BooleanArray::from(vec![true, true, false, false, true]);
878        let out = zip(&mask, &a, &fallback).unwrap();
879        let actual = out.as_any().downcast_ref::<Int32Array>().unwrap();
880        let expected = Int32Array::from(vec![Some(5), None, Some(42), Some(42), Some(1)]);
881        assert_eq!(actual, &expected);
882    }
883
884    #[test]
885    fn test_zip_kernel_scalar_falsy_2() {
886        let a = Int32Array::from(vec![Some(5), None, Some(7), None, Some(1)]);
887
888        let fallback = Scalar::new(Int32Array::from_value(42, 1));
889
890        let mask = BooleanArray::from(vec![false, false, true, true, false]);
891        let out = zip(&mask, &a, &fallback).unwrap();
892        let actual = out.as_any().downcast_ref::<Int32Array>().unwrap();
893        let expected = Int32Array::from(vec![Some(42), Some(42), Some(7), None, Some(42)]);
894        assert_eq!(actual, &expected);
895    }
896
897    #[test]
898    fn test_zip_kernel_scalar_truthy_1() {
899        let a = Int32Array::from(vec![Some(5), None, Some(7), None, Some(1)]);
900
901        let fallback = Scalar::new(Int32Array::from_value(42, 1));
902
903        let mask = BooleanArray::from(vec![true, true, false, false, true]);
904        let out = zip(&mask, &fallback, &a).unwrap();
905        let actual = out.as_any().downcast_ref::<Int32Array>().unwrap();
906        let expected = Int32Array::from(vec![Some(42), Some(42), Some(7), None, Some(42)]);
907        assert_eq!(actual, &expected);
908    }
909
910    #[test]
911    fn test_zip_kernel_scalar_truthy_2() {
912        let a = Int32Array::from(vec![Some(5), None, Some(7), None, Some(1)]);
913
914        let fallback = Scalar::new(Int32Array::from_value(42, 1));
915
916        let mask = BooleanArray::from(vec![false, false, true, true, false]);
917        let out = zip(&mask, &fallback, &a).unwrap();
918        let actual = out.as_any().downcast_ref::<Int32Array>().unwrap();
919        let expected = Int32Array::from(vec![Some(5), None, Some(42), Some(42), Some(1)]);
920        assert_eq!(actual, &expected);
921    }
922
923    #[test]
924    fn test_zip_kernel_scalar_both_mask_ends_with_true() {
925        let scalar_truthy = Scalar::new(Int32Array::from_value(42, 1));
926        let scalar_falsy = Scalar::new(Int32Array::from_value(123, 1));
927
928        let mask = BooleanArray::from(vec![true, true, false, false, true]);
929        let out = zip(&mask, &scalar_truthy, &scalar_falsy).unwrap();
930        let actual = out.as_any().downcast_ref::<Int32Array>().unwrap();
931        let expected = Int32Array::from(vec![Some(42), Some(42), Some(123), Some(123), Some(42)]);
932        assert_eq!(actual, &expected);
933    }
934
935    #[test]
936    fn test_zip_kernel_scalar_both_mask_ends_with_false() {
937        let scalar_truthy = Scalar::new(Int32Array::from_value(42, 1));
938        let scalar_falsy = Scalar::new(Int32Array::from_value(123, 1));
939
940        let mask = BooleanArray::from(vec![true, true, false, true, false, false]);
941        let out = zip(&mask, &scalar_truthy, &scalar_falsy).unwrap();
942        let actual = out.as_any().downcast_ref::<Int32Array>().unwrap();
943        let expected = Int32Array::from(vec![
944            Some(42),
945            Some(42),
946            Some(123),
947            Some(42),
948            Some(123),
949            Some(123),
950        ]);
951        assert_eq!(actual, &expected);
952    }
953
954    #[test]
955    fn test_zip_kernel_primitive_scalar_none_1() {
956        let scalar_truthy = Scalar::new(Int32Array::from_value(42, 1));
957        let scalar_falsy = Scalar::new(Int32Array::new_null(1));
958
959        let mask = BooleanArray::from(vec![true, true, false, false, true]);
960        let out = zip(&mask, &scalar_truthy, &scalar_falsy).unwrap();
961        let actual = out.as_any().downcast_ref::<Int32Array>().unwrap();
962        let expected = Int32Array::from(vec![Some(42), Some(42), None, None, Some(42)]);
963        assert_eq!(actual, &expected);
964    }
965
966    #[test]
967    fn test_zip_kernel_primitive_scalar_none_2() {
968        let scalar_truthy = Scalar::new(Int32Array::from_value(42, 1));
969        let scalar_falsy = Scalar::new(Int32Array::new_null(1));
970
971        let mask = BooleanArray::from(vec![false, false, true, true, false]);
972        let out = zip(&mask, &scalar_truthy, &scalar_falsy).unwrap();
973        let actual = out.as_any().downcast_ref::<Int32Array>().unwrap();
974        let expected = Int32Array::from(vec![None, None, Some(42), Some(42), None]);
975        assert_eq!(actual, &expected);
976    }
977
978    #[test]
979    fn test_zip_kernel_primitive_scalar_both_null() {
980        let scalar_truthy = Scalar::new(Int32Array::new_null(1));
981        let scalar_falsy = Scalar::new(Int32Array::new_null(1));
982
983        let mask = BooleanArray::from(vec![false, false, true, true, false]);
984        let out = zip(&mask, &scalar_truthy, &scalar_falsy).unwrap();
985        let actual = out.as_any().downcast_ref::<Int32Array>().unwrap();
986        let expected = Int32Array::from(vec![None, None, None, None, None]);
987        assert_eq!(actual, &expected);
988    }
989
990    #[test]
991    fn test_zip_primitive_array_with_nulls_is_mask_should_be_treated_as_false() {
992        let truthy = Int32Array::from_iter_values(vec![1, 2, 3, 4, 5, 6]);
993        let falsy = Int32Array::from_iter_values(vec![7, 8, 9, 10, 11, 12]);
994
995        let mask = {
996            let booleans = BooleanBuffer::from(vec![true, true, false, true, false, false]);
997            let nulls = NullBuffer::from(vec![
998                true, true, true,
999                false, // null treated as false even though in the original mask it was true
1000                true, true,
1001            ]);
1002            BooleanArray::new(booleans, Some(nulls))
1003        };
1004        let out = zip(&mask, &truthy, &falsy).unwrap();
1005        let actual = out.as_any().downcast_ref::<Int32Array>().unwrap();
1006        let expected = Int32Array::from(vec![
1007            Some(1),
1008            Some(2),
1009            Some(9),
1010            Some(10), // true in mask but null
1011            Some(11),
1012            Some(12),
1013        ]);
1014        assert_eq!(actual, &expected);
1015    }
1016
1017    #[test]
1018    fn test_zip_kernel_primitive_scalar_with_boolean_array_mask_with_nulls_should_be_treated_as_false()
1019     {
1020        let scalar_truthy = Scalar::new(Int32Array::from_value(42, 1));
1021        let scalar_falsy = Scalar::new(Int32Array::from_value(123, 1));
1022
1023        let mask = {
1024            let booleans = BooleanBuffer::from(vec![true, true, false, true, false, false]);
1025            let nulls = NullBuffer::from(vec![
1026                true, true, true,
1027                false, // null treated as false even though in the original mask it was true
1028                true, true,
1029            ]);
1030            BooleanArray::new(booleans, Some(nulls))
1031        };
1032        let out = zip(&mask, &scalar_truthy, &scalar_falsy).unwrap();
1033        let actual = out.as_any().downcast_ref::<Int32Array>().unwrap();
1034        let expected = Int32Array::from(vec![
1035            Some(42),
1036            Some(42),
1037            Some(123),
1038            Some(123), // true in mask but null
1039            Some(123),
1040            Some(123),
1041        ]);
1042        assert_eq!(actual, &expected);
1043    }
1044
1045    #[test]
1046    fn test_zip_string_array_with_nulls_is_mask_should_be_treated_as_false() {
1047        let truthy = StringArray::from_iter_values(vec!["1", "2", "3", "4", "5", "6"]);
1048        let falsy = StringArray::from_iter_values(vec!["7", "8", "9", "10", "11", "12"]);
1049
1050        let mask = {
1051            let booleans = BooleanBuffer::from(vec![true, true, false, true, false, false]);
1052            let nulls = NullBuffer::from(vec![
1053                true, true, true,
1054                false, // null treated as false even though in the original mask it was true
1055                true, true,
1056            ]);
1057            BooleanArray::new(booleans, Some(nulls))
1058        };
1059        let out = zip(&mask, &truthy, &falsy).unwrap();
1060        let actual = out.as_string::<i32>();
1061        let expected = StringArray::from_iter_values(vec![
1062            "1", "2", "9", "10", // true in mask but null
1063            "11", "12",
1064        ]);
1065        assert_eq!(actual, &expected);
1066    }
1067
1068    #[test]
1069    fn test_zip_kernel_large_string_scalar_with_boolean_array_mask_with_nulls_should_be_treated_as_false()
1070     {
1071        let scalar_truthy = Scalar::new(LargeStringArray::from_iter_values(["test"]));
1072        let scalar_falsy = Scalar::new(LargeStringArray::from_iter_values(["something else"]));
1073
1074        let mask = {
1075            let booleans = BooleanBuffer::from(vec![true, true, false, true, false, false]);
1076            let nulls = NullBuffer::from(vec![
1077                true, true, true,
1078                false, // null treated as false even though in the original mask it was true
1079                true, true,
1080            ]);
1081            BooleanArray::new(booleans, Some(nulls))
1082        };
1083        let out = zip(&mask, &scalar_truthy, &scalar_falsy).unwrap();
1084        let actual = out.as_any().downcast_ref::<LargeStringArray>().unwrap();
1085        let expected = LargeStringArray::from_iter(vec![
1086            Some("test"),
1087            Some("test"),
1088            Some("something else"),
1089            Some("something else"), // true in mask but null
1090            Some("something else"),
1091            Some("something else"),
1092        ]);
1093        assert_eq!(actual, &expected);
1094    }
1095
1096    #[test]
1097    fn test_zip_kernel_bytes_scalar_none_1() {
1098        let scalar_truthy = Scalar::new(StringArray::from_iter_values(["hello"]));
1099        let scalar_falsy = Scalar::new(StringArray::new_null(1));
1100
1101        let mask = BooleanArray::from(vec![true, true, false, false, true]);
1102        let out = zip(&mask, &scalar_truthy, &scalar_falsy).unwrap();
1103        let actual = out.as_any().downcast_ref::<StringArray>().unwrap();
1104        let expected = StringArray::from_iter(vec![
1105            Some("hello"),
1106            Some("hello"),
1107            None,
1108            None,
1109            Some("hello"),
1110        ]);
1111        assert_eq!(actual, &expected);
1112    }
1113
1114    #[test]
1115    fn test_zip_kernel_bytes_scalar_none_2() {
1116        let scalar_truthy = Scalar::new(StringArray::new_null(1));
1117        let scalar_falsy = Scalar::new(StringArray::from_iter_values(["hello"]));
1118
1119        let mask = BooleanArray::from(vec![true, true, false, false, true]);
1120        let out = zip(&mask, &scalar_truthy, &scalar_falsy).unwrap();
1121        let actual = out.as_any().downcast_ref::<StringArray>().unwrap();
1122        let expected = StringArray::from_iter(vec![None, None, Some("hello"), Some("hello"), None]);
1123        assert_eq!(actual, &expected);
1124    }
1125
1126    #[test]
1127    fn test_zip_kernel_bytes_scalar_both() {
1128        let scalar_truthy = Scalar::new(StringArray::from_iter_values(["test"]));
1129        let scalar_falsy = Scalar::new(StringArray::from_iter_values(["something else"]));
1130
1131        // mask ends with false
1132        let mask = BooleanArray::from(vec![true, true, false, true, false, false]);
1133        let out = zip(&mask, &scalar_truthy, &scalar_falsy).unwrap();
1134        let actual = out.as_any().downcast_ref::<StringArray>().unwrap();
1135        let expected = StringArray::from_iter(vec![
1136            Some("test"),
1137            Some("test"),
1138            Some("something else"),
1139            Some("test"),
1140            Some("something else"),
1141            Some("something else"),
1142        ]);
1143        assert_eq!(actual, &expected);
1144    }
1145
1146    #[test]
1147    fn test_zip_scalar_bytes_only_taking_one_side() {
1148        let mask_len = 5;
1149        let all_true_mask = BooleanArray::from(vec![true; mask_len]);
1150        let all_false_mask = BooleanArray::from(vec![false; mask_len]);
1151
1152        let null_scalar = Scalar::new(StringArray::new_null(1));
1153        let non_null_scalar_1 = Scalar::new(StringArray::from_iter_values(["test"]));
1154        let non_null_scalar_2 = Scalar::new(StringArray::from_iter_values(["something else"]));
1155
1156        {
1157            // 1. Test where left is null and right is non-null
1158            //    and mask is all true
1159            let out = zip(&all_true_mask, &null_scalar, &non_null_scalar_1).unwrap();
1160            let actual = out.as_string::<i32>();
1161            let expected = StringArray::from_iter(std::iter::repeat_n(None::<&str>, mask_len));
1162            assert_eq!(actual, &expected);
1163        }
1164
1165        {
1166            // 2. Test where left is null and right is non-null
1167            //    and mask is all false
1168            let out = zip(&all_false_mask, &null_scalar, &non_null_scalar_1).unwrap();
1169            let actual = out.as_string::<i32>();
1170            let expected = StringArray::from_iter(std::iter::repeat_n(Some("test"), mask_len));
1171            assert_eq!(actual, &expected);
1172        }
1173
1174        {
1175            // 3. Test where left is non-null and right is null
1176            //    and mask is all true
1177            let out = zip(&all_true_mask, &non_null_scalar_1, &null_scalar).unwrap();
1178            let actual = out.as_string::<i32>();
1179            let expected = StringArray::from_iter(std::iter::repeat_n(Some("test"), mask_len));
1180            assert_eq!(actual, &expected);
1181        }
1182
1183        {
1184            // 4. Test where left is non-null and right is null
1185            //    and mask is all false
1186            let out = zip(&all_false_mask, &non_null_scalar_1, &null_scalar).unwrap();
1187            let actual = out.as_string::<i32>();
1188            let expected = StringArray::from_iter(std::iter::repeat_n(None::<&str>, mask_len));
1189            assert_eq!(actual, &expected);
1190        }
1191
1192        {
1193            // 5. Test where both left and right are not null
1194            //    and mask is all true
1195            let out = zip(&all_true_mask, &non_null_scalar_1, &non_null_scalar_2).unwrap();
1196            let actual = out.as_string::<i32>();
1197            let expected = StringArray::from_iter(std::iter::repeat_n(Some("test"), mask_len));
1198            assert_eq!(actual, &expected);
1199        }
1200
1201        {
1202            // 6. Test where both left and right are not null
1203            //    and mask is all false
1204            let out = zip(&all_false_mask, &non_null_scalar_1, &non_null_scalar_2).unwrap();
1205            let actual = out.as_string::<i32>();
1206            let expected =
1207                StringArray::from_iter(std::iter::repeat_n(Some("something else"), mask_len));
1208            assert_eq!(actual, &expected);
1209        }
1210
1211        {
1212            // 7. Test where both left and right are null
1213            //    and mask is random
1214            let mask = BooleanArray::from(vec![true, false, true, false, true]);
1215            let out = zip(&mask, &null_scalar, &null_scalar).unwrap();
1216            let actual = out.as_string::<i32>();
1217            let expected = StringArray::from_iter(std::iter::repeat_n(None::<&str>, mask_len));
1218            assert_eq!(actual, &expected);
1219        }
1220    }
1221
1222    #[test]
1223    fn test_scalar_zipper() {
1224        let scalar_truthy = Scalar::new(Int32Array::from_value(42, 1));
1225        let scalar_falsy = Scalar::new(Int32Array::from_value(123, 1));
1226
1227        let mask = BooleanArray::from(vec![false, false, true, true, false]);
1228
1229        let scalar_zipper = ScalarZipper::try_new(&scalar_truthy, &scalar_falsy).unwrap();
1230        let out = scalar_zipper.zip(&mask).unwrap();
1231        let actual = out.as_primitive::<Int32Type>();
1232        let expected = Int32Array::from(vec![Some(123), Some(123), Some(42), Some(42), Some(123)]);
1233        assert_eq!(actual, &expected);
1234
1235        // test with different mask length as well
1236        let mask = BooleanArray::from(vec![true, false, true]);
1237        let out = scalar_zipper.zip(&mask).unwrap();
1238        let actual = out.as_primitive::<Int32Type>();
1239        let expected = Int32Array::from(vec![Some(42), Some(123), Some(42)]);
1240        assert_eq!(actual, &expected);
1241    }
1242
1243    #[test]
1244    fn test_zip_kernel_scalar_strings() {
1245        let scalar_truthy = Scalar::new(StringArray::from(vec!["hello"]));
1246        let scalar_falsy = Scalar::new(StringArray::from(vec!["world"]));
1247
1248        let mask = BooleanArray::from(vec![true, false, true, false, true]);
1249        let out = zip(&mask, &scalar_truthy, &scalar_falsy).unwrap();
1250        let actual = out.as_string::<i32>();
1251        let expected = StringArray::from(vec![
1252            Some("hello"),
1253            Some("world"),
1254            Some("hello"),
1255            Some("world"),
1256            Some("hello"),
1257        ]);
1258        assert_eq!(actual, &expected);
1259    }
1260
1261    #[test]
1262    fn test_zip_kernel_scalar_binary() {
1263        let truthy_bytes: &[u8] = b"\xFF\xFE\xFD";
1264        let falsy_bytes: &[u8] = b"world";
1265        let scalar_truthy = Scalar::new(BinaryArray::from_iter_values(
1266            // Non valid UTF8 bytes
1267            vec![truthy_bytes],
1268        ));
1269        let scalar_falsy = Scalar::new(BinaryArray::from_iter_values(vec![falsy_bytes]));
1270
1271        let mask = BooleanArray::from(vec![true, false, true, false, true]);
1272        let out = zip(&mask, &scalar_truthy, &scalar_falsy).unwrap();
1273        let actual = out.as_binary::<i32>();
1274        let expected = BinaryArray::from(vec![
1275            Some(truthy_bytes),
1276            Some(falsy_bytes),
1277            Some(truthy_bytes),
1278            Some(falsy_bytes),
1279            Some(truthy_bytes),
1280        ]);
1281        assert_eq!(actual, &expected);
1282    }
1283
1284    #[test]
1285    fn test_zip_kernel_scalar_large_binary() {
1286        let truthy_bytes: &[u8] = b"hey";
1287        let falsy_bytes: &[u8] = b"world";
1288        let scalar_truthy = Scalar::new(LargeBinaryArray::from_iter_values(vec![truthy_bytes]));
1289        let scalar_falsy = Scalar::new(LargeBinaryArray::from_iter_values(vec![falsy_bytes]));
1290
1291        let mask = BooleanArray::from(vec![true, false, true, false, true]);
1292        let out = zip(&mask, &scalar_truthy, &scalar_falsy).unwrap();
1293        let actual = out.as_binary::<i64>();
1294        let expected = LargeBinaryArray::from(vec![
1295            Some(truthy_bytes),
1296            Some(falsy_bytes),
1297            Some(truthy_bytes),
1298            Some(falsy_bytes),
1299            Some(truthy_bytes),
1300        ]);
1301        assert_eq!(actual, &expected);
1302    }
1303
1304    // Test to ensure that the precision and scale are kept when zipping Decimal128 data
1305    #[test]
1306    fn test_zip_decimal_with_custom_precision_and_scale() {
1307        let arr = Decimal128Array::from_iter_values([12345, 456, 7890, -123223423432432])
1308            .with_precision_and_scale(20, 2)
1309            .unwrap();
1310
1311        let arr: ArrayRef = Arc::new(arr);
1312
1313        let scalar_1 = Scalar::new(arr.slice(0, 1));
1314        let scalar_2 = Scalar::new(arr.slice(1, 1));
1315        let null_scalar = Scalar::new(new_null_array(arr.data_type(), 1));
1316        let array_1: ArrayRef = arr.slice(0, 2);
1317        let array_2: ArrayRef = arr.slice(2, 2);
1318
1319        test_zip_output_data_types_for_input(scalar_1, scalar_2, null_scalar, array_1, array_2);
1320    }
1321
1322    // Test to ensure that the timezone is kept when zipping TimestampArray data
1323    #[test]
1324    fn test_zip_timestamp_with_timezone() {
1325        let arr = TimestampSecondArray::from(vec![0, 1000, 2000, 4000])
1326            .with_timezone("+01:00".to_string());
1327
1328        let arr: ArrayRef = Arc::new(arr);
1329
1330        let scalar_1 = Scalar::new(arr.slice(0, 1));
1331        let scalar_2 = Scalar::new(arr.slice(1, 1));
1332        let null_scalar = Scalar::new(new_null_array(arr.data_type(), 1));
1333        let array_1: ArrayRef = arr.slice(0, 2);
1334        let array_2: ArrayRef = arr.slice(2, 2);
1335
1336        test_zip_output_data_types_for_input(scalar_1, scalar_2, null_scalar, array_1, array_2);
1337    }
1338
1339    fn test_zip_output_data_types_for_input(
1340        scalar_1: Scalar<ArrayRef>,
1341        scalar_2: Scalar<ArrayRef>,
1342        null_scalar: Scalar<ArrayRef>,
1343        array_1: ArrayRef,
1344        array_2: ArrayRef,
1345    ) {
1346        // non null Scalar vs non null Scalar
1347        test_zip_output_data_type(&scalar_1, &scalar_2, 10);
1348
1349        // null Scalar vs non-null Scalar (and vice versa)
1350        test_zip_output_data_type(&null_scalar, &scalar_1, 10);
1351        test_zip_output_data_type(&scalar_1, &null_scalar, 10);
1352
1353        // non-null Scalar and array (and vice versa)
1354        test_zip_output_data_type(&array_1.as_ref(), &scalar_1, array_1.len());
1355        test_zip_output_data_type(&scalar_1, &array_1.as_ref(), array_1.len());
1356
1357        // Array and null scalar (and vice versa)
1358        test_zip_output_data_type(&array_1.as_ref(), &null_scalar, array_1.len());
1359
1360        test_zip_output_data_type(&null_scalar, &array_1.as_ref(), array_1.len());
1361
1362        // Both arrays
1363        test_zip_output_data_type(&array_1.as_ref(), &array_2.as_ref(), array_1.len());
1364    }
1365
1366    fn test_zip_output_data_type(truthy: &dyn Datum, falsy: &dyn Datum, mask_length: usize) {
1367        let expected_data_type = truthy.get().0.data_type().clone();
1368        assert_eq!(&expected_data_type, falsy.get().0.data_type());
1369
1370        // Try different masks to test different paths
1371        let mask_all_true = BooleanArray::from(vec![true; mask_length]);
1372        let mask_all_false = BooleanArray::from(vec![false; mask_length]);
1373        let mask_some_true_and_false =
1374            BooleanArray::from((0..mask_length).map(|i| i % 2 == 0).collect::<Vec<bool>>());
1375
1376        for mask in [&mask_all_true, &mask_all_false, &mask_some_true_and_false] {
1377            let out = zip(mask, truthy, falsy).unwrap();
1378            assert_eq!(out.data_type(), &expected_data_type);
1379        }
1380    }
1381
1382    #[test]
1383    fn zip_scalar_fallback_impl() {
1384        let truthy_list_item_scalar = Some(vec![Some(1), None, Some(3)]);
1385        let truthy_list_array_scalar =
1386            Scalar::new(ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
1387                truthy_list_item_scalar.clone(),
1388            ]));
1389        let falsy_list_item_scalar = Some(vec![None, Some(2), Some(4)]);
1390        let falsy_list_array_scalar =
1391            Scalar::new(ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
1392                falsy_list_item_scalar.clone(),
1393            ]));
1394        let mask = BooleanArray::from(vec![true, false, true, false, false, true, false]);
1395        let out = zip(&mask, &truthy_list_array_scalar, &falsy_list_array_scalar).unwrap();
1396        let actual = out.as_list::<i32>();
1397
1398        let expected = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
1399            truthy_list_item_scalar.clone(),
1400            falsy_list_item_scalar.clone(),
1401            truthy_list_item_scalar.clone(),
1402            falsy_list_item_scalar.clone(),
1403            falsy_list_item_scalar.clone(),
1404            truthy_list_item_scalar.clone(),
1405            falsy_list_item_scalar.clone(),
1406        ]);
1407        assert_eq!(actual, &expected);
1408    }
1409
1410    #[test]
1411    fn test_zip_kernel_scalar_strings_array_view() {
1412        let scalar_truthy = Scalar::new(StringViewArray::from(vec!["hello"]));
1413        let scalar_falsy = Scalar::new(StringViewArray::from(vec!["world"]));
1414
1415        let mask = BooleanArray::from(vec![true, false, true, false]);
1416        let out = zip(&mask, &scalar_truthy, &scalar_falsy).unwrap();
1417        let actual = out.as_string_view();
1418        let expected = StringViewArray::from(vec![
1419            Some("hello"),
1420            Some("world"),
1421            Some("hello"),
1422            Some("world"),
1423        ]);
1424        assert_eq!(actual, &expected);
1425    }
1426
1427    #[test]
1428    fn test_zip_kernel_scalar_binary_array_view() {
1429        let scalar_truthy = Scalar::new(BinaryViewArray::from_iter_values(vec![b"hello"]));
1430        let scalar_falsy = Scalar::new(BinaryViewArray::from_iter_values(vec![b"world"]));
1431
1432        let mask = BooleanArray::from(vec![true, false]);
1433        let out = zip(&mask, &scalar_truthy, &scalar_falsy).unwrap();
1434        let actual = out.as_byte_view();
1435        let expected = BinaryViewArray::from_iter_values(vec![b"hello", b"world"]);
1436        assert_eq!(actual, &expected);
1437    }
1438
1439    #[test]
1440    fn test_zip_kernel_scalar_strings_array_view_with_nulls() {
1441        let scalar_truthy = Scalar::new(StringViewArray::from_iter_values(["hello"]));
1442        let scalar_falsy = Scalar::new(StringViewArray::new_null(1));
1443
1444        let mask = BooleanArray::from(vec![true, true, false, false, true]);
1445        let out = zip(&mask, &scalar_truthy, &scalar_falsy).unwrap();
1446        let actual = out.as_any().downcast_ref::<StringViewArray>().unwrap();
1447        let expected = StringViewArray::from_iter(vec![
1448            Some("hello"),
1449            Some("hello"),
1450            None,
1451            None,
1452            Some("hello"),
1453        ]);
1454        assert_eq!(actual, &expected);
1455    }
1456
1457    #[test]
1458    fn test_zip_kernel_scalar_strings_array_view_all_true_null() {
1459        let scalar_truthy = Scalar::new(StringViewArray::new_null(1));
1460        let scalar_falsy = Scalar::new(StringViewArray::new_null(1));
1461        let mask = BooleanArray::from(vec![true, true]);
1462        let out = zip(&mask, &scalar_truthy, &scalar_falsy).unwrap();
1463        let actual = out.as_any().downcast_ref::<StringViewArray>().unwrap();
1464        let expected = StringViewArray::from_iter(vec![None::<String>, None]);
1465        assert_eq!(actual, &expected);
1466    }
1467
1468    #[test]
1469    fn test_zip_kernel_scalar_strings_array_view_all_false_null() {
1470        let scalar_truthy = Scalar::new(StringViewArray::new_null(1));
1471        let scalar_falsy = Scalar::new(StringViewArray::new_null(1));
1472        let mask = BooleanArray::from(vec![false, false]);
1473        let out = zip(&mask, &scalar_truthy, &scalar_falsy).unwrap();
1474        let actual = out.as_any().downcast_ref::<StringViewArray>().unwrap();
1475        let expected = StringViewArray::from_iter(vec![None::<String>, None]);
1476        assert_eq!(actual, &expected);
1477    }
1478
1479    #[test]
1480    fn test_zip_kernel_scalar_string_array_view_all_true() {
1481        let scalar_truthy = Scalar::new(StringViewArray::from(vec!["hello"]));
1482        let scalar_falsy = Scalar::new(StringViewArray::from(vec!["world"]));
1483
1484        let mask = BooleanArray::from(vec![true, true]);
1485        let out = zip(&mask, &scalar_truthy, &scalar_falsy).unwrap();
1486        let actual = out.as_string_view();
1487        let expected = StringViewArray::from(vec![Some("hello"), Some("hello")]);
1488        assert_eq!(actual, &expected);
1489    }
1490
1491    #[test]
1492    fn test_zip_kernel_scalar_string_array_view_all_false() {
1493        let scalar_truthy = Scalar::new(StringViewArray::from(vec!["hello"]));
1494        let scalar_falsy = Scalar::new(StringViewArray::from(vec!["world"]));
1495
1496        let mask = BooleanArray::from(vec![false, false]);
1497        let out = zip(&mask, &scalar_truthy, &scalar_falsy).unwrap();
1498        let actual = out.as_string_view();
1499        let expected = StringViewArray::from(vec![Some("world"), Some("world")]);
1500        assert_eq!(actual, &expected);
1501    }
1502
1503    #[test]
1504    fn test_zip_kernel_scalar_strings_large_strings() {
1505        let scalar_truthy = Scalar::new(StringViewArray::from(vec!["longer than 12 bytes"]));
1506        let scalar_falsy = Scalar::new(StringViewArray::from(vec!["another longer than 12 bytes"]));
1507
1508        let mask = BooleanArray::from(vec![true, false]);
1509        let out = zip(&mask, &scalar_truthy, &scalar_falsy).unwrap();
1510        let actual = out.as_string_view();
1511        let expected = StringViewArray::from(vec![
1512            Some("longer than 12 bytes"),
1513            Some("another longer than 12 bytes"),
1514        ]);
1515        assert_eq!(actual, &expected);
1516    }
1517
1518    #[test]
1519    fn test_zip_kernel_scalar_strings_array_view_large_short_strings() {
1520        let scalar_truthy = Scalar::new(StringViewArray::from(vec!["hello"]));
1521        let scalar_falsy = Scalar::new(StringViewArray::from(vec!["longer than 12 bytes"]));
1522
1523        let mask = BooleanArray::from(vec![true, false, true, false]);
1524        let out = zip(&mask, &scalar_truthy, &scalar_falsy).unwrap();
1525        let actual = out.as_string_view();
1526        let expected = StringViewArray::from(vec![
1527            Some("hello"),
1528            Some("longer than 12 bytes"),
1529            Some("hello"),
1530            Some("longer than 12 bytes"),
1531        ]);
1532        assert_eq!(actual, &expected);
1533    }
1534    #[test]
1535    fn test_zip_kernel_scalar_strings_array_view_large_all_true() {
1536        let scalar_truthy = Scalar::new(StringViewArray::from(vec!["longer than 12 bytes"]));
1537        let scalar_falsy = Scalar::new(StringViewArray::from(vec!["another longer than 12 bytes"]));
1538
1539        let mask = BooleanArray::from(vec![true, true]);
1540        let out = zip(&mask, &scalar_truthy, &scalar_falsy).unwrap();
1541        let actual = out.as_string_view();
1542        let expected = StringViewArray::from(vec![
1543            Some("longer than 12 bytes"),
1544            Some("longer than 12 bytes"),
1545        ]);
1546        assert_eq!(actual, &expected);
1547    }
1548
1549    #[test]
1550    fn test_zip_kernel_scalar_strings_array_view_large_all_false() {
1551        let scalar_truthy = Scalar::new(StringViewArray::from(vec!["longer than 12 bytes"]));
1552        let scalar_falsy = Scalar::new(StringViewArray::from(vec!["another longer than 12 bytes"]));
1553
1554        let mask = BooleanArray::from(vec![false, false]);
1555        let out = zip(&mask, &scalar_truthy, &scalar_falsy).unwrap();
1556        let actual = out.as_string_view();
1557        let expected = StringViewArray::from(vec![
1558            Some("another longer than 12 bytes"),
1559            Some("another longer than 12 bytes"),
1560        ]);
1561        assert_eq!(actual, &expected);
1562    }
1563}