1use crate::coalesce::InProgressArray;
19use arrow_array::cast::AsArray;
20use arrow_array::types::ByteViewType;
21use arrow_array::{Array, ArrayRef, GenericByteViewArray};
22use arrow_buffer::{Buffer, NullBufferBuilder};
23use arrow_data::{ByteView, MAX_INLINE_VIEW_LEN};
24use arrow_schema::ArrowError;
25use std::marker::PhantomData;
26use std::sync::Arc;
27
pub(crate) struct InProgressByteViewArray<B: ByteViewType> {
    /// Source array rows are currently copied from, if any (set via `set_source`).
    source: Option<Source>,
    /// Target number of rows per output batch; used to pre-size builders.
    batch_size: usize,
    /// Views (inline data or buffer references) accumulated so far.
    views: Vec<u128>,
    /// Null mask accumulated so far.
    nulls: NullBufferBuilder,
    /// In-progress data buffer that string bytes are appended to.
    current: Option<Vec<u8>>,
    /// Finalized data buffers that will back the output array.
    completed: Vec<Buffer>,
    /// Allocates successively larger buffers for `current`.
    buffer_source: BufferSource,
    /// Ties the builder to a concrete byte-view type (string vs binary).
    _phantom: PhantomData<B>,
}
57
/// A source array plus precomputed information about how to copy from it.
struct Source {
    /// The array rows are copied from.
    array: ArrayRef,
    /// True when the source's data buffers are mostly unreferenced, so the
    /// string bytes should be copied out ("garbage collected") rather than
    /// sharing the source buffers.
    need_gc: bool,
    /// Number of buffer bytes actually referenced by the source's views
    /// (0 when the source has no data buffers at all).
    ideal_buffer_size: usize,
}
66
67impl<B: ByteViewType> std::fmt::Debug for InProgressByteViewArray<B> {
69 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
70 f.debug_struct("InProgressByteViewArray")
71 .field("batch_size", &self.batch_size)
72 .field("views", &self.views.len())
73 .field("nulls", &self.nulls)
74 .field("current", &self.current.as_ref().map(|_| "Some(...)"))
75 .field("completed", &self.completed.len())
76 .finish()
77 }
78}
79
impl<B: ByteViewType> InProgressByteViewArray<B> {
    /// Creates a new in-progress builder targeting batches of `batch_size` rows.
    pub(crate) fn new(batch_size: usize) -> Self {
        let buffer_source = BufferSource::new();

        Self {
            batch_size,
            source: None,
            views: Vec::new(), nulls: NullBufferBuilder::new(batch_size), current: None,
            completed: vec![],
            buffer_source,
            _phantom: PhantomData,
        }
    }

    /// Reserves room in `views` for one batch worth of entries so the copy
    /// paths below do not grow the vector incrementally.
    fn ensure_capacity(&mut self) {
        self.views.reserve(self.batch_size);
    }

    /// Seals the in-progress `current` buffer (if any) into `completed`.
    fn finish_current(&mut self) {
        let Some(next_buffer) = self.current.take() else {
            return;
        };
        self.completed.push(next_buffer.into());
    }

    /// Appends `views` and `buffers` without copying string data: the source
    /// buffers are appended to `completed` and each non-inline view's
    /// `buffer_index` is shifted to its buffer's new position.
    #[inline(never)]
    fn append_views_and_update_buffer_index(&mut self, views: &[u128], buffers: &[Buffer]) {
        // Seal any in-progress buffer first so the indices of the newly
        // appended `buffers` are stable.
        if let Some(buffer) = self.current.take() {
            self.completed.push(buffer.into());
        }
        let starting_buffer: u32 = self.completed.len().try_into().expect("too many buffers");
        self.completed.extend_from_slice(buffers);

        if starting_buffer == 0 {
            // Fast path: buffer indices are unchanged, copy views verbatim.
            self.views.extend_from_slice(views);
        } else {
            // Only non-inline views (length > MAX_INLINE_VIEW_LEN) reference a
            // data buffer; inline views carry their bytes in the view itself.
            let updated_views = views.iter().map(|v| {
                let mut byte_view = ByteView::from(*v);
                if byte_view.length > MAX_INLINE_VIEW_LEN {
                    byte_view.buffer_index += starting_buffer;
                };
                byte_view.as_u128()
            });

            self.views.extend(updated_views);
        }
    }

    /// Appends `views`, copying the referenced string bytes (at most
    /// `view_buffer_size` bytes in total) out of `buffers` into this
    /// builder's own buffers (the "gc" path for sparse sources).
    #[inline(never)]
    fn append_views_and_copy_strings(
        &mut self,
        views: &[u128],
        view_buffer_size: usize,
        buffers: &[Buffer],
    ) {
        // No in-progress buffer: allocate one large enough for everything.
        let Some(current) = self.current.take() else {
            let new_buffer = self.buffer_source.next_buffer(view_buffer_size);
            self.append_views_and_copy_strings_inner(views, new_buffer, buffers);
            return;
        };

        let mut remaining_capacity = current.capacity() - current.len();
        // Everything fits in the current buffer's spare capacity.
        if view_buffer_size <= remaining_capacity {
            self.append_views_and_copy_strings_inner(views, current, buffers);
            return;
        }

        // Otherwise split: copy as many leading views as fit into `current`,
        // then the rest into a fresh buffer.
        let mut num_view_to_current = 0;
        for view in views {
            let b = ByteView::from(*view);
            let str_len = b.length;
            // NOTE(review): this break also fires for inline views whose
            // length exceeds the spare capacity, even though inline views
            // consume no buffer space — conservative (splits earlier than
            // strictly needed), not incorrect.
            if remaining_capacity < str_len as usize {
                break;
            }
            if str_len > MAX_INLINE_VIEW_LEN {
                remaining_capacity -= str_len as usize;
            }
            num_view_to_current += 1;
        }

        let first_views = &views[0..num_view_to_current];
        // Bytes the first chunk will consume = spare capacity used up.
        let string_bytes_to_copy = current.capacity() - current.len() - remaining_capacity;
        let remaining_view_buffer_size = view_buffer_size - string_bytes_to_copy;

        self.append_views_and_copy_strings_inner(first_views, current, buffers);
        // `_inner` re-installed the buffer as `current`; seal it since it is
        // now (effectively) full.
        let completed = self.current.take().expect("completed");
        self.completed.push(completed.into());

        let remaining_views = &views[num_view_to_current..];
        let new_buffer = self.buffer_source.next_buffer(remaining_view_buffer_size);
        self.append_views_and_copy_strings_inner(remaining_views, new_buffer, buffers);
    }

    /// Copies the string bytes referenced by `views` from `buffers` into
    /// `dst_buffer`, rewrites each non-inline view to reference `dst_buffer`
    /// (at the index it will occupy once sealed into `completed`), and
    /// installs `dst_buffer` as the new `current`.
    ///
    /// `dst_buffer` must have capacity for all non-inline bytes in `views`
    /// (checked by the debug assertion below); `self.current` must be `None`.
    #[inline(never)]
    fn append_views_and_copy_strings_inner(
        &mut self,
        views: &[u128],
        mut dst_buffer: Vec<u8>,
        buffers: &[Buffer],
    ) {
        assert!(self.current.is_none(), "current buffer should be None");

        if views.is_empty() {
            self.current = Some(dst_buffer);
            return;
        }

        // Index `dst_buffer` will have after it is sealed into `completed`.
        let new_buffer_index: u32 = self.completed.len().try_into().expect("too many buffers");

        #[cfg(debug_assertions)]
        {
            // Verify the caller sized `dst_buffer` correctly: the sum of all
            // non-inline string lengths must fit without reallocation.
            let total_length: usize = views
                .iter()
                .filter_map(|v| {
                    let b = ByteView::from(*v);
                    if b.length > MAX_INLINE_VIEW_LEN {
                        Some(b.length as usize)
                    } else {
                        None
                    }
                })
                .sum();
            debug_assert!(
                dst_buffer.capacity() >= total_length,
                "dst_buffer capacity {} is less than total length {}",
                dst_buffer.capacity(),
                total_length
            );
        }

        let new_views = views.iter().map(|v| {
            let mut b: ByteView = ByteView::from(*v);
            if b.length > MAX_INLINE_VIEW_LEN {
                let buffer_index = b.buffer_index as usize;
                let buffer_offset = b.offset as usize;
                let str_len = b.length as usize;

                // Repoint the view at the bytes we are about to append.
                b.offset = dst_buffer.len() as u32;
                b.buffer_index = new_buffer_index;

                // SAFETY(review): relies on the input views being valid for
                // `buffers` (buffer_index in range, offset + len in bounds),
                // which is assumed to hold for views taken from a valid
                // byte-view array — confirm callers only pass such views.
                let src = unsafe {
                    buffers
                        .get_unchecked(buffer_index)
                        .get_unchecked(buffer_offset..buffer_offset + str_len)
                };
                dst_buffer.extend_from_slice(src);
            }
            b.as_u128()
        });
        self.views.extend(new_views);
        self.current = Some(dst_buffer);
    }
}
277
impl<B: ByteViewType> InProgressArray for InProgressByteViewArray<B> {
    /// Sets (or clears) the array rows will be copied from, precomputing
    /// whether its data buffers are sparse enough to warrant the "gc" copy.
    fn set_source(&mut self, source: Option<ArrayRef>) {
        self.source = source.map(|array| {
            let s = array.as_byte_view::<B>();

            let (need_gc, ideal_buffer_size) = if s.data_buffers().is_empty() {
                // No data buffers: every value is inline (or null).
                (false, 0)
            } else {
                // Bytes actually referenced by views vs. bytes allocated.
                let ideal_buffer_size = s.total_buffer_bytes_used();
                let actual_buffer_size = s.get_buffer_memory_size();
                // Copy strings out ("gc") when less than half the allocated
                // buffer space is actually referenced by views.
                let need_gc =
                    ideal_buffer_size != 0 && actual_buffer_size > (ideal_buffer_size * 2);
                (need_gc, ideal_buffer_size)
            };

            Source {
                array,
                need_gc,
                ideal_buffer_size,
            }
        })
    }

    /// Copies `len` rows starting at `offset` from the current source into
    /// this in-progress array.
    ///
    /// # Errors
    /// Returns `ArrowError::InvalidArgumentError` if no source has been set.
    fn copy_rows(&mut self, offset: usize, len: usize) -> Result<(), ArrowError> {
        self.ensure_capacity();
        // Take the source out of `self` so `&mut self` methods can be called
        // below; it is restored before every return path except the error.
        let source = self.source.take().ok_or_else(|| {
            ArrowError::InvalidArgumentError(
                "Internal Error: InProgressByteViewArray: source not set".to_string(),
            )
        })?;

        let s = source.array.as_byte_view::<B>();

        if let Some(nulls) = s.nulls().as_ref() {
            let nulls = nulls.slice(offset, len);
            self.nulls.append_buffer(&nulls);
        } else {
            self.nulls.append_n_non_nulls(len);
        };

        let buffers = s.data_buffers();
        let views = &s.views().as_ref()[offset..offset + len];

        if source.ideal_buffer_size == 0 {
            // Source has no data buffers, so views need no re-indexing.
            self.views.extend_from_slice(views);
            self.source = Some(source);
            return Ok(());
        }

        if source.need_gc {
            // Sparse source: copy only the referenced string bytes.
            self.append_views_and_copy_strings(views, source.ideal_buffer_size, buffers);
        } else {
            // Dense source: share its buffers, fixing up buffer indices.
            self.append_views_and_update_buffer_index(views, buffers);
        }
        self.source = Some(source);
        Ok(())
    }

    /// Finalizes the accumulated views/buffers/nulls into a
    /// `GenericByteViewArray` and resets the builder for the next batch.
    fn finish(&mut self) -> Result<ArrayRef, ArrowError> {
        self.finish_current();
        assert!(self.current.is_none());
        let buffers = std::mem::take(&mut self.completed);
        let views = std::mem::take(&mut self.views);
        let nulls = self.nulls.finish();
        self.nulls = NullBufferBuilder::new(self.batch_size);

        // SAFETY(review): views/buffers/nulls were built together by the copy
        // paths above, so the `new_unchecked` invariants are assumed to hold;
        // this skips re-validating every view against the buffers.
        let new_array =
            unsafe { GenericByteViewArray::<B>::new_unchecked(views.into(), buffers, nulls) };
        Ok(Arc::new(new_array))
    }
}
359
/// Initial capacity handed out for data buffers.
const STARTING_BLOCK_SIZE: usize = 4 * 1024;

/// Cap on the geometric growth of buffer capacity (requests larger than this
/// still receive a buffer big enough to hold them).
const MAX_BLOCK_SIZE: usize = 1024 * 1024;

/// Hands out `Vec<u8>` buffers whose capacity doubles on each request,
/// between `STARTING_BLOCK_SIZE` and `MAX_BLOCK_SIZE`.
#[derive(Debug)]
struct BufferSource {
    /// Capacity handed out by the previous request (doubled before the next).
    current_size: usize,
}

impl BufferSource {
    fn new() -> Self {
        Self {
            current_size: STARTING_BLOCK_SIZE,
        }
    }

    /// Returns an empty buffer with capacity of at least `min_size` bytes.
    fn next_buffer(&mut self, min_size: usize) -> Vec<u8> {
        Vec::with_capacity(self.next_size(min_size))
    }

    /// Computes the capacity of the next buffer: doubles the previous size
    /// (capped at `MAX_BLOCK_SIZE`), growing further only if `min_size`
    /// still does not fit.
    fn next_size(&mut self, min_size: usize) -> usize {
        // One geometric growth step per request, until the cap is reached.
        if self.current_size < MAX_BLOCK_SIZE {
            self.current_size = self.current_size.saturating_mul(2);
        }
        if self.current_size >= min_size {
            return self.current_size;
        }
        // The request does not fit yet: keep doubling up to the cap, then
        // fall back to exactly `min_size` for oversized requests.
        while self.current_size <= min_size && self.current_size < MAX_BLOCK_SIZE {
            self.current_size = self.current_size.saturating_mul(2);
        }
        self.current_size.max(min_size)
    }
}
399
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_buffer_source() {
        let mut source = BufferSource::new();
        // Capacity doubles from 8 KiB up to the 1 MiB cap, then stays there.
        let expected = [
            8192, 16384, 32768, 65536, 131072, 262144, 524288,
            1024 * 1024, 1024 * 1024,
        ];
        for want in expected {
            assert_eq!(source.next_buffer(1000).capacity(), want);
        }
        // Requests above the cap are satisfied exactly.
        assert_eq!(source.next_buffer(10_000_000).capacity(), 10_000_000);
    }

    #[test]
    fn test_buffer_source_with_min_small() {
        let mut source = BufferSource::new();
        // Small minimum: normal doubling already satisfies it.
        for want in [8 * 1024, 16 * 1024, 32 * 1024] {
            assert_eq!(source.next_buffer(5_600).capacity(), want);
        }
    }

    #[test]
    fn test_buffer_source_with_min_large() {
        let mut source = BufferSource::new();
        // Large minimum: jumps straight to a size that fits, then caps.
        assert_eq!(source.next_buffer(500_000).capacity(), 512 * 1024);
        for _ in 0..2 {
            assert_eq!(source.next_buffer(500_000).capacity(), 1024 * 1024);
        }
        // Beyond the cap: exactly the requested size.
        assert_eq!(source.next_buffer(2_000_000).capacity(), 2_000_000);
    }
}