arrow_buffer/buffer/
mutable.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::alloc::{handle_alloc_error, Layout};
19use std::mem;
20use std::ptr::NonNull;
21
22use crate::alloc::{Deallocation, ALIGNMENT};
23use crate::{
24    bytes::Bytes,
25    native::{ArrowNativeType, ToByteSlice},
26    util::bit_util,
27};
28
29#[cfg(feature = "pool")]
30use crate::pool::{MemoryPool, MemoryReservation};
31#[cfg(feature = "pool")]
32use std::sync::Mutex;
33
34use super::Buffer;
35
/// A [`MutableBuffer`] is Arrow's growable byte buffer for building a [`Buffer`] out of items
/// or slices of items.
///
/// [`Buffer`]s created from a [`MutableBuffer`] (via `into`) are guaranteed to have their
/// pointer aligned along cache lines and in multiples of 64 bytes.
///
/// Use [`MutableBuffer::push`] to append a single item, [`MutableBuffer::extend_from_slice`]
/// to append many items, and `into` to convert the result into a [`Buffer`].
///
/// For a safe, strongly typed API consider using [`Vec`] and [`ScalarBuffer`](crate::ScalarBuffer)
///
/// Note: this may be deprecated in a future release ([#1176](https://github.com/apache/arrow-rs/issues/1176))
///
/// # Example
///
/// ```
/// # use arrow_buffer::buffer::{Buffer, MutableBuffer};
/// let mut buffer = MutableBuffer::new(0);
/// buffer.push(256u32);
/// buffer.extend_from_slice(&[1u32]);
/// let buffer: Buffer = buffer.into();
/// assert_eq!(buffer.as_slice(), &[0u8, 1, 0, 0, 1, 0, 0, 0])
/// ```
#[derive(Debug)]
pub struct MutableBuffer {
    // Start of the allocation; dangling iff capacity = 0
    data: NonNull<u8>,
    // Number of bytes written so far; invariant: len <= capacity
    len: usize,
    // Layout the allocation was made with; `layout.size()` is the capacity in bytes
    layout: Layout,

    /// Memory reservation for tracking memory usage
    #[cfg(feature = "pool")]
    reservation: Mutex<Option<Box<dyn MemoryReservation>>>,
}
70
71impl MutableBuffer {
72    /// Allocate a new [MutableBuffer] with initial capacity to be at least `capacity`.
73    ///
74    /// See [`MutableBuffer::with_capacity`].
75    #[inline]
76    pub fn new(capacity: usize) -> Self {
77        Self::with_capacity(capacity)
78    }
79
80    /// Allocate a new [MutableBuffer] with initial capacity to be at least `capacity`.
81    ///
82    /// # Panics
83    ///
84    /// If `capacity`, when rounded up to the nearest multiple of [`ALIGNMENT`], is greater
85    /// then `isize::MAX`, then this function will panic.
86    #[inline]
87    pub fn with_capacity(capacity: usize) -> Self {
88        let capacity = bit_util::round_upto_multiple_of_64(capacity);
89        let layout = Layout::from_size_align(capacity, ALIGNMENT)
90            .expect("failed to create layout for MutableBuffer");
91        let data = match layout.size() {
92            0 => dangling_ptr(),
93            _ => {
94                // Safety: Verified size != 0
95                let raw_ptr = unsafe { std::alloc::alloc(layout) };
96                NonNull::new(raw_ptr).unwrap_or_else(|| handle_alloc_error(layout))
97            }
98        };
99        Self {
100            data,
101            len: 0,
102            layout,
103            #[cfg(feature = "pool")]
104            reservation: std::sync::Mutex::new(None),
105        }
106    }
107
108    /// Allocates a new [MutableBuffer] with `len` and capacity to be at least `len` where
109    /// all bytes are guaranteed to be `0u8`.
110    /// # Example
111    /// ```
112    /// # use arrow_buffer::buffer::{Buffer, MutableBuffer};
113    /// let mut buffer = MutableBuffer::from_len_zeroed(127);
114    /// assert_eq!(buffer.len(), 127);
115    /// assert!(buffer.capacity() >= 127);
116    /// let data = buffer.as_slice_mut();
117    /// assert_eq!(data[126], 0u8);
118    /// ```
119    pub fn from_len_zeroed(len: usize) -> Self {
120        let layout = Layout::from_size_align(len, ALIGNMENT).unwrap();
121        let data = match layout.size() {
122            0 => dangling_ptr(),
123            _ => {
124                // Safety: Verified size != 0
125                let raw_ptr = unsafe { std::alloc::alloc_zeroed(layout) };
126                NonNull::new(raw_ptr).unwrap_or_else(|| handle_alloc_error(layout))
127            }
128        };
129        Self {
130            data,
131            len,
132            layout,
133            #[cfg(feature = "pool")]
134            reservation: std::sync::Mutex::new(None),
135        }
136    }
137
138    /// Allocates a new [MutableBuffer] from given `Bytes`.
139    pub(crate) fn from_bytes(bytes: Bytes) -> Result<Self, Bytes> {
140        let layout = match bytes.deallocation() {
141            Deallocation::Standard(layout) => *layout,
142            _ => return Err(bytes),
143        };
144
145        let len = bytes.len();
146        let data = bytes.ptr();
147        #[cfg(feature = "pool")]
148        let reservation = bytes.reservation.lock().unwrap().take();
149        mem::forget(bytes);
150
151        Ok(Self {
152            data,
153            len,
154            layout,
155            #[cfg(feature = "pool")]
156            reservation: Mutex::new(reservation),
157        })
158    }
159
160    /// creates a new [MutableBuffer] with capacity and length capable of holding `len` bits.
161    /// This is useful to create a buffer for packed bitmaps.
162    pub fn new_null(len: usize) -> Self {
163        let num_bytes = bit_util::ceil(len, 8);
164        MutableBuffer::from_len_zeroed(num_bytes)
165    }
166
167    /// Set the bits in the range of `[0, end)` to 0 (if `val` is false), or 1 (if `val`
168    /// is true). Also extend the length of this buffer to be `end`.
169    ///
170    /// This is useful when one wants to clear (or set) the bits and then manipulate
171    /// the buffer directly (e.g., modifying the buffer by holding a mutable reference
172    /// from `data_mut()`).
173    pub fn with_bitset(mut self, end: usize, val: bool) -> Self {
174        assert!(end <= self.layout.size());
175        let v = if val { 255 } else { 0 };
176        unsafe {
177            std::ptr::write_bytes(self.data.as_ptr(), v, end);
178            self.len = end;
179        }
180        self
181    }
182
183    /// Ensure that `count` bytes from `start` contain zero bits
184    ///
185    /// This is used to initialize the bits in a buffer, however, it has no impact on the
186    /// `len` of the buffer and so can be used to initialize the memory region from
187    /// `len` to `capacity`.
188    pub fn set_null_bits(&mut self, start: usize, count: usize) {
189        assert!(
190            start.saturating_add(count) <= self.layout.size(),
191            "range start index {start} and count {count} out of bounds for \
192            buffer of length {}",
193            self.layout.size(),
194        );
195
196        // Safety: `self.data[start..][..count]` is in-bounds and well-aligned for `u8`
197        unsafe {
198            std::ptr::write_bytes(self.data.as_ptr().add(start), 0, count);
199        }
200    }
201
202    /// Ensures that this buffer has at least `self.len + additional` bytes. This re-allocates iff
203    /// `self.len + additional > capacity`.
204    /// # Example
205    /// ```
206    /// # use arrow_buffer::buffer::{Buffer, MutableBuffer};
207    /// let mut buffer = MutableBuffer::new(0);
208    /// buffer.reserve(253); // allocates for the first time
209    /// (0..253u8).for_each(|i| buffer.push(i)); // no reallocation
210    /// let buffer: Buffer = buffer.into();
211    /// assert_eq!(buffer.len(), 253);
212    /// ```
213    // For performance reasons, this must be inlined so that the `if` is executed inside the caller, and not as an extra call that just
214    // exits.
215    #[inline(always)]
216    pub fn reserve(&mut self, additional: usize) {
217        let required_cap = self.len + additional;
218        if required_cap > self.layout.size() {
219            let new_capacity = bit_util::round_upto_multiple_of_64(required_cap);
220            let new_capacity = std::cmp::max(new_capacity, self.layout.size() * 2);
221            self.reallocate(new_capacity)
222        }
223    }
224
225    #[cold]
226    fn reallocate(&mut self, capacity: usize) {
227        let new_layout = Layout::from_size_align(capacity, self.layout.align()).unwrap();
228        if new_layout.size() == 0 {
229            if self.layout.size() != 0 {
230                // Safety: data was allocated with layout
231                unsafe { std::alloc::dealloc(self.as_mut_ptr(), self.layout) };
232                self.layout = new_layout
233            }
234            return;
235        }
236
237        let data = match self.layout.size() {
238            // Safety: new_layout is not empty
239            0 => unsafe { std::alloc::alloc(new_layout) },
240            // Safety: verified new layout is valid and not empty
241            _ => unsafe { std::alloc::realloc(self.as_mut_ptr(), self.layout, capacity) },
242        };
243        self.data = NonNull::new(data).unwrap_or_else(|| handle_alloc_error(new_layout));
244        self.layout = new_layout;
245        #[cfg(feature = "pool")]
246        {
247            if let Some(reservation) = self.reservation.lock().unwrap().as_mut() {
248                reservation.resize(self.layout.size());
249            }
250        }
251    }
252
253    /// Truncates this buffer to `len` bytes
254    ///
255    /// If `len` is greater than the buffer's current length, this has no effect
256    #[inline(always)]
257    pub fn truncate(&mut self, len: usize) {
258        if len > self.len {
259            return;
260        }
261        self.len = len;
262        #[cfg(feature = "pool")]
263        {
264            if let Some(reservation) = self.reservation.lock().unwrap().as_mut() {
265                reservation.resize(self.len);
266            }
267        }
268    }
269
270    /// Resizes the buffer, either truncating its contents (with no change in capacity), or
271    /// growing it (potentially reallocating it) and writing `value` in the newly available bytes.
272    /// # Example
273    /// ```
274    /// # use arrow_buffer::buffer::{Buffer, MutableBuffer};
275    /// let mut buffer = MutableBuffer::new(0);
276    /// buffer.resize(253, 2); // allocates for the first time
277    /// assert_eq!(buffer.as_slice()[252], 2u8);
278    /// ```
279    // For performance reasons, this must be inlined so that the `if` is executed inside the caller, and not as an extra call that just
280    // exits.
281    #[inline(always)]
282    pub fn resize(&mut self, new_len: usize, value: u8) {
283        if new_len > self.len {
284            let diff = new_len - self.len;
285            self.reserve(diff);
286            // write the value
287            unsafe { self.data.as_ptr().add(self.len).write_bytes(value, diff) };
288        }
289        // this truncates the buffer when new_len < self.len
290        self.len = new_len;
291        #[cfg(feature = "pool")]
292        {
293            if let Some(reservation) = self.reservation.lock().unwrap().as_mut() {
294                reservation.resize(self.len);
295            }
296        }
297    }
298
299    /// Shrinks the capacity of the buffer as much as possible.
300    /// The new capacity will aligned to the nearest 64 bit alignment.
301    ///
302    /// # Example
303    /// ```
304    /// # use arrow_buffer::buffer::{Buffer, MutableBuffer};
305    /// // 2 cache lines
306    /// let mut buffer = MutableBuffer::new(128);
307    /// assert_eq!(buffer.capacity(), 128);
308    /// buffer.push(1);
309    /// buffer.push(2);
310    ///
311    /// buffer.shrink_to_fit();
312    /// assert!(buffer.capacity() >= 64 && buffer.capacity() < 128);
313    /// ```
314    pub fn shrink_to_fit(&mut self) {
315        let new_capacity = bit_util::round_upto_multiple_of_64(self.len);
316        if new_capacity < self.layout.size() {
317            self.reallocate(new_capacity)
318        }
319    }
320
321    /// Returns whether this buffer is empty or not.
322    #[inline]
323    pub const fn is_empty(&self) -> bool {
324        self.len == 0
325    }
326
327    /// Returns the length (the number of bytes written) in this buffer.
328    /// The invariant `buffer.len() <= buffer.capacity()` is always upheld.
329    #[inline]
330    pub const fn len(&self) -> usize {
331        self.len
332    }
333
334    /// Returns the total capacity in this buffer, in bytes.
335    ///
336    /// The invariant `buffer.len() <= buffer.capacity()` is always upheld.
337    #[inline]
338    pub const fn capacity(&self) -> usize {
339        self.layout.size()
340    }
341
342    /// Clear all existing data from this buffer.
343    pub fn clear(&mut self) {
344        self.len = 0
345    }
346
347    /// Returns the data stored in this buffer as a slice.
348    pub fn as_slice(&self) -> &[u8] {
349        self
350    }
351
352    /// Returns the data stored in this buffer as a mutable slice.
353    pub fn as_slice_mut(&mut self) -> &mut [u8] {
354        self
355    }
356
357    /// Returns a raw pointer to this buffer's internal memory
358    /// This pointer is guaranteed to be aligned along cache-lines.
359    #[inline]
360    pub const fn as_ptr(&self) -> *const u8 {
361        self.data.as_ptr()
362    }
363
364    /// Returns a mutable raw pointer to this buffer's internal memory
365    /// This pointer is guaranteed to be aligned along cache-lines.
366    #[inline]
367    pub fn as_mut_ptr(&mut self) -> *mut u8 {
368        self.data.as_ptr()
369    }
370
371    #[inline]
372    pub(super) fn into_buffer(self) -> Buffer {
373        let bytes = unsafe { Bytes::new(self.data, self.len, Deallocation::Standard(self.layout)) };
374        #[cfg(feature = "pool")]
375        {
376            let reservation = self.reservation.lock().unwrap().take();
377            *bytes.reservation.lock().unwrap() = reservation;
378        }
379        std::mem::forget(self);
380        Buffer::from(bytes)
381    }
382
383    /// View this buffer as a mutable slice of a specific type.
384    ///
385    /// # Panics
386    ///
387    /// This function panics if the underlying buffer is not aligned
388    /// correctly for type `T`.
389    pub fn typed_data_mut<T: ArrowNativeType>(&mut self) -> &mut [T] {
390        // SAFETY
391        // ArrowNativeType is trivially transmutable, is sealed to prevent potentially incorrect
392        // implementation outside this crate, and this method checks alignment
393        let (prefix, offsets, suffix) = unsafe { self.as_slice_mut().align_to_mut::<T>() };
394        assert!(prefix.is_empty() && suffix.is_empty());
395        offsets
396    }
397
398    /// View buffer as a immutable slice of a specific type.
399    ///
400    /// # Panics
401    ///
402    /// This function panics if the underlying buffer is not aligned
403    /// correctly for type `T`.
404    pub fn typed_data<T: ArrowNativeType>(&self) -> &[T] {
405        // SAFETY
406        // ArrowNativeType is trivially transmutable, is sealed to prevent potentially incorrect
407        // implementation outside this crate, and this method checks alignment
408        let (prefix, offsets, suffix) = unsafe { self.as_slice().align_to::<T>() };
409        assert!(prefix.is_empty() && suffix.is_empty());
410        offsets
411    }
412
413    /// Extends this buffer from a slice of items that can be represented in bytes, increasing its capacity if needed.
414    /// # Example
415    /// ```
416    /// # use arrow_buffer::buffer::MutableBuffer;
417    /// let mut buffer = MutableBuffer::new(0);
418    /// buffer.extend_from_slice(&[2u32, 0]);
419    /// assert_eq!(buffer.len(), 8) // u32 has 4 bytes
420    /// ```
421    #[inline]
422    pub fn extend_from_slice<T: ArrowNativeType>(&mut self, items: &[T]) {
423        let additional = mem::size_of_val(items);
424        self.reserve(additional);
425        unsafe {
426            // this assumes that `[ToByteSlice]` can be copied directly
427            // without calling `to_byte_slice` for each element,
428            // which is correct for all ArrowNativeType implementations.
429            let src = items.as_ptr() as *const u8;
430            let dst = self.data.as_ptr().add(self.len);
431            std::ptr::copy_nonoverlapping(src, dst, additional)
432        }
433        self.len += additional;
434    }
435
436    /// Extends the buffer with a new item, increasing its capacity if needed.
437    /// # Example
438    /// ```
439    /// # use arrow_buffer::buffer::MutableBuffer;
440    /// let mut buffer = MutableBuffer::new(0);
441    /// buffer.push(256u32);
442    /// assert_eq!(buffer.len(), 4) // u32 has 4 bytes
443    /// ```
444    #[inline]
445    pub fn push<T: ToByteSlice>(&mut self, item: T) {
446        let additional = std::mem::size_of::<T>();
447        self.reserve(additional);
448        unsafe {
449            let src = item.to_byte_slice().as_ptr();
450            let dst = self.data.as_ptr().add(self.len);
451            std::ptr::copy_nonoverlapping(src, dst, additional);
452        }
453        self.len += additional;
454    }
455
456    /// Extends the buffer with a new item, without checking for sufficient capacity
457    /// # Safety
458    /// Caller must ensure that the capacity()-len()>=`size_of<T>`()
459    #[inline]
460    pub unsafe fn push_unchecked<T: ToByteSlice>(&mut self, item: T) {
461        let additional = std::mem::size_of::<T>();
462        let src = item.to_byte_slice().as_ptr();
463        let dst = self.data.as_ptr().add(self.len);
464        std::ptr::copy_nonoverlapping(src, dst, additional);
465        self.len += additional;
466    }
467
468    /// Extends the buffer by `additional` bytes equal to `0u8`, incrementing its capacity if needed.
469    #[inline]
470    pub fn extend_zeros(&mut self, additional: usize) {
471        self.resize(self.len + additional, 0);
472    }
473
474    /// # Safety
475    /// The caller must ensure that the buffer was properly initialized up to `len`.
476    #[inline]
477    pub unsafe fn set_len(&mut self, len: usize) {
478        assert!(len <= self.capacity());
479        self.len = len;
480    }
481
482    /// Invokes `f` with values `0..len` collecting the boolean results into a new `MutableBuffer`
483    ///
484    /// This is similar to `from_trusted_len_iter_bool`, however, can be significantly faster
485    /// as it eliminates the conditional `Iterator::next`
486    #[inline]
487    pub fn collect_bool<F: FnMut(usize) -> bool>(len: usize, mut f: F) -> Self {
488        let mut buffer = Self::new(bit_util::ceil(len, 64) * 8);
489
490        let chunks = len / 64;
491        let remainder = len % 64;
492        for chunk in 0..chunks {
493            let mut packed = 0;
494            for bit_idx in 0..64 {
495                let i = bit_idx + chunk * 64;
496                packed |= (f(i) as u64) << bit_idx;
497            }
498
499            // SAFETY: Already allocated sufficient capacity
500            unsafe { buffer.push_unchecked(packed) }
501        }
502
503        if remainder != 0 {
504            let mut packed = 0;
505            for bit_idx in 0..remainder {
506                let i = bit_idx + chunks * 64;
507                packed |= (f(i) as u64) << bit_idx;
508            }
509
510            // SAFETY: Already allocated sufficient capacity
511            unsafe { buffer.push_unchecked(packed) }
512        }
513
514        buffer.truncate(bit_util::ceil(len, 8));
515        buffer
516    }
517
518    /// Register this [`MutableBuffer`] with the provided [`MemoryPool`]
519    ///
520    /// This claims the memory used by this buffer in the pool, allowing for
521    /// accurate accounting of memory usage. Any prior reservation will be
522    /// released so this works well when the buffer is being shared among
523    /// multiple arrays.
524    #[cfg(feature = "pool")]
525    pub fn claim(&self, pool: &dyn MemoryPool) {
526        *self.reservation.lock().unwrap() = Some(pool.reserve(self.capacity()));
527    }
528}
529
530/// Creates a non-null pointer with alignment of [`ALIGNMENT`]
531///
532/// This is similar to [`NonNull::dangling`]
533#[inline]
534pub(crate) fn dangling_ptr() -> NonNull<u8> {
535    // SAFETY: ALIGNMENT is a non-zero usize which is then cast
536    // to a *mut u8. Therefore, `ptr` is not null and the conditions for
537    // calling new_unchecked() are respected.
538    #[cfg(miri)]
539    {
540        // Since miri implies a nightly rust version we can use the unstable strict_provenance feature
541        unsafe { NonNull::new_unchecked(std::ptr::without_provenance_mut(ALIGNMENT)) }
542    }
543    #[cfg(not(miri))]
544    {
545        unsafe { NonNull::new_unchecked(ALIGNMENT as *mut u8) }
546    }
547}
548
549impl<A: ArrowNativeType> Extend<A> for MutableBuffer {
550    #[inline]
551    fn extend<T: IntoIterator<Item = A>>(&mut self, iter: T) {
552        let iterator = iter.into_iter();
553        self.extend_from_iter(iterator)
554    }
555}
556
557impl<T: ArrowNativeType> From<Vec<T>> for MutableBuffer {
558    fn from(value: Vec<T>) -> Self {
559        // Safety
560        // Vec::as_ptr guaranteed to not be null and ArrowNativeType are trivially transmutable
561        let data = unsafe { NonNull::new_unchecked(value.as_ptr() as _) };
562        let len = value.len() * mem::size_of::<T>();
563        // Safety
564        // Vec guaranteed to have a valid layout matching that of `Layout::array`
565        // This is based on `RawVec::current_memory`
566        let layout = unsafe { Layout::array::<T>(value.capacity()).unwrap_unchecked() };
567        mem::forget(value);
568        Self {
569            data,
570            len,
571            layout,
572            #[cfg(feature = "pool")]
573            reservation: std::sync::Mutex::new(None),
574        }
575    }
576}
577
578impl MutableBuffer {
579    #[inline]
580    pub(super) fn extend_from_iter<T: ArrowNativeType, I: Iterator<Item = T>>(
581        &mut self,
582        mut iterator: I,
583    ) {
584        let item_size = std::mem::size_of::<T>();
585        let (lower, _) = iterator.size_hint();
586        let additional = lower * item_size;
587        self.reserve(additional);
588
589        // this is necessary because of https://github.com/rust-lang/rust/issues/32155
590        let mut len = SetLenOnDrop::new(&mut self.len);
591        let mut dst = unsafe { self.data.as_ptr().add(len.local_len) };
592        let capacity = self.layout.size();
593
594        while len.local_len + item_size <= capacity {
595            if let Some(item) = iterator.next() {
596                unsafe {
597                    let src = item.to_byte_slice().as_ptr();
598                    std::ptr::copy_nonoverlapping(src, dst, item_size);
599                    dst = dst.add(item_size);
600                }
601                len.local_len += item_size;
602            } else {
603                break;
604            }
605        }
606        drop(len);
607
608        iterator.for_each(|item| self.push(item));
609    }
610
611    /// Creates a [`MutableBuffer`] from an [`Iterator`] with a trusted (upper) length.
612    /// Prefer this to `collect` whenever possible, as it is faster ~60% faster.
613    /// # Example
614    /// ```
615    /// # use arrow_buffer::buffer::MutableBuffer;
616    /// let v = vec![1u32];
617    /// let iter = v.iter().map(|x| x * 2);
618    /// let buffer = unsafe { MutableBuffer::from_trusted_len_iter(iter) };
619    /// assert_eq!(buffer.len(), 4) // u32 has 4 bytes
620    /// ```
621    /// # Safety
622    /// This method assumes that the iterator's size is correct and is undefined behavior
623    /// to use it on an iterator that reports an incorrect length.
624    // This implementation is required for two reasons:
625    // 1. there is no trait `TrustedLen` in stable rust and therefore
626    //    we can't specialize `extend` for `TrustedLen` like `Vec` does.
627    // 2. `from_trusted_len_iter` is faster.
628    #[inline]
629    pub unsafe fn from_trusted_len_iter<T: ArrowNativeType, I: Iterator<Item = T>>(
630        iterator: I,
631    ) -> Self {
632        let item_size = std::mem::size_of::<T>();
633        let (_, upper) = iterator.size_hint();
634        let upper = upper.expect("from_trusted_len_iter requires an upper limit");
635        let len = upper * item_size;
636
637        let mut buffer = MutableBuffer::new(len);
638
639        let mut dst = buffer.data.as_ptr();
640        for item in iterator {
641            // note how there is no reserve here (compared with `extend_from_iter`)
642            let src = item.to_byte_slice().as_ptr();
643            std::ptr::copy_nonoverlapping(src, dst, item_size);
644            dst = dst.add(item_size);
645        }
646        assert_eq!(
647            dst.offset_from(buffer.data.as_ptr()) as usize,
648            len,
649            "Trusted iterator length was not accurately reported"
650        );
651        buffer.len = len;
652        buffer
653    }
654
655    /// Creates a [`MutableBuffer`] from a boolean [`Iterator`] with a trusted (upper) length.
656    /// # use arrow_buffer::buffer::MutableBuffer;
657    /// # Example
658    /// ```
659    /// # use arrow_buffer::buffer::MutableBuffer;
660    /// let v = vec![false, true, false];
661    /// let iter = v.iter().map(|x| *x || true);
662    /// let buffer = unsafe { MutableBuffer::from_trusted_len_iter_bool(iter) };
663    /// assert_eq!(buffer.len(), 1) // 3 booleans have 1 byte
664    /// ```
665    /// # Safety
666    /// This method assumes that the iterator's size is correct and is undefined behavior
667    /// to use it on an iterator that reports an incorrect length.
668    // This implementation is required for two reasons:
669    // 1. there is no trait `TrustedLen` in stable rust and therefore
670    //    we can't specialize `extend` for `TrustedLen` like `Vec` does.
671    // 2. `from_trusted_len_iter_bool` is faster.
672    #[inline]
673    pub unsafe fn from_trusted_len_iter_bool<I: Iterator<Item = bool>>(mut iterator: I) -> Self {
674        let (_, upper) = iterator.size_hint();
675        let len = upper.expect("from_trusted_len_iter requires an upper limit");
676
677        Self::collect_bool(len, |_| iterator.next().unwrap())
678    }
679
680    /// Creates a [`MutableBuffer`] from an [`Iterator`] with a trusted (upper) length or errors
681    /// if any of the items of the iterator is an error.
682    /// Prefer this to `collect` whenever possible, as it is faster ~60% faster.
683    /// # Safety
684    /// This method assumes that the iterator's size is correct and is undefined behavior
685    /// to use it on an iterator that reports an incorrect length.
686    #[inline]
687    pub unsafe fn try_from_trusted_len_iter<
688        E,
689        T: ArrowNativeType,
690        I: Iterator<Item = Result<T, E>>,
691    >(
692        iterator: I,
693    ) -> Result<Self, E> {
694        let item_size = std::mem::size_of::<T>();
695        let (_, upper) = iterator.size_hint();
696        let upper = upper.expect("try_from_trusted_len_iter requires an upper limit");
697        let len = upper * item_size;
698
699        let mut buffer = MutableBuffer::new(len);
700
701        let mut dst = buffer.data.as_ptr();
702        for item in iterator {
703            let item = item?;
704            // note how there is no reserve here (compared with `extend_from_iter`)
705            let src = item.to_byte_slice().as_ptr();
706            std::ptr::copy_nonoverlapping(src, dst, item_size);
707            dst = dst.add(item_size);
708        }
709        // try_from_trusted_len_iter is instantiated a lot, so we extract part of it into a less
710        // generic method to reduce compile time
711        unsafe fn finalize_buffer(dst: *mut u8, buffer: &mut MutableBuffer, len: usize) {
712            assert_eq!(
713                dst.offset_from(buffer.data.as_ptr()) as usize,
714                len,
715                "Trusted iterator length was not accurately reported"
716            );
717            buffer.len = len;
718        }
719        finalize_buffer(dst, &mut buffer, len);
720        Ok(buffer)
721    }
722}
723
724impl Default for MutableBuffer {
725    fn default() -> Self {
726        Self::with_capacity(0)
727    }
728}
729
730impl std::ops::Deref for MutableBuffer {
731    type Target = [u8];
732
733    fn deref(&self) -> &[u8] {
734        unsafe { std::slice::from_raw_parts(self.as_ptr(), self.len) }
735    }
736}
737
738impl std::ops::DerefMut for MutableBuffer {
739    fn deref_mut(&mut self) -> &mut [u8] {
740        unsafe { std::slice::from_raw_parts_mut(self.as_mut_ptr(), self.len) }
741    }
742}
743
744impl Drop for MutableBuffer {
745    fn drop(&mut self) {
746        if self.layout.size() != 0 {
747            // Safety: data was allocated with standard allocator with given layout
748            unsafe { std::alloc::dealloc(self.data.as_ptr() as _, self.layout) };
749        }
750    }
751}
752
753impl PartialEq for MutableBuffer {
754    fn eq(&self, other: &MutableBuffer) -> bool {
755        if self.len != other.len {
756            return false;
757        }
758        if self.layout != other.layout {
759            return false;
760        }
761        self.as_slice() == other.as_slice()
762    }
763}
764
765unsafe impl Sync for MutableBuffer {}
766unsafe impl Send for MutableBuffer {}
767
/// Guard that tracks a length in a local copy and writes it back to the borrowed
/// length when dropped (including when dropped during unwinding).
struct SetLenOnDrop<'a> {
    /// Destination that receives `local_len` on drop
    len: &'a mut usize,
    /// Working copy, initialised from `*len`
    local_len: usize,
}

impl<'a> SetLenOnDrop<'a> {
    /// Starts tracking `len`, seeding the local copy with its current value.
    #[inline]
    fn new(len: &'a mut usize) -> Self {
        let local_len = *len;
        Self { len, local_len }
    }
}

impl Drop for SetLenOnDrop<'_> {
    /// Publishes the locally tracked length.
    #[inline]
    fn drop(&mut self) {
        let final_len = self.local_len;
        *self.len = final_len;
    }
}
789
790/// Creating a `MutableBuffer` instance by setting bits according to the boolean values
791impl std::iter::FromIterator<bool> for MutableBuffer {
792    fn from_iter<I>(iter: I) -> Self
793    where
794        I: IntoIterator<Item = bool>,
795    {
796        let mut iterator = iter.into_iter();
797        let mut result = {
798            let byte_capacity: usize = iterator.size_hint().0.saturating_add(7) / 8;
799            MutableBuffer::new(byte_capacity)
800        };
801
802        loop {
803            let mut exhausted = false;
804            let mut byte_accum: u8 = 0;
805            let mut mask: u8 = 1;
806
807            //collect (up to) 8 bits into a byte
808            while mask != 0 {
809                if let Some(value) = iterator.next() {
810                    byte_accum |= match value {
811                        true => mask,
812                        false => 0,
813                    };
814                    mask <<= 1;
815                } else {
816                    exhausted = true;
817                    break;
818                }
819            }
820
821            // break if the iterator was exhausted before it provided a bool for this byte
822            if exhausted && mask == 1 {
823                break;
824            }
825
826            //ensure we have capacity to write the byte
827            if result.len() == result.capacity() {
828                //no capacity for new byte, allocate 1 byte more (plus however many more the iterator advertises)
829                let additional_byte_capacity = 1usize.saturating_add(
830                    iterator.size_hint().0.saturating_add(7) / 8, //convert bit count to byte count, rounding up
831                );
832                result.reserve(additional_byte_capacity)
833            }
834
835            // Soundness: capacity was allocated above
836            unsafe { result.push_unchecked(byte_accum) };
837            if exhausted {
838                break;
839            }
840        }
841        result
842    }
843}
844
845impl<T: ArrowNativeType> std::iter::FromIterator<T> for MutableBuffer {
846    fn from_iter<I: IntoIterator<Item = T>>(iter: I) -> Self {
847        let mut buffer = Self::default();
848        buffer.extend_from_iter(iter.into_iter());
849        buffer
850    }
851}
852
853#[cfg(test)]
854mod tests {
855    use super::*;
856
857    #[test]
858    fn test_mutable_new() {
859        let buf = MutableBuffer::new(63);
860        assert_eq!(64, buf.capacity());
861        assert_eq!(0, buf.len());
862        assert!(buf.is_empty());
863    }
864
865    #[test]
866    fn test_mutable_default() {
867        let buf = MutableBuffer::default();
868        assert_eq!(0, buf.capacity());
869        assert_eq!(0, buf.len());
870        assert!(buf.is_empty());
871
872        let mut buf = MutableBuffer::default();
873        buf.extend_from_slice(b"hello");
874        assert_eq!(5, buf.len());
875        assert_eq!(b"hello", buf.as_slice());
876    }
877
878    #[test]
879    fn test_mutable_extend_from_slice() {
880        let mut buf = MutableBuffer::new(100);
881        buf.extend_from_slice(b"hello");
882        assert_eq!(5, buf.len());
883        assert_eq!(b"hello", buf.as_slice());
884
885        buf.extend_from_slice(b" world");
886        assert_eq!(11, buf.len());
887        assert_eq!(b"hello world", buf.as_slice());
888
889        buf.clear();
890        assert_eq!(0, buf.len());
891        buf.extend_from_slice(b"hello arrow");
892        assert_eq!(11, buf.len());
893        assert_eq!(b"hello arrow", buf.as_slice());
894    }
895
896    #[test]
897    fn mutable_extend_from_iter() {
898        let mut buf = MutableBuffer::new(0);
899        buf.extend(vec![1u32, 2]);
900        assert_eq!(8, buf.len());
901        assert_eq!(&[1u8, 0, 0, 0, 2, 0, 0, 0], buf.as_slice());
902
903        buf.extend(vec![3u32, 4]);
904        assert_eq!(16, buf.len());
905        assert_eq!(
906            &[1u8, 0, 0, 0, 2, 0, 0, 0, 3, 0, 0, 0, 4, 0, 0, 0],
907            buf.as_slice()
908        );
909    }
910
911    #[test]
912    fn mutable_extend_from_iter_unaligned_u64() {
913        let mut buf = MutableBuffer::new(16);
914        buf.push(1_u8);
915        buf.extend([1_u64]);
916        assert_eq!(9, buf.len());
917        assert_eq!(&[1u8, 1u8, 0, 0, 0, 0, 0, 0, 0], buf.as_slice());
918    }
919
920    #[test]
921    fn mutable_extend_from_slice_unaligned_u64() {
922        let mut buf = MutableBuffer::new(16);
923        buf.extend_from_slice(&[1_u8]);
924        buf.extend_from_slice(&[1_u64]);
925        assert_eq!(9, buf.len());
926        assert_eq!(&[1u8, 1u8, 0, 0, 0, 0, 0, 0, 0], buf.as_slice());
927    }
928
929    #[test]
930    fn mutable_push_unaligned_u64() {
931        let mut buf = MutableBuffer::new(16);
932        buf.push(1_u8);
933        buf.push(1_u64);
934        assert_eq!(9, buf.len());
935        assert_eq!(&[1u8, 1u8, 0, 0, 0, 0, 0, 0, 0], buf.as_slice());
936    }
937
938    #[test]
939    fn mutable_push_unchecked_unaligned_u64() {
940        let mut buf = MutableBuffer::new(16);
941        unsafe {
942            buf.push_unchecked(1_u8);
943            buf.push_unchecked(1_u64);
944        }
945        assert_eq!(9, buf.len());
946        assert_eq!(&[1u8, 1u8, 0, 0, 0, 0, 0, 0, 0], buf.as_slice());
947    }
948
949    #[test]
950    fn test_from_trusted_len_iter() {
951        let iter = vec![1u32, 2].into_iter();
952        let buf = unsafe { MutableBuffer::from_trusted_len_iter(iter) };
953        assert_eq!(8, buf.len());
954        assert_eq!(&[1u8, 0, 0, 0, 2, 0, 0, 0], buf.as_slice());
955    }
956
957    #[test]
958    fn test_mutable_reserve() {
959        let mut buf = MutableBuffer::new(1);
960        assert_eq!(64, buf.capacity());
961
962        // Reserving a smaller capacity should have no effect.
963        buf.reserve(10);
964        assert_eq!(64, buf.capacity());
965
966        buf.reserve(80);
967        assert_eq!(128, buf.capacity());
968
969        buf.reserve(129);
970        assert_eq!(256, buf.capacity());
971    }
972
973    #[test]
974    fn test_mutable_resize() {
975        let mut buf = MutableBuffer::new(1);
976        assert_eq!(64, buf.capacity());
977        assert_eq!(0, buf.len());
978
979        buf.resize(20, 0);
980        assert_eq!(64, buf.capacity());
981        assert_eq!(20, buf.len());
982
983        buf.resize(10, 0);
984        assert_eq!(64, buf.capacity());
985        assert_eq!(10, buf.len());
986
987        buf.resize(100, 0);
988        assert_eq!(128, buf.capacity());
989        assert_eq!(100, buf.len());
990
991        buf.resize(30, 0);
992        assert_eq!(128, buf.capacity());
993        assert_eq!(30, buf.len());
994
995        buf.resize(0, 0);
996        assert_eq!(128, buf.capacity());
997        assert_eq!(0, buf.len());
998    }
999
1000    #[test]
1001    fn test_mutable_into() {
1002        let mut buf = MutableBuffer::new(1);
1003        buf.extend_from_slice(b"aaaa bbbb cccc dddd");
1004        assert_eq!(19, buf.len());
1005        assert_eq!(64, buf.capacity());
1006        assert_eq!(b"aaaa bbbb cccc dddd", buf.as_slice());
1007
1008        let immutable_buf: Buffer = buf.into();
1009        assert_eq!(19, immutable_buf.len());
1010        assert_eq!(64, immutable_buf.capacity());
1011        assert_eq!(b"aaaa bbbb cccc dddd", immutable_buf.as_slice());
1012    }
1013
1014    #[test]
1015    fn test_mutable_equal() {
1016        let mut buf = MutableBuffer::new(1);
1017        let mut buf2 = MutableBuffer::new(1);
1018
1019        buf.extend_from_slice(&[0xaa]);
1020        buf2.extend_from_slice(&[0xaa, 0xbb]);
1021        assert!(buf != buf2);
1022
1023        buf.extend_from_slice(&[0xbb]);
1024        assert_eq!(buf, buf2);
1025
1026        buf2.reserve(65);
1027        assert!(buf != buf2);
1028    }
1029
1030    #[test]
1031    fn test_mutable_shrink_to_fit() {
1032        let mut buffer = MutableBuffer::new(128);
1033        assert_eq!(buffer.capacity(), 128);
1034        buffer.push(1);
1035        buffer.push(2);
1036
1037        buffer.shrink_to_fit();
1038        assert!(buffer.capacity() >= 64 && buffer.capacity() < 128);
1039    }
1040
1041    #[test]
1042    fn test_mutable_set_null_bits() {
1043        let mut buffer = MutableBuffer::new(8).with_bitset(8, true);
1044
1045        for i in 0..=buffer.capacity() {
1046            buffer.set_null_bits(i, 0);
1047            assert_eq!(buffer[..8], [255; 8][..]);
1048        }
1049
1050        buffer.set_null_bits(1, 4);
1051        assert_eq!(buffer[..8], [255, 0, 0, 0, 0, 255, 255, 255][..]);
1052    }
1053
1054    #[test]
1055    #[should_panic = "out of bounds for buffer of length"]
1056    fn test_mutable_set_null_bits_oob() {
1057        let mut buffer = MutableBuffer::new(64);
1058        buffer.set_null_bits(1, buffer.capacity());
1059    }
1060
1061    #[test]
1062    #[should_panic = "out of bounds for buffer of length"]
1063    fn test_mutable_set_null_bits_oob_by_overflow() {
1064        let mut buffer = MutableBuffer::new(0);
1065        buffer.set_null_bits(1, usize::MAX);
1066    }
1067
1068    #[test]
1069    fn from_iter() {
1070        let buffer = [1u16, 2, 3, 4].into_iter().collect::<MutableBuffer>();
1071        assert_eq!(buffer.len(), 4 * mem::size_of::<u16>());
1072        assert_eq!(buffer.as_slice(), &[1, 0, 2, 0, 3, 0, 4, 0]);
1073    }
1074
1075    #[test]
1076    #[should_panic(expected = "failed to create layout for MutableBuffer: LayoutError")]
1077    fn test_with_capacity_panics_above_max_capacity() {
1078        let max_capacity = isize::MAX as usize - (isize::MAX as usize % ALIGNMENT);
1079        let _ = MutableBuffer::with_capacity(max_capacity + 1);
1080    }
1081
1082    #[cfg(feature = "pool")]
1083    mod pool_tests {
1084        use super::*;
1085        use crate::pool::{MemoryPool, TrackingMemoryPool};
1086
1087        #[test]
1088        fn test_reallocate_with_pool() {
1089            let pool = TrackingMemoryPool::default();
1090            let mut buffer = MutableBuffer::with_capacity(100);
1091            buffer.claim(&pool);
1092
1093            // Initial capacity should be 128 (multiple of 64)
1094            assert_eq!(buffer.capacity(), 128);
1095            assert_eq!(pool.used(), 128);
1096
1097            // Reallocate to a larger size
1098            buffer.reallocate(200);
1099
1100            // The capacity is exactly the requested size, not rounded up
1101            assert_eq!(buffer.capacity(), 200);
1102            assert_eq!(pool.used(), 200);
1103
1104            // Reallocate to a smaller size
1105            buffer.reallocate(50);
1106
1107            // The capacity is exactly the requested size, not rounded up
1108            assert_eq!(buffer.capacity(), 50);
1109            assert_eq!(pool.used(), 50);
1110        }
1111
1112        #[test]
1113        fn test_truncate_with_pool() {
1114            let pool = TrackingMemoryPool::default();
1115            let mut buffer = MutableBuffer::with_capacity(100);
1116
1117            // Fill buffer with some data
1118            buffer.resize(80, 1);
1119            assert_eq!(buffer.len(), 80);
1120
1121            buffer.claim(&pool);
1122            assert_eq!(pool.used(), 128);
1123
1124            // Truncate buffer
1125            buffer.truncate(40);
1126            assert_eq!(buffer.len(), 40);
1127            assert_eq!(pool.used(), 40);
1128
1129            // Truncate to zero
1130            buffer.truncate(0);
1131            assert_eq!(buffer.len(), 0);
1132            assert_eq!(pool.used(), 0);
1133        }
1134
1135        #[test]
1136        fn test_resize_with_pool() {
1137            let pool = TrackingMemoryPool::default();
1138            let mut buffer = MutableBuffer::with_capacity(100);
1139            buffer.claim(&pool);
1140
1141            // Initial state
1142            assert_eq!(buffer.len(), 0);
1143            assert_eq!(pool.used(), 128);
1144
1145            // Resize to increase length
1146            buffer.resize(50, 1);
1147            assert_eq!(buffer.len(), 50);
1148            assert_eq!(pool.used(), 50);
1149
1150            // Resize to increase length beyond capacity
1151            buffer.resize(150, 1);
1152            assert_eq!(buffer.len(), 150);
1153            assert_eq!(buffer.capacity(), 256);
1154            assert_eq!(pool.used(), 150);
1155
1156            // Resize to decrease length
1157            buffer.resize(30, 1);
1158            assert_eq!(buffer.len(), 30);
1159            assert_eq!(pool.used(), 30);
1160        }
1161
1162        #[test]
1163        fn test_buffer_lifecycle_with_pool() {
1164            let pool = TrackingMemoryPool::default();
1165
1166            // Create a buffer with memory reservation
1167            let mut mutable = MutableBuffer::with_capacity(100);
1168            mutable.resize(80, 1);
1169            mutable.claim(&pool);
1170
1171            // Memory reservation is based on capacity when using claim()
1172            assert_eq!(pool.used(), 128);
1173
1174            // Convert to immutable Buffer
1175            let buffer = mutable.into_buffer();
1176
1177            // Memory reservation should be preserved
1178            assert_eq!(pool.used(), 128);
1179
1180            // Drop the buffer and the reservation should be released
1181            drop(buffer);
1182            assert_eq!(pool.used(), 0);
1183        }
1184    }
1185}