parquet/column/
page.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
//! Contains Parquet page definitions and the page reader, writer and iterator interfaces.
19
20use bytes::Bytes;
21
22use crate::basic::{Encoding, PageType};
23use crate::errors::{ParquetError, Result};
24use crate::file::metadata::thrift::{
25    DataPageHeader, DataPageHeaderV2, DictionaryPageHeader, PageHeader,
26};
27use crate::file::statistics::{Statistics, page_stats_to_thrift};
28
29/// Parquet Page definition.
30///
31/// List of supported pages.
32/// These are 1-to-1 mapped from the equivalent Thrift definitions, except `buf` which
33/// used to store uncompressed bytes of the page.
34#[derive(Clone)]
35pub enum Page {
36    /// Data page Parquet format v1.
37    DataPage {
38        /// The underlying data buffer
39        buf: Bytes,
40        /// Number of values in this page
41        num_values: u32,
42        /// Encoding for values in this page
43        encoding: Encoding,
44        /// Definition level encoding
45        def_level_encoding: Encoding,
46        /// Repetition level encoding
47        rep_level_encoding: Encoding,
48        /// Optional statistics for this page
49        statistics: Option<Statistics>,
50    },
51    /// Data page Parquet format v2.
52    DataPageV2 {
53        /// The underlying data buffer
54        buf: Bytes,
55        /// Number of values in this page
56        num_values: u32,
57        /// Encoding for values in this page
58        encoding: Encoding,
59        /// Number of null values in this page
60        num_nulls: u32,
61        /// Number of rows in this page
62        num_rows: u32,
63        /// Length of definition levels
64        def_levels_byte_len: u32,
65        /// Length of repetition levels
66        rep_levels_byte_len: u32,
67        /// Is this page compressed
68        is_compressed: bool,
69        /// Optional statistics for this page
70        statistics: Option<Statistics>,
71    },
72    /// Dictionary page.
73    DictionaryPage {
74        /// The underlying data buffer
75        buf: Bytes,
76        /// Number of values in this page
77        num_values: u32,
78        /// Encoding for values in this page
79        encoding: Encoding,
80        /// Is dictionary page sorted
81        is_sorted: bool,
82    },
83}
84
85impl Page {
86    /// Returns [`PageType`] for this page.
87    pub fn page_type(&self) -> PageType {
88        match self {
89            Page::DataPage { .. } => PageType::DATA_PAGE,
90            Page::DataPageV2 { .. } => PageType::DATA_PAGE_V2,
91            Page::DictionaryPage { .. } => PageType::DICTIONARY_PAGE,
92        }
93    }
94
95    /// Returns whether this page is any version of a data page
96    pub fn is_data_page(&self) -> bool {
97        matches!(self, Page::DataPage { .. } | Page::DataPageV2 { .. })
98    }
99
100    /// Returns whether this page is a dictionary page
101    pub fn is_dictionary_page(&self) -> bool {
102        matches!(self, Page::DictionaryPage { .. })
103    }
104
105    /// Returns internal byte buffer reference for this page.
106    pub fn buffer(&self) -> &Bytes {
107        match self {
108            Page::DataPage { buf, .. } => buf,
109            Page::DataPageV2 { buf, .. } => buf,
110            Page::DictionaryPage { buf, .. } => buf,
111        }
112    }
113
114    /// Returns number of values in this page.
115    pub fn num_values(&self) -> u32 {
116        match self {
117            Page::DataPage { num_values, .. } => *num_values,
118            Page::DataPageV2 { num_values, .. } => *num_values,
119            Page::DictionaryPage { num_values, .. } => *num_values,
120        }
121    }
122
123    /// Returns this page [`Encoding`].
124    pub fn encoding(&self) -> Encoding {
125        match self {
126            Page::DataPage { encoding, .. } => *encoding,
127            Page::DataPageV2 { encoding, .. } => *encoding,
128            Page::DictionaryPage { encoding, .. } => *encoding,
129        }
130    }
131
132    /// Returns optional [`Statistics`].
133    pub fn statistics(&self) -> Option<&Statistics> {
134        match self {
135            Page::DataPage { statistics, .. } => statistics.as_ref(),
136            Page::DataPageV2 { statistics, .. } => statistics.as_ref(),
137            Page::DictionaryPage { .. } => None,
138        }
139    }
140}
141
142/// Helper struct to represent pages with potentially compressed buffer (data page v1) or
143/// compressed and concatenated buffer (def levels + rep levels + compressed values for
144/// data page v2).
145///
146/// The difference with `Page` is that `Page` buffer is always uncompressed.
147pub struct CompressedPage {
148    compressed_page: Page,
149    uncompressed_size: usize,
150}
151
152impl CompressedPage {
153    /// Creates `CompressedPage` from a page with potentially compressed buffer and
154    /// uncompressed size.
155    pub fn new(compressed_page: Page, uncompressed_size: usize) -> Self {
156        Self {
157            compressed_page,
158            uncompressed_size,
159        }
160    }
161
162    /// Returns page type.
163    pub fn page_type(&self) -> PageType {
164        self.compressed_page.page_type()
165    }
166
167    /// Returns underlying page with potentially compressed buffer.
168    pub fn compressed_page(&self) -> &Page {
169        &self.compressed_page
170    }
171
172    /// Returns uncompressed size in bytes.
173    pub fn uncompressed_size(&self) -> usize {
174        self.uncompressed_size
175    }
176
177    /// Returns compressed size in bytes.
178    ///
179    /// Note that it is assumed that buffer is compressed, but it may not be. In this
180    /// case compressed size will be equal to uncompressed size.
181    pub fn compressed_size(&self) -> usize {
182        self.compressed_page.buffer().len()
183    }
184
185    /// Number of values in page.
186    pub fn num_values(&self) -> u32 {
187        self.compressed_page.num_values()
188    }
189
190    /// Returns encoding for values in page.
191    pub fn encoding(&self) -> Encoding {
192        self.compressed_page.encoding()
193    }
194
195    /// Returns slice of compressed buffer in the page.
196    pub fn data(&self) -> &[u8] {
197        self.compressed_page.buffer()
198    }
199
200    /// Returns the thrift page header
201    pub(crate) fn to_thrift_header(&self) -> Result<PageHeader> {
202        let uncompressed_size = self.uncompressed_size();
203        let compressed_size = self.compressed_size();
204        if uncompressed_size > i32::MAX as usize {
205            return Err(general_err!(
206                "Page uncompressed size overflow: {}",
207                uncompressed_size
208            ));
209        }
210        if compressed_size > i32::MAX as usize {
211            return Err(general_err!(
212                "Page compressed size overflow: {}",
213                compressed_size
214            ));
215        }
216        let num_values = self.num_values();
217        let encoding = self.encoding();
218        let page_type = self.page_type();
219
220        let mut page_header = PageHeader {
221            r#type: page_type,
222            uncompressed_page_size: uncompressed_size as i32,
223            compressed_page_size: compressed_size as i32,
224            // TODO: Add support for crc checksum
225            crc: None,
226            data_page_header: None,
227            index_page_header: None,
228            dictionary_page_header: None,
229            data_page_header_v2: None,
230        };
231
232        match self.compressed_page {
233            Page::DataPage {
234                def_level_encoding,
235                rep_level_encoding,
236                ref statistics,
237                ..
238            } => {
239                let data_page_header = DataPageHeader {
240                    num_values: num_values as i32,
241                    encoding,
242                    definition_level_encoding: def_level_encoding,
243                    repetition_level_encoding: rep_level_encoding,
244                    statistics: page_stats_to_thrift(statistics.as_ref()),
245                };
246                page_header.data_page_header = Some(data_page_header);
247            }
248            Page::DataPageV2 {
249                num_nulls,
250                num_rows,
251                def_levels_byte_len,
252                rep_levels_byte_len,
253                is_compressed,
254                ref statistics,
255                ..
256            } => {
257                let data_page_header_v2 = DataPageHeaderV2 {
258                    num_values: num_values as i32,
259                    num_nulls: num_nulls as i32,
260                    num_rows: num_rows as i32,
261                    encoding,
262                    definition_levels_byte_length: def_levels_byte_len as i32,
263                    repetition_levels_byte_length: rep_levels_byte_len as i32,
264                    is_compressed: Some(is_compressed),
265                    statistics: page_stats_to_thrift(statistics.as_ref()),
266                };
267                page_header.data_page_header_v2 = Some(data_page_header_v2);
268            }
269            Page::DictionaryPage { is_sorted, .. } => {
270                let dictionary_page_header = DictionaryPageHeader {
271                    num_values: num_values as i32,
272                    encoding,
273                    is_sorted: Some(is_sorted),
274                };
275                page_header.dictionary_page_header = Some(dictionary_page_header);
276            }
277        }
278        Ok(page_header)
279    }
280
281    /// Update the compressed buffer for a page.
282    /// This might be required when encrypting page data for example.
283    /// The size of uncompressed data must not change.
284    #[cfg(feature = "encryption")]
285    pub(crate) fn with_new_compressed_buffer(mut self, new_buffer: Bytes) -> Self {
286        match &mut self.compressed_page {
287            Page::DataPage { buf, .. } => {
288                *buf = new_buffer;
289            }
290            Page::DataPageV2 { buf, .. } => {
291                *buf = new_buffer;
292            }
293            Page::DictionaryPage { buf, .. } => {
294                *buf = new_buffer;
295            }
296        }
297        self
298    }
299}
300
301/// Contains page write metrics.
302pub struct PageWriteSpec {
303    /// The type of page being written
304    pub page_type: PageType,
305    /// The total size of the page, before compression
306    pub uncompressed_size: usize,
307    /// The compressed size of the page
308    pub compressed_size: usize,
309    /// The number of values in the page
310    pub num_values: u32,
311    /// The offset of the page in the column chunk
312    pub offset: u64,
313    /// The number of bytes written to the underlying sink
314    pub bytes_written: u64,
315}
316
317impl Default for PageWriteSpec {
318    fn default() -> Self {
319        Self::new()
320    }
321}
322
323impl PageWriteSpec {
324    /// Creates new spec with default page write metrics.
325    pub fn new() -> Self {
326        Self {
327            page_type: PageType::DATA_PAGE,
328            uncompressed_size: 0,
329            compressed_size: 0,
330            num_values: 0,
331            offset: 0,
332            bytes_written: 0,
333        }
334    }
335}
336
/// Contains metadata for a page
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct PageMetadata {
    /// The number of rows within the page, if known
    pub num_rows: Option<usize>,
    /// The number of levels within the page, if known
    pub num_levels: Option<usize>,
    /// `true` if the page is a dictionary page
    pub is_dict: bool,
}
347
348impl TryFrom<&crate::file::metadata::thrift::PageHeader> for PageMetadata {
349    type Error = ParquetError;
350
351    fn try_from(
352        value: &crate::file::metadata::thrift::PageHeader,
353    ) -> std::result::Result<Self, Self::Error> {
354        match value.r#type {
355            PageType::DATA_PAGE => {
356                let header = value.data_page_header.as_ref().unwrap();
357                Ok(PageMetadata {
358                    num_rows: None,
359                    num_levels: Some(header.num_values as _),
360                    is_dict: false,
361                })
362            }
363            PageType::DICTIONARY_PAGE => Ok(PageMetadata {
364                num_rows: None,
365                num_levels: None,
366                is_dict: true,
367            }),
368            PageType::DATA_PAGE_V2 => {
369                let header = value.data_page_header_v2.as_ref().unwrap();
370                Ok(PageMetadata {
371                    num_rows: Some(header.num_rows as _),
372                    num_levels: Some(header.num_values as _),
373                    is_dict: false,
374                })
375            }
376            other => Err(ParquetError::General(format!(
377                "page type {other:?} cannot be converted to PageMetadata"
378            ))),
379        }
380    }
381}
382
383/// API for reading pages from a column chunk.
384/// This offers a iterator like API to get the next page.
385pub trait PageReader: Iterator<Item = Result<Page>> + Send {
386    /// Gets the next page in the column chunk associated with this reader.
387    /// Returns `None` if there are no pages left.
388    fn get_next_page(&mut self) -> Result<Option<Page>>;
389
390    /// Gets metadata about the next page, returns an error if no
391    /// column index information
392    fn peek_next_page(&mut self) -> Result<Option<PageMetadata>>;
393
394    /// Skips reading the next page, returns an error if no
395    /// column index information
396    fn skip_next_page(&mut self) -> Result<()>;
397
398    /// Returns `true` if the next page can be assumed to contain the start of a new record
399    ///
400    /// Prior to parquet V2 the specification was ambiguous as to whether a single record
401    /// could be split across multiple pages, and prior to [(#4327)] the Rust writer would do
402    /// this in certain situations. However, correctly interpreting the offset index relies on
403    /// this assumption holding [(#4943)], and so this mechanism is provided for a [`PageReader`]
404    /// to signal this to the calling context
405    ///
406    /// [(#4327)]: https://github.com/apache/arrow-rs/pull/4327
407    /// [(#4943)]: https://github.com/apache/arrow-rs/pull/4943
408    fn at_record_boundary(&mut self) -> Result<bool> {
409        Ok(self.peek_next_page()?.is_none())
410    }
411}
412
413/// API for writing pages in a column chunk.
414///
415/// It is reasonable to assume that all pages will be written in the correct order, e.g.
416/// dictionary page followed by data pages, or a set of data pages, etc.
417pub trait PageWriter: Send {
418    /// Writes a page into the output stream/sink.
419    /// Returns `PageWriteSpec` that contains information about written page metrics,
420    /// including number of bytes, size, number of values, offset, etc.
421    ///
422    /// This method is called for every compressed page we write into underlying buffer,
423    /// either data page or dictionary page.
424    fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec>;
425
426    /// Closes resources and flushes underlying sink.
427    /// Page writer should not be used after this method is called.
428    fn close(&mut self) -> Result<()>;
429}
430
431/// An iterator over pages of one specific column in a parquet file.
432pub trait PageIterator: Iterator<Item = Result<Box<dyn PageReader>>> + Send {}
433
#[cfg(test)]
mod tests {
    use super::*;

    /// Bytes stored in every fixture page.
    const PAYLOAD: [u8; 3] = [0, 1, 2];

    /// Statistics attached to the data page fixtures.
    fn sample_stats() -> Statistics {
        Statistics::int32(Some(1), Some(2), None, Some(1), true)
    }

    /// Builds a v1 data page over `PAYLOAD` holding ten PLAIN-encoded values.
    fn v1_data_page(statistics: Option<Statistics>) -> Page {
        Page::DataPage {
            buf: Bytes::from(PAYLOAD.to_vec()),
            num_values: 10,
            encoding: Encoding::PLAIN,
            def_level_encoding: Encoding::RLE,
            rep_level_encoding: Encoding::RLE,
            statistics,
        }
    }

    #[test]
    fn test_page() {
        let data_page = v1_data_page(Some(sample_stats()));
        assert_eq!(data_page.page_type(), PageType::DATA_PAGE);
        assert_eq!(data_page.buffer(), PAYLOAD.as_slice());
        assert_eq!(data_page.num_values(), 10);
        assert_eq!(data_page.encoding(), Encoding::PLAIN);
        assert_eq!(data_page.statistics(), Some(&sample_stats()));

        let data_page_v2 = Page::DataPageV2 {
            buf: Bytes::from(PAYLOAD.to_vec()),
            num_values: 10,
            encoding: Encoding::PLAIN,
            num_nulls: 5,
            num_rows: 20,
            def_levels_byte_len: 30,
            rep_levels_byte_len: 40,
            is_compressed: false,
            statistics: Some(sample_stats()),
        };
        assert_eq!(data_page_v2.page_type(), PageType::DATA_PAGE_V2);
        assert_eq!(data_page_v2.buffer(), PAYLOAD.as_slice());
        assert_eq!(data_page_v2.num_values(), 10);
        assert_eq!(data_page_v2.encoding(), Encoding::PLAIN);
        assert_eq!(data_page_v2.statistics(), Some(&sample_stats()));

        let dict_page = Page::DictionaryPage {
            buf: Bytes::from(PAYLOAD.to_vec()),
            num_values: 10,
            encoding: Encoding::PLAIN,
            is_sorted: false,
        };
        assert_eq!(dict_page.page_type(), PageType::DICTIONARY_PAGE);
        assert_eq!(dict_page.buffer(), PAYLOAD.as_slice());
        assert_eq!(dict_page.num_values(), 10);
        assert_eq!(dict_page.encoding(), Encoding::PLAIN);
        assert_eq!(dict_page.statistics(), None);
    }

    #[test]
    fn test_compressed_page() {
        let cpage = CompressedPage::new(v1_data_page(Some(sample_stats())), 5);

        assert_eq!(cpage.page_type(), PageType::DATA_PAGE);
        assert_eq!(cpage.uncompressed_size(), 5);
        assert_eq!(cpage.compressed_size(), PAYLOAD.len());
        assert_eq!(cpage.num_values(), 10);
        assert_eq!(cpage.encoding(), Encoding::PLAIN);
        assert_eq!(cpage.data(), PAYLOAD.as_slice());
    }

    #[test]
    fn test_compressed_page_uncompressed_size_overflow() {
        // One byte more than the largest size a Thrift `i32` field can carry.
        let uncompressed_size = (i32::MAX as usize) + 1;
        let cpage = CompressedPage::new(v1_data_page(None), uncompressed_size);

        let err = cpage
            .to_thrift_header()
            .expect_err("sizes above i32::MAX must be rejected");
        assert!(err.to_string().contains("Page uncompressed size overflow"));
    }
}