1use crate::array::{get_offsets_from_buffer, print_long_array};
19use crate::builder::GenericByteBuilder;
20use crate::iterator::ArrayIter;
21use crate::types::ByteArrayType;
22use crate::types::bytes::ByteArrayNativeType;
23use crate::{Array, ArrayAccessor, ArrayRef, OffsetSizeTrait, Scalar};
24use arrow_buffer::{ArrowNativeType, Buffer, MutableBuffer};
25use arrow_buffer::{NullBuffer, OffsetBuffer};
26use arrow_data::{ArrayData, ArrayDataBuilder};
27use arrow_schema::{ArrowError, DataType};
28use std::any::Any;
29use std::sync::Arc;
30
31pub struct GenericByteArray<T: ByteArrayType> {
88 data_type: DataType,
89 value_offsets: OffsetBuffer<T::Offset>,
90 value_data: Buffer,
91 nulls: Option<NullBuffer>,
92}
93
94impl<T: ByteArrayType> Clone for GenericByteArray<T> {
95 fn clone(&self) -> Self {
96 Self {
97 data_type: T::DATA_TYPE,
98 value_offsets: self.value_offsets.clone(),
99 value_data: self.value_data.clone(),
100 nulls: self.nulls.clone(),
101 }
102 }
103}
104
105impl<T: ByteArrayType> GenericByteArray<T> {
106 pub const DATA_TYPE: DataType = T::DATA_TYPE;
108
109 pub fn new(
115 offsets: OffsetBuffer<T::Offset>,
116 values: Buffer,
117 nulls: Option<NullBuffer>,
118 ) -> Self {
119 Self::try_new(offsets, values, nulls).unwrap()
120 }
121
122 pub fn try_new(
129 offsets: OffsetBuffer<T::Offset>,
130 values: Buffer,
131 nulls: Option<NullBuffer>,
132 ) -> Result<Self, ArrowError> {
133 let len = offsets.len() - 1;
134
135 T::validate(&offsets, &values)?;
137
138 if let Some(n) = nulls.as_ref() {
139 if n.len() != len {
140 return Err(ArrowError::InvalidArgumentError(format!(
141 "Incorrect length of null buffer for {}{}Array, expected {len} got {}",
142 T::Offset::PREFIX,
143 T::PREFIX,
144 n.len(),
145 )));
146 }
147 }
148
149 Ok(Self {
150 data_type: T::DATA_TYPE,
151 value_offsets: offsets,
152 value_data: values,
153 nulls,
154 })
155 }
156
157 pub unsafe fn new_unchecked(
163 offsets: OffsetBuffer<T::Offset>,
164 values: Buffer,
165 nulls: Option<NullBuffer>,
166 ) -> Self {
167 if cfg!(feature = "force_validate") {
168 return Self::new(offsets, values, nulls);
169 }
170 Self {
171 data_type: T::DATA_TYPE,
172 value_offsets: offsets,
173 value_data: values,
174 nulls,
175 }
176 }
177
178 pub fn new_null(len: usize) -> Self {
180 Self {
181 data_type: T::DATA_TYPE,
182 value_offsets: OffsetBuffer::new_zeroed(len),
183 value_data: MutableBuffer::new(0).into(),
184 nulls: Some(NullBuffer::new_null(len)),
185 }
186 }
187
188 pub fn new_scalar(value: impl AsRef<T::Native>) -> Scalar<Self> {
190 Scalar::new(Self::from_iter_values(std::iter::once(value)))
191 }
192
193 pub fn new_repeated(value: impl AsRef<T::Native>, repeat_count: usize) -> Self {
199 let s: &[u8] = value.as_ref().as_ref();
200 let value_offsets = OffsetBuffer::from_repeated_length(s.len(), repeat_count);
201 let bytes: Buffer = {
202 let mut mutable_buffer = MutableBuffer::with_capacity(0);
203 mutable_buffer.repeat_slice_n_times(s, repeat_count);
204
205 mutable_buffer.into()
206 };
207
208 Self {
209 data_type: T::DATA_TYPE,
210 value_data: bytes,
211 value_offsets,
212 nulls: None,
213 }
214 }
215
216 pub fn from_iter_values<Ptr, I>(iter: I) -> Self
218 where
219 Ptr: AsRef<T::Native>,
220 I: IntoIterator<Item = Ptr>,
221 {
222 let iter = iter.into_iter();
223 let (_, data_len) = iter.size_hint();
224 let data_len = data_len.expect("Iterator must be sized"); let mut offsets = MutableBuffer::new((data_len + 1) * std::mem::size_of::<T::Offset>());
227 offsets.push(T::Offset::usize_as(0));
228
229 let mut values = MutableBuffer::new(0);
230 for s in iter {
231 let s: &[u8] = s.as_ref().as_ref();
232 values.extend_from_slice(s);
233 offsets.push(T::Offset::usize_as(values.len()));
234 }
235
236 T::Offset::from_usize(values.len()).expect("offset overflow");
237 let offsets = Buffer::from(offsets);
238
239 let value_offsets = unsafe { OffsetBuffer::new_unchecked(offsets.into()) };
241
242 Self {
243 data_type: T::DATA_TYPE,
244 value_data: values.into(),
245 value_offsets,
246 nulls: None,
247 }
248 }
249
250 pub fn into_parts(self) -> (OffsetBuffer<T::Offset>, Buffer, Option<NullBuffer>) {
252 (self.value_offsets, self.value_data, self.nulls)
253 }
254
255 #[inline]
259 pub fn value_length(&self, i: usize) -> T::Offset {
260 let offsets = self.value_offsets();
261 offsets[i + 1] - offsets[i]
262 }
263
264 #[inline]
269 pub fn offsets(&self) -> &OffsetBuffer<T::Offset> {
270 &self.value_offsets
271 }
272
273 #[inline]
278 pub fn values(&self) -> &Buffer {
279 &self.value_data
280 }
281
282 pub fn value_data(&self) -> &[u8] {
284 self.value_data.as_slice()
285 }
286
287 pub fn is_ascii(&self) -> bool {
289 let offsets = self.value_offsets();
290 let start = offsets.first().unwrap();
291 let end = offsets.last().unwrap();
292 self.value_data()[start.as_usize()..end.as_usize()].is_ascii()
293 }
294
295 #[inline]
297 pub fn value_offsets(&self) -> &[T::Offset] {
298 &self.value_offsets
299 }
300
301 pub unsafe fn value_unchecked(&self, i: usize) -> &T::Native {
309 let end = *unsafe { self.value_offsets().get_unchecked(i + 1) };
310 let start = *unsafe { self.value_offsets().get_unchecked(i) };
311
312 let b = unsafe {
322 std::slice::from_raw_parts(
323 self.value_data
324 .as_ptr()
325 .offset(start.to_isize().unwrap_unchecked()),
326 (end - start).to_usize().unwrap_unchecked(),
327 )
328 };
329
330 unsafe { T::Native::from_bytes_unchecked(b) }
333 }
334
335 pub fn value(&self, i: usize) -> &T::Native {
343 assert!(
344 i < self.len(),
345 "Trying to access an element at index {} from a {}{}Array of length {}",
346 i,
347 T::Offset::PREFIX,
348 T::PREFIX,
349 self.len()
350 );
351 unsafe { self.value_unchecked(i) }
354 }
355
356 pub fn iter(&self) -> ArrayIter<&Self> {
358 ArrayIter::new(self)
359 }
360
361 pub fn slice(&self, offset: usize, length: usize) -> Self {
363 Self {
364 data_type: T::DATA_TYPE,
365 value_offsets: self.value_offsets.slice(offset, length),
366 value_data: self.value_data.clone(),
367 nulls: self.nulls.as_ref().map(|n| n.slice(offset, length)),
368 }
369 }
370
371 pub fn into_builder(self) -> Result<GenericByteBuilder<T>, Self> {
374 let len = self.len();
375 let value_len = T::Offset::as_usize(self.value_offsets()[len] - self.value_offsets()[0]);
376
377 let data = self.into_data();
378 let null_bit_buffer = data.nulls().map(|b| b.inner().sliced());
379
380 let element_len = std::mem::size_of::<T::Offset>();
381 let offset_buffer = data.buffers()[0]
382 .slice_with_length(data.offset() * element_len, (len + 1) * element_len);
383
384 let element_len = std::mem::size_of::<u8>();
385 let value_buffer = data.buffers()[1]
386 .slice_with_length(data.offset() * element_len, value_len * element_len);
387
388 drop(data);
389
390 let try_mutable_null_buffer = match null_bit_buffer {
391 None => Ok(None),
392 Some(null_buffer) => {
393 null_buffer.into_mutable().map(Some)
395 }
396 };
397
398 let try_mutable_buffers = match try_mutable_null_buffer {
399 Ok(mutable_null_buffer) => {
400 let try_mutable_offset_buffer = offset_buffer.into_mutable();
402 let try_mutable_value_buffer = value_buffer.into_mutable();
403
404 match (try_mutable_offset_buffer, try_mutable_value_buffer) {
407 (Ok(mutable_offset_buffer), Ok(mutable_value_buffer)) => unsafe {
408 Ok(GenericByteBuilder::<T>::new_from_buffer(
409 mutable_offset_buffer,
410 mutable_value_buffer,
411 mutable_null_buffer,
412 ))
413 },
414 (Ok(mutable_offset_buffer), Err(value_buffer)) => Err((
415 mutable_offset_buffer.into(),
416 value_buffer,
417 mutable_null_buffer.map(|b| b.into()),
418 )),
419 (Err(offset_buffer), Ok(mutable_value_buffer)) => Err((
420 offset_buffer,
421 mutable_value_buffer.into(),
422 mutable_null_buffer.map(|b| b.into()),
423 )),
424 (Err(offset_buffer), Err(value_buffer)) => Err((
425 offset_buffer,
426 value_buffer,
427 mutable_null_buffer.map(|b| b.into()),
428 )),
429 }
430 }
431 Err(mutable_null_buffer) => {
432 Err((offset_buffer, value_buffer, Some(mutable_null_buffer)))
434 }
435 };
436
437 match try_mutable_buffers {
438 Ok(builder) => Ok(builder),
439 Err((offset_buffer, value_buffer, null_bit_buffer)) => {
440 let builder = ArrayData::builder(T::DATA_TYPE)
441 .len(len)
442 .add_buffer(offset_buffer)
443 .add_buffer(value_buffer)
444 .null_bit_buffer(null_bit_buffer);
445
446 let array_data = unsafe { builder.build_unchecked() };
447 let array = GenericByteArray::<T>::from(array_data);
448
449 Err(array)
450 }
451 }
452 }
453}
454
455impl<T: ByteArrayType> std::fmt::Debug for GenericByteArray<T> {
456 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
457 write!(f, "{}{}Array\n[\n", T::Offset::PREFIX, T::PREFIX)?;
458 print_long_array(self, f, |array, index, f| {
459 std::fmt::Debug::fmt(&array.value(index), f)
460 })?;
461 write!(f, "]")
462 }
463}
464
465unsafe impl<T: ByteArrayType> Array for GenericByteArray<T> {
467 fn as_any(&self) -> &dyn Any {
468 self
469 }
470
471 fn to_data(&self) -> ArrayData {
472 self.clone().into()
473 }
474
475 fn into_data(self) -> ArrayData {
476 self.into()
477 }
478
479 fn data_type(&self) -> &DataType {
480 &self.data_type
481 }
482
483 fn slice(&self, offset: usize, length: usize) -> ArrayRef {
484 Arc::new(self.slice(offset, length))
485 }
486
487 fn len(&self) -> usize {
488 self.value_offsets.len() - 1
489 }
490
491 fn is_empty(&self) -> bool {
492 self.value_offsets.len() <= 1
493 }
494
495 fn shrink_to_fit(&mut self) {
496 self.value_offsets.shrink_to_fit();
497 self.value_data.shrink_to_fit();
498 if let Some(nulls) = &mut self.nulls {
499 nulls.shrink_to_fit();
500 }
501 }
502
503 fn offset(&self) -> usize {
504 0
505 }
506
507 fn nulls(&self) -> Option<&NullBuffer> {
508 self.nulls.as_ref()
509 }
510
511 fn logical_null_count(&self) -> usize {
512 self.null_count()
514 }
515
516 fn get_buffer_memory_size(&self) -> usize {
517 let mut sum = self.value_offsets.inner().inner().capacity();
518 sum += self.value_data.capacity();
519 if let Some(x) = &self.nulls {
520 sum += x.buffer().capacity()
521 }
522 sum
523 }
524
525 fn get_array_memory_size(&self) -> usize {
526 std::mem::size_of::<Self>() + self.get_buffer_memory_size()
527 }
528}
529
530impl<'a, T: ByteArrayType> ArrayAccessor for &'a GenericByteArray<T> {
531 type Item = &'a T::Native;
532
533 fn value(&self, index: usize) -> Self::Item {
534 GenericByteArray::value(self, index)
535 }
536
537 unsafe fn value_unchecked(&self, index: usize) -> Self::Item {
538 unsafe { GenericByteArray::value_unchecked(self, index) }
539 }
540}
541
542impl<T: ByteArrayType> From<ArrayData> for GenericByteArray<T> {
543 fn from(data: ArrayData) -> Self {
544 let (data_type, len, nulls, offset, mut buffers, _child_data) = data.into_parts();
545 assert_eq!(
546 data_type,
547 Self::DATA_TYPE,
548 "{}{}Array expects DataType::{}",
549 T::Offset::PREFIX,
550 T::PREFIX,
551 Self::DATA_TYPE
552 );
553 assert_eq!(
554 buffers.len(),
555 2,
556 "{}{}Array data should contain 2 buffers only (offsets and values)",
557 T::Offset::PREFIX,
558 T::PREFIX,
559 );
560 let value_data = buffers.pop().expect("checked above");
562 let offset_buffer = buffers.pop().expect("checked above");
563
564 let value_offsets = unsafe { get_offsets_from_buffer(offset_buffer, offset, len) };
567 Self {
568 value_offsets,
569 value_data,
570 data_type,
571 nulls,
572 }
573 }
574}
575
576impl<T: ByteArrayType> From<GenericByteArray<T>> for ArrayData {
577 fn from(array: GenericByteArray<T>) -> Self {
578 let len = array.len();
579
580 let offsets = array.value_offsets.into_inner().into_inner();
581 let builder = ArrayDataBuilder::new(array.data_type)
582 .len(len)
583 .buffers(vec![offsets, array.value_data])
584 .nulls(array.nulls);
585
586 unsafe { builder.build_unchecked() }
587 }
588}
589
590impl<'a, T: ByteArrayType> IntoIterator for &'a GenericByteArray<T> {
591 type Item = Option<&'a T::Native>;
592 type IntoIter = ArrayIter<Self>;
593
594 fn into_iter(self) -> Self::IntoIter {
595 ArrayIter::new(self)
596 }
597}
598
599impl<'a, Ptr, T: ByteArrayType> FromIterator<&'a Option<Ptr>> for GenericByteArray<T>
600where
601 Ptr: AsRef<T::Native> + 'a,
602{
603 fn from_iter<I: IntoIterator<Item = &'a Option<Ptr>>>(iter: I) -> Self {
604 iter.into_iter()
605 .map(|o| o.as_ref().map(|p| p.as_ref()))
606 .collect()
607 }
608}
609
610impl<Ptr, T: ByteArrayType> FromIterator<Option<Ptr>> for GenericByteArray<T>
611where
612 Ptr: AsRef<T::Native>,
613{
614 fn from_iter<I: IntoIterator<Item = Option<Ptr>>>(iter: I) -> Self {
615 let iter = iter.into_iter();
616 let mut builder = GenericByteBuilder::with_capacity(iter.size_hint().0, 1024);
617 builder.extend(iter);
618 builder.finish()
619 }
620}
621
#[cfg(test)]
mod tests {
    use crate::{Array, BinaryArray, StringArray};
    use arrow_buffer::{Buffer, NullBuffer, OffsetBuffer};

    /// Walks through the accepted and rejected inputs of `new` / `try_new`
    /// for both string and binary arrays.
    #[test]
    fn try_new() {
        let hello_world = Buffer::from_slice_ref("helloworld");
        let valid_offsets = OffsetBuffer::new(vec![0, 5, 10].into());
        StringArray::new(valid_offsets.clone(), hello_world.clone(), None);

        // Two values but three null entries: rejected for both array kinds.
        let three_nulls = NullBuffer::new_null(3);
        let string_err = StringArray::try_new(
            valid_offsets.clone(),
            hello_world.clone(),
            Some(three_nulls.clone()),
        )
        .unwrap_err();
        assert_eq!(
            string_err.to_string(),
            "Invalid argument error: Incorrect length of null buffer for StringArray, expected 2 got 3"
        );

        let binary_err =
            BinaryArray::try_new(valid_offsets.clone(), hello_world.clone(), Some(three_nulls))
                .unwrap_err();
        assert_eq!(
            binary_err.to_string(),
            "Invalid argument error: Incorrect length of null buffer for BinaryArray, expected 2 got 3"
        );

        // Invalid UTF-8 is an error for strings but acceptable for binary.
        let invalid_utf8 = Buffer::from_slice_ref(b"he\xFFloworld");
        let utf8_err =
            StringArray::try_new(valid_offsets.clone(), invalid_utf8.clone(), None).unwrap_err();
        assert_eq!(
            utf8_err.to_string(),
            "Invalid argument error: Encountered non UTF-8 data: invalid utf-8 sequence of 1 bytes from index 2"
        );

        BinaryArray::new(valid_offsets, invalid_utf8, None);

        // The last offset points one byte past the end of the ten-byte buffer.
        let eleven_byte_offsets = OffsetBuffer::new(vec![0, 5, 11].into());
        let bounds_err =
            StringArray::try_new(eleven_byte_offsets.clone(), hello_world.clone(), None)
                .unwrap_err();
        assert_eq!(
            bounds_err.to_string(),
            "Invalid argument error: Offset of 11 exceeds length of values 10"
        );

        let bounds_err =
            BinaryArray::try_new(eleven_byte_offsets.clone(), hello_world, None).unwrap_err();
        assert_eq!(
            bounds_err.to_string(),
            "Invalid argument error: Maximum offset of 11 is larger than values of length 10"
        );

        // The same offsets are accepted for this buffer, which contains a non-ASCII character.
        let accented = Buffer::from_slice_ref("heìloworld");
        StringArray::new(eleven_byte_offsets.clone(), accented.clone(), None);
        BinaryArray::new(eleven_byte_offsets, accented.clone(), None);

        // An offset in the middle of a code point is only a problem for strings.
        let mid_codepoint_offsets = OffsetBuffer::new(vec![0, 3, 10].into());
        let split_err =
            StringArray::try_new(mid_codepoint_offsets.clone(), accented.clone(), None)
                .unwrap_err();
        assert_eq!(
            split_err.to_string(),
            "Invalid argument error: Split UTF-8 codepoint at offset 3"
        );

        BinaryArray::new(mid_codepoint_offsets, accented, None);
    }

    /// `new_repeated` yields `repeat_count` copies of the value.
    #[test]
    fn create_repeated() {
        let binary = BinaryArray::new_repeated(b"hello", 3);
        assert_eq!(binary.len(), 3);
        for index in 0..3 {
            assert_eq!(binary.value(index), b"hello");
        }

        let strings = StringArray::new_repeated("world", 2);
        assert_eq!(strings.len(), 2);
        for index in 0..2 {
            assert_eq!(strings.value(index), "world");
        }
    }

    /// One repetition more than fits into `usize` bytes.
    #[test]
    #[should_panic(expected = "usize overflow")]
    fn create_repeated_usize_overflow_1() {
        let repeat_count = (usize::MAX / "hello".len()) + 1;
        let _arr = BinaryArray::new_repeated(b"hello", repeat_count);
    }

    /// The largest possible repeat count.
    #[test]
    #[should_panic(expected = "usize overflow")]
    fn create_repeated_usize_overflow_2() {
        let repeat_count = usize::MAX;
        let _arr = BinaryArray::new_repeated(b"hello", repeat_count);
    }

    /// Total length fits in `usize` but not in an `i32` offset.
    #[test]
    #[should_panic(expected = "offset overflow")]
    fn create_repeated_i32_offset_overflow_1() {
        let repeat_count = usize::MAX / "hello".len();
        let _arr = BinaryArray::new_repeated(b"hello", repeat_count);
    }

    /// One repetition more than fits into an `i32` offset.
    #[test]
    #[should_panic(expected = "offset overflow")]
    fn create_repeated_i32_offset_overflow_2() {
        let repeat_count = ((i32::MAX as usize) / "hello".len()) + 1;
        let _arr = BinaryArray::new_repeated(b"hello", repeat_count);
    }
}