Skip to main content

parquet/column/
page.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Contains Parquet Page definitions and page reader interface.
19
20use bytes::Bytes;
21
22use crate::basic::{Encoding, PageType};
23use crate::errors::{ParquetError, Result};
24use crate::file::metadata::thrift::{
25    DataPageHeader, DataPageHeaderV2, DictionaryPageHeader, PageHeader,
26};
27use crate::file::statistics::{Statistics, page_stats_to_thrift};
28
29/// Parquet Page definition.
30///
31/// List of supported pages.
32/// These are 1-to-1 mapped from the equivalent Thrift definitions, except `buf` which
33/// used to store uncompressed bytes of the page.
34#[derive(Clone, Debug)]
35pub enum Page {
36    /// Data page Parquet format v1.
37    DataPage {
38        /// The underlying data buffer
39        buf: Bytes,
40        /// Number of values in this page
41        num_values: u32,
42        /// Encoding for values in this page
43        encoding: Encoding,
44        /// Definition level encoding
45        def_level_encoding: Encoding,
46        /// Repetition level encoding
47        rep_level_encoding: Encoding,
48        /// Optional statistics for this page
49        statistics: Option<Statistics>,
50    },
51    /// Data page Parquet format v2.
52    DataPageV2 {
53        /// The underlying data buffer
54        buf: Bytes,
55        /// Number of values in this page
56        num_values: u32,
57        /// Encoding for values in this page
58        encoding: Encoding,
59        /// Number of null values in this page
60        num_nulls: u32,
61        /// Number of rows in this page
62        num_rows: u32,
63        /// Length of definition levels
64        def_levels_byte_len: u32,
65        /// Length of repetition levels
66        rep_levels_byte_len: u32,
67        /// Is this page compressed
68        is_compressed: bool,
69        /// Optional statistics for this page
70        statistics: Option<Statistics>,
71    },
72    /// Dictionary page.
73    DictionaryPage {
74        /// The underlying data buffer
75        buf: Bytes,
76        /// Number of values in this page
77        num_values: u32,
78        /// Encoding for values in this page
79        encoding: Encoding,
80        /// Is dictionary page sorted
81        is_sorted: bool,
82    },
83}
84
85impl Page {
86    /// Returns [`PageType`] for this page.
87    pub fn page_type(&self) -> PageType {
88        match self {
89            Page::DataPage { .. } => PageType::DATA_PAGE,
90            Page::DataPageV2 { .. } => PageType::DATA_PAGE_V2,
91            Page::DictionaryPage { .. } => PageType::DICTIONARY_PAGE,
92        }
93    }
94
95    /// Returns whether this page is any version of a data page
96    pub fn is_data_page(&self) -> bool {
97        matches!(self, Page::DataPage { .. } | Page::DataPageV2 { .. })
98    }
99
100    /// Returns whether this page is a dictionary page
101    pub fn is_dictionary_page(&self) -> bool {
102        matches!(self, Page::DictionaryPage { .. })
103    }
104
105    /// Returns internal byte buffer reference for this page.
106    pub fn buffer(&self) -> &Bytes {
107        match self {
108            Page::DataPage { buf, .. } => buf,
109            Page::DataPageV2 { buf, .. } => buf,
110            Page::DictionaryPage { buf, .. } => buf,
111        }
112    }
113
114    /// Returns number of values in this page.
115    pub fn num_values(&self) -> u32 {
116        match self {
117            Page::DataPage { num_values, .. } => *num_values,
118            Page::DataPageV2 { num_values, .. } => *num_values,
119            Page::DictionaryPage { num_values, .. } => *num_values,
120        }
121    }
122
123    /// Returns this page [`Encoding`].
124    pub fn encoding(&self) -> Encoding {
125        match self {
126            Page::DataPage { encoding, .. } => *encoding,
127            Page::DataPageV2 { encoding, .. } => *encoding,
128            Page::DictionaryPage { encoding, .. } => *encoding,
129        }
130    }
131
132    /// Returns optional [`Statistics`].
133    pub fn statistics(&self) -> Option<&Statistics> {
134        match self {
135            Page::DataPage { statistics, .. } => statistics.as_ref(),
136            Page::DataPageV2 { statistics, .. } => statistics.as_ref(),
137            Page::DictionaryPage { .. } => None,
138        }
139    }
140}
141
142/// Helper struct to represent pages with potentially compressed buffer (data page v1) or
143/// compressed and concatenated buffer (def levels + rep levels + compressed values for
144/// data page v2).
145///
146/// The difference with `Page` is that `Page` buffer is always uncompressed.
147pub struct CompressedPage {
148    compressed_page: Page,
149    uncompressed_size: usize,
150}
151
152impl CompressedPage {
153    /// Creates `CompressedPage` from a page with potentially compressed buffer and
154    /// uncompressed size.
155    pub fn new(compressed_page: Page, uncompressed_size: usize) -> Self {
156        Self {
157            compressed_page,
158            uncompressed_size,
159        }
160    }
161
162    /// Returns page type.
163    pub fn page_type(&self) -> PageType {
164        self.compressed_page.page_type()
165    }
166
167    /// Returns underlying page with potentially compressed buffer.
168    pub fn compressed_page(&self) -> &Page {
169        &self.compressed_page
170    }
171
172    /// Returns uncompressed size in bytes.
173    pub fn uncompressed_size(&self) -> usize {
174        self.uncompressed_size
175    }
176
177    /// Returns compressed size in bytes.
178    ///
179    /// Note that it is assumed that buffer is compressed, but it may not be. In this
180    /// case compressed size will be equal to uncompressed size.
181    pub fn compressed_size(&self) -> usize {
182        self.compressed_page.buffer().len()
183    }
184
185    /// Number of values in page.
186    pub fn num_values(&self) -> u32 {
187        self.compressed_page.num_values()
188    }
189
190    /// Returns encoding for values in page.
191    pub fn encoding(&self) -> Encoding {
192        self.compressed_page.encoding()
193    }
194
195    /// Returns slice of compressed buffer in the page.
196    pub fn data(&self) -> &[u8] {
197        self.compressed_page.buffer()
198    }
199
200    /// Returns the number of heap bytes this page currently holds.
201    ///
202    /// This is the page's compressed buffer (the embedded [`Bytes`]); use it to
203    /// account for a buffered page's memory footprint rather than reaching for
204    /// `data().len()` at each call site.
205    pub fn memory_usage(&self) -> usize {
206        self.compressed_page.buffer().len()
207    }
208
209    /// Returns the thrift page header
210    pub(crate) fn to_thrift_header(&self) -> Result<PageHeader> {
211        let uncompressed_size = self.uncompressed_size();
212        let compressed_size = self.compressed_size();
213        if uncompressed_size > i32::MAX as usize {
214            return Err(general_err!(
215                "Page uncompressed size overflow: {}",
216                uncompressed_size
217            ));
218        }
219        if compressed_size > i32::MAX as usize {
220            return Err(general_err!(
221                "Page compressed size overflow: {}",
222                compressed_size
223            ));
224        }
225        let num_values = self.num_values();
226        let encoding = self.encoding();
227        let page_type = self.page_type();
228
229        let mut page_header = PageHeader {
230            r#type: page_type,
231            uncompressed_page_size: uncompressed_size as i32,
232            compressed_page_size: compressed_size as i32,
233            // TODO: Add support for crc checksum
234            crc: None,
235            data_page_header: None,
236            index_page_header: None,
237            dictionary_page_header: None,
238            data_page_header_v2: None,
239        };
240
241        match self.compressed_page {
242            Page::DataPage {
243                def_level_encoding,
244                rep_level_encoding,
245                ref statistics,
246                ..
247            } => {
248                let data_page_header = DataPageHeader {
249                    num_values: num_values as i32,
250                    encoding,
251                    definition_level_encoding: def_level_encoding,
252                    repetition_level_encoding: rep_level_encoding,
253                    statistics: page_stats_to_thrift(statistics.as_ref()),
254                };
255                page_header.data_page_header = Some(data_page_header);
256            }
257            Page::DataPageV2 {
258                num_nulls,
259                num_rows,
260                def_levels_byte_len,
261                rep_levels_byte_len,
262                is_compressed,
263                ref statistics,
264                ..
265            } => {
266                let data_page_header_v2 = DataPageHeaderV2 {
267                    num_values: num_values as i32,
268                    num_nulls: num_nulls as i32,
269                    num_rows: num_rows as i32,
270                    encoding,
271                    definition_levels_byte_length: def_levels_byte_len as i32,
272                    repetition_levels_byte_length: rep_levels_byte_len as i32,
273                    is_compressed: Some(is_compressed),
274                    statistics: page_stats_to_thrift(statistics.as_ref()),
275                };
276                page_header.data_page_header_v2 = Some(data_page_header_v2);
277            }
278            Page::DictionaryPage { is_sorted, .. } => {
279                let dictionary_page_header = DictionaryPageHeader {
280                    num_values: num_values as i32,
281                    encoding,
282                    is_sorted: Some(is_sorted),
283                };
284                page_header.dictionary_page_header = Some(dictionary_page_header);
285            }
286        }
287        Ok(page_header)
288    }
289
290    /// Update the compressed buffer for a page.
291    /// This might be required when encrypting page data for example.
292    /// The size of uncompressed data must not change.
293    #[cfg(feature = "encryption")]
294    pub(crate) fn with_new_compressed_buffer(mut self, new_buffer: Bytes) -> Self {
295        match &mut self.compressed_page {
296            Page::DataPage { buf, .. } => {
297                *buf = new_buffer;
298            }
299            Page::DataPageV2 { buf, .. } => {
300                *buf = new_buffer;
301            }
302            Page::DictionaryPage { buf, .. } => {
303                *buf = new_buffer;
304            }
305        }
306        self
307    }
308}
309
310/// Contains page write metrics.
311pub struct PageWriteSpec {
312    /// The type of page being written
313    pub page_type: PageType,
314    /// The total size of the page, before compression
315    pub uncompressed_size: usize,
316    /// The compressed size of the page
317    pub compressed_size: usize,
318    /// The number of values in the page
319    pub num_values: u32,
320    /// The offset of the page in the column chunk
321    pub offset: u64,
322    /// The number of bytes written to the underlying sink
323    pub bytes_written: u64,
324}
325
326impl Default for PageWriteSpec {
327    fn default() -> Self {
328        Self::new()
329    }
330}
331
332impl PageWriteSpec {
333    /// Creates new spec with default page write metrics.
334    pub fn new() -> Self {
335        Self {
336            page_type: PageType::DATA_PAGE,
337            uncompressed_size: 0,
338            compressed_size: 0,
339            num_values: 0,
340            offset: 0,
341            bytes_written: 0,
342        }
343    }
344}
345
/// Contains metadata for a page
#[derive(Clone, Debug)]
pub struct PageMetadata {
    /// The number of rows within the page if known
    pub num_rows: Option<usize>,
    /// The number of levels within the page if known
    pub num_levels: Option<usize>,
    /// Whether the page is a dictionary page
    pub is_dict: bool,
}
356
357impl TryFrom<&crate::file::metadata::thrift::PageHeader> for PageMetadata {
358    type Error = ParquetError;
359
360    fn try_from(
361        value: &crate::file::metadata::thrift::PageHeader,
362    ) -> std::result::Result<Self, Self::Error> {
363        match value.r#type {
364            PageType::DATA_PAGE => {
365                let header = value.data_page_header.as_ref().unwrap();
366                Ok(PageMetadata {
367                    num_rows: None,
368                    num_levels: Some(header.num_values as _),
369                    is_dict: false,
370                })
371            }
372            PageType::DICTIONARY_PAGE => Ok(PageMetadata {
373                num_rows: None,
374                num_levels: None,
375                is_dict: true,
376            }),
377            PageType::DATA_PAGE_V2 => {
378                let header = value.data_page_header_v2.as_ref().unwrap();
379                Ok(PageMetadata {
380                    num_rows: Some(header.num_rows as _),
381                    num_levels: Some(header.num_values as _),
382                    is_dict: false,
383                })
384            }
385            other => Err(ParquetError::General(format!(
386                "page type {other:?} cannot be converted to PageMetadata"
387            ))),
388        }
389    }
390}
391
392/// API for reading pages from a column chunk.
393/// This offers a iterator like API to get the next page.
394pub trait PageReader: Iterator<Item = Result<Page>> + Send {
395    /// Gets the next page in the column chunk associated with this reader.
396    /// Returns `None` if there are no pages left.
397    fn get_next_page(&mut self) -> Result<Option<Page>>;
398
399    /// Gets metadata about the next page, returns an error if no
400    /// column index information
401    fn peek_next_page(&mut self) -> Result<Option<PageMetadata>>;
402
403    /// Skips reading the next page, returns an error if no
404    /// column index information
405    fn skip_next_page(&mut self) -> Result<()>;
406
407    /// Returns `true` if the next page can be assumed to contain the start of a new record
408    ///
409    /// Prior to parquet V2 the specification was ambiguous as to whether a single record
410    /// could be split across multiple pages, and prior to [(#4327)] the Rust writer would do
411    /// this in certain situations. However, correctly interpreting the offset index relies on
412    /// this assumption holding [(#4943)], and so this mechanism is provided for a [`PageReader`]
413    /// to signal this to the calling context
414    ///
415    /// [(#4327)]: https://github.com/apache/arrow-rs/pull/4327
416    /// [(#4943)]: https://github.com/apache/arrow-rs/pull/4943
417    fn at_record_boundary(&mut self) -> Result<bool> {
418        match self.peek_next_page()? {
419            // Last page in the column chunk - always a record boundary
420            None => Ok(true),
421            // A V2 data page is required by the parquet spec to start at a
422            // record boundary, so the current page ends at one.  V2 pages
423            // are identified by having `num_rows` set in their header.
424            Some(metadata) => Ok(metadata.num_rows.is_some()),
425        }
426    }
427}
428
429/// API for writing pages in a column chunk.
430///
431/// It is reasonable to assume that all pages will be written in the correct order, e.g.
432/// dictionary page followed by data pages, or a set of data pages, etc.
433pub trait PageWriter: Send {
434    /// Writes a page into the output stream/sink.
435    /// Returns `PageWriteSpec` that contains information about written page metrics,
436    /// including number of bytes, size, number of values, offset, etc.
437    ///
438    /// This method is called for every compressed page we write into underlying buffer,
439    /// either data page or dictionary page.
440    fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec>;
441
442    /// **Unstable, not public API.** This is an internal protocol between
443    /// [`GenericColumnWriter`] and its in-crate page writers; it is hidden from
444    /// the rendered docs and may change or be removed without a major version
445    /// bump. External `PageWriter` implementations should not override it. See
446    /// the page-spilling cleanup tracked in
447    /// <https://github.com/apache/arrow-rs/pull/10020>.
448    ///
449    /// Whether this writer resolves the final page layout itself (at flush)
450    /// rather than committing bytes to their final position as pages arrive.
451    ///
452    /// The dictionary page of a column chunk must be written *first*, but it is
453    /// not finalized until every value has been seen. A writer that commits
454    /// bytes live (e.g. straight to a file) therefore relies on the column
455    /// writer buffering the dictionary-encoded data pages in memory until the
456    /// dictionary page is ready — see [`GenericColumnWriter`]'s `data_pages`.
457    ///
458    /// A writer that instead buffers the whole chunk and splices it later (the
459    /// [`ArrowWriter`] path) can accept data pages *before* the dictionary page
460    /// and order them itself at flush. Returning `true` tells the column writer
461    /// to skip that in-memory buffering and stream dictionary-column data pages
462    /// straight through, bounding the column writer's memory.
463    ///
464    /// [`GenericColumnWriter`]: crate::column::writer::GenericColumnWriter
465    /// [`ArrowWriter`]: crate::arrow::arrow_writer::ArrowWriter
466    #[doc(hidden)]
467    fn defers_dictionary_ordering(&self) -> bool {
468        false
469    }
470
471    /// **Unstable, not public API.** Companion to
472    /// [`defers_dictionary_ordering`](Self::defers_dictionary_ordering): an
473    /// internal hook for the column writer's memory accounting, hidden from the
474    /// rendered docs and subject to change or removal without a major version
475    /// bump. External `PageWriter` implementations should not override it.
476    ///
477    /// The number of bytes this writer is currently holding **in memory** for
478    /// pages it has been handed (i.e. completed pages not yet committed to their
479    /// final destination).
480    ///
481    /// Used by the column writer to report its memory footprint. The default is
482    /// `0`: a writer that streams pages straight to their destination retains
483    /// nothing. A writer that buffers pages should report what it actually holds
484    /// on the heap — which, when it spills to a backing store, can be far less
485    /// than the bytes written.
486    #[doc(hidden)]
487    fn buffered_memory_size(&self) -> usize {
488        0
489    }
490
491    /// Closes resources and flushes underlying sink.
492    /// Page writer should not be used after this method is called.
493    fn close(&mut self) -> Result<()>;
494}
495
496/// An iterator over pages of one specific column in a parquet file.
497pub trait PageIterator: Iterator<Item = Result<Box<dyn PageReader>>> + Send {}
498
#[cfg(test)]
mod tests {
    use super::*;

    /// Bytes used as the buffer of every page built in these tests.
    const PAYLOAD: [u8; 3] = [0, 1, 2];

    /// Statistics fixture shared by the page tests.
    fn sample_stats() -> Statistics {
        Statistics::int32(Some(1), Some(2), None, Some(1), true)
    }

    /// Builds a v1 data page over `PAYLOAD` with the given statistics.
    fn sample_data_page(statistics: Option<Statistics>) -> Page {
        Page::DataPage {
            buf: Bytes::from(PAYLOAD.to_vec()),
            num_values: 10,
            encoding: Encoding::PLAIN,
            def_level_encoding: Encoding::RLE,
            rep_level_encoding: Encoding::RLE,
            statistics,
        }
    }

    #[test]
    fn test_page() {
        // Data page v1
        let v1 = sample_data_page(Some(sample_stats()));
        assert_eq!(v1.page_type(), PageType::DATA_PAGE);
        assert_eq!(v1.buffer(), PAYLOAD.as_slice());
        assert_eq!(v1.num_values(), 10);
        assert_eq!(v1.encoding(), Encoding::PLAIN);
        assert_eq!(v1.statistics(), Some(&sample_stats()));

        // Data page v2
        let v2 = Page::DataPageV2 {
            buf: Bytes::from(PAYLOAD.to_vec()),
            num_values: 10,
            encoding: Encoding::PLAIN,
            num_nulls: 5,
            num_rows: 20,
            def_levels_byte_len: 30,
            rep_levels_byte_len: 40,
            is_compressed: false,
            statistics: Some(sample_stats()),
        };
        assert_eq!(v2.page_type(), PageType::DATA_PAGE_V2);
        assert_eq!(v2.buffer(), PAYLOAD.as_slice());
        assert_eq!(v2.num_values(), 10);
        assert_eq!(v2.encoding(), Encoding::PLAIN);
        assert_eq!(v2.statistics(), Some(&sample_stats()));

        // Dictionary page
        let dict = Page::DictionaryPage {
            buf: Bytes::from(PAYLOAD.to_vec()),
            num_values: 10,
            encoding: Encoding::PLAIN,
            is_sorted: false,
        };
        assert_eq!(dict.page_type(), PageType::DICTIONARY_PAGE);
        assert_eq!(dict.buffer(), PAYLOAD.as_slice());
        assert_eq!(dict.num_values(), 10);
        assert_eq!(dict.encoding(), Encoding::PLAIN);
        assert_eq!(dict.statistics(), None);
    }

    #[test]
    fn test_compressed_page() {
        let cpage = CompressedPage::new(sample_data_page(Some(sample_stats())), 5);

        assert_eq!(cpage.page_type(), PageType::DATA_PAGE);
        assert_eq!(cpage.uncompressed_size(), 5);
        assert_eq!(cpage.compressed_size(), 3);
        assert_eq!(cpage.num_values(), 10);
        assert_eq!(cpage.encoding(), Encoding::PLAIN);
        assert_eq!(cpage.data(), &PAYLOAD);
    }

    #[test]
    fn test_compressed_page_uncompressed_size_overflow() {
        // An uncompressed size one past i32::MAX cannot be stored in the
        // thrift header, so building the header must fail.
        let too_large = (i32::MAX as usize) + 1;
        let cpage = CompressedPage::new(sample_data_page(None), too_large);

        let result = cpage.to_thrift_header();
        assert!(result.is_err());

        let error_msg = result.unwrap_err().to_string();
        assert!(error_msg.contains("Page uncompressed size overflow"));
    }
}