Skip to main content

arrow_select/coalesce/
byte_view.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::coalesce::InProgressArray;
19use arrow_array::cast::AsArray;
20use arrow_array::types::ByteViewType;
21use arrow_array::{Array, ArrayRef, GenericByteViewArray};
22use arrow_buffer::{Buffer, NullBufferBuilder};
23use arrow_data::{ByteView, MAX_INLINE_VIEW_LEN};
24use arrow_schema::ArrowError;
25use std::marker::PhantomData;
26use std::sync::Arc;
27
/// InProgressArray for [`StringViewArray`] and [`BinaryViewArray`]
///
/// This structure buffers the views and data buffers as they are copied from
/// the source array, and then produces a new array when `finish` is called. It
/// also handles "garbage collection" by copying strings to a new buffer when
/// the source buffer is sparse (i.e. the capacity of its data buffers is more
/// than 2x the number of bytes its views actually reference).
///
/// [`StringViewArray`]: arrow_array::StringViewArray
/// [`BinaryViewArray`]: arrow_array::BinaryViewArray
pub(crate) struct InProgressByteViewArray<B: ByteViewType> {
    /// The source array and information
    ///
    /// `None` until `set_source` is called with an array.
    source: Option<Source>,
    /// the target batch size (and thus size for views allocation)
    batch_size: usize,
    /// The in progress views
    ///
    /// Left unallocated by `new`; capacity is reserved on the first write.
    views: Vec<u128>,
    /// In progress nulls
    nulls: NullBufferBuilder,
    /// current buffer
    ///
    /// Only populated when string data is copied (the "garbage collection"
    /// path); it is moved into `completed` once it is finished.
    current: Option<Vec<u8>>,
    /// completed buffers
    ///
    /// The `buffer_index` of every non-inlined view in `views` is an index
    /// into this list (or refers to `current`, which is pushed here next).
    completed: Vec<Buffer>,
    /// Allocates new buffers of increasing size as needed
    buffer_source: BufferSource,
    /// Phantom so we can use the same struct for both StringViewArray and
    /// BinaryViewArray
    _phantom: PhantomData<B>,
}
57
/// The array rows are currently being copied from, together with the
/// garbage collection decision computed once when the source is set.
struct Source {
    /// The array to copy from
    array: ArrayRef,
    /// Should the strings from the source array be copied into new buffers?
    need_gc: bool,
    /// How many bytes were actually used in the source array's buffers?
    ///
    /// Zero when the source has no data buffers; in that case the views are
    /// copied without touching any buffer.
    ideal_buffer_size: usize,
}
66
67// manually implement Debug because ByteViewType doesn't implement Debug
68impl<B: ByteViewType> std::fmt::Debug for InProgressByteViewArray<B> {
69    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
70        f.debug_struct("InProgressByteViewArray")
71            .field("batch_size", &self.batch_size)
72            .field("views", &self.views.len())
73            .field("nulls", &self.nulls)
74            .field("current", &self.current.as_ref().map(|_| "Some(...)"))
75            .field("completed", &self.completed.len())
76            .finish()
77    }
78}
79
80impl<B: ByteViewType> InProgressByteViewArray<B> {
81    pub(crate) fn new(batch_size: usize) -> Self {
82        let buffer_source = BufferSource::new();
83
84        Self {
85            batch_size,
86            source: None,
87            views: Vec::new(),                         // allocate in push
88            nulls: NullBufferBuilder::new(batch_size), // no allocation
89            current: None,
90            completed: vec![],
91            buffer_source,
92            _phantom: PhantomData,
93        }
94    }
95
96    /// Allocate space for output views and nulls if needed
97    ///
98    /// This is done on write (when we know it is necessary) rather than
99    /// eagerly to avoid allocations that are not used.
100    fn ensure_capacity(&mut self) {
101        if self.views.capacity() == 0 {
102            self.views.reserve(self.batch_size);
103        }
104        debug_assert_eq!(self.views.capacity(), self.batch_size);
105    }
106
107    /// Finishes in progress buffer, if any
108    fn finish_current(&mut self) {
109        let Some(next_buffer) = self.current.take() else {
110            return;
111        };
112        self.completed.push(next_buffer.into());
113    }
114
115    /// Append views to self.views, updating the buffer index if necessary
116    #[inline(never)]
117    fn append_views_and_update_buffer_index(&mut self, views: &[u128], buffers: &[Buffer]) {
118        if let Some(buffer) = self.current.take() {
119            self.completed.push(buffer.into());
120        }
121        let starting_buffer: u32 = self.completed.len().try_into().expect("too many buffers");
122        self.completed.extend_from_slice(buffers);
123
124        if starting_buffer == 0 {
125            // If there are no buffers, we can just use the views as is
126            self.views.extend_from_slice(views);
127        } else {
128            // If there are buffers, we need to update the buffer index
129            let updated_views = views.iter().map(|v| {
130                let mut byte_view = ByteView::from(*v);
131                if byte_view.length > MAX_INLINE_VIEW_LEN {
132                    // Small views (<=12 bytes) are inlined, so only need to update large views
133                    byte_view.buffer_index += starting_buffer;
134                };
135                byte_view.as_u128()
136            });
137
138            self.views.extend(updated_views);
139        }
140    }
141
142    /// Append views to self.views, copying data from the buffers into
143    /// self.buffers and updating the buffer index as necessary.
144    ///
145    /// # Arguments
146    /// - `views` - the views to append
147    /// - `view_buffer_size` - the total number of bytes pointed to by all
148    ///   views (used to allocate new buffers if needed)
149    /// - `buffers` - the buffers the reviews point to
150    #[inline(never)]
151    fn append_views_and_copy_strings(
152        &mut self,
153        views: &[u128],
154        view_buffer_size: usize,
155        buffers: &[Buffer],
156    ) {
157        // Note: the calculations below are designed to avoid any reallocations
158        // of the current buffer, and to only allocate new buffers when
159        // necessary, which is critical for performance.
160
161        // If there is no current buffer, allocate a new one
162        let Some(current) = self.current.take() else {
163            let new_buffer = self.buffer_source.next_buffer(view_buffer_size);
164            self.append_views_and_copy_strings_inner(views, new_buffer, buffers);
165            return;
166        };
167
168        // If there is a current buffer with enough space, append the views and
169        // copy the strings into the existing buffer.
170        let mut remaining_capacity = current.capacity() - current.len();
171        if view_buffer_size <= remaining_capacity {
172            self.append_views_and_copy_strings_inner(views, current, buffers);
173            return;
174        }
175
176        // Here there is a current buffer, but it doesn't have enough space to
177        // hold all the strings. Copy as many views as we can into the current
178        // buffer and then allocate a new buffer for the remaining views
179        //
180        // TODO: should we copy the strings too at the same time?
181        let mut num_view_to_current = 0;
182        for view in views {
183            let b = ByteView::from(*view);
184            let str_len = b.length;
185            if remaining_capacity < str_len as usize {
186                break;
187            }
188            if str_len > MAX_INLINE_VIEW_LEN {
189                remaining_capacity -= str_len as usize;
190            }
191            num_view_to_current += 1;
192        }
193
194        let first_views = &views[0..num_view_to_current];
195        let string_bytes_to_copy = current.capacity() - current.len() - remaining_capacity;
196        let remaining_view_buffer_size = view_buffer_size - string_bytes_to_copy;
197
198        self.append_views_and_copy_strings_inner(first_views, current, buffers);
199        let completed = self.current.take().expect("completed");
200        self.completed.push(completed.into());
201
202        // Copy any remaining views into a new buffer
203        let remaining_views = &views[num_view_to_current..];
204        let new_buffer = self.buffer_source.next_buffer(remaining_view_buffer_size);
205        self.append_views_and_copy_strings_inner(remaining_views, new_buffer, buffers);
206    }
207
208    /// Append views to self.views, copying data from the buffers into
209    /// dst_buffer, which is then set as self.current
210    ///
211    /// # Panics:
212    /// If `self.current` is `Some`
213    ///
214    /// See `append_views_and_copy_strings` for more details
215    #[inline(never)]
216    fn append_views_and_copy_strings_inner(
217        &mut self,
218        views: &[u128],
219        mut dst_buffer: Vec<u8>,
220        buffers: &[Buffer],
221    ) {
222        assert!(self.current.is_none(), "current buffer should be None");
223
224        if views.is_empty() {
225            self.current = Some(dst_buffer);
226            return;
227        }
228
229        let new_buffer_index: u32 = self.completed.len().try_into().expect("too many buffers");
230
231        // In debug builds, check that the vector has enough capacity to copy
232        // the views into it without reallocating.
233        #[cfg(debug_assertions)]
234        {
235            let total_length: usize = views
236                .iter()
237                .filter_map(|v| {
238                    let b = ByteView::from(*v);
239                    if b.length > MAX_INLINE_VIEW_LEN {
240                        Some(b.length as usize)
241                    } else {
242                        None
243                    }
244                })
245                .sum();
246            debug_assert!(
247                dst_buffer.capacity() >= total_length,
248                "dst_buffer capacity {} is less than total length {}",
249                dst_buffer.capacity(),
250                total_length
251            );
252        }
253
254        // Copy the views, updating the buffer index and copying the data as needed
255        let new_views = views.iter().map(|v| {
256            let mut b: ByteView = ByteView::from(*v);
257            if b.length > MAX_INLINE_VIEW_LEN {
258                let buffer_index = b.buffer_index as usize;
259                let buffer_offset = b.offset as usize;
260                let str_len = b.length as usize;
261
262                // Update view to location in current
263                b.offset = dst_buffer.len() as u32;
264                b.buffer_index = new_buffer_index;
265
266                // safety: input views are validly constructed
267                let src = unsafe {
268                    buffers
269                        .get_unchecked(buffer_index)
270                        .get_unchecked(buffer_offset..buffer_offset + str_len)
271                };
272                dst_buffer.extend_from_slice(src);
273            }
274            b.as_u128()
275        });
276        self.views.extend(new_views);
277        self.current = Some(dst_buffer);
278    }
279}
280
281impl<B: ByteViewType> InProgressArray for InProgressByteViewArray<B> {
282    fn set_source(&mut self, source: Option<ArrayRef>) {
283        self.source = source.map(|array| {
284            let s = array.as_byte_view::<B>();
285
286            let (need_gc, ideal_buffer_size) = if s.data_buffers().is_empty() {
287                (false, 0)
288            } else {
289                let ideal_buffer_size = s.total_buffer_bytes_used();
290                // We don't use get_buffer_memory_size here, because gc is for the contents of the
291                // data buffers, not views and nulls.
292                let actual_buffer_size =
293                    s.data_buffers().iter().map(|b| b.capacity()).sum::<usize>();
294                // copying strings is expensive, so only do it if the array is
295                // sparse (uses at least 2x the memory it needs)
296                let need_gc =
297                    ideal_buffer_size != 0 && actual_buffer_size > (ideal_buffer_size * 2);
298                (need_gc, ideal_buffer_size)
299            };
300
301            Source {
302                array,
303                need_gc,
304                ideal_buffer_size,
305            }
306        })
307    }
308
309    fn copy_rows(&mut self, offset: usize, len: usize) -> Result<(), ArrowError> {
310        self.ensure_capacity();
311        let source = self.source.take().ok_or_else(|| {
312            ArrowError::InvalidArgumentError(
313                "Internal Error: InProgressByteViewArray: source not set".to_string(),
314            )
315        })?;
316
317        // If creating StringViewArray output, ensure input was valid utf8 too
318        let s = source.array.as_byte_view::<B>();
319
320        // add any nulls, as necessary
321        if let Some(nulls) = s.nulls().as_ref() {
322            let nulls = nulls.slice(offset, len);
323            self.nulls.append_buffer(&nulls);
324        } else {
325            self.nulls.append_n_non_nulls(len);
326        };
327
328        let buffers = s.data_buffers();
329        let views = &s.views().as_ref()[offset..offset + len];
330
331        // If there are no data buffers in s (all inlined views), can append the
332        // views/nulls and done
333        if source.ideal_buffer_size == 0 {
334            self.views.extend_from_slice(views);
335            self.source = Some(source);
336            return Ok(());
337        }
338
339        // Copying the strings into a buffer can be time-consuming so
340        // only do it if the array is sparse
341        if source.need_gc {
342            self.append_views_and_copy_strings(views, source.ideal_buffer_size, buffers);
343        } else {
344            self.append_views_and_update_buffer_index(views, buffers);
345        }
346        self.source = Some(source);
347        Ok(())
348    }
349
350    fn finish(&mut self) -> Result<ArrayRef, ArrowError> {
351        self.finish_current();
352        assert!(self.current.is_none());
353        let buffers = std::mem::take(&mut self.completed);
354        let views = std::mem::take(&mut self.views);
355        let nulls = self.nulls.finish();
356        self.nulls = NullBufferBuilder::new(self.batch_size);
357
358        // Safety: we created valid views and buffers above and the
359        // input arrays had value data and nulls
360        let new_array =
361            unsafe { GenericByteViewArray::<B>::new_unchecked(views.into(), buffers, nulls) };
362        Ok(Arc::new(new_array))
363    }
364}
365
const STARTING_BLOCK_SIZE: usize = 4 * 1024; // doubled before first use, so the first buffer is 8KiB
const MAX_BLOCK_SIZE: usize = 1024 * 1024; // 1MiB

/// Hands out empty byte buffers for `StringViewArray` data, doubling the
/// block size on each request until `MAX_BLOCK_SIZE` is reached
#[derive(Debug)]
struct BufferSource {
    /// Most recent step of the doubling schedule (not necessarily the
    /// capacity of the last buffer, which may have been larger on request)
    current_size: usize,
}

impl BufferSource {
    /// Creates a source whose first buffer will be twice `STARTING_BLOCK_SIZE`
    fn new() -> Self {
        Self {
            current_size: STARTING_BLOCK_SIZE,
        }
    }

    /// Return a new, empty buffer with a capacity of at least `min_size`
    fn next_buffer(&mut self, min_size: usize) -> Vec<u8> {
        Vec::with_capacity(self.next_size(min_size))
    }

    /// Advances the doubling schedule and returns the capacity to allocate,
    /// which is never smaller than `min_size`
    fn next_size(&mut self, min_size: usize) -> usize {
        let mut size = self.current_size;

        // One doubling per request while below the maximum. The sizes are
        // bounded by the fixed start / max block sizes, so this cannot
        // overflow.
        if size < MAX_BLOCK_SIZE {
            size = size.saturating_mul(2);
        }

        // Still too small for the request: keep doubling until the request
        // is exceeded or the maximum block size is reached
        if size < min_size {
            while size <= min_size && size < MAX_BLOCK_SIZE {
                size = size.saturating_mul(2);
            }
        }

        self.current_size = size;
        // Requests beyond the schedule are honored exactly
        size.max(min_size)
    }
}
405
#[cfg(test)]
mod tests {
    use super::*;

    /// Requests a buffer of at least `min_size` bytes and reports its capacity
    fn next_capacity(source: &mut BufferSource, min_size: usize) -> usize {
        source.next_buffer(min_size).capacity()
    }

    #[test]
    fn test_buffer_source() {
        let mut source = BufferSource::new();
        // sizes double from 8KiB until the 1MiB maximum is reached
        let doubling = [
            8192,
            16384,
            32768,
            65536,
            131072,
            262144,
            524288,
            1024 * 1024,
        ];
        for expected in doubling {
            assert_eq!(next_capacity(&mut source, 1000), expected);
        }
        // clamped to max size
        assert_eq!(next_capacity(&mut source, 1000), 1024 * 1024);
        // Can override with larger size request
        assert_eq!(next_capacity(&mut source, 10_000_000), 10_000_000);
    }

    #[test]
    fn test_buffer_source_with_min_small() {
        let mut source = BufferSource::new();
        // A minimum below the schedule has no effect: 8kb, then 16kb, then 32kb
        for expected_kb in [8, 16, 32] {
            assert_eq!(next_capacity(&mut source, 5_600), expected_kb * 1024);
        }
    }

    #[test]
    fn test_buffer_source_with_min_large() {
        let mut source = BufferSource::new();
        // A large minimum skips the small sizes
        assert_eq!(next_capacity(&mut source, 500_000), 512 * 1024);
        assert_eq!(next_capacity(&mut source, 500_000), 1024 * 1024);
        // clamped to max size
        assert_eq!(next_capacity(&mut source, 500_000), 1024 * 1024);
        // Can override with larger size request
        assert_eq!(next_capacity(&mut source, 2_000_000), 2_000_000);
    }
}