1use std::any::Any;
19use std::marker::PhantomData;
20use std::sync::Arc;
21
22use arrow_buffer::{Buffer, NullBufferBuilder, ScalarBuffer};
23use arrow_data::{ByteView, MAX_INLINE_VIEW_LEN};
24use arrow_schema::ArrowError;
25use hashbrown::hash_table::Entry;
26use hashbrown::HashTable;
27
28use crate::builder::ArrayBuilder;
29use crate::types::bytes::ByteArrayNativeType;
30use crate::types::{BinaryViewType, ByteViewType, StringViewType};
31use crate::{Array, ArrayRef, GenericByteViewArray};
32
/// Size (8 KiB) the exponential growth strategy starts from. Because `next_size`
/// doubles before returning, the first block it actually requests is twice this.
const STARTING_BLOCK_SIZE: u32 = 8192;

/// Upper bound (2 MiB) on the block sizes requested by the exponential growth strategy.
const MAX_BLOCK_SIZE: u32 = 1 << 21;

/// How the builder sizes each new data block it allocates for non-inlined values.
enum BlockSizeGrowthStrategy {
    /// Every block is requested with the same `size`.
    Fixed { size: u32 },
    /// The requested size doubles on every request until it reaches `MAX_BLOCK_SIZE`.
    Exponential { current_size: u32 },
}
40
41impl BlockSizeGrowthStrategy {
42 fn next_size(&mut self) -> u32 {
43 match self {
44 Self::Fixed { size } => *size,
45 Self::Exponential { current_size } => {
46 if *current_size < MAX_BLOCK_SIZE {
47 *current_size = current_size.saturating_mul(2);
49 *current_size
50 } else {
51 MAX_BLOCK_SIZE
52 }
53 }
54 }
55 }
56}
57
58pub struct GenericByteViewBuilder<T: ByteViewType + ?Sized> {
82 views_buffer: Vec<u128>,
83 null_buffer_builder: NullBufferBuilder,
84 completed: Vec<Buffer>,
85 in_progress: Vec<u8>,
86 block_size: BlockSizeGrowthStrategy,
87 string_tracker: Option<(HashTable<usize>, ahash::RandomState)>,
90 phantom: PhantomData<T>,
91}
92
93impl<T: ByteViewType + ?Sized> GenericByteViewBuilder<T> {
94 pub fn new() -> Self {
96 Self::with_capacity(1024)
97 }
98
99 pub fn with_capacity(capacity: usize) -> Self {
101 Self {
102 views_buffer: Vec::with_capacity(capacity),
103 null_buffer_builder: NullBufferBuilder::new(capacity),
104 completed: vec![],
105 in_progress: vec![],
106 block_size: BlockSizeGrowthStrategy::Exponential {
107 current_size: STARTING_BLOCK_SIZE,
108 },
109 string_tracker: None,
110 phantom: Default::default(),
111 }
112 }
113
114 pub fn with_fixed_block_size(self, block_size: u32) -> Self {
130 debug_assert!(block_size > 0, "Block size must be greater than 0");
131 Self {
132 block_size: BlockSizeGrowthStrategy::Fixed { size: block_size },
133 ..self
134 }
135 }
136
137 #[deprecated(since = "53.0.0", note = "Use `with_fixed_block_size` instead")]
140 pub fn with_block_size(self, block_size: u32) -> Self {
141 self.with_fixed_block_size(block_size)
142 }
143
144 pub fn with_deduplicate_strings(self) -> Self {
149 Self {
150 string_tracker: Some((
151 HashTable::with_capacity(self.views_buffer.capacity()),
152 Default::default(),
153 )),
154 ..self
155 }
156 }
157
158 pub fn append_block(&mut self, buffer: Buffer) -> u32 {
183 assert!(buffer.len() < u32::MAX as usize);
184
185 self.flush_in_progress();
186 let offset = self.completed.len();
187 self.push_completed(buffer);
188 offset as u32
189 }
190
191 pub unsafe fn append_view_unchecked(&mut self, block: u32, offset: u32, len: u32) {
198 let b = self.completed.get_unchecked(block as usize);
199 let start = offset as usize;
200 let end = start.saturating_add(len as usize);
201 let b = b.get_unchecked(start..end);
202
203 let view = make_view(b, block, offset);
204 self.views_buffer.push(view);
205 self.null_buffer_builder.append_non_null();
206 }
207
208 pub fn append_array(&mut self, array: &GenericByteViewArray<T>) {
212 self.flush_in_progress();
213 let keep_views = self.completed.is_empty() || array.data_buffers().is_empty();
215 let starting_buffer = self.completed.len() as u32;
216
217 self.completed.extend(array.data_buffers().iter().cloned());
218
219 if keep_views {
220 self.views_buffer.extend_from_slice(array.views());
221 } else {
222 self.views_buffer.extend(array.views().iter().map(|v| {
223 let mut byte_view = ByteView::from(*v);
224 if byte_view.length > MAX_INLINE_VIEW_LEN {
225 byte_view.buffer_index += starting_buffer;
227 };
228
229 byte_view.as_u128()
230 }));
231 }
232
233 if let Some(null_buffer) = array.nulls() {
234 self.null_buffer_builder.append_buffer(null_buffer);
235 } else {
236 self.null_buffer_builder.append_n_non_nulls(array.len());
237 }
238 }
239
240 pub fn try_append_view(&mut self, block: u32, offset: u32, len: u32) -> Result<(), ArrowError> {
244 let b = self.completed.get(block as usize).ok_or_else(|| {
245 ArrowError::InvalidArgumentError(format!("No block found with index {block}"))
246 })?;
247 let start = offset as usize;
248 let end = start.saturating_add(len as usize);
249
250 let b = b.get(start..end).ok_or_else(|| {
251 ArrowError::InvalidArgumentError(format!(
252 "Range {start}..{end} out of bounds for block of length {}",
253 b.len()
254 ))
255 })?;
256
257 if T::Native::from_bytes_checked(b).is_none() {
258 return Err(ArrowError::InvalidArgumentError(
259 "Invalid view data".to_string(),
260 ));
261 }
262
263 unsafe {
264 self.append_view_unchecked(block, offset, len);
265 }
266 Ok(())
267 }
268
269 #[inline]
271 fn flush_in_progress(&mut self) {
272 if !self.in_progress.is_empty() {
273 let f = Buffer::from_vec(std::mem::take(&mut self.in_progress));
274 self.push_completed(f)
275 }
276 }
277
278 #[inline]
280 fn push_completed(&mut self, block: Buffer) {
281 assert!(block.len() < u32::MAX as usize, "Block too large");
282 assert!(self.completed.len() < u32::MAX as usize, "Too many blocks");
283 self.completed.push(block);
284 }
285
286 pub fn get_value(&self, index: usize) -> &[u8] {
290 let view = self.views_buffer.as_slice().get(index).unwrap();
291 let len = *view as u32;
292 if len <= MAX_INLINE_VIEW_LEN {
293 unsafe { GenericByteViewArray::<T>::inline_value(view, len as usize) }
296 } else {
297 let view = ByteView::from(*view);
298 if view.buffer_index < self.completed.len() as u32 {
299 let block = &self.completed[view.buffer_index as usize];
300 &block[view.offset as usize..view.offset as usize + view.length as usize]
301 } else {
302 &self.in_progress[view.offset as usize..view.offset as usize + view.length as usize]
303 }
304 }
305 }
306
307 #[inline]
315 pub fn append_value(&mut self, value: impl AsRef<T::Native>) {
316 let v: &[u8] = value.as_ref().as_ref();
317 let length: u32 = v.len().try_into().unwrap();
318 if length <= MAX_INLINE_VIEW_LEN {
319 let mut view_buffer = [0; 16];
320 view_buffer[0..4].copy_from_slice(&length.to_le_bytes());
321 view_buffer[4..4 + v.len()].copy_from_slice(v);
322 self.views_buffer.push(u128::from_le_bytes(view_buffer));
323 self.null_buffer_builder.append_non_null();
324 return;
325 }
326
327 if let Some((mut ht, hasher)) = self.string_tracker.take() {
331 let hash_val = hasher.hash_one(v);
332 let hasher_fn = |v: &_| hasher.hash_one(v);
333
334 let entry = ht.entry(
335 hash_val,
336 |idx| {
337 let stored_value = self.get_value(*idx);
338 v == stored_value
339 },
340 hasher_fn,
341 );
342 match entry {
343 Entry::Occupied(occupied) => {
344 let idx = occupied.get();
346 self.views_buffer.push(self.views_buffer[*idx]);
347 self.null_buffer_builder.append_non_null();
348 self.string_tracker = Some((ht, hasher));
349 return;
350 }
351 Entry::Vacant(vacant) => {
352 vacant.insert(self.views_buffer.len());
355 }
356 }
357 self.string_tracker = Some((ht, hasher));
358 }
359
360 let required_cap = self.in_progress.len() + v.len();
361 if self.in_progress.capacity() < required_cap {
362 self.flush_in_progress();
363 let to_reserve = v.len().max(self.block_size.next_size() as usize);
364 self.in_progress.reserve(to_reserve);
365 };
366 let offset = self.in_progress.len() as u32;
367 self.in_progress.extend_from_slice(v);
368
369 let view = ByteView {
370 length,
371 prefix: u32::from_le_bytes(v[0..4].try_into().unwrap()),
372 buffer_index: self.completed.len() as u32,
373 offset,
374 };
375 self.views_buffer.push(view.into());
376 self.null_buffer_builder.append_non_null();
377 }
378
379 #[inline]
381 pub fn append_option(&mut self, value: Option<impl AsRef<T::Native>>) {
382 match value {
383 None => self.append_null(),
384 Some(v) => self.append_value(v),
385 };
386 }
387
388 #[inline]
390 pub fn append_null(&mut self) {
391 self.null_buffer_builder.append_null();
392 self.views_buffer.push(0);
393 }
394
395 pub fn finish(&mut self) -> GenericByteViewArray<T> {
397 self.flush_in_progress();
398 let completed = std::mem::take(&mut self.completed);
399 let nulls = self.null_buffer_builder.finish();
400 if let Some((ref mut ht, _)) = self.string_tracker.as_mut() {
401 ht.clear();
402 }
403 let views = std::mem::take(&mut self.views_buffer);
404 unsafe { GenericByteViewArray::new_unchecked(views.into(), completed, nulls) }
406 }
407
408 pub fn finish_cloned(&self) -> GenericByteViewArray<T> {
410 let mut completed = self.completed.clone();
411 if !self.in_progress.is_empty() {
412 completed.push(Buffer::from_slice_ref(&self.in_progress));
413 }
414 let len = self.views_buffer.len();
415 let views = Buffer::from_slice_ref(self.views_buffer.as_slice());
416 let views = ScalarBuffer::new(views, 0, len);
417 let nulls = self.null_buffer_builder.finish_cloned();
418 unsafe { GenericByteViewArray::new_unchecked(views, completed, nulls) }
420 }
421
422 pub fn validity_slice(&self) -> Option<&[u8]> {
424 self.null_buffer_builder.as_slice()
425 }
426
427 pub fn allocated_size(&self) -> usize {
429 let views = self.views_buffer.capacity() * std::mem::size_of::<u128>();
430 let null = self.null_buffer_builder.allocated_size();
431 let buffer_size = self.completed.iter().map(|b| b.capacity()).sum::<usize>();
432 let in_progress = self.in_progress.capacity();
433 let tracker = match &self.string_tracker {
434 Some((ht, _)) => ht.capacity() * std::mem::size_of::<usize>(),
435 None => 0,
436 };
437 buffer_size + in_progress + tracker + views + null
438 }
439}
440
441impl<T: ByteViewType + ?Sized> Default for GenericByteViewBuilder<T> {
442 fn default() -> Self {
443 Self::new()
444 }
445}
446
447impl<T: ByteViewType + ?Sized> std::fmt::Debug for GenericByteViewBuilder<T> {
448 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
449 write!(f, "{}ViewBuilder", T::PREFIX)?;
450 f.debug_struct("")
451 .field("views_buffer", &self.views_buffer)
452 .field("in_progress", &self.in_progress)
453 .field("completed", &self.completed)
454 .field("null_buffer_builder", &self.null_buffer_builder)
455 .finish()
456 }
457}
458
459impl<T: ByteViewType + ?Sized> ArrayBuilder for GenericByteViewBuilder<T> {
460 fn len(&self) -> usize {
461 self.null_buffer_builder.len()
462 }
463
464 fn finish(&mut self) -> ArrayRef {
465 Arc::new(self.finish())
466 }
467
468 fn finish_cloned(&self) -> ArrayRef {
469 Arc::new(self.finish_cloned())
470 }
471
472 fn as_any(&self) -> &dyn Any {
473 self
474 }
475
476 fn as_any_mut(&mut self) -> &mut dyn Any {
477 self
478 }
479
480 fn into_box_any(self: Box<Self>) -> Box<dyn Any> {
481 self
482 }
483}
484
485impl<T: ByteViewType + ?Sized, V: AsRef<T::Native>> Extend<Option<V>>
486 for GenericByteViewBuilder<T>
487{
488 #[inline]
489 fn extend<I: IntoIterator<Item = Option<V>>>(&mut self, iter: I) {
490 for v in iter {
491 self.append_option(v)
492 }
493 }
494}
495
/// A [`GenericByteViewBuilder`] for string values ([`StringViewType`]).
///
/// Views added with `try_append_view` are validated for the string type, so byte
/// ranges that are not valid UTF-8 are rejected.
pub type StringViewBuilder = GenericByteViewBuilder<StringViewType>;

/// A [`GenericByteViewBuilder`] for binary values ([`BinaryViewType`]).
pub type BinaryViewBuilder = GenericByteViewBuilder<BinaryViewType>;
538
/// Builds an inline view for the first `LEN` bytes of `data`.
///
/// Layout (little-endian): a 4-byte length `LEN`, then the `LEN` value bytes, then
/// zero padding up to 16 bytes. Panics if `data` is shorter than `LEN` or if
/// `LEN > 12`.
fn make_inlined_view<const LEN: usize>(data: &[u8]) -> u128 {
    let mut bytes = [0u8; 16];
    let (len_bytes, payload) = bytes.split_at_mut(4);
    len_bytes.copy_from_slice(&(LEN as u32).to_le_bytes());
    payload[..LEN].copy_from_slice(&data[..LEN]);
    u128::from_le_bytes(bytes)
}
547
548#[inline(never)]
554pub fn make_view(data: &[u8], block_id: u32, offset: u32) -> u128 {
555 let len = data.len();
556
557 match len {
560 0 => make_inlined_view::<0>(data),
561 1 => make_inlined_view::<1>(data),
562 2 => make_inlined_view::<2>(data),
563 3 => make_inlined_view::<3>(data),
564 4 => make_inlined_view::<4>(data),
565 5 => make_inlined_view::<5>(data),
566 6 => make_inlined_view::<6>(data),
567 7 => make_inlined_view::<7>(data),
568 8 => make_inlined_view::<8>(data),
569 9 => make_inlined_view::<9>(data),
570 10 => make_inlined_view::<10>(data),
571 11 => make_inlined_view::<11>(data),
572 12 => make_inlined_view::<12>(data),
573 _ => {
575 let view = ByteView {
576 length: len as u32,
577 prefix: u32::from_le_bytes(data[0..4].try_into().unwrap()),
578 buffer_index: block_id,
579 offset,
580 };
581 view.as_u128()
582 }
583 }
584}
585
#[cfg(test)]
mod tests {
    use core::str;

    use super::*;
    use crate::Array;

    /// Repeated long values must share one copy of their bytes and identical views.
    #[test]
    fn test_string_view_deduplicate() {
        let value_1 = "long string to test string view";
        let value_2 = "not so similar string but long";

        // value_1 (31 bytes) and value_2 (30 bytes) fit together in one 62-byte block.
        // The repeated copies only fit there if they are deduplicated.
        let mut builder = StringViewBuilder::new()
            .with_deduplicate_strings()
            .with_fixed_block_size(value_1.len() as u32 * 2);
        let values = vec![
            Some(value_1),
            Some(value_2),
            Some("short"),
            Some(value_1),
            None,
            Some(value_2),
            Some(value_1),
        ];
        builder.extend(values.clone());

        let array = builder.finish_cloned();
        array.to_data().validate_full().unwrap();
        // Each distinct long value was stored once, all within a single data block.
        assert_eq!(array.data_buffers().len(), 1);
        let actual: Vec<_> = array.iter().collect();
        assert_eq!(actual, values);

        // Repeats reuse the exact view of their first occurrence.
        let view0 = array.views().first().unwrap();
        let view3 = array.views().get(3).unwrap();
        let view6 = array.views().get(6).unwrap();

        assert_eq!(view0, view3);
        assert_eq!(view0, view6);

        assert_eq!(array.views().get(1), array.views().get(5));
    }

    /// `finish()` must reset the dedup table. Otherwise stale indexes would be looked up
    /// in the now-empty views buffer.
    #[test]
    fn test_string_view_deduplicate_after_finish() {
        let mut builder = StringViewBuilder::new().with_deduplicate_strings();

        let value_1 = "long string to test string view";
        let value_2 = "not so similar string but long";
        builder.append_value(value_1);
        let _array = builder.finish();
        builder.append_value(value_2);
        let _array = builder.finish();
        builder.append_value(value_1);
        let _array = builder.finish();
    }

    /// Mixes caller-supplied blocks, builder-owned values and views into both, then
    /// checks the error paths of `try_append_view`.
    #[test]
    fn test_string_view() {
        // b1 layout: "world" | 0xFF | "bananas" | 4-byte UTF-8 emoji  (17 bytes).
        let b1 = Buffer::from(b"world\xFFbananas\xF0\x9F\x98\x81");
        let b2 = Buffer::from(b"cupcakes");
        let b3 = Buffer::from(b"Many strings are here contained of great length and verbosity");

        let mut v = StringViewBuilder::new();
        assert_eq!(v.append_block(b1), 0);

        v.append_value("This is a very long string that exceeds the inline length");
        v.append_value("This is another very long string that exceeds the inline length");

        // The two long values above sit in the in-progress block. append_block flushes it
        // as block 1, so b2 becomes block 2.
        assert_eq!(v.append_block(b2), 2);
        assert_eq!(v.append_block(b3), 3);

        v.try_append_view(0, 0, 5).unwrap(); // "world"
        v.try_append_view(0, 6, 7).unwrap(); // "bananas"
        v.try_append_view(2, 3, 5).unwrap(); // "cakes"
        v.try_append_view(2, 0, 3).unwrap(); // "cup"
        v.try_append_view(2, 0, 8).unwrap(); // "cupcakes"
        v.try_append_view(0, 13, 4).unwrap(); // the emoji
        v.try_append_view(0, 13, 0).unwrap(); // empty string
        v.try_append_view(3, 0, 16).unwrap(); // "Many strings are"
        v.try_append_view(1, 0, 19).unwrap(); // "This is a very long"
        v.try_append_view(3, 13, 27).unwrap(); // "are here contained of great"
        v.append_value("I do so like long strings");

        let array = v.finish_cloned();
        array.to_data().validate_full().unwrap();
        // b1, flushed block 1, b2, b3, plus the in-progress block for the last value.
        assert_eq!(array.data_buffers().len(), 5);
        let actual: Vec<_> = array.iter().flatten().collect();
        assert_eq!(
            actual,
            &[
                "This is a very long string that exceeds the inline length",
                "This is another very long string that exceeds the inline length",
                "world",
                "bananas",
                "cakes",
                "cup",
                "cupcakes",
                "😁",
                "",
                "Many strings are",
                "This is a very long",
                "are here contained of great",
                "I do so like long strings"
            ]
        );

        // The range end is computed in usize with saturating_add, so offsets near
        // u32::MAX report an out-of-bounds range instead of wrapping (on 64-bit targets).
        let err = v.try_append_view(0, u32::MAX, 1).unwrap_err();
        assert_eq!(err.to_string(), "Invalid argument error: Range 4294967295..4294967296 out of bounds for block of length 17");

        let err = v.try_append_view(0, 1, u32::MAX).unwrap_err();
        assert_eq!(
            err.to_string(),
            "Invalid argument error: Range 1..4294967296 out of bounds for block of length 17"
        );

        // Bytes 13..15 split the 4-byte emoji, so they are not valid UTF-8.
        let err = v.try_append_view(0, 13, 2).unwrap_err();
        assert_eq!(err.to_string(), "Invalid argument error: Invalid view data");

        // Even an empty range must start within the block.
        let err = v.try_append_view(0, 40, 0).unwrap_err();
        assert_eq!(
            err.to_string(),
            "Invalid argument error: Range 40..40 out of bounds for block of length 17"
        );

        let err = v.try_append_view(5, 0, 0).unwrap_err();
        assert_eq!(
            err.to_string(),
            "Invalid argument error: No block found with index 5"
        );
    }

    /// Compares exponential block growth with a fixed block size.
    #[test]
    fn test_string_view_with_block_size_growth() {
        let mut exp_builder = StringViewBuilder::new();
        let mut fixed_builder = StringViewBuilder::new().with_fixed_block_size(STARTING_BLOCK_SIZE);

        let long_string = str::from_utf8(&[b'a'; STARTING_BLOCK_SIZE as usize]).unwrap();

        for i in 0..9 {
            // Append 2^i strings, each exactly STARTING_BLOCK_SIZE bytes long.
            for _ in 0..(2_u32.pow(i)) {
                exp_builder.append_value(long_string);
                fixed_builder.append_value(long_string);
            }
            exp_builder.flush_in_progress();
            fixed_builder.flush_in_progress();

            // Exponential growth: each round's strings fit in exactly one new block.
            assert_eq!(exp_builder.completed.len(), i as usize + 1);
            assert_eq!(
                exp_builder.completed[i as usize].len(),
                STARTING_BLOCK_SIZE as usize * 2_usize.pow(i)
            );

            // Fixed size: one block per string, 1 + 2 + ... + 2^i = 2^(i+1) - 1 in total.
            assert_eq!(fixed_builder.completed.len(), 2_usize.pow(i + 1) - 1);

            assert!(fixed_builder
                .completed
                .iter()
                .all(|b| b.len() == STARTING_BLOCK_SIZE as usize));
        }

        // By now the exponential strategy is capped at MAX_BLOCK_SIZE.
        exp_builder.append_value(long_string);
        exp_builder.flush_in_progress();
        assert_eq!(
            exp_builder.completed.last().unwrap().capacity(),
            MAX_BLOCK_SIZE as usize
        );
    }
}