arrow_buffer/buffer/
run.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::ArrowNativeType;
19use crate::buffer::ScalarBuffer;
20
21/// A buffer of monotonically increasing, positive integers used to store run-ends.
22///
23/// Used to compactly represent runs of the same value. Values being represented
24/// are stored in a separate buffer from this struct. See [`RunArray`] for an example
25/// of how this is used with a companion array to represent the values.
26///
27/// # Logical vs Physical
28///
29/// Physically, each value in the `run_ends` buffer is the cumulative length of
30/// all runs in the logical representation, up to that physical index. Consider
31/// the following example:
32///
33/// ```text
34///           physical                        logical
35///     ┌─────────┬─────────┐           ┌─────────┬─────────┐
36///     │    3    │    0    │ ◄──────┬─ │    A    │    0    │
37///     ├─────────┼─────────┤        │  ├─────────┼─────────┤
38///     │    4    │    1    │ ◄────┐ ├─ │    A    │    1    │
39///     ├─────────┼─────────┤      │ │  ├─────────┼─────────┤
40///     │    6    │    2    │ ◄──┐ │ └─ │    A    │    2    │
41///     └─────────┴─────────┘    │ │    ├─────────┼─────────┤
42///      run-ends    index       │ └─── │    B    │    3    │
43///                              │      ├─────────┼─────────┤
44///      logical_offset = 0      ├───── │    C    │    4    │
45///      logical_length = 6      │      ├─────────┼─────────┤
46///                              └───── │    C    │    5    │
47///                                     └─────────┴─────────┘
48///                                       values     index
49/// ```
50///
51/// A [`RunEndBuffer`] is physically the buffer and offset with length on the left.
52/// In this case, the offset and length represent the whole buffer, so it is essentially
53/// unsliced. See the section below on slicing for more details on how this buffer
54/// handles slicing.
55///
56/// This means that multiple logical values are represented in the same physical index,
57/// and multiple logical indices map to the same physical index. The [`RunEndBuffer`]
58/// containing `[3, 4, 6]` is essentially the physical indices `[0, 0, 0, 1, 2, 2]`,
59/// and having a separately stored buffer of values such as `[A, B, C]` can turn
60/// this into a representation of `[A, A, A, B, C, C]`.
61///
62/// # Slicing
63///
64/// In order to provide zero-copy slicing, this struct stores a separate **logical**
65/// offset and length. Consider the following example:
66///
67/// ```text
68///           physical                        logical
69///     ┌─────────┬─────────┐           ┌ ─ ─ ─ ─ ┬ ─ ─ ─ ─ ┐
70///     │    3    │    0    │ ◄──────┐       A         0
71///     ├─────────┼─────────┤        │  ├── ─ ─ ─ ┼ ─ ─ ─ ─ ┤
72///     │    4    │    1    │ ◄────┐ │       A         1
73///     ├─────────┼─────────┤      │ │  ├─────────┼─────────┤
74///     │    6    │    2    │ ◄──┐ │ └─ │    A    │    2    │◄─── logical_offset
75///     └─────────┴─────────┘    │ │    ├─────────┼─────────┤
76///      run-ends    index       │ └─── │    B    │    3    │
77///                              │      ├─────────┼─────────┤
78///      logical_offset = 2      └───── │    C    │    4    │
79///      logical_length = 3             ├─────────┼─────────┤
80///                                          C         5     ◄─── logical_offset + logical_length
81///                                     └ ─ ─ ─ ─ ┴ ─ ─ ─ ─ ┘
82///                                       values     index
83/// ```
84///
85/// The physical `run_ends` [`ScalarBuffer`] remains unchanged, in order to facilitate
86/// zero-copy. However, we now offset into the **logical** representation with an
87/// accompanying length. This allows us to represent values `[A, B, C]` using physical
88/// indices `0, 1, 2` with the same underlying physical buffer, at the cost of two
89/// extra `usize`s to represent the logical slice that was taken.
90///
91/// (A [`RunEndBuffer`] is considered unsliced when `logical_offset` is `0` and
92/// `logical_length` is equal to the last value in `run_ends`)
93///
94/// [`RunArray`]: https://docs.rs/arrow/latest/arrow/array/struct.RunArray.html
95/// [Run-End encoded layout]: https://arrow.apache.org/docs/format/Columnar.html#run-end-encoded-layout
#[derive(Debug, Clone)]
pub struct RunEndBuffer<E: ArrowNativeType> {
    // Physical cumulative run-end values; `new` enforces these are strictly
    // increasing and greater than zero (shared zero-copy across slices)
    run_ends: ScalarBuffer<E>,
    // Number of logical elements represented by this (possibly sliced) view
    logical_length: usize,
    // Logical index at which this view of `run_ends` begins; non-zero after `slice`
    logical_offset: usize,
}
102
103impl<E> RunEndBuffer<E>
104where
105    E: ArrowNativeType,
106{
107    /// Create a new [`RunEndBuffer`] from a [`ScalarBuffer`], `logical_offset`
108    /// and `logical_length`.
109    ///
110    /// # Panics
111    ///
112    /// - `run_ends` does not contain strictly increasing values greater than zero
113    /// - The last value of `run_ends` is less than `logical_offset + logical_length`
114    pub fn new(run_ends: ScalarBuffer<E>, logical_offset: usize, logical_length: usize) -> Self {
115        assert!(
116            run_ends.windows(2).all(|w| w[0] < w[1]),
117            "run-ends not strictly increasing"
118        );
119
120        if logical_length != 0 {
121            assert!(!run_ends.is_empty(), "non-empty slice but empty run-ends");
122            let end = E::from_usize(logical_offset.saturating_add(logical_length)).unwrap();
123            assert!(
124                *run_ends.first().unwrap() > E::usize_as(0),
125                "run-ends not greater than 0"
126            );
127            assert!(
128                *run_ends.last().unwrap() >= end,
129                "slice beyond bounds of run-ends"
130            );
131        }
132
133        Self {
134            run_ends,
135            logical_offset,
136            logical_length,
137        }
138    }
139
140    /// Create a new [`RunEndBuffer`] from a [`ScalarBuffer`], `logical_offset`
141    /// and `logical_length`.
142    ///
143    /// # Safety
144    ///
145    /// - `run_ends` must contain strictly increasing values greater than zero
146    /// - The last value of `run_ends` must be greater than or equal to `logical_offset + logical_len`
147    pub unsafe fn new_unchecked(
148        run_ends: ScalarBuffer<E>,
149        logical_offset: usize,
150        logical_length: usize,
151    ) -> Self {
152        Self {
153            run_ends,
154            logical_offset,
155            logical_length,
156        }
157    }
158
    /// Returns the logical offset into the run-ends stored by this buffer.
    ///
    /// A non-zero value indicates this buffer is a slice of the underlying
    /// run-ends (see [`RunEndBuffer::slice`]).
    #[inline]
    pub fn offset(&self) -> usize {
        self.logical_offset
    }
164
    /// Returns the logical length of the run-ends stored by this buffer.
    ///
    /// Note this is the *logical* number of elements represented, not the
    /// number of physical run-end values in the underlying buffer.
    #[inline]
    pub fn len(&self) -> usize {
        self.logical_length
    }
170
171    /// Returns true if this buffer is logically empty.
172    #[inline]
173    pub fn is_empty(&self) -> bool {
174        self.logical_length == 0
175    }
176
    /// Free up unused memory.
    ///
    /// Delegates to the underlying [`ScalarBuffer`]; the logical offset and
    /// length are unaffected.
    pub fn shrink_to_fit(&mut self) {
        // TODO(emilk): we could shrink even more in the case where we are a small sub-slice of the full buffer
        self.run_ends.shrink_to_fit();
    }
182
    /// Returns the physical (**unsliced**) run ends of this buffer.
    ///
    /// Take care when operating on these values as it doesn't take into account
    /// any logical slicing that may have occurred; use
    /// [`RunEndBuffer::get_physical_index`] to translate a logical index.
    #[inline]
    pub fn values(&self) -> &[E] {
        &self.run_ends
    }
191
192    /// Returns the maximum run-end encoded in the underlying buffer; that is, the
193    /// last physical run of the buffer. This does not take into account any logical
194    /// slicing that may have occurred.
195    #[inline]
196    pub fn max_value(&self) -> usize {
197        self.values().last().copied().unwrap_or_default().as_usize()
198    }
199
200    /// Performs a binary search to find the physical index for the given logical
201    /// index.
202    ///
203    /// Useful for extracting the corresponding physical `run_ends` when this buffer
204    /// is logically sliced.
205    ///
206    /// The result is arbitrary if `logical_index >= self.len()`.
207    pub fn get_physical_index(&self, logical_index: usize) -> usize {
208        let logical_index = E::usize_as(self.logical_offset + logical_index);
209        let cmp = |p: &E| p.partial_cmp(&logical_index).unwrap();
210
211        match self.run_ends.binary_search_by(cmp) {
212            Ok(idx) => idx + 1,
213            Err(idx) => idx,
214        }
215    }
216
217    /// Returns the physical index at which the logical array starts.
218    ///
219    /// The same as calling `get_physical_index(0)` but with a fast path if the
220    /// buffer is not logically sliced, in which case it always returns `0`.
221    pub fn get_start_physical_index(&self) -> usize {
222        if self.logical_offset == 0 || self.logical_length == 0 {
223            return 0;
224        }
225        // Fallback to binary search
226        self.get_physical_index(0)
227    }
228
229    /// Returns the physical index at which the logical array ends.
230    ///
231    /// The same as calling `get_physical_index(length - 1)` but with a fast path
232    /// if the buffer is not logically sliced, in which case it returns `length - 1`.
233    pub fn get_end_physical_index(&self) -> usize {
234        if self.logical_length == 0 {
235            return 0;
236        }
237        if self.max_value() == self.logical_offset + self.logical_length {
238            return self.values().len() - 1;
239        }
240        // Fallback to binary search
241        self.get_physical_index(self.logical_length - 1)
242    }
243
244    /// Slices this [`RunEndBuffer`] by the provided `logical_offset` and `logical_length`.
245    ///
246    /// # Panics
247    ///
248    /// - Specified slice (`logical_offset` + `logical_length`) exceeds existing
249    ///   logical length
250    pub fn slice(&self, logical_offset: usize, logical_length: usize) -> Self {
251        assert!(
252            logical_offset.saturating_add(logical_length) <= self.logical_length,
253            "the length + offset of the sliced RunEndBuffer cannot exceed the existing length"
254        );
255        Self {
256            run_ends: self.run_ends.clone(),
257            logical_offset: self.logical_offset + logical_offset,
258            logical_length,
259        }
260    }
261
    /// Returns a reference to the inner [`ScalarBuffer`] of physical run-ends,
    /// ignoring any logical offset/length.
    pub fn inner(&self) -> &ScalarBuffer<E> {
        &self.run_ends
    }
266
    /// Returns the inner [`ScalarBuffer`], consuming self.
    ///
    /// Note any logical offset and length is discarded.
    pub fn into_inner(self) -> ScalarBuffer<E> {
        self.run_ends
    }
271
    /// Returns the physical indices corresponding to the provided logical indices.
    ///
    /// Given a slice of logical indices, this method returns a `Vec` containing the
    /// corresponding physical indices into the run-ends buffer.
    ///
    /// This method operates by iterating the logical indices in sorted order, instead of
    /// finding the physical index for each logical index using binary search via
    /// the function [`RunEndBuffer::get_physical_index`].
    ///
    /// Running benchmarks on both approaches showed that the approach used here
    /// scaled well for larger inputs.
    ///
    /// See <https://github.com/apache/arrow-rs/pull/3622#issuecomment-1407753727> for more details.
    ///
    /// # Errors
    ///
    /// If any logical index is out of bounds (>= self.len()), returns an error containing the invalid index.
    #[inline]
    pub fn get_physical_indices<I>(&self, logical_indices: &[I]) -> Result<Vec<usize>, I>
    where
        I: ArrowNativeType,
    {
        let len = self.len();
        let offset = self.offset();

        let indices_len = logical_indices.len();

        if indices_len == 0 {
            return Ok(vec![]);
        }

        // `ordered_indices` stores indices into `logical_indices` and can be used
        // to iterate `logical_indices` in sorted order without reordering the input.
        let mut ordered_indices: Vec<usize> = (0..indices_len).collect();

        // Instead of sorting `logical_indices` directly, sort the `ordered_indices`
        // whose values are indices into `logical_indices`.
        ordered_indices.sort_unstable_by(|lhs, rhs| {
            logical_indices[*lhs]
                .partial_cmp(&logical_indices[*rhs])
                .unwrap()
        });

        // Return early if any logical index cannot be converted to a physical index;
        // checking the largest (last in sorted order) covers all of them.
        let largest_logical_index = logical_indices[*ordered_indices.last().unwrap()].as_usize();
        if largest_logical_index >= len {
            return Err(logical_indices[*ordered_indices.last().unwrap()]);
        }

        // Skip the physical runs entirely before the logical offset of this view.
        let skip_value = self.get_start_physical_index();

        let mut physical_indices = vec![0; indices_len];

        // Merge-style sweep: advance through run ends and sorted logical indices together.
        let mut ordered_index = 0_usize;
        for (physical_index, run_end) in self.values().iter().enumerate().skip(skip_value) {
            // Run end of the current physical index, translated to be relative
            // to the logical offset (skip_value guarantees run_end > offset here).
            let run_end_value = run_end.as_usize() - offset;

            // All the `logical_indices` that are less than the current run end
            // belong to the current physical index.
            while ordered_index < indices_len
                && logical_indices[ordered_indices[ordered_index]].as_usize() < run_end_value
            {
                physical_indices[ordered_indices[ordered_index]] = physical_index;
                ordered_index += 1;
            }
        }

        // Defensive check: for a buffer upholding the `new` invariants the final
        // run end covers every in-bounds logical index, so this should not trigger
        // after the bounds check above — report the unconverted index if it does.
        if ordered_index < logical_indices.len() {
            return Err(logical_indices[ordered_indices[ordered_index]]);
        }
        Ok(physical_indices)
    }
348}
349
#[cfg(test)]
mod tests {
    use crate::buffer::RunEndBuffer;

    #[test]
    fn test_zero_length_slice() {
        let buffer = RunEndBuffer::new(vec![1_i32, 4_i32].into(), 0, 4);
        assert_eq!(buffer.get_start_physical_index(), 0);
        assert_eq!(buffer.get_end_physical_index(), 1);
        assert_eq!(buffer.get_physical_index(3), 1);

        for offset in 0..4 {
            let sliced = buffer.slice(offset, 0);
            assert_eq!(sliced.get_start_physical_index(), 0);
            assert_eq!(sliced.get_end_physical_index(), 0);
        }

        let buffer = RunEndBuffer::new(Vec::<i32>::new().into(), 0, 0);
        assert_eq!(buffer.get_start_physical_index(), 0);
        assert_eq!(buffer.get_end_physical_index(), 0);
    }

    #[test]
    fn test_sliced_lookup() {
        // Logical: [A, A, A, B, C, C] with run-ends [3, 4, 6]
        let buffer = RunEndBuffer::new(vec![3_i32, 4, 6].into(), 0, 6);
        assert_eq!(buffer.get_physical_index(0), 0);
        assert_eq!(buffer.get_physical_index(2), 0);
        assert_eq!(buffer.get_physical_index(3), 1);
        assert_eq!(buffer.get_physical_index(5), 2);

        // Slice to logical [A, B, C]
        let sliced = buffer.slice(2, 3);
        assert_eq!(sliced.offset(), 2);
        assert_eq!(sliced.len(), 3);
        assert_eq!(sliced.get_start_physical_index(), 0);
        assert_eq!(sliced.get_end_physical_index(), 2);
        assert_eq!(sliced.get_physical_index(0), 0);
        assert_eq!(sliced.get_physical_index(1), 1);
        assert_eq!(sliced.get_physical_index(2), 2);
    }

    #[test]
    fn test_get_physical_indices() {
        let buffer = RunEndBuffer::new(vec![3_i32, 4, 6].into(), 0, 6);

        // Unsorted input; result preserves input order
        let physical = buffer.get_physical_indices(&[0_u32, 5, 3, 2]).unwrap();
        assert_eq!(physical, vec![0, 2, 1, 0]);

        // Out-of-bounds logical indices are reported back to the caller
        assert_eq!(buffer.get_physical_indices(&[6_u32]), Err(6));

        // Empty input yields empty output
        assert_eq!(buffer.get_physical_indices::<u32>(&[]), Ok(vec![]));
    }

    #[test]
    #[should_panic(expected = "run-ends not strictly increasing")]
    fn test_non_monotonic_run_ends() {
        RunEndBuffer::new(vec![3_i32, 3].into(), 0, 3);
    }

    #[test]
    #[should_panic(expected = "run-ends not greater than 0")]
    fn test_zero_run_end() {
        RunEndBuffer::new(vec![0_i32, 4].into(), 0, 4);
    }
}
371}