arrow_buffer/buffer/run.rs
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use crate::ArrowNativeType;
use crate::buffer::ScalarBuffer;

/// A buffer of strictly increasing, positive integers used to store run-ends,
/// as in the Arrow [Run-End encoded layout].
///
/// Used to compactly represent runs of the same value. The values being represented
/// are stored in a separate buffer from this struct. See [`RunArray`] for an example
/// of how this is used with a companion array to represent the values.
///
/// # Logical vs Physical
///
/// Physically, each value in the `run_ends` buffer is the cumulative length of
/// all runs in the logical representation, up to and including that physical
/// index. Consider the following example:
///
/// ```text
///        physical                           logical
///  ┌─────────┬─────────┐             ┌─────────┬─────────┐
///  │    3    │    0    │ ◄──────┬─   │    A    │    0    │
///  ├─────────┼─────────┤        │    ├─────────┼─────────┤
///  │    4    │    1    │ ◄────┐ ├─   │    A    │    1    │
///  ├─────────┼─────────┤      │ │    ├─────────┼─────────┤
///  │    6    │    2    │ ◄──┐ │ └─   │    A    │    2    │
///  └─────────┴─────────┘    │ │      ├─────────┼─────────┤
///   run-ends     index      │ └───   │    B    │    3    │
///                           │        ├─────────┼─────────┤
///  logical_offset = 0       ├─────   │    C    │    4    │
///  logical_length = 6       │        ├─────────┼─────────┤
///                           └─────   │    C    │    5    │
///                                    └─────────┴─────────┘
///                                      values     index
/// ```
///
/// A [`RunEndBuffer`] is physically the buffer on the left, together with a
/// logical offset and length. In this case, the offset and length cover the
/// whole buffer, so it is effectively unsliced; see the section below for more
/// details on how this buffer handles slicing.
///
/// This means that a single physical index can represent multiple logical values,
/// and multiple logical indices map to the same physical index. The [`RunEndBuffer`]
/// containing `[3, 4, 6]` encodes the physical indices `[0, 0, 0, 1, 2, 2]`,
/// and pairing it with a separately stored buffer of values such as `[A, B, C]`
/// yields a representation of `[A, A, A, B, C, C]`.
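///
/// For example (a small doctest sketch; `arrow_buffer::buffer` is assumed to
/// be the public path of this module, matching the test module below):
///
/// ```
/// # use arrow_buffer::buffer::RunEndBuffer;
/// // Run-ends [3, 4, 6] encode the physical indices [0, 0, 0, 1, 2, 2]
/// let buffer = RunEndBuffer::new(vec![3_i32, 4, 6].into(), 0, 6);
/// assert_eq!(buffer.get_physical_index(0), 0); // logical A
/// assert_eq!(buffer.get_physical_index(3), 1); // logical B
/// assert_eq!(buffer.get_physical_index(5), 2); // logical C
/// ```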
///
/// # Slicing
///
/// In order to provide zero-copy slicing, this struct stores a separate **logical**
/// offset and length. Consider the following example:
///
/// ```text
///        physical                           logical
///  ┌─────────┬─────────┐             ┌ ─ ─ ─ ─ ┬ ─ ─ ─ ─ ┐
///  │    3    │    0    │ ◄──────┐         A         0
///  ├─────────┼─────────┤        │    ├── ─ ─ ─ ┼ ─ ─ ─ ─ ┤
///  │    4    │    1    │ ◄────┐ │         A         1
///  ├─────────┼─────────┤      │ │    ├─────────┼─────────┤
///  │    6    │    2    │ ◄──┐ │ └─   │    A    │    2    │◄─── logical_offset
///  └─────────┴─────────┘    │ │      ├─────────┼─────────┤
///   run-ends     index      │ └───   │    B    │    3    │
///                           │        ├─────────┼─────────┤
///  logical_offset = 2       └─────   │    C    │    4    │
///  logical_length = 3                ├─────────┼─────────┤
///                                         C         5      ◄─── logical_offset + logical_length
///                                    └ ─ ─ ─ ─ ┴ ─ ─ ─ ─ ┘
///                                      values     index
/// ```
///
/// The physical `run_ends` [`ScalarBuffer`] remains unchanged, in order to facilitate
/// zero-copy. However, we now offset into the **logical** representation with an
/// accompanying length. This allows us to represent values `[A, B, C]` using physical
/// indices `0, 1, 2` with the same underlying physical buffer, at the cost of two
/// extra `usize`s to represent the logical slice that was taken.
///
/// (A [`RunEndBuffer`] is considered unsliced when `logical_offset` is `0` and
/// `logical_length` is equal to the last value in `run_ends`.)
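///
/// For example, slicing the buffer from the first diagram down to the logical
/// range shown above (a small sketch; see the struct-level example for the
/// assumed import path):
///
/// ```
/// # use arrow_buffer::buffer::RunEndBuffer;
/// let buffer = RunEndBuffer::new(vec![3_i32, 4, 6].into(), 0, 6);
/// let sliced = buffer.slice(2, 3); // logical values [A, B, C]
/// assert_eq!(sliced.offset(), 2);
/// assert_eq!(sliced.len(), 3);
/// // the physical run-ends are shared, not copied
/// assert_eq!(sliced.values(), &[3, 4, 6]);
/// ```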
///
/// [`RunArray`]: https://docs.rs/arrow/latest/arrow/array/struct.RunArray.html
/// [Run-End encoded layout]: https://arrow.apache.org/docs/format/Columnar.html#run-end-encoded-layout
#[derive(Debug, Clone)]
pub struct RunEndBuffer<E: ArrowNativeType> {
    run_ends: ScalarBuffer<E>,
    logical_length: usize,
    logical_offset: usize,
}

impl<E> RunEndBuffer<E>
where
    E: ArrowNativeType,
{
    /// Create a new [`RunEndBuffer`] from a [`ScalarBuffer`], `logical_offset`
    /// and `logical_length`.
    ///
    /// # Panics
    ///
    /// - `run_ends` does not contain strictly increasing values greater than zero
    /// - The last value of `run_ends` is less than `logical_offset + logical_length`
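    ///
    /// For example, this panics because the run-ends are not strictly increasing:
    ///
    /// ```should_panic
    /// # use arrow_buffer::buffer::RunEndBuffer;
    /// RunEndBuffer::new(vec![3_i32, 3, 6].into(), 0, 6);
    /// ```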
    pub fn new(run_ends: ScalarBuffer<E>, logical_offset: usize, logical_length: usize) -> Self {
        assert!(
            run_ends.windows(2).all(|w| w[0] < w[1]),
            "run-ends not strictly increasing"
        );

        // The remaining invariants only constrain non-empty logical slices
        if logical_length != 0 {
            assert!(!run_ends.is_empty(), "non-empty slice but empty run-ends");
            let end = E::from_usize(logical_offset.saturating_add(logical_length)).unwrap();
            assert!(
                *run_ends.first().unwrap() > E::usize_as(0),
                "run-ends not greater than 0"
            );
            assert!(
                *run_ends.last().unwrap() >= end,
                "slice beyond bounds of run-ends"
            );
        }

        Self {
            run_ends,
            logical_offset,
            logical_length,
        }
    }

    /// Create a new [`RunEndBuffer`] from a [`ScalarBuffer`], `logical_offset`
    /// and `logical_length`, without validating the invariants.
    ///
    /// # Safety
    ///
    /// - `run_ends` must contain strictly increasing values greater than zero
    /// - The last value of `run_ends` must be greater than or equal to `logical_offset + logical_length`
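    ///
    /// A sketch of correct usage, upholding both invariants:
    ///
    /// ```
    /// # use arrow_buffer::buffer::RunEndBuffer;
    /// // SAFETY: [2, 5] is strictly increasing, positive, and 5 >= 0 + 5
    /// let buffer = unsafe { RunEndBuffer::new_unchecked(vec![2_i32, 5].into(), 0, 5) };
    /// assert_eq!(buffer.len(), 5);
    /// ```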
    pub unsafe fn new_unchecked(
        run_ends: ScalarBuffer<E>,
        logical_offset: usize,
        logical_length: usize,
    ) -> Self {
        Self {
            run_ends,
            logical_offset,
            logical_length,
        }
    }

    /// Returns the logical offset into the run-ends stored by this buffer.
    #[inline]
    pub fn offset(&self) -> usize {
        self.logical_offset
    }

    /// Returns the logical length of the run-ends stored by this buffer.
    #[inline]
    pub fn len(&self) -> usize {
        self.logical_length
    }

    /// Returns true if this buffer is logically empty.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.logical_length == 0
    }

    /// Free up unused memory.
    pub fn shrink_to_fit(&mut self) {
        // TODO(emilk): we could shrink even more in the case where we are a small sub-slice of the full buffer
        self.run_ends.shrink_to_fit();
    }

    /// Returns the physical (**unsliced**) run-ends of this buffer.
    ///
    /// Take care when operating on these values, as they do not take into
    /// account any logical slicing that may have occurred.
    #[inline]
    pub fn values(&self) -> &[E] {
        &self.run_ends
    }

    /// Returns the maximum run-end encoded in the underlying buffer; that is,
    /// the run-end of the last physical run. This does not take into account
    /// any logical slicing that may have occurred.
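    ///
    /// For example, on a logically sliced buffer:
    ///
    /// ```
    /// # use arrow_buffer::buffer::RunEndBuffer;
    /// let buffer = RunEndBuffer::new(vec![2_i32, 5].into(), 1, 3);
    /// assert_eq!(buffer.len(), 3);       // logical view
    /// assert_eq!(buffer.max_value(), 5); // ignores the logical slice
    /// ```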
    #[inline]
    pub fn max_value(&self) -> usize {
        self.values().last().copied().unwrap_or_default().as_usize()
    }

    /// Performs a binary search to find the physical index for the given logical
    /// index.
    ///
    /// Useful for extracting the corresponding physical `run_ends` when this buffer
    /// is logically sliced.
    ///
    /// The result is arbitrary if `logical_index >= self.len()`.
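    ///
    /// For example, with the logically sliced buffer from the struct-level docs:
    ///
    /// ```
    /// # use arrow_buffer::buffer::RunEndBuffer;
    /// // Run-ends [3, 4, 6] sliced to logical offset 2 and length 3
    /// let buffer = RunEndBuffer::new(vec![3_i32, 4, 6].into(), 2, 3);
    /// assert_eq!(buffer.get_physical_index(0), 0); // logical 2 -> run 0
    /// assert_eq!(buffer.get_physical_index(1), 1); // logical 3 -> run 1
    /// assert_eq!(buffer.get_physical_index(2), 2); // logical 4 -> run 2
    /// ```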
    pub fn get_physical_index(&self, logical_index: usize) -> usize {
        let logical_index = E::usize_as(self.logical_offset + logical_index);
        let cmp = |p: &E| p.partial_cmp(&logical_index).unwrap();

        match self.run_ends.binary_search_by(cmp) {
            // Run-ends are exclusive: a logical index equal to the run-end of
            // run `idx` belongs to the following run
            Ok(idx) => idx + 1,
            Err(idx) => idx,
        }
    }

    /// Returns the physical index at which the logical array starts.
    ///
    /// The same as calling `get_physical_index(0)`, but with a fast path when
    /// the buffer is not logically sliced (or is empty), in which case it
    /// returns `0`.
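    ///
    /// For example:
    ///
    /// ```
    /// # use arrow_buffer::buffer::RunEndBuffer;
    /// let buffer = RunEndBuffer::new(vec![3_i32, 6].into(), 0, 6);
    /// assert_eq!(buffer.get_start_physical_index(), 0); // unsliced fast path
    /// assert_eq!(buffer.slice(4, 2).get_start_physical_index(), 1);
    /// ```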
    pub fn get_start_physical_index(&self) -> usize {
        if self.logical_offset == 0 || self.logical_length == 0 {
            return 0;
        }
        // Fallback to binary search
        self.get_physical_index(0)
    }

    /// Returns the physical index at which the logical array ends.
    ///
    /// The same as calling `get_physical_index(self.len() - 1)`, but with a
    /// fast path when the logical slice extends to the end of the run-ends
    /// buffer, in which case it returns the last physical index,
    /// `self.values().len() - 1`.
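    ///
    /// For example:
    ///
    /// ```
    /// # use arrow_buffer::buffer::RunEndBuffer;
    /// let buffer = RunEndBuffer::new(vec![3_i32, 4, 6].into(), 0, 6);
    /// assert_eq!(buffer.get_end_physical_index(), 2); // fast path: slice ends at the last run-end
    /// assert_eq!(buffer.slice(0, 4).get_end_physical_index(), 1);
    /// ```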
    pub fn get_end_physical_index(&self) -> usize {
        if self.logical_length == 0 {
            return 0;
        }
        if self.max_value() == self.logical_offset + self.logical_length {
            return self.values().len() - 1;
        }
        // Fallback to binary search
        self.get_physical_index(self.logical_length - 1)
    }

    /// Slices this [`RunEndBuffer`] by the provided `logical_offset` and `logical_length`.
    ///
    /// # Panics
    ///
    /// - The specified slice (`logical_offset + logical_length`) exceeds the
    ///   existing logical length
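    ///
    /// For example (note that slicing composes; the offsets accumulate):
    ///
    /// ```
    /// # use arrow_buffer::buffer::RunEndBuffer;
    /// let buffer = RunEndBuffer::new(vec![3_i32, 6].into(), 0, 6);
    /// let sliced = buffer.slice(1, 4);
    /// assert_eq!((sliced.offset(), sliced.len()), (1, 4));
    /// // slicing a slice offsets relative to the existing slice
    /// assert_eq!(sliced.slice(2, 1).offset(), 3);
    /// ```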
    pub fn slice(&self, logical_offset: usize, logical_length: usize) -> Self {
        assert!(
            logical_offset.saturating_add(logical_length) <= self.logical_length,
            "the length + offset of the sliced RunEndBuffer cannot exceed the existing length"
        );
        Self {
            run_ends: self.run_ends.clone(),
            logical_offset: self.logical_offset + logical_offset,
            logical_length,
        }
    }

    /// Returns the inner [`ScalarBuffer`].
    pub fn inner(&self) -> &ScalarBuffer<E> {
        &self.run_ends
    }

    /// Returns the inner [`ScalarBuffer`], consuming self.
    pub fn into_inner(self) -> ScalarBuffer<E> {
        self.run_ends
    }

    /// Returns the physical indices corresponding to the provided logical indices.
    ///
    /// Given a slice of logical indices, this method returns a `Vec` containing the
    /// corresponding physical indices into the run-ends buffer.
    ///
    /// This method operates by iterating the logical indices in sorted order, instead
    /// of finding the physical index for each logical index individually via binary
    /// search with [`RunEndBuffer::get_physical_index`].
    ///
    /// Benchmarks of both approaches showed that the approach used here scales
    /// better for larger inputs.
    ///
    /// See <https://github.com/apache/arrow-rs/pull/3622#issuecomment-1407753727> for more details.
    ///
    /// # Errors
    ///
    /// If any logical index is out of bounds (`>= self.len()`), returns an error
    /// containing the invalid index.
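    ///
    /// A small usage sketch, including the error case:
    ///
    /// ```
    /// # use arrow_buffer::buffer::RunEndBuffer;
    /// // Run-ends [3, 4, 6]: physical indices are [0, 0, 0, 1, 2, 2]
    /// let buffer = RunEndBuffer::new(vec![3_i32, 4, 6].into(), 0, 6);
    /// assert_eq!(buffer.get_physical_indices(&[5_u32, 0, 3]).unwrap(), vec![2, 0, 1]);
    /// assert_eq!(buffer.get_physical_indices(&[6_u32]), Err(6_u32));
    /// ```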
    #[inline]
    pub fn get_physical_indices<I>(&self, logical_indices: &[I]) -> Result<Vec<usize>, I>
    where
        I: ArrowNativeType,
    {
        let len = self.len();
        let offset = self.offset();

        let indices_len = logical_indices.len();

        if indices_len == 0 {
            return Ok(vec![]);
        }

        // `ordered_indices` stores indices into `logical_indices` and can be
        // used to iterate `logical_indices` in sorted order.
        let mut ordered_indices: Vec<usize> = (0..indices_len).collect();

        // Instead of sorting `logical_indices` directly, sort `ordered_indices`,
        // whose values are indices into `logical_indices`
        ordered_indices.sort_unstable_by(|lhs, rhs| {
            logical_indices[*lhs]
                .partial_cmp(&logical_indices[*rhs])
                .unwrap()
        });

        // Return early if the largest logical index is out of bounds, since then
        // not all of the logical indices can be converted to physical indices.
        let largest_logical_index = logical_indices[*ordered_indices.last().unwrap()].as_usize();
        if largest_logical_index >= len {
            return Err(logical_indices[*ordered_indices.last().unwrap()]);
        }

        // Skip the physical indices that fall entirely before the logical offset.
        let skip_value = self.get_start_physical_index();

        let mut physical_indices = vec![0; indices_len];

        let mut ordered_index = 0_usize;
        for (physical_index, run_end) in self.values().iter().enumerate().skip(skip_value) {
            // Get the run-end (relative to the logical offset) of the current
            // physical index
            let run_end_value = run_end.as_usize() - offset;

            // All the `logical_indices` that are less than the current run-end
            // belong to the current physical index.
            while ordered_index < indices_len
                && logical_indices[ordered_indices[ordered_index]].as_usize() < run_end_value
            {
                physical_indices[ordered_indices[ordered_index]] = physical_index;
                ordered_index += 1;
            }
        }

        // If there were input values >= the last run-end, we will not have been
        // able to convert all logical indices to physical indices.
        if ordered_index < logical_indices.len() {
            return Err(logical_indices[ordered_indices[ordered_index]]);
        }
        Ok(physical_indices)
    }
}

#[cfg(test)]
mod tests {
    use crate::buffer::RunEndBuffer;

    #[test]
    fn test_zero_length_slice() {
        let buffer = RunEndBuffer::new(vec![1_i32, 4_i32].into(), 0, 4);
        assert_eq!(buffer.get_start_physical_index(), 0);
        assert_eq!(buffer.get_end_physical_index(), 1);
        assert_eq!(buffer.get_physical_index(3), 1);

        for offset in 0..4 {
            let sliced = buffer.slice(offset, 0);
            assert_eq!(sliced.get_start_physical_index(), 0);
            assert_eq!(sliced.get_end_physical_index(), 0);
        }

        let buffer = RunEndBuffer::new(Vec::<i32>::new().into(), 0, 0);
        assert_eq!(buffer.get_start_physical_index(), 0);
        assert_eq!(buffer.get_end_physical_index(), 0);
    }
}
371}