Skip to main content

arrow_select/coalesce/
byte_view.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::coalesce::InProgressArray;
19use crate::filter::{FilterPredicate, FilterSelection, filter_null_mask};
20use arrow_array::cast::AsArray;
21use arrow_array::types::ByteViewType;
22use arrow_array::{Array, ArrayRef, GenericByteViewArray};
23use arrow_buffer::{BooleanBuffer, Buffer, NullBuffer, NullBufferBuilder};
24use arrow_data::{ByteView, MAX_INLINE_VIEW_LEN};
25use arrow_schema::ArrowError;
26use std::marker::PhantomData;
27use std::sync::Arc;
28
29/// InProgressArray for [`StringViewArray`] and [`BinaryViewArray`]
30///
31/// This structure buffers the views and data buffers as they are copied from
32/// the source array, and then produces a new array when `finish` is called. It
33/// also handles "garbage collection" by copying strings to a new buffer when
34/// the source buffer is sparse (i.e. uses at least 2x more than the memory it
35/// needs).
36///
37/// [`StringViewArray`]: arrow_array::StringViewArray
38/// [`BinaryViewArray`]: arrow_array::BinaryViewArray
39pub(crate) struct InProgressByteViewArray<B: ByteViewType> {
40    /// The source array and information
41    source: Option<Source>,
42    /// the target batch size (and thus size for views allocation)
43    batch_size: usize,
44    /// The in progress views
45    views: Vec<u128>,
46    /// In progress nulls
47    nulls: NullBufferBuilder,
48    /// current buffer
49    current: Option<Vec<u8>>,
50    /// completed buffers
51    completed: Vec<Buffer>,
52    /// Allocates new buffers of increasing size as needed
53    buffer_source: BufferSource,
54    /// Phantom so we can use the same struct for both StringViewArray and
55    /// BinaryViewArray
56    _phantom: PhantomData<B>,
57}
58
59struct Source {
60    /// The array to copy form
61    array: ArrayRef,
62    /// Should the strings from the source array be copied into new buffers?
63    need_gc: bool,
64    /// How many bytes were actually used in the source array's buffers?
65    ideal_buffer_size: usize,
66}
67
68// manually implement Debug because ByteViewType doesn't implement Debug
69impl<B: ByteViewType> std::fmt::Debug for InProgressByteViewArray<B> {
70    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
71        f.debug_struct("InProgressByteViewArray")
72            .field("batch_size", &self.batch_size)
73            .field("views", &self.views.len())
74            .field("nulls", &self.nulls)
75            .field("current", &self.current.as_ref().map(|_| "Some(...)"))
76            .field("completed", &self.completed.len())
77            .finish()
78    }
79}
80
81impl<B: ByteViewType> InProgressByteViewArray<B> {
82    pub(crate) fn new(batch_size: usize) -> Self {
83        let buffer_source = BufferSource::new();
84
85        Self {
86            batch_size,
87            source: None,
88            views: Vec::new(),                         // allocate in push
89            nulls: NullBufferBuilder::new(batch_size), // no allocation
90            current: None,
91            completed: vec![],
92            buffer_source,
93            _phantom: PhantomData,
94        }
95    }
96
97    /// Allocate space for output views and nulls if needed
98    ///
99    /// This is done on write (when we know it is necessary) rather than
100    /// eagerly to avoid allocations that are not used.
101    fn ensure_capacity(&mut self) {
102        if self.views.capacity() == 0 {
103            self.views.reserve(self.batch_size);
104        }
105    }
106
107    /// Finishes in progress buffer, if any
108    fn finish_current(&mut self) {
109        let Some(next_buffer) = self.current.take() else {
110            return;
111        };
112        self.completed.push(next_buffer.into());
113    }
114
115    fn append_views_by_filter(&mut self, views: &[u128], filter: &FilterPredicate) {
116        let selected_count = filter.count();
117        let current_len = self.views.len();
118        self.views.reserve(selected_count);
119
120        let mut written = 0;
121
122        unsafe {
123            let mut out = self.views.spare_capacity_mut().as_mut_ptr().cast::<u128>();
124
125            match filter.selection() {
126                FilterSelection::None => {}
127                FilterSelection::All { .. } => {
128                    std::ptr::copy_nonoverlapping(views.as_ptr(), out, selected_count);
129                    written = selected_count;
130                }
131                FilterSelection::Slices(slices) => {
132                    slices.for_each(|(start, end)| {
133                        let len = end - start;
134                        std::ptr::copy_nonoverlapping(views.as_ptr().add(start), out, len);
135                        out = out.add(len);
136                        written += len;
137                    });
138                }
139                FilterSelection::Indices(indices) => {
140                    indices.for_each(|idx| {
141                        out.write(*views.get_unchecked(idx));
142                        out = out.add(1);
143                        written += 1;
144                    });
145                }
146            }
147
148            self.views.set_len(current_len + written);
149        }
150
151        debug_assert_eq!(written, selected_count);
152    }
153
154    fn append_nulls_by_filter(
155        &mut self,
156        filter: &FilterPredicate,
157        source_nulls: Option<&NullBuffer>,
158    ) {
159        let Some((null_count, nulls)) = filter_null_mask(source_nulls, filter) else {
160            self.nulls.append_n_non_nulls(filter.count());
161            return;
162        };
163
164        let nulls = unsafe {
165            NullBuffer::new_unchecked(BooleanBuffer::new(nulls, 0, filter.count()), null_count)
166        };
167        self.nulls.append_buffer(&nulls);
168    }
169
170    /// Append views to self.views, updating the buffer index if necessary
171    #[inline(never)]
172    fn append_views_and_update_buffer_index(&mut self, views: &[u128], buffers: &[Buffer]) {
173        if let Some(buffer) = self.current.take() {
174            self.completed.push(buffer.into());
175        }
176        let starting_buffer: u32 = self.completed.len().try_into().expect("too many buffers");
177        self.completed.extend_from_slice(buffers);
178
179        if starting_buffer == 0 {
180            // If there are no buffers, we can just use the views as is
181            self.views.extend_from_slice(views);
182        } else {
183            // If there are buffers, we need to update the buffer index
184            let updated_views = views.iter().map(|v| {
185                let mut byte_view = ByteView::from(*v);
186                if byte_view.length > MAX_INLINE_VIEW_LEN {
187                    // Small views (<=12 bytes) are inlined, so only need to update large views
188                    byte_view.buffer_index += starting_buffer;
189                };
190                byte_view.as_u128()
191            });
192
193            self.views.extend(updated_views);
194        }
195    }
196
197    /// Append views to self.views, copying data from the buffers into
198    /// self.buffers and updating the buffer index as necessary.
199    ///
200    /// # Arguments
201    /// - `views` - the views to append
202    /// - `view_buffer_size` - the total number of bytes pointed to by all
203    ///   views (used to allocate new buffers if needed)
204    /// - `buffers` - the buffers the reviews point to
205    #[inline(never)]
206    fn append_views_and_copy_strings(
207        &mut self,
208        views: &[u128],
209        view_buffer_size: usize,
210        buffers: &[Buffer],
211    ) {
212        // Note: the calculations below are designed to avoid any reallocations
213        // of the current buffer, and to only allocate new buffers when
214        // necessary, which is critical for performance.
215
216        // If there is no current buffer, allocate a new one
217        let Some(current) = self.current.take() else {
218            let new_buffer = self.buffer_source.next_buffer(view_buffer_size);
219            self.append_views_and_copy_strings_inner(views, new_buffer, buffers);
220            return;
221        };
222
223        // If there is a current buffer with enough space, append the views and
224        // copy the strings into the existing buffer.
225        let mut remaining_capacity = current.capacity() - current.len();
226        if view_buffer_size <= remaining_capacity {
227            self.append_views_and_copy_strings_inner(views, current, buffers);
228            return;
229        }
230
231        // Here there is a current buffer, but it doesn't have enough space to
232        // hold all the strings. Copy as many views as we can into the current
233        // buffer and then allocate a new buffer for the remaining views
234        //
235        // TODO: should we copy the strings too at the same time?
236        let mut num_view_to_current = 0;
237        for view in views {
238            let b = ByteView::from(*view);
239            let str_len = b.length;
240            if remaining_capacity < str_len as usize {
241                break;
242            }
243            if str_len > MAX_INLINE_VIEW_LEN {
244                remaining_capacity -= str_len as usize;
245            }
246            num_view_to_current += 1;
247        }
248
249        let first_views = &views[0..num_view_to_current];
250        let string_bytes_to_copy = current.capacity() - current.len() - remaining_capacity;
251        let remaining_view_buffer_size = view_buffer_size - string_bytes_to_copy;
252
253        self.append_views_and_copy_strings_inner(first_views, current, buffers);
254        let completed = self.current.take().expect("completed");
255        self.completed.push(completed.into());
256
257        // Copy any remaining views into a new buffer
258        let remaining_views = &views[num_view_to_current..];
259        let new_buffer = self.buffer_source.next_buffer(remaining_view_buffer_size);
260        self.append_views_and_copy_strings_inner(remaining_views, new_buffer, buffers);
261    }
262
263    /// Append views to self.views, copying data from the buffers into
264    /// dst_buffer, which is then set as self.current
265    ///
266    /// # Panics:
267    /// If `self.current` is `Some`
268    ///
269    /// See `append_views_and_copy_strings` for more details
270    #[inline(never)]
271    fn append_views_and_copy_strings_inner(
272        &mut self,
273        views: &[u128],
274        mut dst_buffer: Vec<u8>,
275        buffers: &[Buffer],
276    ) {
277        assert!(self.current.is_none(), "current buffer should be None");
278
279        if views.is_empty() {
280            self.current = Some(dst_buffer);
281            return;
282        }
283
284        let new_buffer_index: u32 = self.completed.len().try_into().expect("too many buffers");
285
286        // In debug builds, check that the vector has enough capacity to copy
287        // the views into it without reallocating.
288        #[cfg(debug_assertions)]
289        {
290            let total_length: usize = views
291                .iter()
292                .filter_map(|v| {
293                    let b = ByteView::from(*v);
294                    if b.length > MAX_INLINE_VIEW_LEN {
295                        Some(b.length as usize)
296                    } else {
297                        None
298                    }
299                })
300                .sum();
301            debug_assert!(
302                dst_buffer.capacity() >= total_length,
303                "dst_buffer capacity {} is less than total length {}",
304                dst_buffer.capacity(),
305                total_length
306            );
307        }
308
309        // Copy the views, updating the buffer index and copying the data as needed
310        let new_views = views.iter().map(|v| {
311            let mut b: ByteView = ByteView::from(*v);
312            if b.length > MAX_INLINE_VIEW_LEN {
313                let buffer_index = b.buffer_index as usize;
314                let buffer_offset = b.offset as usize;
315                let str_len = b.length as usize;
316
317                // Update view to location in current
318                b.offset = dst_buffer.len() as u32;
319                b.buffer_index = new_buffer_index;
320
321                // safety: input views are validly constructed
322                let src = unsafe {
323                    buffers
324                        .get_unchecked(buffer_index)
325                        .get_unchecked(buffer_offset..buffer_offset + str_len)
326                };
327                dst_buffer.extend_from_slice(src);
328            }
329            b.as_u128()
330        });
331        self.views.extend(new_views);
332        self.current = Some(dst_buffer);
333    }
334}
335
336impl<B: ByteViewType> InProgressArray for InProgressByteViewArray<B> {
337    fn set_source(&mut self, source: Option<ArrayRef>) {
338        self.source = source.map(|array| {
339            let s = array.as_byte_view::<B>();
340
341            let (need_gc, ideal_buffer_size) = if s.data_buffers().is_empty() {
342                (false, 0)
343            } else {
344                let ideal_buffer_size = s.total_buffer_bytes_used();
345                // We don't use get_buffer_memory_size here, because gc is for the contents of the
346                // data buffers, not views and nulls.
347                let actual_buffer_size =
348                    s.data_buffers().iter().map(|b| b.capacity()).sum::<usize>();
349                // copying strings is expensive, so only do it if the array is
350                // sparse (uses at least 2x the memory it needs)
351                let need_gc =
352                    ideal_buffer_size != 0 && actual_buffer_size > (ideal_buffer_size * 2);
353                (need_gc, ideal_buffer_size)
354            };
355
356            Source {
357                array,
358                need_gc,
359                ideal_buffer_size,
360            }
361        })
362    }
363
364    fn copy_rows(&mut self, offset: usize, len: usize) -> Result<(), ArrowError> {
365        self.ensure_capacity();
366        let source = self.source.take().ok_or_else(|| {
367            ArrowError::InvalidArgumentError(
368                "Internal Error: InProgressByteViewArray: source not set".to_string(),
369            )
370        })?;
371
372        // If creating StringViewArray output, ensure input was valid utf8 too
373        let s = source.array.as_byte_view::<B>();
374
375        // add any nulls, as necessary
376        if let Some(nulls) = s.nulls().as_ref() {
377            let nulls = nulls.slice(offset, len);
378            self.nulls.append_buffer(&nulls);
379        } else {
380            self.nulls.append_n_non_nulls(len);
381        };
382
383        let buffers = s.data_buffers();
384        // SAFETY: copy_rows is called with ranges derived from the source array.
385        let views = unsafe { s.views().as_ref().get_unchecked(offset..offset + len) };
386
387        // If there are no data buffers in s (all inlined views), can append the
388        // views/nulls and done
389        if source.ideal_buffer_size == 0 {
390            self.views.extend_from_slice(views);
391            self.source = Some(source);
392            return Ok(());
393        }
394
395        // Copying the strings into a buffer can be time-consuming so
396        // only do it if the array is sparse
397        if source.need_gc {
398            self.append_views_and_copy_strings(views, source.ideal_buffer_size, buffers);
399        } else {
400            self.append_views_and_update_buffer_index(views, buffers);
401        }
402        self.source = Some(source);
403        Ok(())
404    }
405
406    fn copy_rows_by_filter(&mut self, filter: &FilterPredicate) -> Result<(), ArrowError> {
407        self.ensure_capacity();
408        let source = self.source.take().ok_or_else(|| {
409            ArrowError::InvalidArgumentError(
410                "Internal Error: InProgressByteViewArray: source not set".to_string(),
411            )
412        })?;
413
414        let s = source.array.as_byte_view::<B>();
415
416        if !s.data_buffers().is_empty() {
417            // Restore the source taken above before returning the guard error.
418            self.source = Some(source);
419            return Err(ArrowError::InvalidArgumentError(
420                "Internal Error: InProgressByteViewArray::copy_rows_by_filter requires inline views"
421                    .to_string(),
422            ));
423        }
424
425        self.append_nulls_by_filter(filter, s.nulls());
426        self.append_views_by_filter(s.views(), filter);
427
428        self.source = Some(source);
429        Ok(())
430    }
431
432    fn copy_rows_by_filter_from(
433        &mut self,
434        source: ArrayRef,
435        filter: &FilterPredicate,
436    ) -> Result<(), ArrowError> {
437        let s = source.as_byte_view::<B>();
438        if s.data_buffers().is_empty() {
439            self.ensure_capacity();
440            self.append_nulls_by_filter(filter, s.nulls());
441            self.append_views_by_filter(s.views(), filter);
442            return Ok(());
443        }
444
445        // Match the filter kernel: filter views/nulls, but reuse data buffers.
446        let filtered = filter.filter(source.as_ref())?;
447        let filtered = filtered.as_byte_view::<B>();
448
449        self.ensure_capacity();
450        if let Some(nulls) = filtered.nulls().as_ref() {
451            self.nulls.append_buffer(nulls);
452        } else {
453            self.nulls.append_n_non_nulls(filter.count());
454        }
455        self.append_views_and_update_buffer_index(filtered.views(), filtered.data_buffers());
456        Ok(())
457    }
458
459    fn finish(&mut self) -> Result<ArrayRef, ArrowError> {
460        self.finish_current();
461        assert!(self.current.is_none());
462        let buffers = std::mem::take(&mut self.completed);
463        let views = std::mem::take(&mut self.views);
464        let nulls = self.nulls.finish();
465        self.nulls = NullBufferBuilder::new(self.batch_size);
466
467        // Safety: we created valid views and buffers above and the
468        // input arrays had value data and nulls
469        let new_array =
470            unsafe { GenericByteViewArray::<B>::new_unchecked(views.into(), buffers, nulls) };
471        Ok(Arc::new(new_array))
472    }
473}
474
/// Block size that is doubled to obtain the first buffer (so the first size
/// actually handed out is 8KiB).
const STARTING_BLOCK_SIZE: usize = 4 * 1024;
/// Block size at which doubling stops (1MiB).
const MAX_BLOCK_SIZE: usize = 1024 * 1024;

/// Hands out new buffers for `StringViewArray` whose sizes increase from one
/// request to the next.
#[derive(Debug)]
struct BufferSource {
    /// Block size settled on by the most recent request; the next request
    /// starts from this value
    current_size: usize,
}

impl BufferSource {
    /// Create a source whose first buffer is twice `STARTING_BLOCK_SIZE`.
    fn new() -> Self {
        Self {
            current_size: STARTING_BLOCK_SIZE,
        }
    }

    /// Return a new, empty buffer with a capacity of at least `min_size`
    fn next_buffer(&mut self, min_size: usize) -> Vec<u8> {
        Vec::with_capacity(self.next_size(min_size))
    }

    /// Advance the block size and return the size to allocate for a request
    /// of at least `min_size` bytes.
    ///
    /// The block size doubles once per call until it reaches
    /// `MAX_BLOCK_SIZE`. If that is still below `min_size` it keeps doubling
    /// (never past the maximum), and the result is never less than `min_size`.
    fn next_size(&mut self, min_size: usize) -> usize {
        let mut size = self.current_size;

        // Regular growth step. Start and end block sizes are fixed, so this
        // cannot overflow.
        if size < MAX_BLOCK_SIZE {
            size = size.saturating_mul(2);
        }

        // Catch up with a large request: increase until we pass `min_size`
        // or hit the max size
        if size < min_size {
            while size <= min_size && size < MAX_BLOCK_SIZE {
                size = size.saturating_mul(2);
            }
        }

        self.current_size = size;
        size.max(min_size)
    }
}
514
#[cfg(test)]
mod tests {
    use super::*;
    use crate::filter::FilterBuilder;
    use arrow_array::types::BinaryViewType;
    use arrow_array::{BinaryViewArray, BooleanArray};

    /// Request a buffer of at least `min_size` bytes and report its capacity
    fn next_capacity(source: &mut BufferSource, min_size: usize) -> usize {
        source.next_buffer(min_size).capacity()
    }

    #[test]
    fn test_buffer_source() {
        let mut source = BufferSource::new();
        // 8KiB (2^13) doubling up to 1MiB (2^20)
        for shift in 13..=20 {
            assert_eq!(next_capacity(&mut source, 1000), 1usize << shift);
        }
        // clamped to max size
        assert_eq!(next_capacity(&mut source, 1000), 1024 * 1024);
        // Can override with larger size request
        assert_eq!(next_capacity(&mut source, 10_000_000), 10_000_000);
    }

    #[test]
    fn test_buffer_source_with_min_small() {
        let mut source = BufferSource::new();
        // First buffer should be 8kb, then 16kb, then 32kb
        for expected_kib in [8, 16, 32] {
            assert_eq!(next_capacity(&mut source, 5_600), expected_kib * 1024);
        }
    }

    #[test]
    fn test_buffer_source_with_min_large() {
        let mut source = BufferSource::new();
        // The last expected value repeats because the size is clamped to the max
        for expected in [512 * 1024, 1024 * 1024, 1024 * 1024] {
            assert_eq!(next_capacity(&mut source, 500_000), expected);
        }
        // Can override with larger size request
        assert_eq!(next_capacity(&mut source, 2_000_000), 2_000_000);
    }

    #[test]
    fn test_copy_rows_by_filter_rejects_non_inline_views() {
        // A value longer than 12 bytes forces the array to use a data buffer
        let values: Vec<Option<&[u8]>> = vec![Some(b"This value is longer than 12 bytes")];
        let array = BinaryViewArray::from_iter(values);
        assert!(!array.data_buffers().is_empty());

        let mut in_progress = InProgressByteViewArray::<BinaryViewType>::new(1);
        in_progress.set_source(Some(Arc::new(array)));

        let keep_all = BooleanArray::from(vec![true]);
        let predicate = FilterBuilder::new(&keep_all).build();

        let err = in_progress.copy_rows_by_filter(&predicate).unwrap_err();
        assert!(
            err.to_string().contains("requires inline views"),
            "unexpected error: {err}"
        );
    }

    #[test]
    fn test_copy_rows_by_filter_from_reuses_non_inline_buffers() {
        let values: Vec<Vec<u8>> = (0..32)
            .map(|i| format!("This value is longer than 12 bytes: {i}").into_bytes())
            .collect();
        let array = BinaryViewArray::from_iter(values.iter().map(|v| Some(v.as_slice())));
        assert!(!array.data_buffers().is_empty());
        let source_buffer = array.data_buffers()[0].as_ptr();

        // keep only rows 3 and 29
        let selection: Vec<bool> = (0..32).map(|i| i == 3 || i == 29).collect();
        let filter = BooleanArray::from(selection);
        let predicate = FilterBuilder::new(&filter).build();

        let mut in_progress = InProgressByteViewArray::<BinaryViewType>::new(32);
        in_progress
            .copy_rows_by_filter_from(Arc::new(array), &predicate)
            .unwrap();
        let output = in_progress.finish().unwrap();
        let output = output.as_binary_view();

        assert_eq!(output.len(), 2);
        assert_eq!(output.value(0), values[3].as_slice());
        assert_eq!(output.value(1), values[29].as_slice());

        let reuses_source_buffer = output
            .data_buffers()
            .iter()
            .any(|buffer| std::ptr::addr_eq(buffer.as_ptr(), source_buffer));
        assert!(
            reuses_source_buffer,
            "expected filtered output to reuse the source data buffer"
        );
    }
}