use std::alloc::{Layout, handle_alloc_error};
use std::mem;
use std::ptr::NonNull;

use crate::alloc::{ALIGNMENT, Deallocation};
use crate::{
    bytes::Bytes,
    native::{ArrowNativeType, ToByteSlice},
    util::bit_util,
};

#[cfg(feature = "pool")]
use crate::pool::{MemoryPool, MemoryReservation};
#[cfg(feature = "pool")]
use std::sync::Mutex;

use super::Buffer;

/// A contiguous, growable region of memory used to build up a [`Buffer`]
#[derive(Debug)]
pub struct MutableBuffer {
    /// Pointer to the start of the allocation (dangling if `layout.size() == 0`)
    data: NonNull<u8>,
    /// The number of initialized bytes
    len: usize,
    /// The layout of the allocation; `layout.size()` is the capacity
    layout: Layout,

    /// Optional memory reservation for tracking memory usage
    #[cfg(feature = "pool")]
    reservation: Mutex<Option<Box<dyn MemoryReservation>>>,
}

impl MutableBuffer {
    /// Allocate a new [`MutableBuffer`] with an initial capacity of `capacity`
    /// bytes (rounded up to a multiple of 64) and a length of 0
    #[inline]
    pub fn new(capacity: usize) -> Self {
        Self::with_capacity(capacity)
    }

    /// Allocate a new [`MutableBuffer`] with an initial capacity of `capacity`
    /// bytes (rounded up to a multiple of 64) and a length of 0.
    ///
    /// # Panics
    ///
    /// Panics if the rounded capacity exceeds the maximum [`Layout`] size.
    #[inline]
    pub fn with_capacity(capacity: usize) -> Self {
        let capacity = bit_util::round_upto_multiple_of_64(capacity);
        let layout = Layout::from_size_align(capacity, ALIGNMENT)
            .expect("failed to create layout for MutableBuffer");
        let data = match layout.size() {
            0 => dangling_ptr(),
            _ => {
                let raw_ptr = unsafe { std::alloc::alloc(layout) };
                NonNull::new(raw_ptr).unwrap_or_else(|| handle_alloc_error(layout))
            }
        };
        Self {
            data,
            len: 0,
            layout,
            #[cfg(feature = "pool")]
            reservation: Mutex::new(None),
        }
    }

    /// Allocate a new [`MutableBuffer`] of length `len`, with every byte
    /// initialized to zero. Unlike [`MutableBuffer::with_capacity`], the
    /// capacity is exactly `len` and is not rounded up to a multiple of 64.
    pub fn from_len_zeroed(len: usize) -> Self {
        let layout = Layout::from_size_align(len, ALIGNMENT).unwrap();
        let data = match layout.size() {
            0 => dangling_ptr(),
            _ => {
                let raw_ptr = unsafe { std::alloc::alloc_zeroed(layout) };
                NonNull::new(raw_ptr).unwrap_or_else(|| handle_alloc_error(layout))
            }
        };
        Self {
            data,
            len,
            layout,
            #[cfg(feature = "pool")]
            reservation: Mutex::new(None),
        }
    }

    /// Create a [`MutableBuffer`] from the provided [`Bytes`], returning
    /// `Err(bytes)` if the allocation was not made by the standard allocator
    pub(crate) fn from_bytes(bytes: Bytes) -> Result<Self, Bytes> {
        let layout = match bytes.deallocation() {
            Deallocation::Standard(layout) => *layout,
            _ => return Err(bytes),
        };

        let len = bytes.len();
        let data = bytes.ptr();
        #[cfg(feature = "pool")]
        let reservation = bytes.reservation.lock().unwrap().take();
        // `bytes` must not run its destructor, as ownership of the allocation
        // (and any reservation) has been transferred to the new buffer
        mem::forget(bytes);

        Ok(Self {
            data,
            len,
            layout,
            #[cfg(feature = "pool")]
            reservation: Mutex::new(reservation),
        })
    }

    /// Allocate a zeroed validity bitmap with room for `len` bits,
    /// i.e. `ceil(len / 8)` bytes
    pub fn new_null(len: usize) -> Self {
        let num_bytes = bit_util::ceil(len, 8);
        MutableBuffer::from_len_zeroed(num_bytes)
    }

    /// Set the first `end` bytes to `0xFF` if `val` is true (or `0` otherwise)
    /// and set the length to `end`, panicking if `end` exceeds the capacity
    pub fn with_bitset(mut self, end: usize, val: bool) -> Self {
        assert!(end <= self.layout.size());
        let v = if val { 255 } else { 0 };
        unsafe {
            // SAFETY: the assert above guarantees `end` is within the allocation
            std::ptr::write_bytes(self.data.as_ptr(), v, end);
            self.len = end;
        }
        self
    }

    /// Zero out `count` bytes starting at `start`, panicking if the range is
    /// out of bounds for the allocation
    pub fn set_null_bits(&mut self, start: usize, count: usize) {
        assert!(
            start.saturating_add(count) <= self.layout.size(),
            "range start index {start} and count {count} out of bounds for \
            buffer of length {}",
            self.layout.size(),
        );

        unsafe {
            // SAFETY: the assert above guarantees `start + count` is within the allocation
            std::ptr::write_bytes(self.data.as_ptr().add(start), 0, count);
        }
    }

    /// Ensure there is capacity for at least `additional` more bytes, growing
    /// geometrically (to at least double the current capacity) so that
    /// repeated calls are amortized
    #[inline(always)]
    pub fn reserve(&mut self, additional: usize) {
        let required_cap = self.len + additional;
        if required_cap > self.layout.size() {
            let new_capacity = bit_util::round_upto_multiple_of_64(required_cap);
            let new_capacity = std::cmp::max(new_capacity, self.layout.size() * 2);
            self.reallocate(new_capacity)
        }
    }

    /// Append `slice_to_repeat` to this buffer `repeat_count` times.
    ///
    /// The slice is written once with [`MutableBuffer::extend_from_slice`],
    /// after which the already-appended region is doubled with
    /// `copy_nonoverlapping` calls, so only `O(log(repeat_count))` copies are
    /// required.
    pub fn repeat_slice_n_times<T: ArrowNativeType>(
        &mut self,
        slice_to_repeat: &[T],
        repeat_count: usize,
    ) {
        if repeat_count == 0 || slice_to_repeat.is_empty() {
            return;
        }

        let bytes_to_repeat = mem::size_of_val(slice_to_repeat);

        self.reserve(repeat_count * bytes_to_repeat);

        let length_before = self.len;

        // Write the first copy of the slice
        self.extend_from_slice(slice_to_repeat);

        assert_eq!(
            self.len - length_before,
            bytes_to_repeat,
            "should copy exactly the same number of bytes"
        );

        let mut already_repeated_times = 1;

        while already_repeated_times < repeat_count {
            // Double the repeated region each iteration, without overshooting
            // the requested repeat count
            let number_of_slices_to_copy =
                already_repeated_times.min(repeat_count - already_repeated_times);
            let number_of_bytes_to_copy = number_of_slices_to_copy * bytes_to_repeat;

            unsafe {
                // SAFETY: `reserve` above guarantees capacity for all copies, and
                // src/dst cannot overlap as dst starts at `self.len`, past every
                // byte written so far
                let src = self.data.as_ptr().add(length_before) as *const u8;
                let dst = self.data.as_ptr().add(self.len);
                std::ptr::copy_nonoverlapping(src, dst, number_of_bytes_to_copy)
            }

            self.len += number_of_bytes_to_copy;
            already_repeated_times += number_of_slices_to_copy;
        }
    }

    #[cold]
    fn reallocate(&mut self, capacity: usize) {
        let new_layout = Layout::from_size_align(capacity, self.layout.align()).unwrap();
        if new_layout.size() == 0 {
            if self.layout.size() != 0 {
                // Free the existing allocation and leave the buffer empty,
                // resetting the pointer so it no longer refers to freed memory
                unsafe { std::alloc::dealloc(self.as_mut_ptr(), self.layout) };
                self.data = dangling_ptr();
                self.layout = new_layout;
            }
            return;
        }

        let data = match self.layout.size() {
            // Nothing allocated yet, so allocate fresh rather than realloc
            0 => unsafe { std::alloc::alloc(new_layout) },
            _ => unsafe { std::alloc::realloc(self.as_mut_ptr(), self.layout, capacity) },
        };
        self.data = NonNull::new(data).unwrap_or_else(|| handle_alloc_error(new_layout));
        self.layout = new_layout;
        #[cfg(feature = "pool")]
        {
            if let Some(reservation) = self.reservation.lock().unwrap().as_mut() {
                reservation.resize(self.layout.size());
            }
        }
    }

    /// Truncate this buffer to `len` bytes. If `len` is greater than the
    /// current length this is a no-op; the capacity is never reduced.
    #[inline(always)]
    pub fn truncate(&mut self, len: usize) {
        if len > self.len {
            return;
        }
        self.len = len;
        #[cfg(feature = "pool")]
        {
            if let Some(reservation) = self.reservation.lock().unwrap().as_mut() {
                reservation.resize(self.len);
            }
        }
    }

    /// Resize this buffer to `new_len` bytes, filling any newly exposed bytes
    /// with `value`. Shrinking neither reallocates nor touches existing bytes.
    #[inline(always)]
    pub fn resize(&mut self, new_len: usize, value: u8) {
        if new_len > self.len {
            let diff = new_len - self.len;
            self.reserve(diff);
            // SAFETY: `reserve` above guarantees capacity for `diff` extra bytes
            unsafe { self.data.as_ptr().add(self.len).write_bytes(value, diff) };
        }
        self.len = new_len;
        #[cfg(feature = "pool")]
        {
            if let Some(reservation) = self.reservation.lock().unwrap().as_mut() {
                reservation.resize(self.len);
            }
        }
    }

    /// Shrink the capacity to the smallest multiple of 64 bytes that still
    /// holds `len` bytes, reallocating only if this reduces the capacity
    pub fn shrink_to_fit(&mut self) {
        let new_capacity = bit_util::round_upto_multiple_of_64(self.len);
        if new_capacity < self.layout.size() {
            self.reallocate(new_capacity)
        }
    }

    /// Returns whether this buffer is empty
    #[inline]
    pub const fn is_empty(&self) -> bool {
        self.len == 0
    }

    /// Returns the number of initialized bytes in this buffer
    #[inline]
    pub const fn len(&self) -> usize {
        self.len
    }

    /// Returns the total capacity of this buffer in bytes
    #[inline]
    pub const fn capacity(&self) -> usize {
        self.layout.size()
    }

    /// Clear all existing data from this buffer, keeping the allocation
    pub fn clear(&mut self) {
        self.len = 0
    }

    /// Returns the data stored in this buffer as a slice
    pub fn as_slice(&self) -> &[u8] {
        self
    }

    /// Returns the data stored in this buffer as a mutable slice
    pub fn as_slice_mut(&mut self) -> &mut [u8] {
        self
    }

    /// Returns a raw pointer to this buffer's internal memory,
    /// aligned to [`ALIGNMENT`]
    #[inline]
    pub const fn as_ptr(&self) -> *const u8 {
        self.data.as_ptr()
    }

    /// Returns a mutable raw pointer to this buffer's internal memory,
    /// aligned to [`ALIGNMENT`]
    #[inline]
    pub fn as_mut_ptr(&mut self) -> *mut u8 {
        self.data.as_ptr()
    }

    /// Freeze this buffer into an immutable [`Buffer`], transferring any
    /// memory reservation along with the allocation
    #[inline]
    pub(super) fn into_buffer(self) -> Buffer {
        let bytes = unsafe { Bytes::new(self.data, self.len, Deallocation::Standard(self.layout)) };
        #[cfg(feature = "pool")]
        {
            let reservation = self.reservation.lock().unwrap().take();
            *bytes.reservation.lock().unwrap() = reservation;
        }
        // `self` must not run its destructor, as ownership of the allocation
        // has been transferred to `bytes`
        std::mem::forget(self);
        Buffer::from(bytes)
    }

    /// View this buffer as a mutable slice of `T`, panicking if the buffer is
    /// not properly aligned or its length is not a multiple of `size_of::<T>()`
    pub fn typed_data_mut<T: ArrowNativeType>(&mut self) -> &mut [T] {
        // SAFETY: ArrowNativeType is trivially transmutable from bytes, and the
        // assert below verifies there are no unaligned leading or trailing bytes
        let (prefix, offsets, suffix) = unsafe { self.as_slice_mut().align_to_mut::<T>() };
        assert!(prefix.is_empty() && suffix.is_empty());
        offsets
    }

    /// View this buffer as a slice of `T`, panicking if the buffer is not
    /// properly aligned or its length is not a multiple of `size_of::<T>()`
    pub fn typed_data<T: ArrowNativeType>(&self) -> &[T] {
        // SAFETY: ArrowNativeType is trivially transmutable from bytes, and the
        // assert below verifies there are no unaligned leading or trailing bytes
        let (prefix, offsets, suffix) = unsafe { self.as_slice().align_to::<T>() };
        assert!(prefix.is_empty() && suffix.is_empty());
        offsets
    }

    /// Extend this buffer with the byte representation of `items`
    #[inline]
    pub fn extend_from_slice<T: ArrowNativeType>(&mut self, items: &[T]) {
        let additional = mem::size_of_val(items);
        self.reserve(additional);
        unsafe {
            // SAFETY: `reserve` above guarantees capacity for `additional`
            // bytes, and ArrowNativeType is trivially copyable as raw bytes
            let src = items.as_ptr() as *const u8;
            let dst = self.data.as_ptr().add(self.len);
            std::ptr::copy_nonoverlapping(src, dst, additional)
        }
        self.len += additional;
    }

    /// Extend this buffer with the byte representation of a single `item`
    #[inline]
    pub fn push<T: ToByteSlice>(&mut self, item: T) {
        let additional = std::mem::size_of::<T>();
        self.reserve(additional);
        unsafe {
            // SAFETY: `reserve` above guarantees capacity for `additional` bytes
            let src = item.to_byte_slice().as_ptr();
            let dst = self.data.as_ptr().add(self.len);
            std::ptr::copy_nonoverlapping(src, dst, additional);
        }
        self.len += additional;
    }

    /// Extend this buffer with a single `item` without checking capacity first.
    ///
    /// # Safety
    ///
    /// The caller must ensure the buffer has capacity for at least
    /// `size_of::<T>()` additional bytes.
    #[inline]
    pub unsafe fn push_unchecked<T: ToByteSlice>(&mut self, item: T) {
        let additional = std::mem::size_of::<T>();
        let src = item.to_byte_slice().as_ptr();
        let dst = unsafe { self.data.as_ptr().add(self.len) };
        unsafe { std::ptr::copy_nonoverlapping(src, dst, additional) };
        self.len += additional;
    }

    /// Extend this buffer with `additional` zero bytes
    #[inline]
    pub fn extend_zeros(&mut self, additional: usize) {
        self.resize(self.len + additional, 0);
    }

    /// Set the length of this buffer.
    ///
    /// # Safety
    ///
    /// The caller must ensure the first `len` bytes are initialized;
    /// `len <= capacity` is checked with an assertion.
    #[inline]
    pub unsafe fn set_len(&mut self, len: usize) {
        assert!(len <= self.capacity());
        self.len = len;
    }

    /// Create a bit-packed buffer of `len` bits from the predicate `f`, where
    /// bit `i` is set iff `f(i)` is true. Bits are packed least-significant-bit
    /// first, 64 at a time, a loop the optimizer can typically vectorize.
    #[inline]
    pub fn collect_bool<F: FnMut(usize) -> bool>(len: usize, mut f: F) -> Self {
        let mut buffer: Vec<u64> = Vec::with_capacity(bit_util::ceil(len, 64));

        let chunks = len / 64;
        let remainder = len % 64;
        buffer.extend((0..chunks).map(|chunk| {
            let mut packed = 0;
            for bit_idx in 0..64 {
                let i = bit_idx + chunk * 64;
                packed |= (f(i) as u64) << bit_idx;
            }

            packed
        }));

        if remainder != 0 {
            let mut packed = 0;
            for bit_idx in 0..remainder {
                let i = bit_idx + chunks * 64;
                packed |= (f(i) as u64) << bit_idx;
            }

            buffer.push(packed)
        }

        let mut buffer: MutableBuffer = buffer.into();
        buffer.truncate(bit_util::ceil(len, 8));
        buffer
    }

    /// Register this buffer with `pool`, replacing any existing reservation
    /// with one sized to the current capacity
    #[cfg(feature = "pool")]
    pub fn claim(&self, pool: &dyn MemoryPool) {
        *self.reservation.lock().unwrap() = Some(pool.reserve(self.capacity()));
    }
}

/// Returns a dangling pointer suitably aligned for [`ALIGNMENT`]
#[inline]
pub(crate) fn dangling_ptr() -> NonNull<u8> {
    // SAFETY: ALIGNMENT is a non-zero usize, so a pointer with that address is
    // never null. Under miri, `without_provenance_mut` makes explicit that the
    // pointer is a bare address without provenance; elsewhere a plain integer
    // cast suffices.
    #[cfg(miri)]
    {
        unsafe { NonNull::new_unchecked(std::ptr::without_provenance_mut(ALIGNMENT)) }
    }
    #[cfg(not(miri))]
    {
        unsafe { NonNull::new_unchecked(ALIGNMENT as *mut u8) }
    }
}

impl<A: ArrowNativeType> Extend<A> for MutableBuffer {
    #[inline]
    fn extend<T: IntoIterator<Item = A>>(&mut self, iter: T) {
        let iterator = iter.into_iter();
        self.extend_from_iter(iterator)
    }
}

impl<T: ArrowNativeType> From<Vec<T>> for MutableBuffer {
    fn from(value: Vec<T>) -> Self {
        // SAFETY: a Vec's pointer is always non-null
        let data = unsafe { NonNull::new_unchecked(value.as_ptr() as _) };
        let len = value.len() * mem::size_of::<T>();
        // SAFETY: a Vec always has a valid array layout for its capacity
        let layout = unsafe { Layout::array::<T>(value.capacity()).unwrap_unchecked() };
        // `value` must not run its destructor, as ownership of the allocation
        // has been transferred to the new buffer
        mem::forget(value);
        Self {
            data,
            len,
            layout,
            #[cfg(feature = "pool")]
            reservation: Mutex::new(None),
        }
    }
}

impl MutableBuffer {
    #[inline]
    pub(super) fn extend_from_iter<T: ArrowNativeType, I: Iterator<Item = T>>(
        &mut self,
        mut iterator: I,
    ) {
        let item_size = std::mem::size_of::<T>();
        let (lower, _) = iterator.size_hint();
        let additional = lower * item_size;
        self.reserve(additional);

        // Write directly into the reserved region, publishing the length via
        // `SetLenOnDrop` so it stays consistent even if `iterator.next()` panics
        let mut len = SetLenOnDrop::new(&mut self.len);
        let mut dst = unsafe { self.data.as_ptr().add(len.local_len) };
        let capacity = self.layout.size();

        while len.local_len + item_size <= capacity {
            if let Some(item) = iterator.next() {
                unsafe {
                    let src = item.to_byte_slice().as_ptr();
                    std::ptr::copy_nonoverlapping(src, dst, item_size);
                    dst = dst.add(item_size);
                }
                len.local_len += item_size;
            } else {
                break;
            }
        }
        drop(len);

        // The lower bound of `size_hint` may have undershot: push any remaining
        // items through the capacity-checked path
        iterator.for_each(|item| self.push(item));
    }

    /// Create a [`MutableBuffer`] from an iterator with a trusted length.
    ///
    /// # Safety
    ///
    /// The iterator's `size_hint` upper bound must exist and be exact, as it
    /// sizes the allocation written to without further bounds checks.
    #[inline]
    pub unsafe fn from_trusted_len_iter<T: ArrowNativeType, I: Iterator<Item = T>>(
        iterator: I,
    ) -> Self {
        let item_size = std::mem::size_of::<T>();
        let (_, upper) = iterator.size_hint();
        let upper = upper.expect("from_trusted_len_iter requires an upper limit");
        let len = upper * item_size;

        let mut buffer = MutableBuffer::new(len);

        let mut dst = buffer.data.as_ptr();
        for item in iterator {
            let src = item.to_byte_slice().as_ptr();
            // SAFETY: the trusted `upper` bound guarantees `dst` stays in bounds
            unsafe { std::ptr::copy_nonoverlapping(src, dst, item_size) };
            dst = unsafe { dst.add(item_size) };
        }
        assert_eq!(
            unsafe { dst.offset_from(buffer.data.as_ptr()) } as usize,
            len,
            "Trusted iterator length was not accurately reported"
        );
        buffer.len = len;
        buffer
    }

    /// Create a bit-packed [`MutableBuffer`] from a boolean iterator with a
    /// trusted length.
    ///
    /// # Safety
    ///
    /// The iterator's `size_hint` upper bound must exist and be exact.
    #[inline]
    pub unsafe fn from_trusted_len_iter_bool<I: Iterator<Item = bool>>(mut iterator: I) -> Self {
        let (_, upper) = iterator.size_hint();
        let len = upper.expect("from_trusted_len_iter_bool requires an upper limit");

        Self::collect_bool(len, |_| iterator.next().unwrap())
    }

    /// Create a [`MutableBuffer`] from a fallible iterator with a trusted
    /// length, returning the first error encountered.
    ///
    /// # Safety
    ///
    /// The iterator's `size_hint` upper bound must exist and be exact.
    #[inline]
    pub unsafe fn try_from_trusted_len_iter<
        E,
        T: ArrowNativeType,
        I: Iterator<Item = Result<T, E>>,
    >(
        iterator: I,
    ) -> Result<Self, E> {
        let item_size = std::mem::size_of::<T>();
        let (_, upper) = iterator.size_hint();
        let upper = upper.expect("try_from_trusted_len_iter requires an upper limit");
        let len = upper * item_size;

        let mut buffer = MutableBuffer::new(len);

        let mut dst = buffer.data.as_ptr();
        for item in iterator {
            let item = item?;
            let src = item.to_byte_slice().as_ptr();
            // SAFETY: the trusted `upper` bound guarantees `dst` stays in bounds
            unsafe { std::ptr::copy_nonoverlapping(src, dst, item_size) };
            dst = unsafe { dst.add(item_size) };
        }
        // Verify the iterator's reported length before publishing `len`
        unsafe fn finalize_buffer(dst: *mut u8, buffer: &mut MutableBuffer, len: usize) {
            unsafe {
                assert_eq!(
                    dst.offset_from(buffer.data.as_ptr()) as usize,
                    len,
                    "Trusted iterator length was not accurately reported"
                );
                buffer.len = len;
            }
        }
        unsafe { finalize_buffer(dst, &mut buffer, len) };
        Ok(buffer)
    }
}

impl Default for MutableBuffer {
    fn default() -> Self {
        Self::with_capacity(0)
    }
}

impl std::ops::Deref for MutableBuffer {
    type Target = [u8];

    fn deref(&self) -> &[u8] {
        // SAFETY: the first `len` bytes of the allocation are always initialized
        unsafe { std::slice::from_raw_parts(self.as_ptr(), self.len) }
    }
}

impl std::ops::DerefMut for MutableBuffer {
    fn deref_mut(&mut self) -> &mut [u8] {
        // SAFETY: the first `len` bytes of the allocation are always initialized
        unsafe { std::slice::from_raw_parts_mut(self.as_mut_ptr(), self.len) }
    }
}

impl AsRef<[u8]> for &MutableBuffer {
    fn as_ref(&self) -> &[u8] {
        self.as_slice()
    }
}

impl Drop for MutableBuffer {
    fn drop(&mut self) {
        if self.layout.size() != 0 {
            unsafe { std::alloc::dealloc(self.data.as_ptr() as _, self.layout) };
        }
    }
}

impl PartialEq for MutableBuffer {
    fn eq(&self, other: &MutableBuffer) -> bool {
        if self.len != other.len {
            return false;
        }
        // Note: the layout participates in equality, so buffers with identical
        // contents but different capacities compare unequal
        if self.layout != other.layout {
            return false;
        }
        self.as_slice() == other.as_slice()
    }
}

unsafe impl Sync for MutableBuffer {}
unsafe impl Send for MutableBuffer {}

/// Writes a locally tracked length back to the buffer's `len` on drop, keeping
/// the buffer consistent even if an iterator panics mid-extend
struct SetLenOnDrop<'a> {
    len: &'a mut usize,
    local_len: usize,
}

impl<'a> SetLenOnDrop<'a> {
    #[inline]
    fn new(len: &'a mut usize) -> Self {
        SetLenOnDrop {
            local_len: *len,
            len,
        }
    }
}

impl Drop for SetLenOnDrop<'_> {
    #[inline]
    fn drop(&mut self) {
        *self.len = self.local_len;
    }
}

impl std::iter::FromIterator<bool> for MutableBuffer {
    fn from_iter<I>(iter: I) -> Self
    where
        I: IntoIterator<Item = bool>,
    {
        let mut iterator = iter.into_iter();
        let mut result = {
            // One byte for every 8 bits, rounding up
            let byte_capacity: usize = iterator.size_hint().0.saturating_add(7) / 8;
            MutableBuffer::new(byte_capacity)
        };

        loop {
            let mut exhausted = false;
            let mut byte_accum: u8 = 0;
            let mut mask: u8 = 1;

            // Pack up to 8 bits into a byte, least-significant bit first
            while mask != 0 {
                if let Some(value) = iterator.next() {
                    byte_accum |= match value {
                        true => mask,
                        false => 0,
                    };
                    mask <<= 1;
                } else {
                    exhausted = true;
                    break;
                }
            }

            // Stop without pushing if the iterator produced no bits for this byte
            if exhausted && mask == 1 {
                break;
            }

            if result.len() == result.capacity() {
                // The `size_hint` lower bound undershot: reserve room for the rest
                let additional_byte_capacity = 1usize.saturating_add(
                    iterator.size_hint().0.saturating_add(7) / 8,
                );
                result.reserve(additional_byte_capacity)
            }

            // SAFETY: capacity was checked (and grown if necessary) just above
            unsafe { result.push_unchecked(byte_accum) };
            if exhausted {
                break;
            }
        }
        result
    }
}

impl<T: ArrowNativeType> std::iter::FromIterator<T> for MutableBuffer {
    fn from_iter<I: IntoIterator<Item = T>>(iter: I) -> Self {
        let mut buffer = Self::default();
        buffer.extend_from_iter(iter.into_iter());
        buffer
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_mutable_new() {
        let buf = MutableBuffer::new(63);
        assert_eq!(64, buf.capacity());
        assert_eq!(0, buf.len());
        assert!(buf.is_empty());
    }
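
    // Illustrative example (added for documentation; not part of the original
    // suite): `new_null` allocates one zeroed byte per 8 bits of validity.
    #[test]
    fn example_new_null_zeroed_bitmap() {
        let buf = MutableBuffer::new_null(10);
        // 10 bits round up to 2 bytes, all initialized to zero
        assert_eq!(buf.len(), 2);
        assert_eq!(buf.as_slice(), &[0u8, 0u8]);
    }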

    #[test]
    fn test_mutable_default() {
        let buf = MutableBuffer::default();
        assert_eq!(0, buf.capacity());
        assert_eq!(0, buf.len());
        assert!(buf.is_empty());

        let mut buf = MutableBuffer::default();
        buf.extend_from_slice(b"hello");
        assert_eq!(5, buf.len());
        assert_eq!(b"hello", buf.as_slice());
    }

    #[test]
    fn test_mutable_extend_from_slice() {
        let mut buf = MutableBuffer::new(100);
        buf.extend_from_slice(b"hello");
        assert_eq!(5, buf.len());
        assert_eq!(b"hello", buf.as_slice());

        buf.extend_from_slice(b" world");
        assert_eq!(11, buf.len());
        assert_eq!(b"hello world", buf.as_slice());

        buf.clear();
        assert_eq!(0, buf.len());
        buf.extend_from_slice(b"hello arrow");
        assert_eq!(11, buf.len());
        assert_eq!(b"hello arrow", buf.as_slice());
    }
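
    // Illustrative example (added for documentation): `extend_zeros` appends
    // zeroed bytes via `resize`, leaving existing contents untouched.
    #[test]
    fn example_extend_zeros() {
        let mut buf = MutableBuffer::new(0);
        buf.extend_from_slice(b"ab");
        buf.extend_zeros(3);
        assert_eq!(buf.as_slice(), &[b'a', b'b', 0, 0, 0]);
    }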

    #[test]
    fn mutable_extend_from_iter() {
        let mut buf = MutableBuffer::new(0);
        buf.extend(vec![1u32, 2]);
        assert_eq!(8, buf.len());
        assert_eq!(&[1u8, 0, 0, 0, 2, 0, 0, 0], buf.as_slice());

        buf.extend(vec![3u32, 4]);
        assert_eq!(16, buf.len());
        assert_eq!(
            &[1u8, 0, 0, 0, 2, 0, 0, 0, 3, 0, 0, 0, 4, 0, 0, 0],
            buf.as_slice()
        );
    }

    #[test]
    fn mutable_extend_from_iter_unaligned_u64() {
        let mut buf = MutableBuffer::new(16);
        buf.push(1_u8);
        buf.extend([1_u64]);
        assert_eq!(9, buf.len());
        assert_eq!(&[1u8, 1u8, 0, 0, 0, 0, 0, 0, 0], buf.as_slice());
    }

    #[test]
    fn mutable_extend_from_slice_unaligned_u64() {
        let mut buf = MutableBuffer::new(16);
        buf.extend_from_slice(&[1_u8]);
        buf.extend_from_slice(&[1_u64]);
        assert_eq!(9, buf.len());
        assert_eq!(&[1u8, 1u8, 0, 0, 0, 0, 0, 0, 0], buf.as_slice());
    }

    #[test]
    fn mutable_push_unaligned_u64() {
        let mut buf = MutableBuffer::new(16);
        buf.push(1_u8);
        buf.push(1_u64);
        assert_eq!(9, buf.len());
        assert_eq!(&[1u8, 1u8, 0, 0, 0, 0, 0, 0, 0], buf.as_slice());
    }

    #[test]
    fn mutable_push_unchecked_unaligned_u64() {
        let mut buf = MutableBuffer::new(16);
        unsafe {
            buf.push_unchecked(1_u8);
            buf.push_unchecked(1_u64);
        }
        assert_eq!(9, buf.len());
        assert_eq!(&[1u8, 1u8, 0, 0, 0, 0, 0, 0, 0], buf.as_slice());
    }
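
    // Illustrative example (added for documentation): a minimal sketch of the
    // intended `set_len` workflow, writing through the raw pointer first so the
    // safety contract (initialized bytes, len <= capacity) holds.
    #[test]
    fn example_set_len_after_raw_write() {
        let mut buf = MutableBuffer::with_capacity(8);
        unsafe {
            std::ptr::write_bytes(buf.as_mut_ptr(), 0xab, 4);
            buf.set_len(4);
        }
        assert_eq!(buf.as_slice(), &[0xab; 4]);
    }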

    #[test]
    fn test_from_trusted_len_iter() {
        let iter = vec![1u32, 2].into_iter();
        let buf = unsafe { MutableBuffer::from_trusted_len_iter(iter) };
        assert_eq!(8, buf.len());
        assert_eq!(&[1u8, 0, 0, 0, 2, 0, 0, 0], buf.as_slice());
    }
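
    // Illustrative example (added for documentation): `try_from_trusted_len_iter`
    // collects `Ok` values like `from_trusted_len_iter`, and surfaces the first
    // `Err` instead of a buffer. Byte layout is little-endian, as assumed by the
    // test above.
    #[test]
    fn example_try_from_trusted_len_iter() {
        let ok = vec![Ok::<u32, &str>(1), Ok(2)];
        let buf = unsafe { MutableBuffer::try_from_trusted_len_iter(ok.into_iter()) }.unwrap();
        assert_eq!(buf.as_slice(), &[1u8, 0, 0, 0, 2, 0, 0, 0]);

        let err = vec![Ok::<u32, &str>(1), Err("overflow")];
        let result = unsafe { MutableBuffer::try_from_trusted_len_iter(err.into_iter()) };
        assert_eq!(result.unwrap_err(), "overflow");
    }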

    #[test]
    fn test_mutable_reserve() {
        let mut buf = MutableBuffer::new(1);
        assert_eq!(64, buf.capacity());

        // Reserving within the existing capacity is a no-op
        buf.reserve(10);
        assert_eq!(64, buf.capacity());

        buf.reserve(80);
        assert_eq!(128, buf.capacity());

        buf.reserve(129);
        assert_eq!(256, buf.capacity());
    }

    #[test]
    fn test_mutable_resize() {
        let mut buf = MutableBuffer::new(1);
        assert_eq!(64, buf.capacity());
        assert_eq!(0, buf.len());

        buf.resize(20, 0);
        assert_eq!(64, buf.capacity());
        assert_eq!(20, buf.len());

        buf.resize(10, 0);
        assert_eq!(64, buf.capacity());
        assert_eq!(10, buf.len());

        buf.resize(100, 0);
        assert_eq!(128, buf.capacity());
        assert_eq!(100, buf.len());

        buf.resize(30, 0);
        assert_eq!(128, buf.capacity());
        assert_eq!(30, buf.len());

        buf.resize(0, 0);
        assert_eq!(128, buf.capacity());
        assert_eq!(0, buf.len());
    }
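
    // Illustrative example (added for documentation): `truncate` only ever
    // shrinks; a target beyond the current length is a no-op, and capacity is
    // never released.
    #[test]
    fn example_truncate_is_shrink_only() {
        let mut buf = MutableBuffer::new(0);
        buf.extend_from_slice(&[1u8, 2, 3, 4]);
        buf.truncate(100); // greater than len: no-op
        assert_eq!(buf.len(), 4);
        buf.truncate(2);
        assert_eq!(buf.as_slice(), &[1, 2]);
        assert_eq!(buf.capacity(), 64);
    }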

    #[test]
    fn test_mutable_into() {
        let mut buf = MutableBuffer::new(1);
        buf.extend_from_slice(b"aaaa bbbb cccc dddd");
        assert_eq!(19, buf.len());
        assert_eq!(64, buf.capacity());
        assert_eq!(b"aaaa bbbb cccc dddd", buf.as_slice());

        let immutable_buf: Buffer = buf.into();
        assert_eq!(19, immutable_buf.len());
        assert_eq!(64, immutable_buf.capacity());
        assert_eq!(b"aaaa bbbb cccc dddd", immutable_buf.as_slice());
    }

    #[test]
    fn test_mutable_equal() {
        let mut buf = MutableBuffer::new(1);
        let mut buf2 = MutableBuffer::new(1);

        buf.extend_from_slice(&[0xaa]);
        buf2.extend_from_slice(&[0xaa, 0xbb]);
        assert!(buf != buf2);

        buf.extend_from_slice(&[0xbb]);
        assert_eq!(buf, buf2);

        // Equality also considers the layout, so growing one buffer's capacity
        // makes them compare unequal
        buf2.reserve(65);
        assert!(buf != buf2);
    }

    #[test]
    fn test_mutable_shrink_to_fit() {
        let mut buffer = MutableBuffer::new(128);
        assert_eq!(buffer.capacity(), 128);
        buffer.push(1);
        buffer.push(2);

        buffer.shrink_to_fit();
        assert!(buffer.capacity() >= 64 && buffer.capacity() < 128);
    }
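
    // Illustrative example (added for documentation): `collect_bool` packs bits
    // least-significant-bit first and truncates to ceil(len / 8) bytes; byte
    // order assumes a little-endian target, as elsewhere in these tests.
    #[test]
    fn example_collect_bool_packs_lsb_first() {
        let buf = MutableBuffer::collect_bool(10, |i| i % 2 == 0);
        assert_eq!(buf.len(), 2);
        assert_eq!(buf.as_slice(), &[0b0101_0101, 0b0000_0001]);
    }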

    #[test]
    fn test_mutable_set_null_bits() {
        let mut buffer = MutableBuffer::new(8).with_bitset(8, true);

        // A zero-count range is in bounds anywhere, even one past the end
        for i in 0..=buffer.capacity() {
            buffer.set_null_bits(i, 0);
            assert_eq!(buffer[..8], [255; 8][..]);
        }

        buffer.set_null_bits(1, 4);
        assert_eq!(buffer[..8], [255, 0, 0, 0, 0, 255, 255, 255][..]);
    }
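
    // Illustrative example (added for documentation): `with_bitset` can also
    // fill only a prefix of the allocation, setting the length to match.
    #[test]
    fn example_with_bitset_prefix() {
        let buf = MutableBuffer::new(64).with_bitset(4, true);
        assert_eq!(buf.len(), 4);
        assert_eq!(buf.as_slice(), &[255u8; 4]);
    }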

    #[test]
    #[should_panic = "out of bounds for buffer of length"]
    fn test_mutable_set_null_bits_oob() {
        let mut buffer = MutableBuffer::new(64);
        buffer.set_null_bits(1, buffer.capacity());
    }

    #[test]
    #[should_panic = "out of bounds for buffer of length"]
    fn test_mutable_set_null_bits_oob_by_overflow() {
        let mut buffer = MutableBuffer::new(0);
        buffer.set_null_bits(1, usize::MAX);
    }

    #[test]
    fn from_iter() {
        let buffer = [1u16, 2, 3, 4].into_iter().collect::<MutableBuffer>();
        assert_eq!(buffer.len(), 4 * mem::size_of::<u16>());
        assert_eq!(buffer.as_slice(), &[1, 0, 2, 0, 3, 0, 4, 0]);
    }
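
    // Illustrative example (added for documentation): `typed_data` and
    // `typed_data_mut` reinterpret the underlying bytes as native values,
    // relying on the buffer's aligned allocation.
    #[test]
    fn example_typed_data_roundtrip() {
        let mut buf: MutableBuffer = [1u32, 2, 3].into_iter().collect();
        assert_eq!(buf.typed_data::<u32>(), &[1, 2, 3]);
        buf.typed_data_mut::<u32>()[1] = 42;
        assert_eq!(buf.typed_data::<u32>(), &[1, 42, 3]);
    }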

    #[test]
    #[should_panic(expected = "failed to create layout for MutableBuffer: LayoutError")]
    fn test_with_capacity_panics_above_max_capacity() {
        let max_capacity = isize::MAX as usize - (isize::MAX as usize % ALIGNMENT);
        let _ = MutableBuffer::with_capacity(max_capacity + 1);
    }
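
    // Illustrative example (added for documentation): unlike `with_capacity`,
    // `from_len_zeroed` sizes the allocation to the exact requested length
    // rather than rounding up to a multiple of 64.
    #[test]
    fn example_from_len_zeroed_exact_capacity() {
        let buf = MutableBuffer::from_len_zeroed(10);
        assert_eq!(buf.len(), 10);
        assert_eq!(buf.capacity(), 10);
        assert_eq!(buf.as_slice(), &[0u8; 10]);
    }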

    #[cfg(feature = "pool")]
    mod pool_tests {
        use super::*;
        use crate::pool::{MemoryPool, TrackingMemoryPool};

        #[test]
        fn test_reallocate_with_pool() {
            let pool = TrackingMemoryPool::default();
            let mut buffer = MutableBuffer::with_capacity(100);
            buffer.claim(&pool);

            // 100 rounds up to 128, and the claimed reservation matches
            assert_eq!(buffer.capacity(), 128);
            assert_eq!(pool.used(), 128);

            buffer.reallocate(200);

            assert_eq!(buffer.capacity(), 200);
            assert_eq!(pool.used(), 200);

            buffer.reallocate(50);

            assert_eq!(buffer.capacity(), 50);
            assert_eq!(pool.used(), 50);
        }

        #[test]
        fn test_truncate_with_pool() {
            let pool = TrackingMemoryPool::default();
            let mut buffer = MutableBuffer::with_capacity(100);

            buffer.resize(80, 1);
            assert_eq!(buffer.len(), 80);

            buffer.claim(&pool);
            assert_eq!(pool.used(), 128);

            buffer.truncate(40);
            assert_eq!(buffer.len(), 40);
            assert_eq!(pool.used(), 40);

            buffer.truncate(0);
            assert_eq!(buffer.len(), 0);
            assert_eq!(pool.used(), 0);
        }

        #[test]
        fn test_resize_with_pool() {
            let pool = TrackingMemoryPool::default();
            let mut buffer = MutableBuffer::with_capacity(100);
            buffer.claim(&pool);

            assert_eq!(buffer.len(), 0);
            assert_eq!(pool.used(), 128);

            buffer.resize(50, 1);
            assert_eq!(buffer.len(), 50);
            assert_eq!(pool.used(), 50);

            buffer.resize(150, 1);
            assert_eq!(buffer.len(), 150);
            assert_eq!(buffer.capacity(), 256);
            assert_eq!(pool.used(), 150);

            buffer.resize(30, 1);
            assert_eq!(buffer.len(), 30);
            assert_eq!(pool.used(), 30);
        }

        #[test]
        fn test_buffer_lifecycle_with_pool() {
            let pool = TrackingMemoryPool::default();

            let mut mutable = MutableBuffer::with_capacity(100);
            mutable.resize(80, 1);
            mutable.claim(&pool);

            assert_eq!(pool.used(), 128);

            // Freezing into an immutable buffer keeps the reservation alive
            let buffer = mutable.into_buffer();

            assert_eq!(pool.used(), 128);

            // Dropping the buffer releases the reservation
            drop(buffer);
            assert_eq!(pool.used(), 0);
        }
    }

    fn create_expected_repeated_slice<T: ArrowNativeType>(
        slice_to_repeat: &[T],
        repeat_count: usize,
    ) -> Buffer {
        let mut expected = MutableBuffer::new(mem::size_of_val(slice_to_repeat) * repeat_count);
        for _ in 0..repeat_count {
            expected.extend_from_slice(slice_to_repeat);
        }
        expected.into()
    }

    fn test_repeat_count<T: ArrowNativeType + PartialEq + std::fmt::Debug>(
        repeat_count: usize,
        test_data: &[T],
    ) {
        let mut buffer = MutableBuffer::new(0);
        buffer.repeat_slice_n_times(test_data, repeat_count);

        let expected = create_expected_repeated_slice(test_data, repeat_count);
        let result: Buffer = buffer.into();

        assert_eq!(
            result, expected,
            "Failed for repeat_count={}, slice_len={}",
            repeat_count,
            test_data.len()
        );
    }

    #[test]
    fn test_repeat_slice_count_edge_cases() {
        // Repeating an empty slice is a no-op
        test_repeat_count(100, &[] as &[i32]);

        // A repeat count of zero is a no-op
        test_repeat_count(0, &[1i32, 2, 3]);
    }

    #[test]
    fn test_small_repeats_counts() {
        let data = &[1u8, 2, 3, 4, 5];

        // Exercise every small repeat count from 1 through 10
        for repeat_count in 1..=10 {
            test_repeat_count(repeat_count, data);
        }
    }

    #[test]
    fn test_different_size_of_i32_repeat_slice() {
        let data: &[i32] = &[1, 2, 3];
        let data_with_single_item: &[i32] = &[42];

        for data in &[data, data_with_single_item] {
            for item in 1..=9 {
                let base_repeat_count = 2_usize.pow(item);
                test_repeat_count(base_repeat_count - 1, data);
                test_repeat_count(base_repeat_count, data);
                test_repeat_count(base_repeat_count + 1, data);
            }
        }
    }

    #[test]
    fn test_different_size_of_u8_repeat_slice() {
        let data: &[u8] = &[1, 2, 3];
        let data_with_single_item: &[u8] = &[10];

        for data in &[data, data_with_single_item] {
            for item in 1..=9 {
                let base_repeat_count = 2_usize.pow(item);
                test_repeat_count(base_repeat_count - 1, data);
                test_repeat_count(base_repeat_count, data);
                test_repeat_count(base_repeat_count + 1, data);
            }
        }
    }

    #[test]
    fn test_different_size_of_u16_repeat_slice() {
        let data: &[u16] = &[1, 2, 3];
        let data_with_single_item: &[u16] = &[10];

        for data in &[data, data_with_single_item] {
            for item in 1..=9 {
                let base_repeat_count = 2_usize.pow(item);
                test_repeat_count(base_repeat_count - 1, data);
                test_repeat_count(base_repeat_count, data);
                test_repeat_count(base_repeat_count + 1, data);
            }
        }
    }

    #[test]
    fn test_various_slice_lengths() {
        let repeat_count = 37;

        test_repeat_count(repeat_count, &[42i32]);
        test_repeat_count(repeat_count, &[1i32, 2]);
        test_repeat_count(repeat_count, &[1i32, 2, 3]);
        test_repeat_count(repeat_count, &[1i32, 2, 3, 4]);
        test_repeat_count(repeat_count, &[1i32, 2, 3, 4, 5]);

        let data_10: Vec<i32> = (0..10).collect();
        test_repeat_count(repeat_count, &data_10);

        let data_100: Vec<i32> = (0..100).collect();
        test_repeat_count(repeat_count, &data_100);

        let data_1000: Vec<i32> = (0..1000).collect();
        test_repeat_count(repeat_count, &data_1000);
    }
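
    // Illustrative example (added for documentation): collecting booleans packs
    // them into bits, least-significant-bit first, one byte per 8 values.
    #[test]
    fn example_from_iter_bool_packs_bits() {
        let buf: MutableBuffer = [true, false, true].into_iter().collect();
        assert_eq!(buf.len(), 1);
        assert_eq!(buf.as_slice(), &[0b0000_0101]);
    }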
}