1use std::any::Any;
19use std::marker::PhantomData;
20use std::sync::Arc;
21
22use arrow_buffer::{Buffer, NullBufferBuilder, ScalarBuffer};
23use arrow_data::{ByteView, MAX_INLINE_VIEW_LEN};
24use arrow_schema::ArrowError;
25use hashbrown::HashTable;
26use hashbrown::hash_table::Entry;
27
28use crate::builder::{ArrayBuilder, StringLikeArrayBuilder};
29use crate::types::bytes::ByteArrayNativeType;
30use crate::types::{BinaryViewType, ByteViewType, StringViewType};
31use crate::{Array, ArrayRef, GenericByteViewArray};
32
/// Initial `current_size` of [`BlockSizeGrowthStrategy::Exponential`] (8 KiB).
///
/// Note that `next_size` doubles before returning, so the first block reserved
/// under exponential growth is twice this value.
const STARTING_BLOCK_SIZE: u32 = 8 * 1024;
/// Cap on block sizes handed out by exponential growth (2 MiB).
const MAX_BLOCK_SIZE: u32 = 2 * 1024 * 1024;

/// Policy deciding how much capacity to reserve for each new in-progress data block.
enum BlockSizeGrowthStrategy {
    /// Every new block reserves the same number of bytes.
    Fixed { size: u32 },
    /// Each new block reserves twice as much as the previous one, up to
    /// [`MAX_BLOCK_SIZE`].
    Exponential { current_size: u32 },
}

impl BlockSizeGrowthStrategy {
    /// Returns the number of bytes to reserve for the next block, advancing the
    /// growth state.
    ///
    /// `Fixed` always yields its configured size. `Exponential` doubles its
    /// stored size (saturating) while that size is below [`MAX_BLOCK_SIZE`] and
    /// returns the doubled value; once the stored size has reached the cap it
    /// returns [`MAX_BLOCK_SIZE`] without changing state.
    fn next_size(&mut self) -> u32 {
        match self {
            Self::Fixed { size } => *size,
            Self::Exponential { current_size } if *current_size >= MAX_BLOCK_SIZE => {
                MAX_BLOCK_SIZE
            }
            Self::Exponential { current_size } => {
                *current_size = current_size.saturating_mul(2);
                *current_size
            }
        }
    }
}
57
58pub struct GenericByteViewBuilder<T: ByteViewType + ?Sized> {
82 views_buffer: Vec<u128>,
83 null_buffer_builder: NullBufferBuilder,
84 completed: Vec<Buffer>,
85 in_progress: Vec<u8>,
86 block_size: BlockSizeGrowthStrategy,
87 string_tracker: Option<(HashTable<usize>, ahash::RandomState)>,
90 phantom: PhantomData<T>,
91}
92
93impl<T: ByteViewType + ?Sized> GenericByteViewBuilder<T> {
94 pub fn new() -> Self {
96 Self::with_capacity(1024)
97 }
98
99 pub fn with_capacity(capacity: usize) -> Self {
101 Self {
102 views_buffer: Vec::with_capacity(capacity),
103 null_buffer_builder: NullBufferBuilder::new(capacity),
104 completed: vec![],
105 in_progress: vec![],
106 block_size: BlockSizeGrowthStrategy::Exponential {
107 current_size: STARTING_BLOCK_SIZE,
108 },
109 string_tracker: None,
110 phantom: Default::default(),
111 }
112 }
113
114 pub fn with_fixed_block_size(self, block_size: u32) -> Self {
130 debug_assert!(block_size > 0, "Block size must be greater than 0");
131 Self {
132 block_size: BlockSizeGrowthStrategy::Fixed { size: block_size },
133 ..self
134 }
135 }
136
137 pub fn with_deduplicate_strings(self) -> Self {
142 Self {
143 string_tracker: Some((
144 HashTable::with_capacity(self.views_buffer.capacity()),
145 Default::default(),
146 )),
147 ..self
148 }
149 }
150
151 pub fn append_block(&mut self, buffer: Buffer) -> u32 {
176 assert!(buffer.len() < u32::MAX as usize);
177
178 self.flush_in_progress();
179 let offset = self.completed.len();
180 self.push_completed(buffer);
181 offset as u32
182 }
183
184 pub unsafe fn append_view_unchecked(&mut self, block: u32, offset: u32, len: u32) {
191 let b = unsafe { self.completed.get_unchecked(block as usize) };
192 let start = offset as usize;
193 let end = start.saturating_add(len as usize);
194 let b = unsafe { b.get_unchecked(start..end) };
195
196 let view = make_view(b, block, offset);
197 self.views_buffer.push(view);
198 self.null_buffer_builder.append_non_null();
199 }
200
201 pub fn append_array(&mut self, array: &GenericByteViewArray<T>) {
205 self.flush_in_progress();
206 let keep_views = self.completed.is_empty() || array.data_buffers().is_empty();
208 let starting_buffer = self.completed.len() as u32;
209
210 self.completed.extend(array.data_buffers().iter().cloned());
211
212 if keep_views {
213 self.views_buffer.extend_from_slice(array.views());
214 } else {
215 self.views_buffer.extend(array.views().iter().map(|v| {
216 let mut byte_view = ByteView::from(*v);
217 if byte_view.length > MAX_INLINE_VIEW_LEN {
218 byte_view.buffer_index += starting_buffer;
220 };
221
222 byte_view.as_u128()
223 }));
224 }
225
226 if let Some(null_buffer) = array.nulls() {
227 self.null_buffer_builder.append_buffer(null_buffer);
228 } else {
229 self.null_buffer_builder.append_n_non_nulls(array.len());
230 }
231 }
232
233 pub fn try_append_view(&mut self, block: u32, offset: u32, len: u32) -> Result<(), ArrowError> {
237 let b = self.completed.get(block as usize).ok_or_else(|| {
238 ArrowError::InvalidArgumentError(format!("No block found with index {block}"))
239 })?;
240 let start = offset as usize;
241 let end = start.saturating_add(len as usize);
242
243 let b = b.get(start..end).ok_or_else(|| {
244 ArrowError::InvalidArgumentError(format!(
245 "Range {start}..{end} out of bounds for block of length {}",
246 b.len()
247 ))
248 })?;
249
250 if T::Native::from_bytes_checked(b).is_none() {
251 return Err(ArrowError::InvalidArgumentError(
252 "Invalid view data".to_string(),
253 ));
254 }
255
256 unsafe {
257 self.append_view_unchecked(block, offset, len);
258 }
259 Ok(())
260 }
261
262 #[inline]
264 fn flush_in_progress(&mut self) {
265 if !self.in_progress.is_empty() {
266 let f = Buffer::from_vec(std::mem::take(&mut self.in_progress));
267 self.push_completed(f)
268 }
269 }
270
271 #[inline]
273 fn push_completed(&mut self, block: Buffer) {
274 assert!(block.len() < u32::MAX as usize, "Block too large");
275 assert!(self.completed.len() < u32::MAX as usize, "Too many blocks");
276 self.completed.push(block);
277 }
278
279 pub fn get_value(&self, index: usize) -> &[u8] {
283 let view = self.views_buffer.as_slice().get(index).unwrap();
284 let len = *view as u32;
285 if len <= MAX_INLINE_VIEW_LEN {
286 unsafe { GenericByteViewArray::<T>::inline_value(view, len as usize) }
289 } else {
290 let view = ByteView::from(*view);
291 if view.buffer_index < self.completed.len() as u32 {
292 let block = &self.completed[view.buffer_index as usize];
293 &block[view.offset as usize..view.offset as usize + view.length as usize]
294 } else {
295 &self.in_progress[view.offset as usize..view.offset as usize + view.length as usize]
296 }
297 }
298 }
299
300 #[inline]
308 pub fn append_value(&mut self, value: impl AsRef<T::Native>) {
309 self.try_append_value(value).unwrap()
310 }
311
312 #[inline]
320 pub fn try_append_value(&mut self, value: impl AsRef<T::Native>) -> Result<(), ArrowError> {
321 let v: &[u8] = value.as_ref().as_ref();
322 let length: u32 = v.len().try_into().map_err(|_| {
323 ArrowError::InvalidArgumentError(format!("String length {} exceeds u32::MAX", v.len()))
324 })?;
325
326 if length <= MAX_INLINE_VIEW_LEN {
327 let mut view_buffer = [0; 16];
328 view_buffer[0..4].copy_from_slice(&length.to_le_bytes());
329 view_buffer[4..4 + v.len()].copy_from_slice(v);
330 self.views_buffer.push(u128::from_le_bytes(view_buffer));
331 self.null_buffer_builder.append_non_null();
332 return Ok(());
333 }
334
335 if let Some((mut ht, hasher)) = self.string_tracker.take() {
339 let hash_val = hasher.hash_one(v);
340 let hasher_fn = |v: &_| hasher.hash_one(v);
341
342 let entry = ht.entry(
343 hash_val,
344 |idx| {
345 let stored_value = self.get_value(*idx);
346 v == stored_value
347 },
348 hasher_fn,
349 );
350 match entry {
351 Entry::Occupied(occupied) => {
352 let idx = occupied.get();
354 self.views_buffer.push(self.views_buffer[*idx]);
355 self.null_buffer_builder.append_non_null();
356 self.string_tracker = Some((ht, hasher));
357 return Ok(());
358 }
359 Entry::Vacant(vacant) => {
360 vacant.insert(self.views_buffer.len());
363 }
364 }
365 self.string_tracker = Some((ht, hasher));
366 }
367
368 let required_cap = self.in_progress.len() + v.len();
369 if self.in_progress.capacity() < required_cap {
370 self.flush_in_progress();
371 let to_reserve = v.len().max(self.block_size.next_size() as usize);
372 self.in_progress.reserve(to_reserve);
373 };
374
375 let offset = self.in_progress.len() as u32;
376 self.in_progress.extend_from_slice(v);
377
378 let buffer_index: u32 = self.completed.len().try_into().map_err(|_| {
379 ArrowError::InvalidArgumentError(format!(
380 "Buffer count {} exceeds u32::MAX",
381 self.completed.len()
382 ))
383 })?;
384
385 let view = ByteView {
386 length,
387 prefix: u32::from_le_bytes(v[0..4].try_into().unwrap()),
389 buffer_index,
390 offset,
391 };
392 self.views_buffer.push(view.into());
393 self.null_buffer_builder.append_non_null();
394
395 Ok(())
396 }
397
398 #[inline]
400 pub fn append_option(&mut self, value: Option<impl AsRef<T::Native>>) {
401 match value {
402 None => self.append_null(),
403 Some(v) => self.append_value(v),
404 };
405 }
406
407 #[inline]
409 pub fn append_null(&mut self) {
410 self.null_buffer_builder.append_null();
411 self.views_buffer.push(0);
412 }
413
414 pub fn finish(&mut self) -> GenericByteViewArray<T> {
416 self.flush_in_progress();
417 let completed = std::mem::take(&mut self.completed);
418 let nulls = self.null_buffer_builder.finish();
419 if let Some((ht, _)) = self.string_tracker.as_mut() {
420 ht.clear();
421 }
422 let views = std::mem::take(&mut self.views_buffer);
423 unsafe { GenericByteViewArray::new_unchecked(views.into(), completed, nulls) }
425 }
426
427 pub fn finish_cloned(&self) -> GenericByteViewArray<T> {
429 let mut completed = self.completed.clone();
430 if !self.in_progress.is_empty() {
431 completed.push(Buffer::from_slice_ref(&self.in_progress));
432 }
433 let len = self.views_buffer.len();
434 let views = Buffer::from_slice_ref(self.views_buffer.as_slice());
435 let views = ScalarBuffer::new(views, 0, len);
436 let nulls = self.null_buffer_builder.finish_cloned();
437 unsafe { GenericByteViewArray::new_unchecked(views, completed, nulls) }
439 }
440
441 pub fn validity_slice(&self) -> Option<&[u8]> {
443 self.null_buffer_builder.as_slice()
444 }
445
446 pub fn allocated_size(&self) -> usize {
448 let views = self.views_buffer.capacity() * std::mem::size_of::<u128>();
449 let null = self.null_buffer_builder.allocated_size();
450 let buffer_size = self.completed.iter().map(|b| b.capacity()).sum::<usize>();
451 let in_progress = self.in_progress.capacity();
452 let tracker = match &self.string_tracker {
453 Some((ht, _)) => ht.capacity() * std::mem::size_of::<usize>(),
454 None => 0,
455 };
456 buffer_size + in_progress + tracker + views + null
457 }
458}
459
460impl<T: ByteViewType + ?Sized> Default for GenericByteViewBuilder<T> {
461 fn default() -> Self {
462 Self::new()
463 }
464}
465
466impl<T: ByteViewType + ?Sized> std::fmt::Debug for GenericByteViewBuilder<T> {
467 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
468 write!(f, "{}ViewBuilder", T::PREFIX)?;
469 f.debug_struct("")
470 .field("views_buffer", &self.views_buffer)
471 .field("in_progress", &self.in_progress)
472 .field("completed", &self.completed)
473 .field("null_buffer_builder", &self.null_buffer_builder)
474 .finish()
475 }
476}
477
478impl<T: ByteViewType + ?Sized> ArrayBuilder for GenericByteViewBuilder<T> {
479 fn len(&self) -> usize {
480 self.null_buffer_builder.len()
481 }
482
483 fn finish(&mut self) -> ArrayRef {
484 Arc::new(self.finish())
485 }
486
487 fn finish_cloned(&self) -> ArrayRef {
488 Arc::new(self.finish_cloned())
489 }
490
491 fn as_any(&self) -> &dyn Any {
492 self
493 }
494
495 fn as_any_mut(&mut self) -> &mut dyn Any {
496 self
497 }
498
499 fn into_box_any(self: Box<Self>) -> Box<dyn Any> {
500 self
501 }
502}
503
504impl<T: ByteViewType + ?Sized, V: AsRef<T::Native>> Extend<Option<V>>
505 for GenericByteViewBuilder<T>
506{
507 #[inline]
508 fn extend<I: IntoIterator<Item = Option<V>>>(&mut self, iter: I) {
509 for v in iter {
510 self.append_option(v)
511 }
512 }
513}
514
515pub type StringViewBuilder = GenericByteViewBuilder<StringViewType>;
535
536impl StringLikeArrayBuilder for StringViewBuilder {
537 fn type_name() -> &'static str {
538 std::any::type_name::<StringViewBuilder>()
539 }
540 fn with_capacity(capacity: usize) -> Self {
541 Self::with_capacity(capacity)
542 }
543 fn append_value(&mut self, value: &str) {
544 Self::append_value(self, value);
545 }
546 fn append_null(&mut self) {
547 Self::append_null(self);
548 }
549}
550
/// A [`GenericByteViewBuilder`] for binary values (`BinaryViewType`).
pub type BinaryViewBuilder = GenericByteViewBuilder<BinaryViewType>;
572
/// Builds a view that stores the first `LEN` bytes of `data` inline.
///
/// Layout (little-endian `u128`):
/// - bytes `0..4` hold `LEN` as a `u32`;
/// - bytes `4..4 + LEN` hold the data;
/// - the remaining bytes are zero.
///
/// `LEN` is a const generic, so the copy length is a compile-time constant for
/// each instantiation.
///
/// # Panics
///
/// Panics if `data.len() < LEN` or `LEN > 12`.
fn make_inlined_view<const LEN: usize>(data: &[u8]) -> u128 {
    let mut bytes = [0u8; 16];
    let (header, payload) = bytes.split_at_mut(4);
    header.copy_from_slice(&(LEN as u32).to_le_bytes());
    payload[..LEN].copy_from_slice(&data[..LEN]);
    u128::from_le_bytes(bytes)
}
581
582#[inline(never)]
588pub fn make_view(data: &[u8], block_id: u32, offset: u32) -> u128 {
589 let len = data.len();
590
591 match len {
594 0 => make_inlined_view::<0>(data),
595 1 => make_inlined_view::<1>(data),
596 2 => make_inlined_view::<2>(data),
597 3 => make_inlined_view::<3>(data),
598 4 => make_inlined_view::<4>(data),
599 5 => make_inlined_view::<5>(data),
600 6 => make_inlined_view::<6>(data),
601 7 => make_inlined_view::<7>(data),
602 8 => make_inlined_view::<8>(data),
603 9 => make_inlined_view::<9>(data),
604 10 => make_inlined_view::<10>(data),
605 11 => make_inlined_view::<11>(data),
606 12 => make_inlined_view::<12>(data),
607 _ => {
609 let view = ByteView {
610 length: len as u32,
611 prefix: u32::from_le_bytes(data[0..4].try_into().unwrap()),
612 buffer_index: block_id,
613 offset,
614 };
615 view.as_u128()
616 }
617 }
618}
619
#[cfg(test)]
mod tests {
    use core::str;

    use super::*;

    /// With deduplication enabled, repeated long values reuse the view (and
    /// bytes) of their first occurrence.
    #[test]
    fn test_string_view_deduplicate() {
        let value_1 = "long string to test string view";
        let value_2 = "not so similar string but long";

        let mut builder = StringViewBuilder::new()
            .with_deduplicate_strings()
            // 62-byte blocks: value_1 + value_2 (61 bytes) fit in a single block.
            .with_fixed_block_size(value_1.len() as u32 * 2);

        let values = vec![
            Some(value_1),
            Some(value_2),
            Some("short"),
            Some(value_1),
            None,
            Some(value_2),
            Some(value_1),
        ];
        builder.extend(values.clone());

        let array = builder.finish_cloned();
        array.to_data().validate_full().unwrap();
        // Duplicates add no bytes, so everything fits in one data buffer.
        assert_eq!(array.data_buffers().len(), 1);

        let actual: Vec<_> = array.iter().collect();
        assert_eq!(actual, values);

        // Repeats of value_1 (slots 3 and 6) share the view of slot 0 ...
        let view0 = array.views().first().unwrap();
        let view3 = array.views().get(3).unwrap();
        let view6 = array.views().get(6).unwrap();

        assert_eq!(view0, view3);
        assert_eq!(view0, view6);

        // ... and the repeat of value_2 (slot 5) shares the view of slot 1.
        assert_eq!(array.views().get(1), array.views().get(5));
    }

    /// `finish` clears the deduplication table, so appends after it never
    /// consult view indices that belonged to an earlier array.
    #[test]
    fn test_string_view_deduplicate_after_finish() {
        let mut builder = StringViewBuilder::new().with_deduplicate_strings();

        let value_1 = "long string to test string view";
        let value_2 = "not so similar string but long";
        builder.append_value(value_1);
        let _array = builder.finish();
        builder.append_value(value_2);
        let _array = builder.finish();
        builder.append_value(value_1);
        let _array = builder.finish();
    }

    /// Mixes externally supplied blocks, builder-owned blocks and checked views,
    /// then exercises the error paths of `try_append_view`.
    #[test]
    fn test_string_view() {
        // 17 bytes: "world", an invalid 0xFF byte, "bananas", a 4-byte emoji.
        let b1 = Buffer::from(b"world\xFFbananas\xF0\x9F\x98\x81");
        let b2 = Buffer::from(b"cupcakes");
        let b3 = Buffer::from(b"Many strings are here contained of great length and verbosity");

        let mut v = StringViewBuilder::new();
        assert_eq!(v.append_block(b1), 0);

        v.append_value("This is a very long string that exceeds the inline length");
        v.append_value("This is another very long string that exceeds the inline length");

        // The two long values above are flushed into block 1 when b2 is added.
        assert_eq!(v.append_block(b2), 2);
        assert_eq!(v.append_block(b3), 3);

        // Inlined views (length <= 12)
        v.try_append_view(0, 0, 5).unwrap(); // "world"
        v.try_append_view(0, 6, 7).unwrap(); // "bananas"
        v.try_append_view(2, 3, 5).unwrap(); // "cakes"
        v.try_append_view(2, 0, 3).unwrap(); // "cup"
        v.try_append_view(2, 0, 8).unwrap(); // "cupcakes"
        v.try_append_view(0, 13, 4).unwrap(); // "😁"
        v.try_append_view(0, 13, 0).unwrap(); // ""

        // Out-of-line views (length > 12)
        v.try_append_view(3, 0, 16).unwrap(); // "Many strings are"
        v.try_append_view(1, 0, 19).unwrap(); // "This is a very long"
        v.try_append_view(3, 13, 27).unwrap(); // "are here contained of great"

        // Goes into a new in-progress block (block 4).
        v.append_value("I do so like long strings");

        let array = v.finish_cloned();
        array.to_data().validate_full().unwrap();
        assert_eq!(array.data_buffers().len(), 5);
        let actual: Vec<_> = array.iter().flatten().collect();
        assert_eq!(
            actual,
            &[
                "This is a very long string that exceeds the inline length",
                "This is another very long string that exceeds the inline length",
                "world",
                "bananas",
                "cakes",
                "cup",
                "cupcakes",
                "😁",
                "",
                "Many strings are",
                "This is a very long",
                "are here contained of great",
                "I do so like long strings"
            ]
        );

        // The range end is computed in usize, so it exceeds u32::MAX without wrapping.
        let err = v.try_append_view(0, u32::MAX, 1).unwrap_err();
        assert_eq!(
            err.to_string(),
            "Invalid argument error: Range 4294967295..4294967296 out of bounds for block of length 17"
        );

        let err = v.try_append_view(0, 1, u32::MAX).unwrap_err();
        assert_eq!(
            err.to_string(),
            "Invalid argument error: Range 1..4294967296 out of bounds for block of length 17"
        );

        // Two bytes of the 4-byte emoji are not valid UTF-8.
        let err = v.try_append_view(0, 13, 2).unwrap_err();
        assert_eq!(err.to_string(), "Invalid argument error: Invalid view data");

        // Even an empty range must start within the block.
        let err = v.try_append_view(0, 40, 0).unwrap_err();
        assert_eq!(
            err.to_string(),
            "Invalid argument error: Range 40..40 out of bounds for block of length 17"
        );

        let err = v.try_append_view(5, 0, 0).unwrap_err();
        assert_eq!(
            err.to_string(),
            "Invalid argument error: No block found with index 5"
        );
    }

    /// Compares exponential block growth with a fixed block size, and checks
    /// that exponential growth is capped at `MAX_BLOCK_SIZE`.
    #[test]
    fn test_string_view_with_block_size_growth() {
        let mut exp_builder = StringViewBuilder::new();
        let mut fixed_builder = StringViewBuilder::new().with_fixed_block_size(STARTING_BLOCK_SIZE);

        let long_string = str::from_utf8(&[b'a'; STARTING_BLOCK_SIZE as usize]).unwrap();

        for i in 0..9 {
            // Round i appends 2^i strings of STARTING_BLOCK_SIZE bytes each.
            for _ in 0..(2_u32.pow(i)) {
                exp_builder.append_value(long_string);
                fixed_builder.append_value(long_string);
            }
            exp_builder.flush_in_progress();
            fixed_builder.flush_in_progress();

            // Exponential growth: each round fills exactly one new block holding
            // STARTING_BLOCK_SIZE * 2^i bytes.
            assert_eq!(exp_builder.completed.len(), i as usize + 1);
            assert_eq!(
                exp_builder.completed[i as usize].len(),
                STARTING_BLOCK_SIZE as usize * 2_usize.pow(i)
            );

            // Fixed size: one string per block, 1 + 2 + ... + 2^i blocks in total.
            assert_eq!(fixed_builder.completed.len(), 2_usize.pow(i + 1) - 1);

            // Every fixed-size block holds exactly one string.
            assert!(
                fixed_builder
                    .completed
                    .iter()
                    .all(|b| b.len() == STARTING_BLOCK_SIZE as usize)
            );
        }

        // Growth is capped: the next block reserves exactly MAX_BLOCK_SIZE bytes.
        exp_builder.append_value(long_string);
        exp_builder.flush_in_progress();
        assert_eq!(
            exp_builder.completed.last().unwrap().capacity(),
            MAX_BLOCK_SIZE as usize
        );
    }
}