arrow_buffer/bytes.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! This module contains an implementation of a contiguous immutable memory region that knows
//! how to de-allocate itself, [`Bytes`].
//! Note that this is low-level functionality of this crate.

use core::slice;
use std::ptr::NonNull;
use std::{fmt::Debug, fmt::Formatter};

use crate::alloc::Deallocation;
use crate::buffer::dangling_ptr;

#[cfg(feature = "pool")]
use crate::pool::{MemoryPool, MemoryReservation};
#[cfg(feature = "pool")]
use std::sync::Mutex;

/// A contiguous, fixed-size, immutable memory region that knows how to de-allocate itself.
///
/// Note that this structure is an internal implementation detail of the
/// arrow-rs crate. While it has the same name and a similar API as
/// [`bytes::Bytes`], it is not limited to Rust's global allocator or u8
/// alignment. It is possible to create a `Bytes` from `bytes::Bytes` using the
/// `From` implementation.
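///
/// A minimal sketch of that conversion (marked `ignore` since the exact
/// public re-export path of this type may vary between releases):
///
/// ```ignore
/// // Hand ownership of a `bytes::Bytes` to an Arrow `Bytes`
/// let source = bytes::Bytes::from(vec![1u8, 2, 3]);
/// let bytes: Bytes = source.into();
/// assert_eq!(bytes.len(), 3);
/// ```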
///
/// In the most common case, this buffer is allocated using [`alloc`](std::alloc::alloc)
/// with an alignment of [`ALIGNMENT`](crate::alloc::ALIGNMENT).
///
/// When the region is allocated by a different allocator, [`Deallocation::Custom`], this calls the
/// custom deallocator to deallocate the region when it is no longer needed.
pub struct Bytes {
    /// The raw pointer to the beginning of the region
    ptr: NonNull<u8>,

    /// The number of bytes visible to this region. This is never larger than its capacity (when available).
    len: usize,

    /// How to deallocate this region
    deallocation: Deallocation,

    /// Memory reservation for tracking memory usage
    #[cfg(feature = "pool")]
    pub(super) reservation: Mutex<Option<Box<dyn MemoryReservation>>>,
}

impl Bytes {
    /// Takes ownership of an allocated memory region.
    ///
    /// # Arguments
    ///
    /// * `ptr` - Pointer to raw parts
    /// * `len` - Length of raw parts in **bytes**
    /// * `deallocation` - Type of allocation
    ///
    /// # Safety
    ///
    /// This function is unsafe as there is no guarantee that the given pointer is valid for `len`
    /// bytes. If the `ptr` and `len` come from a `Buffer`, then this is guaranteed.
    #[inline]
    pub(crate) unsafe fn new(ptr: NonNull<u8>, len: usize, deallocation: Deallocation) -> Bytes {
        Bytes {
            ptr,
            len,
            deallocation,
            #[cfg(feature = "pool")]
            reservation: Mutex::new(None),
        }
    }

    /// Returns the contents of this `Bytes` as a byte slice (via `Deref`)
    fn as_slice(&self) -> &[u8] {
        self
    }

    /// Returns the length of this region in bytes
    #[inline]
    pub fn len(&self) -> usize {
        self.len
    }

    /// Returns `true` if this region is empty
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.len == 0
    }

    /// Returns a raw pointer to the beginning of this region
    #[inline]
    pub fn ptr(&self) -> NonNull<u8> {
        self.ptr
    }

    /// Returns the capacity of the underlying allocation in bytes
    pub fn capacity(&self) -> usize {
        match self.deallocation {
            Deallocation::Standard(layout) => layout.size(),
            // we only know the size of the custom allocation;
            // its underlying capacity might be larger
            Deallocation::Custom(_, size) => size,
        }
    }

    /// Register this [`Bytes`] with the provided [`MemoryPool`], replacing any prior reservation.
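    ///
    /// A minimal sketch of the intended usage (marked `ignore`; requires the
    /// `pool` feature, and `bytes` stands for any existing `Bytes` value):
    ///
    /// ```ignore
    /// let pool = TrackingMemoryPool::default();
    /// bytes.claim(&pool);
    /// assert_eq!(pool.used(), bytes.capacity());
    /// ```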
    #[cfg(feature = "pool")]
    pub fn claim(&self, pool: &dyn MemoryPool) {
        *self.reservation.lock().unwrap() = Some(pool.reserve(self.capacity()));
    }

    /// Resize the memory reservation of this buffer
    ///
    /// This is a no-op if this buffer doesn't have a reservation.
    #[cfg(feature = "pool")]
    fn resize_reservation(&self, new_size: usize) {
        let mut guard = self.reservation.lock().unwrap();
        if let Some(mut reservation) = guard.take() {
            // Resize the reservation
            reservation.resize(new_size);

            // Put it back
            *guard = Some(reservation);
        }
    }

    /// Try to reallocate the underlying memory region to a new size (smaller or larger).
    ///
    /// Only works for bytes allocated with the standard allocator.
    /// Returns `Err` if the memory was allocated with a custom allocator,
    /// or if the call to `realloc` failed, for whatever reason.
    /// In case of `Err`, the [`Bytes`] will remain as it was (i.e. have the old size).
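    ///
    /// A minimal sketch of the intended usage (marked `ignore` since it relies
    /// on the crate-internal constructor [`Bytes::new`]):
    ///
    /// ```ignore
    /// // Allocate 64 bytes with the standard allocator, then grow the region
    /// let layout = std::alloc::Layout::from_size_align(64, 64).unwrap();
    /// let ptr = NonNull::new(unsafe { std::alloc::alloc(layout) }).unwrap();
    /// let mut bytes = unsafe { Bytes::new(ptr, 64, Deallocation::Standard(layout)) };
    ///
    /// assert!(bytes.try_realloc(128).is_ok());
    /// assert_eq!(bytes.capacity(), 128);
    /// ```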
    pub fn try_realloc(&mut self, new_len: usize) -> Result<(), ()> {
        if let Deallocation::Standard(old_layout) = self.deallocation {
            if old_layout.size() == new_len {
                return Ok(()); // Nothing to do
            }

            if let Ok(new_layout) = std::alloc::Layout::from_size_align(new_len, old_layout.align())
            {
                let old_ptr = self.ptr.as_ptr();

                let new_ptr = match new_layout.size() {
                    0 => {
                        // SAFETY: Verified that old_layout.size != new_len (0)
                        unsafe { std::alloc::dealloc(self.ptr.as_ptr(), old_layout) };
                        Some(dangling_ptr())
                    }
                    // SAFETY: the call to `realloc` is safe if all the following hold (from https://doc.rust-lang.org/stable/std/alloc/trait.GlobalAlloc.html#method.realloc):
                    // * `old_ptr` must be currently allocated via this allocator (guaranteed by the invariant/contract of `Bytes`)
                    // * `old_layout` must be the same layout that was used to allocate that block of memory (same)
                    // * `new_len` must be greater than zero
                    // * `new_len`, when rounded up to the nearest multiple of `layout.align()`, must not overflow `isize` (guaranteed by the success of `Layout::from_size_align`)
                    _ => NonNull::new(unsafe { std::alloc::realloc(old_ptr, old_layout, new_len) }),
                };

                if let Some(ptr) = new_ptr {
                    self.ptr = ptr;
                    self.len = new_len;
                    self.deallocation = Deallocation::Standard(new_layout);

                    #[cfg(feature = "pool")]
                    {
                        // Resize reservation
                        self.resize_reservation(new_len);
                    }

                    return Ok(());
                }
            }
        }

        Err(())
    }

    #[inline]
    pub(crate) fn deallocation(&self) -> &Deallocation {
        &self.deallocation
    }
}

// `Deallocation` is `Send + Sync`; repeating the bound here keeps these
// impls sound across refactoring. The only field that is not then
// automatically `Send + Sync` is the `NonNull` ptr.
unsafe impl Send for Bytes where Deallocation: Send {}
unsafe impl Sync for Bytes where Deallocation: Sync {}

impl Drop for Bytes {
    #[inline]
    fn drop(&mut self) {
        match &self.deallocation {
            Deallocation::Standard(layout) => match layout.size() {
                0 => {} // Nothing to do
                _ => unsafe { std::alloc::dealloc(self.ptr.as_ptr(), *layout) },
            },
            // The automatic drop implementation will free the memory once the reference count reaches zero
            Deallocation::Custom(_allocation, _size) => (),
        }
    }
}

impl std::ops::Deref for Bytes {
    type Target = [u8];

    fn deref(&self) -> &[u8] {
        unsafe { slice::from_raw_parts(self.ptr.as_ptr(), self.len) }
    }
}

impl PartialEq for Bytes {
    fn eq(&self, other: &Bytes) -> bool {
        self.as_slice() == other.as_slice()
    }
}

impl Debug for Bytes {
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
        write!(f, "Bytes {{ ptr: {:?}, len: {}, data: ", self.ptr, self.len)?;

        f.debug_list().entries(self.iter()).finish()?;

        write!(f, " }}")
    }
}

impl From<bytes::Bytes> for Bytes {
    fn from(value: bytes::Bytes) -> Self {
        let len = value.len();
        Self {
            len,
            // `bytes::Bytes::as_ptr` is always non-null, even for an empty buffer
            ptr: NonNull::new(value.as_ptr() as _).unwrap(),
            deallocation: Deallocation::Custom(std::sync::Arc::new(value), len),
            #[cfg(feature = "pool")]
            reservation: Mutex::new(None),
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_from_bytes() {
        let message = b"hello arrow";

        // we can create a Bytes from bytes::Bytes (created from slices)
        let c_bytes: bytes::Bytes = message.as_ref().into();
        let a_bytes: Bytes = c_bytes.into();
        assert_eq!(a_bytes.as_slice(), message);

        // we can create a Bytes from bytes::Bytes (created from Vec)
        let c_bytes: bytes::Bytes = bytes::Bytes::from(message.to_vec());
        let a_bytes: Bytes = c_bytes.into();
        assert_eq!(a_bytes.as_slice(), message);
    }
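
    // A hedged sketch of `try_realloc` behaviour: growing a standard allocation
    // succeeds and updates the layout, while a custom allocation (here obtained
    // via `From<bytes::Bytes>`) is rejected and left untouched.
    #[test]
    fn test_try_realloc() {
        // Standard allocation: reallocating to a larger size should succeed
        let layout = std::alloc::Layout::from_size_align(16, crate::alloc::ALIGNMENT).unwrap();
        let ptr = unsafe { std::alloc::alloc(layout) };
        assert!(!ptr.is_null());
        let mut bytes =
            unsafe { Bytes::new(NonNull::new(ptr).unwrap(), 16, Deallocation::Standard(layout)) };

        assert!(bytes.try_realloc(32).is_ok());
        assert_eq!(bytes.len(), 32);
        assert_eq!(bytes.capacity(), 32);

        // Custom allocation: `realloc` is not possible, and the bytes are unchanged
        let mut custom: Bytes = bytes::Bytes::from(vec![1u8, 2, 3]).into();
        assert!(custom.try_realloc(6).is_err());
        assert_eq!(custom.len(), 3);
    }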

    #[cfg(feature = "pool")]
    mod pool_tests {
        use super::*;

        use crate::pool::TrackingMemoryPool;

        #[test]
        fn test_bytes_with_pool() {
            // Create a standard allocation
            let buffer = unsafe {
                let layout =
                    std::alloc::Layout::from_size_align(1024, crate::alloc::ALIGNMENT).unwrap();
                let ptr = std::alloc::alloc(layout);
                assert!(!ptr.is_null());

                Bytes::new(
                    NonNull::new(ptr).unwrap(),
                    1024,
                    Deallocation::Standard(layout),
                )
            };

            // Create a memory pool
            let pool = TrackingMemoryPool::default();
            assert_eq!(pool.used(), 0);

            // Reserve memory and assign to the buffer. Claiming twice should not double-count.
            buffer.claim(&pool);
            assert_eq!(pool.used(), 1024);
            buffer.claim(&pool);
            assert_eq!(pool.used(), 1024);

            // Memory should be released when the buffer is dropped
            drop(buffer);
            assert_eq!(pool.used(), 0);
        }

        #[test]
        fn test_bytes_drop_releases_pool() {
            let pool = TrackingMemoryPool::default();

            {
                // Create a buffer with a pool reservation
                let _buffer = unsafe {
                    let layout =
                        std::alloc::Layout::from_size_align(1024, crate::alloc::ALIGNMENT).unwrap();
                    let ptr = std::alloc::alloc(layout);
                    assert!(!ptr.is_null());

                    let bytes = Bytes::new(
                        NonNull::new(ptr).unwrap(),
                        1024,
                        Deallocation::Standard(layout),
                    );

                    bytes.claim(&pool);
                    bytes
                };

                assert_eq!(pool.used(), 1024);
            }

            // Buffer has been dropped, memory should be released
            assert_eq!(pool.used(), 0);
        }
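
        // A hedged sketch of the interaction between `try_realloc` and a pool:
        // resizing the underlying allocation should resize an existing
        // reservation with it (via the internal `resize_reservation`), assuming
        // `TrackingMemoryPool::used` reflects reservation resizes.
        #[test]
        fn test_try_realloc_resizes_reservation() {
            let pool = TrackingMemoryPool::default();

            let mut bytes = unsafe {
                let layout =
                    std::alloc::Layout::from_size_align(512, crate::alloc::ALIGNMENT).unwrap();
                let ptr = std::alloc::alloc(layout);
                assert!(!ptr.is_null());

                Bytes::new(NonNull::new(ptr).unwrap(), 512, Deallocation::Standard(layout))
            };

            bytes.claim(&pool);
            assert_eq!(pool.used(), 512);

            // Growing the allocation should grow the reservation to match
            assert!(bytes.try_realloc(1024).is_ok());
            assert_eq!(pool.used(), 1024);
        }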
    }
}