parquet/column/page.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
//! Contains Parquet Page definitions and the page reader and writer interfaces.
19
20use bytes::Bytes;
21
22use crate::basic::{Encoding, PageType};
23use crate::errors::{ParquetError, Result};
24use crate::file::statistics::Statistics;
25use crate::format::PageHeader;
26
/// Parquet Page definition.
///
/// List of supported pages.
/// These are 1-to-1 mapped from the equivalent Thrift definitions, except `buf`, which
/// is used to store the bytes of the page. Normally these are the uncompressed bytes;
/// a `Page` wrapped in [`CompressedPage`] may instead hold a compressed buffer.
#[derive(Clone)]
pub enum Page {
    /// Data page Parquet format v1.
    DataPage {
        /// The underlying data buffer
        buf: Bytes,
        /// Number of values in this page
        num_values: u32,
        /// Encoding for values in this page
        encoding: Encoding,
        /// Definition level encoding
        def_level_encoding: Encoding,
        /// Repetition level encoding
        rep_level_encoding: Encoding,
        /// Optional statistics for this page
        statistics: Option<Statistics>,
    },
    /// Data page Parquet format v2.
    DataPageV2 {
        /// The underlying data buffer
        buf: Bytes,
        /// Number of values in this page
        num_values: u32,
        /// Encoding for values in this page
        encoding: Encoding,
        /// Number of null values in this page
        num_nulls: u32,
        /// Number of rows in this page
        num_rows: u32,
        /// Length of the definition levels, in bytes
        def_levels_byte_len: u32,
        /// Length of the repetition levels, in bytes
        rep_levels_byte_len: u32,
        /// Is this page compressed
        is_compressed: bool,
        /// Optional statistics for this page
        statistics: Option<Statistics>,
    },
    /// Dictionary page.
    DictionaryPage {
        /// The underlying data buffer
        buf: Bytes,
        /// Number of values in this page
        num_values: u32,
        /// Encoding for values in this page
        encoding: Encoding,
        /// Is dictionary page sorted
        is_sorted: bool,
    },
}
82
83impl Page {
84    /// Returns [`PageType`] for this page.
85    pub fn page_type(&self) -> PageType {
86        match self {
87            Page::DataPage { .. } => PageType::DATA_PAGE,
88            Page::DataPageV2 { .. } => PageType::DATA_PAGE_V2,
89            Page::DictionaryPage { .. } => PageType::DICTIONARY_PAGE,
90        }
91    }
92
93    /// Returns whether this page is any version of a data page
94    pub fn is_data_page(&self) -> bool {
95        matches!(self, Page::DataPage { .. } | Page::DataPageV2 { .. })
96    }
97
98    /// Returns whether this page is a dictionary page
99    pub fn is_dictionary_page(&self) -> bool {
100        matches!(self, Page::DictionaryPage { .. })
101    }
102
103    /// Returns internal byte buffer reference for this page.
104    pub fn buffer(&self) -> &Bytes {
105        match self {
106            Page::DataPage { ref buf, .. } => buf,
107            Page::DataPageV2 { ref buf, .. } => buf,
108            Page::DictionaryPage { ref buf, .. } => buf,
109        }
110    }
111
112    /// Returns number of values in this page.
113    pub fn num_values(&self) -> u32 {
114        match self {
115            Page::DataPage { num_values, .. } => *num_values,
116            Page::DataPageV2 { num_values, .. } => *num_values,
117            Page::DictionaryPage { num_values, .. } => *num_values,
118        }
119    }
120
121    /// Returns this page [`Encoding`].
122    pub fn encoding(&self) -> Encoding {
123        match self {
124            Page::DataPage { encoding, .. } => *encoding,
125            Page::DataPageV2 { encoding, .. } => *encoding,
126            Page::DictionaryPage { encoding, .. } => *encoding,
127        }
128    }
129
130    /// Returns optional [`Statistics`].
131    pub fn statistics(&self) -> Option<&Statistics> {
132        match self {
133            Page::DataPage { ref statistics, .. } => statistics.as_ref(),
134            Page::DataPageV2 { ref statistics, .. } => statistics.as_ref(),
135            Page::DictionaryPage { .. } => None,
136        }
137    }
138}
139
/// Helper struct to represent pages with potentially compressed buffer (data page v1) or
/// compressed and concatenated buffer (def and rep levels + compressed values for
/// data page v2).
///
/// The difference from a plain `Page` is that a `Page` buffer normally holds uncompressed
/// bytes, whereas the buffer of the page wrapped here may be compressed; the size the
/// data has when uncompressed is tracked separately.
pub struct CompressedPage {
    // The page whose `buf` holds the (potentially) compressed bytes.
    compressed_page: Page,
    // Size in bytes of the page data once uncompressed, as supplied by the creator.
    uncompressed_size: usize,
}
149
150impl CompressedPage {
151    /// Creates `CompressedPage` from a page with potentially compressed buffer and
152    /// uncompressed size.
153    pub fn new(compressed_page: Page, uncompressed_size: usize) -> Self {
154        Self {
155            compressed_page,
156            uncompressed_size,
157        }
158    }
159
160    /// Returns page type.
161    pub fn page_type(&self) -> PageType {
162        self.compressed_page.page_type()
163    }
164
165    /// Returns underlying page with potentially compressed buffer.
166    pub fn compressed_page(&self) -> &Page {
167        &self.compressed_page
168    }
169
170    /// Returns uncompressed size in bytes.
171    pub fn uncompressed_size(&self) -> usize {
172        self.uncompressed_size
173    }
174
175    /// Returns compressed size in bytes.
176    ///
177    /// Note that it is assumed that buffer is compressed, but it may not be. In this
178    /// case compressed size will be equal to uncompressed size.
179    pub fn compressed_size(&self) -> usize {
180        self.compressed_page.buffer().len()
181    }
182
183    /// Number of values in page.
184    pub fn num_values(&self) -> u32 {
185        self.compressed_page.num_values()
186    }
187
188    /// Returns encoding for values in page.
189    pub fn encoding(&self) -> Encoding {
190        self.compressed_page.encoding()
191    }
192
193    /// Returns slice of compressed buffer in the page.
194    pub fn data(&self) -> &[u8] {
195        self.compressed_page.buffer()
196    }
197
198    /// Returns the thrift page header
199    pub(crate) fn to_thrift_header(&self) -> Result<PageHeader> {
200        let uncompressed_size = self.uncompressed_size();
201        let compressed_size = self.compressed_size();
202        if uncompressed_size > i32::MAX as usize {
203            return Err(general_err!(
204                "Page uncompressed size overflow: {}",
205                uncompressed_size
206            ));
207        }
208        if compressed_size > i32::MAX as usize {
209            return Err(general_err!(
210                "Page compressed size overflow: {}",
211                compressed_size
212            ));
213        }
214        let num_values = self.num_values();
215        let encoding = self.encoding();
216        let page_type = self.page_type();
217
218        let mut page_header = PageHeader {
219            type_: page_type.into(),
220            uncompressed_page_size: uncompressed_size as i32,
221            compressed_page_size: compressed_size as i32,
222            // TODO: Add support for crc checksum
223            crc: None,
224            data_page_header: None,
225            index_page_header: None,
226            dictionary_page_header: None,
227            data_page_header_v2: None,
228        };
229
230        match self.compressed_page {
231            Page::DataPage {
232                def_level_encoding,
233                rep_level_encoding,
234                ref statistics,
235                ..
236            } => {
237                let data_page_header = crate::format::DataPageHeader {
238                    num_values: num_values as i32,
239                    encoding: encoding.into(),
240                    definition_level_encoding: def_level_encoding.into(),
241                    repetition_level_encoding: rep_level_encoding.into(),
242                    statistics: crate::file::statistics::to_thrift(statistics.as_ref()),
243                };
244                page_header.data_page_header = Some(data_page_header);
245            }
246            Page::DataPageV2 {
247                num_nulls,
248                num_rows,
249                def_levels_byte_len,
250                rep_levels_byte_len,
251                is_compressed,
252                ref statistics,
253                ..
254            } => {
255                let data_page_header_v2 = crate::format::DataPageHeaderV2 {
256                    num_values: num_values as i32,
257                    num_nulls: num_nulls as i32,
258                    num_rows: num_rows as i32,
259                    encoding: encoding.into(),
260                    definition_levels_byte_length: def_levels_byte_len as i32,
261                    repetition_levels_byte_length: rep_levels_byte_len as i32,
262                    is_compressed: Some(is_compressed),
263                    statistics: crate::file::statistics::to_thrift(statistics.as_ref()),
264                };
265                page_header.data_page_header_v2 = Some(data_page_header_v2);
266            }
267            Page::DictionaryPage { is_sorted, .. } => {
268                let dictionary_page_header = crate::format::DictionaryPageHeader {
269                    num_values: num_values as i32,
270                    encoding: encoding.into(),
271                    is_sorted: Some(is_sorted),
272                };
273                page_header.dictionary_page_header = Some(dictionary_page_header);
274            }
275        }
276        Ok(page_header)
277    }
278
279    /// Update the compressed buffer for a page.
280    /// This might be required when encrypting page data for example.
281    /// The size of uncompressed data must not change.
282    #[cfg(feature = "encryption")]
283    pub(crate) fn with_new_compressed_buffer(mut self, new_buffer: Bytes) -> Self {
284        match &mut self.compressed_page {
285            Page::DataPage { buf, .. } => {
286                *buf = new_buffer;
287            }
288            Page::DataPageV2 { buf, .. } => {
289                *buf = new_buffer;
290            }
291            Page::DictionaryPage { buf, .. } => {
292                *buf = new_buffer;
293            }
294        }
295        self
296    }
297}
298
299/// Contains page write metrics.
300pub struct PageWriteSpec {
301    /// The type of page being written
302    pub page_type: PageType,
303    /// The total size of the page, before compression
304    pub uncompressed_size: usize,
305    /// The compressed size of the page
306    pub compressed_size: usize,
307    /// The number of values in the page
308    pub num_values: u32,
309    /// The offset of the page in the column chunk
310    pub offset: u64,
311    /// The number of bytes written to the underlying sink
312    pub bytes_written: u64,
313}
314
315impl Default for PageWriteSpec {
316    fn default() -> Self {
317        Self::new()
318    }
319}
320
321impl PageWriteSpec {
322    /// Creates new spec with default page write metrics.
323    pub fn new() -> Self {
324        Self {
325            page_type: PageType::DATA_PAGE,
326            uncompressed_size: 0,
327            compressed_size: 0,
328            num_values: 0,
329            offset: 0,
330            bytes_written: 0,
331        }
332    }
333}
334
/// Contains metadata for a page, such as is returned by
/// `PageReader::peek_next_page` without reading the page itself.
#[derive(Clone)]
pub struct PageMetadata {
    /// The number of rows within the page, if known
    pub num_rows: Option<usize>,
    /// The number of levels within the page, if known
    pub num_levels: Option<usize>,
    /// `true` if the page is a dictionary page
    pub is_dict: bool,
}
345
346impl TryFrom<&PageHeader> for PageMetadata {
347    type Error = ParquetError;
348
349    fn try_from(value: &PageHeader) -> std::result::Result<Self, Self::Error> {
350        match value.type_ {
351            crate::format::PageType::DATA_PAGE => {
352                let header = value.data_page_header.as_ref().unwrap();
353                Ok(PageMetadata {
354                    num_rows: None,
355                    num_levels: Some(header.num_values as _),
356                    is_dict: false,
357                })
358            }
359            crate::format::PageType::DICTIONARY_PAGE => Ok(PageMetadata {
360                num_rows: None,
361                num_levels: None,
362                is_dict: true,
363            }),
364            crate::format::PageType::DATA_PAGE_V2 => {
365                let header = value.data_page_header_v2.as_ref().unwrap();
366                Ok(PageMetadata {
367                    num_rows: Some(header.num_rows as _),
368                    num_levels: Some(header.num_values as _),
369                    is_dict: false,
370                })
371            }
372            other => Err(ParquetError::General(format!(
373                "page type {other:?} cannot be converted to PageMetadata"
374            ))),
375        }
376    }
377}
378
379/// API for reading pages from a column chunk.
380/// This offers a iterator like API to get the next page.
381pub trait PageReader: Iterator<Item = Result<Page>> + Send {
382    /// Gets the next page in the column chunk associated with this reader.
383    /// Returns `None` if there are no pages left.
384    fn get_next_page(&mut self) -> Result<Option<Page>>;
385
386    /// Gets metadata about the next page, returns an error if no
387    /// column index information
388    fn peek_next_page(&mut self) -> Result<Option<PageMetadata>>;
389
390    /// Skips reading the next page, returns an error if no
391    /// column index information
392    fn skip_next_page(&mut self) -> Result<()>;
393
394    /// Returns `true` if the next page can be assumed to contain the start of a new record
395    ///
396    /// Prior to parquet V2 the specification was ambiguous as to whether a single record
397    /// could be split across multiple pages, and prior to [(#4327)] the Rust writer would do
398    /// this in certain situations. However, correctly interpreting the offset index relies on
399    /// this assumption holding [(#4943)], and so this mechanism is provided for a [`PageReader`]
400    /// to signal this to the calling context
401    ///
402    /// [(#4327)]: https://github.com/apache/arrow-rs/pull/4327
403    /// [(#4943)]: https://github.com/apache/arrow-rs/pull/4943
404    fn at_record_boundary(&mut self) -> Result<bool> {
405        Ok(self.peek_next_page()?.is_none())
406    }
407}
408
/// API for writing pages in a column chunk.
///
/// Implementations may assume that pages are written in the correct order, e.g. a
/// dictionary page followed by data pages, or only a set of data pages.
pub trait PageWriter: Send {
    /// Writes a page into the output stream/sink.
    ///
    /// Returns a [`PageWriteSpec`] with metrics about the written page, including the
    /// number of bytes written, sizes, number of values, offset, etc.
    ///
    /// This method is called for every compressed page written into the underlying
    /// buffer, whether it is a data page or a dictionary page.
    fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec>;

    /// Closes resources and flushes the underlying sink.
    /// The page writer should not be used after this method is called.
    fn close(&mut self) -> Result<()>;
}
426
/// An iterator over pages of one specific column in a parquet file.
///
/// Each item is a boxed [`PageReader`] (or an error), which in turn yields the pages.
/// NOTE(review): presumably one reader per column chunk — confirm against implementors.
pub trait PageIterator: Iterator<Item = Result<Box<dyn PageReader>>> + Send {}
429
#[cfg(test)]
mod tests {
    use super::*;

    /// Statistics fixture attached to the data pages under test.
    fn int32_stats() -> Statistics {
        Statistics::int32(Some(1), Some(2), None, Some(1), true)
    }

    /// Builds a v1 data page holding the bytes `[0, 1, 2]` and 10 `PLAIN` values.
    fn v1_page(statistics: Option<Statistics>) -> Page {
        Page::DataPage {
            buf: Bytes::from(vec![0, 1, 2]),
            num_values: 10,
            encoding: Encoding::PLAIN,
            def_level_encoding: Encoding::RLE,
            rep_level_encoding: Encoding::RLE,
            statistics,
        }
    }

    #[test]
    fn test_page() {
        let v2_page = Page::DataPageV2 {
            buf: Bytes::from(vec![0, 1, 2]),
            num_values: 10,
            encoding: Encoding::PLAIN,
            num_nulls: 5,
            num_rows: 20,
            def_levels_byte_len: 30,
            rep_levels_byte_len: 40,
            is_compressed: false,
            statistics: Some(int32_stats()),
        };
        let dictionary_page = Page::DictionaryPage {
            buf: Bytes::from(vec![0, 1, 2]),
            num_values: 10,
            encoding: Encoding::PLAIN,
            is_sorted: false,
        };

        // (page, expected type, expected statistics); all pages share the same buffer,
        // value count and encoding.
        let cases = [
            (v1_page(Some(int32_stats())), PageType::DATA_PAGE, Some(int32_stats())),
            (v2_page, PageType::DATA_PAGE_V2, Some(int32_stats())),
            (dictionary_page, PageType::DICTIONARY_PAGE, None),
        ];
        for (page, expected_type, expected_stats) in cases {
            assert_eq!(page.page_type(), expected_type);
            assert_eq!(page.buffer(), [0u8, 1, 2].as_slice());
            assert_eq!(page.num_values(), 10);
            assert_eq!(page.encoding(), Encoding::PLAIN);
            assert_eq!(page.statistics(), expected_stats.as_ref());
        }
    }

    #[test]
    fn test_compressed_page() {
        let cpage = CompressedPage::new(v1_page(Some(int32_stats())), 5);

        assert_eq!(cpage.page_type(), PageType::DATA_PAGE);
        assert_eq!(cpage.uncompressed_size(), 5);
        assert_eq!(cpage.compressed_size(), 3);
        assert_eq!(cpage.num_values(), 10);
        assert_eq!(cpage.encoding(), Encoding::PLAIN);
        assert_eq!(cpage.data(), &[0, 1, 2]);
    }

    #[test]
    fn test_compressed_page_uncompressed_size_overflow() {
        // One byte past the largest size representable in the i32 thrift field.
        let too_large = (i32::MAX as usize) + 1;
        let cpage = CompressedPage::new(v1_page(None), too_large);

        // to_thrift_header must reject the size instead of truncating it.
        let err = cpage
            .to_thrift_header()
            .expect_err("uncompressed size above i32::MAX should be rejected");
        assert!(err.to_string().contains("Page uncompressed size overflow"));
    }
}