1use std::any::Any;
19use std::marker::PhantomData;
20use std::sync::Arc;
21
22use arrow_buffer::{Buffer, NullBufferBuilder, ScalarBuffer};
23use arrow_data::{ByteView, MAX_INLINE_VIEW_LEN};
24use arrow_schema::ArrowError;
25use hashbrown::HashTable;
26use hashbrown::hash_table::Entry;
27
28use crate::builder::{ArrayBuilder, BinaryLikeArrayBuilder, StringLikeArrayBuilder};
29use crate::types::bytes::ByteArrayNativeType;
30use crate::types::{BinaryViewType, ByteViewType, StringViewType};
31use crate::{Array, ArrayRef, GenericByteViewArray};
32
/// Seed size in bytes (8 KiB) for the exponential block-growth strategy.
const STARTING_BLOCK_SIZE: u32 = 8 * 1024;
/// Largest block size in bytes (2 MiB) the exponential strategy settles on.
const MAX_BLOCK_SIZE: u32 = 2 * 1024 * 1024;

/// Policy that decides how many bytes to reserve for each new data block.
enum BlockSizeGrowthStrategy {
    /// Every block is reserved with the same `size`.
    Fixed { size: u32 },
    /// The reservation doubles on each request until `MAX_BLOCK_SIZE` is reached.
    Exponential { current_size: u32 },
}

impl BlockSizeGrowthStrategy {
    /// Returns the number of bytes to reserve for the next block.
    ///
    /// For the exponential strategy the stored size is doubled *before* it is
    /// returned, so the first value handed out is twice the seed. Once the stored
    /// size is no longer below `MAX_BLOCK_SIZE`, `MAX_BLOCK_SIZE` is returned and
    /// the state stops changing.
    fn next_size(&mut self) -> u32 {
        let current_size = match self {
            Self::Fixed { size } => return *size,
            Self::Exponential { current_size } => current_size,
        };

        if *current_size >= MAX_BLOCK_SIZE {
            return MAX_BLOCK_SIZE;
        }

        // Saturating keeps the arithmetic total even for an arbitrary seed.
        *current_size = current_size.saturating_mul(2);
        *current_size
    }
}
57
58pub struct GenericByteViewBuilder<T: ByteViewType + ?Sized> {
82 views_buffer: Vec<u128>,
83 null_buffer_builder: NullBufferBuilder,
84 completed: Vec<Buffer>,
85 in_progress: Vec<u8>,
86 block_size: BlockSizeGrowthStrategy,
87 string_tracker: Option<(HashTable<usize>, ahash::RandomState)>,
90 max_deduplication_len: Option<u32>,
91 phantom: PhantomData<T>,
92}
93
94impl<T: ByteViewType + ?Sized> GenericByteViewBuilder<T> {
95 pub fn new() -> Self {
97 Self::with_capacity(1024)
98 }
99
100 pub fn with_capacity(capacity: usize) -> Self {
102 Self {
103 views_buffer: Vec::with_capacity(capacity),
104 null_buffer_builder: NullBufferBuilder::new(capacity),
105 completed: vec![],
106 in_progress: vec![],
107 block_size: BlockSizeGrowthStrategy::Exponential {
108 current_size: STARTING_BLOCK_SIZE,
109 },
110 string_tracker: None,
111 max_deduplication_len: None,
112 phantom: Default::default(),
113 }
114 }
115
116 pub fn with_max_deduplication_len(self, max_deduplication_len: u32) -> Self {
123 debug_assert!(
124 max_deduplication_len > 0,
125 "max_deduplication_len must be greater than 0"
126 );
127 Self {
128 max_deduplication_len: Some(max_deduplication_len),
129 ..self
130 }
131 }
132
133 pub fn with_fixed_block_size(self, block_size: u32) -> Self {
149 debug_assert!(block_size > 0, "Block size must be greater than 0");
150 Self {
151 block_size: BlockSizeGrowthStrategy::Fixed { size: block_size },
152 ..self
153 }
154 }
155
156 pub fn with_deduplicate_strings(self) -> Self {
161 Self {
162 string_tracker: Some((
163 HashTable::with_capacity(self.views_buffer.capacity()),
164 Default::default(),
165 )),
166 ..self
167 }
168 }
169
170 pub fn append_block(&mut self, buffer: Buffer) -> u32 {
195 assert!(buffer.len() < u32::MAX as usize);
196
197 self.flush_in_progress();
198 let offset = self.completed.len();
199 self.push_completed(buffer);
200 offset as u32
201 }
202
203 pub unsafe fn append_view_unchecked(&mut self, block: u32, offset: u32, len: u32) {
210 let b = unsafe { self.completed.get_unchecked(block as usize) };
211 let start = offset as usize;
212 let end = start.saturating_add(len as usize);
213 let b = unsafe { b.get_unchecked(start..end) };
214
215 let view = make_view(b, block, offset);
216 self.views_buffer.push(view);
217 self.null_buffer_builder.append_non_null();
218 }
219
220 pub fn append_array(&mut self, array: &GenericByteViewArray<T>) {
224 self.flush_in_progress();
225 let keep_views = self.completed.is_empty() || array.data_buffers().is_empty();
227 let starting_buffer = self.completed.len() as u32;
228
229 self.completed.extend(array.data_buffers().iter().cloned());
230
231 if keep_views {
232 self.views_buffer.extend_from_slice(array.views());
233 } else {
234 self.views_buffer.extend(array.views().iter().map(|v| {
235 let mut byte_view = ByteView::from(*v);
236 if byte_view.length > MAX_INLINE_VIEW_LEN {
237 byte_view.buffer_index += starting_buffer;
239 };
240
241 byte_view.as_u128()
242 }));
243 }
244
245 if let Some(null_buffer) = array.nulls() {
246 self.null_buffer_builder.append_buffer(null_buffer);
247 } else {
248 self.null_buffer_builder.append_n_non_nulls(array.len());
249 }
250 }
251
252 pub fn try_append_view(&mut self, block: u32, offset: u32, len: u32) -> Result<(), ArrowError> {
256 let b = self.completed.get(block as usize).ok_or_else(|| {
257 ArrowError::InvalidArgumentError(format!("No block found with index {block}"))
258 })?;
259 let start = offset as usize;
260 let end = start.saturating_add(len as usize);
261
262 let b = b.get(start..end).ok_or_else(|| {
263 ArrowError::InvalidArgumentError(format!(
264 "Range {start}..{end} out of bounds for block of length {}",
265 b.len()
266 ))
267 })?;
268
269 if T::Native::from_bytes_checked(b).is_none() {
270 return Err(ArrowError::InvalidArgumentError(
271 "Invalid view data".to_string(),
272 ));
273 }
274
275 unsafe {
276 self.append_view_unchecked(block, offset, len);
277 }
278 Ok(())
279 }
280
281 #[inline]
283 fn flush_in_progress(&mut self) {
284 if !self.in_progress.is_empty() {
285 let f = Buffer::from_vec(std::mem::take(&mut self.in_progress));
286 self.push_completed(f)
287 }
288 }
289
290 #[inline]
292 fn push_completed(&mut self, block: Buffer) {
293 assert!(block.len() < u32::MAX as usize, "Block too large");
294 assert!(self.completed.len() < u32::MAX as usize, "Too many blocks");
295 self.completed.push(block);
296 }
297
298 pub fn get_value(&self, index: usize) -> &[u8] {
302 let view = self.views_buffer.as_slice().get(index).unwrap();
303 let len = *view as u32;
304 if len <= MAX_INLINE_VIEW_LEN {
305 unsafe { GenericByteViewArray::<T>::inline_value(view, len as usize) }
308 } else {
309 let view = ByteView::from(*view);
310 if view.buffer_index < self.completed.len() as u32 {
311 let block = &self.completed[view.buffer_index as usize];
312 &block[view.offset as usize..view.offset as usize + view.length as usize]
313 } else {
314 &self.in_progress[view.offset as usize..view.offset as usize + view.length as usize]
315 }
316 }
317 }
318
319 #[inline]
327 pub fn append_value(&mut self, value: impl AsRef<T::Native>) {
328 self.try_append_value(value).unwrap()
329 }
330
331 #[inline]
339 pub fn try_append_value(&mut self, value: impl AsRef<T::Native>) -> Result<(), ArrowError> {
340 let v: &[u8] = value.as_ref().as_ref();
341 let length: u32 = v.len().try_into().map_err(|_| {
342 ArrowError::InvalidArgumentError(format!("String length {} exceeds u32::MAX", v.len()))
343 })?;
344
345 if length <= MAX_INLINE_VIEW_LEN {
346 let mut view_buffer = [0; 16];
347 view_buffer[0..4].copy_from_slice(&length.to_le_bytes());
348 view_buffer[4..4 + v.len()].copy_from_slice(v);
349 self.views_buffer.push(u128::from_le_bytes(view_buffer));
350 self.null_buffer_builder.append_non_null();
351 return Ok(());
352 }
353
354 let can_deduplicate = self.string_tracker.is_some()
358 && self
359 .max_deduplication_len
360 .map(|max_length| length <= max_length)
361 .unwrap_or(true);
362 if can_deduplicate {
363 if let Some((mut ht, hasher)) = self.string_tracker.take() {
364 let hash_val = hasher.hash_one(v);
365 let hasher_fn = |v: &_| hasher.hash_one(v);
366
367 let entry = ht.entry(
368 hash_val,
369 |idx| {
370 let stored_value = self.get_value(*idx);
371 v == stored_value
372 },
373 hasher_fn,
374 );
375 match entry {
376 Entry::Occupied(occupied) => {
377 let idx = occupied.get();
379 self.views_buffer.push(self.views_buffer[*idx]);
380 self.null_buffer_builder.append_non_null();
381 self.string_tracker = Some((ht, hasher));
382 return Ok(());
383 }
384 Entry::Vacant(vacant) => {
385 vacant.insert(self.views_buffer.len());
388 }
389 }
390 self.string_tracker = Some((ht, hasher));
391 }
392 }
393
394 let required_cap = self.in_progress.len() + v.len();
395 if self.in_progress.capacity() < required_cap {
396 self.flush_in_progress();
397 let to_reserve = v.len().max(self.block_size.next_size() as usize);
398 self.in_progress.reserve(to_reserve);
399 };
400
401 let offset = self.in_progress.len() as u32;
402 self.in_progress.extend_from_slice(v);
403
404 let buffer_index: u32 = self.completed.len().try_into().map_err(|_| {
405 ArrowError::InvalidArgumentError(format!(
406 "Buffer count {} exceeds u32::MAX",
407 self.completed.len()
408 ))
409 })?;
410
411 let view = ByteView {
412 length,
413 prefix: u32::from_le_bytes(v[0..4].try_into().unwrap()),
415 buffer_index,
416 offset,
417 };
418 self.views_buffer.push(view.into());
419 self.null_buffer_builder.append_non_null();
420
421 Ok(())
422 }
423
424 #[inline]
426 pub fn append_option(&mut self, value: Option<impl AsRef<T::Native>>) {
427 match value {
428 None => self.append_null(),
429 Some(v) => self.append_value(v),
430 };
431 }
432
433 #[inline]
435 pub fn append_null(&mut self) {
436 self.null_buffer_builder.append_null();
437 self.views_buffer.push(0);
438 }
439
440 pub fn finish(&mut self) -> GenericByteViewArray<T> {
442 self.flush_in_progress();
443 let completed = std::mem::take(&mut self.completed);
444 let nulls = self.null_buffer_builder.finish();
445 if let Some((ht, _)) = self.string_tracker.as_mut() {
446 ht.clear();
447 }
448 let views = std::mem::take(&mut self.views_buffer);
449 unsafe { GenericByteViewArray::new_unchecked(views.into(), completed, nulls) }
451 }
452
453 pub fn finish_cloned(&self) -> GenericByteViewArray<T> {
455 let mut completed = self.completed.clone();
456 if !self.in_progress.is_empty() {
457 completed.push(Buffer::from_slice_ref(&self.in_progress));
458 }
459 let len = self.views_buffer.len();
460 let views = Buffer::from_slice_ref(self.views_buffer.as_slice());
461 let views = ScalarBuffer::new(views, 0, len);
462 let nulls = self.null_buffer_builder.finish_cloned();
463 unsafe { GenericByteViewArray::new_unchecked(views, completed, nulls) }
465 }
466
467 pub fn validity_slice(&self) -> Option<&[u8]> {
469 self.null_buffer_builder.as_slice()
470 }
471
472 pub fn allocated_size(&self) -> usize {
474 let views = self.views_buffer.capacity() * std::mem::size_of::<u128>();
475 let null = self.null_buffer_builder.allocated_size();
476 let buffer_size = self.completed.iter().map(|b| b.capacity()).sum::<usize>();
477 let in_progress = self.in_progress.capacity();
478 let tracker = match &self.string_tracker {
479 Some((ht, _)) => ht.capacity() * std::mem::size_of::<usize>(),
480 None => 0,
481 };
482 buffer_size + in_progress + tracker + views + null
483 }
484}
485
486impl<T: ByteViewType + ?Sized> Default for GenericByteViewBuilder<T> {
487 fn default() -> Self {
488 Self::new()
489 }
490}
491
492impl<T: ByteViewType + ?Sized> std::fmt::Debug for GenericByteViewBuilder<T> {
493 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
494 write!(f, "{}ViewBuilder", T::PREFIX)?;
495 f.debug_struct("")
496 .field("views_buffer", &self.views_buffer)
497 .field("in_progress", &self.in_progress)
498 .field("completed", &self.completed)
499 .field("null_buffer_builder", &self.null_buffer_builder)
500 .finish()
501 }
502}
503
504impl<T: ByteViewType + ?Sized> ArrayBuilder for GenericByteViewBuilder<T> {
505 fn len(&self) -> usize {
506 self.null_buffer_builder.len()
507 }
508
509 fn finish(&mut self) -> ArrayRef {
510 Arc::new(self.finish())
511 }
512
513 fn finish_cloned(&self) -> ArrayRef {
514 Arc::new(self.finish_cloned())
515 }
516
517 fn as_any(&self) -> &dyn Any {
518 self
519 }
520
521 fn as_any_mut(&mut self) -> &mut dyn Any {
522 self
523 }
524
525 fn into_box_any(self: Box<Self>) -> Box<dyn Any> {
526 self
527 }
528}
529
530impl<T: ByteViewType + ?Sized, V: AsRef<T::Native>> Extend<Option<V>>
531 for GenericByteViewBuilder<T>
532{
533 #[inline]
534 fn extend<I: IntoIterator<Item = Option<V>>>(&mut self, iter: I) {
535 for v in iter {
536 self.append_option(v)
537 }
538 }
539}
540
541pub type StringViewBuilder = GenericByteViewBuilder<StringViewType>;
561
562impl StringLikeArrayBuilder for StringViewBuilder {
563 fn type_name() -> &'static str {
564 std::any::type_name::<StringViewBuilder>()
565 }
566 fn with_capacity(capacity: usize) -> Self {
567 Self::with_capacity(capacity)
568 }
569 fn append_value(&mut self, value: &str) {
570 Self::append_value(self, value);
571 }
572 fn append_null(&mut self) {
573 Self::append_null(self);
574 }
575}
576
577pub type BinaryViewBuilder = GenericByteViewBuilder<BinaryViewType>;
598
599impl BinaryLikeArrayBuilder for BinaryViewBuilder {
600 fn type_name() -> &'static str {
601 std::any::type_name::<BinaryViewBuilder>()
602 }
603 fn with_capacity(capacity: usize) -> Self {
604 Self::with_capacity(capacity)
605 }
606 fn append_value(&mut self, value: &[u8]) {
607 Self::append_value(self, value);
608 }
609 fn append_null(&mut self) {
610 Self::append_null(self);
611 }
612}
613
/// Builds an inline view holding the first `LEN` bytes of `data`.
///
/// Layout (little-endian): bytes 0..4 are `LEN` as a `u32`, bytes 4..4 + `LEN`
/// are the value, and the remaining bytes are zero.
///
/// # Panics
///
/// Panics if `LEN` exceeds 12 or if `data` is shorter than `LEN`.
fn make_inlined_view<const LEN: usize>(data: &[u8]) -> u128 {
    let length_prefix = (LEN as u32).to_le_bytes();

    let mut raw = [0u8; 16];
    raw[..4].copy_from_slice(&length_prefix);
    raw[4..4 + LEN].copy_from_slice(&data[..LEN]);
    u128::from_le_bytes(raw)
}
622
623#[inline(never)]
629pub fn make_view(data: &[u8], block_id: u32, offset: u32) -> u128 {
630 let len = data.len();
631
632 match len {
635 0 => make_inlined_view::<0>(data),
636 1 => make_inlined_view::<1>(data),
637 2 => make_inlined_view::<2>(data),
638 3 => make_inlined_view::<3>(data),
639 4 => make_inlined_view::<4>(data),
640 5 => make_inlined_view::<5>(data),
641 6 => make_inlined_view::<6>(data),
642 7 => make_inlined_view::<7>(data),
643 8 => make_inlined_view::<8>(data),
644 9 => make_inlined_view::<9>(data),
645 10 => make_inlined_view::<10>(data),
646 11 => make_inlined_view::<11>(data),
647 12 => make_inlined_view::<12>(data),
648 _ => {
650 let view = ByteView {
651 length: len as u32,
652 prefix: u32::from_le_bytes(data[0..4].try_into().unwrap()),
653 buffer_index: block_id,
654 offset,
655 };
656 view.as_u128()
657 }
658 }
659}
660
661#[cfg(test)]
662mod tests {
663 use core::str;
664
665 use arrow_buffer::ArrowNativeType;
666
667 use super::*;
668
/// Checks that `with_max_deduplication_len` only deduplicates long values that do
/// not exceed the configured limit.
#[test]
fn test_string_max_deduplication_len() {
    let value_1 = "short";
    let value_2 = "not so similar string but long";
    let value_3 = "1234567890123";

    let max_deduplication_len = MAX_INLINE_VIEW_LEN * 2;

    let mut builder = StringViewBuilder::new()
        .with_deduplicate_strings()
        .with_max_deduplication_len(max_deduplication_len);

    // value_1 is inlined, value_2 is above the deduplication limit, and value_3 is
    // too long to inline but within the limit.
    assert!(value_1.len() < MAX_INLINE_VIEW_LEN.as_usize());
    assert!(value_2.len() > max_deduplication_len.as_usize());
    assert!(
        value_3.len() > MAX_INLINE_VIEW_LEN.as_usize()
            && value_3.len() < max_deduplication_len.as_usize()
    );

    // Append every value twice.
    builder.append_value(value_1);
    builder.append_value(value_1);
    builder.append_value(value_2);
    builder.append_value(value_2);
    builder.append_value(value_3);
    builder.append_value(value_3);
    let array = builder.finish();

    // value_2 exceeds the limit: both copies are stored, at different offsets of
    // the same block.
    let v2 = ByteView::from(array.views()[2]);
    let v3 = ByteView::from(array.views()[3]);
    assert_eq!(v2.buffer_index, v3.buffer_index);
    assert_ne!(v2.offset, v3.offset);

    // value_3 is within the limit: the second append reuses the first view.
    let v4 = ByteView::from(array.views()[4]);
    let v5 = ByteView::from(array.views()[5]);
    assert_eq!(v4.buffer_index, v5.buffer_index);
    assert_eq!(v4.offset, v5.offset);
}
711
/// Checks that repeated long values share one view when deduplication is enabled.
#[test]
fn test_string_view_deduplicate() {
    let value_1 = "long string to test string view";
    let value_2 = "not so similar string but long";

    // The fixed block size is twice the length of value_1.
    let mut builder = StringViewBuilder::new()
        .with_deduplicate_strings()
        .with_fixed_block_size(value_1.len() as u32 * 2);

    let values = vec![
        Some(value_1),
        Some(value_2),
        Some("short"),
        Some(value_1),
        None,
        Some(value_2),
        Some(value_1),
    ];
    builder.extend(values.clone());

    let array = builder.finish_cloned();
    array.to_data().validate_full().unwrap();
    // Each distinct long value is stored once, so a single data buffer suffices.
    assert_eq!(array.data_buffers().len(), 1);
    let actual: Vec<_> = array.iter().collect();
    assert_eq!(actual, values);

    // Slots 0, 3 and 6 all hold value_1 and must carry the identical view.
    let view0 = array.views().first().unwrap();
    let view3 = array.views().get(3).unwrap();
    let view6 = array.views().get(6).unwrap();

    assert_eq!(view0, view3);
    assert_eq!(view0, view6);

    // Slots 1 and 5 both hold value_2.
    assert_eq!(array.views().get(1), array.views().get(5));
}
747
/// Smoke test: a deduplicating builder can be reused after `finish`.
///
/// There are no assertions; the test passes as long as appending values —
/// including one already seen before an earlier `finish` — does not panic.
#[test]
fn test_string_view_deduplicate_after_finish() {
    let mut builder = StringViewBuilder::new().with_deduplicate_strings();

    let value_1 = "long string to test string view";
    let value_2 = "not so similar string but long";
    builder.append_value(value_1);
    let _array = builder.finish();
    builder.append_value(value_2);
    let _array = builder.finish();
    // value_1 was appended before the first `finish`; it is appended again here.
    builder.append_value(value_1);
    let _array = builder.finish();
}
761
/// Exercises `append_block`, `try_append_view` and `append_value` together, then
/// checks the error messages produced for invalid views.
#[test]
fn test_string_view() {
    // b1 is 17 bytes: "world", an invalid byte, "bananas", and a 4-byte emoji.
    let b1 = Buffer::from(b"world\xFFbananas\xF0\x9F\x98\x81");
    let b2 = Buffer::from(b"cupcakes");
    let b3 = Buffer::from(b"Many strings are here contained of great length and verbosity");

    let mut v = StringViewBuilder::new();
    assert_eq!(v.append_block(b1), 0);

    v.append_value("This is a very long string that exceeds the inline length");
    v.append_value("This is another very long string that exceeds the inline length");

    // Appending a block first completes the in-progress block (the two long
    // strings above), which therefore becomes block 1.
    assert_eq!(v.append_block(b2), 2);
    assert_eq!(v.append_block(b3), 3);

    // Test short strings
    v.try_append_view(0, 0, 5).unwrap(); // world
    v.try_append_view(0, 6, 7).unwrap(); // bananas
    v.try_append_view(2, 3, 5).unwrap(); // cakes
    v.try_append_view(2, 0, 3).unwrap(); // cup
    v.try_append_view(2, 0, 8).unwrap(); // cupcakes
    v.try_append_view(0, 13, 4).unwrap(); // 😁
    v.try_append_view(0, 13, 0).unwrap(); // empty string

    // Test longer strings
    v.try_append_view(3, 0, 16).unwrap(); // Many strings are
    v.try_append_view(1, 0, 19).unwrap(); // This is a very long
    v.try_append_view(3, 13, 27).unwrap(); // are here contained of great

    v.append_value("I do so like long strings");

    let array = v.finish_cloned();
    array.to_data().validate_full().unwrap();
    // Four completed blocks plus the in-progress block holding the last value.
    assert_eq!(array.data_buffers().len(), 5);
    let actual: Vec<_> = array.iter().flatten().collect();
    assert_eq!(
        actual,
        &[
            "This is a very long string that exceeds the inline length",
            "This is another very long string that exceeds the inline length",
            "world",
            "bananas",
            "cakes",
            "cup",
            "cupcakes",
            "😁",
            "",
            "Many strings are",
            "This is a very long",
            "are here contained of great",
            "I do so like long strings"
        ]
    );

    // Offset far beyond the end of the block.
    let err = v.try_append_view(0, u32::MAX, 1).unwrap_err();
    assert_eq!(
        err.to_string(),
        "Invalid argument error: Range 4294967295..4294967296 out of bounds for block of length 17"
    );

    // Length far beyond the end of the block.
    let err = v.try_append_view(0, 1, u32::MAX).unwrap_err();
    assert_eq!(
        err.to_string(),
        "Invalid argument error: Range 1..4294967296 out of bounds for block of length 17"
    );

    // Range cuts the emoji in half, which is not valid UTF-8.
    let err = v.try_append_view(0, 13, 2).unwrap_err();
    assert_eq!(err.to_string(), "Invalid argument error: Invalid view data");

    // Even an empty range must start within the block.
    let err = v.try_append_view(0, 40, 0).unwrap_err();
    assert_eq!(
        err.to_string(),
        "Invalid argument error: Range 40..40 out of bounds for block of length 17"
    );

    // Unknown block index.
    let err = v.try_append_view(5, 0, 0).unwrap_err();
    assert_eq!(
        err.to_string(),
        "Invalid argument error: No block found with index 5"
    );
}
843
/// Compares block allocation under the default exponential strategy with a fixed
/// block size of `STARTING_BLOCK_SIZE`.
#[test]
fn test_string_view_with_block_size_growth() {
    let mut exp_builder = StringViewBuilder::new();
    let mut fixed_builder = StringViewBuilder::new().with_fixed_block_size(STARTING_BLOCK_SIZE);

    // A value exactly as long as the starting block size.
    let long_string = str::from_utf8(&[b'a'; STARTING_BLOCK_SIZE as usize]).unwrap();

    for i in 0..9 {
        // Append 2^i values in round `i`.
        for _ in 0..(2_u32.pow(i)) {
            exp_builder.append_value(long_string);
            fixed_builder.append_value(long_string);
        }
        exp_builder.flush_in_progress();
        fixed_builder.flush_in_progress();

        // Exponential growth: each round fits in exactly one new block...
        assert_eq!(exp_builder.completed.len(), i as usize + 1);
        // ...which holds all 2^i values of that round.
        assert_eq!(
            exp_builder.completed[i as usize].len(),
            STARTING_BLOCK_SIZE as usize * 2_usize.pow(i)
        );

        // Fixed size: one block per value, i.e. 2^(i+1) - 1 blocks so far.
        assert_eq!(fixed_builder.completed.len(), 2_usize.pow(i + 1) - 1);

        // Every fixed-size block holds exactly one value.
        assert!(
            fixed_builder
                .completed
                .iter()
                .all(|b| b.len() == STARTING_BLOCK_SIZE as usize)
        );
    }

    // After the rounds above, new blocks are capped at the maximum block size.
    exp_builder.append_value(long_string);
    exp_builder.flush_in_progress();
    assert_eq!(
        exp_builder.completed.last().unwrap().capacity(),
        MAX_BLOCK_SIZE as usize
    );
}
887}