arrow_buffer/
pool.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
//! This module contains the memory pool traits and an implementation for
//! tracking memory usage.
//!
//! The basic traits are [`MemoryPool`] and [`MemoryReservation`], and the
//! default implementation of [`MemoryPool`] is [`TrackingMemoryPool`]. Their
//! relationship is as follows:
//!
//! ```text
//!     (pool tracker)                        (resizable)
//!  ┌──────────────────┐ fn reserve() ┌─────────────────────────┐
//!  │ trait MemoryPool │─────────────►│ trait MemoryReservation │
//!  └──────────────────┘              └─────────────────────────┘
//! ```
31
32use std::fmt::Debug;
33use std::sync::atomic::{AtomicUsize, Ordering};
34use std::sync::Arc;
35
/// A resizable claim on memory obtained from a [`MemoryPool`].
///
/// Implementations are expected to hand their bytes back to the pool when
/// they are dropped.
pub trait MemoryReservation: Debug + Send + Sync {
    /// Returns how many bytes this reservation currently holds.
    fn size(&self) -> usize;

    /// Grows or shrinks this reservation so that it holds `new_size` bytes.
    fn resize(&mut self, new_size: usize);
}
44
45/// A pool of memory that can be reserved and released.
46///
47/// This is used to accurately track memory usage when buffers are shared
48/// between multiple arrays or other data structures.
49///
50/// For example, assume we have two arrays that share underlying buffer.
51/// It's hard to tell how much memory is used by them because we can't
52/// tell if the buffer is shared or not.
53///
54/// ```text
55///       Array A           Array B    
56///    ┌────────────┐    ┌────────────┐
57///    │ slices...  │    │ slices...  │
58///    │────────────│    │────────────│
59///    │ Arc<Bytes> │    │ Arc<Bytes> │ (shared buffer)
60///    └─────▲──────┘    └───────▲────┘
61///          │                   │     
62///          │       Bytes       │     
63///          │  ┌─────────────┐  │     
64///          │  │   data...   │  │     
65///          │  │─────────────│  │     
66///          └──│   Memory    │──┘   (tracked with a memory pool)  
67///             │ Reservation │        
68///             └─────────────┘        
69/// ```
70///
71/// With a memory pool, we can count the memory usage by the shared buffer
72/// directly.
73pub trait MemoryPool: Debug + Send + Sync {
74    /// Reserves memory from the pool. Infallible.
75    ///
76    /// Returns a reservation of the requested size.
77    fn reserve(&self, size: usize) -> Box<dyn MemoryReservation>;
78
79    /// Returns the current available memory in the pool.
80    ///
81    /// The pool may be overfilled, so this method might return a negative value.
82    fn available(&self) -> isize;
83
84    /// Returns the current used memory from the pool.
85    fn used(&self) -> usize;
86
87    /// Returns the maximum memory that can be reserved from the pool.
88    fn capacity(&self) -> usize;
89}
90
/// A [`MemoryPool`] that places no limit on reservations and simply counts
/// the bytes held by its outstanding reservations.
///
/// The counter sits behind an [`Arc`] so that each reservation can update it
/// on its own.
#[derive(Debug, Default)]
pub struct TrackingMemoryPool(Arc<AtomicUsize>);

impl TrackingMemoryPool {
    /// Returns the number of bytes currently reserved from this pool.
    pub fn allocated(&self) -> usize {
        let Self(counter) = self;
        counter.load(Ordering::Relaxed)
    }
}
101
102impl MemoryPool for TrackingMemoryPool {
103    fn reserve(&self, size: usize) -> Box<dyn MemoryReservation> {
104        self.0.fetch_add(size, Ordering::Relaxed);
105        Box::new(Tracker {
106            size,
107            shared: Arc::clone(&self.0),
108        })
109    }
110
111    fn available(&self) -> isize {
112        isize::MAX - self.used() as isize
113    }
114
115    fn used(&self) -> usize {
116        self.0.load(Ordering::Relaxed)
117    }
118
119    fn capacity(&self) -> usize {
120        usize::MAX
121    }
122}
123
/// The reservation type handed out by `TrackingMemoryPool`.
///
/// It remembers how many bytes it holds and keeps a handle to the pool's
/// counter, so it can give those bytes back when it is dropped.
#[derive(Debug)]
struct Tracker {
    /// Bytes currently held by this reservation.
    size: usize,
    /// Counter shared with the pool this reservation came from.
    shared: Arc<AtomicUsize>,
}

impl Drop for Tracker {
    /// Subtracts this reservation's bytes from the shared counter.
    fn drop(&mut self) {
        let Self { size, shared } = self;
        shared.fetch_sub(*size, Ordering::Relaxed);
    }
}
135
136impl MemoryReservation for Tracker {
137    fn size(&self) -> usize {
138        self.size
139    }
140
141    fn resize(&mut self, new: usize) {
142        match self.size < new {
143            true => self.shared.fetch_add(new - self.size, Ordering::Relaxed),
144            false => self.shared.fetch_sub(self.size - new, Ordering::Relaxed),
145        };
146        self.size = new;
147    }
148}
149
#[cfg(test)]
mod tests {
    use super::*;

    /// Walks a `TrackingMemoryPool` through reserve, grow, shrink and drop,
    /// checking the pool's counters after every step.
    #[test]
    fn test_tracking_memory_pool() {
        let pool = TrackingMemoryPool::default();

        // First reservation: 512 bytes.
        let mut first = pool.reserve(512);
        assert_eq!(first.size(), 512);
        assert_eq!(pool.used(), 512);
        assert_eq!(pool.available(), isize::MAX - 512);

        // Second reservation: 256 bytes on top of the first.
        let second = pool.reserve(256);
        assert_eq!(second.size(), 256);
        assert_eq!(pool.used(), 768);
        assert_eq!(pool.available(), isize::MAX - 768);

        // Growing the first reservation adds only the difference to the pool.
        first.resize(600);
        assert_eq!(first.size(), 600);
        assert_eq!(pool.used(), 856); // 600 + 256

        // Shrinking it hands the difference back.
        first.resize(400);
        assert_eq!(first.size(), 400);
        assert_eq!(pool.used(), 656); // 400 + 256

        // Dropping a reservation releases everything it held.
        drop(first);
        assert_eq!(pool.used(), 256);

        drop(second);
        assert_eq!(pool.used(), 0);
    }
}