arrow_buffer/bytes.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! This module contains an implementation of a contiguous immutable memory region that knows
19//! how to de-allocate itself, [`Bytes`].
20//! Note that this is a low-level functionality of this crate.
21
22use core::slice;
23use std::ptr::NonNull;
24use std::{fmt::Debug, fmt::Formatter};
25
26use crate::alloc::Deallocation;
27use crate::buffer::dangling_ptr;
28
29#[cfg(feature = "pool")]
30use crate::pool::{MemoryPool, MemoryReservation};
31#[cfg(feature = "pool")]
32use std::sync::Mutex;
33
34/// A continuous, fixed-size, immutable memory region that knows how to de-allocate itself.
35///
36/// Note that this structure is an internal implementation detail of the
37/// arrow-rs crate. While it has the same name and similar API as
38/// [`bytes::Bytes`] it is not limited to rust's global allocator nor u8
39/// alignment. It is possible to create a `Bytes` from `bytes::Bytes` using the
40/// `From` implementation.
41///
42/// In the most common case, this buffer is allocated using [`alloc`](std::alloc::alloc)
43/// with an alignment of [`ALIGNMENT`](crate::alloc::ALIGNMENT)
44///
45/// When the region is allocated by a different allocator, [Deallocation::Custom], this calls the
46/// custom deallocator to deallocate the region when it is no longer needed.
47///
48pub(crate) struct Bytes {
49 /// The raw pointer to be beginning of the region
50 ptr: NonNull<u8>,
51
52 /// The number of bytes visible to this region. This is always smaller than its capacity (when available).
53 len: usize,
54
55 /// how to deallocate this region
56 deallocation: Deallocation,
57
58 /// Memory reservation for tracking memory usage
59 #[cfg(feature = "pool")]
60 pub(super) reservation: Mutex<Option<Box<dyn MemoryReservation>>>,
61}
62
63impl Bytes {
64 /// Takes ownership of an allocated memory region,
65 ///
66 /// # Arguments
67 ///
68 /// * `ptr` - Pointer to raw parts
69 /// * `len` - Length of raw parts in **bytes**
70 /// * `deallocation` - Type of allocation
71 ///
72 /// # Safety
73 ///
74 /// This function is unsafe as there is no guarantee that the given pointer is valid for `len`
75 /// bytes. If the `ptr` and `capacity` come from a `Buffer`, then this is guaranteed.
76 #[inline]
77 pub(crate) unsafe fn new(ptr: NonNull<u8>, len: usize, deallocation: Deallocation) -> Bytes {
78 Bytes {
79 ptr,
80 len,
81 deallocation,
82 #[cfg(feature = "pool")]
83 reservation: Mutex::new(None),
84 }
85 }
86
87 fn as_slice(&self) -> &[u8] {
88 self
89 }
90
91 #[inline]
92 pub(crate) fn len(&self) -> usize {
93 self.len
94 }
95
96 #[inline]
97 pub(crate) fn ptr(&self) -> NonNull<u8> {
98 self.ptr
99 }
100
101 pub(crate) fn capacity(&self) -> usize {
102 match self.deallocation {
103 Deallocation::Standard(layout) => layout.size(),
104 // we only know the size of the custom allocation
105 // its underlying capacity might be larger
106 Deallocation::Custom(_, size) => size,
107 }
108 }
109
110 /// Register this [`Bytes`] with the provided [`MemoryPool`], replacing any prior reservation.
111 #[cfg(feature = "pool")]
112 pub(crate) fn claim(&self, pool: &dyn MemoryPool) {
113 *self.reservation.lock().unwrap() = Some(pool.reserve(self.capacity()));
114 }
115
116 /// Resize the memory reservation of this buffer
117 ///
118 /// This is a no-op if this buffer doesn't have a reservation.
119 #[cfg(feature = "pool")]
120 fn resize_reservation(&self, new_size: usize) {
121 let mut guard = self.reservation.lock().unwrap();
122 if let Some(mut reservation) = guard.take() {
123 // Resize the reservation
124 reservation.resize(new_size);
125
126 // Put it back
127 *guard = Some(reservation);
128 }
129 }
130
131 /// Try to reallocate the underlying memory region to a new size (smaller or larger).
132 ///
133 /// Only works for bytes allocated with the standard allocator.
134 /// Returns `Err` if the memory was allocated with a custom allocator,
135 /// or the call to `realloc` failed, for whatever reason.
136 /// In case of `Err`, the [`Bytes`] will remain as it was (i.e. have the old size).
137 pub(crate) fn try_realloc(&mut self, new_len: usize) -> Result<(), ()> {
138 if let Deallocation::Standard(old_layout) = self.deallocation {
139 if old_layout.size() == new_len {
140 return Ok(()); // Nothing to do
141 }
142
143 if let Ok(new_layout) = std::alloc::Layout::from_size_align(new_len, old_layout.align())
144 {
145 let old_ptr = self.ptr.as_ptr();
146
147 let new_ptr = match new_layout.size() {
148 0 => {
149 // SAFETY: Verified that old_layout.size != new_len (0)
150 unsafe { std::alloc::dealloc(self.ptr.as_ptr(), old_layout) };
151 Some(dangling_ptr())
152 }
153 // SAFETY: the call to `realloc` is safe if all the following hold (from https://doc.rust-lang.org/stable/std/alloc/trait.GlobalAlloc.html#method.realloc):
154 // * `old_ptr` must be currently allocated via this allocator (guaranteed by the invariant/contract of `Bytes`)
155 // * `old_layout` must be the same layout that was used to allocate that block of memory (same)
156 // * `new_len` must be greater than zero
157 // * `new_len`, when rounded up to the nearest multiple of `layout.align()`, must not overflow `isize` (guaranteed by the success of `Layout::from_size_align`)
158 _ => NonNull::new(unsafe { std::alloc::realloc(old_ptr, old_layout, new_len) }),
159 };
160
161 if let Some(ptr) = new_ptr {
162 self.ptr = ptr;
163 self.len = new_len;
164 self.deallocation = Deallocation::Standard(new_layout);
165
166 #[cfg(feature = "pool")]
167 {
168 // Resize reservation
169 self.resize_reservation(new_len);
170 }
171
172 return Ok(());
173 }
174 }
175 }
176
177 Err(())
178 }
179
180 #[inline]
181 pub(crate) fn deallocation(&self) -> &Deallocation {
182 &self.deallocation
183 }
184}
185
186// Deallocation is Send + Sync, repeating the bound here makes that refactoring safe
187// The only field that is not automatically Send+Sync then is the NonNull ptr
188unsafe impl Send for Bytes where Deallocation: Send {}
189unsafe impl Sync for Bytes where Deallocation: Sync {}
190
191impl Drop for Bytes {
192 #[inline]
193 fn drop(&mut self) {
194 match &self.deallocation {
195 Deallocation::Standard(layout) => match layout.size() {
196 0 => {} // Nothing to do
197 _ => unsafe { std::alloc::dealloc(self.ptr.as_ptr(), *layout) },
198 },
199 // The automatic drop implementation will free the memory once the reference count reaches zero
200 Deallocation::Custom(_allocation, _size) => (),
201 }
202 }
203}
204
205impl std::ops::Deref for Bytes {
206 type Target = [u8];
207
208 fn deref(&self) -> &[u8] {
209 unsafe { slice::from_raw_parts(self.ptr.as_ptr(), self.len) }
210 }
211}
212
213impl PartialEq for Bytes {
214 fn eq(&self, other: &Bytes) -> bool {
215 self.as_slice() == other.as_slice()
216 }
217}
218
219impl Debug for Bytes {
220 fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
221 write!(f, "Bytes {{ ptr: {:?}, len: {}, data: ", self.ptr, self.len,)?;
222
223 f.debug_list().entries(self.iter()).finish()?;
224
225 write!(f, " }}")
226 }
227}
228
229impl From<bytes::Bytes> for Bytes {
230 fn from(value: bytes::Bytes) -> Self {
231 let len = value.len();
232 Self {
233 len,
234 ptr: NonNull::new(value.as_ptr() as _).unwrap(),
235 deallocation: Deallocation::Custom(std::sync::Arc::new(value), len),
236 #[cfg(feature = "pool")]
237 reservation: Mutex::new(None),
238 }
239 }
240}
241
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_from_bytes() {
        let message: &'static [u8] = b"hello arrow";

        // Converting must expose exactly the bytes of the source
        let assert_round_trip = |source: bytes::Bytes| {
            let converted = Bytes::from(source);
            assert_eq!(converted.as_slice(), message);
        };

        // bytes::Bytes created from a slice
        assert_round_trip(bytes::Bytes::from(message));

        // bytes::Bytes created from a Vec
        assert_round_trip(bytes::Bytes::from(message.to_vec()));
    }

    #[cfg(feature = "pool")]
    mod pool_tests {
        use super::*;

        use crate::pool::TrackingMemoryPool;

        /// Size in bytes of the allocations used by the tests below.
        const SIZE: usize = 1024;

        /// Creates a `Bytes` backed by a fresh standard allocation of `size` bytes.
        fn standard_bytes(size: usize) -> Bytes {
            let layout =
                std::alloc::Layout::from_size_align(size, crate::alloc::ALIGNMENT).unwrap();

            // SAFETY: the layout has a non-zero size in these tests, and the pointer and
            // layout handed to `Bytes::new` describe the allocation made right here.
            unsafe {
                let raw = std::alloc::alloc(layout);
                assert!(!raw.is_null());

                Bytes::new(NonNull::new(raw).unwrap(), size, Deallocation::Standard(layout))
            }
        }

        #[test]
        fn test_bytes_with_pool() {
            let buffer = standard_bytes(SIZE);

            // A fresh pool tracks nothing
            let pool = TrackingMemoryPool::default();
            assert_eq!(pool.used(), 0);

            // Claiming twice must replace the first reservation, not add to it
            for _ in 0..2 {
                buffer.claim(&pool);
                assert_eq!(pool.used(), SIZE);
            }

            // Dropping the buffer releases the reservation
            drop(buffer);
            assert_eq!(pool.used(), 0);
        }

        #[test]
        fn test_bytes_drop_releases_pool() {
            let pool = TrackingMemoryPool::default();

            {
                // The buffer only lives inside this scope
                let _buffer = {
                    let bytes = standard_bytes(SIZE);
                    bytes.claim(&pool);
                    bytes
                };

                assert_eq!(pool.used(), SIZE);
            }

            // Buffer has been dropped, memory should be released
            assert_eq!(pool.used(), 0);
        }
    }
}