// arrow_data/equal/run.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

use crate::data::ArrayData;
use arrow_buffer::ArrowNativeType;
use arrow_buffer::RunEndBuffer;
use arrow_buffer::ScalarBuffer;
use arrow_schema::DataType;
use num_traits::ToPrimitive;

use super::equal_range;

26/// Returns true if the two `RunEndEncoded` arrays are equal.
27///
28/// This provides a specialized implementation of equality for REE arrays that
29/// handles differences in run-encoding by iterating through the logical range.
30pub(super) fn run_equal(
31    lhs: &ArrayData,
32    rhs: &ArrayData,
33    lhs_start: usize,
34    rhs_start: usize,
35    len: usize,
36) -> bool {
37    let lhs_index_type = match lhs.data_type() {
38        DataType::RunEndEncoded(f, _) => f.data_type(),
39        _ => unreachable!(),
40    };
41
42    match lhs_index_type {
43        DataType::Int16 => run_equal_inner::<i16>(lhs, rhs, lhs_start, rhs_start, len),
44        DataType::Int32 => run_equal_inner::<i32>(lhs, rhs, lhs_start, rhs_start, len),
45        DataType::Int64 => run_equal_inner::<i64>(lhs, rhs, lhs_start, rhs_start, len),
46        _ => unreachable!(),
47    }
48}
49
50struct RunArrayData<'a, T: ArrowNativeType> {
51    run_ends: RunEndBuffer<T>,
52    values: &'a ArrayData,
53}
54
55impl<'a, T: ArrowNativeType + ToPrimitive> RunArrayData<'a, T> {
56    fn new(data: &'a ArrayData, start: usize, len: usize) -> Self {
57        debug_assert!(
58            data.child_data().len() == 2,
59            "RunEndEncoded arrays are guaranteed to have 2 children [run_ends, values]"
60        );
61        let run_ends_data = &data.child_data()[0];
62        let raw_run_ends_buffer = &run_ends_data.buffers()[0];
63        // SAFETY: we're reconstructing RunEndBuffer from a known valid RunArray
64        let run_ends = unsafe {
65            RunEndBuffer::<T>::new_unchecked(
66                raw_run_ends_buffer.clone().into(),
67                run_ends_data.offset() + data.offset() + start,
68                len,
69            )
70        };
71
72        let values = &data.child_data()[1];
73        Self { run_ends, values }
74    }
75
76    fn run_end(&self, index: usize) -> usize {
77        self.run_ends.values()[index].as_usize()
78    }
79
80    fn get_start_end_physical_indices(&self) -> (usize, usize) {
81        let start = self.run_ends.get_start_physical_index();
82        let end = self.run_ends.get_end_physical_index();
83        (start, end)
84    }
85}
86
87fn run_equal_inner<T: ArrowNativeType + ToPrimitive>(
88    lhs: &ArrayData,
89    rhs: &ArrayData,
90    lhs_start: usize,
91    rhs_start: usize,
92    len: usize,
93) -> bool {
94    if len == 0 {
95        return true;
96    }
97
98    let l_array = RunArrayData::<T>::new(lhs, lhs_start, len);
99    let r_array = RunArrayData::<T>::new(rhs, rhs_start, len);
100
101    let (l_start_phys, l_end_phys) = l_array.get_start_end_physical_indices();
102    let (r_start_phys, r_end_phys) = r_array.get_start_end_physical_indices();
103    let l_runs = l_end_phys - l_start_phys + 1;
104    let r_runs = r_end_phys - r_start_phys + 1;
105
106    if l_runs == r_runs {
107        // When the boundaries align perfectly, we don't need the complex stepping loop that calculates overlaps.
108        // Instead, we can simply treat the underlying values arrays as if they were standard primitive arrays.
109        let l_iter = l_array.run_ends.sliced_values();
110        let r_iter = r_array.run_ends.sliced_values();
111        let physical_match = l_iter.zip(r_iter).all(|(l_re, r_re)| l_re == r_re);
112
113        if physical_match {
114            // Both arrays are partitioned identically.
115            // We can just verify if the physical values in those partitions match.
116            return equal_range(
117                l_array.values,
118                r_array.values,
119                l_start_phys,
120                r_start_phys,
121                l_runs,
122            );
123        }
124    }
125
126    let mut l_phys = l_start_phys;
127    let mut r_phys = r_start_phys;
128    let mut processed = 0;
129    while processed < len {
130        if !equal_range(l_array.values, r_array.values, l_phys, r_phys, 1) {
131            return false;
132        }
133
134        let l_run_end = l_array.run_end(l_phys);
135        let r_run_end = r_array.run_end(r_phys);
136
137        //Calculate how many more logical elements are in the current run of the left and right array
138        let l_remaining_in_run = l_run_end - (l_array.run_ends.offset() + processed);
139        let r_remaining_in_run = r_run_end - (r_array.run_ends.offset() + processed);
140
141        //Calculate how many elements are left to compare in the requested range
142        let remaining_in_range = len - processed;
143
144        //Find the smallest of these three to determine our step size
145        //The goal is to move the logical cursor (processed) forward as far as possible without:
146        //Crossing the boundary of a run in the left or right array (where the value might change).
147        //Going past the total length we were asked to compare.
148        let step = l_remaining_in_run
149            .min(r_remaining_in_run)
150            .min(remaining_in_range);
151        processed += step;
152
153        if l_array.run_ends.offset() + processed == l_run_end {
154            l_phys += 1;
155        }
156        if r_array.run_ends.offset() + processed == r_run_end {
157            r_phys += 1;
158        }
159    }
160
161    true
162}