1use crate::coalesce::InProgressArray;
19use crate::filter::{FilterPredicate, FilterSelection, filter_null_mask};
20use arrow_array::cast::AsArray;
21use arrow_array::types::ByteViewType;
22use arrow_array::{Array, ArrayRef, GenericByteViewArray};
23use arrow_buffer::{BooleanBuffer, Buffer, NullBuffer, NullBufferBuilder};
24use arrow_data::{ByteView, MAX_INLINE_VIEW_LEN};
25use arrow_schema::ArrowError;
26use std::marker::PhantomData;
27use std::sync::Arc;
28
29pub(crate) struct InProgressByteViewArray<B: ByteViewType> {
40 source: Option<Source>,
42 batch_size: usize,
44 views: Vec<u128>,
46 nulls: NullBufferBuilder,
48 current: Option<Vec<u8>>,
50 completed: Vec<Buffer>,
52 buffer_source: BufferSource,
54 _phantom: PhantomData<B>,
57}
58
59struct Source {
60 array: ArrayRef,
62 need_gc: bool,
64 ideal_buffer_size: usize,
66}
67
68impl<B: ByteViewType> std::fmt::Debug for InProgressByteViewArray<B> {
70 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
71 f.debug_struct("InProgressByteViewArray")
72 .field("batch_size", &self.batch_size)
73 .field("views", &self.views.len())
74 .field("nulls", &self.nulls)
75 .field("current", &self.current.as_ref().map(|_| "Some(...)"))
76 .field("completed", &self.completed.len())
77 .finish()
78 }
79}
80
81impl<B: ByteViewType> InProgressByteViewArray<B> {
82 pub(crate) fn new(batch_size: usize) -> Self {
83 let buffer_source = BufferSource::new();
84
85 Self {
86 batch_size,
87 source: None,
88 views: Vec::new(), nulls: NullBufferBuilder::new(batch_size), current: None,
91 completed: vec![],
92 buffer_source,
93 _phantom: PhantomData,
94 }
95 }
96
97 fn ensure_capacity(&mut self) {
102 if self.views.capacity() == 0 {
103 self.views.reserve(self.batch_size);
104 }
105 }
106
107 fn finish_current(&mut self) {
109 let Some(next_buffer) = self.current.take() else {
110 return;
111 };
112 self.completed.push(next_buffer.into());
113 }
114
115 fn append_views_by_filter(&mut self, views: &[u128], filter: &FilterPredicate) {
116 let selected_count = filter.count();
117 let current_len = self.views.len();
118 self.views.reserve(selected_count);
119
120 let mut written = 0;
121
122 unsafe {
123 let mut out = self.views.spare_capacity_mut().as_mut_ptr().cast::<u128>();
124
125 match filter.selection() {
126 FilterSelection::None => {}
127 FilterSelection::All { .. } => {
128 std::ptr::copy_nonoverlapping(views.as_ptr(), out, selected_count);
129 written = selected_count;
130 }
131 FilterSelection::Slices(slices) => {
132 slices.for_each(|(start, end)| {
133 let len = end - start;
134 std::ptr::copy_nonoverlapping(views.as_ptr().add(start), out, len);
135 out = out.add(len);
136 written += len;
137 });
138 }
139 FilterSelection::Indices(indices) => {
140 indices.for_each(|idx| {
141 out.write(*views.get_unchecked(idx));
142 out = out.add(1);
143 written += 1;
144 });
145 }
146 }
147
148 self.views.set_len(current_len + written);
149 }
150
151 debug_assert_eq!(written, selected_count);
152 }
153
154 fn append_nulls_by_filter(
155 &mut self,
156 filter: &FilterPredicate,
157 source_nulls: Option<&NullBuffer>,
158 ) {
159 let Some((null_count, nulls)) = filter_null_mask(source_nulls, filter) else {
160 self.nulls.append_n_non_nulls(filter.count());
161 return;
162 };
163
164 let nulls = unsafe {
165 NullBuffer::new_unchecked(BooleanBuffer::new(nulls, 0, filter.count()), null_count)
166 };
167 self.nulls.append_buffer(&nulls);
168 }
169
170 #[inline(never)]
172 fn append_views_and_update_buffer_index(&mut self, views: &[u128], buffers: &[Buffer]) {
173 if let Some(buffer) = self.current.take() {
174 self.completed.push(buffer.into());
175 }
176 let starting_buffer: u32 = self.completed.len().try_into().expect("too many buffers");
177 self.completed.extend_from_slice(buffers);
178
179 if starting_buffer == 0 {
180 self.views.extend_from_slice(views);
182 } else {
183 let updated_views = views.iter().map(|v| {
185 let mut byte_view = ByteView::from(*v);
186 if byte_view.length > MAX_INLINE_VIEW_LEN {
187 byte_view.buffer_index += starting_buffer;
189 };
190 byte_view.as_u128()
191 });
192
193 self.views.extend(updated_views);
194 }
195 }
196
197 #[inline(never)]
206 fn append_views_and_copy_strings(
207 &mut self,
208 views: &[u128],
209 view_buffer_size: usize,
210 buffers: &[Buffer],
211 ) {
212 let Some(current) = self.current.take() else {
218 let new_buffer = self.buffer_source.next_buffer(view_buffer_size);
219 self.append_views_and_copy_strings_inner(views, new_buffer, buffers);
220 return;
221 };
222
223 let mut remaining_capacity = current.capacity() - current.len();
226 if view_buffer_size <= remaining_capacity {
227 self.append_views_and_copy_strings_inner(views, current, buffers);
228 return;
229 }
230
231 let mut num_view_to_current = 0;
237 for view in views {
238 let b = ByteView::from(*view);
239 let str_len = b.length;
240 if remaining_capacity < str_len as usize {
241 break;
242 }
243 if str_len > MAX_INLINE_VIEW_LEN {
244 remaining_capacity -= str_len as usize;
245 }
246 num_view_to_current += 1;
247 }
248
249 let first_views = &views[0..num_view_to_current];
250 let string_bytes_to_copy = current.capacity() - current.len() - remaining_capacity;
251 let remaining_view_buffer_size = view_buffer_size - string_bytes_to_copy;
252
253 self.append_views_and_copy_strings_inner(first_views, current, buffers);
254 let completed = self.current.take().expect("completed");
255 self.completed.push(completed.into());
256
257 let remaining_views = &views[num_view_to_current..];
259 let new_buffer = self.buffer_source.next_buffer(remaining_view_buffer_size);
260 self.append_views_and_copy_strings_inner(remaining_views, new_buffer, buffers);
261 }
262
263 #[inline(never)]
271 fn append_views_and_copy_strings_inner(
272 &mut self,
273 views: &[u128],
274 mut dst_buffer: Vec<u8>,
275 buffers: &[Buffer],
276 ) {
277 assert!(self.current.is_none(), "current buffer should be None");
278
279 if views.is_empty() {
280 self.current = Some(dst_buffer);
281 return;
282 }
283
284 let new_buffer_index: u32 = self.completed.len().try_into().expect("too many buffers");
285
286 #[cfg(debug_assertions)]
289 {
290 let total_length: usize = views
291 .iter()
292 .filter_map(|v| {
293 let b = ByteView::from(*v);
294 if b.length > MAX_INLINE_VIEW_LEN {
295 Some(b.length as usize)
296 } else {
297 None
298 }
299 })
300 .sum();
301 debug_assert!(
302 dst_buffer.capacity() >= total_length,
303 "dst_buffer capacity {} is less than total length {}",
304 dst_buffer.capacity(),
305 total_length
306 );
307 }
308
309 let new_views = views.iter().map(|v| {
311 let mut b: ByteView = ByteView::from(*v);
312 if b.length > MAX_INLINE_VIEW_LEN {
313 let buffer_index = b.buffer_index as usize;
314 let buffer_offset = b.offset as usize;
315 let str_len = b.length as usize;
316
317 b.offset = dst_buffer.len() as u32;
319 b.buffer_index = new_buffer_index;
320
321 let src = unsafe {
323 buffers
324 .get_unchecked(buffer_index)
325 .get_unchecked(buffer_offset..buffer_offset + str_len)
326 };
327 dst_buffer.extend_from_slice(src);
328 }
329 b.as_u128()
330 });
331 self.views.extend(new_views);
332 self.current = Some(dst_buffer);
333 }
334}
335
336impl<B: ByteViewType> InProgressArray for InProgressByteViewArray<B> {
337 fn set_source(&mut self, source: Option<ArrayRef>) {
338 self.source = source.map(|array| {
339 let s = array.as_byte_view::<B>();
340
341 let (need_gc, ideal_buffer_size) = if s.data_buffers().is_empty() {
342 (false, 0)
343 } else {
344 let ideal_buffer_size = s.total_buffer_bytes_used();
345 let actual_buffer_size =
348 s.data_buffers().iter().map(|b| b.capacity()).sum::<usize>();
349 let need_gc =
352 ideal_buffer_size != 0 && actual_buffer_size > (ideal_buffer_size * 2);
353 (need_gc, ideal_buffer_size)
354 };
355
356 Source {
357 array,
358 need_gc,
359 ideal_buffer_size,
360 }
361 })
362 }
363
364 fn copy_rows(&mut self, offset: usize, len: usize) -> Result<(), ArrowError> {
365 self.ensure_capacity();
366 let source = self.source.take().ok_or_else(|| {
367 ArrowError::InvalidArgumentError(
368 "Internal Error: InProgressByteViewArray: source not set".to_string(),
369 )
370 })?;
371
372 let s = source.array.as_byte_view::<B>();
374
375 if let Some(nulls) = s.nulls().as_ref() {
377 let nulls = nulls.slice(offset, len);
378 self.nulls.append_buffer(&nulls);
379 } else {
380 self.nulls.append_n_non_nulls(len);
381 };
382
383 let buffers = s.data_buffers();
384 let views = unsafe { s.views().as_ref().get_unchecked(offset..offset + len) };
386
387 if source.ideal_buffer_size == 0 {
390 self.views.extend_from_slice(views);
391 self.source = Some(source);
392 return Ok(());
393 }
394
395 if source.need_gc {
398 self.append_views_and_copy_strings(views, source.ideal_buffer_size, buffers);
399 } else {
400 self.append_views_and_update_buffer_index(views, buffers);
401 }
402 self.source = Some(source);
403 Ok(())
404 }
405
406 fn copy_rows_by_filter(&mut self, filter: &FilterPredicate) -> Result<(), ArrowError> {
407 self.ensure_capacity();
408 let source = self.source.take().ok_or_else(|| {
409 ArrowError::InvalidArgumentError(
410 "Internal Error: InProgressByteViewArray: source not set".to_string(),
411 )
412 })?;
413
414 let s = source.array.as_byte_view::<B>();
415
416 if !s.data_buffers().is_empty() {
417 self.source = Some(source);
419 return Err(ArrowError::InvalidArgumentError(
420 "Internal Error: InProgressByteViewArray::copy_rows_by_filter requires inline views"
421 .to_string(),
422 ));
423 }
424
425 self.append_nulls_by_filter(filter, s.nulls());
426 self.append_views_by_filter(s.views(), filter);
427
428 self.source = Some(source);
429 Ok(())
430 }
431
432 fn copy_rows_by_filter_from(
433 &mut self,
434 source: ArrayRef,
435 filter: &FilterPredicate,
436 ) -> Result<(), ArrowError> {
437 let s = source.as_byte_view::<B>();
438 if s.data_buffers().is_empty() {
439 self.ensure_capacity();
440 self.append_nulls_by_filter(filter, s.nulls());
441 self.append_views_by_filter(s.views(), filter);
442 return Ok(());
443 }
444
445 let filtered = filter.filter(source.as_ref())?;
447 let filtered = filtered.as_byte_view::<B>();
448
449 self.ensure_capacity();
450 if let Some(nulls) = filtered.nulls().as_ref() {
451 self.nulls.append_buffer(nulls);
452 } else {
453 self.nulls.append_n_non_nulls(filter.count());
454 }
455 self.append_views_and_update_buffer_index(filtered.views(), filtered.data_buffers());
456 Ok(())
457 }
458
459 fn finish(&mut self) -> Result<ArrayRef, ArrowError> {
460 self.finish_current();
461 assert!(self.current.is_none());
462 let buffers = std::mem::take(&mut self.completed);
463 let views = std::mem::take(&mut self.views);
464 let nulls = self.nulls.finish();
465 self.nulls = NullBufferBuilder::new(self.batch_size);
466
467 let new_array =
470 unsafe { GenericByteViewArray::<B>::new_unchecked(views.into(), buffers, nulls) };
471 Ok(Arc::new(new_array))
472 }
473}
474
/// Size the block progression starts from. `next_size` doubles before it
/// returns, so the first buffer handed out is already twice this size.
const STARTING_BLOCK_SIZE: usize = 4 * 1024;

/// Size at which automatic doubling stops. Larger buffers are only produced
/// when a caller explicitly asks for a bigger minimum.
const MAX_BLOCK_SIZE: usize = 1024 * 1024;

/// Allocates data buffers whose capacity doubles from one request to the next
/// until it reaches [`MAX_BLOCK_SIZE`].
#[derive(Debug)]
struct BufferSource {
    /// Capacity chosen by the progression for the most recent buffer.
    current_size: usize,
}

impl BufferSource {
    /// Creates a source whose progression starts at [`STARTING_BLOCK_SIZE`].
    fn new() -> Self {
        Self {
            current_size: STARTING_BLOCK_SIZE,
        }
    }

    /// Returns an empty buffer with capacity for at least `min_size` bytes.
    fn next_buffer(&mut self, min_size: usize) -> Vec<u8> {
        let size = self.next_size(min_size);
        Vec::with_capacity(size)
    }

    /// Advances the progression and returns the capacity for the next buffer.
    ///
    /// The block size is doubled once per call (while below
    /// [`MAX_BLOCK_SIZE`]) and then doubled further only until it is large
    /// enough for `min_size` or reaches the maximum. A `min_size` above the
    /// maximum is returned as-is without growing the progression past
    /// [`MAX_BLOCK_SIZE`].
    fn next_size(&mut self, min_size: usize) -> usize {
        if self.current_size < MAX_BLOCK_SIZE {
            // Start and maximum sizes are fixed, so this cannot overflow in
            // practice; saturate anyway.
            self.current_size = self.current_size.saturating_mul(2);
        }
        // Strict `<`: stop as soon as the block is big enough, so a request
        // that exactly matches a block size is not doubled once more.
        while self.current_size < min_size && self.current_size < MAX_BLOCK_SIZE {
            self.current_size = self.current_size.saturating_mul(2);
        }
        self.current_size.max(min_size)
    }
}
514
#[cfg(test)]
mod tests {
    use super::*;
    use crate::filter::FilterBuilder;
    use arrow_array::types::BinaryViewType;
    use arrow_array::{BinaryViewArray, BooleanArray};

    /// Requests one buffer of at least `min_size` per entry of `expected` and
    /// checks that each comes back with exactly the listed capacity.
    fn assert_capacities(source: &mut BufferSource, min_size: usize, expected: &[usize]) {
        for &capacity in expected {
            assert_eq!(source.next_buffer(min_size).capacity(), capacity);
        }
    }

    /// Buffers double in size up to 1 MiB; larger requests are honoured exactly.
    #[test]
    fn test_buffer_source() {
        let mut source = BufferSource::new();
        let doubling = [
            8192,
            16384,
            32768,
            65536,
            131072,
            262144,
            524288,
            1024 * 1024,
            1024 * 1024,
        ];
        assert_capacities(&mut source, 1000, &doubling);
        assert_capacities(&mut source, 10_000_000, &[10_000_000]);
    }

    /// A small minimum that already fits does not disturb the doubling.
    #[test]
    fn test_buffer_source_with_min_small() {
        let mut source = BufferSource::new();
        assert_capacities(&mut source, 5_600, &[8 * 1024, 16 * 1024, 32 * 1024]);
    }

    /// A large minimum skips ahead to the first block size that fits.
    #[test]
    fn test_buffer_source_with_min_large() {
        let mut source = BufferSource::new();
        assert_capacities(
            &mut source,
            500_000,
            &[512 * 1024, 1024 * 1024, 1024 * 1024],
        );
        assert_capacities(&mut source, 2_000_000, &[2_000_000]);
    }

    /// `copy_rows_by_filter` refuses sources that have data buffers.
    #[test]
    fn test_copy_rows_by_filter_rejects_non_inline_views() {
        let long_value: Vec<Option<&[u8]>> = vec![Some(b"This value is longer than 12 bytes")];
        let array = BinaryViewArray::from_iter(long_value);
        assert!(!array.data_buffers().is_empty());

        let mut builder = InProgressByteViewArray::<BinaryViewType>::new(1);
        builder.set_source(Some(Arc::new(array)));

        let keep_all = BooleanArray::from(vec![true]);
        let predicate = FilterBuilder::new(&keep_all).build();
        let err = builder.copy_rows_by_filter(&predicate).unwrap_err();

        assert!(
            err.to_string().contains("requires inline views"),
            "unexpected error: {err}"
        );
    }

    /// `copy_rows_by_filter_from` shares the source's data buffer instead of
    /// copying the selected values.
    #[test]
    fn test_copy_rows_by_filter_from_reuses_non_inline_buffers() {
        let values = (0..32)
            .map(|i| format!("This value is longer than 12 bytes: {i}").into_bytes())
            .collect::<Vec<_>>();
        let array = BinaryViewArray::from_iter(values.iter().map(|v| Some(v.as_slice())));
        assert!(!array.data_buffers().is_empty());
        let source_ptr = array.data_buffers()[0].as_ptr();

        let mask = (0..32).map(|i| i == 3 || i == 29).collect::<Vec<_>>();
        let mask = BooleanArray::from(mask);
        let predicate = FilterBuilder::new(&mask).build();

        let mut builder = InProgressByteViewArray::<BinaryViewType>::new(32);
        builder
            .copy_rows_by_filter_from(Arc::new(array), &predicate)
            .unwrap();
        let finished = builder.finish().unwrap();
        let output = finished.as_binary_view();

        assert_eq!(output.len(), 2);
        assert_eq!(output.value(0), values[3].as_slice());
        assert_eq!(output.value(1), values[29].as_slice());

        let shares_source_buffer = output
            .data_buffers()
            .iter()
            .any(|buffer| std::ptr::addr_eq(buffer.as_ptr(), source_ptr));
        assert!(
            shares_source_buffer,
            "expected filtered output to reuse the source data buffer"
        );
    }
}