1use crate::coalesce::InProgressArray;
19use arrow_array::cast::AsArray;
20use arrow_array::types::ByteViewType;
21use arrow_array::{Array, ArrayRef, GenericByteViewArray};
22use arrow_buffer::{Buffer, NullBufferBuilder};
23use arrow_data::{ByteView, MAX_INLINE_VIEW_LEN};
24use arrow_schema::ArrowError;
25use std::marker::PhantomData;
26use std::sync::Arc;
27
/// Incrementally builds a [`GenericByteViewArray`] while coalescing rows
/// copied from one or more source arrays.
pub(crate) struct InProgressByteViewArray<B: ByteViewType> {
    /// The array rows are currently being copied from, if any (see `set_source`)
    source: Option<Source>,
    /// Target number of rows for each output batch
    batch_size: usize,
    /// The u128 views (length, prefix, buffer index, offset) accumulated so far
    views: Vec<u128>,
    /// Null mask accumulated so far
    nulls: NullBufferBuilder,
    /// In-progress data buffer that string bytes are copied into, if any
    current: Option<Vec<u8>>,
    /// Data buffers already finalized for the output array
    completed: Vec<Buffer>,
    /// Source of new data buffers with geometrically growing sizes
    buffer_source: BufferSource,
    /// Marker tying this builder to a specific byte view type (string/binary)
    _phantom: PhantomData<B>,
}
57
/// The current source array along with statistics cached by `set_source`
struct Source {
    /// The source array (downcast via `as_byte_view` when rows are copied)
    array: ArrayRef,
    /// If true, the source's data buffers are mostly unreferenced, so string
    /// bytes should be copied out rather than the buffers reused wholesale
    need_gc: bool,
    /// Total data-buffer bytes actually referenced by the source's views
    /// (0 when there are no data buffers)
    ideal_buffer_size: usize,
}
66
67impl<B: ByteViewType> std::fmt::Debug for InProgressByteViewArray<B> {
69 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
70 f.debug_struct("InProgressByteViewArray")
71 .field("batch_size", &self.batch_size)
72 .field("views", &self.views.len())
73 .field("nulls", &self.nulls)
74 .field("current", &self.current.as_ref().map(|_| "Some(...)"))
75 .field("completed", &self.completed.len())
76 .finish()
77 }
78}
79
80impl<B: ByteViewType> InProgressByteViewArray<B> {
81 pub(crate) fn new(batch_size: usize) -> Self {
82 let buffer_source = BufferSource::new();
83
84 Self {
85 batch_size,
86 source: None,
87 views: Vec::new(), nulls: NullBufferBuilder::new(batch_size), current: None,
90 completed: vec![],
91 buffer_source,
92 _phantom: PhantomData,
93 }
94 }
95
96 fn ensure_capacity(&mut self) {
101 if self.views.capacity() == 0 {
102 self.views.reserve(self.batch_size);
103 }
104 debug_assert_eq!(self.views.capacity(), self.batch_size);
105 }
106
107 fn finish_current(&mut self) {
109 let Some(next_buffer) = self.current.take() else {
110 return;
111 };
112 self.completed.push(next_buffer.into());
113 }
114
115 #[inline(never)]
117 fn append_views_and_update_buffer_index(&mut self, views: &[u128], buffers: &[Buffer]) {
118 if let Some(buffer) = self.current.take() {
119 self.completed.push(buffer.into());
120 }
121 let starting_buffer: u32 = self.completed.len().try_into().expect("too many buffers");
122 self.completed.extend_from_slice(buffers);
123
124 if starting_buffer == 0 {
125 self.views.extend_from_slice(views);
127 } else {
128 let updated_views = views.iter().map(|v| {
130 let mut byte_view = ByteView::from(*v);
131 if byte_view.length > MAX_INLINE_VIEW_LEN {
132 byte_view.buffer_index += starting_buffer;
134 };
135 byte_view.as_u128()
136 });
137
138 self.views.extend(updated_views);
139 }
140 }
141
    /// Appends `views`, copying their string bytes into the in-progress
    /// buffer, sealing it and starting a new one when it fills up.
    ///
    /// `view_buffer_size` is the total number of data-buffer bytes referenced
    /// by `views`; `buffers` are the source buffers the views point into.
    #[inline(never)]
    fn append_views_and_copy_strings(
        &mut self,
        views: &[u128],
        view_buffer_size: usize,
        buffers: &[Buffer],
    ) {
        // No in-progress buffer: start a fresh one sized for all the strings
        let Some(current) = self.current.take() else {
            let new_buffer = self.buffer_source.next_buffer(view_buffer_size);
            self.append_views_and_copy_strings_inner(views, new_buffer, buffers);
            return;
        };

        // Fast path: everything fits in the current buffer's spare capacity
        let mut remaining_capacity = current.capacity() - current.len();
        if view_buffer_size <= remaining_capacity {
            self.append_views_and_copy_strings_inner(views, current, buffers);
            return;
        }

        // Split: count how many leading views fit in the current buffer.
        // Only non-inline views consume capacity; note the loop also stops at
        // the first view whose length exceeds the remaining capacity even if
        // that view is inline (conservative, but keeps the bookkeeping simple)
        let mut num_view_to_current = 0;
        for view in views {
            let b = ByteView::from(*view);
            let str_len = b.length;
            if remaining_capacity < str_len as usize {
                break;
            }
            if str_len > MAX_INLINE_VIEW_LEN {
                remaining_capacity -= str_len as usize;
            }
            num_view_to_current += 1;
        }

        let first_views = &views[0..num_view_to_current];
        // Bytes the first chunk will copy into `current`; the remainder is
        // what the fresh buffer must hold
        let string_bytes_to_copy = current.capacity() - current.len() - remaining_capacity;
        let remaining_view_buffer_size = view_buffer_size - string_bytes_to_copy;

        // Copy the first chunk into `current`, then seal that buffer
        self.append_views_and_copy_strings_inner(first_views, current, buffers);
        let completed = self.current.take().expect("completed");
        self.completed.push(completed.into());

        // Copy the rest into a new buffer sized for the remaining bytes
        let remaining_views = &views[num_view_to_current..];
        let new_buffer = self.buffer_source.next_buffer(remaining_view_buffer_size);
        self.append_views_and_copy_strings_inner(remaining_views, new_buffer, buffers);
    }
207
    /// Appends `views`, copying each referenced string out of `buffers` and
    /// into `dst_buffer`, which then becomes the new `self.current`.
    ///
    /// The caller must size `dst_buffer` so all non-inline strings in `views`
    /// fit in its spare capacity (verified in debug builds below).
    #[inline(never)]
    fn append_views_and_copy_strings_inner(
        &mut self,
        views: &[u128],
        mut dst_buffer: Vec<u8>,
        buffers: &[Buffer],
    ) {
        assert!(self.current.is_none(), "current buffer should be None");

        if views.is_empty() {
            self.current = Some(dst_buffer);
            return;
        }

        // All copied strings will live in the buffer pushed after the
        // currently completed ones
        let new_buffer_index: u32 = self.completed.len().try_into().expect("too many buffers");

        // Debug-only check that the caller sized `dst_buffer` correctly:
        // sum the lengths of all non-inline strings (inline strings occupy
        // no buffer space)
        #[cfg(debug_assertions)]
        {
            let total_length: usize = views
                .iter()
                .filter_map(|v| {
                    let b = ByteView::from(*v);
                    if b.length > MAX_INLINE_VIEW_LEN {
                        Some(b.length as usize)
                    } else {
                        None
                    }
                })
                .sum();
            debug_assert!(
                dst_buffer.capacity() >= total_length,
                "dst_buffer capacity {} is less than total length {}",
                dst_buffer.capacity(),
                total_length
            );
        }

        let new_views = views.iter().map(|v| {
            let mut b: ByteView = ByteView::from(*v);
            // Inline views carry their bytes in the view itself and pass
            // through unchanged; only non-inline views are rewritten
            if b.length > MAX_INLINE_VIEW_LEN {
                let buffer_index = b.buffer_index as usize;
                let buffer_offset = b.offset as usize;
                let str_len = b.length as usize;

                // Retarget the view at the location the bytes are copied to
                b.offset = dst_buffer.len() as u32;
                b.buffer_index = new_buffer_index;

                // SAFETY: assumes `views` come from a valid ByteViewArray,
                // whose invariant is that every non-inline view references
                // an in-bounds range of an existing data buffer, making both
                // unchecked accesses in bounds — TODO(review) confirm callers
                // only pass views paired with their own `buffers`
                let src = unsafe {
                    buffers
                        .get_unchecked(buffer_index)
                        .get_unchecked(buffer_offset..buffer_offset + str_len)
                };
                dst_buffer.extend_from_slice(src);
            }
            b.as_u128()
        });
        self.views.extend(new_views);
        self.current = Some(dst_buffer);
    }
279}
280
281impl<B: ByteViewType> InProgressArray for InProgressByteViewArray<B> {
282 fn set_source(&mut self, source: Option<ArrayRef>) {
283 self.source = source.map(|array| {
284 let s = array.as_byte_view::<B>();
285
286 let (need_gc, ideal_buffer_size) = if s.data_buffers().is_empty() {
287 (false, 0)
288 } else {
289 let ideal_buffer_size = s.total_buffer_bytes_used();
290 let actual_buffer_size =
293 s.data_buffers().iter().map(|b| b.capacity()).sum::<usize>();
294 let need_gc =
297 ideal_buffer_size != 0 && actual_buffer_size > (ideal_buffer_size * 2);
298 (need_gc, ideal_buffer_size)
299 };
300
301 Source {
302 array,
303 need_gc,
304 ideal_buffer_size,
305 }
306 })
307 }
308
    /// Copies `len` rows starting at `offset` from the current source array
    /// into this in-progress array.
    ///
    /// # Errors
    /// Returns an error if no source has been set via `set_source`.
    fn copy_rows(&mut self, offset: usize, len: usize) -> Result<(), ArrowError> {
        self.ensure_capacity();
        // Take the source so `self` can be mutably borrowed below; it is
        // put back before every successful return
        let source = self.source.take().ok_or_else(|| {
            ArrowError::InvalidArgumentError(
                "Internal Error: InProgressByteViewArray: source not set".to_string(),
            )
        })?;

        let s = source.array.as_byte_view::<B>();

        // Copy the null mask for the selected rows
        if let Some(nulls) = s.nulls().as_ref() {
            let nulls = nulls.slice(offset, len);
            self.nulls.append_buffer(&nulls);
        } else {
            self.nulls.append_n_non_nulls(len);
        };

        let buffers = s.data_buffers();
        let views = &s.views().as_ref()[offset..offset + len];

        // Fast path: the views reference no data-buffer bytes (per the value
        // cached in `set_source`), so they can be copied verbatim
        if source.ideal_buffer_size == 0 {
            self.views.extend_from_slice(views);
            self.source = Some(source);
            return Ok(());
        }

        if source.need_gc {
            // Sparsely used buffers: copy the string bytes out
            self.append_views_and_copy_strings(views, source.ideal_buffer_size, buffers);
        } else {
            // Well-packed buffers: reuse them and fix up buffer indexes
            self.append_views_and_update_buffer_index(views, buffers);
        }
        self.source = Some(source);
        Ok(())
    }
349
350 fn finish(&mut self) -> Result<ArrayRef, ArrowError> {
351 self.finish_current();
352 assert!(self.current.is_none());
353 let buffers = std::mem::take(&mut self.completed);
354 let views = std::mem::take(&mut self.views);
355 let nulls = self.nulls.finish();
356 self.nulls = NullBufferBuilder::new(self.batch_size);
357
358 let new_array =
361 unsafe { GenericByteViewArray::<B>::new_unchecked(views.into(), buffers, nulls) };
362 Ok(Arc::new(new_array))
363 }
364}
365
/// Size class of the first block handed out by [`BufferSource`]
const STARTING_BLOCK_SIZE: usize = 4 * 1024;

/// Largest size the geometric growth reaches on its own; larger requests
/// are satisfied exactly
const MAX_BLOCK_SIZE: usize = 1024 * 1024;

/// Hands out progressively larger `Vec<u8>` buffers, doubling the size on
/// each request until [`MAX_BLOCK_SIZE`] is reached.
#[derive(Debug)]
struct BufferSource {
    /// Size used for the most recent request (doubled before each hand-out)
    current_size: usize,
}

impl BufferSource {
    /// Creates a source whose first buffer will have capacity
    /// `2 * STARTING_BLOCK_SIZE` (the size doubles before every request).
    fn new() -> Self {
        Self {
            current_size: STARTING_BLOCK_SIZE,
        }
    }

    /// Returns an empty buffer with at least `min_size` capacity.
    fn next_buffer(&mut self, min_size: usize) -> Vec<u8> {
        Vec::with_capacity(self.next_size(min_size))
    }

    /// Advances the size sequence and returns the next buffer size:
    /// doubles up to [`MAX_BLOCK_SIZE`], never less than `min_size`.
    fn next_size(&mut self, min_size: usize) -> usize {
        if self.current_size < MAX_BLOCK_SIZE {
            self.current_size = self.current_size.saturating_mul(2);
        }
        if self.current_size >= min_size {
            return self.current_size;
        }
        // Keep doubling until the request fits or the cap is hit
        while self.current_size <= min_size && self.current_size < MAX_BLOCK_SIZE {
            self.current_size = self.current_size.saturating_mul(2);
        }
        // Oversized requests get exactly what they asked for
        self.current_size.max(min_size)
    }
}
405
#[cfg(test)]
mod tests {
    use super::*;

    /// Sizes double per request until `MAX_BLOCK_SIZE`, then plateau;
    /// oversized requests are satisfied exactly.
    #[test]
    fn test_buffer_source() {
        let mut source = BufferSource::new();
        let expected = [
            8192,
            16384,
            32768,
            65536,
            131072,
            262144,
            524288,
            1024 * 1024,
            1024 * 1024,
        ];
        for want in expected {
            assert_eq!(source.next_buffer(1000).capacity(), want);
        }
        // A request larger than MAX_BLOCK_SIZE gets exactly what it asked for
        assert_eq!(source.next_buffer(10_000_000).capacity(), 10_000_000);
    }

    /// Small minimum sizes do not disturb the doubling sequence.
    #[test]
    fn test_buffer_source_with_min_small() {
        let mut source = BufferSource::new();
        for want in [8 * 1024, 16 * 1024, 32 * 1024] {
            assert_eq!(source.next_buffer(5_600).capacity(), want);
        }
    }

    /// Large minimum sizes fast-forward the sequence, and requests beyond
    /// the cap are satisfied exactly.
    #[test]
    fn test_buffer_source_with_min_large() {
        let mut source = BufferSource::new();
        for want in [512 * 1024, 1024 * 1024, 1024 * 1024] {
            assert_eq!(source.next_buffer(500_000).capacity(), want);
        }
        assert_eq!(source.next_buffer(2_000_000).capacity(), 2_000_000);
    }
}