1use crate::coalesce::InProgressArray;
19use arrow_array::cast::AsArray;
20use arrow_array::types::ByteViewType;
21use arrow_array::{Array, ArrayRef, GenericByteViewArray};
22use arrow_buffer::{Buffer, NullBufferBuilder};
23use arrow_data::{ByteView, MAX_INLINE_VIEW_LEN};
24use arrow_schema::ArrowError;
25use std::marker::PhantomData;
26use std::sync::Arc;
27
/// Incrementally builds a [`GenericByteViewArray`] by copying ranges of rows
/// from source arrays of the same type (see [`InProgressArray`]).
pub(crate) struct InProgressByteViewArray<B: ByteViewType> {
    /// The array rows are currently being copied from, if one has been set.
    source: Option<Source>,
    /// Target number of rows per output batch; used to size allocations.
    batch_size: usize,
    /// Views (inline values or buffer references) accumulated so far.
    views: Vec<u128>,
    /// Null mask accumulated so far.
    nulls: NullBufferBuilder,
    /// In-progress data buffer that copied string bytes are appended to.
    current: Option<Vec<u8>>,
    /// Data buffers that have been sealed and will back the finished array.
    completed: Vec<Buffer>,
    /// Allocates successively larger buffers to amortize allocation cost.
    buffer_source: BufferSource,
    /// Ties this builder to a specific [`ByteViewType`]; holds no data.
    _phantom: PhantomData<B>,
}
57
/// A source array plus precomputed information about how rows should be
/// copied out of it.
struct Source {
    /// The source array (a `GenericByteViewArray<B>` behind an `ArrayRef`).
    array: ArrayRef,
    /// True when the source's data buffers are substantially larger than the
    /// bytes actually referenced, so string data is copied ("gc'd") instead
    /// of the buffers being reused wholesale.
    need_gc: bool,
    /// Number of buffer bytes actually referenced by the source's views
    /// (0 when the source has no data buffers, i.e. all values are inline).
    ideal_buffer_size: usize,
}
66
67impl<B: ByteViewType> std::fmt::Debug for InProgressByteViewArray<B> {
69 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
70 f.debug_struct("InProgressByteViewArray")
71 .field("batch_size", &self.batch_size)
72 .field("views", &self.views.len())
73 .field("nulls", &self.nulls)
74 .field("current", &self.current.as_ref().map(|_| "Some(...)"))
75 .field("completed", &self.completed.len())
76 .finish()
77 }
78}
79
impl<B: ByteViewType> InProgressByteViewArray<B> {
    /// Creates a new builder targeting batches of `batch_size` rows.
    pub(crate) fn new(batch_size: usize) -> Self {
        let buffer_source = BufferSource::new();

        Self {
            batch_size,
            source: None,
            // view storage is allocated lazily by `ensure_capacity`
            views: Vec::new(),
            nulls: NullBufferBuilder::new(batch_size),
            current: None,
            completed: vec![],
            buffer_source,
            _phantom: PhantomData,
        }
    }

    /// Reserves space for `batch_size` views on first use (capacity 0 means
    /// nothing has been reserved yet).
    fn ensure_capacity(&mut self) {
        if self.views.capacity() == 0 {
            self.views.reserve(self.batch_size);
        }
    }

    /// Seals the in-progress data buffer, if any, moving it into `completed`.
    fn finish_current(&mut self) {
        let Some(next_buffer) = self.current.take() else {
            return;
        };
        self.completed.push(next_buffer.into());
    }

    /// Appends `views` and `buffers` wholesale, rewriting each non-inline
    /// view's `buffer_index` to account for buffers already in `completed`.
    ///
    /// Used on the non-gc path, where the source buffers are reused directly.
    #[inline(never)]
    fn append_views_and_update_buffer_index(&mut self, views: &[u128], buffers: &[Buffer]) {
        // seal any in-progress buffer first so buffer indexes stay correct
        if let Some(buffer) = self.current.take() {
            self.completed.push(buffer.into());
        }
        let starting_buffer: u32 = self.completed.len().try_into().expect("too many buffers");
        self.completed.extend_from_slice(buffers);

        if starting_buffer == 0 {
            // fast path: indexes need no adjustment, copy views verbatim
            self.views.extend_from_slice(views);
        } else {
            // shift the buffer index of every view that references a buffer
            // (inline views, length <= MAX_INLINE_VIEW_LEN, carry their data)
            let updated_views = views.iter().map(|v| {
                let mut byte_view = ByteView::from(*v);
                if byte_view.length > MAX_INLINE_VIEW_LEN {
                    byte_view.buffer_index += starting_buffer;
                };
                byte_view.as_u128()
            });

            self.views.extend(updated_views);
        }
    }

    /// Appends `views`, copying the string bytes they reference out of
    /// `buffers` into this builder's own data buffers (the "gc" path).
    ///
    /// `view_buffer_size` is an upper bound on the referenced bytes (callers
    /// pass the source's total `ideal_buffer_size` even for a row subset).
    /// Fills the current buffer first, then spills the remaining views into a
    /// freshly allocated buffer when it cannot hold them all.
    #[inline(never)]
    fn append_views_and_copy_strings(
        &mut self,
        views: &[u128],
        view_buffer_size: usize,
        buffers: &[Buffer],
    ) {
        // no in-progress buffer: allocate one big enough for everything
        let Some(current) = self.current.take() else {
            let new_buffer = self.buffer_source.next_buffer(view_buffer_size);
            self.append_views_and_copy_strings_inner(views, new_buffer, buffers);
            return;
        };

        let mut remaining_capacity = current.capacity() - current.len();
        // everything fits in the current buffer: copy in one shot
        if view_buffer_size <= remaining_capacity {
            self.append_views_and_copy_strings_inner(views, current, buffers);
            return;
        }

        // Count how many leading views fit in the current buffer. The
        // capacity check is applied to inline views too, which is merely
        // conservative: they consume no buffer space (only the subtraction
        // below is skipped for them).
        let mut num_view_to_current = 0;
        for view in views {
            let b = ByteView::from(*view);
            let str_len = b.length;
            if remaining_capacity < str_len as usize {
                break;
            }
            if str_len > MAX_INLINE_VIEW_LEN {
                remaining_capacity -= str_len as usize;
            }
            num_view_to_current += 1;
        }

        let first_views = &views[0..num_view_to_current];
        // bytes that the first chunk of views will place into `current`
        let string_bytes_to_copy = current.capacity() - current.len() - remaining_capacity;
        let remaining_view_buffer_size = view_buffer_size - string_bytes_to_copy;

        // copy the first chunk into the current buffer, then seal it
        self.append_views_and_copy_strings_inner(first_views, current, buffers);
        let completed = self.current.take().expect("completed");
        self.completed.push(completed.into());

        // copy the rest into a new buffer sized for the leftover bytes
        let remaining_views = &views[num_view_to_current..];
        let new_buffer = self.buffer_source.next_buffer(remaining_view_buffer_size);
        self.append_views_and_copy_strings_inner(remaining_views, new_buffer, buffers);
    }

    /// Appends `views` into `self.views`, copying each non-inline string from
    /// `buffers` into `dst_buffer`, which becomes the new current buffer.
    ///
    /// Preconditions: `self.current` is `None` (asserted) and `dst_buffer`
    /// has spare capacity for all referenced bytes (debug-asserted below).
    #[inline(never)]
    fn append_views_and_copy_strings_inner(
        &mut self,
        views: &[u128],
        mut dst_buffer: Vec<u8>,
        buffers: &[Buffer],
    ) {
        assert!(self.current.is_none(), "current buffer should be None");

        if views.is_empty() {
            self.current = Some(dst_buffer);
            return;
        }

        // every copied string lands in the buffer that will be pushed onto
        // `completed` next, i.e. at index `completed.len()`
        let new_buffer_index: u32 = self.completed.len().try_into().expect("too many buffers");

        // debug-only check that dst_buffer can absorb all referenced bytes;
        // note it compares against full capacity, so it assumes the usual
        // call pattern where capacity was sized for these views
        #[cfg(debug_assertions)]
        {
            let total_length: usize = views
                .iter()
                .filter_map(|v| {
                    let b = ByteView::from(*v);
                    if b.length > MAX_INLINE_VIEW_LEN {
                        Some(b.length as usize)
                    } else {
                        None
                    }
                })
                .sum();
            debug_assert!(
                dst_buffer.capacity() >= total_length,
                "dst_buffer capacity {} is less than total length {}",
                dst_buffer.capacity(),
                total_length
            );
        }

        let new_views = views.iter().map(|v| {
            let mut b: ByteView = ByteView::from(*v);
            if b.length > MAX_INLINE_VIEW_LEN {
                let buffer_index = b.buffer_index as usize;
                let buffer_offset = b.offset as usize;
                let str_len = b.length as usize;

                // retarget the view at the bytes about to be appended
                b.offset = dst_buffer.len() as u32;
                b.buffer_index = new_buffer_index;

                // SAFETY: `views` come from a valid GenericByteViewArray,
                // whose invariants guarantee buffer_index is a valid index
                // into `buffers` and offset..offset+len is in bounds
                let src = unsafe {
                    buffers
                        .get_unchecked(buffer_index)
                        .get_unchecked(buffer_offset..buffer_offset + str_len)
                };
                dst_buffer.extend_from_slice(src);
            }
            b.as_u128()
        });
        self.views.extend(new_views);
        self.current = Some(dst_buffer);
    }
}
279
impl<B: ByteViewType> InProgressArray for InProgressByteViewArray<B> {
    /// Sets (or clears) the source array, precomputing whether copying rows
    /// should also compact ("gc") the string data.
    fn set_source(&mut self, source: Option<ArrayRef>) {
        self.source = source.map(|array| {
            let s = array.as_byte_view::<B>();

            let (need_gc, ideal_buffer_size) = if s.data_buffers().is_empty() {
                // all values inline: no data buffers to copy or compact
                (false, 0)
            } else {
                // bytes actually referenced by the views...
                let ideal_buffer_size = s.total_buffer_bytes_used();
                // ...versus bytes allocated by the data buffers
                let actual_buffer_size =
                    s.data_buffers().iter().map(|b| b.capacity()).sum::<usize>();
                // gc when the buffers hold more than 2x the needed bytes
                let need_gc =
                    ideal_buffer_size != 0 && actual_buffer_size > (ideal_buffer_size * 2);
                (need_gc, ideal_buffer_size)
            };

            Source {
                array,
                need_gc,
                ideal_buffer_size,
            }
        })
    }

    /// Copies `len` rows starting at `offset` from the current source into
    /// this builder.
    ///
    /// # Errors
    /// Returns [`ArrowError::InvalidArgumentError`] if no source is set.
    fn copy_rows(&mut self, offset: usize, len: usize) -> Result<(), ArrowError> {
        self.ensure_capacity();
        // take the source so `self` can be borrowed mutably below; it is
        // restored before every return path
        let source = self.source.take().ok_or_else(|| {
            ArrowError::InvalidArgumentError(
                "Internal Error: InProgressByteViewArray: source not set".to_string(),
            )
        })?;

        let s = source.array.as_byte_view::<B>();

        // append the null mask for the copied range
        if let Some(nulls) = s.nulls().as_ref() {
            let nulls = nulls.slice(offset, len);
            self.nulls.append_buffer(&nulls);
        } else {
            self.nulls.append_n_non_nulls(len);
        };

        let buffers = s.data_buffers();
        let views = &s.views().as_ref()[offset..offset + len];

        // all values inline: views can be copied verbatim, no buffers needed
        if source.ideal_buffer_size == 0 {
            self.views.extend_from_slice(views);
            self.source = Some(source);
            return Ok(());
        }

        if source.need_gc {
            // compact: copy only the string bytes the views reference
            self.append_views_and_copy_strings(views, source.ideal_buffer_size, buffers);
        } else {
            // cheap: reuse the source buffers, rewriting buffer indexes
            self.append_views_and_update_buffer_index(views, buffers);
        }
        self.source = Some(source);
        Ok(())
    }

    /// Finishes the accumulated rows into an array and resets this builder
    /// for the next batch.
    fn finish(&mut self) -> Result<ArrayRef, ArrowError> {
        self.finish_current();
        assert!(self.current.is_none());
        let buffers = std::mem::take(&mut self.completed);
        let views = std::mem::take(&mut self.views);
        let nulls = self.nulls.finish();
        self.nulls = NullBufferBuilder::new(self.batch_size);

        // SAFETY: views were copied from valid arrays and their buffer
        // indexes were rewritten to point into `buffers`, so the
        // GenericByteViewArray invariants are upheld
        let new_array =
            unsafe { GenericByteViewArray::<B>::new_unchecked(views.into(), buffers, nulls) };
        Ok(Arc::new(new_array))
    }
}
364
/// Initial size for buffers handed out by [`BufferSource`].
const STARTING_BLOCK_SIZE: usize = 4 * 1024;
/// Buffer sizes stop growing geometrically once they reach this value.
const MAX_BLOCK_SIZE: usize = 1024 * 1024;

/// Hands out progressively larger `Vec<u8>` buffers, doubling the size on
/// each request until [`MAX_BLOCK_SIZE`] is reached, to amortize allocation
/// cost while bounding waste.
#[derive(Debug)]
struct BufferSource {
    /// Size the next request will be scaled from (doubled before use).
    current_size: usize,
}

impl BufferSource {
    /// Creates a source; the first buffer handed out is sized from
    /// `STARTING_BLOCK_SIZE` (doubled on the first request).
    fn new() -> Self {
        Self {
            current_size: STARTING_BLOCK_SIZE,
        }
    }

    /// Returns an empty `Vec<u8>` whose capacity is at least `min_size`.
    fn next_buffer(&mut self, min_size: usize) -> Vec<u8> {
        Vec::with_capacity(self.next_size(min_size))
    }

    /// Computes the next buffer size: doubles once per call (only while
    /// below `MAX_BLOCK_SIZE`), then keeps doubling as needed so the result
    /// is never smaller than `min_size`.
    fn next_size(&mut self, min_size: usize) -> usize {
        let double = |n: usize| n.saturating_mul(2);

        if self.current_size < MAX_BLOCK_SIZE {
            self.current_size = double(self.current_size);
        }

        if self.current_size < min_size {
            // keep doubling up to the cap; a request larger than the cap is
            // satisfied exactly without raising `current_size` past the cap
            while self.current_size <= min_size && self.current_size < MAX_BLOCK_SIZE {
                self.current_size = double(self.current_size);
            }
            self.current_size.max(min_size)
        } else {
            self.current_size
        }
    }
}
404
#[cfg(test)]
mod tests {
    use super::*;

    /// Sizes double on every request and cap at 1 MiB; larger requests are
    /// satisfied exactly.
    #[test]
    fn test_buffer_source() {
        let mut source = BufferSource::new();
        let expected = [
            8192,
            16384,
            32768,
            65536,
            131072,
            262144,
            524288,
            1024 * 1024, // reaches the cap
            1024 * 1024, // stays at the cap
        ];
        for want in expected {
            assert_eq!(source.next_buffer(1000).capacity(), want);
        }
        // a request above the cap gets exactly what it asked for
        assert_eq!(source.next_buffer(10_000_000).capacity(), 10_000_000);
    }

    /// A min size below the current block size does not disturb doubling.
    #[test]
    fn test_buffer_source_with_min_small() {
        let mut source = BufferSource::new();
        for want in [8 * 1024, 16 * 1024, 32 * 1024] {
            assert_eq!(source.next_buffer(5_600).capacity(), want);
        }
    }

    /// A large min size forces catch-up doubling, then exact sizing past
    /// the cap.
    #[test]
    fn test_buffer_source_with_min_large() {
        let mut source = BufferSource::new();
        for want in [512 * 1024, 1024 * 1024, 1024 * 1024] {
            assert_eq!(source.next_buffer(500_000).capacity(), want);
        }
        assert_eq!(source.next_buffer(2_000_000).capacity(), 2_000_000);
    }
}
447}