arrow_buffer/bytes.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! This module contains an implementation of a contiguous immutable memory region that knows
19//! how to de-allocate itself, [`Bytes`].
20//! Note that this is a low-level functionality of this crate.
21
22use core::slice;
23use std::ptr::NonNull;
24use std::{fmt::Debug, fmt::Formatter};
25
26use crate::alloc::Deallocation;
27use crate::buffer::dangling_ptr;
28
29#[cfg(feature = "pool")]
30use crate::pool::{MemoryPool, MemoryReservation};
31#[cfg(feature = "pool")]
32use std::sync::Mutex;
33
34/// A continuous, fixed-size, immutable memory region that knows how to de-allocate itself.
35///
36/// Note that this structure is an internal implementation detail of the
37/// arrow-rs crate. While it has the same name and similar API as
38/// [`bytes::Bytes`] it is not limited to rust's global allocator nor u8
39/// alignment. It is possible to create a `Bytes` from `bytes::Bytes` using the
40/// `From` implementation.
41///
42/// In the most common case, this buffer is allocated using [`alloc`](std::alloc::alloc)
43/// with an alignment of [`ALIGNMENT`](crate::alloc::ALIGNMENT)
44///
45/// When the region is allocated by a different allocator, [Deallocation::Custom], this calls the
46/// custom deallocator to deallocate the region when it is no longer needed.
47///
pub struct Bytes {
    /// The raw pointer to the beginning of the region
    ptr: NonNull<u8>,

    /// The number of bytes visible to this region. This is always less than or
    /// equal to its capacity (when available).
    len: usize,

    /// How to deallocate this region when it is dropped
    deallocation: Deallocation,

    /// Memory reservation for tracking memory usage in a [`MemoryPool`].
    /// `None` until [`Bytes::claim`] attaches one.
    #[cfg(feature = "pool")]
    pub(super) reservation: Mutex<Option<Box<dyn MemoryReservation>>>,
}
62
impl Bytes {
    /// Takes ownership of an allocated memory region.
    ///
    /// # Arguments
    ///
    /// * `ptr` - Pointer to raw parts
    /// * `len` - Length of raw parts in **bytes**
    /// * `deallocation` - Type of allocation, determines how the region is freed on drop
    ///
    /// # Safety
    ///
    /// This function is unsafe as there is no guarantee that the given pointer is valid for `len`
    /// bytes. If the `ptr` and `capacity` come from a `Buffer`, then this is guaranteed.
    #[inline]
    pub(crate) unsafe fn new(ptr: NonNull<u8>, len: usize, deallocation: Deallocation) -> Bytes {
        Bytes {
            ptr,
            len,
            deallocation,
            // New regions start untracked; `claim` attaches a pool reservation later.
            #[cfg(feature = "pool")]
            reservation: Mutex::new(None),
        }
    }

    /// Returns the visible bytes of this region as a slice
    /// (delegates to the `Deref<Target = [u8]>` impl).
    fn as_slice(&self) -> &[u8] {
        self
    }

    /// Returns the number of bytes visible to this region.
    #[inline]
    pub fn len(&self) -> usize {
        self.len
    }

    /// Returns `true` if this region contains no visible bytes.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.len == 0
    }

    /// Returns the raw pointer to the beginning of this region.
    #[inline]
    pub fn ptr(&self) -> NonNull<u8> {
        self.ptr
    }

    /// Returns the size in bytes of the allocation backing this region,
    /// which may exceed [`Self::len`].
    pub fn capacity(&self) -> usize {
        match self.deallocation {
            Deallocation::Standard(layout) => layout.size(),
            // we only know the size of the custom allocation
            // its underlying capacity might be larger
            Deallocation::Custom(_, size) => size,
        }
    }

    /// Register this [`Bytes`] with the provided [`MemoryPool`], replacing any prior reservation.
    ///
    /// Overwriting the slot drops the previous reservation (if any), releasing
    /// its accounting in whichever pool issued it.
    #[cfg(feature = "pool")]
    pub fn claim(&self, pool: &dyn MemoryPool) {
        *self.reservation.lock().unwrap() = Some(pool.reserve(self.capacity()));
    }

    /// Resize the memory reservation of this buffer
    ///
    /// This is a no-op if this buffer doesn't have a reservation.
    #[cfg(feature = "pool")]
    fn resize_reservation(&self, new_size: usize) {
        let mut guard = self.reservation.lock().unwrap();
        if let Some(mut reservation) = guard.take() {
            // Resize the reservation
            reservation.resize(new_size);

            // Put it back
            *guard = Some(reservation);
        }
    }

    /// Try to reallocate the underlying memory region to a new size (smaller or larger).
    ///
    /// Only works for bytes allocated with the standard allocator.
    /// Returns `Err` if the memory was allocated with a custom allocator,
    /// or the call to `realloc` failed, for whatever reason.
    /// In case of `Err`, the [`Bytes`] will remain as it was (i.e. have the old size).
    pub fn try_realloc(&mut self, new_len: usize) -> Result<(), ()> {
        if let Deallocation::Standard(old_layout) = self.deallocation {
            if old_layout.size() == new_len {
                return Ok(()); // Nothing to do
            }

            if let Ok(new_layout) = std::alloc::Layout::from_size_align(new_len, old_layout.align())
            {
                let old_ptr = self.ptr.as_ptr();

                let new_ptr = match new_layout.size() {
                    0 => {
                        // Shrinking to zero: `realloc` forbids zero-sized layouts, so
                        // free the old block and substitute an aligned dangling pointer.
                        // SAFETY: Verified that old_layout.size != new_len (0)
                        unsafe { std::alloc::dealloc(self.ptr.as_ptr(), old_layout) };
                        Some(dangling_ptr())
                    }
                    // SAFETY: the call to `realloc` is safe if all the following hold (from https://doc.rust-lang.org/stable/std/alloc/trait.GlobalAlloc.html#method.realloc):
                    // * `old_ptr` must be currently allocated via this allocator (guaranteed by the invariant/contract of `Bytes`)
                    // * `old_layout` must be the same layout that was used to allocate that block of memory (same)
                    // * `new_len` must be greater than zero
                    // * `new_len`, when rounded up to the nearest multiple of `layout.align()`, must not overflow `isize` (guaranteed by the success of `Layout::from_size_align`)
                    _ => NonNull::new(unsafe { std::alloc::realloc(old_ptr, old_layout, new_len) }),
                };

                // `None` means `realloc` failed; fall through and report `Err`
                // with `self` untouched (the old allocation is still valid).
                if let Some(ptr) = new_ptr {
                    self.ptr = ptr;
                    self.len = new_len;
                    self.deallocation = Deallocation::Standard(new_layout);

                    #[cfg(feature = "pool")]
                    {
                        // Keep the pool's accounting in sync with the new size
                        self.resize_reservation(new_len);
                    }

                    return Ok(());
                }
            }
        }

        Err(())
    }

    /// Returns how this region will be deallocated.
    #[inline]
    pub(crate) fn deallocation(&self) -> &Deallocation {
        &self.deallocation
    }
}
190
// SAFETY: `Bytes` owns its region exclusively and only hands out shared `&[u8]`
// views; the raw `NonNull<u8>` is the only field not automatically Send + Sync.
// Repeating the `Deallocation: Send`/`Sync` bounds makes refactoring safe: if
// `Deallocation` ever stops being Send/Sync, these impls stop compiling.
unsafe impl Send for Bytes where Deallocation: Send {}
unsafe impl Sync for Bytes where Deallocation: Sync {}
195
196impl Drop for Bytes {
197 #[inline]
198 fn drop(&mut self) {
199 match &self.deallocation {
200 Deallocation::Standard(layout) => match layout.size() {
201 0 => {} // Nothing to do
202 _ => unsafe { std::alloc::dealloc(self.ptr.as_ptr(), *layout) },
203 },
204 // The automatic drop implementation will free the memory once the reference count reaches zero
205 Deallocation::Custom(_allocation, _size) => (),
206 }
207 }
208}
209
210impl std::ops::Deref for Bytes {
211 type Target = [u8];
212
213 fn deref(&self) -> &[u8] {
214 unsafe { slice::from_raw_parts(self.ptr.as_ptr(), self.len) }
215 }
216}
217
218impl PartialEq for Bytes {
219 fn eq(&self, other: &Bytes) -> bool {
220 self.as_slice() == other.as_slice()
221 }
222}
223
224impl Debug for Bytes {
225 fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
226 write!(f, "Bytes {{ ptr: {:?}, len: {}, data: ", self.ptr, self.len,)?;
227
228 f.debug_list().entries(self.iter()).finish()?;
229
230 write!(f, " }}")
231 }
232}
233
234impl From<bytes::Bytes> for Bytes {
235 fn from(value: bytes::Bytes) -> Self {
236 let len = value.len();
237 Self {
238 len,
239 ptr: NonNull::new(value.as_ptr() as _).unwrap(),
240 deallocation: Deallocation::Custom(std::sync::Arc::new(value), len),
241 #[cfg(feature = "pool")]
242 reservation: Mutex::new(None),
243 }
244 }
245}
246
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_from_bytes() {
        let message = b"hello arrow";

        // we can create a Bytes from bytes::Bytes (created from slices)
        let shared = bytes::Bytes::from(message.as_ref());
        let converted = Bytes::from(shared);
        assert_eq!(converted.as_slice(), message);

        // we can create a Bytes from bytes::Bytes (created from Vec)
        let shared = bytes::Bytes::from(message.to_vec());
        let converted = Bytes::from(shared);
        assert_eq!(converted.as_slice(), message);
    }

    #[cfg(feature = "pool")]
    mod pool_tests {
        use super::*;

        use crate::pool::TrackingMemoryPool;

        /// Allocates `size` bytes via the standard allocator and wraps them in
        /// a `Bytes` with a `Deallocation::Standard` strategy.
        fn standard_bytes(size: usize) -> Bytes {
            let layout =
                std::alloc::Layout::from_size_align(size, crate::alloc::ALIGNMENT).unwrap();
            // SAFETY: the layout has a non-zero size, and the resulting
            // (pointer, len, layout) triple satisfies the contract of `Bytes::new`.
            unsafe {
                let ptr = std::alloc::alloc(layout);
                assert!(!ptr.is_null());

                Bytes::new(
                    NonNull::new(ptr).unwrap(),
                    size,
                    Deallocation::Standard(layout),
                )
            }
        }

        #[test]
        fn test_bytes_with_pool() {
            let buffer = standard_bytes(1024);

            let pool = TrackingMemoryPool::default();
            assert_eq!(pool.used(), 0);

            // Claim twice: the second reservation replaces (and releases) the
            // first, so usage must not double-count.
            buffer.claim(&pool);
            assert_eq!(pool.used(), 1024);
            buffer.claim(&pool);
            assert_eq!(pool.used(), 1024);

            // Memory should be released when buffer is dropped
            drop(buffer);
            assert_eq!(pool.used(), 0);
        }

        #[test]
        fn test_bytes_drop_releases_pool() {
            let pool = TrackingMemoryPool::default();

            {
                let buffer = standard_bytes(1024);
                buffer.claim(&pool);
                assert_eq!(pool.used(), 1024);
            }

            // Buffer has been dropped, memory should be released
            assert_eq!(pool.used(), 0);
        }
    }
}