arrow_select/coalesce/
byte_view.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::coalesce::InProgressArray;
19use arrow_array::cast::AsArray;
20use arrow_array::types::ByteViewType;
21use arrow_array::{Array, ArrayRef, GenericByteViewArray};
22use arrow_buffer::{Buffer, NullBufferBuilder};
23use arrow_data::{ByteView, MAX_INLINE_VIEW_LEN};
24use arrow_schema::ArrowError;
25use std::marker::PhantomData;
26use std::sync::Arc;
27
28/// InProgressArray for [`StringViewArray`] and [`BinaryViewArray`]
29///
30/// This structure buffers the views and data buffers as they are copied from
31/// the source array, and then produces a new array when `finish` is called. It
32/// also handles "garbage collection" by copying strings to a new buffer when
33/// the source buffer is sparse (i.e. uses at least 2x more than the memory it
34/// needs).
35///
36/// [`StringViewArray`]: arrow_array::StringViewArray
37/// [`BinaryViewArray`]: arrow_array::BinaryViewArray
38pub(crate) struct InProgressByteViewArray<B: ByteViewType> {
39    /// The source array and information
40    source: Option<Source>,
41    /// the target batch size (and thus size for views allocation)
42    batch_size: usize,
43    /// The in progress views
44    views: Vec<u128>,
45    /// In progress nulls
46    nulls: NullBufferBuilder,
47    /// current buffer
48    current: Option<Vec<u8>>,
49    /// completed buffers
50    completed: Vec<Buffer>,
51    /// Allocates new buffers of increasing size as needed
52    buffer_source: BufferSource,
53    /// Phantom so we can use the same struct for both StringViewArray and
54    /// BinaryViewArray
55    _phantom: PhantomData<B>,
56}
57
58struct Source {
59    /// The array to copy form
60    array: ArrayRef,
61    /// Should the strings from the source array be copied into new buffers?
62    need_gc: bool,
63    /// How many bytes were actually used in the source array's buffers?
64    ideal_buffer_size: usize,
65}
66
67// manually implement Debug because ByteViewType doesn't implement Debug
68impl<B: ByteViewType> std::fmt::Debug for InProgressByteViewArray<B> {
69    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
70        f.debug_struct("InProgressByteViewArray")
71            .field("batch_size", &self.batch_size)
72            .field("views", &self.views.len())
73            .field("nulls", &self.nulls)
74            .field("current", &self.current.as_ref().map(|_| "Some(...)"))
75            .field("completed", &self.completed.len())
76            .finish()
77    }
78}
79
80impl<B: ByteViewType> InProgressByteViewArray<B> {
81    pub(crate) fn new(batch_size: usize) -> Self {
82        let buffer_source = BufferSource::new();
83
84        Self {
85            batch_size,
86            source: None,
87            views: Vec::new(),                         // allocate in push
88            nulls: NullBufferBuilder::new(batch_size), // no allocation
89            current: None,
90            completed: vec![],
91            buffer_source,
92            _phantom: PhantomData,
93        }
94    }
95
96    /// Allocate space for output views and nulls if needed
97    ///
98    /// This is done on write (when we know it is necessary) rather than
99    /// eagerly to avoid allocations that are not used.
100    fn ensure_capacity(&mut self) {
101        self.views.reserve(self.batch_size);
102    }
103
104    /// Finishes in progress buffer, if any
105    fn finish_current(&mut self) {
106        let Some(next_buffer) = self.current.take() else {
107            return;
108        };
109        self.completed.push(next_buffer.into());
110    }
111
112    /// Append views to self.views, updating the buffer index if necessary
113    #[inline(never)]
114    fn append_views_and_update_buffer_index(&mut self, views: &[u128], buffers: &[Buffer]) {
115        if let Some(buffer) = self.current.take() {
116            self.completed.push(buffer.into());
117        }
118        let starting_buffer: u32 = self.completed.len().try_into().expect("too many buffers");
119        self.completed.extend_from_slice(buffers);
120
121        if starting_buffer == 0 {
122            // If there are no buffers, we can just use the views as is
123            self.views.extend_from_slice(views);
124        } else {
125            // If there are buffers, we need to update the buffer index
126            let updated_views = views.iter().map(|v| {
127                let mut byte_view = ByteView::from(*v);
128                if byte_view.length > MAX_INLINE_VIEW_LEN {
129                    // Small views (<=12 bytes) are inlined, so only need to update large views
130                    byte_view.buffer_index += starting_buffer;
131                };
132                byte_view.as_u128()
133            });
134
135            self.views.extend(updated_views);
136        }
137    }
138
139    /// Append views to self.views, copying data from the buffers into
140    /// self.buffers and updating the buffer index as necessary.
141    ///
142    /// # Arguments
143    /// - `views` - the views to append
144    /// - `view_buffer_size` - the total number of bytes pointed to by all
145    ///   views (used to allocate new buffers if needed)
146    /// - `buffers` - the buffers the reviews point to
147    #[inline(never)]
148    fn append_views_and_copy_strings(
149        &mut self,
150        views: &[u128],
151        view_buffer_size: usize,
152        buffers: &[Buffer],
153    ) {
154        // Note: the calculations below are designed to avoid any reallocations
155        // of the current buffer, and to only allocate new buffers when
156        // necessary, which is critical for performance.
157
158        // If there is no current buffer, allocate a new one
159        let Some(current) = self.current.take() else {
160            let new_buffer = self.buffer_source.next_buffer(view_buffer_size);
161            self.append_views_and_copy_strings_inner(views, new_buffer, buffers);
162            return;
163        };
164
165        // If there is a current buffer with enough space, append the views and
166        // copy the strings into the existing buffer.
167        let mut remaining_capacity = current.capacity() - current.len();
168        if view_buffer_size <= remaining_capacity {
169            self.append_views_and_copy_strings_inner(views, current, buffers);
170            return;
171        }
172
173        // Here there is a current buffer, but it doesn't have enough space to
174        // hold all the strings. Copy as many views as we can into the current
175        // buffer and then allocate a new buffer for the remaining views
176        //
177        // TODO: should we copy the strings too at the same time?
178        let mut num_view_to_current = 0;
179        for view in views {
180            let b = ByteView::from(*view);
181            let str_len = b.length;
182            if remaining_capacity < str_len as usize {
183                break;
184            }
185            if str_len > MAX_INLINE_VIEW_LEN {
186                remaining_capacity -= str_len as usize;
187            }
188            num_view_to_current += 1;
189        }
190
191        let first_views = &views[0..num_view_to_current];
192        let string_bytes_to_copy = current.capacity() - current.len() - remaining_capacity;
193        let remaining_view_buffer_size = view_buffer_size - string_bytes_to_copy;
194
195        self.append_views_and_copy_strings_inner(first_views, current, buffers);
196        let completed = self.current.take().expect("completed");
197        self.completed.push(completed.into());
198
199        // Copy any remaining views into a new buffer
200        let remaining_views = &views[num_view_to_current..];
201        let new_buffer = self.buffer_source.next_buffer(remaining_view_buffer_size);
202        self.append_views_and_copy_strings_inner(remaining_views, new_buffer, buffers);
203    }
204
205    /// Append views to self.views, copying data from the buffers into
206    /// dst_buffer, which is then set as self.current
207    ///
208    /// # Panics:
209    /// If `self.current` is `Some`
210    ///
211    /// See `append_views_and_copy_strings` for more details
212    #[inline(never)]
213    fn append_views_and_copy_strings_inner(
214        &mut self,
215        views: &[u128],
216        mut dst_buffer: Vec<u8>,
217        buffers: &[Buffer],
218    ) {
219        assert!(self.current.is_none(), "current buffer should be None");
220
221        if views.is_empty() {
222            self.current = Some(dst_buffer);
223            return;
224        }
225
226        let new_buffer_index: u32 = self.completed.len().try_into().expect("too many buffers");
227
228        // In debug builds, check that the vector has enough capacity to copy
229        // the views into it without reallocating.
230        #[cfg(debug_assertions)]
231        {
232            let total_length: usize = views
233                .iter()
234                .filter_map(|v| {
235                    let b = ByteView::from(*v);
236                    if b.length > MAX_INLINE_VIEW_LEN {
237                        Some(b.length as usize)
238                    } else {
239                        None
240                    }
241                })
242                .sum();
243            debug_assert!(
244                dst_buffer.capacity() >= total_length,
245                "dst_buffer capacity {} is less than total length {}",
246                dst_buffer.capacity(),
247                total_length
248            );
249        }
250
251        // Copy the views, updating the buffer index and copying the data as needed
252        let new_views = views.iter().map(|v| {
253            let mut b: ByteView = ByteView::from(*v);
254            if b.length > MAX_INLINE_VIEW_LEN {
255                let buffer_index = b.buffer_index as usize;
256                let buffer_offset = b.offset as usize;
257                let str_len = b.length as usize;
258
259                // Update view to location in current
260                b.offset = dst_buffer.len() as u32;
261                b.buffer_index = new_buffer_index;
262
263                // safety: input views are validly constructed
264                let src = unsafe {
265                    buffers
266                        .get_unchecked(buffer_index)
267                        .get_unchecked(buffer_offset..buffer_offset + str_len)
268                };
269                dst_buffer.extend_from_slice(src);
270            }
271            b.as_u128()
272        });
273        self.views.extend(new_views);
274        self.current = Some(dst_buffer);
275    }
276}
277
278impl<B: ByteViewType> InProgressArray for InProgressByteViewArray<B> {
279    fn set_source(&mut self, source: Option<ArrayRef>) {
280        self.source = source.map(|array| {
281            let s = array.as_byte_view::<B>();
282
283            let (need_gc, ideal_buffer_size) = if s.data_buffers().is_empty() {
284                (false, 0)
285            } else {
286                let ideal_buffer_size = s.total_buffer_bytes_used();
287                // We don't use get_buffer_memory_size here, because gc is for the contents of the
288                // data buffers, not views and nulls.
289                let actual_buffer_size =
290                    s.data_buffers().iter().map(|b| b.capacity()).sum::<usize>();
291                // copying strings is expensive, so only do it if the array is
292                // sparse (uses at least 2x the memory it needs)
293                let need_gc =
294                    ideal_buffer_size != 0 && actual_buffer_size > (ideal_buffer_size * 2);
295                (need_gc, ideal_buffer_size)
296            };
297
298            Source {
299                array,
300                need_gc,
301                ideal_buffer_size,
302            }
303        })
304    }
305
306    fn copy_rows(&mut self, offset: usize, len: usize) -> Result<(), ArrowError> {
307        self.ensure_capacity();
308        let source = self.source.take().ok_or_else(|| {
309            ArrowError::InvalidArgumentError(
310                "Internal Error: InProgressByteViewArray: source not set".to_string(),
311            )
312        })?;
313
314        // If creating StringViewArray output, ensure input was valid utf8 too
315        let s = source.array.as_byte_view::<B>();
316
317        // add any nulls, as necessary
318        if let Some(nulls) = s.nulls().as_ref() {
319            let nulls = nulls.slice(offset, len);
320            self.nulls.append_buffer(&nulls);
321        } else {
322            self.nulls.append_n_non_nulls(len);
323        };
324
325        let buffers = s.data_buffers();
326        let views = &s.views().as_ref()[offset..offset + len];
327
328        // If there are no data buffers in s (all inlined views), can append the
329        // views/nulls and done
330        if source.ideal_buffer_size == 0 {
331            self.views.extend_from_slice(views);
332            self.source = Some(source);
333            return Ok(());
334        }
335
336        // Copying the strings into a buffer can be time-consuming so
337        // only do it if the array is sparse
338        if source.need_gc {
339            self.append_views_and_copy_strings(views, source.ideal_buffer_size, buffers);
340        } else {
341            self.append_views_and_update_buffer_index(views, buffers);
342        }
343        self.source = Some(source);
344        Ok(())
345    }
346
347    fn finish(&mut self) -> Result<ArrayRef, ArrowError> {
348        self.finish_current();
349        assert!(self.current.is_none());
350        let buffers = std::mem::take(&mut self.completed);
351        let views = std::mem::take(&mut self.views);
352        let nulls = self.nulls.finish();
353        self.nulls = NullBufferBuilder::new(self.batch_size);
354
355        // Safety: we created valid views and buffers above and the
356        // input arrays had value data and nulls
357        let new_array =
358            unsafe { GenericByteViewArray::<B>::new_unchecked(views.into(), buffers, nulls) };
359        Ok(Arc::new(new_array))
360    }
361}
362
/// Seed for the buffer size progression. It is doubled before first use, so
/// the first buffer handed out is actually 8KiB.
const STARTING_BLOCK_SIZE: usize = 4 * 1024;
/// Size at which the doubling progression stops: 1MiB
const MAX_BLOCK_SIZE: usize = 1024 * 1024;

/// Manages allocating new buffers for `StringViewArray` in increasing sizes
#[derive(Debug)]
struct BufferSource {
    /// Size chosen by the most recent request (initially the seed size)
    current_size: usize,
}

impl BufferSource {
    /// Create a source whose first buffer will be twice `STARTING_BLOCK_SIZE`
    fn new() -> Self {
        BufferSource {
            current_size: STARTING_BLOCK_SIZE,
        }
    }

    /// Return a new, empty buffer with a capacity of at least `min_size`
    fn next_buffer(&mut self, min_size: usize) -> Vec<u8> {
        Vec::with_capacity(self.next_size(min_size))
    }

    /// Advance the size progression and return the size to allocate next,
    /// which is never smaller than `min_size`
    fn next_size(&mut self, min_size: usize) -> usize {
        let mut size = self.current_size;

        // Regular step: double until the maximum block size is reached. The
        // start and maximum sizes are fixed, so this cannot overflow.
        if size < MAX_BLOCK_SIZE {
            size = size.saturating_mul(2);
        }

        // If that is still too small, keep doubling (up to the maximum block
        // size) while the size does not exceed `min_size`
        if size < min_size {
            while size <= min_size && size < MAX_BLOCK_SIZE {
                size = size.saturating_mul(2);
            }
        }

        self.current_size = size;
        // Requests beyond the maximum block size are honoured exactly
        size.max(min_size)
    }
}
402
#[cfg(test)]
mod tests {
    use super::*;

    /// Request a buffer of at least `min_size` bytes once per entry in
    /// `expected`, checking the capacity of each returned buffer in turn
    fn assert_capacities(source: &mut BufferSource, min_size: usize, expected: &[usize]) {
        for &capacity in expected {
            assert_eq!(source.next_buffer(min_size).capacity(), capacity);
        }
    }

    #[test]
    fn test_buffer_source() {
        let mut source = BufferSource::new();
        // doubles from 8KiB up to 1MiB; the final entry is clamped to max size
        assert_capacities(
            &mut source,
            1000,
            &[
                8192,
                16384,
                32768,
                65536,
                131072,
                262144,
                524288,
                1024 * 1024,
                1024 * 1024,
            ],
        );
        // Can override with larger size request
        assert_capacities(&mut source, 10_000_000, &[10_000_000]);
    }

    #[test]
    fn test_buffer_source_with_min_small() {
        let mut source = BufferSource::new();
        // 8kb, then 16kb, then 32kb
        assert_capacities(&mut source, 5_600, &[8 * 1024, 16 * 1024, 32 * 1024]);
    }

    #[test]
    fn test_buffer_source_with_min_large() {
        let mut source = BufferSource::new();
        // the final entry is clamped to max size
        assert_capacities(
            &mut source,
            500_000,
            &[512 * 1024, 1024 * 1024, 1024 * 1024],
        );
        // Can override with larger size request
        assert_capacities(&mut source, 2_000_000, &[2_000_000]);
    }
}