arrow_buffer/buffer/mutable.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

use std::alloc::{Layout, handle_alloc_error};
use std::mem;
use std::ptr::NonNull;

use crate::alloc::{ALIGNMENT, Deallocation};
use crate::{
    bytes::Bytes,
    native::{ArrowNativeType, ToByteSlice},
    util::bit_util,
};

#[cfg(feature = "pool")]
use crate::pool::{MemoryPool, MemoryReservation};
#[cfg(feature = "pool")]
use std::sync::Mutex;

use super::Buffer;
35
36/// A [`MutableBuffer`] is a wrapper over memory regions, used to build
37/// [`Buffer`]s out of items or slices of items.
38///
39/// [`Buffer`]s created from [`MutableBuffer`] (via `into`) are guaranteed to be
40/// aligned along cache lines and in multiples of 64 bytes.
41///
42/// Use [MutableBuffer::push] to insert an item, [MutableBuffer::extend_from_slice]
43/// to insert many items, and `into` to convert it to [`Buffer`]. For typed data,
44/// it is often more efficient to use [`Vec`] and convert it to [`Buffer`] rather
45/// than using [`MutableBuffer`] (see examples below).
46///
47/// # See Also
48/// * For a safe, strongly typed API consider using [`Vec`] and [`ScalarBuffer`](crate::ScalarBuffer)
49/// * To apply bitwise operations, see [`apply_bitwise_binary_op`] and [`apply_bitwise_unary_op`]
50///
51/// [`apply_bitwise_binary_op`]: crate::bit_util::apply_bitwise_binary_op
52/// [`apply_bitwise_unary_op`]: crate::bit_util::apply_bitwise_unary_op
53///
54/// # Example: Creating a [`Buffer`] from a [`MutableBuffer`]
55/// ```
56/// # use arrow_buffer::buffer::{Buffer, MutableBuffer};
57/// let mut buffer = MutableBuffer::new(0);
58/// buffer.push(256u32);
59/// buffer.extend_from_slice(&[1u32]);
60/// let buffer = Buffer::from(buffer);
61/// assert_eq!(buffer.as_slice(), &[0u8, 1, 0, 0, 1, 0, 0, 0])
62/// ```
63///
64/// The same can be achieved more efficiently by using a `Vec<u32>`
65/// ```
66/// # use arrow_buffer::buffer::Buffer;
67/// let mut vec = Vec::new();
68/// vec.push(256u32);
69/// vec.extend_from_slice(&[1u32]);
70/// let buffer = Buffer::from(vec);
71/// assert_eq!(buffer.as_slice(), &[0u8, 1, 0, 0, 1, 0, 0, 0]);
72/// ```
73///
74/// # Example: Creating a [`MutableBuffer`] from a `Vec<T>`
75/// ```
76/// # use arrow_buffer::buffer::MutableBuffer;
77/// let vec = vec![1u32, 2, 3];
78/// let mutable_buffer = MutableBuffer::from(vec); // reuses the allocation from vec
79/// assert_eq!(mutable_buffer.len(), 12); // 3 * 4 bytes
80/// ```
81///
82/// # Example: Creating a [`MutableBuffer`] from a [`Buffer`]
83/// ```
84/// # use arrow_buffer::buffer::{Buffer, MutableBuffer};
85/// let buffer: Buffer = Buffer::from(&[1u8, 2, 3, 4][..]);
86/// // Only possible to convert a Buffer into a MutableBuffer if uniquely owned
87/// // (i.e., there are no other references to it).
88/// let mut mutable_buffer = match buffer.into_mutable() {
89///    Ok(mutable) => mutable,
90///    Err(orig_buffer) => {
91///      panic!("buffer was not uniquely owned");
92///    }
93/// };
94/// mutable_buffer.push(5u8);
95/// let buffer = Buffer::from(mutable_buffer);
96/// assert_eq!(buffer.as_slice(), &[1u8, 2, 3, 4, 5])
97/// ```
98#[derive(Debug)]
99pub struct MutableBuffer {
100    // dangling iff capacity = 0
101    data: NonNull<u8>,
102    // invariant: len <= capacity
103    len: usize,
104    layout: Layout,
105
106    /// Memory reservation for tracking memory usage
107    #[cfg(feature = "pool")]
108    reservation: Mutex<Option<Box<dyn MemoryReservation>>>,
109}
110
111impl MutableBuffer {
112    /// Allocate a new [MutableBuffer] with initial capacity to be at least `capacity`.
113    ///
114    /// See [`MutableBuffer::with_capacity`].
115    #[inline]
116    pub fn new(capacity: usize) -> Self {
117        Self::with_capacity(capacity)
118    }
119
120    /// Allocate a new [MutableBuffer] with initial capacity to be at least `capacity`.
121    ///
122    /// # Panics
123    ///
124    /// If `capacity`, when rounded up to the nearest multiple of [`ALIGNMENT`], is greater
125    /// then `isize::MAX`, then this function will panic.
126    #[inline]
127    pub fn with_capacity(capacity: usize) -> Self {
128        let capacity = bit_util::round_upto_multiple_of_64(capacity);
129        let layout = Layout::from_size_align(capacity, ALIGNMENT)
130            .expect("failed to create layout for MutableBuffer");
131        let data = match layout.size() {
132            0 => dangling_ptr(),
133            _ => {
134                // Safety: Verified size != 0
135                let raw_ptr = unsafe { std::alloc::alloc(layout) };
136                NonNull::new(raw_ptr).unwrap_or_else(|| handle_alloc_error(layout))
137            }
138        };
139        Self {
140            data,
141            len: 0,
142            layout,
143            #[cfg(feature = "pool")]
144            reservation: std::sync::Mutex::new(None),
145        }
146    }
147
148    /// Allocates a new [MutableBuffer] with `len` and capacity to be at least `len` where
149    /// all bytes are guaranteed to be `0u8`.
150    /// # Example
151    /// ```
152    /// # use arrow_buffer::buffer::{Buffer, MutableBuffer};
153    /// let mut buffer = MutableBuffer::from_len_zeroed(127);
154    /// assert_eq!(buffer.len(), 127);
155    /// assert!(buffer.capacity() >= 127);
156    /// let data = buffer.as_slice_mut();
157    /// assert_eq!(data[126], 0u8);
158    /// ```
159    pub fn from_len_zeroed(len: usize) -> Self {
160        let layout = Layout::from_size_align(len, ALIGNMENT).unwrap();
161        let data = match layout.size() {
162            0 => dangling_ptr(),
163            _ => {
164                // Safety: Verified size != 0
165                let raw_ptr = unsafe { std::alloc::alloc_zeroed(layout) };
166                NonNull::new(raw_ptr).unwrap_or_else(|| handle_alloc_error(layout))
167            }
168        };
169        Self {
170            data,
171            len,
172            layout,
173            #[cfg(feature = "pool")]
174            reservation: std::sync::Mutex::new(None),
175        }
176    }
177
178    /// Allocates a new [MutableBuffer] from given `Bytes`.
179    pub(crate) fn from_bytes(bytes: Bytes) -> Result<Self, Bytes> {
180        let layout = match bytes.deallocation() {
181            Deallocation::Standard(layout) => *layout,
182            _ => return Err(bytes),
183        };
184
185        let len = bytes.len();
186        let data = bytes.ptr();
187        #[cfg(feature = "pool")]
188        let reservation = bytes.reservation.lock().unwrap().take();
189        mem::forget(bytes);
190
191        Ok(Self {
192            data,
193            len,
194            layout,
195            #[cfg(feature = "pool")]
196            reservation: Mutex::new(reservation),
197        })
198    }
199
200    /// creates a new [MutableBuffer] with capacity and length capable of holding `len` bits.
201    /// This is useful to create a buffer for packed bitmaps.
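    ///
    /// # Example
    ///
    /// A minimal sketch of building a zeroed bitmap buffer:
    /// ```
    /// # use arrow_buffer::buffer::MutableBuffer;
    /// let buffer = MutableBuffer::new_null(10);
    /// // 10 bits require 2 bytes, all initialized to zero
    /// assert_eq!(buffer.len(), 2);
    /// assert_eq!(buffer.as_slice(), &[0u8, 0]);
    /// ```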
    pub fn new_null(len: usize) -> Self {
        let num_bytes = bit_util::ceil(len, 8);
        MutableBuffer::from_len_zeroed(num_bytes)
    }

    /// Set the bits in the range of `[0, end)` to 0 (if `val` is false), or 1 (if `val`
    /// is true). Also extend the length of this buffer to be `end`.
    ///
    /// This is useful when one wants to clear (or set) the bits and then manipulate
    /// the buffer directly (e.g., modifying the buffer by holding a mutable reference
    /// from `data_mut()`).
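    ///
    /// # Example
    ///
    /// A minimal sketch, setting the first 4 bytes to all ones:
    /// ```
    /// # use arrow_buffer::buffer::MutableBuffer;
    /// let buffer = MutableBuffer::new(64).with_bitset(4, true);
    /// assert_eq!(buffer.len(), 4);
    /// assert_eq!(buffer.as_slice(), &[255u8, 255, 255, 255]);
    /// ```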
    pub fn with_bitset(mut self, end: usize, val: bool) -> Self {
        assert!(end <= self.layout.size());
        let v = if val { 255 } else { 0 };
        unsafe {
            std::ptr::write_bytes(self.data.as_ptr(), v, end);
            self.len = end;
        }
        self
    }

    /// Ensure that `count` bytes from `start` contain zero bits
    ///
    /// This is used to initialize the bits in a buffer, however, it has no impact on the
    /// `len` of the buffer and so can be used to initialize the memory region from
    /// `len` to `capacity`.
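    ///
    /// # Example
    ///
    /// A minimal sketch, zeroing a sub-range of an all-ones buffer:
    /// ```
    /// # use arrow_buffer::buffer::MutableBuffer;
    /// let mut buffer = MutableBuffer::new(8).with_bitset(8, true);
    /// buffer.set_null_bits(2, 4);
    /// assert_eq!(buffer.as_slice(), &[255, 255, 0, 0, 0, 0, 255, 255]);
    /// ```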
    pub fn set_null_bits(&mut self, start: usize, count: usize) {
        assert!(
            start.saturating_add(count) <= self.layout.size(),
            "range start index {start} and count {count} out of bounds for \
            buffer of length {}",
            self.layout.size(),
        );

        // Safety: `self.data[start..][..count]` is in-bounds and well-aligned for `u8`
        unsafe {
            std::ptr::write_bytes(self.data.as_ptr().add(start), 0, count);
        }
    }

    /// Ensures that this buffer has at least `self.len + additional` bytes. This re-allocates iff
    /// `self.len + additional > capacity`.
    /// # Example
    /// ```
    /// # use arrow_buffer::buffer::{Buffer, MutableBuffer};
    /// let mut buffer = MutableBuffer::new(0);
    /// buffer.reserve(253); // allocates for the first time
    /// (0..253u8).for_each(|i| buffer.push(i)); // no reallocation
    /// let buffer: Buffer = buffer.into();
    /// assert_eq!(buffer.len(), 253);
    /// ```
    // For performance reasons, this must be inlined so that the `if` is executed inside the caller, and not as an extra call that just
    // exits.
    #[inline(always)]
    pub fn reserve(&mut self, additional: usize) {
        let required_cap = self.len + additional;
        if required_cap > self.layout.size() {
            let new_capacity = bit_util::round_upto_multiple_of_64(required_cap);
            let new_capacity = std::cmp::max(new_capacity, self.layout.size() * 2);
            self.reallocate(new_capacity)
        }
    }

    /// Appends `slice_to_repeat` to this mutable buffer, repeated `repeat_count` times.
    ///
    /// # Example
    ///
    /// ## Repeat the same string bytes multiple times
    /// ```
    /// # use arrow_buffer::buffer::MutableBuffer;
    /// let mut buffer = MutableBuffer::new(0);
    /// let bytes_to_repeat = b"ab";
    /// buffer.repeat_slice_n_times(bytes_to_repeat, 3);
    /// assert_eq!(buffer.as_slice(), b"ababab");
    /// ```
    pub fn repeat_slice_n_times<T: ArrowNativeType>(
        &mut self,
        slice_to_repeat: &[T],
        repeat_count: usize,
    ) {
        if repeat_count == 0 || slice_to_repeat.is_empty() {
            return;
        }

        let bytes_to_repeat = size_of_val(slice_to_repeat);

        // Ensure capacity
        self.reserve(repeat_count * bytes_to_repeat);

        // Save the length before we do all the copies to know where to start from
        let length_before = self.len;

        // Copy the initial slice once so we can use a doubling strategy on it
        self.extend_from_slice(slice_to_repeat);

        // This tracks how many bytes we have added by repeating so far
        let added_repeats_length = bytes_to_repeat;
        assert_eq!(
            self.len - length_before,
            added_repeats_length,
            "should copy exactly the same number of bytes"
        );

        // Number of times the slice was repeated
        let mut already_repeated_times = 1;

        // We will use a doubling strategy to fill the buffer in log(repeat_count) steps
        while already_repeated_times < repeat_count {
            // How many slices can we copy in this iteration
            // (either double what we have, or just the remaining ones)
            let number_of_slices_to_copy =
                already_repeated_times.min(repeat_count - already_repeated_times);
            let number_of_bytes_to_copy = number_of_slices_to_copy * bytes_to_repeat;

            unsafe {
                // Get to the start of the data before we started copying anything
                let src = self.data.as_ptr().add(length_before) as *const u8;

                // Go to the current location to copy to (end of current data)
                let dst = self.data.as_ptr().add(self.len);

                // SAFETY: the pointers do not overlap, as there are at least `number_of_bytes_to_copy` bytes between them
                std::ptr::copy_nonoverlapping(src, dst, number_of_bytes_to_copy)
            }

            // Advance the length by the amount of data we just copied (doubled)
            self.len += number_of_bytes_to_copy;

            already_repeated_times += number_of_slices_to_copy;
        }
    }

    #[cold]
    fn reallocate(&mut self, capacity: usize) {
        let new_layout = Layout::from_size_align(capacity, self.layout.align()).unwrap();
        if new_layout.size() == 0 {
            if self.layout.size() != 0 {
                // Safety: data was allocated with layout
                unsafe { std::alloc::dealloc(self.as_mut_ptr(), self.layout) };
                self.layout = new_layout
            }
            return;
        }

        let data = match self.layout.size() {
            // Safety: new_layout is not empty
            0 => unsafe { std::alloc::alloc(new_layout) },
            // Safety: verified new layout is valid and not empty
            _ => unsafe { std::alloc::realloc(self.as_mut_ptr(), self.layout, capacity) },
        };
        self.data = NonNull::new(data).unwrap_or_else(|| handle_alloc_error(new_layout));
        self.layout = new_layout;
        #[cfg(feature = "pool")]
        {
            if let Some(reservation) = self.reservation.lock().unwrap().as_mut() {
                reservation.resize(self.layout.size());
            }
        }
    }

    /// Truncates this buffer to `len` bytes
    ///
    /// If `len` is greater than the buffer's current length, this has no effect
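    ///
    /// # Example
    ///
    /// A minimal sketch:
    /// ```
    /// # use arrow_buffer::buffer::MutableBuffer;
    /// let mut buffer = MutableBuffer::new(0);
    /// buffer.extend_from_slice(b"hello");
    /// buffer.truncate(2);
    /// assert_eq!(buffer.as_slice(), b"he");
    /// buffer.truncate(10); // greater than the current length: no effect
    /// assert_eq!(buffer.len(), 2);
    /// ```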
    #[inline(always)]
    pub fn truncate(&mut self, len: usize) {
        if len > self.len {
            return;
        }
        self.len = len;
        #[cfg(feature = "pool")]
        {
            if let Some(reservation) = self.reservation.lock().unwrap().as_mut() {
                reservation.resize(self.len);
            }
        }
    }

    /// Resizes the buffer, either truncating its contents (with no change in capacity), or
    /// growing it (potentially reallocating it) and writing `value` in the newly available bytes.
    /// # Example
    /// ```
    /// # use arrow_buffer::buffer::{Buffer, MutableBuffer};
    /// let mut buffer = MutableBuffer::new(0);
    /// buffer.resize(253, 2); // allocates for the first time
    /// assert_eq!(buffer.as_slice()[252], 2u8);
    /// ```
    // For performance reasons, this must be inlined so that the `if` is executed inside the caller, and not as an extra call that just
    // exits.
    #[inline(always)]
    pub fn resize(&mut self, new_len: usize, value: u8) {
        if new_len > self.len {
            let diff = new_len - self.len;
            self.reserve(diff);
            // write the value
            unsafe { self.data.as_ptr().add(self.len).write_bytes(value, diff) };
        }
        // this truncates the buffer when new_len < self.len
        self.len = new_len;
        #[cfg(feature = "pool")]
        {
            if let Some(reservation) = self.reservation.lock().unwrap().as_mut() {
                reservation.resize(self.len);
            }
        }
    }

    /// Shrinks the capacity of the buffer as much as possible.
    /// The new capacity will be rounded up to the nearest multiple of 64 bytes.
    ///
    /// # Example
    /// ```
    /// # use arrow_buffer::buffer::{Buffer, MutableBuffer};
    /// // 2 cache lines
    /// let mut buffer = MutableBuffer::new(128);
    /// assert_eq!(buffer.capacity(), 128);
    /// buffer.push(1);
    /// buffer.push(2);
    ///
    /// buffer.shrink_to_fit();
    /// assert!(buffer.capacity() >= 64 && buffer.capacity() < 128);
    /// ```
    pub fn shrink_to_fit(&mut self) {
        let new_capacity = bit_util::round_upto_multiple_of_64(self.len);
        if new_capacity < self.layout.size() {
            self.reallocate(new_capacity)
        }
    }

    /// Returns whether this buffer is empty or not.
    #[inline]
    pub const fn is_empty(&self) -> bool {
        self.len == 0
    }

    /// Returns the length (the number of bytes written) in this buffer.
    /// The invariant `buffer.len() <= buffer.capacity()` is always upheld.
    #[inline]
    pub const fn len(&self) -> usize {
        self.len
    }

    /// Returns the total capacity in this buffer, in bytes.
    ///
    /// The invariant `buffer.len() <= buffer.capacity()` is always upheld.
    #[inline]
    pub const fn capacity(&self) -> usize {
        self.layout.size()
    }

    /// Clear all existing data from this buffer.
    pub fn clear(&mut self) {
        self.len = 0
    }

    /// Returns the data stored in this buffer as a slice.
    pub fn as_slice(&self) -> &[u8] {
        self
    }

    /// Returns the data stored in this buffer as a mutable slice.
    pub fn as_slice_mut(&mut self) -> &mut [u8] {
        self
    }

    /// Returns a raw pointer to this buffer's internal memory
    /// This pointer is guaranteed to be aligned along cache-lines.
    #[inline]
    pub const fn as_ptr(&self) -> *const u8 {
        self.data.as_ptr()
    }

    /// Returns a mutable raw pointer to this buffer's internal memory
    /// This pointer is guaranteed to be aligned along cache-lines.
    #[inline]
    pub fn as_mut_ptr(&mut self) -> *mut u8 {
        self.data.as_ptr()
    }

    #[inline]
    pub(super) fn into_buffer(self) -> Buffer {
        let bytes = unsafe { Bytes::new(self.data, self.len, Deallocation::Standard(self.layout)) };
        #[cfg(feature = "pool")]
        {
            let reservation = self.reservation.lock().unwrap().take();
            *bytes.reservation.lock().unwrap() = reservation;
        }
        std::mem::forget(self);
        Buffer::from(bytes)
    }

    /// View this buffer as a mutable slice of a specific type.
    ///
    /// # Panics
    ///
    /// This function panics if the underlying buffer is not aligned
    /// correctly for type `T`.
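    ///
    /// # Example
    ///
    /// A minimal sketch:
    /// ```
    /// # use arrow_buffer::buffer::MutableBuffer;
    /// let mut buffer = MutableBuffer::from(vec![1u32, 2, 3]);
    /// buffer.typed_data_mut::<u32>()[0] = 10;
    /// assert_eq!(buffer.typed_data::<u32>(), &[10, 2, 3]);
    /// ```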
    pub fn typed_data_mut<T: ArrowNativeType>(&mut self) -> &mut [T] {
        // SAFETY
        // ArrowNativeType is trivially transmutable, is sealed to prevent potentially incorrect
        // implementation outside this crate, and this method checks alignment
        let (prefix, offsets, suffix) = unsafe { self.as_slice_mut().align_to_mut::<T>() };
        assert!(prefix.is_empty() && suffix.is_empty());
        offsets
    }

    /// View buffer as an immutable slice of a specific type.
    ///
    /// # Panics
    ///
    /// This function panics if the underlying buffer is not aligned
    /// correctly for type `T`.
    pub fn typed_data<T: ArrowNativeType>(&self) -> &[T] {
        // SAFETY
        // ArrowNativeType is trivially transmutable, is sealed to prevent potentially incorrect
        // implementation outside this crate, and this method checks alignment
        let (prefix, offsets, suffix) = unsafe { self.as_slice().align_to::<T>() };
        assert!(prefix.is_empty() && suffix.is_empty());
        offsets
    }

    /// Extends this buffer from a slice of items that can be represented in bytes, increasing its capacity if needed.
    /// # Example
    /// ```
    /// # use arrow_buffer::buffer::MutableBuffer;
    /// let mut buffer = MutableBuffer::new(0);
    /// buffer.extend_from_slice(&[2u32, 0]);
    /// assert_eq!(buffer.len(), 8) // u32 has 4 bytes
    /// ```
    #[inline]
    pub fn extend_from_slice<T: ArrowNativeType>(&mut self, items: &[T]) {
        let additional = mem::size_of_val(items);
        self.reserve(additional);
        unsafe {
            // this assumes that `[ToByteSlice]` can be copied directly
            // without calling `to_byte_slice` for each element,
            // which is correct for all ArrowNativeType implementations.
            let src = items.as_ptr() as *const u8;
            let dst = self.data.as_ptr().add(self.len);
            std::ptr::copy_nonoverlapping(src, dst, additional)
        }
        self.len += additional;
    }

    /// Extends the buffer with a new item, increasing its capacity if needed.
    /// # Example
    /// ```
    /// # use arrow_buffer::buffer::MutableBuffer;
    /// let mut buffer = MutableBuffer::new(0);
    /// buffer.push(256u32);
    /// assert_eq!(buffer.len(), 4) // u32 has 4 bytes
    /// ```
    #[inline]
    pub fn push<T: ToByteSlice>(&mut self, item: T) {
        let additional = std::mem::size_of::<T>();
        self.reserve(additional);
        unsafe {
            let src = item.to_byte_slice().as_ptr();
            let dst = self.data.as_ptr().add(self.len);
            std::ptr::copy_nonoverlapping(src, dst, additional);
        }
        self.len += additional;
    }

    /// Extends the buffer with a new item, without checking for sufficient capacity
    /// # Safety
    /// Caller must ensure that `capacity() - len() >= size_of::<T>()`
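    ///
    /// # Example
    ///
    /// A minimal sketch, reserving space before the unchecked push:
    /// ```
    /// # use arrow_buffer::buffer::MutableBuffer;
    /// let mut buffer = MutableBuffer::new(0);
    /// buffer.reserve(4);
    /// // SAFETY: 4 bytes of spare capacity were reserved above
    /// unsafe { buffer.push_unchecked(1u32) };
    /// assert_eq!(buffer.as_slice(), &[1u8, 0, 0, 0]);
    /// ```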
    #[inline]
    pub unsafe fn push_unchecked<T: ToByteSlice>(&mut self, item: T) {
        let additional = std::mem::size_of::<T>();
        let src = item.to_byte_slice().as_ptr();
        let dst = unsafe { self.data.as_ptr().add(self.len) };
        unsafe { std::ptr::copy_nonoverlapping(src, dst, additional) };
        self.len += additional;
    }

    /// Extends the buffer by `additional` bytes equal to `0u8`, incrementing its capacity if needed.
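    ///
    /// # Example
    ///
    /// A minimal sketch:
    /// ```
    /// # use arrow_buffer::buffer::MutableBuffer;
    /// let mut buffer = MutableBuffer::new(0);
    /// buffer.extend_from_slice(&[1u8, 2]);
    /// buffer.extend_zeros(2);
    /// assert_eq!(buffer.as_slice(), &[1, 2, 0, 0]);
    /// ```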
    #[inline]
    pub fn extend_zeros(&mut self, additional: usize) {
        self.resize(self.len + additional, 0);
    }

    /// # Safety
    /// The caller must ensure that the buffer was properly initialized up to `len`.
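    ///
    /// # Example
    ///
    /// A minimal sketch, shrinking the visible length of an initialized buffer:
    /// ```
    /// # use arrow_buffer::buffer::MutableBuffer;
    /// let mut buffer = MutableBuffer::from_len_zeroed(8);
    /// // SAFETY: all 8 bytes were zero-initialized, so the first 4 are too
    /// unsafe { buffer.set_len(4) };
    /// assert_eq!(buffer.as_slice(), &[0u8, 0, 0, 0]);
    /// ```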
    #[inline]
    pub unsafe fn set_len(&mut self, len: usize) {
        assert!(len <= self.capacity());
        self.len = len;
    }

    /// Invokes `f` with values `0..len` collecting the boolean results into a new `MutableBuffer`
    ///
    /// This is similar to `from_trusted_len_iter_bool`; however, it can be significantly faster
    /// as it eliminates the conditional `Iterator::next`
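    ///
    /// # Example
    ///
    /// A minimal sketch, packing the predicate results into bits:
    /// ```
    /// # use arrow_buffer::buffer::MutableBuffer;
    /// let buffer = MutableBuffer::collect_bool(5, |i| i % 2 == 0);
    /// // bits 0, 2 and 4 are set: 0b00010101
    /// assert_eq!(buffer.as_slice(), &[0b00010101]);
    /// ```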
    #[inline]
    pub fn collect_bool<F: FnMut(usize) -> bool>(len: usize, mut f: F) -> Self {
        let mut buffer: Vec<u64> = Vec::with_capacity(bit_util::ceil(len, 64));

        let chunks = len / 64;
        let remainder = len % 64;
        buffer.extend((0..chunks).map(|chunk| {
            let mut packed = 0;
            for bit_idx in 0..64 {
                let i = bit_idx + chunk * 64;
                packed |= (f(i) as u64) << bit_idx;
            }

            packed
        }));

        if remainder != 0 {
            let mut packed = 0;
            for bit_idx in 0..remainder {
                let i = bit_idx + chunks * 64;
                packed |= (f(i) as u64) << bit_idx;
            }

            buffer.push(packed)
        }

        let mut buffer: MutableBuffer = buffer.into();
        buffer.truncate(bit_util::ceil(len, 8));
        buffer
    }

    /// Extends this buffer with boolean values.
    ///
    /// This requires `iter` to report an exact size via `size_hint`.
    /// `offset` indicates the starting offset in bits in this buffer to begin writing to
    /// and must be less than or equal to the current length of this buffer.
    /// All bits not written to (but readable due to byte alignment) will be zeroed out.
    /// # Safety
    /// Callers must ensure that `iter` reports an exact size via `size_hint`.
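    ///
    /// # Example
    ///
    /// A minimal sketch, appending three bits at offset 0 (array iterators report
    /// an exact `size_hint`):
    /// ```
    /// # use arrow_buffer::buffer::MutableBuffer;
    /// let mut buffer = MutableBuffer::new(0);
    /// // SAFETY: the array iterator reports an exact size
    /// unsafe { buffer.extend_bool_trusted_len([true, false, true].into_iter(), 0) };
    /// assert_eq!(buffer.as_slice(), &[0b00000101]);
    /// ```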
    #[inline]
    pub unsafe fn extend_bool_trusted_len<I: Iterator<Item = bool>>(
        &mut self,
        mut iter: I,
        offset: usize,
    ) {
        let (lower, upper) = iter.size_hint();
        let len = upper.expect("Iterator must have exact size_hint");
        assert_eq!(lower, len, "Iterator must have exact size_hint");
        debug_assert!(
            offset <= self.len * 8,
            "offset must be <= buffer length in bits"
        );

        if len == 0 {
            return;
        }

        let start_len = offset;
        let end_bit = start_len + len;

        // SAFETY: we will initialize all newly exposed bytes before they are read
        let new_len_bytes = bit_util::ceil(end_bit, 8);
        if new_len_bytes > self.len {
            self.reserve(new_len_bytes - self.len);
            // SAFETY: caller will initialize all newly exposed bytes before they are read
            unsafe { self.set_len(new_len_bytes) };
        }

        let slice = self.as_slice_mut();

        let mut bit_idx = start_len;

        // ---- Unaligned prefix: advance to the next 64-bit boundary ----
        let misalignment = bit_idx & 63;
        let prefix_bits = if misalignment == 0 {
            0
        } else {
            (64 - misalignment).min(end_bit - bit_idx)
        };

        if prefix_bits != 0 {
            let byte_start = bit_idx / 8;
            let byte_end = bit_util::ceil(bit_idx + prefix_bits, 8);
            let bit_offset = bit_idx % 8;

            // Clear any newly-visible bits in the existing partial byte
            if bit_offset != 0 {
                let keep_mask = (1u8 << bit_offset).wrapping_sub(1);
                slice[byte_start] &= keep_mask;
            }

            // Zero any new bytes we will partially fill in this prefix
            let zero_from = if bit_offset == 0 {
                byte_start
            } else {
                byte_start + 1
            };
            if byte_end > zero_from {
                slice[zero_from..byte_end].fill(0);
            }

            for _ in 0..prefix_bits {
                let v = iter.next().unwrap();
                if v {
                    let byte_idx = bit_idx / 8;
                    let bit = bit_idx % 8;
                    slice[byte_idx] |= 1 << bit;
                }
                bit_idx += 1;
            }
        }

        if bit_idx < end_bit {
            // ---- Aligned middle: write u64 chunks ----
            debug_assert_eq!(bit_idx & 63, 0);
            let remaining_bits = end_bit - bit_idx;
            let chunks = remaining_bits / 64;

            let words_start = bit_idx / 8;
            let words_end = words_start + chunks * 8;
            for dst in slice[words_start..words_end].chunks_exact_mut(8) {
                let mut packed: u64 = 0;
                for i in 0..64 {
                    packed |= (iter.next().unwrap() as u64) << i;
                }
                dst.copy_from_slice(&packed.to_le_bytes());
                bit_idx += 64;
            }

            // ---- Unaligned suffix: remaining < 64 bits ----
            let suffix_bits = end_bit - bit_idx;
            if suffix_bits != 0 {
                debug_assert_eq!(bit_idx % 8, 0);
                let byte_start = bit_idx / 8;
                let byte_end = bit_util::ceil(end_bit, 8);
                slice[byte_start..byte_end].fill(0);

                for _ in 0..suffix_bits {
                    let v = iter.next().unwrap();
                    if v {
                        let byte_idx = bit_idx / 8;
                        let bit = bit_idx % 8;
                        slice[byte_idx] |= 1 << bit;
                    }
                    bit_idx += 1;
                }
            }
        }

        // Clear any unused bits in the last byte
        let remainder = end_bit % 8;
        if remainder != 0 {
            let mask = (1u8 << remainder).wrapping_sub(1);
            slice[bit_util::ceil(end_bit, 8) - 1] &= mask;
        }

        debug_assert_eq!(bit_idx, end_bit);
    }

    /// Register this [`MutableBuffer`] with the provided [`MemoryPool`]
    ///
    /// This claims the memory used by this buffer in the pool, allowing for
    /// accurate accounting of memory usage. Any prior reservation will be
    /// released so this works well when the buffer is being shared among
    /// multiple arrays.
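    ///
    /// # Example
    ///
    /// A minimal sketch, assuming the `pool` feature is enabled and using the
    /// [`TrackingMemoryPool`](crate::pool::TrackingMemoryPool) shipped with this crate:
    /// ```ignore
    /// use arrow_buffer::buffer::MutableBuffer;
    /// use arrow_buffer::pool::{MemoryPool, TrackingMemoryPool};
    ///
    /// let pool = TrackingMemoryPool::default();
    /// let buffer = MutableBuffer::with_capacity(100);
    /// buffer.claim(&pool);
    /// // the reservation tracks the (rounded-up) capacity, not the length
    /// assert_eq!(pool.used(), buffer.capacity());
    /// ```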
    #[cfg(feature = "pool")]
    pub fn claim(&self, pool: &dyn MemoryPool) {
        *self.reservation.lock().unwrap() = Some(pool.reserve(self.capacity()));
    }
}

/// Creates a non-null pointer with alignment of [`ALIGNMENT`]
///
/// This is similar to [`NonNull::dangling`]
#[inline]
pub(crate) fn dangling_ptr() -> NonNull<u8> {
    // SAFETY: ALIGNMENT is a non-zero usize which is then cast
    // to a *mut u8. Therefore, `ptr` is not null and the conditions for
    // calling new_unchecked() are respected.
    #[cfg(miri)]
    {
        // Since miri implies a nightly rust version we can use the unstable strict_provenance feature
        unsafe { NonNull::new_unchecked(std::ptr::without_provenance_mut(ALIGNMENT)) }
    }
    #[cfg(not(miri))]
    {
        unsafe { NonNull::new_unchecked(ALIGNMENT as *mut u8) }
    }
}

impl<A: ArrowNativeType> Extend<A> for MutableBuffer {
    #[inline]
    fn extend<T: IntoIterator<Item = A>>(&mut self, iter: T) {
        let iterator = iter.into_iter();
        self.extend_from_iter(iterator)
    }
}

impl<T: ArrowNativeType> From<Vec<T>> for MutableBuffer {
    fn from(value: Vec<T>) -> Self {
        // Safety
        // Vec::as_ptr guaranteed to not be null and ArrowNativeType are trivially transmutable
        let data = unsafe { NonNull::new_unchecked(value.as_ptr() as _) };
        let len = value.len() * mem::size_of::<T>();
        // Safety
        // Vec guaranteed to have a valid layout matching that of `Layout::array`
        // This is based on `RawVec::current_memory`
        let layout = unsafe { Layout::array::<T>(value.capacity()).unwrap_unchecked() };
        mem::forget(value);
        Self {
            data,
            len,
            layout,
            #[cfg(feature = "pool")]
            reservation: std::sync::Mutex::new(None),
        }
    }
}

impl MutableBuffer {
    #[inline]
    pub(super) fn extend_from_iter<T: ArrowNativeType, I: Iterator<Item = T>>(
        &mut self,
        mut iterator: I,
    ) {
        let item_size = std::mem::size_of::<T>();
        let (lower, _) = iterator.size_hint();
        let additional = lower * item_size;
        self.reserve(additional);

        // this is necessary because of https://github.com/rust-lang/rust/issues/32155
        let mut len = SetLenOnDrop::new(&mut self.len);
        let mut dst = unsafe { self.data.as_ptr().add(len.local_len) };
        let capacity = self.layout.size();

        while len.local_len + item_size <= capacity {
            if let Some(item) = iterator.next() {
                unsafe {
                    let src = item.to_byte_slice().as_ptr();
                    std::ptr::copy_nonoverlapping(src, dst, item_size);
                    dst = dst.add(item_size);
                }
                len.local_len += item_size;
            } else {
                break;
            }
        }
        drop(len);

        iterator.for_each(|item| self.push(item));
    }

    /// Creates a [`MutableBuffer`] from an [`Iterator`] with a trusted (upper) length.
    /// Prefer this to `collect` whenever possible, as it is ~60% faster.
    /// # Example
    /// ```
    /// # use arrow_buffer::buffer::MutableBuffer;
    /// let v = vec![1u32];
    /// let iter = v.iter().map(|x| x * 2);
    /// let buffer = unsafe { MutableBuffer::from_trusted_len_iter(iter) };
    /// assert_eq!(buffer.len(), 4) // u32 has 4 bytes
    /// ```
    /// # Safety
    /// This method assumes that the iterator's size is correct and it is undefined behavior
    /// to use it on an iterator that reports an incorrect length.
    // This implementation is required for two reasons:
    // 1. there is no trait `TrustedLen` in stable rust and therefore
    //    we can't specialize `extend` for `TrustedLen` like `Vec` does.
    // 2. `from_trusted_len_iter` is faster.
    #[inline]
    pub unsafe fn from_trusted_len_iter<T: ArrowNativeType, I: Iterator<Item = T>>(
        iterator: I,
    ) -> Self {
        let item_size = std::mem::size_of::<T>();
        let (_, upper) = iterator.size_hint();
        let upper = upper.expect("from_trusted_len_iter requires an upper limit");
        let len = upper * item_size;

        let mut buffer = MutableBuffer::new(len);

        let mut dst = buffer.data.as_ptr();
        for item in iterator {
            // note how there is no reserve here (compared with `extend_from_iter`)
            let src = item.to_byte_slice().as_ptr();
            unsafe { std::ptr::copy_nonoverlapping(src, dst, item_size) };
            dst = unsafe { dst.add(item_size) };
        }
        assert_eq!(
            unsafe { dst.offset_from(buffer.data.as_ptr()) } as usize,
            len,
            "Trusted iterator length was not accurately reported"
        );
        buffer.len = len;
        buffer
    }

    /// Creates a [`MutableBuffer`] from a boolean [`Iterator`] with a trusted (upper) length.
    /// # Example
    /// ```
    /// # use arrow_buffer::buffer::MutableBuffer;
    /// let v = vec![false, true, false];
    /// let iter = v.iter().map(|x| *x || true);
    /// let buffer = unsafe { MutableBuffer::from_trusted_len_iter_bool(iter) };
    /// assert_eq!(buffer.len(), 1) // 3 booleans occupy 1 byte
    /// ```
    /// # Safety
    /// This method assumes that the iterator's size is correct and it is undefined behavior
    /// to use it on an iterator that reports an incorrect length.
    // This implementation is required for two reasons:
    // 1. there is no trait `TrustedLen` in stable rust and therefore
    //    we can't specialize `extend` for `TrustedLen` like `Vec` does.
    // 2. `from_trusted_len_iter_bool` is faster.
    #[inline]
    pub unsafe fn from_trusted_len_iter_bool<I: Iterator<Item = bool>>(mut iterator: I) -> Self {
        let (_, upper) = iterator.size_hint();
        let len = upper.expect("from_trusted_len_iter requires an upper limit");

        Self::collect_bool(len, |_| iterator.next().unwrap())
    }

    /// Creates a [`MutableBuffer`] from an [`Iterator`] with a trusted (upper) length, or errors
    /// if any of the items of the iterator is an error.
    /// Prefer this to `collect` whenever possible, as it is ~60% faster.
    /// # Safety
    /// This method assumes that the iterator's size is correct and it is undefined behavior
    /// to use it on an iterator that reports an incorrect length.
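    ///
    /// # Example
    ///
    /// A minimal sketch with an infallible item type wrapped in `Result`:
    /// ```
    /// # use arrow_buffer::buffer::MutableBuffer;
    /// let iter = vec![1u32, 2].into_iter().map(Ok::<u32, ()>);
    /// let buffer = unsafe { MutableBuffer::try_from_trusted_len_iter(iter) }.unwrap();
    /// assert_eq!(buffer.len(), 8); // 2 * 4 bytes
    /// ```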
    #[inline]
    pub unsafe fn try_from_trusted_len_iter<
        E,
        T: ArrowNativeType,
        I: Iterator<Item = Result<T, E>>,
    >(
        iterator: I,
    ) -> Result<Self, E> {
        let item_size = std::mem::size_of::<T>();
        let (_, upper) = iterator.size_hint();
        let upper = upper.expect("try_from_trusted_len_iter requires an upper limit");
        let len = upper * item_size;

        let mut buffer = MutableBuffer::new(len);

        let mut dst = buffer.data.as_ptr();
        for item in iterator {
            let item = item?;
            // note how there is no reserve here (compared with `extend_from_iter`)
            let src = item.to_byte_slice().as_ptr();
            unsafe { std::ptr::copy_nonoverlapping(src, dst, item_size) };
            dst = unsafe { dst.add(item_size) };
        }
        // try_from_trusted_len_iter is instantiated a lot, so we extract part of it into a less
        // generic method to reduce compile time
        unsafe fn finalize_buffer(dst: *mut u8, buffer: &mut MutableBuffer, len: usize) {
            unsafe {
                assert_eq!(
                    dst.offset_from(buffer.data.as_ptr()) as usize,
                    len,
                    "Trusted iterator length was not accurately reported"
                );
                buffer.len = len;
            }
        }
        unsafe { finalize_buffer(dst, &mut buffer, len) };
        Ok(buffer)
    }
}

impl Default for MutableBuffer {
    fn default() -> Self {
        Self::with_capacity(0)
    }
}

impl std::ops::Deref for MutableBuffer {
    type Target = [u8];

    fn deref(&self) -> &[u8] {
        unsafe { std::slice::from_raw_parts(self.as_ptr(), self.len) }
    }
}

impl std::ops::DerefMut for MutableBuffer {
    fn deref_mut(&mut self) -> &mut [u8] {
        unsafe { std::slice::from_raw_parts_mut(self.as_mut_ptr(), self.len) }
    }
}

impl AsRef<[u8]> for &MutableBuffer {
    fn as_ref(&self) -> &[u8] {
        self.as_slice()
    }
}

impl Drop for MutableBuffer {
    fn drop(&mut self) {
        if self.layout.size() != 0 {
            // Safety: data was allocated with standard allocator with given layout
            unsafe { std::alloc::dealloc(self.data.as_ptr() as _, self.layout) };
        }
    }
}

impl PartialEq for MutableBuffer {
    fn eq(&self, other: &MutableBuffer) -> bool {
        if self.len != other.len {
            return false;
        }
        if self.layout != other.layout {
            return false;
        }
        self.as_slice() == other.as_slice()
    }
}

unsafe impl Sync for MutableBuffer {}
unsafe impl Send for MutableBuffer {}

struct SetLenOnDrop<'a> {
    len: &'a mut usize,
    local_len: usize,
}

impl<'a> SetLenOnDrop<'a> {
    #[inline]
    fn new(len: &'a mut usize) -> Self {
        SetLenOnDrop {
            local_len: *len,
            len,
        }
    }
}

impl Drop for SetLenOnDrop<'_> {
    #[inline]
    fn drop(&mut self) {
        *self.len = self.local_len;
    }
}

/// Creating a `MutableBuffer` instance by setting bits according to the boolean values
impl std::iter::FromIterator<bool> for MutableBuffer {
    fn from_iter<I>(iter: I) -> Self
    where
        I: IntoIterator<Item = bool>,
    {
        let mut iterator = iter.into_iter();
        let mut result = {
            let byte_capacity: usize = iterator.size_hint().0.saturating_add(7) / 8;
            MutableBuffer::new(byte_capacity)
        };

        loop {
            let mut exhausted = false;
            let mut byte_accum: u8 = 0;
            let mut mask: u8 = 1;

            //collect (up to) 8 bits into a byte
            while mask != 0 {
                if let Some(value) = iterator.next() {
                    byte_accum |= match value {
                        true => mask,
                        false => 0,
                    };
                    mask <<= 1;
                } else {
                    exhausted = true;
                    break;
                }
            }

            // break if the iterator was exhausted before it provided a bool for this byte
            if exhausted && mask == 1 {
                break;
            }

            //ensure we have capacity to write the byte
            if result.len() == result.capacity() {
                //no capacity for new byte, allocate 1 byte more (plus however many more the iterator advertises)
                let additional_byte_capacity = 1usize.saturating_add(
                    iterator.size_hint().0.saturating_add(7) / 8, //convert bit count to byte count, rounding up
                );
                result.reserve(additional_byte_capacity)
            }

            // Soundness: capacity was allocated above
            unsafe { result.push_unchecked(byte_accum) };
            if exhausted {
                break;
            }
        }
        result
    }
}

impl<T: ArrowNativeType> std::iter::FromIterator<T> for MutableBuffer {
    fn from_iter<I: IntoIterator<Item = T>>(iter: I) -> Self {
        let mut buffer = Self::default();
        buffer.extend_from_iter(iter.into_iter());
        buffer
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_mutable_new() {
        let buf = MutableBuffer::new(63);
        assert_eq!(64, buf.capacity());
        assert_eq!(0, buf.len());
        assert!(buf.is_empty());
    }

    #[test]
    fn test_mutable_default() {
        let buf = MutableBuffer::default();
        assert_eq!(0, buf.capacity());
        assert_eq!(0, buf.len());
        assert!(buf.is_empty());

        let mut buf = MutableBuffer::default();
        buf.extend_from_slice(b"hello");
        assert_eq!(5, buf.len());
        assert_eq!(b"hello", buf.as_slice());
    }

    #[test]
    fn test_mutable_extend_from_slice() {
        let mut buf = MutableBuffer::new(100);
        buf.extend_from_slice(b"hello");
        assert_eq!(5, buf.len());
        assert_eq!(b"hello", buf.as_slice());

        buf.extend_from_slice(b" world");
        assert_eq!(11, buf.len());
        assert_eq!(b"hello world", buf.as_slice());

        buf.clear();
        assert_eq!(0, buf.len());
        buf.extend_from_slice(b"hello arrow");
        assert_eq!(11, buf.len());
        assert_eq!(b"hello arrow", buf.as_slice());
    }

    #[test]
    fn mutable_extend_from_iter() {
        let mut buf = MutableBuffer::new(0);
        buf.extend(vec![1u32, 2]);
        assert_eq!(8, buf.len());
        assert_eq!(&[1u8, 0, 0, 0, 2, 0, 0, 0], buf.as_slice());

        buf.extend(vec![3u32, 4]);
        assert_eq!(16, buf.len());
        assert_eq!(
            &[1u8, 0, 0, 0, 2, 0, 0, 0, 3, 0, 0, 0, 4, 0, 0, 0],
            buf.as_slice()
        );
    }

    #[test]
    fn mutable_extend_from_iter_unaligned_u64() {
        let mut buf = MutableBuffer::new(16);
        buf.push(1_u8);
        buf.extend([1_u64]);
        assert_eq!(9, buf.len());
        assert_eq!(&[1u8, 1u8, 0, 0, 0, 0, 0, 0, 0], buf.as_slice());
    }

    #[test]
    fn mutable_extend_from_slice_unaligned_u64() {
        let mut buf = MutableBuffer::new(16);
        buf.extend_from_slice(&[1_u8]);
        buf.extend_from_slice(&[1_u64]);
        assert_eq!(9, buf.len());
        assert_eq!(&[1u8, 1u8, 0, 0, 0, 0, 0, 0, 0], buf.as_slice());
    }

    #[test]
    fn mutable_push_unaligned_u64() {
        let mut buf = MutableBuffer::new(16);
        buf.push(1_u8);
        buf.push(1_u64);
        assert_eq!(9, buf.len());
        assert_eq!(&[1u8, 1u8, 0, 0, 0, 0, 0, 0, 0], buf.as_slice());
    }

    #[test]
    fn mutable_push_unchecked_unaligned_u64() {
        let mut buf = MutableBuffer::new(16);
        unsafe {
            buf.push_unchecked(1_u8);
            buf.push_unchecked(1_u64);
        }
        assert_eq!(9, buf.len());
        assert_eq!(&[1u8, 1u8, 0, 0, 0, 0, 0, 0, 0], buf.as_slice());
    }

    #[test]
    fn test_from_trusted_len_iter() {
        let iter = vec![1u32, 2].into_iter();
        let buf = unsafe { MutableBuffer::from_trusted_len_iter(iter) };
        assert_eq!(8, buf.len());
        assert_eq!(&[1u8, 0, 0, 0, 2, 0, 0, 0], buf.as_slice());
    }

    #[test]
    fn test_mutable_reserve() {
        let mut buf = MutableBuffer::new(1);
        assert_eq!(64, buf.capacity());

        // Reserving a smaller capacity should have no effect.
        buf.reserve(10);
        assert_eq!(64, buf.capacity());

        buf.reserve(80);
        assert_eq!(128, buf.capacity());

        buf.reserve(129);
        assert_eq!(256, buf.capacity());
    }

    #[test]
    fn test_mutable_resize() {
        let mut buf = MutableBuffer::new(1);
        assert_eq!(64, buf.capacity());
        assert_eq!(0, buf.len());

        buf.resize(20, 0);
        assert_eq!(64, buf.capacity());
        assert_eq!(20, buf.len());

        buf.resize(10, 0);
        assert_eq!(64, buf.capacity());
        assert_eq!(10, buf.len());

        buf.resize(100, 0);
        assert_eq!(128, buf.capacity());
        assert_eq!(100, buf.len());

        buf.resize(30, 0);
        assert_eq!(128, buf.capacity());
        assert_eq!(30, buf.len());

        buf.resize(0, 0);
        assert_eq!(128, buf.capacity());
        assert_eq!(0, buf.len());
    }

    #[test]
    fn test_mutable_into() {
        let mut buf = MutableBuffer::new(1);
        buf.extend_from_slice(b"aaaa bbbb cccc dddd");
        assert_eq!(19, buf.len());
        assert_eq!(64, buf.capacity());
        assert_eq!(b"aaaa bbbb cccc dddd", buf.as_slice());

        let immutable_buf: Buffer = buf.into();
        assert_eq!(19, immutable_buf.len());
        assert_eq!(64, immutable_buf.capacity());
        assert_eq!(b"aaaa bbbb cccc dddd", immutable_buf.as_slice());
    }

    #[test]
    fn test_mutable_equal() {
        let mut buf = MutableBuffer::new(1);
        let mut buf2 = MutableBuffer::new(1);

        buf.extend_from_slice(&[0xaa]);
        buf2.extend_from_slice(&[0xaa, 0xbb]);
        assert!(buf != buf2);

        buf.extend_from_slice(&[0xbb]);
        assert_eq!(buf, buf2);

        buf2.reserve(65);
        assert!(buf != buf2);
    }

    #[test]
    fn test_mutable_shrink_to_fit() {
        let mut buffer = MutableBuffer::new(128);
        assert_eq!(buffer.capacity(), 128);
        buffer.push(1);
        buffer.push(2);

        buffer.shrink_to_fit();
        assert!(buffer.capacity() >= 64 && buffer.capacity() < 128);
    }

    #[test]
    fn test_mutable_set_null_bits() {
        let mut buffer = MutableBuffer::new(8).with_bitset(8, true);

        for i in 0..=buffer.capacity() {
            buffer.set_null_bits(i, 0);
            assert_eq!(buffer[..8], [255; 8][..]);
        }

        buffer.set_null_bits(1, 4);
        assert_eq!(buffer[..8], [255, 0, 0, 0, 0, 255, 255, 255][..]);
    }

    #[test]
    #[should_panic = "out of bounds for buffer of length"]
    fn test_mutable_set_null_bits_oob() {
        let mut buffer = MutableBuffer::new(64);
        buffer.set_null_bits(1, buffer.capacity());
    }

    #[test]
    #[should_panic = "out of bounds for buffer of length"]
    fn test_mutable_set_null_bits_oob_by_overflow() {
        let mut buffer = MutableBuffer::new(0);
        buffer.set_null_bits(1, usize::MAX);
    }

    #[test]
    fn from_iter() {
        let buffer = [1u16, 2, 3, 4].into_iter().collect::<MutableBuffer>();
        assert_eq!(buffer.len(), 4 * mem::size_of::<u16>());
        assert_eq!(buffer.as_slice(), &[1, 0, 2, 0, 3, 0, 4, 0]);
    }

    #[test]
    #[should_panic(expected = "failed to create layout for MutableBuffer: LayoutError")]
    fn test_with_capacity_panics_above_max_capacity() {
        let max_capacity = isize::MAX as usize - (isize::MAX as usize % ALIGNMENT);
        let _ = MutableBuffer::with_capacity(max_capacity + 1);
    }

    #[cfg(feature = "pool")]
    mod pool_tests {
        use super::*;
        use crate::pool::{MemoryPool, TrackingMemoryPool};

        #[test]
        fn test_reallocate_with_pool() {
            let pool = TrackingMemoryPool::default();
            let mut buffer = MutableBuffer::with_capacity(100);
            buffer.claim(&pool);

            // Initial capacity should be 128 (multiple of 64)
            assert_eq!(buffer.capacity(), 128);
            assert_eq!(pool.used(), 128);

            // Reallocate to a larger size
            buffer.reallocate(200);

            // The capacity is exactly the requested size, not rounded up
            assert_eq!(buffer.capacity(), 200);
            assert_eq!(pool.used(), 200);

            // Reallocate to a smaller size
            buffer.reallocate(50);

            // The capacity is exactly the requested size, not rounded up
            assert_eq!(buffer.capacity(), 50);
            assert_eq!(pool.used(), 50);
        }

        #[test]
        fn test_truncate_with_pool() {
            let pool = TrackingMemoryPool::default();
            let mut buffer = MutableBuffer::with_capacity(100);

            // Fill buffer with some data
            buffer.resize(80, 1);
            assert_eq!(buffer.len(), 80);

            buffer.claim(&pool);
            assert_eq!(pool.used(), 128);

            // Truncate buffer
            buffer.truncate(40);
            assert_eq!(buffer.len(), 40);
            assert_eq!(pool.used(), 40);

            // Truncate to zero
            buffer.truncate(0);
            assert_eq!(buffer.len(), 0);
            assert_eq!(pool.used(), 0);
        }

        #[test]
        fn test_resize_with_pool() {
            let pool = TrackingMemoryPool::default();
            let mut buffer = MutableBuffer::with_capacity(100);
            buffer.claim(&pool);

            // Initial state
            assert_eq!(buffer.len(), 0);
            assert_eq!(pool.used(), 128);

            // Resize to increase length
            buffer.resize(50, 1);
            assert_eq!(buffer.len(), 50);
            assert_eq!(pool.used(), 50);

            // Resize to increase length beyond capacity
            buffer.resize(150, 1);
            assert_eq!(buffer.len(), 150);
            assert_eq!(buffer.capacity(), 256);
            assert_eq!(pool.used(), 150);

            // Resize to decrease length
            buffer.resize(30, 1);
            assert_eq!(buffer.len(), 30);
            assert_eq!(pool.used(), 30);
        }

        #[test]
        fn test_buffer_lifecycle_with_pool() {
            let pool = TrackingMemoryPool::default();

            // Create a buffer with memory reservation
            let mut mutable = MutableBuffer::with_capacity(100);
            mutable.resize(80, 1);
            mutable.claim(&pool);

            // Memory reservation is based on capacity when using claim()
            assert_eq!(pool.used(), 128);

            // Convert to immutable Buffer
            let buffer = mutable.into_buffer();

            // Memory reservation should be preserved
            assert_eq!(pool.used(), 128);

            // Drop the buffer and the reservation should be released
            drop(buffer);
            assert_eq!(pool.used(), 0);
        }
    }

    fn create_expected_repeated_slice<T: ArrowNativeType>(
        slice_to_repeat: &[T],
        repeat_count: usize,
    ) -> Buffer {
        let mut expected = MutableBuffer::new(size_of_val(slice_to_repeat) * repeat_count);
        for _ in 0..repeat_count {
            // Not using repeat_slice_n_times as this is the function under test
            expected.extend_from_slice(slice_to_repeat);
        }
        expected.into()
    }

    // Helper to test a specific repeat count with various slice sizes
    fn test_repeat_count<T: ArrowNativeType + PartialEq + std::fmt::Debug>(
        repeat_count: usize,
        test_data: &[T],
    ) {
        let mut buffer = MutableBuffer::new(0);
        buffer.repeat_slice_n_times(test_data, repeat_count);

        let expected = create_expected_repeated_slice(test_data, repeat_count);
        let result: Buffer = buffer.into();

        assert_eq!(
            result,
            expected,
            "Failed for repeat_count={}, slice_len={}",
            repeat_count,
            test_data.len()
        );
    }

    #[test]
    fn test_repeat_slice_count_edge_cases() {
        // Empty slice
        test_repeat_count(100, &[] as &[i32]);

        // Zero repeats
        test_repeat_count(0, &[1i32, 2, 3]);
    }

    #[test]
    fn test_small_repeat_counts() {
        // Test any special-case implementation for small repeat counts
        let data = &[1u8, 2, 3, 4, 5];

        for repeat_count in 1..=10 {
            test_repeat_count(repeat_count, data);
        }
    }

    #[test]
    fn test_different_size_of_i32_repeat_slice() {
        let data: &[i32] = &[1, 2, 3];
        let data_with_single_item: &[i32] = &[42];

        for data in &[data, data_with_single_item] {
            for item in 1..=9 {
                let base_repeat_count = 2_usize.pow(item);
                test_repeat_count(base_repeat_count - 1, data);
                test_repeat_count(base_repeat_count, data);
                test_repeat_count(base_repeat_count + 1, data);
            }
        }
    }

    #[test]
    fn test_different_size_of_u8_repeat_slice() {
        let data: &[u8] = &[1, 2, 3];
        let data_with_single_item: &[u8] = &[10];

        for data in &[data, data_with_single_item] {
            for item in 1..=9 {
                let base_repeat_count = 2_usize.pow(item);
                test_repeat_count(base_repeat_count - 1, data);
                test_repeat_count(base_repeat_count, data);
                test_repeat_count(base_repeat_count + 1, data);
            }
        }
    }

    #[test]
    fn test_different_size_of_u16_repeat_slice() {
        let data: &[u16] = &[1, 2, 3];
        let data_with_single_item: &[u16] = &[10];

        for data in &[data, data_with_single_item] {
            for item in 1..=9 {
                let base_repeat_count = 2_usize.pow(item);
                test_repeat_count(base_repeat_count - 1, data);
                test_repeat_count(base_repeat_count, data);
                test_repeat_count(base_repeat_count + 1, data);
            }
        }
    }

    #[test]
    fn test_various_slice_lengths() {
        // Test different slice lengths with same repeat pattern
        let repeat_count = 37; // Arbitrary non-power-of-2

        // Single element
        test_repeat_count(repeat_count, &[42i32]);

        // Small slices
        test_repeat_count(repeat_count, &[1i32, 2]);
        test_repeat_count(repeat_count, &[1i32, 2, 3]);
        test_repeat_count(repeat_count, &[1i32, 2, 3, 4]);
        test_repeat_count(repeat_count, &[1i32, 2, 3, 4, 5]);

        // Larger slices
        let data_10: Vec<i32> = (0..10).collect();
        test_repeat_count(repeat_count, &data_10);

        let data_100: Vec<i32> = (0..100).collect();
        test_repeat_count(repeat_count, &data_100);

        let data_1000: Vec<i32> = (0..1000).collect();
        test_repeat_count(repeat_count, &data_1000);
    }
}
1550}