parquet/column/
page.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Contains Parquet Page definitions and page reader interface.
19
20use bytes::Bytes;
21
22use crate::basic::{Encoding, PageType};
23use crate::errors::{ParquetError, Result};
24use crate::file::statistics::Statistics;
25use crate::format::PageHeader;
26
27/// Parquet Page definition.
28///
29/// List of supported pages.
30/// These are 1-to-1 mapped from the equivalent Thrift definitions, except `buf` which
31/// used to store uncompressed bytes of the page.
32#[derive(Clone)]
33pub enum Page {
34    /// Data page Parquet format v1.
35    DataPage {
36        /// The underlying data buffer
37        buf: Bytes,
38        /// Number of values in this page
39        num_values: u32,
40        /// Encoding for values in this page
41        encoding: Encoding,
42        /// Definition level encoding
43        def_level_encoding: Encoding,
44        /// Repetition level encoding
45        rep_level_encoding: Encoding,
46        /// Optional statistics for this page
47        statistics: Option<Statistics>,
48    },
49    /// Data page Parquet format v2.
50    DataPageV2 {
51        /// The underlying data buffer
52        buf: Bytes,
53        /// Number of values in this page
54        num_values: u32,
55        /// Encoding for values in this page
56        encoding: Encoding,
57        /// Number of null values in this page
58        num_nulls: u32,
59        /// Number of rows in this page
60        num_rows: u32,
61        /// Length of definition levels
62        def_levels_byte_len: u32,
63        /// Length of repetition levels
64        rep_levels_byte_len: u32,
65        /// Is this page compressed
66        is_compressed: bool,
67        /// Optional statistics for this page
68        statistics: Option<Statistics>,
69    },
70    /// Dictionary page.
71    DictionaryPage {
72        /// The underlying data buffer
73        buf: Bytes,
74        /// Number of values in this page
75        num_values: u32,
76        /// Encoding for values in this page
77        encoding: Encoding,
78        /// Is dictionary page sorted
79        is_sorted: bool,
80    },
81}
82
83impl Page {
84    /// Returns [`PageType`] for this page.
85    pub fn page_type(&self) -> PageType {
86        match self {
87            Page::DataPage { .. } => PageType::DATA_PAGE,
88            Page::DataPageV2 { .. } => PageType::DATA_PAGE_V2,
89            Page::DictionaryPage { .. } => PageType::DICTIONARY_PAGE,
90        }
91    }
92
93    /// Returns internal byte buffer reference for this page.
94    pub fn buffer(&self) -> &Bytes {
95        match self {
96            Page::DataPage { ref buf, .. } => buf,
97            Page::DataPageV2 { ref buf, .. } => buf,
98            Page::DictionaryPage { ref buf, .. } => buf,
99        }
100    }
101
102    /// Returns number of values in this page.
103    pub fn num_values(&self) -> u32 {
104        match self {
105            Page::DataPage { num_values, .. } => *num_values,
106            Page::DataPageV2 { num_values, .. } => *num_values,
107            Page::DictionaryPage { num_values, .. } => *num_values,
108        }
109    }
110
111    /// Returns this page [`Encoding`].
112    pub fn encoding(&self) -> Encoding {
113        match self {
114            Page::DataPage { encoding, .. } => *encoding,
115            Page::DataPageV2 { encoding, .. } => *encoding,
116            Page::DictionaryPage { encoding, .. } => *encoding,
117        }
118    }
119
120    /// Returns optional [`Statistics`].
121    pub fn statistics(&self) -> Option<&Statistics> {
122        match self {
123            Page::DataPage { ref statistics, .. } => statistics.as_ref(),
124            Page::DataPageV2 { ref statistics, .. } => statistics.as_ref(),
125            Page::DictionaryPage { .. } => None,
126        }
127    }
128}
129
130/// Helper struct to represent pages with potentially compressed buffer (data page v1) or
131/// compressed and concatenated buffer (def levels + rep levels + compressed values for
132/// data page v2).
133///
134/// The difference with `Page` is that `Page` buffer is always uncompressed.
135pub struct CompressedPage {
136    compressed_page: Page,
137    uncompressed_size: usize,
138}
139
140impl CompressedPage {
141    /// Creates `CompressedPage` from a page with potentially compressed buffer and
142    /// uncompressed size.
143    pub fn new(compressed_page: Page, uncompressed_size: usize) -> Self {
144        Self {
145            compressed_page,
146            uncompressed_size,
147        }
148    }
149
150    /// Returns page type.
151    pub fn page_type(&self) -> PageType {
152        self.compressed_page.page_type()
153    }
154
155    /// Returns underlying page with potentially compressed buffer.
156    pub fn compressed_page(&self) -> &Page {
157        &self.compressed_page
158    }
159
160    /// Returns uncompressed size in bytes.
161    pub fn uncompressed_size(&self) -> usize {
162        self.uncompressed_size
163    }
164
165    /// Returns compressed size in bytes.
166    ///
167    /// Note that it is assumed that buffer is compressed, but it may not be. In this
168    /// case compressed size will be equal to uncompressed size.
169    pub fn compressed_size(&self) -> usize {
170        self.compressed_page.buffer().len()
171    }
172
173    /// Number of values in page.
174    pub fn num_values(&self) -> u32 {
175        self.compressed_page.num_values()
176    }
177
178    /// Returns encoding for values in page.
179    pub fn encoding(&self) -> Encoding {
180        self.compressed_page.encoding()
181    }
182
183    /// Returns slice of compressed buffer in the page.
184    pub fn data(&self) -> &[u8] {
185        self.compressed_page.buffer()
186    }
187
188    /// Returns the thrift page header
189    pub(crate) fn to_thrift_header(&self) -> PageHeader {
190        let uncompressed_size = self.uncompressed_size();
191        let compressed_size = self.compressed_size();
192        let num_values = self.num_values();
193        let encoding = self.encoding();
194        let page_type = self.page_type();
195
196        let mut page_header = PageHeader {
197            type_: page_type.into(),
198            uncompressed_page_size: uncompressed_size as i32,
199            compressed_page_size: compressed_size as i32,
200            // TODO: Add support for crc checksum
201            crc: None,
202            data_page_header: None,
203            index_page_header: None,
204            dictionary_page_header: None,
205            data_page_header_v2: None,
206        };
207
208        match self.compressed_page {
209            Page::DataPage {
210                def_level_encoding,
211                rep_level_encoding,
212                ref statistics,
213                ..
214            } => {
215                let data_page_header = crate::format::DataPageHeader {
216                    num_values: num_values as i32,
217                    encoding: encoding.into(),
218                    definition_level_encoding: def_level_encoding.into(),
219                    repetition_level_encoding: rep_level_encoding.into(),
220                    statistics: crate::file::statistics::to_thrift(statistics.as_ref()),
221                };
222                page_header.data_page_header = Some(data_page_header);
223            }
224            Page::DataPageV2 {
225                num_nulls,
226                num_rows,
227                def_levels_byte_len,
228                rep_levels_byte_len,
229                is_compressed,
230                ref statistics,
231                ..
232            } => {
233                let data_page_header_v2 = crate::format::DataPageHeaderV2 {
234                    num_values: num_values as i32,
235                    num_nulls: num_nulls as i32,
236                    num_rows: num_rows as i32,
237                    encoding: encoding.into(),
238                    definition_levels_byte_length: def_levels_byte_len as i32,
239                    repetition_levels_byte_length: rep_levels_byte_len as i32,
240                    is_compressed: Some(is_compressed),
241                    statistics: crate::file::statistics::to_thrift(statistics.as_ref()),
242                };
243                page_header.data_page_header_v2 = Some(data_page_header_v2);
244            }
245            Page::DictionaryPage { is_sorted, .. } => {
246                let dictionary_page_header = crate::format::DictionaryPageHeader {
247                    num_values: num_values as i32,
248                    encoding: encoding.into(),
249                    is_sorted: Some(is_sorted),
250                };
251                page_header.dictionary_page_header = Some(dictionary_page_header);
252            }
253        }
254        page_header
255    }
256}
257
258/// Contains page write metrics.
259pub struct PageWriteSpec {
260    /// The type of page being written
261    pub page_type: PageType,
262    /// The total size of the page, before compression
263    pub uncompressed_size: usize,
264    /// The compressed size of the page
265    pub compressed_size: usize,
266    /// The number of values in the page
267    pub num_values: u32,
268    /// The offset of the page in the column chunk
269    pub offset: u64,
270    /// The number of bytes written to the underlying sink
271    pub bytes_written: u64,
272}
273
274impl Default for PageWriteSpec {
275    fn default() -> Self {
276        Self::new()
277    }
278}
279
280impl PageWriteSpec {
281    /// Creates new spec with default page write metrics.
282    pub fn new() -> Self {
283        Self {
284            page_type: PageType::DATA_PAGE,
285            uncompressed_size: 0,
286            compressed_size: 0,
287            num_values: 0,
288            offset: 0,
289            bytes_written: 0,
290        }
291    }
292}
293
294/// Contains metadata for a page
295#[derive(Clone)]
296pub struct PageMetadata {
297    /// The number of rows within the page if known
298    pub num_rows: Option<usize>,
299    /// The number of levels within the page if known
300    pub num_levels: Option<usize>,
301    /// Returns true if the page is a dictionary page
302    pub is_dict: bool,
303}
304
305impl TryFrom<&PageHeader> for PageMetadata {
306    type Error = ParquetError;
307
308    fn try_from(value: &PageHeader) -> std::result::Result<Self, Self::Error> {
309        match value.type_ {
310            crate::format::PageType::DATA_PAGE => {
311                let header = value.data_page_header.as_ref().unwrap();
312                Ok(PageMetadata {
313                    num_rows: None,
314                    num_levels: Some(header.num_values as _),
315                    is_dict: false,
316                })
317            }
318            crate::format::PageType::DICTIONARY_PAGE => Ok(PageMetadata {
319                num_rows: None,
320                num_levels: None,
321                is_dict: true,
322            }),
323            crate::format::PageType::DATA_PAGE_V2 => {
324                let header = value.data_page_header_v2.as_ref().unwrap();
325                Ok(PageMetadata {
326                    num_rows: Some(header.num_rows as _),
327                    num_levels: Some(header.num_values as _),
328                    is_dict: false,
329                })
330            }
331            other => Err(ParquetError::General(format!(
332                "page type {other:?} cannot be converted to PageMetadata"
333            ))),
334        }
335    }
336}
337
338/// API for reading pages from a column chunk.
339/// This offers a iterator like API to get the next page.
340pub trait PageReader: Iterator<Item = Result<Page>> + Send {
341    /// Gets the next page in the column chunk associated with this reader.
342    /// Returns `None` if there are no pages left.
343    fn get_next_page(&mut self) -> Result<Option<Page>>;
344
345    /// Gets metadata about the next page, returns an error if no
346    /// column index information
347    fn peek_next_page(&mut self) -> Result<Option<PageMetadata>>;
348
349    /// Skips reading the next page, returns an error if no
350    /// column index information
351    fn skip_next_page(&mut self) -> Result<()>;
352
353    /// Returns `true` if the next page can be assumed to contain the start of a new record
354    ///
355    /// Prior to parquet V2 the specification was ambiguous as to whether a single record
356    /// could be split across multiple pages, and prior to [(#4327)] the Rust writer would do
357    /// this in certain situations. However, correctly interpreting the offset index relies on
358    /// this assumption holding [(#4943)], and so this mechanism is provided for a [`PageReader`]
359    /// to signal this to the calling context
360    ///
361    /// [(#4327)]: https://github.com/apache/arrow-rs/pull/4327
362    /// [(#4943)]: https://github.com/apache/arrow-rs/pull/4943
363    fn at_record_boundary(&mut self) -> Result<bool> {
364        Ok(self.peek_next_page()?.is_none())
365    }
366}
367
/// API for writing pages in a column chunk.
///
/// It is reasonable to assume that all pages will be written in the correct order, e.g.
/// a dictionary page followed by data pages, or a set of data pages, etc.
pub trait PageWriter: Send {
    /// Writes a page into the output stream/sink.
    ///
    /// Returns a `PageWriteSpec` that contains metrics about the written page,
    /// including number of bytes, size, number of values, offset, etc.
    ///
    /// This method is called for every compressed page we write into the underlying
    /// buffer, either a data page or a dictionary page.
    fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec>;

    /// Closes resources and flushes the underlying sink.
    ///
    /// The page writer should not be used after this method is called.
    fn close(&mut self) -> Result<()>;
}
385
/// An iterator over the pages of one specific column in a parquet file.
///
/// Each item is a boxed [`PageReader`], or an error.
pub trait PageIterator: Iterator<Item = Result<Box<dyn PageReader>>> + Send {}
388
#[cfg(test)]
mod tests {
    use super::*;

    /// Raw bytes shared by every page fixture in this module.
    const RAW: [u8; 3] = [0, 1, 2];

    /// Statistics attached to the data page fixtures.
    fn sample_statistics() -> Statistics {
        Statistics::int32(Some(1), Some(2), None, Some(1), true)
    }

    /// A v1 data page fixture with ten PLAIN-encoded values.
    fn sample_data_page() -> Page {
        Page::DataPage {
            buf: Bytes::from(RAW.to_vec()),
            num_values: 10,
            encoding: Encoding::PLAIN,
            def_level_encoding: Encoding::RLE,
            rep_level_encoding: Encoding::RLE,
            statistics: Some(sample_statistics()),
        }
    }

    #[test]
    fn test_page() {
        // Data page v1.
        let v1 = sample_data_page();
        assert_eq!(v1.page_type(), PageType::DATA_PAGE);
        assert_eq!(v1.buffer(), &RAW[..]);
        assert_eq!(v1.num_values(), 10);
        assert_eq!(v1.encoding(), Encoding::PLAIN);
        assert_eq!(v1.statistics(), Some(&sample_statistics()));

        // Data page v2.
        let v2 = Page::DataPageV2 {
            buf: Bytes::from(RAW.to_vec()),
            num_values: 10,
            encoding: Encoding::PLAIN,
            num_nulls: 5,
            num_rows: 20,
            def_levels_byte_len: 30,
            rep_levels_byte_len: 40,
            is_compressed: false,
            statistics: Some(sample_statistics()),
        };
        assert_eq!(v2.page_type(), PageType::DATA_PAGE_V2);
        assert_eq!(v2.buffer(), &RAW[..]);
        assert_eq!(v2.num_values(), 10);
        assert_eq!(v2.encoding(), Encoding::PLAIN);
        assert_eq!(v2.statistics(), Some(&sample_statistics()));

        // Dictionary page: same accessors, but never any statistics.
        let dict = Page::DictionaryPage {
            buf: Bytes::from(RAW.to_vec()),
            num_values: 10,
            encoding: Encoding::PLAIN,
            is_sorted: false,
        };
        assert_eq!(dict.page_type(), PageType::DICTIONARY_PAGE);
        assert_eq!(dict.buffer(), &RAW[..]);
        assert_eq!(dict.num_values(), 10);
        assert_eq!(dict.encoding(), Encoding::PLAIN);
        assert_eq!(dict.statistics(), None);
    }

    #[test]
    fn test_compressed_page() {
        // The declared uncompressed size (5) deliberately differs from the buffer
        // length (3) so the two accessors can be told apart.
        let wrapped = CompressedPage::new(sample_data_page(), 5);

        assert_eq!(wrapped.page_type(), PageType::DATA_PAGE);
        assert_eq!(wrapped.uncompressed_size(), 5);
        assert_eq!(wrapped.compressed_size(), 3);
        assert_eq!(wrapped.num_values(), 10);
        assert_eq!(wrapped.encoding(), Encoding::PLAIN);
        assert_eq!(wrapped.data(), &RAW[..]);
    }
}
465}