Skip to main content

arrow_ord/
cmp.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Comparison kernels for `Array`s.
19//!
20//! These kernels can leverage SIMD if available on your system.  Currently no runtime
21//! detection is provided, you should enable the specific SIMD intrinsics using
22//! `RUSTFLAGS="-C target-feature=+avx2"` for example.  See the documentation
23//! [here](https://doc.rust-lang.org/stable/core/arch/) for more information.
24//!
25
26use arrow_array::cast::AsArray;
27use arrow_array::types::{ByteArrayType, ByteViewType};
28use arrow_array::{
29    AnyDictionaryArray, Array, ArrowNativeTypeOp, BooleanArray, Datum, FixedSizeBinaryArray,
30    GenericByteArray, GenericByteViewArray, downcast_primitive_array, downcast_run_array,
31};
32use arrow_buffer::bit_util::ceil;
33use arrow_buffer::{ArrowNativeType, BooleanBuffer, BooleanBufferBuilder, NullBuffer};
34use arrow_schema::{ArrowError, DataType};
35use arrow_select::take::take;
36use std::cmp::Ordering;
37use std::ops::Not;
38
/// The comparison requested from the kernels in this module.
#[derive(Debug, Copy, Clone)]
enum Op {
    /// `==`
    Equal,
    /// `!=`
    NotEqual,
    /// `<`
    Less,
    /// `<=`
    LessEqual,
    /// `>`
    Greater,
    /// `>=`
    GreaterEqual,
    /// `IS DISTINCT FROM`
    Distinct,
    /// `IS NOT DISTINCT FROM`
    NotDistinct,
}

impl std::fmt::Display for Op {
    /// Writes the operator spelling used in this module's error messages.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let symbol = match self {
            Op::Equal => "==",
            Op::NotEqual => "!=",
            Op::Less => "<",
            Op::LessEqual => "<=",
            Op::Greater => ">",
            Op::GreaterEqual => ">=",
            Op::Distinct => "IS DISTINCT FROM",
            Op::NotDistinct => "IS NOT DISTINCT FROM",
        };
        f.write_str(symbol)
    }
}
65
66/// Perform `left == right` operation on two [`Datum`].
67///
68/// Comparing null values on either side will yield a null in the corresponding
69/// slot of the resulting [`BooleanArray`].
70///
71/// For floating values like f32 and f64, this comparison produces an ordering in accordance to
72/// the totalOrder predicate as defined in the IEEE 754 (2008 revision) floating point standard.
73/// Note that totalOrder treats positive and negative zeros as different. If it is necessary
74/// to treat them as equal, please normalize zeros before calling this kernel. See
75/// [`f32::total_cmp`] and [`f64::total_cmp`].
76///
77/// Nested types, such as lists, are not supported as the null semantics are not well-defined.
78/// For comparisons involving nested types see [`crate::ord::make_comparator`]
79pub fn eq(lhs: &dyn Datum, rhs: &dyn Datum) -> Result<BooleanArray, ArrowError> {
80    compare_op(Op::Equal, lhs, rhs)
81}
82
83/// Perform `left != right` operation on two [`Datum`].
84///
85/// Comparing null values on either side will yield a null in the corresponding
86/// slot of the resulting [`BooleanArray`].
87///
88/// For floating values like f32 and f64, this comparison produces an ordering in accordance to
89/// the totalOrder predicate as defined in the IEEE 754 (2008 revision) floating point standard.
90/// Note that totalOrder treats positive and negative zeros as different. If it is necessary
91/// to treat them as equal, please normalize zeros before calling this kernel. See
92/// [`f32::total_cmp`] and [`f64::total_cmp`].
93///
94/// Nested types, such as lists, are not supported as the null semantics are not well-defined.
95/// For comparisons involving nested types see [`crate::ord::make_comparator`]
96pub fn neq(lhs: &dyn Datum, rhs: &dyn Datum) -> Result<BooleanArray, ArrowError> {
97    compare_op(Op::NotEqual, lhs, rhs)
98}
99
100/// Perform `left < right` operation on two [`Datum`].
101///
102/// Comparing null values on either side will yield a null in the corresponding
103/// slot of the resulting [`BooleanArray`].
104///
105/// For floating values like f32 and f64, this comparison produces an ordering in accordance to
106/// the totalOrder predicate as defined in the IEEE 754 (2008 revision) floating point standard.
107/// Note that totalOrder treats positive and negative zeros as different. If it is necessary
108/// to treat them as equal, please normalize zeros before calling this kernel. See
109/// [`f32::total_cmp`] and [`f64::total_cmp`].
110///
111/// Nested types, such as lists, are not supported as the null semantics are not well-defined.
112/// For comparisons involving nested types see [`crate::ord::make_comparator`]
113pub fn lt(lhs: &dyn Datum, rhs: &dyn Datum) -> Result<BooleanArray, ArrowError> {
114    compare_op(Op::Less, lhs, rhs)
115}
116
117/// Perform `left <= right` operation on two [`Datum`].
118///
119/// Comparing null values on either side will yield a null in the corresponding
120/// slot of the resulting [`BooleanArray`].
121///
122/// For floating values like f32 and f64, this comparison produces an ordering in accordance to
123/// the totalOrder predicate as defined in the IEEE 754 (2008 revision) floating point standard.
124/// Note that totalOrder treats positive and negative zeros as different. If it is necessary
125/// to treat them as equal, please normalize zeros before calling this kernel. See
126/// [`f32::total_cmp`] and [`f64::total_cmp`].
127///
128/// Nested types, such as lists, are not supported as the null semantics are not well-defined.
129/// For comparisons involving nested types see [`crate::ord::make_comparator`]
130pub fn lt_eq(lhs: &dyn Datum, rhs: &dyn Datum) -> Result<BooleanArray, ArrowError> {
131    compare_op(Op::LessEqual, lhs, rhs)
132}
133
134/// Perform `left > right` operation on two [`Datum`].
135///
136/// Comparing null values on either side will yield a null in the corresponding
137/// slot of the resulting [`BooleanArray`].
138///
139/// For floating values like f32 and f64, this comparison produces an ordering in accordance to
140/// the totalOrder predicate as defined in the IEEE 754 (2008 revision) floating point standard.
141/// Note that totalOrder treats positive and negative zeros as different. If it is necessary
142/// to treat them as equal, please normalize zeros before calling this kernel. See
143/// [`f32::total_cmp`] and [`f64::total_cmp`].
144///
145/// Nested types, such as lists, are not supported as the null semantics are not well-defined.
146/// For comparisons involving nested types see [`crate::ord::make_comparator`]
147pub fn gt(lhs: &dyn Datum, rhs: &dyn Datum) -> Result<BooleanArray, ArrowError> {
148    compare_op(Op::Greater, lhs, rhs)
149}
150
151/// Perform `left >= right` operation on two [`Datum`].
152///
153/// Comparing null values on either side will yield a null in the corresponding
154/// slot of the resulting [`BooleanArray`].
155///
156/// For floating values like f32 and f64, this comparison produces an ordering in accordance to
157/// the totalOrder predicate as defined in the IEEE 754 (2008 revision) floating point standard.
158/// Note that totalOrder treats positive and negative zeros as different. If it is necessary
159/// to treat them as equal, please normalize zeros before calling this kernel. See
160/// [`f32::total_cmp`] and [`f64::total_cmp`].
161///
162/// Nested types, such as lists, are not supported as the null semantics are not well-defined.
163/// For comparisons involving nested types see [`crate::ord::make_comparator`]
164pub fn gt_eq(lhs: &dyn Datum, rhs: &dyn Datum) -> Result<BooleanArray, ArrowError> {
165    compare_op(Op::GreaterEqual, lhs, rhs)
166}
167
168/// Perform `left IS DISTINCT FROM right` operation on two [`Datum`]
169///
170/// [`distinct`] is similar to [`neq`], only differing in null handling. In particular, two
171/// operands are considered DISTINCT if they have a different value or if one of them is NULL
172/// and the other isn't. The result of [`distinct`] is never NULL.
173///
174/// For floating values like f32 and f64, this comparison produces an ordering in accordance to
175/// the totalOrder predicate as defined in the IEEE 754 (2008 revision) floating point standard.
176/// Note that totalOrder treats positive and negative zeros as different. If it is necessary
177/// to treat them as equal, please normalize zeros before calling this kernel. See
178/// [`f32::total_cmp`] and [`f64::total_cmp`].
179///
180/// Nested types, such as lists, are not supported as the null semantics are not well-defined.
181/// For comparisons involving nested types see [`crate::ord::make_comparator`]
182pub fn distinct(lhs: &dyn Datum, rhs: &dyn Datum) -> Result<BooleanArray, ArrowError> {
183    compare_op(Op::Distinct, lhs, rhs)
184}
185
186/// Perform `left IS NOT DISTINCT FROM right` operation on two [`Datum`]
187///
188/// [`not_distinct`] is similar to [`eq`], only differing in null handling. In particular, two
189/// operands are considered `NOT DISTINCT` if they have the same value or if both of them
190/// is NULL. The result of [`not_distinct`] is never NULL.
191///
192/// For floating values like f32 and f64, this comparison produces an ordering in accordance to
193/// the totalOrder predicate as defined in the IEEE 754 (2008 revision) floating point standard.
194/// Note that totalOrder treats positive and negative zeros as different. If it is necessary
195/// to treat them as equal, please normalize zeros before calling this kernel. See
196/// [`f32::total_cmp`] and [`f64::total_cmp`].
197///
198/// Nested types, such as lists, are not supported as the null semantics are not well-defined.
199/// For comparisons involving nested types see [`crate::ord::make_comparator`]
200pub fn not_distinct(lhs: &dyn Datum, rhs: &dyn Datum) -> Result<BooleanArray, ArrowError> {
201    compare_op(Op::NotDistinct, lhs, rhs)
202}
203
204/// Returns true if `distinct` (via `compare_op`) can handle this data type.
205///
206/// `compare_op` unwraps at most one level of dictionary, then dispatches on
207/// the leaf type. Anything else (REE, nested dictionary, nested/complex types)
208/// must go through `make_comparator` instead.
209pub(crate) fn supports_distinct(dt: &DataType) -> bool {
210    use arrow_schema::DataType::*;
211    let leaf = match dt {
212        Dictionary(_, v) => v.as_ref(),
213        dt => dt,
214    };
215    !leaf.is_nested() && !matches!(leaf, Dictionary(_, _) | RunEndEncoded(_, _))
216}
217
/// Perform `op` on the provided `Datum`
///
/// Shared implementation behind the public comparison kernels in this module.
///
/// The function first validates the operands, then peels run-end and dictionary
/// encoding off each side, and finally combines the per-value comparison result with
/// the logical null masks of both sides according to `op`.
///
/// # Errors
///
/// Returns [`ArrowError::InvalidArgumentError`] when:
/// * neither side is a scalar and the two arrays have different lengths,
/// * either unwrapped value type is nested, or
/// * the unwrapped value types of the two sides differ.
#[inline(never)]
fn compare_op(op: Op, lhs: &dyn Datum, rhs: &dyn Datum) -> Result<BooleanArray, ArrowError> {
    use arrow_schema::DataType::*;
    // `l_s` / `r_s` flag whether the corresponding side is a scalar
    let (l, l_s) = lhs.get();
    let (r, r_s) = rhs.get();

    let l_len = l.len();
    let r_len = r.len();

    if l_len != r_len && !l_s && !r_s {
        return Err(ArrowError::InvalidArgumentError(format!(
            "Cannot compare arrays of different lengths, got {l_len} vs {r_len}"
        )));
    }

    // Output length: taken from the right side when the left side is scalar
    let len = match l_s {
        true => r_len,
        false => l_len,
    };

    // Capture the logical nulls before any encoding is unwrapped below
    let l_nulls = l.logical_nulls();
    let r_nulls = r.logical_nulls();

    // Unwrap run-end encoding (if any), continuing with the run values
    let l_ree_info = ree_info_opt(l);
    let l = l_ree_info.as_ref().map(|(vals, _)| *vals).unwrap_or(l);

    let r_ree_info = ree_info_opt(r);
    let r = r_ree_info.as_ref().map(|(vals, _)| *vals).unwrap_or(r);

    // Unwrap dictionary encoding (if any), continuing with the dictionary values
    let l_v = l.as_any_dictionary_opt();
    let l = l_v.map(|x| x.values().as_ref()).unwrap_or(l);
    let l_t = l.data_type();

    let r_v = r.as_any_dictionary_opt();
    let r = r_v.map(|x| x.values().as_ref()).unwrap_or(r);
    let r_t = r.data_type();

    if r_t.is_nested() || l_t.is_nested() {
        return Err(ArrowError::InvalidArgumentError(format!(
            "Nested comparison: {l_t} {op} {r_t} (hint: use make_comparator instead)"
        )));
    } else if l_t != r_t {
        return Err(ArrowError::InvalidArgumentError(format!(
            "Invalid comparison operation: {l_t} {op} {r_t}"
        )));
    }

    let l_side = SideInfo {
        is_scalar: l_s,
        dict: l_v,
        ree: l_ree_info.as_ref().map(|(_, info)| info),
    };
    let r_side = SideInfo {
        is_scalar: r_s,
        dict: r_v,
        ree: r_ree_info.as_ref().map(|(_, info)| info),
    };

    // Defer computation as may not be necessary
    let values = || -> BooleanBuffer {
        let d = downcast_primitive_array! {
            (l, r) => apply(op, l.values().as_ref(), &l_side, r.values().as_ref(), &r_side),
            (Boolean, Boolean) => apply(op, l.as_boolean(), &l_side, r.as_boolean(), &r_side),
            (Utf8, Utf8) => apply(op, l.as_string::<i32>(), &l_side, r.as_string::<i32>(), &r_side),
            (Utf8View, Utf8View) => apply(op, l.as_string_view(), &l_side, r.as_string_view(), &r_side),
            (LargeUtf8, LargeUtf8) => apply(op, l.as_string::<i64>(), &l_side, r.as_string::<i64>(), &r_side),
            (Binary, Binary) => apply(op, l.as_binary::<i32>(), &l_side, r.as_binary::<i32>(), &r_side),
            (BinaryView, BinaryView) => apply(op, l.as_binary_view(), &l_side, r.as_binary_view(), &r_side),
            (LargeBinary, LargeBinary) => apply(op, l.as_binary::<i64>(), &l_side, r.as_binary::<i64>(), &r_side),
            (FixedSizeBinary(_), FixedSizeBinary(_)) => apply(op, l.as_fixed_size_binary(), &l_side, r.as_fixed_size_binary(), &r_side),
            (Null, Null) => None,
            _ => unreachable!(),
        };
        // `None` (Null type, or empty values reported by `apply`) becomes an all-unset buffer
        d.unwrap_or_else(|| BooleanBuffer::new_unset(len))
    };

    // Treat a null buffer without any nulls the same as no null buffer
    let l_nulls = l_nulls.filter(|n| n.null_count() > 0);
    let r_nulls = r_nulls.filter(|n| n.null_count() > 0);
    Ok(match (l_nulls, l_s, r_nulls, r_s) {
        (Some(l), true, Some(r), true) | (Some(l), false, Some(r), false) => {
            // Either both sides are scalar or neither side is scalar
            match op {
                Op::Distinct => {
                    let values = values();
                    let l = l.inner().bit_chunks().iter_padded();
                    let r = r.inner().bit_chunks().iter_padded();
                    let ne = values.bit_chunks().iter_padded();

                    // Distinct if exactly one side is valid, or both are valid and not equal
                    let c = |((l, r), n)| (l ^ r) | (l & r & n);
                    let buffer = l.zip(r).zip(ne).map(c).collect();
                    BooleanBuffer::new(buffer, 0, len).into()
                }
                Op::NotDistinct => {
                    let values = values();
                    let l = l.inner().bit_chunks().iter_padded();
                    let r = r.inner().bit_chunks().iter_padded();
                    let e = values.bit_chunks().iter_padded();

                    // Not distinct if both sides are null, or both are valid and equal
                    let c = |((l, r), e)| u64::not(l | r) | (l & r & e);
                    let buffer = l.zip(r).zip(e).map(c).collect();
                    BooleanBuffer::new(buffer, 0, len).into()
                }
                // Result is null wherever either side is null
                _ => BooleanArray::new(values(), NullBuffer::union(Some(&l), Some(&r))),
            }
        }
        (Some(_), true, Some(a), false) | (Some(a), false, Some(_), true) => {
            // Scalar is null, other side is non-scalar and nullable
            match op {
                // Distinct from the null scalar exactly where the array is valid
                Op::Distinct => a.into_inner().into(),
                Op::NotDistinct => a.into_inner().not().into(),
                _ => BooleanArray::new_null(len),
            }
        }
        (Some(nulls), is_scalar, None, _) | (None, _, Some(nulls), is_scalar) => {
            // Only one side is nullable
            match is_scalar {
                true => match op {
                    // Scalar is null, other side is not nullable
                    Op::Distinct => BooleanBuffer::new_set(len).into(),
                    Op::NotDistinct => BooleanBuffer::new_unset(len).into(),
                    _ => BooleanArray::new_null(len),
                },
                false => match op {
                    Op::Distinct => {
                        let values = values();
                        let l = nulls.inner().bit_chunks().iter_padded();
                        let ne = values.bit_chunks().iter_padded();
                        // Distinct if the nullable side is null, or the values are not equal
                        let c = |(l, n)| u64::not(l) | n;
                        let buffer = l.zip(ne).map(c).collect();
                        BooleanBuffer::new(buffer, 0, len).into()
                    }
                    // Not distinct only where the nullable side is valid and the values are equal
                    Op::NotDistinct => (nulls.inner() & &values()).into(),
                    _ => BooleanArray::new(values(), Some(nulls)),
                },
            }
        }
        // Neither side is nullable
        (None, _, None, _) => BooleanArray::new(values(), None),
    })
}
359
/// Per-side metadata for a comparison operand.
///
/// Built by `compare_op` for each operand and passed to `apply`, which uses it to map
/// between the physical values being compared and the logical rows of the result.
struct SideInfo<'a> {
    /// Whether this operand is a scalar
    is_scalar: bool,
    /// The dictionary array the values were unwrapped from, if any
    dict: Option<&'a dyn AnyDictionaryArray>,
    /// Run-end encoding information for the operand, if any
    ree: Option<&'a ReeInfo<'a>>,
}
366
367impl SideInfo<'_> {
368    fn has_indirection(&self) -> bool {
369        self.dict.is_some() || self.ree.is_some()
370    }
371}
372
373/// Perform a potentially vectored `op` on the provided `ArrayOrd`
374fn apply<T: ArrayOrd>(
375    op: Op,
376    l: T,
377    l_info: &SideInfo,
378    r: T,
379    r_info: &SideInfo,
380) -> Option<BooleanBuffer> {
381    if l.len() == 0 || r.len() == 0 {
382        return None; // Handle empty dictionaries
383    }
384
385    if !l_info.is_scalar
386        && !r_info.is_scalar
387        && (l_info.has_indirection() || r_info.has_indirection())
388    {
389        // Both non-scalar with indirection. Pure REE-vs-REE uses segment-based
390        // bulk comparison; other combinations fall back to index vectors.
391        if let (Some(li), None, Some(ri), None) = (l_info.ree, l_info.dict, r_info.ree, r_info.dict)
392        {
393            return Some(apply_ree(op, l, li, r, ri));
394        }
395
396        let l_v = logical_indices(l.len(), l_info.dict, l_info.ree);
397        let r_v = logical_indices(r.len(), r_info.dict, r_info.ree);
398
399        assert_eq!(l_v.len(), r_v.len()); // Sanity check
400
401        Some(match op {
402            Op::Equal | Op::NotDistinct => apply_op_vectored(l, &l_v, r, &r_v, false, T::is_eq),
403            Op::NotEqual | Op::Distinct => apply_op_vectored(l, &l_v, r, &r_v, true, T::is_eq),
404            Op::Less => apply_op_vectored(l, &l_v, r, &r_v, false, T::is_lt),
405            Op::LessEqual => apply_op_vectored(r, &r_v, l, &l_v, true, T::is_lt),
406            Op::Greater => apply_op_vectored(r, &r_v, l, &l_v, false, T::is_lt),
407            Op::GreaterEqual => apply_op_vectored(l, &l_v, r, &r_v, true, T::is_lt),
408        })
409    } else {
410        let l_s = l_info
411            .is_scalar
412            .then(|| scalar_index(l_info.dict, l_info.ree));
413        let r_s = r_info
414            .is_scalar
415            .then(|| scalar_index(r_info.dict, r_info.ree));
416
417        let buffer = match op {
418            Op::Equal | Op::NotDistinct => apply_op(l, l_s, r, r_s, false, T::is_eq),
419            Op::NotEqual | Op::Distinct => apply_op(l, l_s, r, r_s, true, T::is_eq),
420            Op::Less => apply_op(l, l_s, r, r_s, false, T::is_lt),
421            Op::LessEqual => apply_op(r, r_s, l, l_s, true, T::is_lt),
422            Op::Greater => apply_op(r, r_s, l, l_s, false, T::is_lt),
423            Op::GreaterEqual => apply_op(l, l_s, r, r_s, true, T::is_lt),
424        };
425
426        // Expand the physical-length result back to logical length.
427        // Find the non-scalar side that needs expansion (at most one).
428        let side = if l_s.is_none() { l_info } else { r_info };
429        let buffer = match side.dict {
430            Some(d) => take_bits(d, buffer),
431            None => buffer,
432        };
433        Some(match side.ree {
434            Some(info) => expand_from_runs(info, buffer),
435            None => buffer,
436        })
437    }
438}
439
440/// Build a logical→physical index vector for one side of a non-scalar comparison.
441fn logical_indices(
442    len: usize,
443    dict: Option<&dyn AnyDictionaryArray>,
444    ree: Option<&ReeInfo>,
445) -> Vec<usize> {
446    match (dict, ree) {
447        (Some(d), Some(info)) => {
448            let keys = d.normalized_keys();
449            ree_physical_indices(info)
450                .iter()
451                .map(|&i| keys[i])
452                .collect()
453        }
454        (Some(d), None) => d.normalized_keys(),
455        (None, Some(info)) => ree_physical_indices(info),
456        (None, None) => (0..len).collect(),
457    }
458}
459
460fn ree_physical_indices(info: &ReeInfo) -> Vec<usize> {
461    let run_ends = info.run_ends_as_usize();
462    let end = info.offset + info.len;
463    let mut indices = Vec::with_capacity(info.len);
464    let mut pos = info.offset;
465    for (physical, &run_end) in run_ends.iter().enumerate().skip(info.start_physical) {
466        let run_end = run_end.min(end);
467        indices.extend(std::iter::repeat_n(physical, run_end - pos));
468        pos = run_end;
469        if pos >= end {
470            break;
471        }
472    }
473    indices
474}
475
476fn scalar_index(dict: Option<&dyn AnyDictionaryArray>, ree: Option<&ReeInfo>) -> usize {
477    let idx = ree.map(|r| r.start_physical).unwrap_or_default();
478    dict.map(|d| d.normalized_keys()[idx]).unwrap_or(idx)
479}
480
/// Expand a physical-length BooleanBuffer to logical length by bulk-appending
/// each run's result.
///
/// `buffer` holds one bit per run; the output holds `info.len` bits, one per logical row
/// of the window `info.offset..info.offset + info.len`. The typed run ends are read
/// directly from the downcast run array; the only buffer built here is the output.
fn expand_from_runs(info: &ReeInfo, buffer: BooleanBuffer) -> BooleanBuffer {
    let array = info.array;
    downcast_run_array!(
        array => {
            let run_ends = array.run_ends().values();
            let end = info.offset + info.len;
            let mut builder = BooleanBufferBuilder::new(info.len);
            // Logical position reached so far
            let mut pos = info.offset;
            for (physical, re) in run_ends.iter().enumerate().skip(info.start_physical) {
                // Clip the final run to the end of the logical window
                let run_end = re.as_usize().min(end);
                // SAFETY: physical < buffer.len() (one entry per run in the values array)
                builder.append_n(run_end - pos, unsafe { buffer.value_unchecked(physical) });
                pos = run_end;
                if pos >= end {
                    break;
                }
            }
            builder.finish()
        },
        _ => unreachable!()
    )
}
506
507fn take_bits(v: &dyn AnyDictionaryArray, buffer: BooleanBuffer) -> BooleanBuffer {
508    let array = take(&BooleanArray::new(buffer, None), v.keys(), None).unwrap();
509    array.as_boolean().values().clone()
510}
511
512/// Invokes `f` with values `0..len` collecting the boolean results into a new `BooleanBuffer`
513///
514/// This is similar to [`arrow_buffer::MutableBuffer::collect_bool`] but with
515/// the option to efficiently negate the result
516fn collect_bool(len: usize, neg: bool, f: impl Fn(usize) -> bool) -> BooleanBuffer {
517    let mut buffer = Vec::with_capacity(ceil(len, 64));
518
519    let chunks = len / 64;
520    let remainder = len % 64;
521    buffer.extend((0..chunks).map(|chunk| {
522        let mut packed = 0;
523        for bit_idx in 0..64 {
524            let i = bit_idx + chunk * 64;
525            packed |= (f(i) as u64) << bit_idx;
526        }
527        if neg {
528            packed = !packed
529        }
530
531        packed
532    }));
533
534    if remainder != 0 {
535        let mut packed = 0;
536        for bit_idx in 0..remainder {
537            let i = bit_idx + chunks * 64;
538            packed |= (f(i) as u64) << bit_idx;
539        }
540        if neg {
541            packed = !packed
542        }
543
544        buffer.push(packed);
545    }
546    BooleanBuffer::new(buffer.into(), 0, len)
547}
548
549/// Applies `op` to possibly scalar `ArrayOrd`
550///
551/// If l is scalar `l_s` will be `Some(idx)` where `idx` is the index of the scalar value in `l`
552/// If r is scalar `r_s` will be `Some(idx)` where `idx` is the index of the scalar value in `r`
553///
554/// If `neg` is true the result of `op` will be negated
555fn apply_op<T: ArrayOrd>(
556    l: T,
557    l_s: Option<usize>,
558    r: T,
559    r_s: Option<usize>,
560    neg: bool,
561    op: impl Fn(T::Item, T::Item) -> bool,
562) -> BooleanBuffer {
563    match (l_s, r_s) {
564        (None, None) => {
565            assert_eq!(l.len(), r.len());
566            collect_bool(l.len(), neg, |idx| unsafe {
567                op(l.value_unchecked(idx), r.value_unchecked(idx))
568            })
569        }
570        (Some(l_s), Some(r_s)) => {
571            let a = l.value(l_s);
572            let b = r.value(r_s);
573            std::iter::once(op(a, b) ^ neg).collect()
574        }
575        (Some(l_s), None) => {
576            let v = l.value(l_s);
577            collect_bool(r.len(), neg, |idx| op(v, unsafe { r.value_unchecked(idx) }))
578        }
579        (None, Some(r_s)) => {
580            let v = r.value(r_s);
581            collect_bool(l.len(), neg, |idx| op(unsafe { l.value_unchecked(idx) }, v))
582        }
583    }
584}
585
586/// Applies `op` to possibly scalar `ArrayOrd` with the given indices
587fn apply_op_vectored<T: ArrayOrd>(
588    l: T,
589    l_v: &[usize],
590    r: T,
591    r_v: &[usize],
592    neg: bool,
593    op: impl Fn(T::Item, T::Item) -> bool,
594) -> BooleanBuffer {
595    assert_eq!(l_v.len(), r_v.len());
596    collect_bool(l_v.len(), neg, |idx| unsafe {
597        let l_idx = *l_v.get_unchecked(idx);
598        let r_idx = *r_v.get_unchecked(idx);
599        op(l.value_unchecked(l_idx), r.value_unchecked(r_idx))
600    })
601}
602
603/// Dispatch `op` for a REE-vs-REE comparison (no dictionary on either side)
604/// using segment-based bulk comparison.
605fn apply_ree<T: ArrayOrd>(op: Op, l: T, l_info: &ReeInfo, r: T, r_info: &ReeInfo) -> BooleanBuffer {
606    match op {
607        Op::Equal | Op::NotDistinct => apply_op_ree_segments(l, l_info, r, r_info, false, T::is_eq),
608        Op::NotEqual | Op::Distinct => apply_op_ree_segments(l, l_info, r, r_info, true, T::is_eq),
609        Op::Less => apply_op_ree_segments(l, l_info, r, r_info, false, T::is_lt),
610        Op::LessEqual => apply_op_ree_segments(r, r_info, l, l_info, true, T::is_lt),
611        Op::Greater => apply_op_ree_segments(r, r_info, l, l_info, false, T::is_lt),
612        Op::GreaterEqual => apply_op_ree_segments(l, l_info, r, r_info, true, T::is_lt),
613    }
614}
615
616/// Compare two REE arrays by walking both run_ends simultaneously, comparing
617/// once per aligned segment and bulk-filling the result.
618fn apply_op_ree_segments<T: ArrayOrd>(
619    l: T,
620    l_info: &ReeInfo,
621    r: T,
622    r_info: &ReeInfo,
623    neg: bool,
624    op: fn(T::Item, T::Item) -> bool,
625) -> BooleanBuffer {
626    let l_re = l_info.run_ends_as_usize();
627    let r_re = r_info.run_ends_as_usize();
628    let end = l_info.len;
629    let mut builder = BooleanBufferBuilder::new(l_info.len);
630    let mut l_phys = l_info.start_physical;
631    let mut r_phys = r_info.start_physical;
632    let mut pos = 0usize;
633
634    while pos < end {
635        // SAFETY: l_phys/r_phys are bounded by their respective run counts —
636        // they advance only when pos reaches a run boundary, and pos < end
637        // guarantees we haven't exhausted all runs.
638        // Subtract each side's offset to convert absolute run-ends to logical
639        // coordinates so that arrays with different offsets align correctly.
640        let l_end = (unsafe { *l_re.get_unchecked(l_phys) } - l_info.offset).min(end);
641        let r_end = (unsafe { *r_re.get_unchecked(r_phys) } - r_info.offset).min(end);
642        let seg_end = l_end.min(r_end);
643
644        let result = unsafe { op(l.value_unchecked(l_phys), r.value_unchecked(r_phys)) } ^ neg;
645        builder.append_n(seg_end - pos, result);
646
647        pos = seg_end;
648        if pos >= l_end {
649            l_phys += 1;
650        }
651        if pos >= r_end {
652            r_phys += 1;
653        }
654    }
655
656    builder.finish()
657}
658
/// Uniform access to the values of an array-like type, together with the equality and
/// less-than predicates the comparison kernels in this module are built from.
trait ArrayOrd {
    /// The value handed to [`Self::is_eq`] and [`Self::is_lt`]
    type Item: Copy;

    /// Number of values that can be read through this accessor
    fn len(&self) -> usize;

    /// Returns the value at `idx`, panicking if `idx` is out of bounds
    fn value(&self, idx: usize) -> Self::Item {
        assert!(idx < self.len());
        // SAFETY: idx < self.len() was asserted above
        unsafe { self.value_unchecked(idx) }
    }

    /// Returns the value at `idx` without bounds checking
    ///
    /// # Safety
    ///
    /// Safe if `idx < self.len()`
    unsafe fn value_unchecked(&self, idx: usize) -> Self::Item;

    /// Returns true if `l` equals `r`
    fn is_eq(l: Self::Item, r: Self::Item) -> bool;

    /// Returns true if `l` is less than `r`
    fn is_lt(l: Self::Item, r: Self::Item) -> bool;
}
678
679impl ArrayOrd for &BooleanArray {
680    type Item = bool;
681
682    fn len(&self) -> usize {
683        Array::len(self)
684    }
685
686    unsafe fn value_unchecked(&self, idx: usize) -> Self::Item {
687        unsafe { BooleanArray::value_unchecked(self, idx) }
688    }
689
690    fn is_eq(l: Self::Item, r: Self::Item) -> bool {
691        l == r
692    }
693
694    fn is_lt(l: Self::Item, r: Self::Item) -> bool {
695        !l & r
696    }
697}
698
699impl<T: ArrowNativeTypeOp> ArrayOrd for &[T] {
700    type Item = T;
701
702    fn len(&self) -> usize {
703        (*self).len()
704    }
705
706    unsafe fn value_unchecked(&self, idx: usize) -> Self::Item {
707        unsafe { *self.get_unchecked(idx) }
708    }
709
710    fn is_eq(l: Self::Item, r: Self::Item) -> bool {
711        l.is_eq(r)
712    }
713
714    fn is_lt(l: Self::Item, r: Self::Item) -> bool {
715        l.is_lt(r)
716    }
717}
718
719impl<'a, T: ByteArrayType> ArrayOrd for &'a GenericByteArray<T> {
720    type Item = &'a [u8];
721
722    fn len(&self) -> usize {
723        Array::len(self)
724    }
725
726    unsafe fn value_unchecked(&self, idx: usize) -> Self::Item {
727        unsafe { GenericByteArray::value_unchecked(self, idx).as_ref() }
728    }
729
730    fn is_eq(l: Self::Item, r: Self::Item) -> bool {
731        l == r
732    }
733
734    fn is_lt(l: Self::Item, r: Self::Item) -> bool {
735        l < r
736    }
737}
738
739impl<'a, T: ByteViewType> ArrayOrd for &'a GenericByteViewArray<T> {
740    /// This is the item type for the GenericByteViewArray::compare
741    /// Item.0 is the array, Item.1 is the index
742    type Item = (&'a GenericByteViewArray<T>, usize);
743
744    #[inline(always)]
745    fn is_eq(l: Self::Item, r: Self::Item) -> bool {
746        let l_view = unsafe { l.0.views().get_unchecked(l.1) };
747        let r_view = unsafe { r.0.views().get_unchecked(r.1) };
748        if l.0.data_buffers().is_empty() && r.0.data_buffers().is_empty() {
749            // For eq case, we can directly compare the inlined bytes
750            return l_view == r_view;
751        }
752
753        // Fast path for same view (and both inlined)
754        if l_view == r_view && *l_view as u32 <= 12 {
755            return true;
756        }
757
758        let l_len = *l_view as u32;
759        let r_len = *r_view as u32;
760        // Lengths differ
761        if l_len != r_len {
762            return false;
763        }
764
765        // Both are empty
766        if l_len == 0 {
767            return true;
768        }
769
770        // Check prefix
771        if (*l_view >> 32) as u32 != (*r_view >> 32) as u32 {
772            return false;
773        }
774
775        // Both are inlined, and prefixes are equal (so they differ in rest of inlined bytes)
776        if l_len <= 12 {
777            return false;
778        }
779
780        // # Safety
781        // The index is within bounds as it is checked in value()
782        unsafe {
783            let l_buffer_idx = (*l_view >> 64) as u32;
784            let l_offset = (*l_view >> 96) as u32;
785            let r_buffer_idx = (*r_view >> 64) as u32;
786            let r_offset = (*r_view >> 96) as u32;
787
788            let l_data = l.0.data_buffers().get_unchecked(l_buffer_idx as usize);
789            let r_data = r.0.data_buffers().get_unchecked(r_buffer_idx as usize);
790
791            let l_slice = l_data
792                .as_slice()
793                .get_unchecked(l_offset as usize..(l_offset + l_len) as usize);
794            let r_slice = r_data
795                .as_slice()
796                .get_unchecked(r_offset as usize..(r_offset + r_len) as usize);
797            l_slice == r_slice
798        }
799    }
800
801    #[inline(always)]
802    fn is_lt(l: Self::Item, r: Self::Item) -> bool {
803        let l_view = unsafe { l.0.views().get_unchecked(l.1) };
804        let r_view = unsafe { r.0.views().get_unchecked(r.1) };
805
806        if l.0.data_buffers().is_empty() && r.0.data_buffers().is_empty() {
807            // For lt case, we can directly compare the inlined bytes
808            return GenericByteViewArray::<T>::inline_key_fast(*l_view)
809                < GenericByteViewArray::<T>::inline_key_fast(*r_view);
810        }
811
812        if (*l_view as u32) <= 12 && (*r_view as u32) <= 12 {
813            return GenericByteViewArray::<T>::inline_key_fast(*l_view)
814                < GenericByteViewArray::<T>::inline_key_fast(*r_view);
815        }
816
817        let l_prefix = (*l_view >> 32) as u32;
818        let r_prefix = (*r_view >> 32) as u32;
819        if l_prefix != r_prefix {
820            return l_prefix.swap_bytes() < r_prefix.swap_bytes();
821        }
822
823        // Fallback to the generic, unchecked comparison for mixed cases
824        // # Safety
825        // The index is within bounds as it is checked in value()
826        unsafe {
827            let l_data: &[u8] = l.0.value_unchecked(l.1).as_ref();
828            let r_data: &[u8] = r.0.value_unchecked(r.1).as_ref();
829            l_data < r_data
830        }
831    }
832
833    fn len(&self) -> usize {
834        Array::len(self)
835    }
836
837    unsafe fn value_unchecked(&self, idx: usize) -> Self::Item {
838        (self, idx)
839    }
840}
841
842impl<'a> ArrayOrd for &'a FixedSizeBinaryArray {
843    type Item = &'a [u8];
844
845    fn len(&self) -> usize {
846        Array::len(self)
847    }
848
849    unsafe fn value_unchecked(&self, idx: usize) -> Self::Item {
850        unsafe { FixedSizeBinaryArray::value_unchecked(self, idx) }
851    }
852
853    fn is_eq(l: Self::Item, r: Self::Item) -> bool {
854        l == r
855    }
856
857    fn is_lt(l: Self::Item, r: Self::Item) -> bool {
858        l < r
859    }
860}
861
862/// Compares two [`GenericByteViewArray`] at index `left_idx` and `right_idx`
863#[inline(always)]
864pub fn compare_byte_view<T: ByteViewType>(
865    left: &GenericByteViewArray<T>,
866    left_idx: usize,
867    right: &GenericByteViewArray<T>,
868    right_idx: usize,
869) -> Ordering {
870    assert!(left_idx < left.len());
871    assert!(right_idx < right.len());
872    if left.data_buffers().is_empty() && right.data_buffers().is_empty() {
873        let l_view = unsafe { left.views().get_unchecked(left_idx) };
874        let r_view = unsafe { right.views().get_unchecked(right_idx) };
875        return GenericByteViewArray::<T>::inline_key_fast(*l_view)
876            .cmp(&GenericByteViewArray::<T>::inline_key_fast(*r_view));
877    }
878    unsafe { GenericByteViewArray::compare_unchecked(left, left_idx, right, right_idx) }
879}
880
881/// Run-end encoding metadata for one side of a comparison. Holds a reference
882/// to the original REE array for deferred typed access to run_ends.
883struct ReeInfo<'a> {
884    array: &'a dyn Array,
885    offset: usize,
886    start_physical: usize,
887    len: usize,
888}
889
890impl ReeInfo<'_> {
891    /// Materialize run_ends as `Vec<usize>`.
892    fn run_ends_as_usize(&self) -> Vec<usize> {
893        let array = self.array;
894        downcast_run_array!(
895            array => array.run_ends().values().iter().map(|v| v.as_usize()).collect(),
896            _ => unreachable!()
897        )
898    }
899}
900
901/// If `array` is RunEndEncoded, return its physical values array and run metadata.
902fn ree_info_opt(array: &dyn Array) -> Option<(&dyn Array, ReeInfo<'_>)> {
903    downcast_run_array!(
904        array => {
905            let run_ends = array.run_ends();
906            let info = ReeInfo {
907                array,
908                offset: run_ends.offset(),
909                start_physical: run_ends.get_start_physical_index(),
910                len: run_ends.len(),
911            };
912            Some((array.values().as_ref(), info))
913        },
914        _ => None
915    )
916}
917
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow_array::types::Int32Type;
    use arrow_array::{
        DictionaryArray, Int32Array, RunArray, Scalar, StringArray, StringViewArray,
    };
    use arrow_buffer::{Buffer, ScalarBuffer};

    use super::*;

    /// Dictionaries whose keys and/or values are all null compare to all-null results.
    #[test]
    fn test_null_dict() {
        let a = DictionaryArray::new(Int32Array::new_null(10), Arc::new(Int32Array::new_null(0)));
        let r = eq(&a, &a).unwrap();
        assert_eq!(r.null_count(), 10);

        let a = DictionaryArray::new(
            Int32Array::from(vec![1, 2, 3, 4, 5, 6]),
            Arc::new(Int32Array::new_null(10)),
        );
        let r = eq(&a, &a).unwrap();
        assert_eq!(r.null_count(), 6);

        let scalar =
            DictionaryArray::new(Int32Array::new_null(1), Arc::new(Int32Array::new_null(0)));
        let r = eq(&a, &Scalar::new(&scalar)).unwrap();
        assert_eq!(r.null_count(), 6);

        let scalar =
            DictionaryArray::new(Int32Array::new_null(1), Arc::new(Int32Array::new_null(0)));
        let r = eq(&Scalar::new(&scalar), &Scalar::new(&scalar)).unwrap();
        assert_eq!(r.null_count(), 1);

        let a = DictionaryArray::new(
            Int32Array::from(vec![0, 1, 2]),
            Arc::new(Int32Array::from(vec![3, 2, 1])),
        );
        let r = eq(&a, &Scalar::new(&scalar)).unwrap();
        assert_eq!(r.null_count(), 3);
    }

    /// `distinct` / `not_distinct` on arrays without nulls behave like `neq` / `eq`.
    #[test]
    fn is_distinct_from_non_nulls() {
        let left_int_array = Int32Array::from(vec![0, 1, 2, 3, 4]);
        let right_int_array = Int32Array::from(vec![4, 3, 2, 1, 0]);

        assert_eq!(
            BooleanArray::from(vec![true, true, false, true, true,]),
            distinct(&left_int_array, &right_int_array).unwrap()
        );
        assert_eq!(
            BooleanArray::from(vec![false, false, true, false, false,]),
            not_distinct(&left_int_array, &right_int_array).unwrap()
        );
    }

    /// `distinct` / `not_distinct` treat two nulls as equal and null vs. value as different.
    #[test]
    fn is_distinct_from_nulls() {
        // [0, 0, NULL, 0, 0, 0]
        let left_int_array = Int32Array::new(
            vec![0, 0, 1, 3, 0, 0].into(),
            Some(NullBuffer::from(vec![true, true, false, true, true, true])),
        );
        // [0, NULL, NULL, NULL, 0, NULL]
        let right_int_array = Int32Array::new(
            vec![0; 6].into(),
            Some(NullBuffer::from(vec![
                true, false, false, false, true, false,
            ])),
        );

        assert_eq!(
            BooleanArray::from(vec![false, true, false, true, false, true,]),
            distinct(&left_int_array, &right_int_array).unwrap()
        );

        assert_eq!(
            BooleanArray::from(vec![true, false, true, false, true, false,]),
            not_distinct(&left_int_array, &right_int_array).unwrap()
        );
    }

    /// `distinct` / `not_distinct` with scalar operands on either side, including null scalars.
    #[test]
    fn test_distinct_scalar() {
        let a = Int32Array::new_scalar(12);
        let b = Int32Array::new_scalar(12);
        assert!(!distinct(&a, &b).unwrap().value(0));
        assert!(not_distinct(&a, &b).unwrap().value(0));

        let a = Int32Array::new_scalar(12);
        let b = Int32Array::new_null(1);
        assert!(distinct(&a, &b).unwrap().value(0));
        assert!(!not_distinct(&a, &b).unwrap().value(0));
        assert!(distinct(&b, &a).unwrap().value(0));
        assert!(!not_distinct(&b, &a).unwrap().value(0));

        let b = Scalar::new(b);
        assert!(distinct(&a, &b).unwrap().value(0));
        assert!(!not_distinct(&a, &b).unwrap().value(0));

        assert!(!distinct(&b, &b).unwrap().value(0));
        assert!(not_distinct(&b, &b).unwrap().value(0));

        let a = Int32Array::new(
            vec![0, 1, 2, 3].into(),
            Some(vec![false, false, true, true].into()),
        );
        let expected = BooleanArray::from(vec![false, false, true, true]);
        assert_eq!(distinct(&a, &b).unwrap(), expected);
        assert_eq!(distinct(&b, &a).unwrap(), expected);

        let expected = BooleanArray::from(vec![true, true, false, false]);
        assert_eq!(not_distinct(&a, &b).unwrap(), expected);
        assert_eq!(not_distinct(&b, &a).unwrap(), expected);

        let b = Int32Array::new_scalar(1);
        let expected = BooleanArray::from(vec![true; 4]);
        assert_eq!(distinct(&a, &b).unwrap(), expected);
        assert_eq!(distinct(&b, &a).unwrap(), expected);
        let expected = BooleanArray::from(vec![false; 4]);
        assert_eq!(not_distinct(&a, &b).unwrap(), expected);
        assert_eq!(not_distinct(&b, &a).unwrap(), expected);

        let b = Int32Array::new_scalar(3);
        let expected = BooleanArray::from(vec![true, true, true, false]);
        assert_eq!(distinct(&a, &b).unwrap(), expected);
        assert_eq!(distinct(&b, &a).unwrap(), expected);
        let expected = BooleanArray::from(vec![false, false, false, true]);
        assert_eq!(not_distinct(&a, &b).unwrap(), expected);
        assert_eq!(not_distinct(&b, &a).unwrap(), expected);
    }

    /// Pins which data types `supports_distinct` accepts and rejects.
    #[test]
    fn test_supports_distinct() {
        use arrow_schema::{DataType::*, Field};

        assert!(supports_distinct(&Int32));
        assert!(supports_distinct(&Float64));
        assert!(supports_distinct(&Utf8));
        assert!(supports_distinct(&Boolean));

        // One level of dictionary unwrap is supported.
        assert!(supports_distinct(&Dictionary(
            Box::new(Int16),
            Box::new(Utf8),
        )));

        // REE, nested dictionary, and complex types are not supported.
        assert!(!supports_distinct(&RunEndEncoded(
            Arc::new(Field::new("run_ends", Int32, false)),
            Arc::new(Field::new("values", Int32, true)),
        )));
        assert!(!supports_distinct(&Dictionary(
            Box::new(Int16),
            Box::new(Dictionary(Box::new(Int8), Box::new(Utf8))),
        )));
        assert!(!supports_distinct(&List(Arc::new(Field::new(
            "item", Int32, true,
        )))));
        assert!(!supports_distinct(&Struct(
            vec![Field::new("a", Int32, true)].into()
        )));
    }

    /// `eq` and `neq` give opposite answers for two equal scalars.
    #[test]
    fn test_scalar_negation() {
        let a = Int32Array::new_scalar(54);
        let b = Int32Array::new_scalar(54);
        let r = eq(&a, &b).unwrap();
        assert!(r.value(0));

        let r = neq(&a, &b).unwrap();
        assert!(!r.value(0));
    }

    /// Comparing an empty array with a scalar yields an empty result on either side.
    #[test]
    fn test_scalar_empty() {
        let a = Int32Array::new_null(0);
        let b = Int32Array::new_scalar(23);
        let r = eq(&a, &b).unwrap();
        assert_eq!(r.len(), 0);
        let r = eq(&b, &a).unwrap();
        assert_eq!(r.len(), 0);
    }

    /// A null dictionary key may hold an out-of-range value (100 here); the
    /// comparison must neither panic nor read through it.
    #[test]
    fn test_dictionary_nulls() {
        let values = StringArray::from(vec![Some("us-west"), Some("us-east")]);
        let nulls = NullBuffer::from(vec![false, true, true]);

        let key_values = vec![100i32, 1i32, 0i32].into();
        let keys = Int32Array::new(key_values, Some(nulls));
        let col = DictionaryArray::try_new(keys, Arc::new(values)).unwrap();

        // [NULL, "us-east"] != ["us-east", "us-west"]
        let result = neq(&col.slice(0, col.len() - 1), &col.slice(1, col.len() - 1)).unwrap();
        assert_eq!(result, BooleanArray::from(vec![None, Some(true)]));
    }

    /// `lt` where one side is inline and the other is longer than 12 bytes.
    #[test]
    fn test_string_view_mixed_lt() {
        let a = StringViewArray::from(vec![
            Some("apple"),
            Some("apple"),
            Some("apple_long_string"),
        ]);
        let b = StringViewArray::from(vec![
            Some("apple_long_string"),
            Some("appl"),
            Some("apple"),
        ]);
        // "apple" < "apple_long_string" -> true
        // "apple" < "appl" -> false
        // "apple_long_string" < "apple" -> false
        assert_eq!(
            lt(&a, &b).unwrap(),
            BooleanArray::from(vec![true, false, false])
        );
    }

    /// `eq` over a mix of inline, null and long values.
    #[test]
    fn test_string_view_eq() {
        let a = StringViewArray::from(vec![
            Some("hello"),
            Some("world"),
            None,
            Some("very long string exceeding 12 bytes"),
        ]);
        let b = StringViewArray::from(vec![
            Some("hello"),
            Some("world"),
            None,
            Some("very long string exceeding 12 bytes"),
        ]);
        assert_eq!(
            eq(&a, &b).unwrap(),
            BooleanArray::from(vec![Some(true), Some(true), None, Some(true)])
        );

        let c = StringViewArray::from(vec![
            Some("hello"),
            Some("world!"),
            None,
            Some("very long string exceeding 12 bytes!"),
        ]);
        assert_eq!(
            eq(&a, &c).unwrap(),
            BooleanArray::from(vec![Some(true), Some(false), None, Some(false)])
        );
    }

    /// `lt` over inline and long values in both orders.
    #[test]
    fn test_string_view_lt() {
        let a = StringViewArray::from(vec![
            Some("apple"),
            Some("banana"),
            Some("very long apple exceeding 12 bytes"),
            Some("very long banana exceeding 12 bytes"),
        ]);
        let b = StringViewArray::from(vec![
            Some("banana"),
            Some("apple"),
            Some("very long banana exceeding 12 bytes"),
            Some("very long apple exceeding 12 bytes"),
        ]);
        assert_eq!(
            lt(&a, &b).unwrap(),
            BooleanArray::from(vec![true, false, true, false])
        );
    }

    #[test]
    fn test_string_view_eq_prefix_mismatch() {
        // Long values of different lengths must compare unequal.
        let a = StringViewArray::from(vec![Some("very long apple exceeding 12 bytes")]);
        let b = StringViewArray::from(vec![Some("very long banana exceeding 12 bytes")]);
        assert_eq!(eq(&a, &b).unwrap(), BooleanArray::from(vec![Some(false)]));
    }

    #[test]
    fn test_string_view_lt_prefix_mismatch() {
        // Prefix mismatch should decide ordering without full compare for long values.
        let a = StringViewArray::from(vec![Some("apple long string exceeding 12 bytes")]);
        let b = StringViewArray::from(vec![Some("banana long string exceeding 12 bytes")]);
        assert_eq!(lt(&a, &b).unwrap(), BooleanArray::from(vec![true]));
    }

    #[test]
    fn test_string_view_eq_inline_fast_path() {
        // Inline-only arrays should compare by view equality fast path.
        let a = StringViewArray::from(vec![Some("ab")]);
        let b = StringViewArray::from(vec![Some("ab")]);
        assert!(!has_buffers(&a));
        assert!(!has_buffers(&b));
        assert_eq!(eq(&a, &b).unwrap(), BooleanArray::from(vec![Some(true)]));
    }

    #[test]
    fn test_string_view_eq_inline_prefix_mismatch_with_buffers() {
        // Non-empty buffers force the prefix mismatch branch for inline values.
        let a = StringViewArray::from(vec![
            Some("ab"),
            Some("long string to allocate buffers"),
        ]);
        let b = StringViewArray::from(vec![
            Some("ac"),
            Some("long string to allocate buffers"),
        ]);
        assert!(has_buffers(&a));
        assert!(has_buffers(&b));
        assert_eq!(
            eq(&a, &b).unwrap(),
            BooleanArray::from(vec![Some(false), Some(true)])
        );
    }

    #[test]
    fn test_string_view_eq_empty_len_branch() {
        // Reach the zero-length branch by bypassing the inline fast path with a dummy buffer.
        // Both views have length 0 but differ in their upper bits, so they are
        // not bit-identical and the "same view" shortcut does not apply.
        let raw_a = 0u128;
        let raw_b = 1u128 << 96;
        let views_a = ScalarBuffer::from(vec![raw_a]);
        let views_b = ScalarBuffer::from(vec![raw_b]);
        let buffers: Arc<[Buffer]> = Arc::from([Buffer::from_slice_ref([0u8])]);
        let a = unsafe { StringViewArray::new_unchecked(views_a, buffers.clone(), None) };
        let b = unsafe { StringViewArray::new_unchecked(views_b, buffers, None) };
        assert!(has_buffers(&a));
        assert!(has_buffers(&b));
        assert!(<&StringViewArray as ArrayOrd>::is_eq((&a, 0), (&b, 0)));
    }

    #[test]
    fn test_string_view_long_prefix_mismatch_array_ord() {
        // Long strings with differing prefixes should short-circuit on prefix ordering.
        let a = StringViewArray::from(vec![Some("apple long string exceeding 12 bytes")]);
        let b = StringViewArray::from(vec![Some("banana long string exceeding 12 bytes")]);
        assert!(has_buffers(&a));
        assert!(has_buffers(&b));
        assert!(<&StringViewArray as ArrayOrd>::is_lt((&a, 0), (&b, 0)));
    }

    #[test]
    fn test_string_view_inline_mismatch_array_ord() {
        // Inline-only arrays (no data buffers) should be ordered by their inline keys.
        let a = StringViewArray::from(vec![Some("ap")]);
        let b = StringViewArray::from(vec![Some("ba")]);
        assert!(!has_buffers(&a));
        assert!(!has_buffers(&b));
        assert!(<&StringViewArray as ArrayOrd>::is_lt((&a, 0), (&b, 0)));
    }

    #[test]
    fn test_compare_byte_view_inline_fast_path() {
        // Inline-only views should compare via inline key in compare_byte_view.
        let a = StringViewArray::from(vec![Some("ab")]);
        let b = StringViewArray::from(vec![Some("ac")]);
        assert!(!has_buffers(&a));
        assert!(!has_buffers(&b));
        assert_eq!(compare_byte_view(&a, 0, &b, 0), Ordering::Less);
    }

    /// Returns `true` if `array` has at least one data buffer.
    fn has_buffers<T: ByteViewType>(array: &GenericByteViewArray<T>) -> bool {
        !array.data_buffers().is_empty()
    }

    /// Builds a string run array from `(value, run_length)` pairs.
    fn ree_str(runs: &[(Option<&str>, i32)]) -> RunArray<Int32Type> {
        let mut ends = Vec::new();
        let mut vals = Vec::new();
        let mut end = 0i32;
        for &(v, n) in runs {
            end += n;
            ends.push(end);
            vals.push(v);
        }
        RunArray::try_new(&Int32Array::from(ends), &StringArray::from(vals)).unwrap()
    }

    /// Run array compared with a scalar, with the scalar on either side.
    #[test]
    fn test_ree_scalar() {
        let a = ree_str(&[(Some("a"), 3), (Some("b"), 2)]);

        let s = Scalar::new(StringArray::from(vec!["b"]));
        assert_eq!(
            eq(&a, &s).unwrap(),
            BooleanArray::from(vec![false, false, false, true, true])
        );
        assert_eq!(
            neq(&a, &s).unwrap(),
            BooleanArray::from(vec![true, true, true, false, false])
        );
        assert_eq!(
            lt(&a, &s).unwrap(),
            BooleanArray::from(vec![true, true, true, false, false])
        );
        assert_eq!(lt_eq(&a, &s).unwrap(), BooleanArray::from(vec![true; 5]));
        assert_eq!(gt(&a, &s).unwrap(), BooleanArray::from(vec![false; 5]));
        assert_eq!(
            gt_eq(&a, &s).unwrap(),
            BooleanArray::from(vec![false, false, false, true, true])
        );

        // Scalar on left side
        let scalar = Scalar::new(ree_str(&[(Some("a"), 1)]));
        assert_eq!(
            eq(&scalar, &a).unwrap(),
            BooleanArray::from(vec![true, true, true, false, false])
        );
        assert_eq!(
            lt_eq(&scalar, &a).unwrap(),
            BooleanArray::from(vec![true; 5])
        );

        // REE-wrapped scalar (DataFusion's ScalarValue::RunEndEncoded)
        assert_eq!(
            eq(&a, &Scalar::new(ree_str(&[(Some("a"), 1)]))).unwrap(),
            BooleanArray::from(vec![true, true, true, false, false]),
        );

        // Single run
        let a = ree_str(&[(Some("x"), 100)]);
        let r = eq(&a, &Scalar::new(StringArray::from(vec!["x"]))).unwrap();
        assert_eq!(r.true_count(), 100);
    }

    /// Two run arrays whose run boundaries do not line up.
    #[test]
    fn test_ree_ree() {
        // Different run boundaries, all ops.
        let a = ree_str(&[(Some("a"), 3), (Some("b"), 2)]);
        let b = ree_str(&[(Some("a"), 2), (Some("b"), 3)]);
        // a=[a,a,a,b,b] vs b=[a,a,b,b,b]
        assert_eq!(
            eq(&a, &b).unwrap(),
            BooleanArray::from(vec![true, true, false, true, true])
        );
        assert_eq!(
            neq(&a, &b).unwrap(),
            BooleanArray::from(vec![false, false, true, false, false])
        );
        assert_eq!(
            lt(&a, &b).unwrap(),
            BooleanArray::from(vec![false, false, true, false, false])
        );
        assert_eq!(
            gt_eq(&a, &b).unwrap(),
            BooleanArray::from(vec![true, true, false, true, true])
        );
    }

    /// Sliced run arrays, against a scalar and against each other.
    #[test]
    fn test_ree_sliced() {
        // Scalar with sliced REE
        let a = ree_str(&[(Some("a"), 3), (Some("b"), 2)]).slice(2, 3);
        let s = Scalar::new(StringArray::from(vec!["b"]));
        assert_eq!(
            eq(&a, &s).unwrap(),
            BooleanArray::from(vec![false, true, true])
        );

        // Both sides sliced, REE vs REE
        let a = ree_str(&[(Some("a"), 3), (Some("b"), 2)]).slice(1, 4);
        let b = ree_str(&[(Some("a"), 2), (Some("b"), 3)]).slice(1, 4);
        assert_eq!(
            eq(&a, &b).unwrap(),
            BooleanArray::from(vec![true, false, true, true])
        );
    }

    /// Sliced run arrays whose slice offsets differ between the two sides.
    #[test]
    fn test_ree_sliced_different_offsets() {
        // left expands to ["a", "a", "b", "b"]
        let a = ree_str(&[(Some("a"), 3), (Some("b"), 2)]).slice(1, 4);
        // right expands to ["a", "a", "b", "b"]
        let b = ree_str(&[(Some("a"), 2), (Some("b"), 3)]).slice(0, 4);
        assert_eq!(
            eq(&a, &b).unwrap(),
            BooleanArray::from(vec![true, true, true, true])
        );
    }

    /// Run arrays containing null runs.
    #[test]
    fn test_ree_nullable() {
        let a = ree_str(&[(Some("a"), 2), (None, 1), (Some("b"), 2)]);

        // Scalar: null-aware ops
        let s = Scalar::new(StringArray::from(vec!["a"]));
        assert_eq!(
            not_distinct(&a, &s).unwrap(),
            BooleanArray::from(vec![true, true, false, false, false])
        );
        assert_eq!(
            distinct(&a, &s).unwrap(),
            BooleanArray::from(vec![false, false, true, true, true])
        );

        // REE vs REE with nulls
        let b = ree_str(&[(Some("a"), 3), (None, 2)]);
        assert_eq!(
            eq(&a, &b).unwrap(),
            BooleanArray::from(vec![Some(true), Some(true), None, None, None])
        );
    }

    /// Run arrays against plain arrays, wrapping dictionaries, with numeric
    /// values, and empty.
    #[test]
    fn test_ree_mixed() {
        let a = ree_str(&[(Some("a"), 3), (Some("b"), 2)]);

        // REE vs plain array
        let b = StringArray::from(vec!["a", "a", "b", "b", "b"]);
        assert_eq!(
            eq(&a, &b).unwrap(),
            BooleanArray::from(vec![true, true, false, true, true])
        );

        // REE wrapping a DictionaryArray
        let dict = DictionaryArray::new(
            Int32Array::from(vec![1, 0]),
            Arc::new(StringArray::from(vec!["x", "y"])),
        );
        let ree_dict = RunArray::try_new(&Int32Array::from(vec![3, 5]), &dict).unwrap();
        let s = Scalar::new(StringArray::from(vec!["y"]));
        assert_eq!(
            eq(&ree_dict, &s).unwrap(),
            BooleanArray::from(vec![true, true, true, false, false])
        );

        // Numeric REE (Int32 values)
        let ree_int = RunArray::try_new(
            &Int32Array::from(vec![3, 5]),
            &Int32Array::from(vec![10, 20]),
        )
        .unwrap();
        assert_eq!(
            eq(&ree_int, &Scalar::new(Int32Array::from(vec![10]))).unwrap(),
            BooleanArray::from(vec![true, true, true, false, false])
        );

        // Empty REE
        let empty_a = ree_str(&[(Some("a"), 1)]).slice(0, 0);
        let empty_b = ree_str(&[(Some("b"), 1)]).slice(0, 0);
        assert_eq!(eq(&empty_a, &empty_b).unwrap().len(), 0);
    }

    /// `compare_byte_view` on arrays that do have data buffers.
    #[test]
    fn test_compare_byte_view() {
        let a = StringViewArray::from(vec![
            Some("apple"),
            Some("banana"),
            Some("very long apple exceeding 12 bytes"),
            Some("very long banana exceeding 12 bytes"),
        ]);
        let b = StringViewArray::from(vec![
            Some("apple"),
            Some("apple"),
            Some("very long apple exceeding 12 bytes"),
            Some("very long apple exceeding 12 bytes"),
        ]);

        assert_eq!(compare_byte_view(&a, 0, &b, 0), Ordering::Equal);
        assert_eq!(compare_byte_view(&a, 1, &b, 1), Ordering::Greater);
        assert_eq!(compare_byte_view(&a, 2, &b, 2), Ordering::Equal);
        assert_eq!(compare_byte_view(&a, 3, &b, 3), Ordering::Greater);
    }
}