Skip to main content

parquet/column/
page_store.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
17
//! Pluggable storage for completed, serialized page blobs.
//!
//! While a row group is being written, the [`ArrowWriter`] must buffer every
//! column's encoded pages: Parquet requires each column chunk to be contiguous
//! in the file, but record batches arrive with all columns interleaved.
//! By default that buffer lives on the heap, so the writer's peak memory grows
//! with the row group size. A [`PageStore`] lets the buffer live somewhere else
//! — a local temp file, object storage, etc. — bounding peak write memory
//! independently of the row group size. One store is created per column chunk
//! by a [`PageStoreFactory`].
//!
//! [`ArrowWriter`]: crate::arrow::arrow_writer::ArrowWriter
29
30use std::fmt::Debug;
31
32use bytes::Bytes;
33
34use crate::errors::{ParquetError, Result};
35use crate::schema::types::ColumnDescriptor;
36
/// An opaque, store-allocated handle to a blob held by a [`PageStore`].
///
/// Handles are minted by the store (see [`new`](Self::new)) and are only
/// meaningful to the store that produced them. How a handle encodes its blob is
/// entirely up to the store: [`InMemoryPageStore`] uses dense sequential
/// indices, but another backend may pack a file offset or any other locator it
/// can later resolve. Callers treat handles as opaque tokens and must not
/// assume any particular density or ordering.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct PageKey(u64);

impl PageKey {
    /// Create a handle wrapping `raw`.
    ///
    /// A [`PageStore`] implementation calls this to mint the handle it returns
    /// from [`put`](PageStore::put). The value is opaque to the caller, so a
    /// store is free to use a dense counter, a packed locator, or anything else
    /// it can later resolve in [`take`](PageStore::take).
    pub const fn new(raw: u64) -> Self {
        Self(raw)
    }

    /// The raw value passed to [`new`](Self::new).
    pub const fn get(self) -> u64 {
        self.0
    }
}
61
62/// A pluggable store for completed, serialized page blobs.
63///
64/// The store is intentionally "dumb": it only maps an opaque [`PageKey`] to a
65/// blob of bytes. It knows nothing about pages, dictionaries, ordering, or
66/// offsets. The caller keeps the handles it gets back from [`put`](Self::put)
67/// and decides what they mean.
68///
69/// Each store instance is owned by a single column writer and mutated by one
70/// thread at a time (both methods take `&mut self`), so it needs no internal
71/// synchronization — hence only `Send`, not `Sync`.
72///
73/// The default ([`InMemoryPageStore`]) keeps blobs on the heap. Configure a
74/// different backend via
75/// [`ArrowWriterOptions::with_page_store_factory`](crate::arrow::arrow_writer::ArrowWriterOptions::with_page_store_factory).
76pub trait PageStore: Send {
77    /// Store `value`, returning a handle that can later be passed to
78    /// [`take`](Self::take).
79    fn put(&mut self, value: Bytes) -> Result<PageKey>;
80
81    /// Take back the blob previously stored under `key`.
82    ///
83    /// The caller takes ownership of the returned bytes and will **not** request
84    /// `key` again, so the store may release any resources backing it — eagerly
85    /// here, or when the store is dropped.
86    fn take(&mut self, key: PageKey) -> Result<Bytes>;
87
88    /// The number of bytes this store currently holds **in memory** (resident
89    /// on the heap), used to report the writer's memory footprint.
90    ///
91    /// The default is `0`, which is exactly right for a backend that moves
92    /// every blob off-heap (a temp file, object storage): the bytes it has been
93    /// handed no longer occupy heap. The in-memory backend overrides this to
94    /// report its resident blobs. A backend that keeps a partial in-memory
95    /// buffer should report that buffer's size.
96    fn memory_size(&self) -> usize {
97        0
98    }
99}
100
101/// Context for a single [`PageStoreFactory::create`] call.
102///
103/// Describes the leaf column chunk the store will buffer. It is held by
104/// reference for the duration of the call; a backend reads only what it needs.
105/// More fields may be added in future releases without breaking existing
106/// implementations — the type is constructed only by the writer, so an
107/// implementer only ever receives one and calls its accessors.
108pub struct PageStoreArgs<'a> {
109    column_index: usize,
110    column_descriptor: &'a ColumnDescriptor,
111}
112
113impl<'a> PageStoreArgs<'a> {
114    // Constructed only by the Arrow writer; without that feature there is no caller.
115    #[cfg(feature = "arrow")]
116    pub(crate) fn new(column_index: usize, column_descriptor: &'a ColumnDescriptor) -> Self {
117        Self {
118            column_index,
119            column_descriptor,
120        }
121    }
122
123    /// Index of the leaf column within the row group.
124    ///
125    /// A backend may use this to e.g. name spill files or shard across a bounded
126    /// pool; it carries no ordering or coordination requirement.
127    pub fn column_index(&self) -> usize {
128        self.column_index
129    }
130
131    /// Descriptor for the leaf column: physical/logical type, path, and max
132    /// definition/repetition levels.
133    ///
134    /// Lets a backend tailor buffering to the column — for example spilling only
135    /// large `BYTE_ARRAY` columns while keeping small fixed-width ones on the
136    /// heap.
137    pub fn column_descriptor(&self) -> &ColumnDescriptor {
138        self.column_descriptor
139    }
140}
141
142/// Creates a fresh [`PageStore`] for each column chunk.
143///
144/// See
145/// [`ArrowWriterOptions::with_page_store_factory`](crate::arrow::arrow_writer::ArrowWriterOptions::with_page_store_factory).
146pub trait PageStoreFactory: Send + Sync + Debug {
147    /// Create a new, empty [`PageStore`] for the leaf column described by `args`.
148    fn create(&self, args: &PageStoreArgs<'_>) -> Result<Box<dyn PageStore>>;
149}
150
151/// The default [`PageStore`], holding blobs on the heap in a `Vec<Bytes>`.
152///
153/// Peak memory grows with the row group size; use a spilling backend to bound
154/// it.
155#[derive(Debug, Default)]
156pub struct InMemoryPageStore {
157    blobs: Vec<Bytes>,
158    /// Running total of resident blob bytes, kept in step with `put`/`take`.
159    resident: usize,
160}
161
162impl PageStore for InMemoryPageStore {
163    fn put(&mut self, value: Bytes) -> Result<PageKey> {
164        let key = PageKey(self.blobs.len() as u64);
165        self.resident += value.len();
166        self.blobs.push(value);
167        Ok(key)
168    }
169
170    fn take(&mut self, key: PageKey) -> Result<Bytes> {
171        // Replace the slot with an empty `Bytes` so the stored blob is released
172        // as soon as it is taken, keeping memory bounded while the chunk is
173        // streamed into the output file.
174        let blob = self
175            .blobs
176            .get_mut(key.0 as usize)
177            .map(std::mem::take)
178            .ok_or_else(|| ParquetError::General(format!("invalid page key {}", key.0)))?;
179        self.resident -= blob.len();
180        Ok(blob)
181    }
182
183    fn memory_size(&self) -> usize {
184        self.resident
185    }
186}
187
188/// Factory for [`InMemoryPageStore`] — the default used by
189/// [`ArrowWriter`](crate::arrow::arrow_writer::ArrowWriter).
190#[derive(Debug, Default)]
191pub struct InMemoryPageStoreFactory;
192
193impl PageStoreFactory for InMemoryPageStoreFactory {
194    fn create(&self, _args: &PageStoreArgs<'_>) -> Result<Box<dyn PageStore>> {
195        Ok(Box::new(InMemoryPageStore::default()))
196    }
197}
198
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn in_memory_round_trips_blobs_in_handle_order() {
        let mut store = InMemoryPageStore::default();
        let payloads: [&'static [u8]; 2] = [b"hello", b"world"];
        let keys: Vec<PageKey> = payloads
            .iter()
            .map(|&payload| store.put(Bytes::from_static(payload)).unwrap())
            .collect();
        assert_ne!(keys[0], keys[1]);
        // Blobs come back in the same order their handles were issued.
        for (key, expected) in keys.into_iter().zip(payloads.iter()) {
            let got = store.take(key).unwrap();
            assert_eq!(&got[..], *expected);
        }
    }

    #[test]
    fn in_memory_take_releases_the_slot() {
        let mut store = InMemoryPageStore::default();
        let key = store.put(Bytes::from_static(b"abc")).unwrap();
        let first = store.take(key).unwrap();
        assert_eq!(&first[..], b"abc");
        // Taking the same key again yields the empty placeholder left behind by
        // the first take, which shows the blob itself was released.
        let second = store.take(key).unwrap();
        assert!(second.is_empty());
    }

    #[test]
    fn in_memory_invalid_key_errors() {
        let mut store = InMemoryPageStore::default();
        let outcome = store.take(PageKey::new(99));
        assert!(outcome.is_err());
    }

    #[test]
    fn in_memory_reports_resident_bytes() {
        let mut store = InMemoryPageStore::default();
        assert_eq!(store.memory_size(), 0);
        let hello = store.put(Bytes::from_static(b"hello")).unwrap();
        let bang = store.put(Bytes::from_static(b"!")).unwrap();
        assert_eq!(store.memory_size(), 6);
        // Each step: the key to take, then the resident byte count expected after.
        for (key, remaining) in [(hello, 1), (bang, 0)] {
            store.take(key).unwrap();
            assert_eq!(store.memory_size(), remaining);
        }
    }

    #[test]
    fn default_store_memory_size_is_zero() {
        // A backend that spills everything and keeps the default `memory_size`
        // reports 0: none of its blobs are resident on the heap.
        struct SpillEverything;

        impl PageStore for SpillEverything {
            fn put(&mut self, _value: Bytes) -> Result<PageKey> {
                Ok(PageKey::new(0))
            }

            fn take(&mut self, _key: PageKey) -> Result<Bytes> {
                Ok(Bytes::new())
            }
        }

        let spill = SpillEverything;
        assert_eq!(spill.memory_size(), 0);
    }
}