use std::alloc::{Layout, handle_alloc_error};
use std::mem;
use std::ptr::NonNull;

use crate::alloc::{ALIGNMENT, Deallocation};
use crate::{
    bytes::Bytes,
    native::{ArrowNativeType, ToByteSlice},
    util::bit_util,
};

#[cfg(feature = "pool")]
use crate::pool::{MemoryPool, MemoryReservation};
#[cfg(feature = "pool")]
use std::sync::Mutex;

use super::Buffer;

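/// A [`MutableBuffer`] is Arrow's interface for building a [`Buffer`] out of
/// items or slices of items. Allocations made through
/// [`MutableBuffer::with_capacity`] are aligned to `ALIGNMENT` and rounded up
/// to multiples of 64 bytes.
///
/// A minimal usage sketch (the doc-test assumes this type is re-exported as
/// `arrow_buffer::MutableBuffer`):
/// ```
/// use arrow_buffer::MutableBuffer;
/// let mut buffer = MutableBuffer::new(0);
/// buffer.push(256u32);
/// buffer.extend_from_slice(&[1u32]);
/// let buffer: arrow_buffer::Buffer = buffer.into();
/// // Native-endian bytes of 256u32 followed by 1u32 (little-endian shown).
/// assert_eq!(buffer.as_slice(), &[0u8, 1, 0, 0, 1, 0, 0, 0]);
/// ```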
#[derive(Debug)]
pub struct MutableBuffer {
    /// Pointer to the start of the allocation (dangling if `layout` is zero-sized).
    data: NonNull<u8>,
    /// Number of initialized bytes; invariant: `len <= layout.size()`.
    len: usize,
    /// The allocation's layout; `layout.size()` is this buffer's capacity.
    layout: Layout,

    /// Optional memory-pool reservation tracking this buffer's allocation.
    #[cfg(feature = "pool")]
    reservation: Mutex<Option<Box<dyn MemoryReservation>>>,
}

impl MutableBuffer {
    /// Allocates a new buffer. Equivalent to [`MutableBuffer::with_capacity`].
    #[inline]
    pub fn new(capacity: usize) -> Self {
        Self::with_capacity(capacity)
    }

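    /// Allocates a new buffer with at least `capacity` bytes; the requested
    /// capacity is rounded up to the next multiple of 64 bytes.
    ///
    /// A sketch of the rounding behaviour (values follow directly from the
    /// `round_upto_multiple_of_64` call below):
    /// ```text
    /// with_capacity(0)  -> capacity() == 0
    /// with_capacity(1)  -> capacity() == 64
    /// with_capacity(64) -> capacity() == 64
    /// with_capacity(65) -> capacity() == 128
    /// ```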
    #[inline]
    pub fn with_capacity(capacity: usize) -> Self {
        let capacity = bit_util::round_upto_multiple_of_64(capacity);
        let layout = Layout::from_size_align(capacity, ALIGNMENT)
            .expect("failed to create layout for MutableBuffer");
        let data = match layout.size() {
            0 => dangling_ptr(),
            _ => {
                let raw_ptr = unsafe { std::alloc::alloc(layout) };
                NonNull::new(raw_ptr).unwrap_or_else(|| handle_alloc_error(layout))
            }
        };
        Self {
            data,
            len: 0,
            layout,
            #[cfg(feature = "pool")]
            reservation: Mutex::new(None),
        }
    }

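    /// Creates a buffer of `len` zeroed bytes, with `len() == len`.
    ///
    /// Unlike [`MutableBuffer::with_capacity`], the allocation is exactly
    /// `len` bytes and is not rounded up to a multiple of 64. A short sketch
    /// (assumes the `arrow_buffer` crate path):
    /// ```
    /// use arrow_buffer::MutableBuffer;
    /// let buf = MutableBuffer::from_len_zeroed(5);
    /// assert_eq!(buf.as_slice(), &[0u8; 5]);
    /// ```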
    pub fn from_len_zeroed(len: usize) -> Self {
        let layout = Layout::from_size_align(len, ALIGNMENT).unwrap();
        let data = match layout.size() {
            0 => dangling_ptr(),
            _ => {
                let raw_ptr = unsafe { std::alloc::alloc_zeroed(layout) };
                NonNull::new(raw_ptr).unwrap_or_else(|| handle_alloc_error(layout))
            }
        };
        Self {
            data,
            len,
            layout,
            #[cfg(feature = "pool")]
            reservation: Mutex::new(None),
        }
    }

    /// Takes ownership of an existing allocation, returning `Err(bytes)` if it
    /// was not made by the standard allocator and so cannot be mutated in place.
    pub(crate) fn from_bytes(bytes: Bytes) -> Result<Self, Bytes> {
        let layout = match bytes.deallocation() {
            Deallocation::Standard(layout) => *layout,
            _ => return Err(bytes),
        };

        let len = bytes.len();
        let data = bytes.ptr();
        #[cfg(feature = "pool")]
        let reservation = bytes.reservation.lock().unwrap().take();
        // `bytes` must not run its destructor: ownership of the allocation
        // (and any pool reservation) has moved into the new `MutableBuffer`.
        mem::forget(bytes);

        Ok(Self {
            data,
            len,
            layout,
            #[cfg(feature = "pool")]
            reservation: Mutex::new(reservation),
        })
    }

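    /// Allocates a zeroed validity (null) bitmap able to hold `len` bits,
    /// i.e. `ceil(len / 8)` bytes, all initialized to 0 ("null").
    ///
    /// Sketch (assumes the `arrow_buffer` crate path):
    /// ```
    /// use arrow_buffer::MutableBuffer;
    /// // 10 bits require 2 bytes.
    /// assert_eq!(MutableBuffer::new_null(10).len(), 2);
    /// ```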
    pub fn new_null(len: usize) -> Self {
        let num_bytes = bit_util::ceil(len, 8);
        MutableBuffer::from_len_zeroed(num_bytes)
    }

    /// Sets the first `end` bytes to all-ones (`val == true`) or all-zeros
    /// (`val == false`) and sets `len` to `end`.
    pub fn with_bitset(mut self, end: usize, val: bool) -> Self {
        assert!(end <= self.layout.size());
        let v = if val { 255 } else { 0 };
        unsafe {
            std::ptr::write_bytes(self.data.as_ptr(), v, end);
            self.len = end;
        }
        self
    }

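    /// Zeroes `count` bytes starting at byte offset `start`, clearing the
    /// corresponding validity bits.
    ///
    /// Panics if `start + count` exceeds the buffer's capacity; the
    /// `saturating_add` below keeps that check overflow-safe, so for example
    /// `set_null_bits(1, usize::MAX)` panics instead of wrapping around.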
    pub fn set_null_bits(&mut self, start: usize, count: usize) {
        assert!(
            start.saturating_add(count) <= self.layout.size(),
            "range start index {start} and count {count} out of bounds for \
            buffer of length {}",
            self.layout.size(),
        );

        // SAFETY: the assertion above guarantees `start + count` is in bounds.
        unsafe {
            std::ptr::write_bytes(self.data.as_ptr().add(start), 0, count);
        }
    }

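    /// Ensures capacity for at least `self.len + additional` bytes.
    ///
    /// Growth doubles the current capacity whenever that exceeds the rounded
    /// requirement, giving amortized O(1) appends. The policy, as implemented
    /// below:
    /// ```text
    /// new_capacity = max(round_upto_multiple_of_64(len + additional),
    ///                    2 * capacity)
    /// ```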
    #[inline(always)]
    pub fn reserve(&mut self, additional: usize) {
        let required_cap = self.len + additional;
        if required_cap > self.layout.size() {
            let new_capacity = bit_util::round_upto_multiple_of_64(required_cap);
            let new_capacity = std::cmp::max(new_capacity, self.layout.size() * 2);
            self.reallocate(new_capacity)
        }
    }

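    /// Appends `slice_to_repeat` to the buffer `repeat_count` times.
    ///
    /// Rather than copying the slice once per repetition, this writes it once
    /// and then repeatedly copies the already-written region, doubling the
    /// copied length each iteration, so only O(log(repeat_count)) copies are
    /// issued. A sketch of the result:
    /// ```text
    /// repeat_slice_n_times(&[1u8, 2], 3) appends [1, 2, 1, 2, 1, 2]
    /// ```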
    pub fn repeat_slice_n_times<T: ArrowNativeType>(
        &mut self,
        slice_to_repeat: &[T],
        repeat_count: usize,
    ) {
        if repeat_count == 0 || slice_to_repeat.is_empty() {
            return;
        }

        let bytes_to_repeat = size_of_val(slice_to_repeat);

        self.reserve(repeat_count * bytes_to_repeat);

        let length_before = self.len;

        // Write the first copy, then double the copied region until done.
        self.extend_from_slice(slice_to_repeat);

        let added_repeats_length = bytes_to_repeat;
        assert_eq!(
            self.len - length_before,
            added_repeats_length,
            "should copy exactly the same number of bytes"
        );

        let mut already_repeated_times = 1;

        while already_repeated_times < repeat_count {
            // Copy as much as is already written, capped by what is still missing.
            let number_of_slices_to_copy =
                already_repeated_times.min(repeat_count - already_repeated_times);
            let number_of_bytes_to_copy = number_of_slices_to_copy * bytes_to_repeat;

            unsafe {
                let src = self.data.as_ptr().add(length_before) as *const u8;

                let dst = self.data.as_ptr().add(self.len);

                // SAFETY: `reserve` above guarantees capacity, and the source
                // and destination cannot overlap because `dst` starts at `self.len`.
                std::ptr::copy_nonoverlapping(src, dst, number_of_bytes_to_copy)
            }

            self.len += number_of_bytes_to_copy;

            already_repeated_times += number_of_slices_to_copy;
        }
    }

    #[cold]
    fn reallocate(&mut self, capacity: usize) {
        let new_layout = Layout::from_size_align(capacity, self.layout.align()).unwrap();
        if new_layout.size() == 0 {
            if self.layout.size() != 0 {
                // Free the allocation; `data` is left dangling but is never
                // dereferenced again because the capacity is now zero.
                unsafe { std::alloc::dealloc(self.as_mut_ptr(), self.layout) };
                self.layout = new_layout
            }
            return;
        }

        let data = match self.layout.size() {
            // `realloc` is undefined for zero-sized layouts, so allocate fresh.
            0 => unsafe { std::alloc::alloc(new_layout) },
            _ => unsafe { std::alloc::realloc(self.as_mut_ptr(), self.layout, capacity) },
        };
        self.data = NonNull::new(data).unwrap_or_else(|| handle_alloc_error(new_layout));
        self.layout = new_layout;
        #[cfg(feature = "pool")]
        {
            if let Some(reservation) = self.reservation.lock().unwrap().as_mut() {
                reservation.resize(self.layout.size());
            }
        }
    }

    /// Shortens the buffer to `len` bytes; does nothing if `len > self.len`.
    #[inline(always)]
    pub fn truncate(&mut self, len: usize) {
        if len > self.len {
            return;
        }
        self.len = len;
        #[cfg(feature = "pool")]
        {
            if let Some(reservation) = self.reservation.lock().unwrap().as_mut() {
                reservation.resize(self.len);
            }
        }
    }

    /// Resizes the buffer to `new_len` bytes, filling any newly added bytes with `value`.
    #[inline(always)]
    pub fn resize(&mut self, new_len: usize, value: u8) {
        if new_len > self.len {
            let diff = new_len - self.len;
            self.reserve(diff);
            unsafe { self.data.as_ptr().add(self.len).write_bytes(value, diff) };
        }
        self.len = new_len;
        #[cfg(feature = "pool")]
        {
            if let Some(reservation) = self.reservation.lock().unwrap().as_mut() {
                reservation.resize(self.len);
            }
        }
    }

    /// Shrinks the allocation to the smallest multiple of 64 bytes holding `self.len`.
    pub fn shrink_to_fit(&mut self) {
        let new_capacity = bit_util::round_upto_multiple_of_64(self.len);
        if new_capacity < self.layout.size() {
            self.reallocate(new_capacity)
        }
    }

    /// Returns whether the buffer is empty.
    #[inline]
    pub const fn is_empty(&self) -> bool {
        self.len == 0
    }

    /// Returns the number of initialized bytes in the buffer.
    #[inline]
    pub const fn len(&self) -> usize {
        self.len
    }

    /// Returns the total capacity of the buffer in bytes.
    #[inline]
    pub const fn capacity(&self) -> usize {
        self.layout.size()
    }

    /// Clears all existing data from this buffer.
    pub fn clear(&mut self) {
        self.len = 0
    }

    /// Returns the data stored in this buffer as a slice.
    pub fn as_slice(&self) -> &[u8] {
        self
    }

    /// Returns the data stored in this buffer as a mutable slice.
    pub fn as_slice_mut(&mut self) -> &mut [u8] {
        self
    }

    /// Returns a raw pointer to this buffer's internal memory.
    #[inline]
    pub const fn as_ptr(&self) -> *const u8 {
        self.data.as_ptr()
    }

    /// Returns a mutable raw pointer to this buffer's internal memory.
    #[inline]
    pub fn as_mut_ptr(&mut self) -> *mut u8 {
        self.data.as_ptr()
    }

    /// Converts this buffer into an immutable [`Buffer`], transferring any
    /// pool reservation along with the allocation.
    #[inline]
    pub(super) fn into_buffer(self) -> Buffer {
        let bytes = unsafe { Bytes::new(self.data, self.len, Deallocation::Standard(self.layout)) };
        #[cfg(feature = "pool")]
        {
            let reservation = self.reservation.lock().unwrap().take();
            *bytes.reservation.lock().unwrap() = reservation;
        }
        std::mem::forget(self);
        Buffer::from(bytes)
    }

    /// Reinterprets the buffer's contents as a mutable slice of `T`, panicking
    /// if the buffer is not aligned to `T` or its length is not a multiple of
    /// `size_of::<T>()`.
    pub fn typed_data_mut<T: ArrowNativeType>(&mut self) -> &mut [T] {
        let (prefix, offsets, suffix) = unsafe { self.as_slice_mut().align_to_mut::<T>() };
        assert!(prefix.is_empty() && suffix.is_empty());
        offsets
    }

    /// Reinterprets the buffer's contents as a slice of `T`, panicking if the
    /// buffer is not aligned to `T` or its length is not a multiple of
    /// `size_of::<T>()`.
    pub fn typed_data<T: ArrowNativeType>(&self) -> &[T] {
        let (prefix, offsets, suffix) = unsafe { self.as_slice().align_to::<T>() };
        assert!(prefix.is_empty() && suffix.is_empty());
        offsets
    }

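    /// Appends the native-endian bytes of `items` to the buffer, reserving
    /// space first.
    ///
    /// Sketch (assumes the `arrow_buffer` crate path):
    /// ```
    /// use arrow_buffer::MutableBuffer;
    /// let mut buffer = MutableBuffer::new(0);
    /// buffer.extend_from_slice(&[2u32, 0]);
    /// assert_eq!(buffer.len(), 8); // two u32s, four bytes each
    /// ```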
    #[inline]
    pub fn extend_from_slice<T: ArrowNativeType>(&mut self, items: &[T]) {
        let additional = mem::size_of_val(items);
        self.reserve(additional);
        unsafe {
            // SAFETY: `reserve` guarantees capacity for `additional` more bytes.
            let src = items.as_ptr() as *const u8;
            let dst = self.data.as_ptr().add(self.len);
            std::ptr::copy_nonoverlapping(src, dst, additional)
        }
        self.len += additional;
    }

    /// Appends a single typed item to the buffer as native-endian bytes.
    #[inline]
    pub fn push<T: ToByteSlice>(&mut self, item: T) {
        let additional = std::mem::size_of::<T>();
        self.reserve(additional);
        unsafe {
            let src = item.to_byte_slice().as_ptr();
            let dst = self.data.as_ptr().add(self.len);
            std::ptr::copy_nonoverlapping(src, dst, additional);
        }
        self.len += additional;
    }

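    /// Appends `item`'s bytes without checking capacity.
    ///
    /// # Safety
    /// The caller must have already reserved enough capacity, e.g. via
    /// [`MutableBuffer::reserve`]; otherwise this writes past the allocation.
    /// A hedged sketch of correct use (assumes the `arrow_buffer` crate path):
    /// ```
    /// # use arrow_buffer::MutableBuffer;
    /// let mut buffer = MutableBuffer::new(0);
    /// buffer.reserve(4);
    /// // Sound: 4 bytes were just reserved for this u32.
    /// unsafe { buffer.push_unchecked(1u32) };
    /// assert_eq!(buffer.len(), 4);
    /// ```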
    #[inline]
    pub unsafe fn push_unchecked<T: ToByteSlice>(&mut self, item: T) {
        let additional = std::mem::size_of::<T>();
        let src = item.to_byte_slice().as_ptr();
        let dst = unsafe { self.data.as_ptr().add(self.len) };
        unsafe { std::ptr::copy_nonoverlapping(src, dst, additional) };
        self.len += additional;
    }

    /// Extends the buffer by `additional` zero bytes.
    #[inline]
    pub fn extend_zeros(&mut self, additional: usize) {
        self.resize(self.len + additional, 0);
    }

    /// Sets the length of this buffer.
    ///
    /// # Safety
    /// The caller must guarantee that the first `len` bytes are initialized.
    #[inline]
    pub unsafe fn set_len(&mut self, len: usize) {
        assert!(len <= self.capacity());
        self.len = len;
    }

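    /// Builds a packed bitmap by evaluating `f` for each of `len` bit
    /// positions, writing bits least-significant-first within each byte.
    ///
    /// Bits are accumulated into `u64` words (64 at a time) before being
    /// pushed, rather than being set one at a time. Sketch (assumes the
    /// `arrow_buffer` crate path):
    /// ```
    /// use arrow_buffer::MutableBuffer;
    /// let buf = MutableBuffer::collect_bool(3, |i| i != 1);
    /// // Bits 0 and 2 are set: 0b101.
    /// assert_eq!(buf.as_slice(), &[0b0000_0101u8]);
    /// ```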
    #[inline]
    pub fn collect_bool<F: FnMut(usize) -> bool>(len: usize, mut f: F) -> Self {
        let mut buffer = Self::new(bit_util::ceil(len, 64) * 8);

        let chunks = len / 64;
        let remainder = len % 64;
        for chunk in 0..chunks {
            let mut packed = 0;
            for bit_idx in 0..64 {
                let i = bit_idx + chunk * 64;
                packed |= (f(i) as u64) << bit_idx;
            }

            // SAFETY: `buffer` was allocated with enough space for all full chunks.
            unsafe { buffer.push_unchecked(packed) }
        }

        if remainder != 0 {
            let mut packed = 0;
            for bit_idx in 0..remainder {
                let i = bit_idx + chunks * 64;
                packed |= (f(i) as u64) << bit_idx;
            }

            // SAFETY: the trailing word was also accounted for in `new` above.
            unsafe { buffer.push_unchecked(packed) }
        }

        buffer.truncate(bit_util::ceil(len, 8));
        buffer
    }

    /// Registers this buffer's allocation with `pool`, replacing any previous reservation.
    #[cfg(feature = "pool")]
    pub fn claim(&self, pool: &dyn MemoryPool) {
        *self.reservation.lock().unwrap() = Some(pool.reserve(self.capacity()));
    }
}

/// Returns a dangling pointer aligned to `ALIGNMENT`, used for zero-sized allocations.
#[inline]
pub(crate) fn dangling_ptr() -> NonNull<u8> {
    // Miri requires a pointer with provenance; on other targets the constant
    // address is sufficient, as it is never dereferenced.
    #[cfg(miri)]
    {
        unsafe { NonNull::new_unchecked(std::ptr::without_provenance_mut(ALIGNMENT)) }
    }
    #[cfg(not(miri))]
    {
        unsafe { NonNull::new_unchecked(ALIGNMENT as *mut u8) }
    }
}

impl<A: ArrowNativeType> Extend<A> for MutableBuffer {
    #[inline]
    fn extend<T: IntoIterator<Item = A>>(&mut self, iter: T) {
        let iterator = iter.into_iter();
        self.extend_from_iter(iterator)
    }
}

impl<T: ArrowNativeType> From<Vec<T>> for MutableBuffer {
    fn from(value: Vec<T>) -> Self {
        // SAFETY: a Vec's pointer is non-null, and `Layout::array` cannot fail
        // for a layout the Vec has already successfully allocated.
        let data = unsafe { NonNull::new_unchecked(value.as_ptr() as _) };
        let len = value.len() * mem::size_of::<T>();
        let layout = unsafe { Layout::array::<T>(value.capacity()).unwrap_unchecked() };
        mem::forget(value);
        Self {
            data,
            len,
            layout,
            #[cfg(feature = "pool")]
            reservation: Mutex::new(None),
        }
    }
}

impl MutableBuffer {
    #[inline]
    pub(super) fn extend_from_iter<T: ArrowNativeType, I: Iterator<Item = T>>(
        &mut self,
        mut iterator: I,
    ) {
        let item_size = std::mem::size_of::<T>();
        let (lower, _) = iterator.size_hint();
        let additional = lower * item_size;
        self.reserve(additional);

        // Write directly while capacity is known to suffice, deferring the
        // length update via `SetLenOnDrop` so a panicking iterator cannot
        // leave `len` pointing at uninitialized bytes.
        let mut len = SetLenOnDrop::new(&mut self.len);
        let mut dst = unsafe { self.data.as_ptr().add(len.local_len) };
        let capacity = self.layout.size();

        while len.local_len + item_size <= capacity {
            if let Some(item) = iterator.next() {
                unsafe {
                    let src = item.to_byte_slice().as_ptr();
                    std::ptr::copy_nonoverlapping(src, dst, item_size);
                    dst = dst.add(item_size);
                }
                len.local_len += item_size;
            } else {
                break;
            }
        }
        drop(len);

        // The size hint was only a lower bound: push any remaining items
        // through the capacity-checked path.
        iterator.for_each(|item| self.push(item));
    }

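    /// Creates a buffer from an iterator whose `size_hint` upper bound is
    /// trusted to be exact, skipping the per-item capacity checks that
    /// [`MutableBuffer::extend_from_iter`] performs.
    ///
    /// # Safety
    /// The iterator must yield exactly `size_hint().1.unwrap()` items; the
    /// `assert_eq!` on the write cursor below only catches a mismatch after
    /// the writes have happened. A hedged sketch of correct use (assumes the
    /// `arrow_buffer` crate path):
    /// ```
    /// # use arrow_buffer::MutableBuffer;
    /// // `Range<u32>` reports an exact size hint, so this is sound.
    /// let buffer = unsafe { MutableBuffer::from_trusted_len_iter(0u32..4) };
    /// assert_eq!(buffer.len(), 16);
    /// ```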
    #[inline]
    pub unsafe fn from_trusted_len_iter<T: ArrowNativeType, I: Iterator<Item = T>>(
        iterator: I,
    ) -> Self {
        let item_size = std::mem::size_of::<T>();
        let (_, upper) = iterator.size_hint();
        let upper = upper.expect("from_trusted_len_iter requires an upper limit");
        let len = upper * item_size;

        let mut buffer = MutableBuffer::new(len);

        let mut dst = buffer.data.as_ptr();
        for item in iterator {
            let src = item.to_byte_slice().as_ptr();
            unsafe { std::ptr::copy_nonoverlapping(src, dst, item_size) };
            dst = unsafe { dst.add(item_size) };
        }
        assert_eq!(
            unsafe { dst.offset_from(buffer.data.as_ptr()) } as usize,
            len,
            "Trusted iterator length was not accurately reported"
        );
        buffer.len = len;
        buffer
    }

    /// Creates a packed bitmap from a boolean iterator with a trusted upper
    /// size bound.
    ///
    /// # Safety
    /// The iterator must yield exactly `size_hint().1.unwrap()` items.
    #[inline]
    pub unsafe fn from_trusted_len_iter_bool<I: Iterator<Item = bool>>(mut iterator: I) -> Self {
        let (_, upper) = iterator.size_hint();
        let len = upper.expect("from_trusted_len_iter requires an upper limit");

        Self::collect_bool(len, |_| iterator.next().unwrap())
    }

    /// Fallible version of [`MutableBuffer::from_trusted_len_iter`]: returns
    /// the first error yielded by the iterator, if any.
    ///
    /// # Safety
    /// The iterator must yield exactly `size_hint().1.unwrap()` items.
    #[inline]
    pub unsafe fn try_from_trusted_len_iter<
        E,
        T: ArrowNativeType,
        I: Iterator<Item = Result<T, E>>,
    >(
        iterator: I,
    ) -> Result<Self, E> {
        let item_size = std::mem::size_of::<T>();
        let (_, upper) = iterator.size_hint();
        let upper = upper.expect("try_from_trusted_len_iter requires an upper limit");
        let len = upper * item_size;

        let mut buffer = MutableBuffer::new(len);

        let mut dst = buffer.data.as_ptr();
        for item in iterator {
            let item = item?;
            let src = item.to_byte_slice().as_ptr();
            unsafe { std::ptr::copy_nonoverlapping(src, dst, item_size) };
            dst = unsafe { dst.add(item_size) };
        }
        unsafe fn finalize_buffer(dst: *mut u8, buffer: &mut MutableBuffer, len: usize) {
            unsafe {
                assert_eq!(
                    dst.offset_from(buffer.data.as_ptr()) as usize,
                    len,
                    "Trusted iterator length was not accurately reported"
                );
                buffer.len = len;
            }
        }
        unsafe { finalize_buffer(dst, &mut buffer, len) };
        Ok(buffer)
    }
}

impl Default for MutableBuffer {
    fn default() -> Self {
        Self::with_capacity(0)
    }
}

impl std::ops::Deref for MutableBuffer {
    type Target = [u8];

    fn deref(&self) -> &[u8] {
        // SAFETY: the first `len` bytes are always initialized.
        unsafe { std::slice::from_raw_parts(self.as_ptr(), self.len) }
    }
}

impl std::ops::DerefMut for MutableBuffer {
    fn deref_mut(&mut self) -> &mut [u8] {
        unsafe { std::slice::from_raw_parts_mut(self.as_mut_ptr(), self.len) }
    }
}

impl Drop for MutableBuffer {
    fn drop(&mut self) {
        if self.layout.size() != 0 {
            unsafe { std::alloc::dealloc(self.data.as_ptr() as _, self.layout) };
        }
    }
}

impl PartialEq for MutableBuffer {
    fn eq(&self, other: &MutableBuffer) -> bool {
        if self.len != other.len {
            return false;
        }
        if self.layout != other.layout {
            return false;
        }
        self.as_slice() == other.as_slice()
    }
}

// SAFETY: `MutableBuffer` uniquely owns its allocation and its raw pointer is
// never aliased, so sharing or sending it across threads is sound.
unsafe impl Sync for MutableBuffer {}
unsafe impl Send for MutableBuffer {}

/// Deferred-length guard used by `extend_from_iter`: accumulates the new
/// length locally and writes it back when dropped, so a panicking iterator
/// cannot leave `len` pointing at uninitialized bytes.
struct SetLenOnDrop<'a> {
    len: &'a mut usize,
    local_len: usize,
}

impl<'a> SetLenOnDrop<'a> {
    #[inline]
    fn new(len: &'a mut usize) -> Self {
        SetLenOnDrop {
            local_len: *len,
            len,
        }
    }
}

impl Drop for SetLenOnDrop<'_> {
    #[inline]
    fn drop(&mut self) {
        *self.len = self.local_len;
    }
}

impl std::iter::FromIterator<bool> for MutableBuffer {
    fn from_iter<I>(iter: I) -> Self
    where
        I: IntoIterator<Item = bool>,
    {
        let mut iterator = iter.into_iter();
        let mut result = {
            let byte_capacity: usize = iterator.size_hint().0.saturating_add(7) / 8;
            MutableBuffer::new(byte_capacity)
        };

        loop {
            let mut exhausted = false;
            let mut byte_accum: u8 = 0;
            let mut mask: u8 = 1;

            // Pack up to 8 booleans into one byte, least-significant bit first.
            while mask != 0 {
                if let Some(value) = iterator.next() {
                    byte_accum |= match value {
                        true => mask,
                        false => 0,
                    };
                    mask <<= 1;
                } else {
                    exhausted = true;
                    break;
                }
            }

            // A partial byte is still pushed; only a completely empty one is not.
            if exhausted && mask == 1 {
                break;
            }

            if result.len() == result.capacity() {
                // Grow by at least one byte, plus whatever the size hint suggests.
                let additional_byte_capacity = 1usize.saturating_add(
                    iterator.size_hint().0.saturating_add(7) / 8,
                );
                result.reserve(additional_byte_capacity)
            }

            // SAFETY: the check above guarantees at least one spare byte of capacity.
            unsafe { result.push_unchecked(byte_accum) };
            if exhausted {
                break;
            }
        }
        result
    }
}

impl<T: ArrowNativeType> std::iter::FromIterator<T> for MutableBuffer {
    fn from_iter<I: IntoIterator<Item = T>>(iter: I) -> Self {
        let mut buffer = Self::default();
        buffer.extend_from_iter(iter.into_iter());
        buffer
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_mutable_new() {
        let buf = MutableBuffer::new(63);
        assert_eq!(64, buf.capacity());
        assert_eq!(0, buf.len());
        assert!(buf.is_empty());
    }

    #[test]
    fn test_mutable_default() {
        let buf = MutableBuffer::default();
        assert_eq!(0, buf.capacity());
        assert_eq!(0, buf.len());
        assert!(buf.is_empty());

        let mut buf = MutableBuffer::default();
        buf.extend_from_slice(b"hello");
        assert_eq!(5, buf.len());
        assert_eq!(b"hello", buf.as_slice());
    }

    #[test]
    fn test_mutable_extend_from_slice() {
        let mut buf = MutableBuffer::new(100);
        buf.extend_from_slice(b"hello");
        assert_eq!(5, buf.len());
        assert_eq!(b"hello", buf.as_slice());

        buf.extend_from_slice(b" world");
        assert_eq!(11, buf.len());
        assert_eq!(b"hello world", buf.as_slice());

        buf.clear();
        assert_eq!(0, buf.len());
        buf.extend_from_slice(b"hello arrow");
        assert_eq!(11, buf.len());
        assert_eq!(b"hello arrow", buf.as_slice());
    }

    #[test]
    fn mutable_extend_from_iter() {
        let mut buf = MutableBuffer::new(0);
        buf.extend(vec![1u32, 2]);
        assert_eq!(8, buf.len());
        assert_eq!(&[1u8, 0, 0, 0, 2, 0, 0, 0], buf.as_slice());

        buf.extend(vec![3u32, 4]);
        assert_eq!(16, buf.len());
        assert_eq!(
            &[1u8, 0, 0, 0, 2, 0, 0, 0, 3, 0, 0, 0, 4, 0, 0, 0],
            buf.as_slice()
        );
    }

    #[test]
    fn mutable_extend_from_iter_unaligned_u64() {
        let mut buf = MutableBuffer::new(16);
        buf.push(1_u8);
        buf.extend([1_u64]);
        assert_eq!(9, buf.len());
        assert_eq!(&[1u8, 1u8, 0, 0, 0, 0, 0, 0, 0], buf.as_slice());
    }

    #[test]
    fn mutable_extend_from_slice_unaligned_u64() {
        let mut buf = MutableBuffer::new(16);
        buf.extend_from_slice(&[1_u8]);
        buf.extend_from_slice(&[1_u64]);
        assert_eq!(9, buf.len());
        assert_eq!(&[1u8, 1u8, 0, 0, 0, 0, 0, 0, 0], buf.as_slice());
    }

    #[test]
    fn mutable_push_unaligned_u64() {
        let mut buf = MutableBuffer::new(16);
        buf.push(1_u8);
        buf.push(1_u64);
        assert_eq!(9, buf.len());
        assert_eq!(&[1u8, 1u8, 0, 0, 0, 0, 0, 0, 0], buf.as_slice());
    }

    #[test]
    fn mutable_push_unchecked_unaligned_u64() {
        let mut buf = MutableBuffer::new(16);
        unsafe {
            buf.push_unchecked(1_u8);
            buf.push_unchecked(1_u64);
        }
        assert_eq!(9, buf.len());
        assert_eq!(&[1u8, 1u8, 0, 0, 0, 0, 0, 0, 0], buf.as_slice());
    }

    #[test]
    fn test_from_trusted_len_iter() {
        let iter = vec![1u32, 2].into_iter();
        let buf = unsafe { MutableBuffer::from_trusted_len_iter(iter) };
        assert_eq!(8, buf.len());
        assert_eq!(&[1u8, 0, 0, 0, 2, 0, 0, 0], buf.as_slice());
    }
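
    // An added round-trip check (not part of the original suite): `typed_data`
    // and `typed_data_mut` reinterpret the same 64-byte-aligned bytes.
    #[test]
    fn test_typed_data_roundtrip() {
        let mut buf = MutableBuffer::new(0);
        buf.extend_from_slice(&[1u32, 2, 3]);
        assert_eq!(buf.typed_data::<u32>(), &[1, 2, 3]);
        buf.typed_data_mut::<u32>()[0] = 7;
        assert_eq!(buf.typed_data::<u32>(), &[7, 2, 3]);
    }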

    #[test]
    fn test_mutable_reserve() {
        let mut buf = MutableBuffer::new(1);
        assert_eq!(64, buf.capacity());

        buf.reserve(10);
        assert_eq!(64, buf.capacity());

        buf.reserve(80);
        assert_eq!(128, buf.capacity());

        buf.reserve(129);
        assert_eq!(256, buf.capacity());
    }

    #[test]
    fn test_mutable_resize() {
        let mut buf = MutableBuffer::new(1);
        assert_eq!(64, buf.capacity());
        assert_eq!(0, buf.len());

        buf.resize(20, 0);
        assert_eq!(64, buf.capacity());
        assert_eq!(20, buf.len());

        buf.resize(10, 0);
        assert_eq!(64, buf.capacity());
        assert_eq!(10, buf.len());

        buf.resize(100, 0);
        assert_eq!(128, buf.capacity());
        assert_eq!(100, buf.len());

        buf.resize(30, 0);
        assert_eq!(128, buf.capacity());
        assert_eq!(30, buf.len());

        buf.resize(0, 0);
        assert_eq!(128, buf.capacity());
        assert_eq!(0, buf.len());
    }

    #[test]
    fn test_mutable_into() {
        let mut buf = MutableBuffer::new(1);
        buf.extend_from_slice(b"aaaa bbbb cccc dddd");
        assert_eq!(19, buf.len());
        assert_eq!(64, buf.capacity());
        assert_eq!(b"aaaa bbbb cccc dddd", buf.as_slice());

        let immutable_buf: Buffer = buf.into();
        assert_eq!(19, immutable_buf.len());
        assert_eq!(64, immutable_buf.capacity());
        assert_eq!(b"aaaa bbbb cccc dddd", immutable_buf.as_slice());
    }

    #[test]
    fn test_mutable_equal() {
        let mut buf = MutableBuffer::new(1);
        let mut buf2 = MutableBuffer::new(1);

        buf.extend_from_slice(&[0xaa]);
        buf2.extend_from_slice(&[0xaa, 0xbb]);
        assert!(buf != buf2);

        buf.extend_from_slice(&[0xbb]);
        assert_eq!(buf, buf2);

        buf2.reserve(65);
        assert!(buf != buf2);
    }

    #[test]
    fn test_mutable_shrink_to_fit() {
        let mut buffer = MutableBuffer::new(128);
        assert_eq!(buffer.capacity(), 128);
        buffer.push(1);
        buffer.push(2);

        buffer.shrink_to_fit();
        assert!(buffer.capacity() >= 64 && buffer.capacity() < 128);
    }

    #[test]
    fn test_mutable_set_null_bits() {
        let mut buffer = MutableBuffer::new(8).with_bitset(8, true);

        for i in 0..=buffer.capacity() {
            buffer.set_null_bits(i, 0);
            assert_eq!(buffer[..8], [255; 8][..]);
        }

        buffer.set_null_bits(1, 4);
        assert_eq!(buffer[..8], [255, 0, 0, 0, 0, 255, 255, 255][..]);
    }

    #[test]
    #[should_panic = "out of bounds for buffer of length"]
    fn test_mutable_set_null_bits_oob() {
        let mut buffer = MutableBuffer::new(64);
        buffer.set_null_bits(1, buffer.capacity());
    }

    #[test]
    #[should_panic = "out of bounds for buffer of length"]
    fn test_mutable_set_null_bits_oob_by_overflow() {
        let mut buffer = MutableBuffer::new(0);
        buffer.set_null_bits(1, usize::MAX);
    }

    #[test]
    fn from_iter() {
        let buffer = [1u16, 2, 3, 4].into_iter().collect::<MutableBuffer>();
        assert_eq!(buffer.len(), 4 * mem::size_of::<u16>());
        assert_eq!(buffer.as_slice(), &[1, 0, 2, 0, 3, 0, 4, 0]);
    }

    #[test]
    #[should_panic(expected = "failed to create layout for MutableBuffer: LayoutError")]
    fn test_with_capacity_panics_above_max_capacity() {
        let max_capacity = isize::MAX as usize - (isize::MAX as usize % ALIGNMENT);
        let _ = MutableBuffer::with_capacity(max_capacity + 1);
    }
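
    // An added check (not part of the original suite) that `collect_bool`
    // packs bits least-significant-first and truncates to `ceil(len, 8)` bytes.
    #[test]
    fn test_collect_bool_packing() {
        let buf = MutableBuffer::collect_bool(10, |i| i % 2 == 0);
        assert_eq!(buf.as_slice(), &[0b0101_0101u8, 0b0000_0001]);
    }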

    #[cfg(feature = "pool")]
    mod pool_tests {
        use super::*;
        use crate::pool::{MemoryPool, TrackingMemoryPool};

        #[test]
        fn test_reallocate_with_pool() {
            let pool = TrackingMemoryPool::default();
            let mut buffer = MutableBuffer::with_capacity(100);
            buffer.claim(&pool);

            assert_eq!(buffer.capacity(), 128);
            assert_eq!(pool.used(), 128);

            buffer.reallocate(200);

            assert_eq!(buffer.capacity(), 200);
            assert_eq!(pool.used(), 200);

            buffer.reallocate(50);

            assert_eq!(buffer.capacity(), 50);
            assert_eq!(pool.used(), 50);
        }

        #[test]
        fn test_truncate_with_pool() {
            let pool = TrackingMemoryPool::default();
            let mut buffer = MutableBuffer::with_capacity(100);

            buffer.resize(80, 1);
            assert_eq!(buffer.len(), 80);

            buffer.claim(&pool);
            assert_eq!(pool.used(), 128);

            buffer.truncate(40);
            assert_eq!(buffer.len(), 40);
            assert_eq!(pool.used(), 40);

            buffer.truncate(0);
            assert_eq!(buffer.len(), 0);
            assert_eq!(pool.used(), 0);
        }

        #[test]
        fn test_resize_with_pool() {
            let pool = TrackingMemoryPool::default();
            let mut buffer = MutableBuffer::with_capacity(100);
            buffer.claim(&pool);

            assert_eq!(buffer.len(), 0);
            assert_eq!(pool.used(), 128);

            buffer.resize(50, 1);
            assert_eq!(buffer.len(), 50);
            assert_eq!(pool.used(), 50);

            buffer.resize(150, 1);
            assert_eq!(buffer.len(), 150);
            assert_eq!(buffer.capacity(), 256);
            assert_eq!(pool.used(), 150);

            buffer.resize(30, 1);
            assert_eq!(buffer.len(), 30);
            assert_eq!(pool.used(), 30);
        }

        #[test]
        fn test_buffer_lifecycle_with_pool() {
            let pool = TrackingMemoryPool::default();

            let mut mutable = MutableBuffer::with_capacity(100);
            mutable.resize(80, 1);
            mutable.claim(&pool);

            assert_eq!(pool.used(), 128);

            let buffer = mutable.into_buffer();

            assert_eq!(pool.used(), 128);

            drop(buffer);
            assert_eq!(pool.used(), 0);
        }
    }

    fn create_expected_repeated_slice<T: ArrowNativeType>(
        slice_to_repeat: &[T],
        repeat_count: usize,
    ) -> Buffer {
        let mut expected = MutableBuffer::new(size_of_val(slice_to_repeat) * repeat_count);
        for _ in 0..repeat_count {
            expected.extend_from_slice(slice_to_repeat);
        }
        expected.into()
    }

    fn test_repeat_count<T: ArrowNativeType + PartialEq + std::fmt::Debug>(
        repeat_count: usize,
        test_data: &[T],
    ) {
        let mut buffer = MutableBuffer::new(0);
        buffer.repeat_slice_n_times(test_data, repeat_count);

        let expected = create_expected_repeated_slice(test_data, repeat_count);
        let result: Buffer = buffer.into();

        assert_eq!(
            result,
            expected,
            "Failed for repeat_count={}, slice_len={}",
            repeat_count,
            test_data.len()
        );
    }

    #[test]
    fn test_repeat_slice_count_edge_cases() {
        test_repeat_count(100, &[] as &[i32]);

        test_repeat_count(0, &[1i32, 2, 3]);
    }

    #[test]
    fn test_small_repeat_counts() {
        let data = &[1u8, 2, 3, 4, 5];

        for repeat_count in 1..=10 {
            test_repeat_count(repeat_count, data);
        }
    }

    #[test]
    fn test_different_size_of_i32_repeat_slice() {
        let data: &[i32] = &[1, 2, 3];
        let data_with_single_item: &[i32] = &[42];

        for data in &[data, data_with_single_item] {
            for item in 1..=9 {
                let base_repeat_count = 2_usize.pow(item);
                test_repeat_count(base_repeat_count - 1, data);
                test_repeat_count(base_repeat_count, data);
                test_repeat_count(base_repeat_count + 1, data);
            }
        }
    }

    #[test]
    fn test_different_size_of_u8_repeat_slice() {
        let data: &[u8] = &[1, 2, 3];
        let data_with_single_item: &[u8] = &[10];

        for data in &[data, data_with_single_item] {
            for item in 1..=9 {
                let base_repeat_count = 2_usize.pow(item);
                test_repeat_count(base_repeat_count - 1, data);
                test_repeat_count(base_repeat_count, data);
                test_repeat_count(base_repeat_count + 1, data);
            }
        }
    }

    #[test]
    fn test_different_size_of_u16_repeat_slice() {
        let data: &[u16] = &[1, 2, 3];
        let data_with_single_item: &[u16] = &[10];

        for data in &[data, data_with_single_item] {
            for item in 1..=9 {
                let base_repeat_count = 2_usize.pow(item);
                test_repeat_count(base_repeat_count - 1, data);
                test_repeat_count(base_repeat_count, data);
                test_repeat_count(base_repeat_count + 1, data);
            }
        }
    }

    #[test]
    fn test_various_slice_lengths() {
        let repeat_count = 37;

        test_repeat_count(repeat_count, &[42i32]);
        test_repeat_count(repeat_count, &[1i32, 2]);
        test_repeat_count(repeat_count, &[1i32, 2, 3]);
        test_repeat_count(repeat_count, &[1i32, 2, 3, 4]);
        test_repeat_count(repeat_count, &[1i32, 2, 3, 4, 5]);

        let data_10: Vec<i32> = (0..10).collect();
        test_repeat_count(repeat_count, &data_10);

        let data_100: Vec<i32> = (0..100).collect();
        test_repeat_count(repeat_count, &data_100);

        let data_1000: Vec<i32> = (0..1000).collect();
        test_repeat_count(repeat_count, &data_1000);
    }
}