arrow_select/coalesce/
byte_view.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::coalesce::InProgressArray;
19use arrow_array::cast::AsArray;
20use arrow_array::types::ByteViewType;
21use arrow_array::{Array, ArrayRef, GenericByteViewArray};
22use arrow_buffer::{Buffer, NullBufferBuilder};
23use arrow_data::{ByteView, MAX_INLINE_VIEW_LEN};
24use arrow_schema::ArrowError;
25use std::marker::PhantomData;
26use std::sync::Arc;
27
28/// InProgressArray for [`StringViewArray`] and [`BinaryViewArray`]
29///
30/// This structure buffers the views and data buffers as they are copied from
31/// the source array, and then produces a new array when `finish` is called. It
32/// also handles "garbage collection" by copying strings to a new buffer when
33/// the source buffer is sparse (i.e. uses at least 2x more than the memory it
34/// needs).
35///
36/// [`StringViewArray`]: arrow_array::StringViewArray
37/// [`BinaryViewArray`]: arrow_array::BinaryViewArray
38pub(crate) struct InProgressByteViewArray<B: ByteViewType> {
39    /// The source array and information
40    source: Option<Source>,
41    /// the target batch size (and thus size for views allocation)
42    batch_size: usize,
43    /// The in progress views
44    views: Vec<u128>,
45    /// In progress nulls
46    nulls: NullBufferBuilder,
47    /// current buffer
48    current: Option<Vec<u8>>,
49    /// completed buffers
50    completed: Vec<Buffer>,
51    /// Allocates new buffers of increasing size as needed
52    buffer_source: BufferSource,
53    /// Phantom so we can use the same struct for both StringViewArray and
54    /// BinaryViewArray
55    _phantom: PhantomData<B>,
56}
57
58struct Source {
59    /// The array to copy form
60    array: ArrayRef,
61    /// Should the strings from the source array be copied into new buffers?
62    need_gc: bool,
63    /// How many bytes were actually used in the source array's buffers?
64    ideal_buffer_size: usize,
65}
66
67// manually implement Debug because ByteViewType doesn't implement Debug
68impl<B: ByteViewType> std::fmt::Debug for InProgressByteViewArray<B> {
69    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
70        f.debug_struct("InProgressByteViewArray")
71            .field("batch_size", &self.batch_size)
72            .field("views", &self.views.len())
73            .field("nulls", &self.nulls)
74            .field("current", &self.current.as_ref().map(|_| "Some(...)"))
75            .field("completed", &self.completed.len())
76            .finish()
77    }
78}
79
80impl<B: ByteViewType> InProgressByteViewArray<B> {
81    pub(crate) fn new(batch_size: usize) -> Self {
82        let buffer_source = BufferSource::new();
83
84        Self {
85            batch_size,
86            source: None,
87            views: Vec::new(),                         // allocate in push
88            nulls: NullBufferBuilder::new(batch_size), // no allocation
89            current: None,
90            completed: vec![],
91            buffer_source,
92            _phantom: PhantomData,
93        }
94    }
95
96    /// Allocate space for output views and nulls if needed
97    ///
98    /// This is done on write (when we know it is necessary) rather than
99    /// eagerly to avoid allocations that are not used.
100    fn ensure_capacity(&mut self) {
101        self.views.reserve(self.batch_size);
102    }
103
104    /// Finishes in progress buffer, if any
105    fn finish_current(&mut self) {
106        let Some(next_buffer) = self.current.take() else {
107            return;
108        };
109        self.completed.push(next_buffer.into());
110    }
111
112    /// Append views to self.views, updating the buffer index if necessary
113    #[inline(never)]
114    fn append_views_and_update_buffer_index(&mut self, views: &[u128], buffers: &[Buffer]) {
115        if let Some(buffer) = self.current.take() {
116            self.completed.push(buffer.into());
117        }
118        let starting_buffer: u32 = self.completed.len().try_into().expect("too many buffers");
119        self.completed.extend_from_slice(buffers);
120
121        if starting_buffer == 0 {
122            // If there are no buffers, we can just use the views as is
123            self.views.extend_from_slice(views);
124        } else {
125            // If there are buffers, we need to update the buffer index
126            let updated_views = views.iter().map(|v| {
127                let mut byte_view = ByteView::from(*v);
128                if byte_view.length > MAX_INLINE_VIEW_LEN {
129                    // Small views (<=12 bytes) are inlined, so only need to update large views
130                    byte_view.buffer_index += starting_buffer;
131                };
132                byte_view.as_u128()
133            });
134
135            self.views.extend(updated_views);
136        }
137    }
138
139    /// Append views to self.views, copying data from the buffers into
140    /// self.buffers and updating the buffer index as necessary.
141    ///
142    /// # Arguments
143    /// - `views` - the views to append
144    /// - `view_buffer_size` - the total number of bytes pointed to by all
145    ///   views (used to allocate new buffers if needed)
146    /// - `buffers` - the buffers the reviews point to
147    #[inline(never)]
148    fn append_views_and_copy_strings(
149        &mut self,
150        views: &[u128],
151        view_buffer_size: usize,
152        buffers: &[Buffer],
153    ) {
154        // Note: the calculations below are designed to avoid any reallocations
155        // of the current buffer, and to only allocate new buffers when
156        // necessary, which is critical for performance.
157
158        // If there is no current buffer, allocate a new one
159        let Some(current) = self.current.take() else {
160            let new_buffer = self.buffer_source.next_buffer(view_buffer_size);
161            self.append_views_and_copy_strings_inner(views, new_buffer, buffers);
162            return;
163        };
164
165        // If there is a current buffer with enough space, append the views and
166        // copy the strings into the existing buffer.
167        let mut remaining_capacity = current.capacity() - current.len();
168        if view_buffer_size <= remaining_capacity {
169            self.append_views_and_copy_strings_inner(views, current, buffers);
170            return;
171        }
172
173        // Here there is a current buffer, but it doesn't have enough space to
174        // hold all the strings. Copy as many views as we can into the current
175        // buffer and then allocate a new buffer for the remaining views
176        //
177        // TODO: should we copy the strings too at the same time?
178        let mut num_view_to_current = 0;
179        for view in views {
180            let b = ByteView::from(*view);
181            let str_len = b.length;
182            if remaining_capacity < str_len as usize {
183                break;
184            }
185            if str_len > MAX_INLINE_VIEW_LEN {
186                remaining_capacity -= str_len as usize;
187            }
188            num_view_to_current += 1;
189        }
190
191        let first_views = &views[0..num_view_to_current];
192        let string_bytes_to_copy = current.capacity() - current.len() - remaining_capacity;
193        let remaining_view_buffer_size = view_buffer_size - string_bytes_to_copy;
194
195        self.append_views_and_copy_strings_inner(first_views, current, buffers);
196        let completed = self.current.take().expect("completed");
197        self.completed.push(completed.into());
198
199        // Copy any remaining views into a new buffer
200        let remaining_views = &views[num_view_to_current..];
201        let new_buffer = self.buffer_source.next_buffer(remaining_view_buffer_size);
202        self.append_views_and_copy_strings_inner(remaining_views, new_buffer, buffers);
203    }
204
205    /// Append views to self.views, copying data from the buffers into
206    /// dst_buffer, which is then set as self.current
207    ///
208    /// # Panics:
209    /// If `self.current` is `Some`
210    ///
211    /// See `append_views_and_copy_strings` for more details
212    #[inline(never)]
213    fn append_views_and_copy_strings_inner(
214        &mut self,
215        views: &[u128],
216        mut dst_buffer: Vec<u8>,
217        buffers: &[Buffer],
218    ) {
219        assert!(self.current.is_none(), "current buffer should be None");
220
221        if views.is_empty() {
222            self.current = Some(dst_buffer);
223            return;
224        }
225
226        let new_buffer_index: u32 = self.completed.len().try_into().expect("too many buffers");
227
228        // In debug builds, check that the vector has enough capacity to copy
229        // the views into it without reallocating.
230        #[cfg(debug_assertions)]
231        {
232            let total_length: usize = views
233                .iter()
234                .filter_map(|v| {
235                    let b = ByteView::from(*v);
236                    if b.length > MAX_INLINE_VIEW_LEN {
237                        Some(b.length as usize)
238                    } else {
239                        None
240                    }
241                })
242                .sum();
243            debug_assert!(
244                dst_buffer.capacity() >= total_length,
245                "dst_buffer capacity {} is less than total length {}",
246                dst_buffer.capacity(),
247                total_length
248            );
249        }
250
251        // Copy the views, updating the buffer index and copying the data as needed
252        let new_views = views.iter().map(|v| {
253            let mut b: ByteView = ByteView::from(*v);
254            if b.length > MAX_INLINE_VIEW_LEN {
255                let buffer_index = b.buffer_index as usize;
256                let buffer_offset = b.offset as usize;
257                let str_len = b.length as usize;
258
259                // Update view to location in current
260                b.offset = dst_buffer.len() as u32;
261                b.buffer_index = new_buffer_index;
262
263                // safety: input views are validly constructed
264                let src = unsafe {
265                    buffers
266                        .get_unchecked(buffer_index)
267                        .get_unchecked(buffer_offset..buffer_offset + str_len)
268                };
269                dst_buffer.extend_from_slice(src);
270            }
271            b.as_u128()
272        });
273        self.views.extend(new_views);
274        self.current = Some(dst_buffer);
275    }
276}
277
278impl<B: ByteViewType> InProgressArray for InProgressByteViewArray<B> {
279    fn set_source(&mut self, source: Option<ArrayRef>) {
280        self.source = source.map(|array| {
281            let s = array.as_byte_view::<B>();
282
283            let (need_gc, ideal_buffer_size) = if s.data_buffers().is_empty() {
284                (false, 0)
285            } else {
286                let ideal_buffer_size = s.total_buffer_bytes_used();
287                let actual_buffer_size = s.get_buffer_memory_size();
288                // copying strings is expensive, so only do it if the array is
289                // sparse (uses at least 2x the memory it needs)
290                let need_gc =
291                    ideal_buffer_size != 0 && actual_buffer_size > (ideal_buffer_size * 2);
292                (need_gc, ideal_buffer_size)
293            };
294
295            Source {
296                array,
297                need_gc,
298                ideal_buffer_size,
299            }
300        })
301    }
302
303    fn copy_rows(&mut self, offset: usize, len: usize) -> Result<(), ArrowError> {
304        self.ensure_capacity();
305        let source = self.source.take().ok_or_else(|| {
306            ArrowError::InvalidArgumentError(
307                "Internal Error: InProgressByteViewArray: source not set".to_string(),
308            )
309        })?;
310
311        // If creating StringViewArray output, ensure input was valid utf8 too
312        let s = source.array.as_byte_view::<B>();
313
314        // add any nulls, as necessary
315        if let Some(nulls) = s.nulls().as_ref() {
316            let nulls = nulls.slice(offset, len);
317            self.nulls.append_buffer(&nulls);
318        } else {
319            self.nulls.append_n_non_nulls(len);
320        };
321
322        let buffers = s.data_buffers();
323        let views = &s.views().as_ref()[offset..offset + len];
324
325        // If there are no data buffers in s (all inlined views), can append the
326        // views/nulls and done
327        if source.ideal_buffer_size == 0 {
328            self.views.extend_from_slice(views);
329            self.source = Some(source);
330            return Ok(());
331        }
332
333        // Copying the strings into a buffer can be time-consuming so
334        // only do it if the array is sparse
335        if source.need_gc {
336            self.append_views_and_copy_strings(views, source.ideal_buffer_size, buffers);
337        } else {
338            self.append_views_and_update_buffer_index(views, buffers);
339        }
340        self.source = Some(source);
341        Ok(())
342    }
343
344    fn finish(&mut self) -> Result<ArrayRef, ArrowError> {
345        self.finish_current();
346        assert!(self.current.is_none());
347        let buffers = std::mem::take(&mut self.completed);
348        let views = std::mem::take(&mut self.views);
349        let nulls = self.nulls.finish();
350        self.nulls = NullBufferBuilder::new(self.batch_size);
351
352        // Safety: we created valid views and buffers above and the
353        // input arrays had value data and nulls
354        let new_array =
355            unsafe { GenericByteViewArray::<B>::new_unchecked(views.into(), buffers, nulls) };
356        Ok(Arc::new(new_array))
357    }
358}
359
const STARTING_BLOCK_SIZE: usize = 4 * 1024; // (note the first size used is actually 8KiB)
const MAX_BLOCK_SIZE: usize = 1024 * 1024; // 1MiB

/// Manages allocating new buffers for `StringViewArray` in increasing sizes
///
/// Block sizes double on every request, starting at twice
/// `STARTING_BLOCK_SIZE`, until they reach `MAX_BLOCK_SIZE`. A request larger
/// than the current block size is always honoured, even beyond the maximum.
#[derive(Debug)]
struct BufferSource {
    /// Size of the most recently handed out block (before any `min_size`
    /// override)
    current_size: usize,
}

impl BufferSource {
    /// Creates a source whose first buffer will be `2 * STARTING_BLOCK_SIZE`
    fn new() -> Self {
        Self {
            current_size: STARTING_BLOCK_SIZE,
        }
    }

    /// Return a new, empty buffer with a capacity of at least `min_size`
    fn next_buffer(&mut self, min_size: usize) -> Vec<u8> {
        let size = self.next_size(min_size);
        Vec::with_capacity(size)
    }

    /// Advances the block size and returns the capacity to allocate for a
    /// request of at least `min_size` bytes.
    fn next_size(&mut self, min_size: usize) -> usize {
        if self.current_size < MAX_BLOCK_SIZE {
            // If the current size is less than the max size, we can double it
            // we have fixed start/end block sizes, so we can't overflow
            self.current_size = self.current_size.saturating_mul(2);
        }
        // Keep doubling until the block is large enough or we hit the max
        // size. The comparison is strict so that a block that exactly fits
        // `min_size` is used as is instead of being doubled once more.
        while self.current_size < min_size && self.current_size < MAX_BLOCK_SIZE {
            self.current_size = self.current_size.saturating_mul(2);
        }
        // Requests above the max block size get exactly what they ask for
        self.current_size.max(min_size)
    }
}
399
#[cfg(test)]
mod tests {
    use super::*;

    /// Requests a buffer of at least `min_size` once per entry of `expected`
    /// and checks that each returned capacity matches, in order.
    fn assert_capacities(source: &mut BufferSource, min_size: usize, expected: &[usize]) {
        for &capacity in expected {
            assert_eq!(source.next_buffer(min_size).capacity(), capacity);
        }
    }

    #[test]
    fn test_buffer_source() {
        let mut source = BufferSource::new();
        // Sizes double from 8KiB up to 1MiB and are then clamped to max size
        assert_capacities(
            &mut source,
            1000,
            &[
                8192,
                16384,
                32768,
                65536,
                131072,
                262144,
                524288,
                1024 * 1024,
                1024 * 1024,
            ],
        );
        // Can override with larger size request
        assert_capacities(&mut source, 10_000_000, &[10_000_000]);
    }

    #[test]
    fn test_buffer_source_with_min_small() {
        let mut source = BufferSource::new();
        // First buffer should be 8kb, then 16kb, then 32kb
        assert_capacities(&mut source, 5_600, &[8 * 1024, 16 * 1024, 32 * 1024]);
    }

    #[test]
    fn test_buffer_source_with_min_large() {
        let mut source = BufferSource::new();
        // Jumps straight to the first block that fits, then clamps to max size
        assert_capacities(
            &mut source,
            500_000,
            &[512 * 1024, 1024 * 1024, 1024 * 1024],
        );
        // Can override with larger size request
        assert_capacities(&mut source, 2_000_000, &[2_000_000]);
    }
}