1use std::alloc::{Layout, handle_alloc_error};
19use std::mem;
20use std::ptr::NonNull;
21
22use crate::alloc::{ALIGNMENT, Deallocation};
23use crate::{
24 bytes::Bytes,
25 native::{ArrowNativeType, ToByteSlice},
26 util::bit_util,
27};
28
29#[cfg(feature = "pool")]
30use crate::pool::{MemoryPool, MemoryReservation};
31#[cfg(feature = "pool")]
32use std::sync::Mutex;
33
34use super::Buffer;
35
/// A mutable, growable buffer of bytes with a guaranteed `ALIGNMENT`-aligned
/// allocation, used to build immutable [`Buffer`]s without copying.
#[derive(Debug)]
pub struct MutableBuffer {
    // Pointer to the allocation; dangling (but aligned) when capacity is 0.
    data: NonNull<u8>,
    // Number of initialized bytes; invariant: len <= layout.size().
    len: usize,
    // Layout of the current allocation; layout.size() is the capacity.
    layout: Layout,

    #[cfg(feature = "pool")]
    // Optional memory-pool reservation tracking this buffer's allocation.
    reservation: Mutex<Option<Box<dyn MemoryReservation>>>,
}
110
111impl MutableBuffer {
    /// Allocates a new [`MutableBuffer`] with at least `capacity` bytes.
    /// Equivalent to [`Self::with_capacity`].
    #[inline]
    pub fn new(capacity: usize) -> Self {
        Self::with_capacity(capacity)
    }

    /// Allocates a new, empty [`MutableBuffer`] whose capacity is `capacity`
    /// rounded up to the next multiple of 64 bytes.
    ///
    /// # Panics
    ///
    /// Panics if the rounded capacity is not a valid [`Layout`] size, or
    /// aborts via [`handle_alloc_error`] if the allocator fails.
    #[inline]
    pub fn with_capacity(capacity: usize) -> Self {
        let capacity = bit_util::round_upto_multiple_of_64(capacity);
        let layout = Layout::from_size_align(capacity, ALIGNMENT)
            .expect("failed to create layout for MutableBuffer");
        let data = match layout.size() {
            // Zero-sized: do not allocate; use a well-aligned dangling pointer.
            0 => dangling_ptr(),
            _ => {
                // SAFETY: layout has non-zero size.
                let raw_ptr = unsafe { std::alloc::alloc(layout) };
                NonNull::new(raw_ptr).unwrap_or_else(|| handle_alloc_error(layout))
            }
        };
        Self {
            data,
            len: 0,
            layout,
            #[cfg(feature = "pool")]
            reservation: std::sync::Mutex::new(None),
        }
    }
147
    /// Allocates a new buffer of `len` zeroed bytes with `len == capacity`
    /// (note: unlike `with_capacity`, no rounding up to a multiple of 64).
    pub fn from_len_zeroed(len: usize) -> Self {
        let layout = Layout::from_size_align(len, ALIGNMENT).unwrap();
        let data = match layout.size() {
            // Zero-sized: no allocation, use an aligned dangling pointer.
            0 => dangling_ptr(),
            _ => {
                // SAFETY: layout has non-zero size.
                let raw_ptr = unsafe { std::alloc::alloc_zeroed(layout) };
                NonNull::new(raw_ptr).unwrap_or_else(|| handle_alloc_error(layout))
            }
        };
        Self {
            data,
            len,
            layout,
            #[cfg(feature = "pool")]
            reservation: std::sync::Mutex::new(None),
        }
    }

    /// Attempts to reclaim `bytes` as a `MutableBuffer` without copying.
    ///
    /// Returns `Err(bytes)` unchanged unless the allocation was made by the
    /// standard allocator (i.e. carries a `Deallocation::Standard` layout).
    pub(crate) fn from_bytes(bytes: Bytes) -> Result<Self, Bytes> {
        let layout = match bytes.deallocation() {
            Deallocation::Standard(layout) => *layout,
            _ => return Err(bytes),
        };

        let len = bytes.len();
        let data = bytes.ptr();
        #[cfg(feature = "pool")]
        let reservation = bytes.reservation.lock().unwrap().take();
        // Ownership of the allocation moves into the new MutableBuffer, so
        // `bytes` must not run its destructor (which would free it).
        mem::forget(bytes);

        Ok(Self {
            data,
            len,
            layout,
            #[cfg(feature = "pool")]
            reservation: Mutex::new(reservation),
        })
    }
199
200 pub fn new_null(len: usize) -> Self {
203 let num_bytes = bit_util::ceil(len, 8);
204 MutableBuffer::from_len_zeroed(num_bytes)
205 }
206
    /// Fills the first `end` bytes with `0xFF` (if `val`) or `0x00`, sets
    /// `len = end`, and returns `self` for builder-style chaining.
    ///
    /// # Panics
    ///
    /// Panics if `end` exceeds the buffer's capacity.
    pub fn with_bitset(mut self, end: usize, val: bool) -> Self {
        assert!(end <= self.layout.size());
        let v = if val { 255 } else { 0 };
        // SAFETY: asserted above that `end` bytes fit within the allocation.
        unsafe {
            std::ptr::write_bytes(self.data.as_ptr(), v, end);
            self.len = end;
        }
        self
    }

    /// Zeroes `count` bytes starting at byte offset `start`.
    ///
    /// # Panics
    ///
    /// Panics if `start + count` exceeds the buffer's capacity
    /// (`saturating_add` keeps the overflow case caught by the check).
    pub fn set_null_bits(&mut self, start: usize, count: usize) {
        assert!(
            start.saturating_add(count) <= self.layout.size(),
            "range start index {start} and count {count} out of bounds for \
            buffer of length {}",
            self.layout.size(),
        );

        // SAFETY: the assert above guarantees the range is within capacity.
        unsafe {
            std::ptr::write_bytes(self.data.as_ptr().add(start), 0, count);
        }
    }
241
242 #[inline(always)]
256 pub fn reserve(&mut self, additional: usize) {
257 let required_cap = self.len + additional;
258 if required_cap > self.layout.size() {
259 let new_capacity = bit_util::round_upto_multiple_of_64(required_cap);
260 let new_capacity = std::cmp::max(new_capacity, self.layout.size() * 2);
261 self.reallocate(new_capacity)
262 }
263 }
264
265 pub fn repeat_slice_n_times<T: ArrowNativeType>(
278 &mut self,
279 slice_to_repeat: &[T],
280 repeat_count: usize,
281 ) {
282 if repeat_count == 0 || slice_to_repeat.is_empty() {
283 return;
284 }
285
286 let bytes_to_repeat = size_of_val(slice_to_repeat);
287
288 self.reserve(repeat_count * bytes_to_repeat);
290
291 let length_before = self.len;
293
294 self.extend_from_slice(slice_to_repeat);
296
297 let added_repeats_length = bytes_to_repeat;
299 assert_eq!(
300 self.len - length_before,
301 added_repeats_length,
302 "should copy exactly the same number of bytes"
303 );
304
305 let mut already_repeated_times = 1;
307
308 while already_repeated_times < repeat_count {
310 let number_of_slices_to_copy =
313 already_repeated_times.min(repeat_count - already_repeated_times);
314 let number_of_bytes_to_copy = number_of_slices_to_copy * bytes_to_repeat;
315
316 unsafe {
317 let src = self.data.as_ptr().add(length_before) as *const u8;
319
320 let dst = self.data.as_ptr().add(self.len);
322
323 std::ptr::copy_nonoverlapping(src, dst, number_of_bytes_to_copy)
325 }
326
327 self.len += number_of_bytes_to_copy;
329
330 already_repeated_times += number_of_slices_to_copy;
331 }
332 }
333
    /// Reallocates the buffer to exactly `capacity` bytes, preserving contents
    /// up to the new capacity, and keeps any pool reservation in sync.
    #[cold]
    fn reallocate(&mut self, capacity: usize) {
        let new_layout = Layout::from_size_align(capacity, self.layout.align()).unwrap();
        if new_layout.size() == 0 {
            // Shrinking to zero: free the allocation (if any) and keep a
            // zero-sized layout; `data` becomes logically dangling.
            if self.layout.size() != 0 {
                // SAFETY: `data` was allocated with `self.layout`.
                unsafe { std::alloc::dealloc(self.as_mut_ptr(), self.layout) };
                self.layout = new_layout
            }
            return;
        }

        let data = match self.layout.size() {
            // No prior allocation: fresh alloc.
            // SAFETY: new_layout has non-zero size.
            0 => unsafe { std::alloc::alloc(new_layout) },
            // SAFETY: `data` was allocated with `self.layout`, and `capacity`
            // equals new_layout.size() with the same alignment.
            _ => unsafe { std::alloc::realloc(self.as_mut_ptr(), self.layout, capacity) },
        };
        self.data = NonNull::new(data).unwrap_or_else(|| handle_alloc_error(new_layout));
        self.layout = new_layout;
        #[cfg(feature = "pool")]
        {
            // Keep the pool's accounting in sync with the new capacity.
            if let Some(reservation) = self.reservation.lock().unwrap().as_mut() {
                reservation.resize(self.layout.size());
            }
        }
    }
361
    /// Truncates the buffer to `len` bytes; a no-op if `len >= self.len`.
    /// Capacity is unchanged, but any pool reservation is resized to `len`.
    #[inline(always)]
    pub fn truncate(&mut self, len: usize) {
        if len > self.len {
            return;
        }
        self.len = len;
        #[cfg(feature = "pool")]
        {
            if let Some(reservation) = self.reservation.lock().unwrap().as_mut() {
                reservation.resize(self.len);
            }
        }
    }

    /// Resizes the buffer to `new_len` bytes. Growing fills the new tail
    /// bytes with `value`; shrinking just lowers `len` (capacity unchanged).
    #[inline(always)]
    pub fn resize(&mut self, new_len: usize, value: u8) {
        if new_len > self.len {
            let diff = new_len - self.len;
            self.reserve(diff);
            // SAFETY: `reserve` guaranteed capacity for `len + diff` bytes.
            unsafe { self.data.as_ptr().add(self.len).write_bytes(value, diff) };
        }
        self.len = new_len;
        #[cfg(feature = "pool")]
        {
            // Keep the pool's accounting in sync with the new length.
            if let Some(reservation) = self.reservation.lock().unwrap().as_mut() {
                reservation.resize(self.len);
            }
        }
    }
407
408 pub fn shrink_to_fit(&mut self) {
424 let new_capacity = bit_util::round_upto_multiple_of_64(self.len);
425 if new_capacity < self.layout.size() {
426 self.reallocate(new_capacity)
427 }
428 }
429
    /// Returns whether the buffer contains no initialized bytes.
    #[inline]
    pub const fn is_empty(&self) -> bool {
        self.len == 0
    }

    /// Returns the number of initialized bytes.
    #[inline]
    pub const fn len(&self) -> usize {
        self.len
    }

    /// Returns the total capacity in bytes (the allocation size).
    #[inline]
    pub const fn capacity(&self) -> usize {
        self.layout.size()
    }

    /// Clears the buffer (sets `len` to 0) without freeing the allocation;
    /// any pool reservation is resized to 0.
    pub fn clear(&mut self) {
        self.len = 0;
        #[cfg(feature = "pool")]
        {
            if let Some(reservation) = self.reservation.lock().unwrap().as_mut() {
                reservation.resize(self.len);
            }
        }
    }

    /// Returns the initialized bytes as a shared slice (via `Deref`).
    pub fn as_slice(&self) -> &[u8] {
        self
    }

    /// Returns the initialized bytes as a mutable slice (via `DerefMut`).
    pub fn as_slice_mut(&mut self) -> &mut [u8] {
        self
    }

    /// Returns a raw pointer to the start of the buffer.
    #[inline]
    pub const fn as_ptr(&self) -> *const u8 {
        self.data.as_ptr()
    }

    /// Returns a mutable raw pointer to the start of the buffer.
    #[inline]
    pub fn as_mut_ptr(&mut self) -> *mut u8 {
        self.data.as_ptr()
    }

    /// Converts this mutable buffer into an immutable [`Buffer`] without
    /// copying; ownership of the allocation (and any pool reservation) is
    /// transferred.
    #[inline]
    pub(super) fn into_buffer(self) -> Buffer {
        // SAFETY: `data`/`len`/`layout` describe a live standard allocation.
        let bytes = unsafe { Bytes::new(self.data, self.len, Deallocation::Standard(self.layout)) };
        #[cfg(feature = "pool")]
        {
            let reservation = self.reservation.lock().unwrap().take();
            *bytes.reservation.lock().unwrap() = reservation;
        }
        // The allocation now belongs to `bytes`; skip MutableBuffer's Drop.
        std::mem::forget(self);
        Buffer::from(bytes)
    }

    /// Reinterprets the buffer's contents as a mutable slice of `T`.
    ///
    /// # Panics
    ///
    /// Panics if the buffer is not aligned to `T` or its length is not a
    /// multiple of `size_of::<T>()` (non-empty prefix/suffix).
    pub fn typed_data_mut<T: ArrowNativeType>(&mut self) -> &mut [T] {
        // SAFETY: ArrowNativeType is valid for any bit pattern; the assert
        // below rejects misaligned or partially-covered reinterpretations.
        let (prefix, offsets, suffix) = unsafe { self.as_slice_mut().align_to_mut::<T>() };
        assert!(prefix.is_empty() && suffix.is_empty());
        offsets
    }

    /// Reinterprets the buffer's contents as a shared slice of `T`.
    ///
    /// # Panics
    ///
    /// Panics under the same conditions as [`Self::typed_data_mut`].
    pub fn typed_data<T: ArrowNativeType>(&self) -> &[T] {
        // SAFETY: see `typed_data_mut`.
        let (prefix, offsets, suffix) = unsafe { self.as_slice().align_to::<T>() };
        assert!(prefix.is_empty() && suffix.is_empty());
        offsets
    }
527
    /// Appends the bytes of `items` to the buffer, growing as needed.
    #[inline]
    pub fn extend_from_slice<T: ArrowNativeType>(&mut self, items: &[T]) {
        let additional = mem::size_of_val(items);
        self.reserve(additional);
        // SAFETY: `reserve` guaranteed room for `additional` more bytes, and
        // `items` cannot overlap the freshly reserved destination.
        unsafe {
            let src = items.as_ptr() as *const u8;
            let dst = self.data.as_ptr().add(self.len);
            std::ptr::copy_nonoverlapping(src, dst, additional)
        }
        self.len += additional;
    }

    /// Appends the bytes of a single `item`, growing as needed.
    #[inline]
    pub fn push<T: ToByteSlice>(&mut self, item: T) {
        let additional = std::mem::size_of::<T>();
        self.reserve(additional);
        // SAFETY: `reserve` guaranteed room for `additional` more bytes.
        unsafe {
            let src = item.to_byte_slice().as_ptr();
            let dst = self.data.as_ptr().add(self.len);
            std::ptr::copy_nonoverlapping(src, dst, additional);
        }
        self.len += additional;
    }

    /// Appends a single `item` without checking capacity.
    ///
    /// # Safety
    ///
    /// The caller must have already ensured capacity for at least
    /// `size_of::<T>()` additional bytes (e.g. via [`Self::reserve`]).
    #[inline]
    pub unsafe fn push_unchecked<T: ToByteSlice>(&mut self, item: T) {
        let additional = std::mem::size_of::<T>();
        let src = item.to_byte_slice().as_ptr();
        // SAFETY: caller guarantees capacity (see # Safety above).
        let dst = unsafe { self.data.as_ptr().add(self.len) };
        unsafe { std::ptr::copy_nonoverlapping(src, dst, additional) };
        self.len += additional;
    }

    /// Appends `additional` zero bytes to the buffer.
    #[inline]
    pub fn extend_zeros(&mut self, additional: usize) {
        self.resize(self.len + additional, 0);
    }

    /// Sets the length of the buffer directly.
    ///
    /// # Safety
    ///
    /// Bytes up to `len` must have been initialized; the assert only checks
    /// that `len` does not exceed capacity.
    #[inline]
    pub unsafe fn set_len(&mut self, len: usize) {
        assert!(len <= self.capacity());
        self.len = len;
    }
596
    /// Builds a packed bitmap of `len` bits where bit `i` is `f(i)`,
    /// packing 64 bits at a time, then truncates to `ceil(len / 8)` bytes.
    #[inline]
    pub fn collect_bool<F: FnMut(usize) -> bool>(len: usize, mut f: F) -> Self {
        let mut buffer: Vec<u64> = Vec::with_capacity(bit_util::ceil(len, 64));

        let chunks = len / 64;
        let remainder = len % 64;
        // Pack 64 bits per u64 word (LSB = lowest index).
        buffer.extend((0..chunks).map(|chunk| {
            let mut packed = 0;
            for bit_idx in 0..64 {
                let i = bit_idx + chunk * 64;
                packed |= (f(i) as u64) << bit_idx;
            }

            packed
        }));

        // Pack the trailing partial word, if any.
        if remainder != 0 {
            let mut packed = 0;
            for bit_idx in 0..remainder {
                let i = bit_idx + chunks * 64;
                packed |= (f(i) as u64) << bit_idx;
            }

            buffer.push(packed)
        }

        // Reuse the Vec's allocation, then drop the unused tail bytes.
        let mut buffer: MutableBuffer = buffer.into();
        buffer.truncate(bit_util::ceil(len, 8));
        buffer
    }
631
    /// Writes `len` bits from `iter` into the bitmap starting at bit
    /// `offset`, growing the buffer as needed. Bits are written in three
    /// phases: an unaligned prefix up to the next 64-bit boundary, whole
    /// 64-bit words, and a bit-by-bit suffix; bits past `offset + len` in the
    /// final byte are cleared.
    ///
    /// # Safety
    ///
    /// `iter` must report an exact `size_hint` (lower == upper) and yield
    /// exactly that many items.
    #[inline]
    pub unsafe fn extend_bool_trusted_len<I: Iterator<Item = bool>>(
        &mut self,
        mut iter: I,
        offset: usize,
    ) {
        let (lower, upper) = iter.size_hint();
        let len = upper.expect("Iterator must have exact size_hint");
        assert_eq!(lower, len, "Iterator must have exact size_hint");
        debug_assert!(
            offset <= self.len * 8,
            "offset must be <= buffer length in bits"
        );

        if len == 0 {
            return;
        }

        let start_len = offset;
        let end_bit = start_len + len;

        // Grow the byte buffer to hold all `end_bit` bits.
        let new_len_bytes = bit_util::ceil(end_bit, 8);
        if new_len_bytes > self.len {
            self.reserve(new_len_bytes - self.len);
            // SAFETY: the new tail bytes are fully overwritten below.
            unsafe { self.set_len(new_len_bytes) };
        }

        let slice = self.as_slice_mut();

        let mut bit_idx = start_len;

        // Phase 1: write bits one-by-one until 64-bit aligned (or done).
        let misalignment = bit_idx & 63;
        let prefix_bits = if misalignment == 0 {
            0
        } else {
            (64 - misalignment).min(end_bit - bit_idx)
        };

        if prefix_bits != 0 {
            let byte_start = bit_idx / 8;
            let byte_end = bit_util::ceil(bit_idx + prefix_bits, 8);
            let bit_offset = bit_idx % 8;

            // Preserve bits below `bit_offset` in the first byte, clear the rest.
            if bit_offset != 0 {
                let keep_mask = (1u8 << bit_offset).wrapping_sub(1);
                slice[byte_start] &= keep_mask;
            }

            // Zero the remaining prefix bytes before OR-ing bits in.
            let zero_from = if bit_offset == 0 {
                byte_start
            } else {
                byte_start + 1
            };
            if byte_end > zero_from {
                slice[zero_from..byte_end].fill(0);
            }

            for _ in 0..prefix_bits {
                let v = iter.next().unwrap();
                if v {
                    let byte_idx = bit_idx / 8;
                    let bit = bit_idx % 8;
                    slice[byte_idx] |= 1 << bit;
                }
                bit_idx += 1;
            }
        }

        // Phase 2: write whole 64-bit words (bit_idx is now 64-bit aligned).
        if bit_idx < end_bit {
            debug_assert_eq!(bit_idx & 63, 0);
            let remaining_bits = end_bit - bit_idx;
            let chunks = remaining_bits / 64;

            let words_start = bit_idx / 8;
            let words_end = words_start + chunks * 8;
            for dst in slice[words_start..words_end].chunks_exact_mut(8) {
                let mut packed: u64 = 0;
                for i in 0..64 {
                    packed |= (iter.next().unwrap() as u64) << i;
                }
                dst.copy_from_slice(&packed.to_le_bytes());
                bit_idx += 64;
            }

            // Phase 3: write the trailing partial word bit-by-bit.
            let suffix_bits = end_bit - bit_idx;
            if suffix_bits != 0 {
                debug_assert_eq!(bit_idx % 8, 0);
                let byte_start = bit_idx / 8;
                let byte_end = bit_util::ceil(end_bit, 8);
                slice[byte_start..byte_end].fill(0);

                for _ in 0..suffix_bits {
                    let v = iter.next().unwrap();
                    if v {
                        let byte_idx = bit_idx / 8;
                        let bit = bit_idx % 8;
                        slice[byte_idx] |= 1 << bit;
                    }
                    bit_idx += 1;
                }
            }
        }

        // Clear any bits past `end_bit` in the final byte.
        let remainder = end_bit % 8;
        if remainder != 0 {
            let mask = (1u8 << remainder).wrapping_sub(1);
            slice[bit_util::ceil(end_bit, 8) - 1] &= mask;
        }

        debug_assert_eq!(bit_idx, end_bit);
    }
759
    /// Registers this buffer's allocation with `pool`, replacing any previous
    /// reservation. The reservation tracks `capacity()` bytes.
    #[cfg(feature = "pool")]
    pub fn claim(&self, pool: &dyn MemoryPool) {
        *self.reservation.lock().unwrap() = Some(pool.reserve(self.capacity()));
    }
770}
771
/// Returns a non-null, `ALIGNMENT`-aligned dangling pointer used for
/// zero-capacity buffers, where no real allocation exists.
#[inline]
pub(crate) fn dangling_ptr() -> NonNull<u8> {
    // Under miri, build the pointer without provenance so the interpreter
    // accepts it for zero-sized accesses.
    #[cfg(miri)]
    {
        // SAFETY: ALIGNMENT is non-zero, so the pointer is non-null.
        unsafe { NonNull::new_unchecked(std::ptr::without_provenance_mut(ALIGNMENT)) }
    }
    #[cfg(not(miri))]
    {
        // SAFETY: ALIGNMENT is non-zero, so the pointer is non-null.
        unsafe { NonNull::new_unchecked(ALIGNMENT as *mut u8) }
    }
}
790
791impl<A: ArrowNativeType> Extend<A> for MutableBuffer {
792 #[inline]
793 fn extend<T: IntoIterator<Item = A>>(&mut self, iter: T) {
794 let iterator = iter.into_iter();
795 self.extend_from_iter(iterator)
796 }
797}
798
impl<T: ArrowNativeType> From<Vec<T>> for MutableBuffer {
    /// Takes ownership of the `Vec`'s allocation without copying.
    fn from(value: Vec<T>) -> Self {
        // SAFETY: a Vec's data pointer is never null (it is dangling-but-
        // aligned for capacity 0, mirroring this type's convention).
        let data = unsafe { NonNull::new_unchecked(value.as_ptr() as _) };
        let len = value.len() * mem::size_of::<T>();
        // SAFETY: the Vec was successfully allocated with this exact layout,
        // so constructing the layout again cannot fail.
        let layout = unsafe { Layout::array::<T>(value.capacity()).unwrap_unchecked() };
        // The allocation now belongs to the MutableBuffer; skip Vec's Drop.
        mem::forget(value);
        Self {
            data,
            len,
            layout,
            #[cfg(feature = "pool")]
            reservation: std::sync::Mutex::new(None),
        }
    }
}
819
impl MutableBuffer {
    /// Extends the buffer from `iterator`, writing with raw pointers while
    /// capacity (reserved from the iterator's lower `size_hint`) lasts, then
    /// falling back to `push` for any remaining items.
    #[inline]
    pub(super) fn extend_from_iter<T: ArrowNativeType, I: Iterator<Item = T>>(
        &mut self,
        mut iterator: I,
    ) {
        let item_size = std::mem::size_of::<T>();
        let (lower, _) = iterator.size_hint();
        let additional = lower * item_size;
        self.reserve(additional);

        // SetLenOnDrop commits the new length even if `iterator.next()`
        // panics, keeping `self.len` consistent with the bytes written.
        let mut len = SetLenOnDrop::new(&mut self.len);
        // SAFETY: `local_len` never exceeds capacity (loop condition below).
        let mut dst = unsafe { self.data.as_ptr().add(len.local_len) };
        let capacity = self.layout.size();

        while len.local_len + item_size <= capacity {
            if let Some(item) = iterator.next() {
                // SAFETY: the loop condition guarantees room for one more item.
                unsafe {
                    let src = item.to_byte_slice().as_ptr();
                    std::ptr::copy_nonoverlapping(src, dst, item_size);
                    dst = dst.add(item_size);
                }
                len.local_len += item_size;
            } else {
                break;
            }
        }
        drop(len);

        // Slow path: the lower size_hint under-reported; push the remainder
        // (reserving as needed).
        iterator.for_each(|item| self.push(item));
    }

    /// Creates a buffer from an iterator that reports an exact upper bound.
    ///
    /// # Safety
    ///
    /// `iterator`'s upper `size_hint` must be `Some` and exact; yielding
    /// more items than reported would write past the allocation.
    #[inline]
    pub unsafe fn from_trusted_len_iter<T: ArrowNativeType, I: Iterator<Item = T>>(
        iterator: I,
    ) -> Self {
        let item_size = std::mem::size_of::<T>();
        let (_, upper) = iterator.size_hint();
        let upper = upper.expect("from_trusted_len_iter requires an upper limit");
        let len = upper * item_size;

        let mut buffer = MutableBuffer::new(len);

        let mut dst = buffer.data.as_ptr();
        for item in iterator {
            let src = item.to_byte_slice().as_ptr();
            // SAFETY: `buffer` was allocated for `len` bytes and the iterator
            // yields at most `upper` items (trusted-len contract).
            unsafe { std::ptr::copy_nonoverlapping(src, dst, item_size) };
            dst = unsafe { dst.add(item_size) };
        }
        // Detect iterators that yielded fewer items than reported.
        assert_eq!(
            unsafe { dst.offset_from(buffer.data.as_ptr()) } as usize,
            len,
            "Trusted iterator length was not accurately reported"
        );
        buffer.len = len;
        buffer
    }

    /// Creates a packed bitmap buffer from a trusted-length `bool` iterator.
    ///
    /// # Safety
    ///
    /// `iterator`'s upper `size_hint` must be `Some` and exact.
    #[inline]
    pub unsafe fn from_trusted_len_iter_bool<I: Iterator<Item = bool>>(mut iterator: I) -> Self {
        let (_, upper) = iterator.size_hint();
        let len = upper.expect("from_trusted_len_iter requires an upper limit");

        Self::collect_bool(len, |_| iterator.next().unwrap())
    }

    /// Fallible variant of [`Self::from_trusted_len_iter`]: stops and
    /// returns the first `Err` encountered.
    ///
    /// # Safety
    ///
    /// `iterator`'s upper `size_hint` must be `Some` and exact.
    #[inline]
    pub unsafe fn try_from_trusted_len_iter<
        E,
        T: ArrowNativeType,
        I: Iterator<Item = Result<T, E>>,
    >(
        iterator: I,
    ) -> Result<Self, E> {
        let item_size = std::mem::size_of::<T>();
        let (_, upper) = iterator.size_hint();
        let upper = upper.expect("try_from_trusted_len_iter requires an upper limit");
        let len = upper * item_size;

        let mut buffer = MutableBuffer::new(len);

        let mut dst = buffer.data.as_ptr();
        for item in iterator {
            let item = item?;
            let src = item.to_byte_slice().as_ptr();
            // SAFETY: `buffer` was allocated for `len` bytes (trusted-len).
            unsafe { std::ptr::copy_nonoverlapping(src, dst, item_size) };
            dst = unsafe { dst.add(item_size) };
        }
        // Helper verifies the iterator yielded exactly `len` bytes and then
        // commits the length.
        unsafe fn finalize_buffer(dst: *mut u8, buffer: &mut MutableBuffer, len: usize) {
            unsafe {
                assert_eq!(
                    dst.offset_from(buffer.data.as_ptr()) as usize,
                    len,
                    "Trusted iterator length was not accurately reported"
                );
                buffer.len = len;
            }
        }
        unsafe { finalize_buffer(dst, &mut buffer, len) };
        Ok(buffer)
    }
}
967
968impl Default for MutableBuffer {
969 fn default() -> Self {
970 Self::with_capacity(0)
971 }
972}
973
impl std::ops::Deref for MutableBuffer {
    type Target = [u8];

    fn deref(&self) -> &[u8] {
        // SAFETY: the first `len` bytes of the allocation are initialized.
        unsafe { std::slice::from_raw_parts(self.as_ptr(), self.len) }
    }
}

impl std::ops::DerefMut for MutableBuffer {
    fn deref_mut(&mut self) -> &mut [u8] {
        // SAFETY: the first `len` bytes are initialized and `&mut self`
        // guarantees unique access.
        unsafe { std::slice::from_raw_parts_mut(self.as_mut_ptr(), self.len) }
    }
}

impl AsRef<[u8]> for &MutableBuffer {
    fn as_ref(&self) -> &[u8] {
        self.as_slice()
    }
}
993
impl Drop for MutableBuffer {
    fn drop(&mut self) {
        // Capacity-0 buffers hold a dangling pointer, not an allocation.
        if self.layout.size() != 0 {
            // SAFETY: `data` was allocated with `self.layout`.
            unsafe { std::alloc::dealloc(self.data.as_ptr() as _, self.layout) };
        }
    }
}
1002
1003impl PartialEq for MutableBuffer {
1004 fn eq(&self, other: &MutableBuffer) -> bool {
1005 if self.len != other.len {
1006 return false;
1007 }
1008 if self.layout != other.layout {
1009 return false;
1010 }
1011 self.as_slice() == other.as_slice()
1012 }
1013}
1014
// SAFETY: MutableBuffer uniquely owns its allocation and all mutation goes
// through `&mut self`, so shared references only expose read access to plain
// bytes. NOTE(review): the `reservation` Mutex field requires the boxed
// MemoryReservation to be Send/Sync for this to be sound — confirm that
// bound on the trait.
unsafe impl Sync for MutableBuffer {}
// SAFETY: the allocation is not tied to any particular thread.
unsafe impl Send for MutableBuffer {}
1017
/// Commits a locally-tracked length back to a `&mut usize` when dropped, so
/// the buffer's `len` stays consistent with the bytes actually written even
/// if an iterator panics mid-extend.
struct SetLenOnDrop<'a> {
    // Destination to write back on drop.
    len: &'a mut usize,
    // Working copy of the length, updated during the extend loop.
    local_len: usize,
}

impl<'a> SetLenOnDrop<'a> {
    #[inline]
    fn new(len: &'a mut usize) -> Self {
        SetLenOnDrop {
            local_len: *len,
            len,
        }
    }
}

impl Drop for SetLenOnDrop<'_> {
    #[inline]
    fn drop(&mut self) {
        *self.len = self.local_len;
    }
}
1039
impl std::iter::FromIterator<bool> for MutableBuffer {
    /// Collects booleans into a packed (LSB-first) bitmap buffer.
    fn from_iter<I>(iter: I) -> Self
    where
        I: IntoIterator<Item = bool>,
    {
        let mut iterator = iter.into_iter();
        let mut result = {
            // One byte per 8 expected bits (from the lower size_hint).
            let byte_capacity: usize = iterator.size_hint().0.saturating_add(7) / 8;
            MutableBuffer::new(byte_capacity)
        };

        loop {
            // Accumulate up to 8 bits into one byte.
            let mut exhausted = false;
            let mut byte_accum: u8 = 0;
            let mut mask: u8 = 1;

            // `mask` shifts out to 0 after the 8th bit.
            while mask != 0 {
                if let Some(value) = iterator.next() {
                    byte_accum |= match value {
                        true => mask,
                        false => 0,
                    };
                    mask <<= 1;
                } else {
                    exhausted = true;
                    break;
                }
            }

            // Exhausted before any bit landed in this byte: nothing to push.
            if exhausted && mask == 1 {
                break;
            }

            // Out of capacity: reserve based on the remaining size_hint.
            if result.len() == result.capacity() {
                let additional_byte_capacity = 1usize.saturating_add(
                    iterator.size_hint().0.saturating_add(7) / 8,
                );
                result.reserve(additional_byte_capacity)
            }

            // SAFETY: capacity was ensured just above.
            unsafe { result.push_unchecked(byte_accum) };
            if exhausted {
                break;
            }
        }
        result
    }
}
1094
1095impl<T: ArrowNativeType> std::iter::FromIterator<T> for MutableBuffer {
1096 fn from_iter<I: IntoIterator<Item = T>>(iter: I) -> Self {
1097 let mut buffer = Self::default();
1098 buffer.extend_from_iter(iter.into_iter());
1099 buffer
1100 }
1101}
1102
// Unit tests covering allocation, growth, conversion, bit manipulation and
// the optional memory-pool accounting of `MutableBuffer`.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_mutable_new() {
        let buf = MutableBuffer::new(63);
        assert_eq!(64, buf.capacity());
        assert_eq!(0, buf.len());
        assert!(buf.is_empty());
    }

    #[test]
    fn test_mutable_default() {
        let buf = MutableBuffer::default();
        assert_eq!(0, buf.capacity());
        assert_eq!(0, buf.len());
        assert!(buf.is_empty());

        let mut buf = MutableBuffer::default();
        buf.extend_from_slice(b"hello");
        assert_eq!(5, buf.len());
        assert_eq!(b"hello", buf.as_slice());
    }

    #[test]
    fn test_mutable_extend_from_slice() {
        let mut buf = MutableBuffer::new(100);
        buf.extend_from_slice(b"hello");
        assert_eq!(5, buf.len());
        assert_eq!(b"hello", buf.as_slice());

        buf.extend_from_slice(b" world");
        assert_eq!(11, buf.len());
        assert_eq!(b"hello world", buf.as_slice());

        buf.clear();
        assert_eq!(0, buf.len());
        buf.extend_from_slice(b"hello arrow");
        assert_eq!(11, buf.len());
        assert_eq!(b"hello arrow", buf.as_slice());
    }

    #[test]
    fn mutable_extend_from_iter() {
        let mut buf = MutableBuffer::new(0);
        buf.extend(vec![1u32, 2]);
        assert_eq!(8, buf.len());
        assert_eq!(&[1u8, 0, 0, 0, 2, 0, 0, 0], buf.as_slice());

        buf.extend(vec![3u32, 4]);
        assert_eq!(16, buf.len());
        assert_eq!(
            &[1u8, 0, 0, 0, 2, 0, 0, 0, 3, 0, 0, 0, 4, 0, 0, 0],
            buf.as_slice()
        );
    }

    #[test]
    fn mutable_extend_from_iter_unaligned_u64() {
        let mut buf = MutableBuffer::new(16);
        buf.push(1_u8);
        buf.extend([1_u64]);
        assert_eq!(9, buf.len());
        assert_eq!(&[1u8, 1u8, 0, 0, 0, 0, 0, 0, 0], buf.as_slice());
    }

    #[test]
    fn mutable_extend_from_slice_unaligned_u64() {
        let mut buf = MutableBuffer::new(16);
        buf.extend_from_slice(&[1_u8]);
        buf.extend_from_slice(&[1_u64]);
        assert_eq!(9, buf.len());
        assert_eq!(&[1u8, 1u8, 0, 0, 0, 0, 0, 0, 0], buf.as_slice());
    }

    #[test]
    fn mutable_push_unaligned_u64() {
        let mut buf = MutableBuffer::new(16);
        buf.push(1_u8);
        buf.push(1_u64);
        assert_eq!(9, buf.len());
        assert_eq!(&[1u8, 1u8, 0, 0, 0, 0, 0, 0, 0], buf.as_slice());
    }

    #[test]
    fn mutable_push_unchecked_unaligned_u64() {
        let mut buf = MutableBuffer::new(16);
        unsafe {
            buf.push_unchecked(1_u8);
            buf.push_unchecked(1_u64);
        }
        assert_eq!(9, buf.len());
        assert_eq!(&[1u8, 1u8, 0, 0, 0, 0, 0, 0, 0], buf.as_slice());
    }

    #[test]
    fn test_from_trusted_len_iter() {
        let iter = vec![1u32, 2].into_iter();
        let buf = unsafe { MutableBuffer::from_trusted_len_iter(iter) };
        assert_eq!(8, buf.len());
        assert_eq!(&[1u8, 0, 0, 0, 2, 0, 0, 0], buf.as_slice());
    }

    #[test]
    fn test_mutable_reserve() {
        let mut buf = MutableBuffer::new(1);
        assert_eq!(64, buf.capacity());

        buf.reserve(10);
        assert_eq!(64, buf.capacity());

        buf.reserve(80);
        assert_eq!(128, buf.capacity());

        buf.reserve(129);
        assert_eq!(256, buf.capacity());
    }

    #[test]
    fn test_mutable_resize() {
        let mut buf = MutableBuffer::new(1);
        assert_eq!(64, buf.capacity());
        assert_eq!(0, buf.len());

        buf.resize(20, 0);
        assert_eq!(64, buf.capacity());
        assert_eq!(20, buf.len());

        buf.resize(10, 0);
        assert_eq!(64, buf.capacity());
        assert_eq!(10, buf.len());

        buf.resize(100, 0);
        assert_eq!(128, buf.capacity());
        assert_eq!(100, buf.len());

        buf.resize(30, 0);
        assert_eq!(128, buf.capacity());
        assert_eq!(30, buf.len());

        buf.resize(0, 0);
        assert_eq!(128, buf.capacity());
        assert_eq!(0, buf.len());
    }

    #[test]
    fn test_mutable_into() {
        let mut buf = MutableBuffer::new(1);
        buf.extend_from_slice(b"aaaa bbbb cccc dddd");
        assert_eq!(19, buf.len());
        assert_eq!(64, buf.capacity());
        assert_eq!(b"aaaa bbbb cccc dddd", buf.as_slice());

        let immutable_buf: Buffer = buf.into();
        assert_eq!(19, immutable_buf.len());
        assert_eq!(64, immutable_buf.capacity());
        assert_eq!(b"aaaa bbbb cccc dddd", immutable_buf.as_slice());
    }

    #[test]
    fn test_mutable_equal() {
        let mut buf = MutableBuffer::new(1);
        let mut buf2 = MutableBuffer::new(1);

        buf.extend_from_slice(&[0xaa]);
        buf2.extend_from_slice(&[0xaa, 0xbb]);
        assert!(buf != buf2);

        buf.extend_from_slice(&[0xbb]);
        assert_eq!(buf, buf2);

        // Equality includes capacity, so growing one side breaks equality.
        buf2.reserve(65);
        assert!(buf != buf2);
    }

    #[test]
    fn test_mutable_shrink_to_fit() {
        let mut buffer = MutableBuffer::new(128);
        assert_eq!(buffer.capacity(), 128);
        buffer.push(1);
        buffer.push(2);

        buffer.shrink_to_fit();
        assert!(buffer.capacity() >= 64 && buffer.capacity() < 128);
    }

    #[test]
    fn test_mutable_set_null_bits() {
        let mut buffer = MutableBuffer::new(8).with_bitset(8, true);

        // A zero-count clear is a no-op at every valid start offset.
        for i in 0..=buffer.capacity() {
            buffer.set_null_bits(i, 0);
            assert_eq!(buffer[..8], [255; 8][..]);
        }

        buffer.set_null_bits(1, 4);
        assert_eq!(buffer[..8], [255, 0, 0, 0, 0, 255, 255, 255][..]);
    }

    #[test]
    #[should_panic = "out of bounds for buffer of length"]
    fn test_mutable_set_null_bits_oob() {
        let mut buffer = MutableBuffer::new(64);
        buffer.set_null_bits(1, buffer.capacity());
    }

    #[test]
    #[should_panic = "out of bounds for buffer of length"]
    fn test_mutable_set_null_bits_oob_by_overflow() {
        let mut buffer = MutableBuffer::new(0);
        buffer.set_null_bits(1, usize::MAX);
    }

    #[test]
    fn from_iter() {
        let buffer = [1u16, 2, 3, 4].into_iter().collect::<MutableBuffer>();
        assert_eq!(buffer.len(), 4 * mem::size_of::<u16>());
        assert_eq!(buffer.as_slice(), &[1, 0, 2, 0, 3, 0, 4, 0]);
    }

    #[test]
    #[should_panic(expected = "failed to create layout for MutableBuffer: LayoutError")]
    fn test_with_capacity_panics_above_max_capacity() {
        let max_capacity = isize::MAX as usize - (isize::MAX as usize % ALIGNMENT);
        let _ = MutableBuffer::with_capacity(max_capacity + 1);
    }

    #[cfg(feature = "pool")]
    mod pool_tests {
        use super::*;
        use crate::pool::{MemoryPool, TrackingMemoryPool};

        #[test]
        fn test_reallocate_with_pool() {
            let pool = TrackingMemoryPool::default();
            let mut buffer = MutableBuffer::with_capacity(100);
            buffer.claim(&pool);

            assert_eq!(buffer.capacity(), 128);
            assert_eq!(pool.used(), 128);

            buffer.reallocate(200);

            assert_eq!(buffer.capacity(), 200);
            assert_eq!(pool.used(), 200);

            buffer.reallocate(50);

            assert_eq!(buffer.capacity(), 50);
            assert_eq!(pool.used(), 50);
        }

        #[test]
        fn test_truncate_with_pool() {
            let pool = TrackingMemoryPool::default();
            let mut buffer = MutableBuffer::with_capacity(100);

            buffer.resize(80, 1);
            assert_eq!(buffer.len(), 80);

            buffer.claim(&pool);
            assert_eq!(pool.used(), 128);

            buffer.truncate(40);
            assert_eq!(buffer.len(), 40);
            assert_eq!(pool.used(), 40);

            buffer.clear();
            assert_eq!(buffer.len(), 0);
            assert_eq!(pool.used(), 0);
        }

        #[test]
        fn test_resize_with_pool() {
            let pool = TrackingMemoryPool::default();
            let mut buffer = MutableBuffer::with_capacity(100);
            buffer.claim(&pool);

            assert_eq!(buffer.len(), 0);
            assert_eq!(pool.used(), 128);

            buffer.resize(50, 1);
            assert_eq!(buffer.len(), 50);
            assert_eq!(pool.used(), 50);

            buffer.resize(150, 1);
            assert_eq!(buffer.len(), 150);
            assert_eq!(buffer.capacity(), 256);
            assert_eq!(pool.used(), 150);

            buffer.resize(30, 1);
            assert_eq!(buffer.len(), 30);
            assert_eq!(pool.used(), 30);
        }

        #[test]
        fn test_buffer_lifecycle_with_pool() {
            let pool = TrackingMemoryPool::default();

            let mut mutable = MutableBuffer::with_capacity(100);
            mutable.resize(80, 1);
            mutable.claim(&pool);

            assert_eq!(pool.used(), 128);

            // The reservation travels with the buffer into its immutable form.
            let buffer = mutable.into_buffer();

            assert_eq!(pool.used(), 128);

            drop(buffer);
            assert_eq!(pool.used(), 0);
        }
    }

    // Builds the expected result by naively appending the slice
    // `repeat_count` times.
    fn create_expected_repeated_slice<T: ArrowNativeType>(
        slice_to_repeat: &[T],
        repeat_count: usize,
    ) -> Buffer {
        let mut expected = MutableBuffer::new(size_of_val(slice_to_repeat) * repeat_count);
        for _ in 0..repeat_count {
            expected.extend_from_slice(slice_to_repeat);
        }
        expected.into()
    }

    // Compares `repeat_slice_n_times` against the naive construction.
    fn test_repeat_count<T: ArrowNativeType + PartialEq + std::fmt::Debug>(
        repeat_count: usize,
        test_data: &[T],
    ) {
        let mut buffer = MutableBuffer::new(0);
        buffer.repeat_slice_n_times(test_data, repeat_count);

        let expected = create_expected_repeated_slice(test_data, repeat_count);
        let result: Buffer = buffer.into();

        assert_eq!(
            result,
            expected,
            "Failed for repeat_count={}, slice_len={}",
            repeat_count,
            test_data.len()
        );
    }

    #[test]
    fn test_repeat_slice_count_edge_cases() {
        test_repeat_count(100, &[] as &[i32]);

        test_repeat_count(0, &[1i32, 2, 3]);
    }

    #[test]
    fn test_small_repeats_counts() {
        let data = &[1u8, 2, 3, 4, 5];

        for _ in 1..=10 {
            test_repeat_count(2, data);
        }
    }

    #[test]
    fn test_different_size_of_i32_repeat_slice() {
        let data: &[i32] = &[1, 2, 3];
        let data_with_single_item: &[i32] = &[42];

        for data in &[data, data_with_single_item] {
            // Probe around each power of two to hit the doubling boundaries.
            for item in 1..=9 {
                let base_repeat_count = 2_usize.pow(item);
                test_repeat_count(base_repeat_count - 1, data);
                test_repeat_count(base_repeat_count, data);
                test_repeat_count(base_repeat_count + 1, data);
            }
        }
    }

    #[test]
    fn test_different_size_of_u8_repeat_slice() {
        let data: &[u8] = &[1, 2, 3];
        let data_with_single_item: &[u8] = &[10];

        for data in &[data, data_with_single_item] {
            for item in 1..=9 {
                let base_repeat_count = 2_usize.pow(item);
                test_repeat_count(base_repeat_count - 1, data);
                test_repeat_count(base_repeat_count, data);
                test_repeat_count(base_repeat_count + 1, data);
            }
        }
    }

    #[test]
    fn test_different_size_of_u16_repeat_slice() {
        let data: &[u16] = &[1, 2, 3];
        let data_with_single_item: &[u16] = &[10];

        for data in &[data, data_with_single_item] {
            for item in 1..=9 {
                let base_repeat_count = 2_usize.pow(item);
                test_repeat_count(base_repeat_count - 1, data);
                test_repeat_count(base_repeat_count, data);
                test_repeat_count(base_repeat_count + 1, data);
            }
        }
    }

    #[test]
    fn test_various_slice_lengths() {
        let repeat_count = 37;
        test_repeat_count(repeat_count, &[42i32]);

        test_repeat_count(repeat_count, &[1i32, 2]);
        test_repeat_count(repeat_count, &[1i32, 2, 3]);
        test_repeat_count(repeat_count, &[1i32, 2, 3, 4]);
        test_repeat_count(repeat_count, &[1i32, 2, 3, 4, 5]);

        let data_10: Vec<i32> = (0..10).collect();
        test_repeat_count(repeat_count, &data_10);

        let data_100: Vec<i32> = (0..100).collect();
        test_repeat_count(repeat_count, &data_100);

        let data_1000: Vec<i32> = (0..1000).collect();
        test_repeat_count(repeat_count, &data_1000);
    }
}