parquet/column/page_store.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Pluggable storage for completed, serialized page blobs.
19//!
20//! While a row group is being written the [`ArrowWriter`] must buffer every
21//! column's encoded pages, because Parquet requires each column chunk to be
22//! contiguous in the file while record batches arrive with all columns interleaved.
23//! By default that buffer lives on the heap, so the writer's peak memory grows
24//! with the row group size. A [`PageStore`] lets the buffer live somewhere else
25//! — a local temp file, object storage, etc. — bounding peak write memory
26//! independently of the row group size.
27//!
28//! [`ArrowWriter`]: crate::arrow::arrow_writer::ArrowWriter
29
30use std::fmt::Debug;
31
32use bytes::Bytes;
33
34use crate::errors::{ParquetError, Result};
35use crate::schema::types::ColumnDescriptor;
36
/// Opaque handle naming one blob held by a [`PageStore`].
///
/// The store mints a handle for every blob it accepts — typically from a dense,
/// sequential counter, though any encoding the store can later resolve is
/// allowed. A handle only has meaning for the store that issued it; callers
/// carry it around as an opaque token.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct PageKey(u64);

impl PageKey {
    /// Wrap `raw` in a handle.
    ///
    /// [`PageStore`] implementations use this to build the value they hand back
    /// from [`put`](PageStore::put). Because callers never interpret the value,
    /// the store may encode a dense counter, a packed locator, or anything else
    /// it is able to resolve again in [`take`](PageStore::take).
    pub const fn new(raw: u64) -> Self {
        PageKey(raw)
    }

    /// Return the raw value this handle was built from via [`new`](Self::new).
    pub const fn get(self) -> u64 {
        let Self(raw) = self;
        raw
    }
}
61
62/// A pluggable store for completed, serialized page blobs.
63///
64/// The store is intentionally "dumb": it only maps an opaque [`PageKey`] to a
65/// blob of bytes. It knows nothing about pages, dictionaries, ordering, or
66/// offsets. The caller keeps the handles it gets back from [`put`](Self::put)
67/// and decides what they mean.
68///
69/// Each store instance is owned by a single column writer and mutated by one
70/// thread at a time (both methods take `&mut self`), so it needs no internal
71/// synchronization — hence only `Send`, not `Sync`.
72///
73/// The default ([`InMemoryPageStore`]) keeps blobs on the heap. Configure a
74/// different backend via
75/// [`ArrowWriterOptions::with_page_store_factory`](crate::arrow::arrow_writer::ArrowWriterOptions::with_page_store_factory).
76pub trait PageStore: Send {
77 /// Store `value`, returning a handle that can later be passed to
78 /// [`take`](Self::take).
79 fn put(&mut self, value: Bytes) -> Result<PageKey>;
80
81 /// Take back the blob previously stored under `key`.
82 ///
83 /// The caller takes ownership of the returned bytes and will **not** request
84 /// `key` again, so the store may release any resources backing it — eagerly
85 /// here, or when the store is dropped.
86 fn take(&mut self, key: PageKey) -> Result<Bytes>;
87
88 /// The number of bytes this store currently holds **in memory** (resident
89 /// on the heap), used to report the writer's memory footprint.
90 ///
91 /// The default is `0`, which is exactly right for a backend that moves
92 /// every blob off-heap (a temp file, object storage): the bytes it has been
93 /// handed no longer occupy heap. The in-memory backend overrides this to
94 /// report its resident blobs. A backend that keeps a partial in-memory
95 /// buffer should report that buffer's size.
96 fn memory_size(&self) -> usize {
97 0
98 }
99}
100
101/// Context for a single [`PageStoreFactory::create`] call.
102///
103/// Describes the leaf column chunk the store will buffer. It is held by
104/// reference for the duration of the call; a backend reads only what it needs.
105/// More fields may be added in future releases without breaking existing
106/// implementations — the type is constructed only by the writer, so an
107/// implementer only ever receives one and calls its accessors.
108pub struct PageStoreArgs<'a> {
109 column_index: usize,
110 column_descriptor: &'a ColumnDescriptor,
111}
112
113impl<'a> PageStoreArgs<'a> {
114 // Constructed only by the Arrow writer; without that feature there is no caller.
115 #[cfg(feature = "arrow")]
116 pub(crate) fn new(column_index: usize, column_descriptor: &'a ColumnDescriptor) -> Self {
117 Self {
118 column_index,
119 column_descriptor,
120 }
121 }
122
123 /// Index of the leaf column within the row group.
124 ///
125 /// A backend may use this to e.g. name spill files or shard across a bounded
126 /// pool; it carries no ordering or coordination requirement.
127 pub fn column_index(&self) -> usize {
128 self.column_index
129 }
130
131 /// Descriptor for the leaf column: physical/logical type, path, and max
132 /// definition/repetition levels.
133 ///
134 /// Lets a backend tailor buffering to the column — for example spilling only
135 /// large `BYTE_ARRAY` columns while keeping small fixed-width ones on the
136 /// heap.
137 pub fn column_descriptor(&self) -> &ColumnDescriptor {
138 self.column_descriptor
139 }
140}
141
142/// Creates a fresh [`PageStore`] for each column chunk.
143///
144/// See
145/// [`ArrowWriterOptions::with_page_store_factory`](crate::arrow::arrow_writer::ArrowWriterOptions::with_page_store_factory).
146pub trait PageStoreFactory: Send + Sync + Debug {
147 /// Create a new, empty [`PageStore`] for the leaf column described by `args`.
148 fn create(&self, args: &PageStoreArgs<'_>) -> Result<Box<dyn PageStore>>;
149}
150
151/// The default [`PageStore`], holding blobs on the heap in a `Vec<Bytes>`.
152///
153/// Peak memory grows with the row group size; use a spilling backend to bound
154/// it.
155#[derive(Debug, Default)]
156pub struct InMemoryPageStore {
157 blobs: Vec<Bytes>,
158 /// Running total of resident blob bytes, kept in step with `put`/`take`.
159 resident: usize,
160}
161
162impl PageStore for InMemoryPageStore {
163 fn put(&mut self, value: Bytes) -> Result<PageKey> {
164 let key = PageKey(self.blobs.len() as u64);
165 self.resident += value.len();
166 self.blobs.push(value);
167 Ok(key)
168 }
169
170 fn take(&mut self, key: PageKey) -> Result<Bytes> {
171 // Replace the slot with an empty `Bytes` so the stored blob is released
172 // as soon as it is taken, keeping memory bounded while the chunk is
173 // streamed into the output file.
174 let blob = self
175 .blobs
176 .get_mut(key.0 as usize)
177 .map(std::mem::take)
178 .ok_or_else(|| ParquetError::General(format!("invalid page key {}", key.0)))?;
179 self.resident -= blob.len();
180 Ok(blob)
181 }
182
183 fn memory_size(&self) -> usize {
184 self.resident
185 }
186}
187
188/// Factory for [`InMemoryPageStore`] — the default used by
189/// [`ArrowWriter`](crate::arrow::arrow_writer::ArrowWriter).
190#[derive(Debug, Default)]
191pub struct InMemoryPageStoreFactory;
192
193impl PageStoreFactory for InMemoryPageStoreFactory {
194 fn create(&self, _args: &PageStoreArgs<'_>) -> Result<Box<dyn PageStore>> {
195 Ok(Box::new(InMemoryPageStore::default()))
196 }
197}
198
#[cfg(test)]
mod tests {
    use super::*;

    /// Blobs come back unchanged when redeemed in the order they were stored.
    #[test]
    fn in_memory_round_trips_blobs_in_handle_order() {
        let mut store = InMemoryPageStore::default();
        let first = store.put(Bytes::from_static(b"hello")).unwrap();
        let second = store.put(Bytes::from_static(b"world")).unwrap();
        assert_ne!(first, second);

        let got_first = store.take(first).unwrap();
        assert_eq!(got_first, Bytes::from_static(b"hello"));
        let got_second = store.take(second).unwrap();
        assert_eq!(got_second, Bytes::from_static(b"world"));
    }

    /// Taking a blob leaves only an empty placeholder behind.
    #[test]
    fn in_memory_take_releases_the_slot() {
        let mut store = InMemoryPageStore::default();
        let key = store.put(Bytes::from_static(b"abc")).unwrap();
        assert_eq!(store.take(key).unwrap(), Bytes::from_static(b"abc"));

        // The second take sees the emptied placeholder, not the blob, which
        // shows the bytes were let go by the first take.
        let leftover = store.take(key).unwrap();
        assert!(leftover.is_empty());
    }

    /// A key that was never issued is rejected.
    #[test]
    fn in_memory_invalid_key_errors() {
        let mut store = InMemoryPageStore::default();
        let outcome = store.take(PageKey::new(99));
        assert!(outcome.is_err());
    }

    /// `memory_size` tracks the bytes that are stored and not yet taken.
    #[test]
    fn in_memory_reports_resident_bytes() {
        let mut store = InMemoryPageStore::default();
        assert_eq!(store.memory_size(), 0);

        let five_bytes = store.put(Bytes::from_static(b"hello")).unwrap();
        let one_byte = store.put(Bytes::from_static(b"!")).unwrap();
        assert_eq!(store.memory_size(), 6);

        store.take(five_bytes).unwrap();
        assert_eq!(store.memory_size(), 1);

        store.take(one_byte).unwrap();
        assert_eq!(store.memory_size(), 0);
    }

    /// The trait's provided `memory_size` reports zero.
    #[test]
    fn default_store_memory_size_is_zero() {
        // Stand-in for a spilling backend that relies on the provided
        // `memory_size`: it reports 0, reflecting that its blobs no longer
        // occupy the heap.
        struct OffHeap;

        impl PageStore for OffHeap {
            fn put(&mut self, _value: Bytes) -> Result<PageKey> {
                Ok(PageKey::new(0))
            }

            fn take(&mut self, _key: PageKey) -> Result<Bytes> {
                Ok(Bytes::new())
            }
        }

        let backend = OffHeap;
        assert_eq!(backend.memory_size(), 0);
    }
}