1use crate::coalesce::InProgressArray;
19use arrow_array::cast::AsArray;
20use arrow_array::types::ByteViewType;
21use arrow_array::{Array, ArrayRef, GenericByteViewArray};
22use arrow_buffer::{Buffer, NullBufferBuilder};
23use arrow_data::{ByteView, MAX_INLINE_VIEW_LEN};
24use arrow_schema::ArrowError;
25use std::marker::PhantomData;
26use std::sync::Arc;
27
28pub(crate) struct InProgressByteViewArray<B: ByteViewType> {
39    source: Option<Source>,
41    batch_size: usize,
43    views: Vec<u128>,
45    nulls: NullBufferBuilder,
47    current: Option<Vec<u8>>,
49    completed: Vec<Buffer>,
51    buffer_source: BufferSource,
53    _phantom: PhantomData<B>,
56}
57
58struct Source {
59    array: ArrayRef,
61    need_gc: bool,
63    ideal_buffer_size: usize,
65}
66
67impl<B: ByteViewType> std::fmt::Debug for InProgressByteViewArray<B> {
69    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
70        f.debug_struct("InProgressByteViewArray")
71            .field("batch_size", &self.batch_size)
72            .field("views", &self.views.len())
73            .field("nulls", &self.nulls)
74            .field("current", &self.current.as_ref().map(|_| "Some(...)"))
75            .field("completed", &self.completed.len())
76            .finish()
77    }
78}
79
80impl<B: ByteViewType> InProgressByteViewArray<B> {
81    pub(crate) fn new(batch_size: usize) -> Self {
82        let buffer_source = BufferSource::new();
83
84        Self {
85            batch_size,
86            source: None,
87            views: Vec::new(),                         nulls: NullBufferBuilder::new(batch_size), current: None,
90            completed: vec![],
91            buffer_source,
92            _phantom: PhantomData,
93        }
94    }
95
96    fn ensure_capacity(&mut self) {
101        self.views.reserve(self.batch_size);
102    }
103
104    fn finish_current(&mut self) {
106        let Some(next_buffer) = self.current.take() else {
107            return;
108        };
109        self.completed.push(next_buffer.into());
110    }
111
112    #[inline(never)]
114    fn append_views_and_update_buffer_index(&mut self, views: &[u128], buffers: &[Buffer]) {
115        if let Some(buffer) = self.current.take() {
116            self.completed.push(buffer.into());
117        }
118        let starting_buffer: u32 = self.completed.len().try_into().expect("too many buffers");
119        self.completed.extend_from_slice(buffers);
120
121        if starting_buffer == 0 {
122            self.views.extend_from_slice(views);
124        } else {
125            let updated_views = views.iter().map(|v| {
127                let mut byte_view = ByteView::from(*v);
128                if byte_view.length > MAX_INLINE_VIEW_LEN {
129                    byte_view.buffer_index += starting_buffer;
131                };
132                byte_view.as_u128()
133            });
134
135            self.views.extend(updated_views);
136        }
137    }
138
139    #[inline(never)]
148    fn append_views_and_copy_strings(
149        &mut self,
150        views: &[u128],
151        view_buffer_size: usize,
152        buffers: &[Buffer],
153    ) {
154        let Some(current) = self.current.take() else {
160            let new_buffer = self.buffer_source.next_buffer(view_buffer_size);
161            self.append_views_and_copy_strings_inner(views, new_buffer, buffers);
162            return;
163        };
164
165        let mut remaining_capacity = current.capacity() - current.len();
168        if view_buffer_size <= remaining_capacity {
169            self.append_views_and_copy_strings_inner(views, current, buffers);
170            return;
171        }
172
173        let mut num_view_to_current = 0;
179        for view in views {
180            let b = ByteView::from(*view);
181            let str_len = b.length;
182            if remaining_capacity < str_len as usize {
183                break;
184            }
185            if str_len > MAX_INLINE_VIEW_LEN {
186                remaining_capacity -= str_len as usize;
187            }
188            num_view_to_current += 1;
189        }
190
191        let first_views = &views[0..num_view_to_current];
192        let string_bytes_to_copy = current.capacity() - current.len() - remaining_capacity;
193        let remaining_view_buffer_size = view_buffer_size - string_bytes_to_copy;
194
195        self.append_views_and_copy_strings_inner(first_views, current, buffers);
196        let completed = self.current.take().expect("completed");
197        self.completed.push(completed.into());
198
199        let remaining_views = &views[num_view_to_current..];
201        let new_buffer = self.buffer_source.next_buffer(remaining_view_buffer_size);
202        self.append_views_and_copy_strings_inner(remaining_views, new_buffer, buffers);
203    }
204
205    #[inline(never)]
213    fn append_views_and_copy_strings_inner(
214        &mut self,
215        views: &[u128],
216        mut dst_buffer: Vec<u8>,
217        buffers: &[Buffer],
218    ) {
219        assert!(self.current.is_none(), "current buffer should be None");
220
221        if views.is_empty() {
222            self.current = Some(dst_buffer);
223            return;
224        }
225
226        let new_buffer_index: u32 = self.completed.len().try_into().expect("too many buffers");
227
228        #[cfg(debug_assertions)]
231        {
232            let total_length: usize = views
233                .iter()
234                .filter_map(|v| {
235                    let b = ByteView::from(*v);
236                    if b.length > MAX_INLINE_VIEW_LEN {
237                        Some(b.length as usize)
238                    } else {
239                        None
240                    }
241                })
242                .sum();
243            debug_assert!(
244                dst_buffer.capacity() >= total_length,
245                "dst_buffer capacity {} is less than total length {}",
246                dst_buffer.capacity(),
247                total_length
248            );
249        }
250
251        let new_views = views.iter().map(|v| {
253            let mut b: ByteView = ByteView::from(*v);
254            if b.length > MAX_INLINE_VIEW_LEN {
255                let buffer_index = b.buffer_index as usize;
256                let buffer_offset = b.offset as usize;
257                let str_len = b.length as usize;
258
259                b.offset = dst_buffer.len() as u32;
261                b.buffer_index = new_buffer_index;
262
263                let src = unsafe {
265                    buffers
266                        .get_unchecked(buffer_index)
267                        .get_unchecked(buffer_offset..buffer_offset + str_len)
268                };
269                dst_buffer.extend_from_slice(src);
270            }
271            b.as_u128()
272        });
273        self.views.extend(new_views);
274        self.current = Some(dst_buffer);
275    }
276}
277
278impl<B: ByteViewType> InProgressArray for InProgressByteViewArray<B> {
279    fn set_source(&mut self, source: Option<ArrayRef>) {
280        self.source = source.map(|array| {
281            let s = array.as_byte_view::<B>();
282
283            let (need_gc, ideal_buffer_size) = if s.data_buffers().is_empty() {
284                (false, 0)
285            } else {
286                let ideal_buffer_size = s.total_buffer_bytes_used();
287                let actual_buffer_size =
290                    s.data_buffers().iter().map(|b| b.capacity()).sum::<usize>();
291                let need_gc =
294                    ideal_buffer_size != 0 && actual_buffer_size > (ideal_buffer_size * 2);
295                (need_gc, ideal_buffer_size)
296            };
297
298            Source {
299                array,
300                need_gc,
301                ideal_buffer_size,
302            }
303        })
304    }
305
306    fn copy_rows(&mut self, offset: usize, len: usize) -> Result<(), ArrowError> {
307        self.ensure_capacity();
308        let source = self.source.take().ok_or_else(|| {
309            ArrowError::InvalidArgumentError(
310                "Internal Error: InProgressByteViewArray: source not set".to_string(),
311            )
312        })?;
313
314        let s = source.array.as_byte_view::<B>();
316
317        if let Some(nulls) = s.nulls().as_ref() {
319            let nulls = nulls.slice(offset, len);
320            self.nulls.append_buffer(&nulls);
321        } else {
322            self.nulls.append_n_non_nulls(len);
323        };
324
325        let buffers = s.data_buffers();
326        let views = &s.views().as_ref()[offset..offset + len];
327
328        if source.ideal_buffer_size == 0 {
331            self.views.extend_from_slice(views);
332            self.source = Some(source);
333            return Ok(());
334        }
335
336        if source.need_gc {
339            self.append_views_and_copy_strings(views, source.ideal_buffer_size, buffers);
340        } else {
341            self.append_views_and_update_buffer_index(views, buffers);
342        }
343        self.source = Some(source);
344        Ok(())
345    }
346
347    fn finish(&mut self) -> Result<ArrayRef, ArrowError> {
348        self.finish_current();
349        assert!(self.current.is_none());
350        let buffers = std::mem::take(&mut self.completed);
351        let views = std::mem::take(&mut self.views);
352        let nulls = self.nulls.finish();
353        self.nulls = NullBufferBuilder::new(self.batch_size);
354
355        let new_array =
358            unsafe { GenericByteViewArray::<B>::new_unchecked(views.into(), buffers, nulls) };
359        Ok(Arc::new(new_array))
360    }
361}
362
/// Block size, in bytes, that [`BufferSource`] starts from.
const STARTING_BLOCK_SIZE: usize = 4 * 1024;

/// Block size, in bytes, past which [`BufferSource`] stops doubling.
const MAX_BLOCK_SIZE: usize = 1024 * 1024;

/// Hands out empty data buffers whose capacity doubles from request to
/// request until [`MAX_BLOCK_SIZE`] is reached.
#[derive(Debug)]
struct BufferSource {
    /// Current block size in bytes.
    current_size: usize,
}
371
372impl BufferSource {
373    fn new() -> Self {
374        Self {
375            current_size: STARTING_BLOCK_SIZE,
376        }
377    }
378
379    fn next_buffer(&mut self, min_size: usize) -> Vec<u8> {
381        let size = self.next_size(min_size);
382        Vec::with_capacity(size)
383    }
384
385    fn next_size(&mut self, min_size: usize) -> usize {
386        if self.current_size < MAX_BLOCK_SIZE {
387            self.current_size = self.current_size.saturating_mul(2);
390        }
391        if self.current_size >= min_size {
392            self.current_size
393        } else {
394            while self.current_size <= min_size && self.current_size < MAX_BLOCK_SIZE {
396                self.current_size = self.current_size.saturating_mul(2);
397            }
398            self.current_size.max(min_size)
399        }
400    }
401}
402
#[cfg(test)]
mod tests {
    use super::*;

    /// Request a buffer of at least `min_size` bytes and report its capacity.
    fn capacity_for(source: &mut BufferSource, min_size: usize) -> usize {
        source.next_buffer(min_size).capacity()
    }

    #[test]
    fn test_buffer_source() {
        let mut source = BufferSource::new();
        // Small requests double the block size until the maximum is reached.
        let expected: [usize; 9] = [
            8192,
            16384,
            32768,
            65536,
            131072,
            262144,
            524288,
            1024 * 1024,
            1024 * 1024,
        ];
        for &want in &expected {
            assert_eq!(capacity_for(&mut source, 1000), want);
        }
        // Requests larger than the maximum block size are honored exactly.
        assert_eq!(capacity_for(&mut source, 10_000_000), 10_000_000);
    }

    #[test]
    fn test_buffer_source_with_min_small() {
        let mut source = BufferSource::new();
        // Each block already exceeds the request, so sizes keep doubling.
        let expected: [usize; 3] = [8 * 1024, 16 * 1024, 32 * 1024];
        for &want in &expected {
            assert_eq!(capacity_for(&mut source, 5_600), want);
        }
    }

    #[test]
    fn test_buffer_source_with_min_large() {
        let mut source = BufferSource::new();
        // Grows to the first block size above the request, then caps at the max.
        let expected: [usize; 3] = [512 * 1024, 1024 * 1024, 1024 * 1024];
        for &want in &expected {
            assert_eq!(capacity_for(&mut source, 500_000), want);
        }
        // Beyond the maximum block size the requested size is used verbatim.
        assert_eq!(capacity_for(&mut source, 2_000_000), 2_000_000);
    }
}