Skip to main content

arrow_buffer/buffer/
run.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::ArrowNativeType;
19use crate::buffer::ScalarBuffer;
20
21/// A buffer of monotonically increasing, positive integers used to store run-ends.
22///
23/// Used to compactly represent runs of the same value. Values being represented
24/// are stored in a separate buffer from this struct. See [`RunArray`] for an example
25/// of how this is used with a companion array to represent the values.
26///
27/// # Logical vs Physical
28///
29/// Physically, each value in the `run_ends` buffer is the cumulative length of
30/// all runs in the logical representation, up to that physical index. Consider
31/// the following example:
32///
33/// ```text
34///           physical                        logical
35///     ┌─────────┬─────────┐           ┌─────────┬─────────┐
36///     │    3    │    0    │ ◄──────┬─ │    A    │    0    │
37///     ├─────────┼─────────┤        │  ├─────────┼─────────┤
38///     │    4    │    1    │ ◄────┐ ├─ │    A    │    1    │
39///     ├─────────┼─────────┤      │ │  ├─────────┼─────────┤
40///     │    6    │    2    │ ◄──┐ │ └─ │    A    │    2    │
41///     └─────────┴─────────┘    │ │    ├─────────┼─────────┤
42///      run-ends    index       │ └─── │    B    │    3    │
43///                              │      ├─────────┼─────────┤
44///      logical_offset = 0      ├───── │    C    │    4    │
45///      logical_length = 6      │      ├─────────┼─────────┤
46///                              └───── │    C    │    5    │
47///                                     └─────────┴─────────┘
48///                                       values     index
49/// ```
50///
51/// A [`RunEndBuffer`] is physically the buffer and offset with length on the left.
52/// In this case, the offset and length represent the whole buffer, so it is essentially
53/// unsliced. See the section below on slicing for more details on how this buffer
54/// handles slicing.
55///
56/// This means that multiple logical values are represented in the same physical index,
57/// and multiple logical indices map to the same physical index. The [`RunEndBuffer`]
58/// containing `[3, 4, 6]` is essentially the physical indices `[0, 0, 0, 1, 2, 2]`,
59/// and having a separately stored buffer of values such as `[A, B, C]` can turn
60/// this into a representation of `[A, A, A, B, C, C]`.
61///
62/// # Slicing
63///
64/// In order to provide zero-copy slicing, this struct stores a separate **logical**
65/// offset and length. Consider the following example:
66///
67/// ```text
68///           physical                        logical
69///     ┌─────────┬─────────┐           ┌ ─ ─ ─ ─ ┬ ─ ─ ─ ─ ┐
70///     │    3    │    0    │ ◄──────┐       A         0
71///     ├─────────┼─────────┤        │  ├── ─ ─ ─ ┼ ─ ─ ─ ─ ┤
72///     │    4    │    1    │ ◄────┐ │       A         1
73///     ├─────────┼─────────┤      │ │  ├─────────┼─────────┤
74///     │    6    │    2    │ ◄──┐ │ └─ │    A    │    2    │◄─── logical_offset
75///     └─────────┴─────────┘    │ │    ├─────────┼─────────┤
76///      run-ends    index       │ └─── │    B    │    3    │
77///                              │      ├─────────┼─────────┤
78///      logical_offset = 2      └───── │    C    │    4    │
79///      logical_length = 3             ├─────────┼─────────┤
80///                                          C         5     ◄─── logical_offset + logical_length
81///                                     └ ─ ─ ─ ─ ┴ ─ ─ ─ ─ ┘
82///                                       values     index
83/// ```
84///
85/// The physical `run_ends` [`ScalarBuffer`] remains unchanged, in order to facilitate
86/// zero-copy. However, we now offset into the **logical** representation with an
87/// accompanying length. This allows us to represent values `[A, B, C]` using physical
88/// indices `0, 1, 2` with the same underlying physical buffer, at the cost of two
89/// extra `usize`s to represent the logical slice that was taken.
90///
91/// (A [`RunEndBuffer`] is considered unsliced when `logical_offset` is `0` and
92/// `logical_length` is equal to the last value in `run_ends`)
93///
94/// [`RunArray`]: https://docs.rs/arrow/latest/arrow/array/struct.RunArray.html
95/// [Run-End encoded layout]: https://arrow.apache.org/docs/format/Columnar.html#run-end-encoded-layout
96#[derive(Debug, Clone)]
97pub struct RunEndBuffer<E: ArrowNativeType> {
98    run_ends: ScalarBuffer<E>,
99    logical_length: usize,
100    logical_offset: usize,
101}
102
103impl<E> RunEndBuffer<E>
104where
105    E: ArrowNativeType,
106{
107    /// Create a new [`RunEndBuffer`] from a [`ScalarBuffer`], `logical_offset`
108    /// and `logical_length`.
109    ///
110    /// # Panics
111    ///
112    /// - `run_ends` does not contain strictly increasing values greater than zero
113    /// - The last value of `run_ends` is less than `logical_offset + logical_length`
114    pub fn new(run_ends: ScalarBuffer<E>, logical_offset: usize, logical_length: usize) -> Self {
115        assert!(
116            run_ends.windows(2).all(|w| w[0] < w[1]),
117            "run-ends not strictly increasing"
118        );
119
120        if logical_length != 0 {
121            assert!(!run_ends.is_empty(), "non-empty slice but empty run-ends");
122            let end = E::from_usize(logical_offset.saturating_add(logical_length)).unwrap();
123            assert!(
124                *run_ends.first().unwrap() > E::usize_as(0),
125                "run-ends not greater than 0"
126            );
127            assert!(
128                *run_ends.last().unwrap() >= end,
129                "slice beyond bounds of run-ends"
130            );
131        }
132
133        Self {
134            run_ends,
135            logical_offset,
136            logical_length,
137        }
138    }
139
140    /// Create a new [`RunEndBuffer`] from a [`ScalarBuffer`], `logical_offset`
141    /// and `logical_length`.
142    ///
143    /// # Safety
144    ///
145    /// - `run_ends` must contain strictly increasing values greater than zero
146    /// - The last value of `run_ends` must be greater than or equal to `logical_offset + logical_len`
147    pub unsafe fn new_unchecked(
148        run_ends: ScalarBuffer<E>,
149        logical_offset: usize,
150        logical_length: usize,
151    ) -> Self {
152        Self {
153            run_ends,
154            logical_offset,
155            logical_length,
156        }
157    }
158
159    /// Returns the logical offset into the run-ends stored by this buffer.
160    #[inline]
161    pub fn offset(&self) -> usize {
162        self.logical_offset
163    }
164
165    /// Returns the logical length of the run-ends stored by this buffer.
166    #[inline]
167    pub fn len(&self) -> usize {
168        self.logical_length
169    }
170
171    /// Returns true if this buffer is logically empty.
172    #[inline]
173    pub fn is_empty(&self) -> bool {
174        self.logical_length == 0
175    }
176
177    /// Free up unused memory.
178    pub fn shrink_to_fit(&mut self) {
179        // TODO(emilk): we could shrink even more in the case where we are a small sub-slice of the full buffer
180        self.run_ends.shrink_to_fit();
181    }
182
183    /// Returns the physical (**unsliced**) run ends of this buffer.
184    ///
185    /// Take care when operating on these values as it doesn't take into account
186    /// any logical slicing that may have occurred.
187    #[inline]
188    pub fn values(&self) -> &[E] {
189        &self.run_ends
190    }
191
192    /// Returns an iterator yielding run ends adjusted for the logical slice.
193    ///
194    /// Each yielded value is subtracted by the [`logical_offset`] and capped
195    /// at the [`logical_length`].
196    ///
197    /// [`logical_offset`]: Self::offset
198    /// [`logical_length`]: Self::len
199    pub fn sliced_values(&self) -> impl Iterator<Item = E> + '_ {
200        let offset = self.logical_offset;
201        let len = self.logical_length;
202        // Doing this roundabout way since the iterator type we return must be
203        // the same (i.e. cannot use std::iter::empty())
204        let physical_slice = if self.is_empty() {
205            &self.run_ends[0..0]
206        } else {
207            let start = self.get_start_physical_index();
208            let end = self.get_end_physical_index();
209            &self.run_ends[start..=end]
210        };
211        physical_slice.iter().map(move |&val| {
212            let val = val.as_usize().saturating_sub(offset).min(len);
213            E::from_usize(val).unwrap()
214        })
215    }
216
217    /// Returns the maximum run-end encoded in the underlying buffer; that is, the
218    /// last physical run of the buffer. This does not take into account any logical
219    /// slicing that may have occurred.
220    #[inline]
221    pub fn max_value(&self) -> usize {
222        self.values().last().copied().unwrap_or_default().as_usize()
223    }
224
225    /// Performs a binary search to find the physical index for the given logical
226    /// index.
227    ///
228    /// Useful for extracting the corresponding physical `run_ends` when this buffer
229    /// is logically sliced.
230    ///
231    /// The result is arbitrary if `logical_index >= self.len()`.
232    pub fn get_physical_index(&self, logical_index: usize) -> usize {
233        let logical_index = E::usize_as(self.logical_offset + logical_index);
234        let cmp = |p: &E| p.partial_cmp(&logical_index).unwrap();
235
236        match self.run_ends.binary_search_by(cmp) {
237            Ok(idx) => idx + 1,
238            Err(idx) => idx,
239        }
240    }
241
242    /// Returns the physical index at which the logical array starts.
243    ///
244    /// The same as calling `get_physical_index(0)` but with a fast path if the
245    /// buffer is not logically sliced, in which case it always returns `0`.
246    pub fn get_start_physical_index(&self) -> usize {
247        if self.logical_offset == 0 || self.logical_length == 0 {
248            return 0;
249        }
250        // Fallback to binary search
251        self.get_physical_index(0)
252    }
253
254    /// Returns the physical index at which the logical array ends.
255    ///
256    /// The same as calling `get_physical_index(length - 1)` but with a fast path
257    /// if the buffer is not logically sliced, in which case it returns `length - 1`.
258    pub fn get_end_physical_index(&self) -> usize {
259        if self.logical_length == 0 {
260            return 0;
261        }
262        if self.max_value() == self.logical_offset + self.logical_length {
263            return self.values().len() - 1;
264        }
265        // Fallback to binary search
266        self.get_physical_index(self.logical_length - 1)
267    }
268
269    /// Slices this [`RunEndBuffer`] by the provided `logical_offset` and `logical_length`.
270    ///
271    /// # Panics
272    ///
273    /// - Specified slice (`logical_offset` + `logical_length`) exceeds existing
274    ///   logical length
275    pub fn slice(&self, logical_offset: usize, logical_length: usize) -> Self {
276        assert!(
277            logical_offset.saturating_add(logical_length) <= self.logical_length,
278            "the length + offset of the sliced RunEndBuffer cannot exceed the existing length"
279        );
280        Self {
281            run_ends: self.run_ends.clone(),
282            logical_offset: self.logical_offset + logical_offset,
283            logical_length,
284        }
285    }
286
287    /// Returns the inner [`ScalarBuffer`].
288    pub fn inner(&self) -> &ScalarBuffer<E> {
289        &self.run_ends
290    }
291
292    /// Returns the inner [`ScalarBuffer`], consuming self.
293    pub fn into_inner(self) -> ScalarBuffer<E> {
294        self.run_ends
295    }
296
297    /// Returns the physical indices corresponding to the provided logical indices.
298    ///
299    /// Given a slice of logical indices, this method returns a `Vec` containing the
300    /// corresponding physical indices into the run-ends buffer.
301    ///
302    /// This method operates by iterating the logical indices in sorted order, instead of
303    /// finding the physical index for each logical index using binary search via
304    /// the function [`RunEndBuffer::get_physical_index`].
305    ///
306    /// Running benchmarks on both approaches showed that the approach used here
307    /// scaled well for larger inputs.
308    ///
309    /// See <https://github.com/apache/arrow-rs/pull/3622#issuecomment-1407753727> for more details.
310    ///
311    /// # Errors
312    ///
313    /// If any logical index is out of bounds (>= self.len()), returns an error containing the invalid index.
314    #[inline]
315    pub fn get_physical_indices<I>(&self, logical_indices: &[I]) -> Result<Vec<usize>, I>
316    where
317        I: ArrowNativeType,
318    {
319        let len = self.len();
320        let offset = self.offset();
321
322        let indices_len = logical_indices.len();
323
324        if indices_len == 0 {
325            return Ok(vec![]);
326        }
327
328        // `ordered_indices` store index into `logical_indices` and can be used
329        // to iterate `logical_indices` in sorted order.
330        let mut ordered_indices: Vec<usize> = (0..indices_len).collect();
331
332        // Instead of sorting `logical_indices` directly, sort the `ordered_indices`
333        // whose values are index of `logical_indices`
334        ordered_indices.sort_unstable_by(|lhs, rhs| {
335            logical_indices[*lhs]
336                .partial_cmp(&logical_indices[*rhs])
337                .unwrap()
338        });
339
340        // Return early if all the logical indices cannot be converted to physical indices.
341        let largest_logical_index = logical_indices[*ordered_indices.last().unwrap()].as_usize();
342        if largest_logical_index >= len {
343            return Err(logical_indices[*ordered_indices.last().unwrap()]);
344        }
345
346        // Skip some physical indices based on offset.
347        let skip_value = self.get_start_physical_index();
348
349        let mut physical_indices = vec![0; indices_len];
350
351        let mut ordered_index = 0_usize;
352        for (physical_index, run_end) in self.values().iter().enumerate().skip(skip_value) {
353            // Get the run end index (relative to offset) of current physical index
354            let run_end_value = run_end.as_usize() - offset;
355
356            // All the `logical_indices` that are less than current run end index
357            // belongs to current physical index.
358            while ordered_index < indices_len
359                && logical_indices[ordered_indices[ordered_index]].as_usize() < run_end_value
360            {
361                physical_indices[ordered_indices[ordered_index]] = physical_index;
362                ordered_index += 1;
363            }
364        }
365
366        // If there are input values >= run_ends.last_value then we'll not be able to convert
367        // all logical indices to physical indices.
368        if ordered_index < logical_indices.len() {
369            return Err(logical_indices[ordered_indices[ordered_index]]);
370        }
371        Ok(physical_indices)
372    }
373}
374
#[cfg(test)]
mod tests {
    use crate::buffer::RunEndBuffer;

    /// Zero-length slices (and a physically empty buffer) report physical start
    /// and end indices of `0`.
    #[test]
    fn test_zero_length_slice() {
        let buffer = RunEndBuffer::new(vec![1_i32, 4_i32].into(), 0, 4);
        assert_eq!(buffer.get_start_physical_index(), 0);
        assert_eq!(buffer.get_end_physical_index(), 1);
        assert_eq!(buffer.get_physical_index(3), 1);

        // Inclusive upper bound: an empty slice positioned exactly at the end of
        // the buffer (offset == len) is valid and must behave the same way.
        for offset in 0..=4 {
            let sliced = buffer.slice(offset, 0);
            assert!(sliced.is_empty());
            assert_eq!(sliced.get_start_physical_index(), 0);
            assert_eq!(sliced.get_end_physical_index(), 0);
        }

        let buffer = RunEndBuffer::new(Vec::<i32>::new().into(), 0, 0);
        assert_eq!(buffer.get_start_physical_index(), 0);
        assert_eq!(buffer.get_end_physical_index(), 0);
    }

    /// `sliced_values` rebases run ends onto the logical slice and caps them at
    /// the logical length.
    #[test]
    fn test_sliced_values() {
        // [0, 0, 1, 2, 2, 2]
        let buffer = RunEndBuffer::new(vec![2i32, 3, 6].into(), 0, 6);

        // Slice: [0, 1, 2, 2] start: 1, len: 4
        // Logical indices: 1, 2, 3, 4
        // Original run ends: [2, 3, 6]
        // Adjusted: [2-1, 3-1, 6-1] capped at 4 -> [1, 2, 4]
        let sliced = buffer.slice(1, 4);
        let sliced_values: Vec<i32> = sliced.sliced_values().collect();
        assert_eq!(sliced_values, &[1, 2, 4]);

        // Slice: [2, 2] start: 4, len: 2
        // Original run ends: [2, 3, 6]
        // Slicing at 4 means we only have the last run (physical index 2, which ends at 6)
        // Adjusted: [6-4] capped at 2 -> [2]
        let sliced = buffer.slice(4, 2);
        let sliced_values: Vec<i32> = sliced.sliced_values().collect();
        assert_eq!(sliced_values, &[2]);
    }

    /// `get_physical_indices` maps unsorted logical indices to physical indices
    /// in input order, honours the logical offset, and reports out-of-bounds
    /// indices as errors.
    #[test]
    fn test_get_physical_indices() {
        // Physical indices per logical position: [0, 0, 0, 1, 2, 2]
        let buffer = RunEndBuffer::new(vec![3_i32, 4, 6].into(), 0, 6);
        assert_eq!(
            buffer.get_physical_indices(&[5_i32, 0, 3, 2, 4]),
            Ok(vec![2, 0, 1, 0, 2])
        );
        assert_eq!(buffer.get_physical_indices::<i32>(&[]), Ok(vec![]));
        assert_eq!(buffer.get_physical_indices(&[1_i32, 6]), Err(6));

        // Slice covering logical positions 2..5 -> physical indices [0, 1, 2]
        let sliced = buffer.slice(2, 3);
        assert_eq!(
            sliced.get_physical_indices(&[2_i32, 0, 1]),
            Ok(vec![2, 0, 1])
        );
        assert_eq!(sliced.get_physical_indices(&[3_i32]), Err(3));
    }
}