Skip to main content

arrow_buffer/
bytes.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
//! This module contains [`Bytes`], an implementation of a contiguous immutable memory region
//! that knows how to de-allocate itself.
//! Note that this is low-level functionality of this crate.
21
22use core::slice;
23use std::ptr::NonNull;
24use std::{fmt::Debug, fmt::Formatter};
25
26use crate::alloc::Deallocation;
27use crate::buffer::dangling_ptr;
28
29#[cfg(feature = "pool")]
30use crate::pool::{MemoryPool, MemoryReservation};
31#[cfg(feature = "pool")]
32use std::sync::Mutex;
33
/// A contiguous, fixed-size, immutable memory region that knows how to de-allocate itself.
///
/// Note that this structure is an internal implementation detail of the
/// arrow-rs crate. While it has the same name and similar API as
/// [`bytes::Bytes`] it is not limited to rust's global allocator nor u8
/// alignment. It is possible to create a `Bytes` from `bytes::Bytes` using the
/// `From` implementation.
///
/// In the most common case, this buffer is allocated using [`alloc`](std::alloc::alloc)
/// with an alignment of [`ALIGNMENT`](crate::alloc::ALIGNMENT)
///
/// When the region is allocated by a different allocator, [Deallocation::Custom], the
/// allocation held inside that variant is dropped together with this value, which releases
/// the region once it is no longer needed.
///
pub(crate) struct Bytes {
    /// The raw pointer to the beginning of the region
    ptr: NonNull<u8>,

    /// The number of bytes visible to this region. This is expected to never be larger than
    /// its capacity (when available); the two are equal for regions created from
    /// `bytes::Bytes`.
    len: usize,

    /// how to deallocate this region
    deallocation: Deallocation,

    /// Memory reservation for tracking memory usage.
    ///
    /// Starts out as `None` and is populated by `claim`. It sits behind a `Mutex` so that it
    /// can be replaced through a shared reference.
    #[cfg(feature = "pool")]
    pub(super) reservation: Mutex<Option<Box<dyn MemoryReservation>>>,
}
62
63impl Bytes {
64    /// Takes ownership of an allocated memory region,
65    ///
66    /// # Arguments
67    ///
68    /// * `ptr` - Pointer to raw parts
69    /// * `len` - Length of raw parts in **bytes**
70    /// * `deallocation` - Type of allocation
71    ///
72    /// # Safety
73    ///
74    /// This function is unsafe as there is no guarantee that the given pointer is valid for `len`
75    /// bytes. If the `ptr` and `capacity` come from a `Buffer`, then this is guaranteed.
76    #[inline]
77    pub(crate) unsafe fn new(ptr: NonNull<u8>, len: usize, deallocation: Deallocation) -> Bytes {
78        Bytes {
79            ptr,
80            len,
81            deallocation,
82            #[cfg(feature = "pool")]
83            reservation: Mutex::new(None),
84        }
85    }
86
87    fn as_slice(&self) -> &[u8] {
88        self
89    }
90
91    #[inline]
92    pub(crate) fn len(&self) -> usize {
93        self.len
94    }
95
96    #[inline]
97    pub(crate) fn ptr(&self) -> NonNull<u8> {
98        self.ptr
99    }
100
101    pub(crate) fn capacity(&self) -> usize {
102        match self.deallocation {
103            Deallocation::Standard(layout) => layout.size(),
104            // we only know the size of the custom allocation
105            // its underlying capacity might be larger
106            Deallocation::Custom(_, size) => size,
107        }
108    }
109
110    /// Register this [`Bytes`] with the provided [`MemoryPool`], replacing any prior reservation.
111    #[cfg(feature = "pool")]
112    pub(crate) fn claim(&self, pool: &dyn MemoryPool) {
113        *self.reservation.lock().unwrap() = Some(pool.reserve(self.capacity()));
114    }
115
116    /// Resize the memory reservation of this buffer
117    ///
118    /// This is a no-op if this buffer doesn't have a reservation.
119    #[cfg(feature = "pool")]
120    fn resize_reservation(&self, new_size: usize) {
121        let mut guard = self.reservation.lock().unwrap();
122        if let Some(mut reservation) = guard.take() {
123            // Resize the reservation
124            reservation.resize(new_size);
125
126            // Put it back
127            *guard = Some(reservation);
128        }
129    }
130
131    /// Try to reallocate the underlying memory region to a new size (smaller or larger).
132    ///
133    /// Only works for bytes allocated with the standard allocator.
134    /// Returns `Err` if the memory was allocated with a custom allocator,
135    /// or the call to `realloc` failed, for whatever reason.
136    /// In case of `Err`, the [`Bytes`] will remain as it was (i.e. have the old size).
137    pub(crate) fn try_realloc(&mut self, new_len: usize) -> Result<(), ()> {
138        if let Deallocation::Standard(old_layout) = self.deallocation {
139            if old_layout.size() == new_len {
140                return Ok(()); // Nothing to do
141            }
142
143            if let Ok(new_layout) = std::alloc::Layout::from_size_align(new_len, old_layout.align())
144            {
145                let old_ptr = self.ptr.as_ptr();
146
147                let new_ptr = match new_layout.size() {
148                    0 => {
149                        // SAFETY: Verified that old_layout.size != new_len (0)
150                        unsafe { std::alloc::dealloc(self.ptr.as_ptr(), old_layout) };
151                        Some(dangling_ptr())
152                    }
153                    // SAFETY: the call to `realloc` is safe if all the following hold (from https://doc.rust-lang.org/stable/std/alloc/trait.GlobalAlloc.html#method.realloc):
154                    // * `old_ptr` must be currently allocated via this allocator (guaranteed by the invariant/contract of `Bytes`)
155                    // * `old_layout` must be the same layout that was used to allocate that block of memory (same)
156                    // * `new_len` must be greater than zero
157                    // * `new_len`, when rounded up to the nearest multiple of `layout.align()`, must not overflow `isize` (guaranteed by the success of `Layout::from_size_align`)
158                    _ => NonNull::new(unsafe { std::alloc::realloc(old_ptr, old_layout, new_len) }),
159                };
160
161                if let Some(ptr) = new_ptr {
162                    self.ptr = ptr;
163                    self.len = new_len;
164                    self.deallocation = Deallocation::Standard(new_layout);
165
166                    #[cfg(feature = "pool")]
167                    {
168                        // Resize reservation
169                        self.resize_reservation(new_len);
170                    }
171
172                    return Ok(());
173                }
174            }
175        }
176
177        Err(())
178    }
179
180    #[inline]
181    pub(crate) fn deallocation(&self) -> &Deallocation {
182        &self.deallocation
183    }
184}
185
// Deallocation is Send + Sync, repeating the bound here makes that refactoring safe
// The only field that is not automatically Send+Sync then is the NonNull ptr
//
// `NonNull<u8>` is neither `Send` nor `Sync`, which is why these impls are written by hand.
// Within this file, shared references only read the region (through `Deref`), while
// `try_realloc` and `Drop` require exclusive access.
// NOTE(review): with the `pool` feature enabled, `Bytes` also holds a
// `Mutex<Option<Box<dyn MemoryReservation>>>`; these impls then rely on
// `dyn MemoryReservation` being `Send` — confirm against the trait definition.
unsafe impl Send for Bytes where Deallocation: Send {}
unsafe impl Sync for Bytes where Deallocation: Sync {}
190
191impl Drop for Bytes {
192    #[inline]
193    fn drop(&mut self) {
194        match &self.deallocation {
195            Deallocation::Standard(layout) => match layout.size() {
196                0 => {} // Nothing to do
197                _ => unsafe { std::alloc::dealloc(self.ptr.as_ptr(), *layout) },
198            },
199            // The automatic drop implementation will free the memory once the reference count reaches zero
200            Deallocation::Custom(_allocation, _size) => (),
201        }
202    }
203}
204
205impl std::ops::Deref for Bytes {
206    type Target = [u8];
207
208    fn deref(&self) -> &[u8] {
209        unsafe { slice::from_raw_parts(self.ptr.as_ptr(), self.len) }
210    }
211}
212
213impl PartialEq for Bytes {
214    fn eq(&self, other: &Bytes) -> bool {
215        self.as_slice() == other.as_slice()
216    }
217}
218
219impl Debug for Bytes {
220    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
221        write!(f, "Bytes {{ ptr: {:?}, len: {}, data: ", self.ptr, self.len,)?;
222
223        f.debug_list().entries(self.iter()).finish()?;
224
225        write!(f, " }}")
226    }
227}
228
229impl From<bytes::Bytes> for Bytes {
230    fn from(value: bytes::Bytes) -> Self {
231        let len = value.len();
232        Self {
233            len,
234            ptr: NonNull::new(value.as_ptr() as _).unwrap(),
235            deallocation: Deallocation::Custom(std::sync::Arc::new(value), len),
236            #[cfg(feature = "pool")]
237            reservation: Mutex::new(None),
238        }
239    }
240}
241
#[cfg(test)]
mod tests {
    use super::*;

    /// A `Bytes` built from a `bytes::Bytes` exposes exactly the source bytes, whether the
    /// source was created from a slice or from a `Vec`.
    #[test]
    fn test_from_bytes() {
        let message = b"hello arrow";

        let from_slice: bytes::Bytes = message.as_ref().into();
        let from_vec: bytes::Bytes = bytes::Bytes::from(message.to_vec());

        for source in [from_slice, from_vec] {
            let converted: Bytes = source.into();
            assert_eq!(converted.as_slice(), message);
        }
    }

    #[cfg(feature = "pool")]
    mod pool_tests {
        use super::*;

        use crate::pool::TrackingMemoryPool;

        /// Size in bytes of the allocation used by the tests below.
        const SIZE: usize = 1024;

        /// Builds a `Bytes` of `SIZE` bytes backed by the standard allocator.
        fn standard_bytes() -> Bytes {
            let layout =
                std::alloc::Layout::from_size_align(SIZE, crate::alloc::ALIGNMENT).unwrap();
            // SAFETY: `layout` has a non-zero size.
            let raw = unsafe { std::alloc::alloc(layout) };
            assert!(!raw.is_null());

            // SAFETY: `raw` was just allocated with `layout`, so it is valid for `SIZE` bytes.
            unsafe {
                Bytes::new(
                    NonNull::new(raw).unwrap(),
                    SIZE,
                    Deallocation::Standard(layout),
                )
            }
        }

        /// Claiming twice must not double count, and dropping must release the reservation.
        #[test]
        fn test_bytes_with_pool() {
            let buffer = standard_bytes();

            let pool = TrackingMemoryPool::default();
            assert_eq!(pool.used(), 0);

            // The second claim replaces the first reservation instead of adding to it.
            for _ in 0..2 {
                buffer.claim(&pool);
                assert_eq!(pool.used(), 1024);
            }

            drop(buffer);
            assert_eq!(pool.used(), 0);
        }

        /// A reservation lives exactly as long as the `Bytes` that claimed it.
        #[test]
        fn test_bytes_drop_releases_pool() {
            let pool = TrackingMemoryPool::default();

            {
                let _buffer = {
                    let bytes = standard_bytes();
                    bytes.claim(&pool);
                    bytes
                };

                assert_eq!(pool.used(), 1024);
            }

            // `_buffer` went out of scope above, so its reservation must be gone.
            assert_eq!(pool.used(), 0);
        }
    }
}