arrow_buffer/buffer/run.rs
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

use crate::ArrowNativeType;
use crate::buffer::ScalarBuffer;

/// A buffer of monotonically increasing, positive integers used to store run-ends.
///
/// Used to compactly represent runs of the same value. The values being represented
/// are stored in a separate buffer from this struct. See [`RunArray`] for an example
/// of how this is used with a companion array to represent the values, and the
/// Arrow [Run-End encoded layout] for the formal specification.
///
/// # Logical vs Physical
///
/// Physically, each value in the `run_ends` buffer is the cumulative length of
/// all runs in the logical representation, up to that physical index. Consider
/// the following example:
///
/// ```text
///   physical                       logical
/// ┌─────────┬─────────┐           ┌─────────┬─────────┐
/// │    3    │    0    │ ◄──────┬─ │    A    │    0    │
/// ├─────────┼─────────┤        │  ├─────────┼─────────┤
/// │    4    │    1    │ ◄────┐ ├─ │    A    │    1    │
/// ├─────────┼─────────┤      │ │  ├─────────┼─────────┤
/// │    6    │    2    │ ◄──┐ │ └─ │    A    │    2    │
/// └─────────┴─────────┘    │ │    ├─────────┼─────────┤
///  run-ends    index       │ └─── │    B    │    3    │
///                          │      ├─────────┼─────────┤
///  logical_offset = 0      ├───── │    C    │    4    │
///  logical_length = 6      │      ├─────────┼─────────┤
///                          └───── │    C    │    5    │
///                                 └─────────┴─────────┘
///                                   values     index
/// ```
///
/// A [`RunEndBuffer`] is physically the buffer on the left, together with a logical
/// offset and length. In this case the offset and length cover the whole buffer,
/// so it is essentially unsliced. See the section below for more details on how
/// this buffer handles slicing.
///
/// This means that multiple logical indices map to the same physical index. The
/// [`RunEndBuffer`] containing `[3, 4, 6]` is equivalent to the physical indices
/// `[0, 0, 0, 1, 2, 2]`, and combining it with a separately stored buffer of
/// values such as `[A, B, C]` turns this into a representation of
/// `[A, A, A, B, C, C]`.
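///
/// A short sketch of this mapping (the import path assumes the public
/// `arrow_buffer::buffer` module):
///
/// ```
/// # use arrow_buffer::buffer::RunEndBuffer;
/// // Run ends [3, 4, 6] with values [A, B, C] encode [A, A, A, B, C, C]
/// let buffer = RunEndBuffer::new(vec![3_i32, 4, 6].into(), 0, 6);
/// assert_eq!(buffer.get_physical_index(0), 0); // first A
/// assert_eq!(buffer.get_physical_index(3), 1); // B
/// assert_eq!(buffer.get_physical_index(5), 2); // last C
/// ```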
///
/// # Slicing
///
/// In order to provide zero-copy slicing, this struct stores a separate **logical**
/// offset and length. Consider the following example:
///
/// ```text
///   physical                       logical
/// ┌─────────┬─────────┐           ┌ ─ ─ ─ ─ ┬ ─ ─ ─ ─ ┐
/// │    3    │    0    │ ◄──────┐       A          0
/// ├─────────┼─────────┤        │  ├ ─ ─ ─ ─ ┼ ─ ─ ─ ─ ┤
/// │    4    │    1    │ ◄────┐ │       A          1
/// ├─────────┼─────────┤      │ │  ├─────────┼─────────┤
/// │    6    │    2    │ ◄──┐ │ └─ │    A    │    2    │◄─── logical_offset
/// └─────────┴─────────┘    │ │    ├─────────┼─────────┤
///  run-ends    index       │ └─── │    B    │    3    │
///                          │      ├─────────┼─────────┤
///  logical_offset = 2      └───── │    C    │    4    │
///  logical_length = 3             ├─────────┼─────────┤
///                                     C          5     ◄─── logical_offset + logical_length
///                                 └ ─ ─ ─ ─ ┴ ─ ─ ─ ─ ┘
///                                   values     index
/// ```
///
/// The physical `run_ends` [`ScalarBuffer`] remains unchanged, in order to facilitate
/// zero-copy slicing. However, we now offset into the **logical** representation with
/// an accompanying length. This allows us to represent the values `[A, B, C]` using
/// the physical indices `0, 1, 2` with the same underlying physical buffer, at the
/// cost of two extra `usize`s to represent the logical slice that was taken.
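///
/// For example, taking the slice shown in the diagram above leaves the physical
/// buffer untouched:
///
/// ```
/// # use arrow_buffer::buffer::RunEndBuffer;
/// let buffer = RunEndBuffer::new(vec![3_i32, 4, 6].into(), 0, 6);
/// let sliced = buffer.slice(2, 3); // logical [A, B, C]
/// assert_eq!(sliced.offset(), 2);
/// assert_eq!(sliced.len(), 3);
/// // The physical run ends are shared, not copied
/// assert_eq!(sliced.values(), &[3, 4, 6]);
/// ```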
///
/// (A [`RunEndBuffer`] is considered unsliced when `logical_offset` is `0` and
/// `logical_length` is equal to the last value in `run_ends`.)
///
/// [`RunArray`]: https://docs.rs/arrow/latest/arrow/array/struct.RunArray.html
/// [Run-End encoded layout]: https://arrow.apache.org/docs/format/Columnar.html#run-end-encoded-layout
#[derive(Debug, Clone)]
pub struct RunEndBuffer<E: ArrowNativeType> {
    run_ends: ScalarBuffer<E>,
    logical_length: usize,
    logical_offset: usize,
}

impl<E> RunEndBuffer<E>
where
    E: ArrowNativeType,
{
    /// Create a new [`RunEndBuffer`] from a [`ScalarBuffer`], `logical_offset`
    /// and `logical_length`.
    ///
    /// # Panics
    ///
    /// - `run_ends` does not contain strictly increasing values greater than zero
    /// - The last value of `run_ends` is less than `logical_offset + logical_length`
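    ///
    /// # Examples
    ///
    /// Constructing the buffer from the module-level example:
    ///
    /// ```
    /// # use arrow_buffer::buffer::RunEndBuffer;
    /// // Three runs ending at logical positions 3, 4 and 6
    /// let buffer = RunEndBuffer::new(vec![3_i32, 4, 6].into(), 0, 6);
    /// assert_eq!(buffer.len(), 6);
    /// assert!(!buffer.is_empty());
    /// ```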
    pub fn new(run_ends: ScalarBuffer<E>, logical_offset: usize, logical_length: usize) -> Self {
        assert!(
            run_ends.windows(2).all(|w| w[0] < w[1]),
            "run-ends not strictly increasing"
        );

        if logical_length != 0 {
            assert!(!run_ends.is_empty(), "non-empty slice but empty run-ends");
            let end = E::from_usize(logical_offset.saturating_add(logical_length)).unwrap();
            assert!(
                *run_ends.first().unwrap() > E::usize_as(0),
                "run-ends not greater than 0"
            );
            assert!(
                *run_ends.last().unwrap() >= end,
                "slice beyond bounds of run-ends"
            );
        }

        Self {
            run_ends,
            logical_offset,
            logical_length,
        }
    }

    /// Create a new [`RunEndBuffer`] from a [`ScalarBuffer`], `logical_offset`
    /// and `logical_length`.
    ///
    /// # Safety
    ///
    /// - `run_ends` must contain strictly increasing values greater than zero
    /// - The last value of `run_ends` must be greater than or equal to `logical_offset + logical_length`
    pub unsafe fn new_unchecked(
        run_ends: ScalarBuffer<E>,
        logical_offset: usize,
        logical_length: usize,
    ) -> Self {
        Self {
            run_ends,
            logical_offset,
            logical_length,
        }
    }

    /// Returns the logical offset into the run-ends stored by this buffer.
    #[inline]
    pub fn offset(&self) -> usize {
        self.logical_offset
    }

    /// Returns the logical length of the run-ends stored by this buffer.
    #[inline]
    pub fn len(&self) -> usize {
        self.logical_length
    }

    /// Returns true if this buffer is logically empty.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.logical_length == 0
    }

    /// Free up unused memory.
    pub fn shrink_to_fit(&mut self) {
        // TODO(emilk): we could shrink even more in the case where we are a small sub-slice of the full buffer
        self.run_ends.shrink_to_fit();
    }

    /// Returns the physical (**unsliced**) run ends of this buffer.
    ///
    /// Take care when operating on these values, as they do not take into
    /// account any logical slicing that may have occurred.
    #[inline]
    pub fn values(&self) -> &[E] {
        &self.run_ends
    }

    /// Returns an iterator yielding the run ends adjusted for the logical slice.
    ///
    /// Each yielded value has the [`logical_offset`] subtracted from it and is
    /// capped at the [`logical_length`].
    ///
    /// [`logical_offset`]: Self::offset
    /// [`logical_length`]: Self::len
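    ///
    /// # Examples
    ///
    /// Adjusted run ends for a logical slice:
    ///
    /// ```
    /// # use arrow_buffer::buffer::RunEndBuffer;
    /// let buffer = RunEndBuffer::new(vec![3_i32, 4, 6].into(), 0, 6);
    /// let sliced = buffer.slice(2, 3);
    /// // Run ends [3, 4, 6] have the offset 2 subtracted, capped at the length 3
    /// assert_eq!(sliced.sliced_values().collect::<Vec<_>>(), vec![1, 2, 3]);
    /// ```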
    pub fn sliced_values(&self) -> impl Iterator<Item = E> + '_ {
        let offset = self.logical_offset;
        let len = self.logical_length;
        // Both branches must produce the same iterator type, so we slice an
        // empty range rather than using std::iter::empty()
        let physical_slice = if self.is_empty() {
            &self.run_ends[0..0]
        } else {
            let start = self.get_start_physical_index();
            let end = self.get_end_physical_index();
            &self.run_ends[start..=end]
        };
        physical_slice.iter().map(move |&val| {
            let val = val.as_usize().saturating_sub(offset).min(len);
            E::from_usize(val).unwrap()
        })
    }

    /// Returns the maximum run-end encoded in the underlying buffer, that is,
    /// the end of the last physical run. This does not take into account any
    /// logical slicing that may have occurred.
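    ///
    /// # Examples
    ///
    /// The physical maximum is unchanged by slicing:
    ///
    /// ```
    /// # use arrow_buffer::buffer::RunEndBuffer;
    /// let buffer = RunEndBuffer::new(vec![3_i32, 4, 6].into(), 0, 6);
    /// assert_eq!(buffer.slice(1, 2).max_value(), 6);
    /// ```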
    #[inline]
    pub fn max_value(&self) -> usize {
        self.values().last().copied().unwrap_or_default().as_usize()
    }

    /// Performs a binary search to find the physical index for the given logical
    /// index.
    ///
    /// Useful for extracting the corresponding physical `run_ends` entry when
    /// this buffer is logically sliced.
    ///
    /// The result is arbitrary if `logical_index >= self.len()`.
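    ///
    /// # Examples
    ///
    /// Mapping logical indices to physical indices:
    ///
    /// ```
    /// # use arrow_buffer::buffer::RunEndBuffer;
    /// // Logical [A, A, A, B, C, C] with run ends [3, 4, 6]
    /// let buffer = RunEndBuffer::new(vec![3_i32, 4, 6].into(), 0, 6);
    /// assert_eq!(buffer.get_physical_index(2), 0); // last A
    /// assert_eq!(buffer.get_physical_index(3), 1); // B
    /// ```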
    pub fn get_physical_index(&self, logical_index: usize) -> usize {
        let logical_index = E::usize_as(self.logical_offset + logical_index);
        let cmp = |p: &E| p.partial_cmp(&logical_index).unwrap();

        match self.run_ends.binary_search_by(cmp) {
            Ok(idx) => idx + 1,
            Err(idx) => idx,
        }
    }

    /// Returns the physical index at which the logical array starts.
    ///
    /// The same as calling `get_physical_index(0)` but with a fast path when the
    /// buffer has no logical offset (or is empty), in which case it returns `0`.
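    ///
    /// # Examples
    ///
    /// ```
    /// # use arrow_buffer::buffer::RunEndBuffer;
    /// let buffer = RunEndBuffer::new(vec![3_i32, 4, 6].into(), 0, 6);
    /// assert_eq!(buffer.get_start_physical_index(), 0);
    /// // Slicing past the first run moves the start to physical index 1
    /// assert_eq!(buffer.slice(3, 3).get_start_physical_index(), 1);
    /// ```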
    pub fn get_start_physical_index(&self) -> usize {
        if self.logical_offset == 0 || self.logical_length == 0 {
            return 0;
        }
        // Fall back to binary search
        self.get_physical_index(0)
    }

    /// Returns the physical index at which the logical array ends.
    ///
    /// The same as calling `get_physical_index(length - 1)` but with a fast path
    /// if the buffer is not sliced at its end, in which case it returns the last
    /// physical index, `values().len() - 1`.
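    ///
    /// # Examples
    ///
    /// ```
    /// # use arrow_buffer::buffer::RunEndBuffer;
    /// let buffer = RunEndBuffer::new(vec![3_i32, 4, 6].into(), 0, 6);
    /// assert_eq!(buffer.get_end_physical_index(), 2);
    /// // A slice covering only the first three values ends in the first run
    /// assert_eq!(buffer.slice(0, 3).get_end_physical_index(), 0);
    /// ```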
    pub fn get_end_physical_index(&self) -> usize {
        if self.logical_length == 0 {
            return 0;
        }
        if self.max_value() == self.logical_offset + self.logical_length {
            return self.values().len() - 1;
        }
        // Fall back to binary search
        self.get_physical_index(self.logical_length - 1)
    }

    /// Slices this [`RunEndBuffer`] by the provided `logical_offset` and `logical_length`.
    ///
    /// # Panics
    ///
    /// - The specified slice (`logical_offset + logical_length`) exceeds the
    ///   existing logical length
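    ///
    /// # Examples
    ///
    /// Zero-copy slicing composes:
    ///
    /// ```
    /// # use arrow_buffer::buffer::RunEndBuffer;
    /// let buffer = RunEndBuffer::new(vec![3_i32, 4, 6].into(), 0, 6);
    /// // Offsets accumulate relative to the original buffer
    /// let sliced = buffer.slice(1, 4).slice(1, 2);
    /// assert_eq!(sliced.offset(), 2);
    /// assert_eq!(sliced.len(), 2);
    /// ```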
    pub fn slice(&self, logical_offset: usize, logical_length: usize) -> Self {
        assert!(
            logical_offset.saturating_add(logical_length) <= self.logical_length,
            "the length + offset of the sliced RunEndBuffer cannot exceed the existing length"
        );
        Self {
            run_ends: self.run_ends.clone(),
            logical_offset: self.logical_offset + logical_offset,
            logical_length,
        }
    }

    /// Returns the inner [`ScalarBuffer`].
    pub fn inner(&self) -> &ScalarBuffer<E> {
        &self.run_ends
    }

    /// Returns the inner [`ScalarBuffer`], consuming self.
    pub fn into_inner(self) -> ScalarBuffer<E> {
        self.run_ends
    }

    /// Returns the physical indices corresponding to the provided logical indices.
    ///
    /// Given a slice of logical indices, this method returns a `Vec` containing the
    /// corresponding physical indices into the run-ends buffer.
    ///
    /// This method operates by iterating the logical indices in sorted order, instead
    /// of finding the physical index for each logical index individually using binary
    /// search via [`RunEndBuffer::get_physical_index`].
    ///
    /// Benchmarks of both approaches showed that the approach used here scales
    /// better for larger inputs.
    ///
    /// See <https://github.com/apache/arrow-rs/pull/3622#issuecomment-1407753727> for more details.
    ///
    /// # Errors
    ///
    /// If any logical index is out of bounds (`>= self.len()`), returns an error
    /// containing the invalid index.
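    ///
    /// # Examples
    ///
    /// Batch lookup of physical indices:
    ///
    /// ```
    /// # use arrow_buffer::buffer::RunEndBuffer;
    /// // Logical [A, A, A, B, C, C] with run ends [3, 4, 6]
    /// let buffer = RunEndBuffer::new(vec![3_i32, 4, 6].into(), 0, 6);
    /// assert_eq!(buffer.get_physical_indices(&[0_u32, 3, 5]).unwrap(), vec![0, 1, 2]);
    /// // An out-of-bounds logical index is returned as the error
    /// assert_eq!(buffer.get_physical_indices(&[6_u32]), Err(6));
    /// ```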
    #[inline]
    pub fn get_physical_indices<I>(&self, logical_indices: &[I]) -> Result<Vec<usize>, I>
    where
        I: ArrowNativeType,
    {
        let len = self.len();
        let offset = self.offset();

        let indices_len = logical_indices.len();

        if indices_len == 0 {
            return Ok(vec![]);
        }

        // `ordered_indices` stores indices into `logical_indices` and can be used
        // to iterate `logical_indices` in sorted order.
        let mut ordered_indices: Vec<usize> = (0..indices_len).collect();

        // Instead of sorting `logical_indices` directly, sort `ordered_indices`,
        // whose values are indices into `logical_indices`
        ordered_indices.sort_unstable_by(|lhs, rhs| {
            logical_indices[*lhs]
                .partial_cmp(&logical_indices[*rhs])
                .unwrap()
        });

        // Return early if the largest logical index is out of bounds, as it
        // cannot be converted to a physical index.
        let largest_logical_index = logical_indices[*ordered_indices.last().unwrap()].as_usize();
        if largest_logical_index >= len {
            return Err(logical_indices[*ordered_indices.last().unwrap()]);
        }

        // Skip the physical runs that lie entirely before the logical offset.
        let skip_value = self.get_start_physical_index();

        let mut physical_indices = vec![0; indices_len];

        let mut ordered_index = 0_usize;
        for (physical_index, run_end) in self.values().iter().enumerate().skip(skip_value) {
            // Get the run end (relative to the offset) of the current physical index
            let run_end_value = run_end.as_usize() - offset;

            // All the `logical_indices` that are less than the current run end
            // belong to the current physical index.
            while ordered_index < indices_len
                && logical_indices[ordered_indices[ordered_index]].as_usize() < run_end_value
            {
                physical_indices[ordered_indices[ordered_index]] = physical_index;
                ordered_index += 1;
            }
        }

        // If any input values are >= the last run end, not all logical indices
        // can be converted to physical indices.
        if ordered_index < logical_indices.len() {
            return Err(logical_indices[ordered_indices[ordered_index]]);
        }
        Ok(physical_indices)
    }
}

#[cfg(test)]
mod tests {
    use crate::buffer::RunEndBuffer;

    #[test]
    fn test_zero_length_slice() {
        let buffer = RunEndBuffer::new(vec![1_i32, 4_i32].into(), 0, 4);
        assert_eq!(buffer.get_start_physical_index(), 0);
        assert_eq!(buffer.get_end_physical_index(), 1);
        assert_eq!(buffer.get_physical_index(3), 1);

        for offset in 0..4 {
            let sliced = buffer.slice(offset, 0);
            assert_eq!(sliced.get_start_physical_index(), 0);
            assert_eq!(sliced.get_end_physical_index(), 0);
        }

        let buffer = RunEndBuffer::new(Vec::<i32>::new().into(), 0, 0);
        assert_eq!(buffer.get_start_physical_index(), 0);
        assert_eq!(buffer.get_end_physical_index(), 0);
    }

    #[test]
    fn test_sliced_values() {
        // Logical: [0, 0, 1, 2, 2, 2]
        let buffer = RunEndBuffer::new(vec![2i32, 3, 6].into(), 0, 6);

        // Slice: [0, 1, 2, 2] (start: 1, len: 4)
        // Logical indices: 1, 2, 3, 4
        // Original run ends: [2, 3, 6]
        // Adjusted: [2 - 1, 3 - 1, 6 - 1] capped at 4 -> [1, 2, 4]
        let sliced = buffer.slice(1, 4);
        let sliced_values: Vec<i32> = sliced.sliced_values().collect();
        assert_eq!(sliced_values, &[1, 2, 4]);

        // Slice: [2, 2] (start: 4, len: 2)
        // Original run ends: [2, 3, 6]
        // Slicing at 4 leaves only the last run (physical index 2, ending at 6)
        // Adjusted: [6 - 4] capped at 2 -> [2]
        let sliced = buffer.slice(4, 2);
        let sliced_values: Vec<i32> = sliced.sliced_values().collect();
        assert_eq!(sliced_values, &[2]);
    }
}