1use arrow_array::{Array, ArrayRef, BooleanArray, make_array};
21use arrow_buffer::buffer::{bitwise_bin_op_helper, bitwise_unary_op_helper};
22use arrow_buffer::{BooleanBuffer, NullBuffer};
23use arrow_schema::{ArrowError, DataType};
24
25pub fn nullif(left: &dyn Array, right: &BooleanArray) -> Result<ArrayRef, ArrowError> {
45    let left_data = left.to_data();
46
47    if left_data.len() != right.len() {
48        return Err(ArrowError::ComputeError(
49            "Cannot perform comparison operation on arrays of different length".to_string(),
50        ));
51    }
52    let len = left_data.len();
53
54    if len == 0 || left_data.data_type() == &DataType::Null {
55        return Ok(make_array(left_data));
56    }
57
58    let right = match right.nulls() {
69        Some(nulls) => right.values() & nulls.inner(),
70        None => right.values().clone(),
71    };
72
73    let (combined, null_count) = match left_data.nulls() {
76        Some(left) => {
77            let mut valid_count = 0;
78            let b = bitwise_bin_op_helper(
79                left.buffer(),
80                left.offset(),
81                right.inner(),
82                right.offset(),
83                len,
84                |l, r| {
85                    let t = l & !r;
86                    valid_count += t.count_ones() as usize;
87                    t
88                },
89            );
90            (b, len - valid_count)
91        }
92        None => {
93            let mut null_count = 0;
94            let buffer = bitwise_unary_op_helper(right.inner(), right.offset(), len, |b| {
95                let t = !b;
96                null_count += t.count_zeros() as usize;
97                t
98            });
99            (buffer, null_count)
100        }
101    };
102
103    let combined = BooleanBuffer::new(combined, 0, len);
104    let nulls = unsafe { NullBuffer::new_unchecked(combined, null_count) };
107    let data = left_data.into_builder().nulls(Some(nulls));
108
109    Ok(make_array(unsafe { data.build_unchecked() }))
112}
113
114#[cfg(test)]
115mod tests {
116    use super::*;
117    use arrow_array::builder::{BooleanBuilder, Int32Builder, StructBuilder};
118    use arrow_array::cast::AsArray;
119    use arrow_array::types::Int32Type;
120    use arrow_array::{Int32Array, NullArray, StringArray, StructArray};
121    use arrow_data::ArrayData;
122    use arrow_schema::{Field, Fields};
123    use rand::{Rng, rng};
124
125    #[test]
126    fn test_nullif_int_array() {
127        let a = Int32Array::from(vec![Some(15), None, Some(8), Some(1), Some(9)]);
128        let comp = BooleanArray::from(vec![Some(false), None, Some(true), Some(false), None]);
129        let res = nullif(&a, &comp).unwrap();
130
131        let expected = Int32Array::from(vec![
132            Some(15),
133            None,
134            None, Some(1),
136            Some(9),
139        ]);
140
141        let res = res.as_primitive::<Int32Type>();
142        assert_eq!(&expected, res);
143    }
144
145    #[test]
146    fn test_nullif_null_array() {
147        assert_eq!(
148            nullif(&NullArray::new(0), &BooleanArray::new_null(0))
149                .unwrap()
150                .as_ref(),
151            &NullArray::new(0)
152        );
153
154        assert_eq!(
155            nullif(
156                &NullArray::new(3),
157                &BooleanArray::from(vec![Some(false), Some(true), None]),
158            )
159            .unwrap()
160            .as_ref(),
161            &NullArray::new(3)
162        );
163    }
164
165    #[test]
166    fn test_nullif_int_array_offset() {
167        let a = Int32Array::from(vec![None, Some(15), Some(8), Some(1), Some(9)]);
168        let a = a.slice(1, 3); let a = a.as_any().downcast_ref::<Int32Array>().unwrap();
170        let comp = BooleanArray::from(vec![
171            Some(false),
172            Some(false),
173            Some(false),
174            None,
175            Some(true),
176            Some(false),
177            None,
178        ]);
179        let comp = comp.slice(2, 3); let comp = comp.as_any().downcast_ref::<BooleanArray>().unwrap();
181        let res = nullif(a, comp).unwrap();
182
183        let expected = Int32Array::from(vec![
184            Some(15), Some(8),  None,     ]);
188        let res = res.as_primitive::<Int32Type>();
189        assert_eq!(&expected, res)
190    }
191
192    #[test]
193    fn test_nullif_string() {
194        let s = StringArray::from_iter([
195            Some("hello"),
196            None,
197            Some("world"),
198            Some("a"),
199            Some("b"),
200            None,
201            None,
202        ]);
203        let select = BooleanArray::from_iter([
204            Some(true),
205            Some(true),
206            Some(false),
207            Some(true),
208            Some(false),
209            Some(false),
210            None,
211        ]);
212
213        let a = nullif(&s, &select).unwrap();
214        let r: Vec<_> = a.as_string::<i32>().iter().collect();
215        assert_eq!(
216            r,
217            vec![None, None, Some("world"), None, Some("b"), None, None]
218        );
219
220        let s = s.slice(2, 3);
221        let select = select.slice(1, 3);
222        let a = nullif(&s, &select).unwrap();
223        let r: Vec<_> = a.as_string::<i32>().iter().collect();
224        assert_eq!(r, vec![None, Some("a"), None]);
225    }
226
227    #[test]
228    fn test_nullif_int_large_left_offset() {
229        let a = Int32Array::from(vec![
230            Some(-1), Some(-1),
232            Some(-1),
233            Some(-1),
234            Some(-1),
235            Some(-1),
236            Some(-1),
237            Some(-1),
238            Some(-1), Some(-1),
240            Some(-1),
241            Some(-1),
242            Some(-1),
243            Some(-1),
244            Some(-1),
245            Some(-1),
246            None,     Some(15), Some(8),
249            Some(1),
250            Some(9),
251        ]);
252        let a = a.slice(17, 3); let comp = BooleanArray::from(vec![
255            Some(false),
256            Some(false),
257            Some(false),
258            None,
259            Some(true),
260            Some(false),
261            None,
262        ]);
263        let comp = comp.slice(2, 3); let comp = comp.as_any().downcast_ref::<BooleanArray>().unwrap();
265        let res = nullif(&a, comp).unwrap();
266        let res = res.as_any().downcast_ref::<Int32Array>().unwrap();
267
268        let expected = Int32Array::from(vec![
269            Some(15), Some(8),  None,     ]);
273        assert_eq!(&expected, res)
274    }
275
276    #[test]
277    fn test_nullif_int_large_right_offset() {
278        let a = Int32Array::from(vec![
279            None,     Some(15), Some(8),
282            Some(1),
283            Some(9),
284        ]);
285        let a = a.slice(1, 3); let comp = BooleanArray::from(vec![
288            Some(false), Some(false),
290            Some(false),
291            Some(false),
292            Some(false),
293            Some(false),
294            Some(false),
295            Some(false),
296            Some(false), Some(false),
298            Some(false),
299            Some(false),
300            Some(false),
301            Some(false),
302            Some(false),
303            Some(false),
304            Some(false), Some(false), Some(false), None,
308            Some(true),
309            Some(false),
310            None,
311        ]);
312        let comp = comp.slice(18, 3); let comp = comp.as_any().downcast_ref::<BooleanArray>().unwrap();
314        let res = nullif(&a, comp).unwrap();
315        let res = res.as_any().downcast_ref::<Int32Array>().unwrap();
316
317        let expected = Int32Array::from(vec![
318            Some(15), Some(8),  None,     ]);
322        assert_eq!(&expected, res)
323    }
324
325    #[test]
326    fn test_nullif_boolean_offset() {
327        let a = BooleanArray::from(vec![
328            None,       Some(true), Some(false),
331            Some(true),
332            Some(true),
333        ]);
334        let a = a.slice(1, 3); let comp = BooleanArray::from(vec![
337            Some(false), Some(false), Some(false), None,
341            Some(true),
342            Some(false),
343            None,
344        ]);
345        let comp = comp.slice(2, 3); let comp = comp.as_any().downcast_ref::<BooleanArray>().unwrap();
347        let res = nullif(&a, comp).unwrap();
348        let res = res.as_any().downcast_ref::<BooleanArray>().unwrap();
349
350        let expected = BooleanArray::from(vec![
351            Some(true),  Some(false), None,        ]);
355        assert_eq!(&expected, res)
356    }
357
358    struct Foo {
359        a: Option<i32>,
360        b: Option<bool>,
361        is_valid: bool,
363    }
364
365    impl Foo {
366        fn new_valid(a: i32, b: bool) -> Foo {
367            Self {
368                a: Some(a),
369                b: Some(b),
370                is_valid: true,
371            }
372        }
373
374        fn new_null() -> Foo {
375            Self {
376                a: None,
377                b: None,
378                is_valid: false,
379            }
380        }
381    }
382
383    fn create_foo_struct(values: Vec<Foo>) -> StructArray {
387        let mut struct_array = StructBuilder::new(
388            Fields::from(vec![
389                Field::new("a", DataType::Int32, true),
390                Field::new("b", DataType::Boolean, true),
391            ]),
392            vec![
393                Box::new(Int32Builder::with_capacity(values.len())),
394                Box::new(BooleanBuilder::with_capacity(values.len())),
395            ],
396        );
397
398        for value in values {
399            struct_array
400                .field_builder::<Int32Builder>(0)
401                .unwrap()
402                .append_option(value.a);
403            struct_array
404                .field_builder::<BooleanBuilder>(1)
405                .unwrap()
406                .append_option(value.b);
407            struct_array.append(value.is_valid);
408        }
409
410        struct_array.finish()
411    }
412
413    #[test]
414    fn test_nullif_struct_slices() {
415        let struct_array = create_foo_struct(vec![
416            Foo::new_valid(7, true),
417            Foo::new_valid(15, false),
418            Foo::new_valid(8, true),
419            Foo::new_valid(12, false),
420            Foo::new_null(),
421            Foo::new_null(),
422            Foo::new_valid(42, true),
423        ]);
424
425        let struct_array = struct_array.slice(1, 5);
428        let comp = BooleanArray::from(vec![
429            Some(false), Some(false), Some(false), None,
433            Some(true),
434            Some(false),
435            None,
436        ]);
437        let comp = comp.slice(2, 5); let comp = comp.as_any().downcast_ref::<BooleanArray>().unwrap();
439        let res = nullif(&struct_array, comp).unwrap();
440        let res = res.as_any().downcast_ref::<StructArray>().unwrap();
441
442        let expected = create_foo_struct(vec![
443            Foo::new_valid(15, false),
445            Foo::new_valid(8, true),
447            Foo {
449                a: Some(12),
450                b: Some(false),
451                is_valid: false,
452            },
453            Foo::new_null(),
455            Foo::new_null(),
457        ]);
458
459        assert_eq!(&expected, res);
460    }
461
462    #[test]
463    fn test_nullif_no_nulls() {
464        let a = Int32Array::from(vec![Some(15), Some(7), Some(8), Some(1), Some(9)]);
465        let comp = BooleanArray::from(vec![Some(false), None, Some(true), Some(false), None]);
466        let res = nullif(&a, &comp).unwrap();
467        let res = res.as_primitive::<Int32Type>();
468
469        let expected = Int32Array::from(vec![Some(15), Some(7), None, Some(1), Some(9)]);
470        assert_eq!(res, &expected);
471    }
472
473    #[test]
474    fn nullif_empty() {
475        let a = Int32Array::from(ArrayData::new_empty(&DataType::Int32));
476        let mask = BooleanArray::from(ArrayData::new_empty(&DataType::Boolean));
477        let res = nullif(&a, &mask).unwrap();
478        assert_eq!(res.as_ref(), &a);
479    }
480
481    fn test_nullif(values: &Int32Array, filter: &BooleanArray) {
482        let expected: Int32Array = values
483            .iter()
484            .zip(filter.iter())
485            .map(|(a, b)| match b {
486                Some(true) => None,
487                Some(false) | None => a,
488            })
489            .collect();
490
491        let r = nullif(values, filter).unwrap();
492        let r_data = r.to_data();
493        r_data.validate().unwrap();
494
495        assert_eq!(r.as_ref(), &expected);
496    }
497
498    #[test]
499    fn nullif_fuzz() {
500        let mut rng = rng();
501
502        let arrays = [
503            Int32Array::from(vec![0; 128]),
504            (0..128)
505                .map(|_| rng.random_bool(0.5).then_some(0))
506                .collect(),
507        ];
508
509        for a in arrays {
510            let a_slices = [(0, 128), (64, 64), (0, 64), (32, 32), (0, 0), (32, 0)];
511
512            for (a_offset, a_length) in a_slices {
513                let a = a.slice(a_offset, a_length);
514
515                for i in 1..65 {
516                    let b_start_offset = rng.random_range(0..i);
517                    let b_end_offset = rng.random_range(0..i);
518
519                    let b: BooleanArray = (0..a_length + b_start_offset + b_end_offset)
520                        .map(|_| rng.random_bool(0.5).then(|| rng.random_bool(0.5)))
521                        .collect();
522                    let b = b.slice(b_start_offset, a_length);
523
524                    test_nullif(&a, &b);
525                }
526            }
527        }
528    }
529}