arrow_buffer/pool.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
//! This module contains the traits for memory pools and memory reservations,
//! together with an implementation for tracking memory usage.
//!
//! The basic traits are [`MemoryPool`] and [`MemoryReservation`], and the
//! default implementation of [`MemoryPool`] is [`TrackingMemoryPool`]. Their
//! relationship is as follows:
//!
//! ```text
//!    (pool tracker)                         (resizable)
//! ┌──────────────────┐ fn reserve() ┌─────────────────────────┐
//! │ trait MemoryPool │─────────────►│ trait MemoryReservation │
//! └──────────────────┘              └─────────────────────────┘
//! ```
31
32use std::fmt::Debug;
33use std::sync::atomic::{AtomicUsize, Ordering};
34use std::sync::Arc;
35
/// A handle on some amount of memory reserved from a [`MemoryPool`].
///
/// The handle records how many bytes it currently accounts for and can be
/// grown or shrunk in place. Implementations are expected to hand those bytes
/// back to their pool when the handle is dropped.
pub trait MemoryReservation: Debug + Send + Sync {
    /// Number of bytes this reservation currently accounts for.
    fn size(&self) -> usize;

    /// Changes the number of bytes held by this reservation to `new_size`,
    /// growing or shrinking it as needed.
    fn resize(&mut self, new_size: usize);
}
44
45/// A pool of memory that can be reserved and released.
46///
47/// This is used to accurately track memory usage when buffers are shared
48/// between multiple arrays or other data structures.
49///
50/// For example, assume we have two arrays that share underlying buffer.
51/// It's hard to tell how much memory is used by them because we can't
52/// tell if the buffer is shared or not.
53///
54/// ```text
55/// Array A Array B
56/// ┌────────────┐ ┌────────────┐
57/// │ slices... │ │ slices... │
58/// │────────────│ │────────────│
59/// │ Arc<Bytes> │ │ Arc<Bytes> │ (shared buffer)
60/// └─────▲──────┘ └───────▲────┘
61/// │ │
62/// │ Bytes │
63/// │ ┌─────────────┐ │
64/// │ │ data... │ │
65/// │ │─────────────│ │
66/// └──│ Memory │──┘ (tracked with a memory pool)
67/// │ Reservation │
68/// └─────────────┘
69/// ```
70///
71/// With a memory pool, we can count the memory usage by the shared buffer
72/// directly.
73pub trait MemoryPool: Debug + Send + Sync {
74 /// Reserves memory from the pool. Infallible.
75 ///
76 /// Returns a reservation of the requested size.
77 fn reserve(&self, size: usize) -> Box<dyn MemoryReservation>;
78
79 /// Returns the current available memory in the pool.
80 ///
81 /// The pool may be overfilled, so this method might return a negative value.
82 fn available(&self) -> isize;
83
84 /// Returns the current used memory from the pool.
85 fn used(&self) -> usize;
86
87 /// Returns the maximum memory that can be reserved from the pool.
88 fn capacity(&self) -> usize;
89}
90
/// A simple [`MemoryPool`] that reports the total memory usage.
///
/// The running total is kept in a reference-counted atomic counter that is
/// shared with the reservations this pool hands out.
#[derive(Debug, Default)]
pub struct TrackingMemoryPool(Arc<AtomicUsize>);

impl TrackingMemoryPool {
    /// Total number of bytes currently accounted for by this pool.
    pub fn allocated(&self) -> usize {
        let Self(counter) = self;
        counter.load(Ordering::Relaxed)
    }
}
101
102impl MemoryPool for TrackingMemoryPool {
103 fn reserve(&self, size: usize) -> Box<dyn MemoryReservation> {
104 self.0.fetch_add(size, Ordering::Relaxed);
105 Box::new(Tracker {
106 size,
107 shared: Arc::clone(&self.0),
108 })
109 }
110
111 fn available(&self) -> isize {
112 isize::MAX - self.used() as isize
113 }
114
115 fn used(&self) -> usize {
116 self.0.load(Ordering::Relaxed)
117 }
118
119 fn capacity(&self) -> usize {
120 usize::MAX
121 }
122}
123
/// Reservation handed out by `TrackingMemoryPool`.
#[derive(Debug)]
struct Tracker {
    /// Bytes this reservation currently accounts for.
    size: usize,
    /// Pool-wide counter that `size` has been added to.
    shared: Arc<AtomicUsize>,
}

impl Drop for Tracker {
    /// Gives this reservation's bytes back by subtracting them from the
    /// pool-wide counter.
    fn drop(&mut self) {
        let Self { size, shared } = self;
        shared.fetch_sub(*size, Ordering::Relaxed);
    }
}
135
136impl MemoryReservation for Tracker {
137 fn size(&self) -> usize {
138 self.size
139 }
140
141 fn resize(&mut self, new: usize) {
142 match self.size < new {
143 true => self.shared.fetch_add(new - self.size, Ordering::Relaxed),
144 false => self.shared.fetch_sub(self.size - new, Ordering::Relaxed),
145 };
146 self.size = new;
147 }
148}
149
#[cfg(test)]
mod tests {
    use super::*;

    /// Walks a `TrackingMemoryPool` through reserve, grow, shrink and drop,
    /// checking the pool-wide usage after every step.
    #[test]
    fn test_tracking_memory_pool() {
        let pool = TrackingMemoryPool::default();

        // A first reservation of 512 bytes shows up in the pool totals.
        let mut first = pool.reserve(512);
        assert_eq!(first.size(), 512);
        assert_eq!(pool.used(), 512);
        assert_eq!(pool.available(), isize::MAX - 512);

        // A second reservation of 256 bytes accumulates on top of it.
        let second = pool.reserve(256);
        assert_eq!(second.size(), 256);
        assert_eq!(pool.used(), 512 + 256);
        assert_eq!(pool.available(), isize::MAX - (512 + 256));

        // Growing the first reservation charges only the extra bytes.
        first.resize(600);
        assert_eq!(first.size(), 600);
        assert_eq!(pool.used(), 600 + 256);

        // Shrinking it gives the difference back.
        first.resize(400);
        assert_eq!(first.size(), 400);
        assert_eq!(pool.used(), 400 + 256);

        // Dropping a reservation releases everything it still held...
        drop(first);
        assert_eq!(pool.used(), 256);

        // ...until nothing is left in use.
        drop(second);
        assert_eq!(pool.used(), 0);
    }
}