parquet/column/
page.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Contains Parquet Page definitions and page reader interface.
19
20use bytes::Bytes;
21
22use crate::basic::{Encoding, PageType};
23use crate::errors::{ParquetError, Result};
24use crate::file::statistics::Statistics;
25use crate::format::PageHeader;
26
27/// Parquet Page definition.
28///
29/// List of supported pages.
30/// These are 1-to-1 mapped from the equivalent Thrift definitions, except `buf` which
31/// used to store uncompressed bytes of the page.
32#[derive(Clone)]
33pub enum Page {
34    /// Data page Parquet format v1.
35    DataPage {
36        /// The underlying data buffer
37        buf: Bytes,
38        /// Number of values in this page
39        num_values: u32,
40        /// Encoding for values in this page
41        encoding: Encoding,
42        /// Definition level encoding
43        def_level_encoding: Encoding,
44        /// Repetition level encoding
45        rep_level_encoding: Encoding,
46        /// Optional statistics for this page
47        statistics: Option<Statistics>,
48    },
49    /// Data page Parquet format v2.
50    DataPageV2 {
51        /// The underlying data buffer
52        buf: Bytes,
53        /// Number of values in this page
54        num_values: u32,
55        /// Encoding for values in this page
56        encoding: Encoding,
57        /// Number of null values in this page
58        num_nulls: u32,
59        /// Number of rows in this page
60        num_rows: u32,
61        /// Length of definition levels
62        def_levels_byte_len: u32,
63        /// Length of repetition levels
64        rep_levels_byte_len: u32,
65        /// Is this page compressed
66        is_compressed: bool,
67        /// Optional statistics for this page
68        statistics: Option<Statistics>,
69    },
70    /// Dictionary page.
71    DictionaryPage {
72        /// The underlying data buffer
73        buf: Bytes,
74        /// Number of values in this page
75        num_values: u32,
76        /// Encoding for values in this page
77        encoding: Encoding,
78        /// Is dictionary page sorted
79        is_sorted: bool,
80    },
81}
82
83impl Page {
84    /// Returns [`PageType`] for this page.
85    pub fn page_type(&self) -> PageType {
86        match self {
87            Page::DataPage { .. } => PageType::DATA_PAGE,
88            Page::DataPageV2 { .. } => PageType::DATA_PAGE_V2,
89            Page::DictionaryPage { .. } => PageType::DICTIONARY_PAGE,
90        }
91    }
92
93    /// Returns whether this page is any version of a data page
94    pub fn is_data_page(&self) -> bool {
95        matches!(self, Page::DataPage { .. } | Page::DataPageV2 { .. })
96    }
97
98    /// Returns whether this page is a dictionary page
99    pub fn is_dictionary_page(&self) -> bool {
100        matches!(self, Page::DictionaryPage { .. })
101    }
102
103    /// Returns internal byte buffer reference for this page.
104    pub fn buffer(&self) -> &Bytes {
105        match self {
106            Page::DataPage { ref buf, .. } => buf,
107            Page::DataPageV2 { ref buf, .. } => buf,
108            Page::DictionaryPage { ref buf, .. } => buf,
109        }
110    }
111
112    /// Returns number of values in this page.
113    pub fn num_values(&self) -> u32 {
114        match self {
115            Page::DataPage { num_values, .. } => *num_values,
116            Page::DataPageV2 { num_values, .. } => *num_values,
117            Page::DictionaryPage { num_values, .. } => *num_values,
118        }
119    }
120
121    /// Returns this page [`Encoding`].
122    pub fn encoding(&self) -> Encoding {
123        match self {
124            Page::DataPage { encoding, .. } => *encoding,
125            Page::DataPageV2 { encoding, .. } => *encoding,
126            Page::DictionaryPage { encoding, .. } => *encoding,
127        }
128    }
129
130    /// Returns optional [`Statistics`].
131    pub fn statistics(&self) -> Option<&Statistics> {
132        match self {
133            Page::DataPage { ref statistics, .. } => statistics.as_ref(),
134            Page::DataPageV2 { ref statistics, .. } => statistics.as_ref(),
135            Page::DictionaryPage { .. } => None,
136        }
137    }
138}
139
140/// Helper struct to represent pages with potentially compressed buffer (data page v1) or
141/// compressed and concatenated buffer (def levels + rep levels + compressed values for
142/// data page v2).
143///
144/// The difference with `Page` is that `Page` buffer is always uncompressed.
145pub struct CompressedPage {
146    compressed_page: Page,
147    uncompressed_size: usize,
148}
149
150impl CompressedPage {
151    /// Creates `CompressedPage` from a page with potentially compressed buffer and
152    /// uncompressed size.
153    pub fn new(compressed_page: Page, uncompressed_size: usize) -> Self {
154        Self {
155            compressed_page,
156            uncompressed_size,
157        }
158    }
159
160    /// Returns page type.
161    pub fn page_type(&self) -> PageType {
162        self.compressed_page.page_type()
163    }
164
165    /// Returns underlying page with potentially compressed buffer.
166    pub fn compressed_page(&self) -> &Page {
167        &self.compressed_page
168    }
169
170    /// Returns uncompressed size in bytes.
171    pub fn uncompressed_size(&self) -> usize {
172        self.uncompressed_size
173    }
174
175    /// Returns compressed size in bytes.
176    ///
177    /// Note that it is assumed that buffer is compressed, but it may not be. In this
178    /// case compressed size will be equal to uncompressed size.
179    pub fn compressed_size(&self) -> usize {
180        self.compressed_page.buffer().len()
181    }
182
183    /// Number of values in page.
184    pub fn num_values(&self) -> u32 {
185        self.compressed_page.num_values()
186    }
187
188    /// Returns encoding for values in page.
189    pub fn encoding(&self) -> Encoding {
190        self.compressed_page.encoding()
191    }
192
193    /// Returns slice of compressed buffer in the page.
194    pub fn data(&self) -> &[u8] {
195        self.compressed_page.buffer()
196    }
197
198    /// Returns the thrift page header
199    pub(crate) fn to_thrift_header(&self) -> PageHeader {
200        let uncompressed_size = self.uncompressed_size();
201        let compressed_size = self.compressed_size();
202        let num_values = self.num_values();
203        let encoding = self.encoding();
204        let page_type = self.page_type();
205
206        let mut page_header = PageHeader {
207            type_: page_type.into(),
208            uncompressed_page_size: uncompressed_size as i32,
209            compressed_page_size: compressed_size as i32,
210            // TODO: Add support for crc checksum
211            crc: None,
212            data_page_header: None,
213            index_page_header: None,
214            dictionary_page_header: None,
215            data_page_header_v2: None,
216        };
217
218        match self.compressed_page {
219            Page::DataPage {
220                def_level_encoding,
221                rep_level_encoding,
222                ref statistics,
223                ..
224            } => {
225                let data_page_header = crate::format::DataPageHeader {
226                    num_values: num_values as i32,
227                    encoding: encoding.into(),
228                    definition_level_encoding: def_level_encoding.into(),
229                    repetition_level_encoding: rep_level_encoding.into(),
230                    statistics: crate::file::statistics::to_thrift(statistics.as_ref()),
231                };
232                page_header.data_page_header = Some(data_page_header);
233            }
234            Page::DataPageV2 {
235                num_nulls,
236                num_rows,
237                def_levels_byte_len,
238                rep_levels_byte_len,
239                is_compressed,
240                ref statistics,
241                ..
242            } => {
243                let data_page_header_v2 = crate::format::DataPageHeaderV2 {
244                    num_values: num_values as i32,
245                    num_nulls: num_nulls as i32,
246                    num_rows: num_rows as i32,
247                    encoding: encoding.into(),
248                    definition_levels_byte_length: def_levels_byte_len as i32,
249                    repetition_levels_byte_length: rep_levels_byte_len as i32,
250                    is_compressed: Some(is_compressed),
251                    statistics: crate::file::statistics::to_thrift(statistics.as_ref()),
252                };
253                page_header.data_page_header_v2 = Some(data_page_header_v2);
254            }
255            Page::DictionaryPage { is_sorted, .. } => {
256                let dictionary_page_header = crate::format::DictionaryPageHeader {
257                    num_values: num_values as i32,
258                    encoding: encoding.into(),
259                    is_sorted: Some(is_sorted),
260                };
261                page_header.dictionary_page_header = Some(dictionary_page_header);
262            }
263        }
264        page_header
265    }
266
267    /// Update the compressed buffer for a page.
268    /// This might be required when encrypting page data for example.
269    /// The size of uncompressed data must not change.
270    #[cfg(feature = "encryption")]
271    pub(crate) fn with_new_compressed_buffer(mut self, new_buffer: Bytes) -> Self {
272        match &mut self.compressed_page {
273            Page::DataPage { buf, .. } => {
274                *buf = new_buffer;
275            }
276            Page::DataPageV2 { buf, .. } => {
277                *buf = new_buffer;
278            }
279            Page::DictionaryPage { buf, .. } => {
280                *buf = new_buffer;
281            }
282        }
283        self
284    }
285}
286
287/// Contains page write metrics.
288pub struct PageWriteSpec {
289    /// The type of page being written
290    pub page_type: PageType,
291    /// The total size of the page, before compression
292    pub uncompressed_size: usize,
293    /// The compressed size of the page
294    pub compressed_size: usize,
295    /// The number of values in the page
296    pub num_values: u32,
297    /// The offset of the page in the column chunk
298    pub offset: u64,
299    /// The number of bytes written to the underlying sink
300    pub bytes_written: u64,
301}
302
303impl Default for PageWriteSpec {
304    fn default() -> Self {
305        Self::new()
306    }
307}
308
309impl PageWriteSpec {
310    /// Creates new spec with default page write metrics.
311    pub fn new() -> Self {
312        Self {
313            page_type: PageType::DATA_PAGE,
314            uncompressed_size: 0,
315            compressed_size: 0,
316            num_values: 0,
317            offset: 0,
318            bytes_written: 0,
319        }
320    }
321}
322
/// Contains metadata for a page.
///
/// `Debug` is derived so values can be used in assertions and log output.
#[derive(Clone, Debug)]
pub struct PageMetadata {
    /// The number of rows within the page, if known
    pub num_rows: Option<usize>,
    /// The number of levels within the page, if known
    pub num_levels: Option<usize>,
    /// `true` if the page is a dictionary page
    pub is_dict: bool,
}
333
334impl TryFrom<&PageHeader> for PageMetadata {
335    type Error = ParquetError;
336
337    fn try_from(value: &PageHeader) -> std::result::Result<Self, Self::Error> {
338        match value.type_ {
339            crate::format::PageType::DATA_PAGE => {
340                let header = value.data_page_header.as_ref().unwrap();
341                Ok(PageMetadata {
342                    num_rows: None,
343                    num_levels: Some(header.num_values as _),
344                    is_dict: false,
345                })
346            }
347            crate::format::PageType::DICTIONARY_PAGE => Ok(PageMetadata {
348                num_rows: None,
349                num_levels: None,
350                is_dict: true,
351            }),
352            crate::format::PageType::DATA_PAGE_V2 => {
353                let header = value.data_page_header_v2.as_ref().unwrap();
354                Ok(PageMetadata {
355                    num_rows: Some(header.num_rows as _),
356                    num_levels: Some(header.num_values as _),
357                    is_dict: false,
358                })
359            }
360            other => Err(ParquetError::General(format!(
361                "page type {other:?} cannot be converted to PageMetadata"
362            ))),
363        }
364    }
365}
366
367/// API for reading pages from a column chunk.
368/// This offers a iterator like API to get the next page.
369pub trait PageReader: Iterator<Item = Result<Page>> + Send {
370    /// Gets the next page in the column chunk associated with this reader.
371    /// Returns `None` if there are no pages left.
372    fn get_next_page(&mut self) -> Result<Option<Page>>;
373
374    /// Gets metadata about the next page, returns an error if no
375    /// column index information
376    fn peek_next_page(&mut self) -> Result<Option<PageMetadata>>;
377
378    /// Skips reading the next page, returns an error if no
379    /// column index information
380    fn skip_next_page(&mut self) -> Result<()>;
381
382    /// Returns `true` if the next page can be assumed to contain the start of a new record
383    ///
384    /// Prior to parquet V2 the specification was ambiguous as to whether a single record
385    /// could be split across multiple pages, and prior to [(#4327)] the Rust writer would do
386    /// this in certain situations. However, correctly interpreting the offset index relies on
387    /// this assumption holding [(#4943)], and so this mechanism is provided for a [`PageReader`]
388    /// to signal this to the calling context
389    ///
390    /// [(#4327)]: https://github.com/apache/arrow-rs/pull/4327
391    /// [(#4943)]: https://github.com/apache/arrow-rs/pull/4943
392    fn at_record_boundary(&mut self) -> Result<bool> {
393        Ok(self.peek_next_page()?.is_none())
394    }
395}
396
397/// API for writing pages in a column chunk.
398///
399/// It is reasonable to assume that all pages will be written in the correct order, e.g.
400/// dictionary page followed by data pages, or a set of data pages, etc.
401pub trait PageWriter: Send {
402    /// Writes a page into the output stream/sink.
403    /// Returns `PageWriteSpec` that contains information about written page metrics,
404    /// including number of bytes, size, number of values, offset, etc.
405    ///
406    /// This method is called for every compressed page we write into underlying buffer,
407    /// either data page or dictionary page.
408    fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec>;
409
410    /// Closes resources and flushes underlying sink.
411    /// Page writer should not be used after this method is called.
412    fn close(&mut self) -> Result<()>;
413}
414
415/// An iterator over pages of one specific column in a parquet file.
416pub trait PageIterator: Iterator<Item = Result<Box<dyn PageReader>>> + Send {}
417
#[cfg(test)]
mod tests {
    use super::*;

    /// Raw bytes used as the buffer of every page built in these tests.
    const RAW: [u8; 3] = [0, 1, 2];

    /// Statistics fixture attached to the data pages under test.
    fn sample_stats() -> Statistics {
        Statistics::int32(Some(1), Some(2), None, Some(1), true)
    }

    /// Builds the v1 data page fixture shared by both tests.
    fn sample_data_page() -> Page {
        Page::DataPage {
            buf: Bytes::from(RAW.to_vec()),
            num_values: 10,
            encoding: Encoding::PLAIN,
            def_level_encoding: Encoding::RLE,
            rep_level_encoding: Encoding::RLE,
            statistics: Some(sample_stats()),
        }
    }

    #[test]
    fn test_page() {
        // Data page v1.
        let v1 = sample_data_page();
        assert_eq!(v1.page_type(), PageType::DATA_PAGE);
        assert_eq!(v1.buffer(), &RAW[..]);
        assert_eq!(v1.num_values(), 10);
        assert_eq!(v1.encoding(), Encoding::PLAIN);
        assert_eq!(v1.statistics(), Some(&sample_stats()));

        // Data page v2.
        let v2 = Page::DataPageV2 {
            buf: Bytes::from(RAW.to_vec()),
            num_values: 10,
            encoding: Encoding::PLAIN,
            num_nulls: 5,
            num_rows: 20,
            def_levels_byte_len: 30,
            rep_levels_byte_len: 40,
            is_compressed: false,
            statistics: Some(sample_stats()),
        };
        assert_eq!(v2.page_type(), PageType::DATA_PAGE_V2);
        assert_eq!(v2.buffer(), &RAW[..]);
        assert_eq!(v2.num_values(), 10);
        assert_eq!(v2.encoding(), Encoding::PLAIN);
        assert_eq!(v2.statistics(), Some(&sample_stats()));

        // Dictionary page: has no statistics.
        let dict = Page::DictionaryPage {
            buf: Bytes::from(RAW.to_vec()),
            num_values: 10,
            encoding: Encoding::PLAIN,
            is_sorted: false,
        };
        assert_eq!(dict.page_type(), PageType::DICTIONARY_PAGE);
        assert_eq!(dict.buffer(), &RAW[..]);
        assert_eq!(dict.num_values(), 10);
        assert_eq!(dict.encoding(), Encoding::PLAIN);
        assert_eq!(dict.statistics(), None);
    }

    #[test]
    fn test_compressed_page() {
        // The uncompressed size (5) is deliberately different from the buffer length (3).
        let cpage = CompressedPage::new(sample_data_page(), 5);

        assert_eq!(cpage.page_type(), PageType::DATA_PAGE);
        assert_eq!(cpage.uncompressed_size(), 5);
        assert_eq!(cpage.compressed_size(), 3);
        assert_eq!(cpage.num_values(), 10);
        assert_eq!(cpage.encoding(), Encoding::PLAIN);
        assert_eq!(cpage.data(), &RAW);
    }
}
494}