parquet/column/page_store.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Pluggable storage for completed, serialized page blobs.
19//!
20//! While a row group is being written the [`ArrowWriter`] must buffer every
21//! column's encoded pages, because Parquet requires each column chunk to be
22//! contiguous in the file while record batches arrive with all columns interleaved.
23//! By default that buffer lives on the heap, so the writer's peak memory grows
24//! with the row group size. A [`PageStore`] lets the buffer live somewhere else
25//! — a local temp file, object storage, etc. — bounding peak write memory
26//! independently of the row group size.
27//!
28//! [`ArrowWriter`]: crate::arrow::arrow_writer::ArrowWriter
29
30use std::fmt::Debug;
31
32use bytes::Bytes;
33
34use crate::errors::{ParquetError, Result};
35use crate::schema::types::ColumnDescriptor;
36
/// Opaque token identifying one blob held by a [`PageStore`].
///
/// A key is minted by the store that accepted the blob, and only that store
/// can resolve it again. Callers attach no meaning to the wrapped value: they
/// keep the key and later hand it back to [`take`](PageStore::take).
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct PageKey(u64);

impl PageKey {
    /// Wrap `raw` in a key.
    ///
    /// Meant for [`PageStore`] implementations, which build the key they
    /// return from [`put`](PageStore::put) this way. The choice of `raw` is
    /// entirely up to the store — a running counter, a packed locator, or any
    /// other value it is able to resolve again in [`take`](PageStore::take).
    pub const fn new(raw: u64) -> Self {
        PageKey(raw)
    }

    /// Return the raw value this key was built from via [`new`](Self::new).
    pub const fn get(self) -> u64 {
        let PageKey(raw) = self;
        raw
    }
}
61
62/// A pluggable store for completed, serialized page blobs.
63///
64/// The store is intentionally "dumb": it only maps an opaque [`PageKey`] to a
65/// blob of bytes. It knows nothing about pages, dictionaries, ordering, or
66/// offsets. The caller keeps the handles it gets back from [`put`](Self::put)
67/// and decides what they mean.
68///
69/// Each store instance is owned by a single column writer and mutated by one
70/// thread at a time (both methods take `&mut self`), so it needs no internal
71/// synchronization — hence only `Send`, not `Sync`.
72///
73/// The default ([`InMemoryPageStore`]) keeps blobs in memory on the heap.
74///
75/// For an example of configuring the Parquet writer to use an alternate
76/// `PageStore` see the [`ArrowWriterOptions::with_page_store_factory`] API.
77///
78/// [`ArrowWriterOptions::with_page_store_factory`]: crate::arrow::arrow_writer::ArrowWriterOptions::with_page_store_factory
79pub trait PageStore: Send {
80 /// Store `value`, returning a handle that can later be passed to
81 /// [`take`](Self::take).
82 fn put(&mut self, value: Bytes) -> Result<PageKey>;
83
84 /// Take back the blob previously stored under `key`.
85 ///
86 /// The caller takes ownership of the returned bytes and will **not** request
87 /// `key` again, so the store may release any resources backing it — eagerly
88 /// here, or when the store is dropped.
89 fn take(&mut self, key: PageKey) -> Result<Bytes>;
90
91 /// The number of bytes this store currently holds **in memory** (resident
92 /// on the heap), used to report the writer's memory footprint.
93 ///
94 /// The default is `0`, which is exactly right for a backend that moves
95 /// every blob off-heap (a temp file, object storage): the bytes it has been
96 /// handed no longer occupy heap. The in-memory backend overrides this to
97 /// report its resident blobs. A backend that keeps a partial in-memory
98 /// buffer should report that buffer's size.
99 fn memory_size(&self) -> usize {
100 0
101 }
102}
103
104/// Context for a single [`PageStoreFactory::create`] call.
105///
106/// Describes the leaf column chunk the store will buffer. It is held by
107/// reference for the duration of the call; a backend reads only what it needs.
108/// More fields may be added in future releases without breaking existing
109/// implementations — the type is constructed only by the writer, so an
110/// implementer only ever receives one and calls its accessors.
111pub struct PageStoreArgs<'a> {
112 column_index: usize,
113 column_descriptor: &'a ColumnDescriptor,
114}
115
116impl<'a> PageStoreArgs<'a> {
117 // Constructed only by the Arrow writer; without that feature there is no caller.
118 #[cfg(feature = "arrow")]
119 pub(crate) fn new(column_index: usize, column_descriptor: &'a ColumnDescriptor) -> Self {
120 Self {
121 column_index,
122 column_descriptor,
123 }
124 }
125
126 /// Index of the leaf column within the row group.
127 ///
128 /// A backend may use this to e.g. name spill files or shard across a bounded
129 /// pool; it carries no ordering or coordination requirement.
130 pub fn column_index(&self) -> usize {
131 self.column_index
132 }
133
134 /// Descriptor for the leaf column: physical/logical type, path, and max
135 /// definition/repetition levels.
136 ///
137 /// Lets a backend tailor buffering to the column — for example spilling only
138 /// large `BYTE_ARRAY` columns while keeping small fixed-width ones on the
139 /// heap.
140 pub fn column_descriptor(&self) -> &ColumnDescriptor {
141 self.column_descriptor
142 }
143}
144
145/// Creates a fresh [`PageStore`] for each column chunk.
146///
147/// See
148/// [`ArrowWriterOptions::with_page_store_factory`](crate::arrow::arrow_writer::ArrowWriterOptions::with_page_store_factory).
149pub trait PageStoreFactory: Send + Sync + Debug {
150 /// Create a new, empty [`PageStore`] for the leaf column described by `args`.
151 fn create(&self, args: &PageStoreArgs<'_>) -> Result<Box<dyn PageStore>>;
152}
153
154/// The default [`PageStore`], holding blobs on the heap in a `Vec<Bytes>`.
155///
156/// Peak memory grows with the row group size; use a spilling backend to bound
157/// it.
158#[derive(Debug, Default)]
159pub struct InMemoryPageStore {
160 blobs: Vec<Bytes>,
161 /// Running total of resident blob bytes, kept in step with `put`/`take`.
162 resident: usize,
163}
164
165impl PageStore for InMemoryPageStore {
166 fn put(&mut self, value: Bytes) -> Result<PageKey> {
167 let key = PageKey(self.blobs.len() as u64);
168 self.resident += value.len();
169 self.blobs.push(value);
170 Ok(key)
171 }
172
173 fn take(&mut self, key: PageKey) -> Result<Bytes> {
174 // Replace the slot with an empty `Bytes` so the stored blob is released
175 // as soon as it is taken, keeping memory bounded while the chunk is
176 // streamed into the output file.
177 let blob = self
178 .blobs
179 .get_mut(key.0 as usize)
180 .map(std::mem::take)
181 .ok_or_else(|| ParquetError::General(format!("invalid page key {}", key.0)))?;
182 self.resident -= blob.len();
183 Ok(blob)
184 }
185
186 fn memory_size(&self) -> usize {
187 self.resident
188 }
189}
190
191/// Factory for [`InMemoryPageStore`] — the default used by
192/// [`ArrowWriter`](crate::arrow::arrow_writer::ArrowWriter).
193#[derive(Debug, Default)]
194pub struct InMemoryPageStoreFactory;
195
196impl PageStoreFactory for InMemoryPageStoreFactory {
197 fn create(&self, _args: &PageStoreArgs<'_>) -> Result<Box<dyn PageStore>> {
198 Ok(Box::new(InMemoryPageStore::default()))
199 }
200}
201
#[cfg(test)]
mod tests {
    use super::*;

    /// Fresh, empty in-memory store shared by the tests below.
    fn empty_store() -> InMemoryPageStore {
        InMemoryPageStore::default()
    }

    /// Blobs come back unchanged under the keys they were stored with.
    #[test]
    fn in_memory_round_trips_blobs_in_handle_order() {
        let mut store = empty_store();
        let first = store.put(Bytes::from_static(b"hello")).unwrap();
        let second = store.put(Bytes::from_static(b"world")).unwrap();
        assert_ne!(first, second);

        let got_first = store.take(first).unwrap();
        assert_eq!(got_first, Bytes::from_static(b"hello"));
        let got_second = store.take(second).unwrap();
        assert_eq!(got_second, Bytes::from_static(b"world"));
    }

    /// After a take the slot only holds an empty placeholder.
    #[test]
    fn in_memory_take_releases_the_slot() {
        let mut store = empty_store();
        let key = store.put(Bytes::from_static(b"abc")).unwrap();
        assert_eq!(store.take(key).unwrap(), Bytes::from_static(b"abc"));

        // Taking again returns the emptied placeholder, not the blob, which
        // shows the bytes were released by the first take.
        let again = store.take(key).unwrap();
        assert!(again.is_empty());
    }

    /// A key that was never handed out is rejected.
    #[test]
    fn in_memory_invalid_key_errors() {
        let mut store = empty_store();
        let outcome = store.take(PageKey::new(99));
        assert!(outcome.is_err());
    }

    /// `memory_size` follows the blobs as they are put and taken.
    #[test]
    fn in_memory_reports_resident_bytes() {
        let mut store = empty_store();
        assert_eq!(store.memory_size(), 0);

        let five_bytes = store.put(Bytes::from_static(b"hello")).unwrap();
        let one_byte = store.put(Bytes::from_static(b"!")).unwrap();
        assert_eq!(store.memory_size(), 6);

        store.take(five_bytes).unwrap();
        assert_eq!(store.memory_size(), 1);

        store.take(one_byte).unwrap();
        assert_eq!(store.memory_size(), 0);
    }

    /// The provided `memory_size` implementation reports zero.
    #[test]
    fn default_store_memory_size_is_zero() {
        // Stand-in for a spilling backend that leaves `memory_size` alone: it
        // reports 0, reflecting that its blobs no longer occupy the heap.
        struct OffHeap;

        impl PageStore for OffHeap {
            fn put(&mut self, _value: Bytes) -> Result<PageKey> {
                Ok(PageKey::new(0))
            }

            fn take(&mut self, _key: PageKey) -> Result<Bytes> {
                Ok(Bytes::new())
            }
        }

        let backend = OffHeap;
        assert_eq!(backend.memory_size(), 0);
    }
}