use std::alloc::{Layout, handle_alloc_error};
use std::mem;
use std::ptr::NonNull;

use crate::alloc::{ALIGNMENT, Deallocation};
use crate::{
    bytes::Bytes,
    native::{ArrowNativeType, ToByteSlice},
    util::bit_util,
};

#[cfg(feature = "pool")]
use crate::pool::{MemoryPool, MemoryReservation};
#[cfg(feature = "pool")]
use std::sync::Mutex;

use super::Buffer;
/// A [`MutableBuffer`] is Arrow's interface to build a [`Buffer`] out of items or slices of items.
#[derive(Debug)]
pub struct MutableBuffer {
    // Dangling iff `layout.size() == 0`.
    data: NonNull<u8>,
    // Invariant: `len <= layout.size()`.
    len: usize,
    layout: Layout,

    /// Optional reservation tracking this buffer's memory usage in a pool.
    #[cfg(feature = "pool")]
    reservation: Mutex<Option<Box<dyn MemoryReservation>>>,
}

impl MutableBuffer {
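    /// Allocate a new [`MutableBuffer`] with an initial capacity of at least
    /// `capacity` bytes, rounded up to a multiple of 64 bytes.
    ///
    /// A minimal usage sketch; the `arrow_buffer::MutableBuffer` import path
    /// is an assumption about how the crate re-exports this type:
    ///
    /// ```
    /// # use arrow_buffer::MutableBuffer;
    /// let buffer = MutableBuffer::new(63);
    /// assert_eq!(buffer.capacity(), 64); // rounded up to the next multiple of 64
    /// assert_eq!(buffer.len(), 0);       // nothing written yet
    /// ```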
    #[inline]
    pub fn new(capacity: usize) -> Self {
        Self::with_capacity(capacity)
    }

    /// Allocate a new [`MutableBuffer`] with an initial capacity of at least
    /// `capacity` bytes, rounded up to a multiple of 64 bytes.
    ///
    /// # Panics
    ///
    /// Panics if the rounded capacity is too large for a valid [`Layout`].
    #[inline]
    pub fn with_capacity(capacity: usize) -> Self {
        let capacity = bit_util::round_upto_multiple_of_64(capacity);
        let layout = Layout::from_size_align(capacity, ALIGNMENT)
            .expect("failed to create layout for MutableBuffer");
        let data = match layout.size() {
            0 => dangling_ptr(),
            _ => {
                let raw_ptr = unsafe { std::alloc::alloc(layout) };
                NonNull::new(raw_ptr).unwrap_or_else(|| handle_alloc_error(layout))
            }
        };
        Self {
            data,
            len: 0,
            layout,
            #[cfg(feature = "pool")]
            reservation: std::sync::Mutex::new(None),
        }
    }
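
    /// Create a new [`MutableBuffer`] of `len` bytes, all initialized to zero.
    ///
    /// Unlike [`MutableBuffer::new`], the requested length is not rounded up,
    /// so `capacity()` equals `len`. A short sketch (the `arrow_buffer` path
    /// is an assumption, as above):
    ///
    /// ```
    /// # use arrow_buffer::MutableBuffer;
    /// let buffer = MutableBuffer::from_len_zeroed(5);
    /// assert_eq!(buffer.len(), 5);
    /// assert_eq!(buffer.as_slice(), &[0u8; 5]);
    /// ```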
    pub fn from_len_zeroed(len: usize) -> Self {
        let layout = Layout::from_size_align(len, ALIGNMENT).unwrap();
        let data = match layout.size() {
            0 => dangling_ptr(),
            _ => {
                let raw_ptr = unsafe { std::alloc::alloc_zeroed(layout) };
                NonNull::new(raw_ptr).unwrap_or_else(|| handle_alloc_error(layout))
            }
        };
        Self {
            data,
            len,
            layout,
            #[cfg(feature = "pool")]
            reservation: std::sync::Mutex::new(None),
        }
    }

    /// Create a [`MutableBuffer`] from the provided [`Bytes`], returning
    /// `Err(bytes)` if the allocation was not made by the standard allocator
    /// and therefore cannot be mutated in place.
    pub(crate) fn from_bytes(bytes: Bytes) -> Result<Self, Bytes> {
        let layout = match bytes.deallocation() {
            Deallocation::Standard(layout) => *layout,
            _ => return Err(bytes),
        };

        let len = bytes.len();
        let data = bytes.ptr();
        #[cfg(feature = "pool")]
        let reservation = bytes.reservation.lock().unwrap().take();
        // The allocation is now owned by this `MutableBuffer`; skip
        // `Bytes::drop`, which would free it.
        mem::forget(bytes);

        Ok(Self {
            data,
            len,
            layout,
            #[cfg(feature = "pool")]
            reservation: Mutex::new(reservation),
        })
    }

    /// Create a [`MutableBuffer`] with enough zeroed bytes to hold `len` bits,
    /// i.e. an all-null validity bitmap.
    pub fn new_null(len: usize) -> Self {
        let num_bytes = bit_util::ceil(len, 8);
        MutableBuffer::from_len_zeroed(num_bytes)
    }
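
    /// Set the first `end` **bytes** (not bits) of the buffer to `0xFF` if
    /// `val` is `true`, or `0x00` otherwise, and set `len` to `end`.
    ///
    /// A short sketch (the `arrow_buffer` path is an assumption, as above):
    ///
    /// ```
    /// # use arrow_buffer::MutableBuffer;
    /// let buffer = MutableBuffer::new(8).with_bitset(8, true);
    /// assert_eq!(buffer.as_slice(), &[0xFF; 8]);
    /// ```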
    pub fn with_bitset(mut self, end: usize, val: bool) -> Self {
        assert!(end <= self.layout.size());
        let v = if val { 255 } else { 0 };
        unsafe {
            std::ptr::write_bytes(self.data.as_ptr(), v, end);
            self.len = end;
        }
        self
    }
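
    /// Zero out `count` **bytes** starting at byte offset `start`, typically
    /// used to clear a range of a validity bitmap.
    ///
    /// # Panics
    ///
    /// Panics if `start + count` exceeds the buffer's capacity.
    ///
    /// A short sketch (the `arrow_buffer` path is an assumption, as above):
    ///
    /// ```
    /// # use arrow_buffer::MutableBuffer;
    /// let mut buffer = MutableBuffer::new(8).with_bitset(8, true);
    /// buffer.set_null_bits(1, 4);
    /// assert_eq!(buffer.as_slice(), &[255, 0, 0, 0, 0, 255, 255, 255]);
    /// ```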
    pub fn set_null_bits(&mut self, start: usize, count: usize) {
        assert!(
            start.saturating_add(count) <= self.layout.size(),
            "range start index {start} and count {count} out of bounds for \
            buffer of length {}",
            self.layout.size(),
        );

        // SAFETY: the assert above guarantees `start + count` is in bounds.
        unsafe {
            std::ptr::write_bytes(self.data.as_ptr().add(start), 0, count);
        }
    }
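
    /// Ensure capacity for at least `additional` more bytes, growing the
    /// allocation (to at least double its current size) if necessary.
    ///
    /// A short sketch (the `arrow_buffer` path is an assumption, as above):
    ///
    /// ```
    /// # use arrow_buffer::MutableBuffer;
    /// let mut buffer = MutableBuffer::new(1);
    /// assert_eq!(buffer.capacity(), 64);
    /// buffer.reserve(80);
    /// assert_eq!(buffer.capacity(), 128); // max(round_64(80), 64 * 2)
    /// ```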
    #[inline(always)]
    pub fn reserve(&mut self, additional: usize) {
        let required_cap = self.len + additional;
        if required_cap > self.layout.size() {
            let new_capacity = bit_util::round_upto_multiple_of_64(required_cap);
            let new_capacity = std::cmp::max(new_capacity, self.layout.size() * 2);
            self.reallocate(new_capacity)
        }
    }
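
    /// Append `slice_to_repeat` to the buffer `repeat_count` times, doubling
    /// the copied region on each iteration so only a logarithmic number of
    /// `memcpy` calls is needed rather than `repeat_count` separate copies.
    ///
    /// A short sketch (the `arrow_buffer` path is an assumption, as above):
    ///
    /// ```
    /// # use arrow_buffer::MutableBuffer;
    /// let mut buffer = MutableBuffer::new(0);
    /// buffer.repeat_slice_n_times(&[1u8, 2], 3);
    /// assert_eq!(buffer.as_slice(), &[1, 2, 1, 2, 1, 2]);
    /// ```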
    pub fn repeat_slice_n_times<T: ArrowNativeType>(
        &mut self,
        slice_to_repeat: &[T],
        repeat_count: usize,
    ) {
        if repeat_count == 0 || slice_to_repeat.is_empty() {
            return;
        }

        let bytes_to_repeat = size_of_val(slice_to_repeat);

        self.reserve(repeat_count * bytes_to_repeat);

        let length_before = self.len;

        // Write the initial copy of the slice.
        self.extend_from_slice(slice_to_repeat);

        let added_repeats_length = bytes_to_repeat;
        assert_eq!(
            self.len - length_before,
            added_repeats_length,
            "should copy exactly the same number of bytes"
        );

        let mut already_repeated_times = 1;

        // Double the copied region on each iteration (copy 1, 2, 4, ... slices).
        while already_repeated_times < repeat_count {
            let number_of_slices_to_copy =
                already_repeated_times.min(repeat_count - already_repeated_times);
            let number_of_bytes_to_copy = number_of_slices_to_copy * bytes_to_repeat;

            // SAFETY: `reserve` above guarantees capacity for every repeat, and
            // `src`/`dst` cannot overlap because `dst` starts at the current end.
            unsafe {
                let src = self.data.as_ptr().add(length_before) as *const u8;
                let dst = self.data.as_ptr().add(self.len);
                std::ptr::copy_nonoverlapping(src, dst, number_of_bytes_to_copy)
            }

            self.len += number_of_bytes_to_copy;
            already_repeated_times += number_of_slices_to_copy;
        }
    }

    #[cold]
    fn reallocate(&mut self, capacity: usize) {
        let new_layout = Layout::from_size_align(capacity, self.layout.align()).unwrap();
        if new_layout.size() == 0 {
            if self.layout.size() != 0 {
                // SAFETY: `self.layout` describes the current allocation.
                unsafe { std::alloc::dealloc(self.as_mut_ptr(), self.layout) };
                self.layout = new_layout
            }
            return;
        }

        let data = match self.layout.size() {
            // The current allocation is zero-sized, so allocate fresh memory.
            0 => unsafe { std::alloc::alloc(new_layout) },
            _ => unsafe { std::alloc::realloc(self.as_mut_ptr(), self.layout, capacity) },
        };
        self.data = NonNull::new(data).unwrap_or_else(|| handle_alloc_error(new_layout));
        self.layout = new_layout;
        #[cfg(feature = "pool")]
        {
            if let Some(reservation) = self.reservation.lock().unwrap().as_mut() {
                reservation.resize(self.layout.size());
            }
        }
    }

    /// Truncate the buffer to `len` bytes. If `len` is greater than the
    /// current length, this is a no-op.
    #[inline(always)]
    pub fn truncate(&mut self, len: usize) {
        if len > self.len {
            return;
        }
        self.len = len;
        #[cfg(feature = "pool")]
        {
            if let Some(reservation) = self.reservation.lock().unwrap().as_mut() {
                reservation.resize(self.len);
            }
        }
    }
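
    /// Resize the buffer to `new_len` bytes, filling any newly added bytes
    /// with `value`. Shrinking never reallocates.
    ///
    /// A short sketch (the `arrow_buffer` path is an assumption, as above):
    ///
    /// ```
    /// # use arrow_buffer::MutableBuffer;
    /// let mut buffer = MutableBuffer::new(0);
    /// buffer.resize(3, 0xAB);
    /// assert_eq!(buffer.as_slice(), &[0xAB; 3]);
    /// buffer.resize(1, 0); // shrinking just drops the tail
    /// assert_eq!(buffer.as_slice(), &[0xAB]);
    /// ```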
    #[inline(always)]
    pub fn resize(&mut self, new_len: usize, value: u8) {
        if new_len > self.len {
            let diff = new_len - self.len;
            self.reserve(diff);
            // SAFETY: `reserve` guarantees capacity for `diff` more bytes.
            unsafe { self.data.as_ptr().add(self.len).write_bytes(value, diff) };
        }
        self.len = new_len;
        #[cfg(feature = "pool")]
        {
            if let Some(reservation) = self.reservation.lock().unwrap().as_mut() {
                reservation.resize(self.len);
            }
        }
    }

    /// Shrink the capacity to the smallest multiple of 64 bytes that still
    /// holds `len` bytes, reallocating if this reduces the capacity.
    pub fn shrink_to_fit(&mut self) {
        let new_capacity = bit_util::round_upto_multiple_of_64(self.len);
        if new_capacity < self.layout.size() {
            self.reallocate(new_capacity)
        }
    }

    /// Returns whether this buffer is empty.
    #[inline]
    pub const fn is_empty(&self) -> bool {
        self.len == 0
    }

    /// Returns the number of bytes written to this buffer.
    #[inline]
    pub const fn len(&self) -> usize {
        self.len
    }

    /// Returns the total capacity of this buffer, in bytes.
    #[inline]
    pub const fn capacity(&self) -> usize {
        self.layout.size()
    }

    /// Clear all existing data from this buffer.
    pub fn clear(&mut self) {
        self.len = 0
    }

    /// Returns the data stored in this buffer as a slice.
    pub fn as_slice(&self) -> &[u8] {
        self
    }

    /// Returns the data stored in this buffer as a mutable slice.
    pub fn as_slice_mut(&mut self) -> &mut [u8] {
        self
    }

    /// Returns a raw pointer to this buffer's internal memory.
    /// The pointer is always aligned to [`ALIGNMENT`] and valid.
    #[inline]
    pub const fn as_ptr(&self) -> *const u8 {
        self.data.as_ptr()
    }

    /// Returns a mutable raw pointer to this buffer's internal memory.
    #[inline]
    pub fn as_mut_ptr(&mut self) -> *mut u8 {
        self.data.as_ptr()
    }

    #[inline]
    pub(super) fn into_buffer(self) -> Buffer {
        // SAFETY: the allocation was made with `self.layout` via the standard allocator.
        let bytes = unsafe { Bytes::new(self.data, self.len, Deallocation::Standard(self.layout)) };
        #[cfg(feature = "pool")]
        {
            let reservation = self.reservation.lock().unwrap().take();
            *bytes.reservation.lock().unwrap() = reservation;
        }
        // `Bytes` now owns the allocation; skip `MutableBuffer::drop`.
        std::mem::forget(self);
        Buffer::from(bytes)
    }

    /// View this buffer as a mutable slice of a specific type.
    ///
    /// # Panics
    ///
    /// Panics if the buffer is not aligned to `T`, or its length is not a
    /// multiple of `size_of::<T>()`.
    pub fn typed_data_mut<T: ArrowNativeType>(&mut self) -> &mut [T] {
        // SAFETY: `ArrowNativeType` is plain-old-data, valid for any bit pattern.
        let (prefix, offsets, suffix) = unsafe { self.as_slice_mut().align_to_mut::<T>() };
        assert!(prefix.is_empty() && suffix.is_empty());
        offsets
    }

    /// View this buffer as a slice of a specific type.
    ///
    /// # Panics
    ///
    /// Panics if the buffer is not aligned to `T`, or its length is not a
    /// multiple of `size_of::<T>()`.
    pub fn typed_data<T: ArrowNativeType>(&self) -> &[T] {
        // SAFETY: `ArrowNativeType` is plain-old-data, valid for any bit pattern.
        let (prefix, offsets, suffix) = unsafe { self.as_slice().align_to::<T>() };
        assert!(prefix.is_empty() && suffix.is_empty());
        offsets
    }
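
    /// Extend this buffer with the byte representation of `items`.
    ///
    /// A short sketch (the `arrow_buffer` path is an assumption, as above;
    /// the expected bytes assume a little-endian target):
    ///
    /// ```
    /// # use arrow_buffer::MutableBuffer;
    /// let mut buffer = MutableBuffer::new(0);
    /// buffer.extend_from_slice(&[1u32, 2]);
    /// assert_eq!(buffer.len(), 8);
    /// assert_eq!(buffer.as_slice(), &[1, 0, 0, 0, 2, 0, 0, 0]);
    /// ```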
    #[inline]
    pub fn extend_from_slice<T: ArrowNativeType>(&mut self, items: &[T]) {
        let additional = mem::size_of_val(items);
        self.reserve(additional);
        // SAFETY: `reserve` guarantees capacity for `additional` more bytes,
        // and `items` cannot overlap the freshly reserved destination.
        unsafe {
            let src = items.as_ptr() as *const u8;
            let dst = self.data.as_ptr().add(self.len);
            std::ptr::copy_nonoverlapping(src, dst, additional)
        }
        self.len += additional;
    }
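
    /// Extend the buffer with the byte representation of a single item.
    ///
    /// A short sketch (the `arrow_buffer` path is an assumption, as above;
    /// bytes shown for a little-endian target):
    ///
    /// ```
    /// # use arrow_buffer::MutableBuffer;
    /// let mut buffer = MutableBuffer::new(0);
    /// buffer.push(0x0102u16);
    /// assert_eq!(buffer.as_slice(), &[0x02, 0x01]);
    /// ```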
    #[inline]
    pub fn push<T: ToByteSlice>(&mut self, item: T) {
        let additional = std::mem::size_of::<T>();
        self.reserve(additional);
        // SAFETY: `reserve` guarantees capacity for `additional` more bytes.
        unsafe {
            let src = item.to_byte_slice().as_ptr();
            let dst = self.data.as_ptr().add(self.len);
            std::ptr::copy_nonoverlapping(src, dst, additional);
        }
        self.len += additional;
    }

    /// Extend the buffer with the byte representation of a single item,
    /// without checking capacity first.
    ///
    /// # Safety
    ///
    /// The caller must ensure the buffer has spare capacity for at least
    /// `size_of::<T>()` additional bytes.
    #[inline]
    pub unsafe fn push_unchecked<T: ToByteSlice>(&mut self, item: T) {
        let additional = std::mem::size_of::<T>();
        let src = item.to_byte_slice().as_ptr();
        let dst = unsafe { self.data.as_ptr().add(self.len) };
        unsafe { std::ptr::copy_nonoverlapping(src, dst, additional) };
        self.len += additional;
    }

    /// Extend the buffer with `additional` zero bytes.
    #[inline]
    pub fn extend_zeros(&mut self, additional: usize) {
        self.resize(self.len + additional, 0);
    }

    /// Set the length of the buffer without writing any bytes.
    ///
    /// # Safety
    ///
    /// The caller must ensure all bytes up to `len` are initialized;
    /// `len <= capacity()` is additionally checked with an assert.
    #[inline]
    pub unsafe fn set_len(&mut self, len: usize) {
        assert!(len <= self.capacity());
        self.len = len;
    }
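
    /// Create a packed bitmask of `len` bits by invoking `f` for each bit
    /// index, packing 64 bits at a time into `u64` words for efficiency.
    ///
    /// A short sketch (the `arrow_buffer` path is an assumption, as above):
    ///
    /// ```
    /// # use arrow_buffer::MutableBuffer;
    /// // Set every even bit of a 5-bit mask: bits 0, 2, 4 => 0b1_0101.
    /// let buffer = MutableBuffer::collect_bool(5, |i| i % 2 == 0);
    /// assert_eq!(buffer.as_slice(), &[0b0001_0101]);
    /// ```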
    #[inline]
    pub fn collect_bool<F: FnMut(usize) -> bool>(len: usize, mut f: F) -> Self {
        let mut buffer: Vec<u64> = Vec::with_capacity(bit_util::ceil(len, 64));

        let chunks = len / 64;
        let remainder = len % 64;
        buffer.extend((0..chunks).map(|chunk| {
            let mut packed = 0;
            for bit_idx in 0..64 {
                let i = bit_idx + chunk * 64;
                packed |= (f(i) as u64) << bit_idx;
            }

            packed
        }));

        if remainder != 0 {
            let mut packed = 0;
            for bit_idx in 0..remainder {
                let i = bit_idx + chunks * 64;
                packed |= (f(i) as u64) << bit_idx;
            }

            buffer.push(packed)
        }

        let mut buffer: MutableBuffer = buffer.into();
        buffer.truncate(bit_util::ceil(len, 8));
        buffer
    }

    /// Append `len` bits from `iter`, starting at bit offset `offset`,
    /// zeroing any stale bits in the touched region first.
    ///
    /// # Safety
    ///
    /// The iterator's `size_hint` must be exact, i.e. `lower == upper == len`.
    #[inline]
    pub unsafe fn extend_bool_trusted_len<I: Iterator<Item = bool>>(
        &mut self,
        mut iter: I,
        offset: usize,
    ) {
        let (lower, upper) = iter.size_hint();
        let len = upper.expect("Iterator must have exact size_hint");
        assert_eq!(lower, len, "Iterator must have exact size_hint");
        debug_assert!(
            offset <= self.len * 8,
            "offset must be <= buffer length in bits"
        );

        if len == 0 {
            return;
        }

        let start_len = offset;
        let end_bit = start_len + len;

        // Grow the buffer so every touched byte is within `len`.
        let new_len_bytes = bit_util::ceil(end_bit, 8);
        if new_len_bytes > self.len {
            self.reserve(new_len_bytes - self.len);
            unsafe { self.set_len(new_len_bytes) };
        }

        let slice = self.as_slice_mut();

        let mut bit_idx = start_len;

        // Write bit-by-bit until `bit_idx` reaches a 64-bit word boundary.
        let misalignment = bit_idx & 63;
        let prefix_bits = if misalignment == 0 {
            0
        } else {
            (64 - misalignment).min(end_bit - bit_idx)
        };

        if prefix_bits != 0 {
            let byte_start = bit_idx / 8;
            let byte_end = bit_util::ceil(bit_idx + prefix_bits, 8);
            let bit_offset = bit_idx % 8;

            // Preserve the bits below `bit_offset` in the first partial byte.
            if bit_offset != 0 {
                let keep_mask = (1u8 << bit_offset).wrapping_sub(1);
                slice[byte_start] &= keep_mask;
            }

            // Zero the remaining whole bytes of the prefix region.
            let zero_from = if bit_offset == 0 {
                byte_start
            } else {
                byte_start + 1
            };
            if byte_end > zero_from {
                slice[zero_from..byte_end].fill(0);
            }

            for _ in 0..prefix_bits {
                let v = iter.next().unwrap();
                if v {
                    let byte_idx = bit_idx / 8;
                    let bit = bit_idx % 8;
                    slice[byte_idx] |= 1 << bit;
                }
                bit_idx += 1;
            }
        }

        if bit_idx < end_bit {
            debug_assert_eq!(bit_idx & 63, 0);
            let remaining_bits = end_bit - bit_idx;
            let chunks = remaining_bits / 64;

            // Pack 64 bits at a time into little-endian words.
            let words_start = bit_idx / 8;
            let words_end = words_start + chunks * 8;
            for dst in slice[words_start..words_end].chunks_exact_mut(8) {
                let mut packed: u64 = 0;
                for i in 0..64 {
                    packed |= (iter.next().unwrap() as u64) << i;
                }
                dst.copy_from_slice(&packed.to_le_bytes());
                bit_idx += 64;
            }

            // Write any remaining bits after the last full word.
            let suffix_bits = end_bit - bit_idx;
            if suffix_bits != 0 {
                debug_assert_eq!(bit_idx % 8, 0);
                let byte_start = bit_idx / 8;
                let byte_end = bit_util::ceil(end_bit, 8);
                slice[byte_start..byte_end].fill(0);

                for _ in 0..suffix_bits {
                    let v = iter.next().unwrap();
                    if v {
                        let byte_idx = bit_idx / 8;
                        let bit = bit_idx % 8;
                        slice[byte_idx] |= 1 << bit;
                    }
                    bit_idx += 1;
                }
            }
        }

        // Zero any bits beyond `end_bit` in the final partial byte.
        let remainder = end_bit % 8;
        if remainder != 0 {
            let mask = (1u8 << remainder).wrapping_sub(1);
            slice[bit_util::ceil(end_bit, 8) - 1] &= mask;
        }

        debug_assert_eq!(bit_idx, end_bit);
    }

    /// Register this buffer with the provided [`MemoryPool`], replacing any
    /// existing reservation.
    #[cfg(feature = "pool")]
    pub fn claim(&self, pool: &dyn MemoryPool) {
        *self.reservation.lock().unwrap() = Some(pool.reserve(self.capacity()));
    }
}

/// Returns a dangling pointer aligned to [`ALIGNMENT`], used for zero-sized
/// allocations.
#[inline]
pub(crate) fn dangling_ptr() -> NonNull<u8> {
    // SAFETY: `ALIGNMENT` is a non-zero constant, so the pointer is non-null.
    #[cfg(miri)]
    {
        // Miri requires a pointer created with explicit provenance semantics.
        unsafe { NonNull::new_unchecked(std::ptr::without_provenance_mut(ALIGNMENT)) }
    }
    #[cfg(not(miri))]
    {
        unsafe { NonNull::new_unchecked(ALIGNMENT as *mut u8) }
    }
}

impl<A: ArrowNativeType> Extend<A> for MutableBuffer {
    #[inline]
    fn extend<T: IntoIterator<Item = A>>(&mut self, iter: T) {
        let iterator = iter.into_iter();
        self.extend_from_iter(iterator)
    }
}

impl<T: ArrowNativeType> From<Vec<T>> for MutableBuffer {
    fn from(value: Vec<T>) -> Self {
        // SAFETY: a `Vec`'s pointer is always non-null.
        let data = unsafe { NonNull::new_unchecked(value.as_ptr() as _) };
        let len = value.len() * mem::size_of::<T>();
        // SAFETY: a `Vec` is always allocated with a valid `Layout::array::<T>`.
        let layout = unsafe { Layout::array::<T>(value.capacity()).unwrap_unchecked() };
        // The allocation is now owned by this `MutableBuffer`; skip `Vec::drop`.
        mem::forget(value);
        Self {
            data,
            len,
            layout,
            #[cfg(feature = "pool")]
            reservation: std::sync::Mutex::new(None),
        }
    }
}

impl MutableBuffer {
    #[inline]
    pub(super) fn extend_from_iter<T: ArrowNativeType, I: Iterator<Item = T>>(
        &mut self,
        mut iterator: I,
    ) {
        let item_size = std::mem::size_of::<T>();
        let (lower, _) = iterator.size_hint();
        let additional = lower * item_size;
        self.reserve(additional);

        // Fast path within the reserved capacity: write through a raw pointer,
        // deferring the length update to `SetLenOnDrop` so a panicking iterator
        // cannot leave `len` pointing past the initialized bytes.
        let mut len = SetLenOnDrop::new(&mut self.len);
        let mut dst = unsafe { self.data.as_ptr().add(len.local_len) };
        let capacity = self.layout.size();

        while len.local_len + item_size <= capacity {
            if let Some(item) = iterator.next() {
                unsafe {
                    let src = item.to_byte_slice().as_ptr();
                    std::ptr::copy_nonoverlapping(src, dst, item_size);
                    dst = dst.add(item_size);
                }
                len.local_len += item_size;
            } else {
                break;
            }
        }
        drop(len);

        // The iterator under-reported its length; fall back to checked pushes.
        iterator.for_each(|item| self.push(item));
    }
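
    /// Create a [`MutableBuffer`] from an iterator of native-typed items.
    ///
    /// # Safety
    ///
    /// The iterator's `size_hint` upper bound must be `Some` and exact.
    ///
    /// A short sketch (the `arrow_buffer` path is an assumption, as above;
    /// bytes shown for a little-endian target):
    ///
    /// ```
    /// # use arrow_buffer::MutableBuffer;
    /// let iter = vec![1u32, 2].into_iter();
    /// let buffer = unsafe { MutableBuffer::from_trusted_len_iter(iter) };
    /// assert_eq!(buffer.as_slice(), &[1, 0, 0, 0, 2, 0, 0, 0]);
    /// ```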
    #[inline]
    pub unsafe fn from_trusted_len_iter<T: ArrowNativeType, I: Iterator<Item = T>>(
        iterator: I,
    ) -> Self {
        let item_size = std::mem::size_of::<T>();
        let (_, upper) = iterator.size_hint();
        let upper = upper.expect("from_trusted_len_iter requires an upper limit");
        let len = upper * item_size;

        let mut buffer = MutableBuffer::new(len);

        let mut dst = buffer.data.as_ptr();
        for item in iterator {
            let src = item.to_byte_slice().as_ptr();
            // SAFETY: the trusted upper bound guarantees `dst` stays in bounds.
            unsafe { std::ptr::copy_nonoverlapping(src, dst, item_size) };
            dst = unsafe { dst.add(item_size) };
        }
        assert_eq!(
            unsafe { dst.offset_from(buffer.data.as_ptr()) } as usize,
            len,
            "Trusted iterator length was not accurately reported"
        );
        buffer.len = len;
        buffer
    }

    /// Create a [`MutableBuffer`] from a boolean iterator, packing the values
    /// into a bitmask.
    ///
    /// # Safety
    ///
    /// The iterator's `size_hint` upper bound must be `Some` and exact.
    #[inline]
    pub unsafe fn from_trusted_len_iter_bool<I: Iterator<Item = bool>>(mut iterator: I) -> Self {
        let (_, upper) = iterator.size_hint();
        let len = upper.expect("from_trusted_len_iter requires an upper limit");

        Self::collect_bool(len, |_| iterator.next().unwrap())
    }

    /// Fallible version of [`MutableBuffer::from_trusted_len_iter`], returning
    /// the first error yielded by the iterator.
    ///
    /// # Safety
    ///
    /// The iterator's `size_hint` upper bound must be `Some` and exact.
    #[inline]
    pub unsafe fn try_from_trusted_len_iter<
        E,
        T: ArrowNativeType,
        I: Iterator<Item = Result<T, E>>,
    >(
        iterator: I,
    ) -> Result<Self, E> {
        let item_size = std::mem::size_of::<T>();
        let (_, upper) = iterator.size_hint();
        let upper = upper.expect("try_from_trusted_len_iter requires an upper limit");
        let len = upper * item_size;

        let mut buffer = MutableBuffer::new(len);

        let mut dst = buffer.data.as_ptr();
        for item in iterator {
            let item = item?;
            let src = item.to_byte_slice().as_ptr();
            // SAFETY: the trusted upper bound guarantees `dst` stays in bounds.
            unsafe { std::ptr::copy_nonoverlapping(src, dst, item_size) };
            dst = unsafe { dst.add(item_size) };
        }
        // Helper so the borrow of `buffer` begins only after iteration is done.
        unsafe fn finalize_buffer(dst: *mut u8, buffer: &mut MutableBuffer, len: usize) {
            unsafe {
                assert_eq!(
                    dst.offset_from(buffer.data.as_ptr()) as usize,
                    len,
                    "Trusted iterator length was not accurately reported"
                );
                buffer.len = len;
            }
        }
        unsafe { finalize_buffer(dst, &mut buffer, len) };
        Ok(buffer)
    }
}

impl Default for MutableBuffer {
    fn default() -> Self {
        Self::with_capacity(0)
    }
}

impl std::ops::Deref for MutableBuffer {
    type Target = [u8];

    fn deref(&self) -> &[u8] {
        // SAFETY: the first `len` bytes are always initialized.
        unsafe { std::slice::from_raw_parts(self.as_ptr(), self.len) }
    }
}

impl std::ops::DerefMut for MutableBuffer {
    fn deref_mut(&mut self) -> &mut [u8] {
        // SAFETY: the first `len` bytes are always initialized.
        unsafe { std::slice::from_raw_parts_mut(self.as_mut_ptr(), self.len) }
    }
}

impl AsRef<[u8]> for &MutableBuffer {
    fn as_ref(&self) -> &[u8] {
        self.as_slice()
    }
}

impl Drop for MutableBuffer {
    fn drop(&mut self) {
        if self.layout.size() != 0 {
            // SAFETY: the allocation was made with `self.layout`.
            unsafe { std::alloc::dealloc(self.data.as_ptr() as _, self.layout) };
        }
    }
}

impl PartialEq for MutableBuffer {
    fn eq(&self, other: &MutableBuffer) -> bool {
        if self.len != other.len {
            return false;
        }
        if self.layout != other.layout {
            return false;
        }
        self.as_slice() == other.as_slice()
    }
}

// SAFETY: `MutableBuffer` owns its allocation and only hands out references
// through `&self`/`&mut self`, so it is safe to share and send across threads.
unsafe impl Sync for MutableBuffer {}
unsafe impl Send for MutableBuffer {}

/// Updates the target length only on drop, so that a panicking iterator
/// cannot leave the buffer claiming more initialized bytes than were written.
struct SetLenOnDrop<'a> {
    len: &'a mut usize,
    local_len: usize,
}

impl<'a> SetLenOnDrop<'a> {
    #[inline]
    fn new(len: &'a mut usize) -> Self {
        SetLenOnDrop {
            local_len: *len,
            len,
        }
    }
}

impl Drop for SetLenOnDrop<'_> {
    #[inline]
    fn drop(&mut self) {
        *self.len = self.local_len;
    }
}
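
/// Collecting booleans packs them into a bitmask, one bit per value.
///
/// A short sketch (the `arrow_buffer` path is an assumption, as above):
///
/// ```
/// # use arrow_buffer::MutableBuffer;
/// let buffer: MutableBuffer = [true, false, true].into_iter().collect();
/// assert_eq!(buffer.as_slice(), &[0b0000_0101]);
/// ```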
impl std::iter::FromIterator<bool> for MutableBuffer {
    fn from_iter<I>(iter: I) -> Self
    where
        I: IntoIterator<Item = bool>,
    {
        let mut iterator = iter.into_iter();
        let mut result = {
            let byte_capacity: usize = iterator.size_hint().0.saturating_add(7) / 8;
            MutableBuffer::new(byte_capacity)
        };

        loop {
            let mut exhausted = false;
            let mut byte_accum: u8 = 0;
            let mut mask: u8 = 1;

            // Collect up to 8 bits into a single byte.
            while mask != 0 {
                if let Some(value) = iterator.next() {
                    byte_accum |= match value {
                        true => mask,
                        false => 0,
                    };
                    mask <<= 1;
                } else {
                    exhausted = true;
                    break;
                }
            }

            // A partially-filled byte is still pushed; only an empty one is skipped.
            if exhausted && mask == 1 {
                break;
            }

            // `push_unchecked` requires spare capacity, so reserve when full.
            if result.len() == result.capacity() {
                let additional_byte_capacity =
                    1usize.saturating_add(iterator.size_hint().0.saturating_add(7) / 8);
                result.reserve(additional_byte_capacity)
            }

            // SAFETY: capacity was ensured above.
            unsafe { result.push_unchecked(byte_accum) };
            if exhausted {
                break;
            }
        }
        result
    }
}

impl<T: ArrowNativeType> std::iter::FromIterator<T> for MutableBuffer {
    fn from_iter<I: IntoIterator<Item = T>>(iter: I) -> Self {
        let mut buffer = Self::default();
        buffer.extend_from_iter(iter.into_iter());
        buffer
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_mutable_new() {
        let buf = MutableBuffer::new(63);
        assert_eq!(64, buf.capacity());
        assert_eq!(0, buf.len());
        assert!(buf.is_empty());
    }

    #[test]
    fn test_mutable_default() {
        let buf = MutableBuffer::default();
        assert_eq!(0, buf.capacity());
        assert_eq!(0, buf.len());
        assert!(buf.is_empty());

        let mut buf = MutableBuffer::default();
        buf.extend_from_slice(b"hello");
        assert_eq!(5, buf.len());
        assert_eq!(b"hello", buf.as_slice());
    }

    #[test]
    fn test_mutable_extend_from_slice() {
        let mut buf = MutableBuffer::new(100);
        buf.extend_from_slice(b"hello");
        assert_eq!(5, buf.len());
        assert_eq!(b"hello", buf.as_slice());

        buf.extend_from_slice(b" world");
        assert_eq!(11, buf.len());
        assert_eq!(b"hello world", buf.as_slice());

        buf.clear();
        assert_eq!(0, buf.len());
        buf.extend_from_slice(b"hello arrow");
        assert_eq!(11, buf.len());
        assert_eq!(b"hello arrow", buf.as_slice());
    }

    #[test]
    fn mutable_extend_from_iter() {
        let mut buf = MutableBuffer::new(0);
        buf.extend(vec![1u32, 2]);
        assert_eq!(8, buf.len());
        assert_eq!(&[1u8, 0, 0, 0, 2, 0, 0, 0], buf.as_slice());

        buf.extend(vec![3u32, 4]);
        assert_eq!(16, buf.len());
        assert_eq!(
            &[1u8, 0, 0, 0, 2, 0, 0, 0, 3, 0, 0, 0, 4, 0, 0, 0],
            buf.as_slice()
        );
    }

    #[test]
    fn mutable_extend_from_iter_unaligned_u64() {
        let mut buf = MutableBuffer::new(16);
        buf.push(1_u8);
        buf.extend([1_u64]);
        assert_eq!(9, buf.len());
        assert_eq!(&[1u8, 1u8, 0, 0, 0, 0, 0, 0, 0], buf.as_slice());
    }

    #[test]
    fn mutable_extend_from_slice_unaligned_u64() {
        let mut buf = MutableBuffer::new(16);
        buf.extend_from_slice(&[1_u8]);
        buf.extend_from_slice(&[1_u64]);
        assert_eq!(9, buf.len());
        assert_eq!(&[1u8, 1u8, 0, 0, 0, 0, 0, 0, 0], buf.as_slice());
    }

    #[test]
    fn mutable_push_unaligned_u64() {
        let mut buf = MutableBuffer::new(16);
        buf.push(1_u8);
        buf.push(1_u64);
        assert_eq!(9, buf.len());
        assert_eq!(&[1u8, 1u8, 0, 0, 0, 0, 0, 0, 0], buf.as_slice());
    }

    #[test]
    fn mutable_push_unchecked_unaligned_u64() {
        let mut buf = MutableBuffer::new(16);
        unsafe {
            buf.push_unchecked(1_u8);
            buf.push_unchecked(1_u64);
        }
        assert_eq!(9, buf.len());
        assert_eq!(&[1u8, 1u8, 0, 0, 0, 0, 0, 0, 0], buf.as_slice());
    }

    #[test]
    fn test_from_trusted_len_iter() {
        let iter = vec![1u32, 2].into_iter();
        let buf = unsafe { MutableBuffer::from_trusted_len_iter(iter) };
        assert_eq!(8, buf.len());
        assert_eq!(&[1u8, 0, 0, 0, 2, 0, 0, 0], buf.as_slice());
    }

    #[test]
    fn test_mutable_reserve() {
        let mut buf = MutableBuffer::new(1);
        assert_eq!(64, buf.capacity());

        // Reserving within the existing capacity is a no-op.
        buf.reserve(10);
        assert_eq!(64, buf.capacity());

        buf.reserve(80);
        assert_eq!(128, buf.capacity());

        buf.reserve(129);
        assert_eq!(256, buf.capacity());
    }

    #[test]
    fn test_mutable_resize() {
        let mut buf = MutableBuffer::new(1);
        assert_eq!(64, buf.capacity());
        assert_eq!(0, buf.len());

        buf.resize(20, 0);
        assert_eq!(64, buf.capacity());
        assert_eq!(20, buf.len());

        buf.resize(10, 0);
        assert_eq!(64, buf.capacity());
        assert_eq!(10, buf.len());

        buf.resize(100, 0);
        assert_eq!(128, buf.capacity());
        assert_eq!(100, buf.len());

        buf.resize(30, 0);
        assert_eq!(128, buf.capacity());
        assert_eq!(30, buf.len());

        buf.resize(0, 0);
        assert_eq!(128, buf.capacity());
        assert_eq!(0, buf.len());
    }

    #[test]
    fn test_mutable_into() {
        let mut buf = MutableBuffer::new(1);
        buf.extend_from_slice(b"aaaa bbbb cccc dddd");
        assert_eq!(19, buf.len());
        assert_eq!(64, buf.capacity());
        assert_eq!(b"aaaa bbbb cccc dddd", buf.as_slice());

        let immutable_buf: Buffer = buf.into();
        assert_eq!(19, immutable_buf.len());
        assert_eq!(64, immutable_buf.capacity());
        assert_eq!(b"aaaa bbbb cccc dddd", immutable_buf.as_slice());
    }

    #[test]
    fn test_mutable_equal() {
        let mut buf = MutableBuffer::new(1);
        let mut buf2 = MutableBuffer::new(1);

        buf.extend_from_slice(&[0xaa]);
        buf2.extend_from_slice(&[0xaa, 0xbb]);
        assert!(buf != buf2);

        buf.extend_from_slice(&[0xbb]);
        assert_eq!(buf, buf2);

        buf2.reserve(65);
        assert!(buf != buf2);
    }

    #[test]
    fn test_mutable_shrink_to_fit() {
        let mut buffer = MutableBuffer::new(128);
        assert_eq!(buffer.capacity(), 128);
        buffer.push(1);
        buffer.push(2);

        buffer.shrink_to_fit();
        assert!(buffer.capacity() >= 64 && buffer.capacity() < 128);
    }

    #[test]
    fn test_mutable_set_null_bits() {
        let mut buffer = MutableBuffer::new(8).with_bitset(8, true);

        for i in 0..=buffer.capacity() {
            buffer.set_null_bits(i, 0);
            assert_eq!(buffer[..8], [255; 8][..]);
        }

        buffer.set_null_bits(1, 4);
        assert_eq!(buffer[..8], [255, 0, 0, 0, 0, 255, 255, 255][..]);
    }

    #[test]
    #[should_panic = "out of bounds for buffer of length"]
    fn test_mutable_set_null_bits_oob() {
        let mut buffer = MutableBuffer::new(64);
        buffer.set_null_bits(1, buffer.capacity());
    }

    #[test]
    #[should_panic = "out of bounds for buffer of length"]
    fn test_mutable_set_null_bits_oob_by_overflow() {
        let mut buffer = MutableBuffer::new(0);
        buffer.set_null_bits(1, usize::MAX);
    }

    #[test]
    fn from_iter() {
        let buffer = [1u16, 2, 3, 4].into_iter().collect::<MutableBuffer>();
        assert_eq!(buffer.len(), 4 * mem::size_of::<u16>());
        assert_eq!(buffer.as_slice(), &[1, 0, 2, 0, 3, 0, 4, 0]);
    }

    #[test]
    #[should_panic(expected = "failed to create layout for MutableBuffer: LayoutError")]
    fn test_with_capacity_panics_above_max_capacity() {
        let max_capacity = isize::MAX as usize - (isize::MAX as usize % ALIGNMENT);
        let _ = MutableBuffer::with_capacity(max_capacity + 1);
    }

    #[cfg(feature = "pool")]
    mod pool_tests {
        use super::*;
        use crate::pool::{MemoryPool, TrackingMemoryPool};

        #[test]
        fn test_reallocate_with_pool() {
            let pool = TrackingMemoryPool::default();
            let mut buffer = MutableBuffer::with_capacity(100);
            buffer.claim(&pool);

            // Capacity is rounded up to a multiple of 64 bytes.
            assert_eq!(buffer.capacity(), 128);
            assert_eq!(pool.used(), 128);

            // Growing the buffer resizes the reservation.
            buffer.reallocate(200);
            assert_eq!(buffer.capacity(), 200);
            assert_eq!(pool.used(), 200);

            // Shrinking the buffer also resizes the reservation.
            buffer.reallocate(50);
            assert_eq!(buffer.capacity(), 50);
            assert_eq!(pool.used(), 50);
        }

        #[test]
        fn test_truncate_with_pool() {
            let pool = TrackingMemoryPool::default();
            let mut buffer = MutableBuffer::with_capacity(100);

            buffer.resize(80, 1);
            assert_eq!(buffer.len(), 80);

            buffer.claim(&pool);
            assert_eq!(pool.used(), 128);

            // Truncation resizes the reservation down to the new length.
            buffer.truncate(40);
            assert_eq!(buffer.len(), 40);
            assert_eq!(pool.used(), 40);

            buffer.truncate(0);
            assert_eq!(buffer.len(), 0);
            assert_eq!(pool.used(), 0);
        }

        #[test]
        fn test_resize_with_pool() {
            let pool = TrackingMemoryPool::default();
            let mut buffer = MutableBuffer::with_capacity(100);
            buffer.claim(&pool);

            assert_eq!(buffer.len(), 0);
            assert_eq!(pool.used(), 128);

            buffer.resize(50, 1);
            assert_eq!(buffer.len(), 50);
            assert_eq!(pool.used(), 50);

            buffer.resize(150, 1);
            assert_eq!(buffer.len(), 150);
            assert_eq!(buffer.capacity(), 256);
            assert_eq!(pool.used(), 150);

            buffer.resize(30, 1);
            assert_eq!(buffer.len(), 30);
            assert_eq!(pool.used(), 30);
        }

        #[test]
        fn test_buffer_lifecycle_with_pool() {
            let pool = TrackingMemoryPool::default();

            let mut mutable = MutableBuffer::with_capacity(100);
            mutable.resize(80, 1);
            mutable.claim(&pool);
            assert_eq!(pool.used(), 128);

            // The reservation is transferred to the immutable `Buffer`.
            let buffer = mutable.into_buffer();
            assert_eq!(pool.used(), 128);

            // Dropping the buffer releases the reservation.
            drop(buffer);
            assert_eq!(pool.used(), 0);
        }
    }

    fn create_expected_repeated_slice<T: ArrowNativeType>(
        slice_to_repeat: &[T],
        repeat_count: usize,
    ) -> Buffer {
        let mut expected = MutableBuffer::new(size_of_val(slice_to_repeat) * repeat_count);
        for _ in 0..repeat_count {
            expected.extend_from_slice(slice_to_repeat);
        }
        expected.into()
    }

    /// Compares `repeat_slice_n_times` against naive repeated extension.
    fn test_repeat_count<T: ArrowNativeType + PartialEq + std::fmt::Debug>(
        repeat_count: usize,
        test_data: &[T],
    ) {
        let mut buffer = MutableBuffer::new(0);
        buffer.repeat_slice_n_times(test_data, repeat_count);

        let expected = create_expected_repeated_slice(test_data, repeat_count);
        let result: Buffer = buffer.into();

        assert_eq!(
            result,
            expected,
            "Failed for repeat_count={}, slice_len={}",
            repeat_count,
            test_data.len()
        );
    }

    #[test]
    fn test_repeat_slice_count_edge_cases() {
        // Empty slice: nothing to repeat.
        test_repeat_count(100, &[] as &[i32]);

        // Zero repeats: nothing is appended.
        test_repeat_count(0, &[1i32, 2, 3]);
    }

    #[test]
    fn test_small_repeats_counts() {
        let data = &[1u8, 2, 3, 4, 5];

        for repeat_count in 1..=10 {
            test_repeat_count(repeat_count, data);
        }
    }

    #[test]
    fn test_different_size_of_i32_repeat_slice() {
        let data: &[i32] = &[1, 2, 3];
        let data_with_single_item: &[i32] = &[42];

        for data in &[data, data_with_single_item] {
            for item in 1..=9 {
                // Exercise the doubling logic around powers of two.
                let base_repeat_count = 2_usize.pow(item);
                test_repeat_count(base_repeat_count - 1, data);
                test_repeat_count(base_repeat_count, data);
                test_repeat_count(base_repeat_count + 1, data);
            }
        }
    }

    #[test]
    fn test_different_size_of_u8_repeat_slice() {
        let data: &[u8] = &[1, 2, 3];
        let data_with_single_item: &[u8] = &[10];

        for data in &[data, data_with_single_item] {
            for item in 1..=9 {
                let base_repeat_count = 2_usize.pow(item);
                test_repeat_count(base_repeat_count - 1, data);
                test_repeat_count(base_repeat_count, data);
                test_repeat_count(base_repeat_count + 1, data);
            }
        }
    }

    #[test]
    fn test_different_size_of_u16_repeat_slice() {
        let data: &[u16] = &[1, 2, 3];
        let data_with_single_item: &[u16] = &[10];

        for data in &[data, data_with_single_item] {
            for item in 1..=9 {
                let base_repeat_count = 2_usize.pow(item);
                test_repeat_count(base_repeat_count - 1, data);
                test_repeat_count(base_repeat_count, data);
                test_repeat_count(base_repeat_count + 1, data);
            }
        }
    }

    #[test]
    fn test_various_slice_lengths() {
        // A repeat count that is not a power of two.
        let repeat_count = 37;

        test_repeat_count(repeat_count, &[42i32]);
        test_repeat_count(repeat_count, &[1i32, 2]);
        test_repeat_count(repeat_count, &[1i32, 2, 3]);
        test_repeat_count(repeat_count, &[1i32, 2, 3, 4]);
        test_repeat_count(repeat_count, &[1i32, 2, 3, 4, 5]);

        let data_10: Vec<i32> = (0..10).collect();
        test_repeat_count(repeat_count, &data_10);

        let data_100: Vec<i32> = (0..100).collect();
        test_repeat_count(repeat_count, &data_100);

        let data_1000: Vec<i32> = (0..1000).collect();
        test_repeat_count(repeat_count, &data_1000);
    }
}