use crate::data::ArrayData;
use arrow_buffer::ArrowNativeType;
use arrow_buffer::RunEndBuffer;
use arrow_buffer::ScalarBuffer;
use arrow_schema::DataType;
use num_traits::ToPrimitive;
23
24use super::equal_range;
25
26pub(super) fn run_equal(
31 lhs: &ArrayData,
32 rhs: &ArrayData,
33 lhs_start: usize,
34 rhs_start: usize,
35 len: usize,
36) -> bool {
37 let lhs_index_type = match lhs.data_type() {
38 DataType::RunEndEncoded(f, _) => f.data_type(),
39 _ => unreachable!(),
40 };
41
42 match lhs_index_type {
43 DataType::Int16 => run_equal_inner::<i16>(lhs, rhs, lhs_start, rhs_start, len),
44 DataType::Int32 => run_equal_inner::<i32>(lhs, rhs, lhs_start, rhs_start, len),
45 DataType::Int64 => run_equal_inner::<i64>(lhs, rhs, lhs_start, rhs_start, len),
46 _ => unreachable!(),
47 }
48}
49
50struct RunArrayData<'a, T: ArrowNativeType> {
51 run_ends: RunEndBuffer<T>,
52 values: &'a ArrayData,
53}
54
55impl<'a, T: ArrowNativeType + ToPrimitive> RunArrayData<'a, T> {
56 fn new(data: &'a ArrayData, start: usize, len: usize) -> Self {
57 debug_assert!(
58 data.child_data().len() == 2,
59 "RunEndEncoded arrays are guaranteed to have 2 children [run_ends, values]"
60 );
61 let run_ends_data = &data.child_data()[0];
62 let raw_run_ends_buffer = &run_ends_data.buffers()[0];
63 let run_ends = unsafe {
65 RunEndBuffer::<T>::new_unchecked(
66 raw_run_ends_buffer.clone().into(),
67 run_ends_data.offset() + data.offset() + start,
68 len,
69 )
70 };
71
72 let values = &data.child_data()[1];
73 Self { run_ends, values }
74 }
75
76 fn run_end(&self, index: usize) -> usize {
77 self.run_ends.values()[index].as_usize()
78 }
79
80 fn get_start_end_physical_indices(&self) -> (usize, usize) {
81 let start = self.run_ends.get_start_physical_index();
82 let end = self.run_ends.get_end_physical_index();
83 (start, end)
84 }
85}
86
87fn run_equal_inner<T: ArrowNativeType + ToPrimitive>(
88 lhs: &ArrayData,
89 rhs: &ArrayData,
90 lhs_start: usize,
91 rhs_start: usize,
92 len: usize,
93) -> bool {
94 if len == 0 {
95 return true;
96 }
97
98 let l_array = RunArrayData::<T>::new(lhs, lhs_start, len);
99 let r_array = RunArrayData::<T>::new(rhs, rhs_start, len);
100
101 let (l_start_phys, l_end_phys) = l_array.get_start_end_physical_indices();
102 let (r_start_phys, r_end_phys) = r_array.get_start_end_physical_indices();
103 let l_runs = l_end_phys - l_start_phys + 1;
104 let r_runs = r_end_phys - r_start_phys + 1;
105
106 if l_runs == r_runs {
107 let l_iter = l_array.run_ends.sliced_values();
110 let r_iter = r_array.run_ends.sliced_values();
111 let physical_match = l_iter.zip(r_iter).all(|(l_re, r_re)| l_re == r_re);
112
113 if physical_match {
114 return equal_range(
117 l_array.values,
118 r_array.values,
119 l_start_phys,
120 r_start_phys,
121 l_runs,
122 );
123 }
124 }
125
126 let mut l_phys = l_start_phys;
127 let mut r_phys = r_start_phys;
128 let mut processed = 0;
129 while processed < len {
130 if !equal_range(l_array.values, r_array.values, l_phys, r_phys, 1) {
131 return false;
132 }
133
134 let l_run_end = l_array.run_end(l_phys);
135 let r_run_end = r_array.run_end(r_phys);
136
137 let l_remaining_in_run = l_run_end - (l_array.run_ends.offset() + processed);
139 let r_remaining_in_run = r_run_end - (r_array.run_ends.offset() + processed);
140
141 let remaining_in_range = len - processed;
143
144 let step = l_remaining_in_run
149 .min(r_remaining_in_run)
150 .min(remaining_in_range);
151 processed += step;
152
153 if l_array.run_ends.offset() + processed == l_run_end {
154 l_phys += 1;
155 }
156 if r_array.run_ends.offset() + processed == r_run_end {
157 r_phys += 1;
158 }
159 }
160
161 true
162}