Skip to main content

parquet/column/
page_store.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
//! Pluggable storage for completed, serialized page blobs.
//!
//! While a row group is being written, the [`ArrowWriter`] must buffer every
//! column's encoded pages, because Parquet requires each column chunk to be
//! contiguous in the file while record batches arrive with all columns interleaved.
//! By default that buffer lives on the heap, so the writer's peak memory grows
//! with the row group size. A [`PageStore`] lets the buffer live somewhere else
//! — a local temp file, object storage, etc. — bounding peak write memory
//! independently of the row group size.
//!
//! [`ArrowWriter`]: crate::arrow::arrow_writer::ArrowWriter
29
30use std::fmt::Debug;
31
32use bytes::Bytes;
33
34use crate::errors::{ParquetError, Result};
35use crate::schema::types::ColumnDescriptor;
36
/// An opaque handle to a blob held by a [`PageStore`].
///
/// A handle is minted by the store inside [`put`](PageStore::put) and is only
/// meaningful to the store that produced it. How the wrapped `u64` is chosen is
/// entirely up to the store: [`InMemoryPageStore`] hands out dense, sequential
/// indices, but another backend may pack a locator of its own into the value.
/// Callers must treat a handle as an opaque token and simply pass it back to
/// [`take`](PageStore::take).
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct PageKey(u64);

impl PageKey {
    /// Create a handle wrapping `raw`.
    ///
    /// A [`PageStore`] implementation calls this to mint the handle it returns
    /// from [`put`](PageStore::put). The value is opaque to the caller, so a
    /// store is free to use a dense counter, a packed locator, or anything else
    /// it can later resolve in [`take`](PageStore::take).
    pub const fn new(raw: u64) -> Self {
        Self(raw)
    }

    /// Return the raw value that was passed to [`new`](Self::new).
    ///
    /// Intended for the store that minted the handle; the value carries no
    /// meaning for any other store.
    pub const fn get(self) -> u64 {
        self.0
    }
}
61
62/// A pluggable store for completed, serialized page blobs.
63///
64/// The store is intentionally "dumb": it only maps an opaque [`PageKey`] to a
65/// blob of bytes. It knows nothing about pages, dictionaries, ordering, or
66/// offsets. The caller keeps the handles it gets back from [`put`](Self::put)
67/// and decides what they mean.
68///
69/// Each store instance is owned by a single column writer and mutated by one
70/// thread at a time (both methods take `&mut self`), so it needs no internal
71/// synchronization — hence only `Send`, not `Sync`.
72///
73/// The default ([`InMemoryPageStore`]) keeps blobs in memory on the heap.
74///
75/// For an example of configuring the Parquet writer to use an alternate
76/// `PageStore` see the [`ArrowWriterOptions::with_page_store_factory`] API.
77///
78/// [`ArrowWriterOptions::with_page_store_factory`]: crate::arrow::arrow_writer::ArrowWriterOptions::with_page_store_factory
79pub trait PageStore: Send {
80    /// Store `value`, returning a handle that can later be passed to
81    /// [`take`](Self::take).
82    fn put(&mut self, value: Bytes) -> Result<PageKey>;
83
84    /// Take back the blob previously stored under `key`.
85    ///
86    /// The caller takes ownership of the returned bytes and will **not** request
87    /// `key` again, so the store may release any resources backing it — eagerly
88    /// here, or when the store is dropped.
89    fn take(&mut self, key: PageKey) -> Result<Bytes>;
90
91    /// The number of bytes this store currently holds **in memory** (resident
92    /// on the heap), used to report the writer's memory footprint.
93    ///
94    /// The default is `0`, which is exactly right for a backend that moves
95    /// every blob off-heap (a temp file, object storage): the bytes it has been
96    /// handed no longer occupy heap. The in-memory backend overrides this to
97    /// report its resident blobs. A backend that keeps a partial in-memory
98    /// buffer should report that buffer's size.
99    fn memory_size(&self) -> usize {
100        0
101    }
102}
103
104/// Context for a single [`PageStoreFactory::create`] call.
105///
106/// Describes the leaf column chunk the store will buffer. It is held by
107/// reference for the duration of the call; a backend reads only what it needs.
108/// More fields may be added in future releases without breaking existing
109/// implementations — the type is constructed only by the writer, so an
110/// implementer only ever receives one and calls its accessors.
111pub struct PageStoreArgs<'a> {
112    column_index: usize,
113    column_descriptor: &'a ColumnDescriptor,
114}
115
116impl<'a> PageStoreArgs<'a> {
117    // Constructed only by the Arrow writer; without that feature there is no caller.
118    #[cfg(feature = "arrow")]
119    pub(crate) fn new(column_index: usize, column_descriptor: &'a ColumnDescriptor) -> Self {
120        Self {
121            column_index,
122            column_descriptor,
123        }
124    }
125
126    /// Index of the leaf column within the row group.
127    ///
128    /// A backend may use this to e.g. name spill files or shard across a bounded
129    /// pool; it carries no ordering or coordination requirement.
130    pub fn column_index(&self) -> usize {
131        self.column_index
132    }
133
134    /// Descriptor for the leaf column: physical/logical type, path, and max
135    /// definition/repetition levels.
136    ///
137    /// Lets a backend tailor buffering to the column — for example spilling only
138    /// large `BYTE_ARRAY` columns while keeping small fixed-width ones on the
139    /// heap.
140    pub fn column_descriptor(&self) -> &ColumnDescriptor {
141        self.column_descriptor
142    }
143}
144
145/// Creates a fresh [`PageStore`] for each column chunk.
146///
147/// See
148/// [`ArrowWriterOptions::with_page_store_factory`](crate::arrow::arrow_writer::ArrowWriterOptions::with_page_store_factory).
149pub trait PageStoreFactory: Send + Sync + Debug {
150    /// Create a new, empty [`PageStore`] for the leaf column described by `args`.
151    fn create(&self, args: &PageStoreArgs<'_>) -> Result<Box<dyn PageStore>>;
152}
153
154/// The default [`PageStore`], holding blobs on the heap in a `Vec<Bytes>`.
155///
156/// Peak memory grows with the row group size; use a spilling backend to bound
157/// it.
158#[derive(Debug, Default)]
159pub struct InMemoryPageStore {
160    blobs: Vec<Bytes>,
161    /// Running total of resident blob bytes, kept in step with `put`/`take`.
162    resident: usize,
163}
164
165impl PageStore for InMemoryPageStore {
166    fn put(&mut self, value: Bytes) -> Result<PageKey> {
167        let key = PageKey(self.blobs.len() as u64);
168        self.resident += value.len();
169        self.blobs.push(value);
170        Ok(key)
171    }
172
173    fn take(&mut self, key: PageKey) -> Result<Bytes> {
174        // Replace the slot with an empty `Bytes` so the stored blob is released
175        // as soon as it is taken, keeping memory bounded while the chunk is
176        // streamed into the output file.
177        let blob = self
178            .blobs
179            .get_mut(key.0 as usize)
180            .map(std::mem::take)
181            .ok_or_else(|| ParquetError::General(format!("invalid page key {}", key.0)))?;
182        self.resident -= blob.len();
183        Ok(blob)
184    }
185
186    fn memory_size(&self) -> usize {
187        self.resident
188    }
189}
190
191/// Factory for [`InMemoryPageStore`] — the default used by
192/// [`ArrowWriter`](crate::arrow::arrow_writer::ArrowWriter).
193#[derive(Debug, Default)]
194pub struct InMemoryPageStoreFactory;
195
196impl PageStoreFactory for InMemoryPageStoreFactory {
197    fn create(&self, _args: &PageStoreArgs<'_>) -> Result<Box<dyn PageStore>> {
198        Ok(Box::new(InMemoryPageStore::default()))
199    }
200}
201
#[cfg(test)]
mod tests {
    use super::*;

    /// Each blob comes back under the handle it was stored with.
    #[test]
    fn in_memory_round_trips_blobs_in_handle_order() {
        let mut store = InMemoryPageStore::default();
        let first = store.put(Bytes::from_static(b"hello")).unwrap();
        let second = store.put(Bytes::from_static(b"world")).unwrap();
        assert_ne!(first, second);

        let hello = store.take(first).unwrap();
        assert_eq!(hello, Bytes::from_static(b"hello"));
        let world = store.take(second).unwrap();
        assert_eq!(world, Bytes::from_static(b"world"));
    }

    /// Taking a blob empties its slot.
    #[test]
    fn in_memory_take_releases_the_slot() {
        let mut store = InMemoryPageStore::default();
        let key = store.put(Bytes::from_static(b"abc")).unwrap();

        let taken = store.take(key).unwrap();
        assert_eq!(taken, Bytes::from_static(b"abc"));

        // Asking again returns the empty placeholder left behind, not the
        // blob, which shows the bytes were released by the first take.
        let leftover = store.take(key).unwrap();
        assert!(leftover.is_empty());
    }

    /// A handle that was never issued is rejected.
    #[test]
    fn in_memory_invalid_key_errors() {
        let mut store = InMemoryPageStore::default();
        let outcome = store.take(PageKey::new(99));
        assert!(outcome.is_err());
    }

    /// `memory_size` tracks the bytes stored and not yet taken.
    #[test]
    fn in_memory_reports_resident_bytes() {
        let mut store = InMemoryPageStore::default();
        assert_eq!(store.memory_size(), 0);

        let five_bytes = store.put(Bytes::from_static(b"hello")).unwrap();
        let one_byte = store.put(Bytes::from_static(b"!")).unwrap();
        assert_eq!(store.memory_size(), 6);

        store.take(five_bytes).unwrap();
        assert_eq!(store.memory_size(), 1);

        store.take(one_byte).unwrap();
        assert_eq!(store.memory_size(), 0);
    }

    /// The trait's default `memory_size` is zero.
    #[test]
    fn default_store_memory_size_is_zero() {
        // A spilling backend that leaves `memory_size` at its default reports
        // 0, reflecting that its blobs no longer occupy the heap.
        struct OffHeap;

        impl PageStore for OffHeap {
            fn put(&mut self, _value: Bytes) -> Result<PageKey> {
                Ok(PageKey::new(0))
            }

            fn take(&mut self, _key: PageKey) -> Result<Bytes> {
                Ok(Bytes::new())
            }
        }

        let backend = OffHeap;
        assert_eq!(backend.memory_size(), 0);
    }
}