1use std::any::Any;
19use std::marker::PhantomData;
20use std::sync::Arc;
21
22use arrow_buffer::{Buffer, NullBufferBuilder, ScalarBuffer};
23use arrow_data::{ByteView, MAX_INLINE_VIEW_LEN};
24use arrow_schema::ArrowError;
25use hashbrown::hash_table::Entry;
26use hashbrown::HashTable;
27
28use crate::builder::ArrayBuilder;
29use crate::types::bytes::ByteArrayNativeType;
30use crate::types::{BinaryViewType, ByteViewType, StringViewType};
31use crate::{Array, ArrayRef, GenericByteViewArray};
32
/// Seed value for the exponential growth strategy: 8 KiB.
const STARTING_BLOCK_SIZE: u32 = 8 * 1024;

/// Largest block size the exponential growth strategy will hand out: 2 MiB.
const MAX_BLOCK_SIZE: u32 = 2 * 1024 * 1024;

/// Policy that decides how many bytes to reserve for the next data block.
enum BlockSizeGrowthStrategy {
    /// Every request yields the same `size`.
    Fixed { size: u32 },
    /// Every request doubles `current_size` until it reaches `MAX_BLOCK_SIZE`.
    Exponential { current_size: u32 },
}

impl BlockSizeGrowthStrategy {
    /// Returns the size to use for the next block, advancing the strategy.
    ///
    /// * `Fixed` always returns its configured size and never changes.
    /// * `Exponential` doubles its stored size *before* returning it, so a
    ///   strategy seeded with `s` returns `2 * s` on the first call. Once the
    ///   stored size is at or above `MAX_BLOCK_SIZE` the state is left alone
    ///   and `MAX_BLOCK_SIZE` is returned.
    fn next_size(&mut self) -> u32 {
        let current_size = match self {
            Self::Fixed { size } => return *size,
            Self::Exponential { current_size } => current_size,
        };

        if *current_size >= MAX_BLOCK_SIZE {
            return MAX_BLOCK_SIZE;
        }

        // Saturate rather than wrap if an unusually large seed is doubled.
        *current_size = current_size.saturating_mul(2);
        *current_size
    }
}
57
58pub struct GenericByteViewBuilder<T: ByteViewType + ?Sized> {
82 views_buffer: Vec<u128>,
83 null_buffer_builder: NullBufferBuilder,
84 completed: Vec<Buffer>,
85 in_progress: Vec<u8>,
86 block_size: BlockSizeGrowthStrategy,
87 string_tracker: Option<(HashTable<usize>, ahash::RandomState)>,
90 phantom: PhantomData<T>,
91}
92
93impl<T: ByteViewType + ?Sized> GenericByteViewBuilder<T> {
94 pub fn new() -> Self {
96 Self::with_capacity(1024)
97 }
98
99 pub fn with_capacity(capacity: usize) -> Self {
101 Self {
102 views_buffer: Vec::with_capacity(capacity),
103 null_buffer_builder: NullBufferBuilder::new(capacity),
104 completed: vec![],
105 in_progress: vec![],
106 block_size: BlockSizeGrowthStrategy::Exponential {
107 current_size: STARTING_BLOCK_SIZE,
108 },
109 string_tracker: None,
110 phantom: Default::default(),
111 }
112 }
113
114 pub fn with_fixed_block_size(self, block_size: u32) -> Self {
130 debug_assert!(block_size > 0, "Block size must be greater than 0");
131 Self {
132 block_size: BlockSizeGrowthStrategy::Fixed { size: block_size },
133 ..self
134 }
135 }
136
137 pub fn with_deduplicate_strings(self) -> Self {
142 Self {
143 string_tracker: Some((
144 HashTable::with_capacity(self.views_buffer.capacity()),
145 Default::default(),
146 )),
147 ..self
148 }
149 }
150
151 pub fn append_block(&mut self, buffer: Buffer) -> u32 {
176 assert!(buffer.len() < u32::MAX as usize);
177
178 self.flush_in_progress();
179 let offset = self.completed.len();
180 self.push_completed(buffer);
181 offset as u32
182 }
183
184 pub unsafe fn append_view_unchecked(&mut self, block: u32, offset: u32, len: u32) {
191 let b = self.completed.get_unchecked(block as usize);
192 let start = offset as usize;
193 let end = start.saturating_add(len as usize);
194 let b = b.get_unchecked(start..end);
195
196 let view = make_view(b, block, offset);
197 self.views_buffer.push(view);
198 self.null_buffer_builder.append_non_null();
199 }
200
201 pub fn append_array(&mut self, array: &GenericByteViewArray<T>) {
205 self.flush_in_progress();
206 let keep_views = self.completed.is_empty() || array.data_buffers().is_empty();
208 let starting_buffer = self.completed.len() as u32;
209
210 self.completed.extend(array.data_buffers().iter().cloned());
211
212 if keep_views {
213 self.views_buffer.extend_from_slice(array.views());
214 } else {
215 self.views_buffer.extend(array.views().iter().map(|v| {
216 let mut byte_view = ByteView::from(*v);
217 if byte_view.length > MAX_INLINE_VIEW_LEN {
218 byte_view.buffer_index += starting_buffer;
220 };
221
222 byte_view.as_u128()
223 }));
224 }
225
226 if let Some(null_buffer) = array.nulls() {
227 self.null_buffer_builder.append_buffer(null_buffer);
228 } else {
229 self.null_buffer_builder.append_n_non_nulls(array.len());
230 }
231 }
232
233 pub fn try_append_view(&mut self, block: u32, offset: u32, len: u32) -> Result<(), ArrowError> {
237 let b = self.completed.get(block as usize).ok_or_else(|| {
238 ArrowError::InvalidArgumentError(format!("No block found with index {block}"))
239 })?;
240 let start = offset as usize;
241 let end = start.saturating_add(len as usize);
242
243 let b = b.get(start..end).ok_or_else(|| {
244 ArrowError::InvalidArgumentError(format!(
245 "Range {start}..{end} out of bounds for block of length {}",
246 b.len()
247 ))
248 })?;
249
250 if T::Native::from_bytes_checked(b).is_none() {
251 return Err(ArrowError::InvalidArgumentError(
252 "Invalid view data".to_string(),
253 ));
254 }
255
256 unsafe {
257 self.append_view_unchecked(block, offset, len);
258 }
259 Ok(())
260 }
261
262 #[inline]
264 fn flush_in_progress(&mut self) {
265 if !self.in_progress.is_empty() {
266 let f = Buffer::from_vec(std::mem::take(&mut self.in_progress));
267 self.push_completed(f)
268 }
269 }
270
271 #[inline]
273 fn push_completed(&mut self, block: Buffer) {
274 assert!(block.len() < u32::MAX as usize, "Block too large");
275 assert!(self.completed.len() < u32::MAX as usize, "Too many blocks");
276 self.completed.push(block);
277 }
278
279 pub fn get_value(&self, index: usize) -> &[u8] {
283 let view = self.views_buffer.as_slice().get(index).unwrap();
284 let len = *view as u32;
285 if len <= MAX_INLINE_VIEW_LEN {
286 unsafe { GenericByteViewArray::<T>::inline_value(view, len as usize) }
289 } else {
290 let view = ByteView::from(*view);
291 if view.buffer_index < self.completed.len() as u32 {
292 let block = &self.completed[view.buffer_index as usize];
293 &block[view.offset as usize..view.offset as usize + view.length as usize]
294 } else {
295 &self.in_progress[view.offset as usize..view.offset as usize + view.length as usize]
296 }
297 }
298 }
299
300 #[inline]
308 pub fn append_value(&mut self, value: impl AsRef<T::Native>) {
309 let v: &[u8] = value.as_ref().as_ref();
310 let length: u32 = v.len().try_into().unwrap();
311 if length <= MAX_INLINE_VIEW_LEN {
312 let mut view_buffer = [0; 16];
313 view_buffer[0..4].copy_from_slice(&length.to_le_bytes());
314 view_buffer[4..4 + v.len()].copy_from_slice(v);
315 self.views_buffer.push(u128::from_le_bytes(view_buffer));
316 self.null_buffer_builder.append_non_null();
317 return;
318 }
319
320 if let Some((mut ht, hasher)) = self.string_tracker.take() {
324 let hash_val = hasher.hash_one(v);
325 let hasher_fn = |v: &_| hasher.hash_one(v);
326
327 let entry = ht.entry(
328 hash_val,
329 |idx| {
330 let stored_value = self.get_value(*idx);
331 v == stored_value
332 },
333 hasher_fn,
334 );
335 match entry {
336 Entry::Occupied(occupied) => {
337 let idx = occupied.get();
339 self.views_buffer.push(self.views_buffer[*idx]);
340 self.null_buffer_builder.append_non_null();
341 self.string_tracker = Some((ht, hasher));
342 return;
343 }
344 Entry::Vacant(vacant) => {
345 vacant.insert(self.views_buffer.len());
348 }
349 }
350 self.string_tracker = Some((ht, hasher));
351 }
352
353 let required_cap = self.in_progress.len() + v.len();
354 if self.in_progress.capacity() < required_cap {
355 self.flush_in_progress();
356 let to_reserve = v.len().max(self.block_size.next_size() as usize);
357 self.in_progress.reserve(to_reserve);
358 };
359 let offset = self.in_progress.len() as u32;
360 self.in_progress.extend_from_slice(v);
361
362 let view = ByteView {
363 length,
364 prefix: u32::from_le_bytes(v[0..4].try_into().unwrap()),
365 buffer_index: self.completed.len() as u32,
366 offset,
367 };
368 self.views_buffer.push(view.into());
369 self.null_buffer_builder.append_non_null();
370 }
371
372 #[inline]
374 pub fn append_option(&mut self, value: Option<impl AsRef<T::Native>>) {
375 match value {
376 None => self.append_null(),
377 Some(v) => self.append_value(v),
378 };
379 }
380
381 #[inline]
383 pub fn append_null(&mut self) {
384 self.null_buffer_builder.append_null();
385 self.views_buffer.push(0);
386 }
387
388 pub fn finish(&mut self) -> GenericByteViewArray<T> {
390 self.flush_in_progress();
391 let completed = std::mem::take(&mut self.completed);
392 let nulls = self.null_buffer_builder.finish();
393 if let Some((ref mut ht, _)) = self.string_tracker.as_mut() {
394 ht.clear();
395 }
396 let views = std::mem::take(&mut self.views_buffer);
397 unsafe { GenericByteViewArray::new_unchecked(views.into(), completed, nulls) }
399 }
400
401 pub fn finish_cloned(&self) -> GenericByteViewArray<T> {
403 let mut completed = self.completed.clone();
404 if !self.in_progress.is_empty() {
405 completed.push(Buffer::from_slice_ref(&self.in_progress));
406 }
407 let len = self.views_buffer.len();
408 let views = Buffer::from_slice_ref(self.views_buffer.as_slice());
409 let views = ScalarBuffer::new(views, 0, len);
410 let nulls = self.null_buffer_builder.finish_cloned();
411 unsafe { GenericByteViewArray::new_unchecked(views, completed, nulls) }
413 }
414
415 pub fn validity_slice(&self) -> Option<&[u8]> {
417 self.null_buffer_builder.as_slice()
418 }
419
420 pub fn allocated_size(&self) -> usize {
422 let views = self.views_buffer.capacity() * std::mem::size_of::<u128>();
423 let null = self.null_buffer_builder.allocated_size();
424 let buffer_size = self.completed.iter().map(|b| b.capacity()).sum::<usize>();
425 let in_progress = self.in_progress.capacity();
426 let tracker = match &self.string_tracker {
427 Some((ht, _)) => ht.capacity() * std::mem::size_of::<usize>(),
428 None => 0,
429 };
430 buffer_size + in_progress + tracker + views + null
431 }
432}
433
434impl<T: ByteViewType + ?Sized> Default for GenericByteViewBuilder<T> {
435 fn default() -> Self {
436 Self::new()
437 }
438}
439
440impl<T: ByteViewType + ?Sized> std::fmt::Debug for GenericByteViewBuilder<T> {
441 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
442 write!(f, "{}ViewBuilder", T::PREFIX)?;
443 f.debug_struct("")
444 .field("views_buffer", &self.views_buffer)
445 .field("in_progress", &self.in_progress)
446 .field("completed", &self.completed)
447 .field("null_buffer_builder", &self.null_buffer_builder)
448 .finish()
449 }
450}
451
452impl<T: ByteViewType + ?Sized> ArrayBuilder for GenericByteViewBuilder<T> {
453 fn len(&self) -> usize {
454 self.null_buffer_builder.len()
455 }
456
457 fn finish(&mut self) -> ArrayRef {
458 Arc::new(self.finish())
459 }
460
461 fn finish_cloned(&self) -> ArrayRef {
462 Arc::new(self.finish_cloned())
463 }
464
465 fn as_any(&self) -> &dyn Any {
466 self
467 }
468
469 fn as_any_mut(&mut self) -> &mut dyn Any {
470 self
471 }
472
473 fn into_box_any(self: Box<Self>) -> Box<dyn Any> {
474 self
475 }
476}
477
478impl<T: ByteViewType + ?Sized, V: AsRef<T::Native>> Extend<Option<V>>
479 for GenericByteViewBuilder<T>
480{
481 #[inline]
482 fn extend<I: IntoIterator<Item = Option<V>>>(&mut self, iter: I) {
483 for v in iter {
484 self.append_option(v)
485 }
486 }
487}
488
489pub type StringViewBuilder = GenericByteViewBuilder<StringViewType>;
509
510pub type BinaryViewBuilder = GenericByteViewBuilder<BinaryViewType>;
531
/// Builds an inline view for the first `LEN` bytes of `data`.
///
/// Layout of the 16 little-endian bytes: `LEN` as a `u32`, then the `LEN`
/// value bytes, then zero padding. Taking the length as a const generic
/// gives every call site a compile-time-known copy size.
///
/// # Panics
///
/// Panics if `LEN` exceeds 12 or if `data` holds fewer than `LEN` bytes.
fn make_inlined_view<const LEN: usize>(data: &[u8]) -> u128 {
    let len_bytes = (LEN as u32).to_le_bytes();

    let mut raw = [0u8; 16];
    raw[..4].copy_from_slice(&len_bytes);
    raw[4..4 + LEN].copy_from_slice(&data[..LEN]);
    u128::from_le_bytes(raw)
}
540
541#[inline(never)]
547pub fn make_view(data: &[u8], block_id: u32, offset: u32) -> u128 {
548 let len = data.len();
549
550 match len {
553 0 => make_inlined_view::<0>(data),
554 1 => make_inlined_view::<1>(data),
555 2 => make_inlined_view::<2>(data),
556 3 => make_inlined_view::<3>(data),
557 4 => make_inlined_view::<4>(data),
558 5 => make_inlined_view::<5>(data),
559 6 => make_inlined_view::<6>(data),
560 7 => make_inlined_view::<7>(data),
561 8 => make_inlined_view::<8>(data),
562 9 => make_inlined_view::<9>(data),
563 10 => make_inlined_view::<10>(data),
564 11 => make_inlined_view::<11>(data),
565 12 => make_inlined_view::<12>(data),
566 _ => {
568 let view = ByteView {
569 length: len as u32,
570 prefix: u32::from_le_bytes(data[0..4].try_into().unwrap()),
571 buffer_index: block_id,
572 offset,
573 };
574 view.as_u128()
575 }
576 }
577}
578
579#[cfg(test)]
580mod tests {
581 use core::str;
582
583 use super::*;
584 use crate::Array;
585
586 #[test]
587 fn test_string_view_deduplicate() {
588 let value_1 = "long string to test string view";
589 let value_2 = "not so similar string but long";
590
591 let mut builder = StringViewBuilder::new()
592 .with_deduplicate_strings()
593 .with_fixed_block_size(value_1.len() as u32 * 2); let values = vec![
596 Some(value_1),
597 Some(value_2),
598 Some("short"),
599 Some(value_1),
600 None,
601 Some(value_2),
602 Some(value_1),
603 ];
604 builder.extend(values.clone());
605
606 let array = builder.finish_cloned();
607 array.to_data().validate_full().unwrap();
608 assert_eq!(array.data_buffers().len(), 1); let actual: Vec<_> = array.iter().collect();
610 assert_eq!(actual, values);
611
612 let view0 = array.views().first().unwrap();
613 let view3 = array.views().get(3).unwrap();
614 let view6 = array.views().get(6).unwrap();
615
616 assert_eq!(view0, view3);
617 assert_eq!(view0, view6);
618
619 assert_eq!(array.views().get(1), array.views().get(5));
620 }
621
622 #[test]
623 fn test_string_view_deduplicate_after_finish() {
624 let mut builder = StringViewBuilder::new().with_deduplicate_strings();
625
626 let value_1 = "long string to test string view";
627 let value_2 = "not so similar string but long";
628 builder.append_value(value_1);
629 let _array = builder.finish();
630 builder.append_value(value_2);
631 let _array = builder.finish();
632 builder.append_value(value_1);
633 let _array = builder.finish();
634 }
635
636 #[test]
637 fn test_string_view() {
638 let b1 = Buffer::from(b"world\xFFbananas\xF0\x9F\x98\x81");
639 let b2 = Buffer::from(b"cupcakes");
640 let b3 = Buffer::from(b"Many strings are here contained of great length and verbosity");
641
642 let mut v = StringViewBuilder::new();
643 assert_eq!(v.append_block(b1), 0);
644
645 v.append_value("This is a very long string that exceeds the inline length");
646 v.append_value("This is another very long string that exceeds the inline length");
647
648 assert_eq!(v.append_block(b2), 2);
649 assert_eq!(v.append_block(b3), 3);
650
651 v.try_append_view(0, 0, 5).unwrap(); v.try_append_view(0, 6, 7).unwrap(); v.try_append_view(2, 3, 5).unwrap(); v.try_append_view(2, 0, 3).unwrap(); v.try_append_view(2, 0, 8).unwrap(); v.try_append_view(0, 13, 4).unwrap(); v.try_append_view(0, 13, 0).unwrap(); v.try_append_view(3, 0, 16).unwrap(); v.try_append_view(1, 0, 19).unwrap(); v.try_append_view(3, 13, 27).unwrap(); v.append_value("I do so like long strings");
666
667 let array = v.finish_cloned();
668 array.to_data().validate_full().unwrap();
669 assert_eq!(array.data_buffers().len(), 5);
670 let actual: Vec<_> = array.iter().flatten().collect();
671 assert_eq!(
672 actual,
673 &[
674 "This is a very long string that exceeds the inline length",
675 "This is another very long string that exceeds the inline length",
676 "world",
677 "bananas",
678 "cakes",
679 "cup",
680 "cupcakes",
681 "😁",
682 "",
683 "Many strings are",
684 "This is a very long",
685 "are here contained of great",
686 "I do so like long strings"
687 ]
688 );
689
690 let err = v.try_append_view(0, u32::MAX, 1).unwrap_err();
691 assert_eq!(err.to_string(), "Invalid argument error: Range 4294967295..4294967296 out of bounds for block of length 17");
692
693 let err = v.try_append_view(0, 1, u32::MAX).unwrap_err();
694 assert_eq!(
695 err.to_string(),
696 "Invalid argument error: Range 1..4294967296 out of bounds for block of length 17"
697 );
698
699 let err = v.try_append_view(0, 13, 2).unwrap_err();
700 assert_eq!(err.to_string(), "Invalid argument error: Invalid view data");
701
702 let err = v.try_append_view(0, 40, 0).unwrap_err();
703 assert_eq!(
704 err.to_string(),
705 "Invalid argument error: Range 40..40 out of bounds for block of length 17"
706 );
707
708 let err = v.try_append_view(5, 0, 0).unwrap_err();
709 assert_eq!(
710 err.to_string(),
711 "Invalid argument error: No block found with index 5"
712 );
713 }
714
715 #[test]
716 fn test_string_view_with_block_size_growth() {
717 let mut exp_builder = StringViewBuilder::new();
718 let mut fixed_builder = StringViewBuilder::new().with_fixed_block_size(STARTING_BLOCK_SIZE);
719
720 let long_string = str::from_utf8(&[b'a'; STARTING_BLOCK_SIZE as usize]).unwrap();
721
722 for i in 0..9 {
723 for _ in 0..(2_u32.pow(i)) {
725 exp_builder.append_value(long_string);
726 fixed_builder.append_value(long_string);
727 }
728 exp_builder.flush_in_progress();
729 fixed_builder.flush_in_progress();
730
731 assert_eq!(exp_builder.completed.len(), i as usize + 1);
733 assert_eq!(
734 exp_builder.completed[i as usize].len(),
735 STARTING_BLOCK_SIZE as usize * 2_usize.pow(i)
736 );
737
738 assert_eq!(fixed_builder.completed.len(), 2_usize.pow(i + 1) - 1);
740
741 assert!(fixed_builder
743 .completed
744 .iter()
745 .all(|b| b.len() == STARTING_BLOCK_SIZE as usize));
746 }
747
748 exp_builder.append_value(long_string);
750 exp_builder.flush_in_progress();
751 assert_eq!(
752 exp_builder.completed.last().unwrap().capacity(),
753 MAX_BLOCK_SIZE as usize
754 );
755 }
756}