Skip to main content

arrow_select/coalesce/
byte_view.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::coalesce::InProgressArray;
19use arrow_array::cast::AsArray;
20use arrow_array::types::ByteViewType;
21use arrow_array::{Array, ArrayRef, GenericByteViewArray};
22use arrow_buffer::{Buffer, NullBufferBuilder};
23use arrow_data::{ByteView, MAX_INLINE_VIEW_LEN};
24use arrow_schema::ArrowError;
25use std::marker::PhantomData;
26use std::sync::Arc;
27
/// InProgressArray for [`StringViewArray`] and [`BinaryViewArray`]
///
/// This structure buffers the views and data buffers as they are copied from
/// the source array, and then produces a new array when `finish` is called. It
/// also handles "garbage collection" by copying strings to a new buffer when
/// the source buffer is sparse (i.e. the capacity of the source's data buffers
/// is more than 2x the number of bytes its views actually use).
///
/// [`StringViewArray`]: arrow_array::StringViewArray
/// [`BinaryViewArray`]: arrow_array::BinaryViewArray
pub(crate) struct InProgressByteViewArray<B: ByteViewType> {
    /// The source array and information about how to copy from it (`None`
    /// when no source is set)
    source: Option<Source>,
    /// The target batch size (and thus size for views allocation)
    batch_size: usize,
    /// The in progress views (one `u128` view per row copied so far)
    views: Vec<u128>,
    /// In progress nulls
    nulls: NullBufferBuilder,
    /// Current buffer: the buffer that copied string data is appended to
    /// (`None` when no buffer is in progress)
    current: Option<Vec<u8>>,
    /// Completed buffers, in the order referenced by the buffer index of the
    /// in progress views
    completed: Vec<Buffer>,
    /// Allocates new buffers of increasing size as needed
    buffer_source: BufferSource,
    /// Phantom so we can use the same struct for both StringViewArray and
    /// BinaryViewArray
    _phantom: PhantomData<B>,
}
57
/// The array currently being copied from, along with the copy strategy that
/// was computed for it when it was set as the source
struct Source {
    /// The array to copy from
    array: ArrayRef,
    /// Should the strings from the source array be copied into new buffers
    /// (rather than reusing the source's data buffers as they are)?
    need_gc: bool,
    /// How many bytes were actually used in the source array's buffers?
    ///
    /// Zero when the source has no data buffers or none of their bytes are
    /// used, in which case its views are copied without any changes.
    ideal_buffer_size: usize,
}
66
67// manually implement Debug because ByteViewType doesn't implement Debug
68impl<B: ByteViewType> std::fmt::Debug for InProgressByteViewArray<B> {
69    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
70        f.debug_struct("InProgressByteViewArray")
71            .field("batch_size", &self.batch_size)
72            .field("views", &self.views.len())
73            .field("nulls", &self.nulls)
74            .field("current", &self.current.as_ref().map(|_| "Some(...)"))
75            .field("completed", &self.completed.len())
76            .finish()
77    }
78}
79
80impl<B: ByteViewType> InProgressByteViewArray<B> {
81    pub(crate) fn new(batch_size: usize) -> Self {
82        let buffer_source = BufferSource::new();
83
84        Self {
85            batch_size,
86            source: None,
87            views: Vec::new(),                         // allocate in push
88            nulls: NullBufferBuilder::new(batch_size), // no allocation
89            current: None,
90            completed: vec![],
91            buffer_source,
92            _phantom: PhantomData,
93        }
94    }
95
96    /// Allocate space for output views and nulls if needed
97    ///
98    /// This is done on write (when we know it is necessary) rather than
99    /// eagerly to avoid allocations that are not used.
100    fn ensure_capacity(&mut self) {
101        if self.views.capacity() == 0 {
102            self.views.reserve(self.batch_size);
103        }
104    }
105
106    /// Finishes in progress buffer, if any
107    fn finish_current(&mut self) {
108        let Some(next_buffer) = self.current.take() else {
109            return;
110        };
111        self.completed.push(next_buffer.into());
112    }
113
114    /// Append views to self.views, updating the buffer index if necessary
115    #[inline(never)]
116    fn append_views_and_update_buffer_index(&mut self, views: &[u128], buffers: &[Buffer]) {
117        if let Some(buffer) = self.current.take() {
118            self.completed.push(buffer.into());
119        }
120        let starting_buffer: u32 = self.completed.len().try_into().expect("too many buffers");
121        self.completed.extend_from_slice(buffers);
122
123        if starting_buffer == 0 {
124            // If there are no buffers, we can just use the views as is
125            self.views.extend_from_slice(views);
126        } else {
127            // If there are buffers, we need to update the buffer index
128            let updated_views = views.iter().map(|v| {
129                let mut byte_view = ByteView::from(*v);
130                if byte_view.length > MAX_INLINE_VIEW_LEN {
131                    // Small views (<=12 bytes) are inlined, so only need to update large views
132                    byte_view.buffer_index += starting_buffer;
133                };
134                byte_view.as_u128()
135            });
136
137            self.views.extend(updated_views);
138        }
139    }
140
141    /// Append views to self.views, copying data from the buffers into
142    /// self.buffers and updating the buffer index as necessary.
143    ///
144    /// # Arguments
145    /// - `views` - the views to append
146    /// - `view_buffer_size` - the total number of bytes pointed to by all
147    ///   views (used to allocate new buffers if needed)
148    /// - `buffers` - the buffers the reviews point to
149    #[inline(never)]
150    fn append_views_and_copy_strings(
151        &mut self,
152        views: &[u128],
153        view_buffer_size: usize,
154        buffers: &[Buffer],
155    ) {
156        // Note: the calculations below are designed to avoid any reallocations
157        // of the current buffer, and to only allocate new buffers when
158        // necessary, which is critical for performance.
159
160        // If there is no current buffer, allocate a new one
161        let Some(current) = self.current.take() else {
162            let new_buffer = self.buffer_source.next_buffer(view_buffer_size);
163            self.append_views_and_copy_strings_inner(views, new_buffer, buffers);
164            return;
165        };
166
167        // If there is a current buffer with enough space, append the views and
168        // copy the strings into the existing buffer.
169        let mut remaining_capacity = current.capacity() - current.len();
170        if view_buffer_size <= remaining_capacity {
171            self.append_views_and_copy_strings_inner(views, current, buffers);
172            return;
173        }
174
175        // Here there is a current buffer, but it doesn't have enough space to
176        // hold all the strings. Copy as many views as we can into the current
177        // buffer and then allocate a new buffer for the remaining views
178        //
179        // TODO: should we copy the strings too at the same time?
180        let mut num_view_to_current = 0;
181        for view in views {
182            let b = ByteView::from(*view);
183            let str_len = b.length;
184            if remaining_capacity < str_len as usize {
185                break;
186            }
187            if str_len > MAX_INLINE_VIEW_LEN {
188                remaining_capacity -= str_len as usize;
189            }
190            num_view_to_current += 1;
191        }
192
193        let first_views = &views[0..num_view_to_current];
194        let string_bytes_to_copy = current.capacity() - current.len() - remaining_capacity;
195        let remaining_view_buffer_size = view_buffer_size - string_bytes_to_copy;
196
197        self.append_views_and_copy_strings_inner(first_views, current, buffers);
198        let completed = self.current.take().expect("completed");
199        self.completed.push(completed.into());
200
201        // Copy any remaining views into a new buffer
202        let remaining_views = &views[num_view_to_current..];
203        let new_buffer = self.buffer_source.next_buffer(remaining_view_buffer_size);
204        self.append_views_and_copy_strings_inner(remaining_views, new_buffer, buffers);
205    }
206
207    /// Append views to self.views, copying data from the buffers into
208    /// dst_buffer, which is then set as self.current
209    ///
210    /// # Panics:
211    /// If `self.current` is `Some`
212    ///
213    /// See `append_views_and_copy_strings` for more details
214    #[inline(never)]
215    fn append_views_and_copy_strings_inner(
216        &mut self,
217        views: &[u128],
218        mut dst_buffer: Vec<u8>,
219        buffers: &[Buffer],
220    ) {
221        assert!(self.current.is_none(), "current buffer should be None");
222
223        if views.is_empty() {
224            self.current = Some(dst_buffer);
225            return;
226        }
227
228        let new_buffer_index: u32 = self.completed.len().try_into().expect("too many buffers");
229
230        // In debug builds, check that the vector has enough capacity to copy
231        // the views into it without reallocating.
232        #[cfg(debug_assertions)]
233        {
234            let total_length: usize = views
235                .iter()
236                .filter_map(|v| {
237                    let b = ByteView::from(*v);
238                    if b.length > MAX_INLINE_VIEW_LEN {
239                        Some(b.length as usize)
240                    } else {
241                        None
242                    }
243                })
244                .sum();
245            debug_assert!(
246                dst_buffer.capacity() >= total_length,
247                "dst_buffer capacity {} is less than total length {}",
248                dst_buffer.capacity(),
249                total_length
250            );
251        }
252
253        // Copy the views, updating the buffer index and copying the data as needed
254        let new_views = views.iter().map(|v| {
255            let mut b: ByteView = ByteView::from(*v);
256            if b.length > MAX_INLINE_VIEW_LEN {
257                let buffer_index = b.buffer_index as usize;
258                let buffer_offset = b.offset as usize;
259                let str_len = b.length as usize;
260
261                // Update view to location in current
262                b.offset = dst_buffer.len() as u32;
263                b.buffer_index = new_buffer_index;
264
265                // safety: input views are validly constructed
266                let src = unsafe {
267                    buffers
268                        .get_unchecked(buffer_index)
269                        .get_unchecked(buffer_offset..buffer_offset + str_len)
270                };
271                dst_buffer.extend_from_slice(src);
272            }
273            b.as_u128()
274        });
275        self.views.extend(new_views);
276        self.current = Some(dst_buffer);
277    }
278}
279
280impl<B: ByteViewType> InProgressArray for InProgressByteViewArray<B> {
281    fn set_source(&mut self, source: Option<ArrayRef>) {
282        self.source = source.map(|array| {
283            let s = array.as_byte_view::<B>();
284
285            let (need_gc, ideal_buffer_size) = if s.data_buffers().is_empty() {
286                (false, 0)
287            } else {
288                let ideal_buffer_size = s.total_buffer_bytes_used();
289                // We don't use get_buffer_memory_size here, because gc is for the contents of the
290                // data buffers, not views and nulls.
291                let actual_buffer_size =
292                    s.data_buffers().iter().map(|b| b.capacity()).sum::<usize>();
293                // copying strings is expensive, so only do it if the array is
294                // sparse (uses at least 2x the memory it needs)
295                let need_gc =
296                    ideal_buffer_size != 0 && actual_buffer_size > (ideal_buffer_size * 2);
297                (need_gc, ideal_buffer_size)
298            };
299
300            Source {
301                array,
302                need_gc,
303                ideal_buffer_size,
304            }
305        })
306    }
307
308    fn copy_rows(&mut self, offset: usize, len: usize) -> Result<(), ArrowError> {
309        self.ensure_capacity();
310        let source = self.source.take().ok_or_else(|| {
311            ArrowError::InvalidArgumentError(
312                "Internal Error: InProgressByteViewArray: source not set".to_string(),
313            )
314        })?;
315
316        // If creating StringViewArray output, ensure input was valid utf8 too
317        let s = source.array.as_byte_view::<B>();
318
319        // add any nulls, as necessary
320        if let Some(nulls) = s.nulls().as_ref() {
321            let nulls = nulls.slice(offset, len);
322            self.nulls.append_buffer(&nulls);
323        } else {
324            self.nulls.append_n_non_nulls(len);
325        };
326
327        let buffers = s.data_buffers();
328        let views = &s.views().as_ref()[offset..offset + len];
329
330        // If there are no data buffers in s (all inlined views), can append the
331        // views/nulls and done
332        if source.ideal_buffer_size == 0 {
333            self.views.extend_from_slice(views);
334            self.source = Some(source);
335            return Ok(());
336        }
337
338        // Copying the strings into a buffer can be time-consuming so
339        // only do it if the array is sparse
340        if source.need_gc {
341            self.append_views_and_copy_strings(views, source.ideal_buffer_size, buffers);
342        } else {
343            self.append_views_and_update_buffer_index(views, buffers);
344        }
345        self.source = Some(source);
346        Ok(())
347    }
348
349    fn finish(&mut self) -> Result<ArrayRef, ArrowError> {
350        self.finish_current();
351        assert!(self.current.is_none());
352        let buffers = std::mem::take(&mut self.completed);
353        let views = std::mem::take(&mut self.views);
354        let nulls = self.nulls.finish();
355        self.nulls = NullBufferBuilder::new(self.batch_size);
356
357        // Safety: we created valid views and buffers above and the
358        // input arrays had value data and nulls
359        let new_array =
360            unsafe { GenericByteViewArray::<B>::new_unchecked(views.into(), buffers, nulls) };
361        Ok(Arc::new(new_array))
362    }
363}
364
/// Size the doubling sequence starts from. The size is doubled before the
/// first buffer is handed out, so the first size used is actually 8KiB.
const STARTING_BLOCK_SIZE: usize = 4 * 1024;
/// Largest size reached by doubling (1MiB). Only a request that is itself
/// larger than this produces a bigger buffer.
const MAX_BLOCK_SIZE: usize = 1024 * 1024;

/// Manages allocating new buffers for `StringViewArray` in increasing sizes
#[derive(Debug)]
struct BufferSource {
    /// Size of the most recently chosen block (before any `min_size`
    /// override); doubled on each request until it reaches `MAX_BLOCK_SIZE`
    current_size: usize,
}

impl BufferSource {
    /// Create a source whose first buffer will be twice `STARTING_BLOCK_SIZE`
    fn new() -> Self {
        Self {
            current_size: STARTING_BLOCK_SIZE,
        }
    }

    /// Return a new buffer, with a capacity of at least `min_size`
    fn next_buffer(&mut self, min_size: usize) -> Vec<u8> {
        Vec::with_capacity(self.next_size(min_size))
    }

    /// Advance to the next block size and return the size to allocate
    ///
    /// The block size doubles on every call until it reaches
    /// `MAX_BLOCK_SIZE`. If that is still smaller than `min_size`, doubling
    /// continues until the request fits or the maximum is reached; a request
    /// larger than the maximum is returned as is (without changing the block
    /// size used for later calls).
    fn next_size(&mut self, min_size: usize) -> usize {
        if self.current_size < MAX_BLOCK_SIZE {
            // If the current size is less than the max size, we can double it
            // we have fixed start/end block sizes, so we can't overflow
            self.current_size = self.current_size.saturating_mul(2);
        }
        // Increase the size until it holds min_size or we hit the max size.
        // The comparison is strict so that a block which fits the request
        // exactly is used, rather than doubled once more.
        while self.current_size < min_size && self.current_size < MAX_BLOCK_SIZE {
            self.current_size = self.current_size.saturating_mul(2);
        }
        self.current_size.max(min_size)
    }
}
404
#[cfg(test)]
mod tests {
    use super::*;

    const KIB: usize = 1024;

    /// Requests a buffer of at least `min_size` bytes once per entry of
    /// `expected`, checking the capacity of each returned buffer in order
    fn assert_capacities(source: &mut BufferSource, min_size: usize, expected: &[usize]) {
        for &capacity in expected {
            assert_eq!(source.next_buffer(min_size).capacity(), capacity);
        }
    }

    #[test]
    fn test_buffer_source() {
        let mut source = BufferSource::new();
        // doubles on every request, then stays clamped to the max size
        assert_capacities(
            &mut source,
            1000,
            &[
                8192,
                16384,
                32768,
                65536,
                131072,
                262144,
                524288,
                1024 * KIB,
                1024 * KIB,
            ],
        );
        // Can override with larger size request
        assert_capacities(&mut source, 10_000_000, &[10_000_000]);
    }

    #[test]
    fn test_buffer_source_with_min_small() {
        let mut source = BufferSource::new();
        // First buffer should be 8kb, then 16kb, then 32kb
        assert_capacities(&mut source, 5_600, &[8 * KIB, 16 * KIB, 32 * KIB]);
    }

    #[test]
    fn test_buffer_source_with_min_large() {
        let mut source = BufferSource::new();
        // jumps straight to the first size that fits, then clamps to max size
        assert_capacities(&mut source, 500_000, &[512 * KIB, 1024 * KIB, 1024 * KIB]);
        // Can override with larger size request
        assert_capacities(&mut source, 2_000_000, &[2_000_000]);
    }
}
447}