parquet/column/page.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Contains Parquet Page definitions and page reader interface.
19
20use bytes::Bytes;
21
22use crate::basic::{Encoding, PageType};
23use crate::errors::{ParquetError, Result};
24use crate::file::metadata::thrift::{
25 DataPageHeader, DataPageHeaderV2, DictionaryPageHeader, PageHeader,
26};
27use crate::file::statistics::{Statistics, page_stats_to_thrift};
28
/// Parquet Page definition.
///
/// List of supported pages.
/// The variants and their fields are mapped 1-to-1 from the equivalent Thrift page
/// header definitions, except for `buf`, which is used to store the page's bytes.
/// In a `Page` those bytes are uncompressed; when a `Page` is wrapped in a
/// [`CompressedPage`] the same field holds the potentially compressed buffer.
#[derive(Clone, Debug)]
pub enum Page {
    /// Data page Parquet format v1.
    DataPage {
        /// The underlying data buffer
        buf: Bytes,
        /// Number of values in this page
        num_values: u32,
        /// Encoding for values in this page
        encoding: Encoding,
        /// Definition level encoding
        def_level_encoding: Encoding,
        /// Repetition level encoding
        rep_level_encoding: Encoding,
        /// Optional statistics for this page
        statistics: Option<Statistics>,
    },
    /// Data page Parquet format v2.
    DataPageV2 {
        /// The underlying data buffer
        buf: Bytes,
        /// Number of values in this page
        num_values: u32,
        /// Encoding for values in this page
        encoding: Encoding,
        /// Number of null values in this page
        num_nulls: u32,
        /// Number of rows in this page
        num_rows: u32,
        /// Length of definition levels, in bytes
        def_levels_byte_len: u32,
        /// Length of repetition levels, in bytes
        rep_levels_byte_len: u32,
        /// Is this page compressed
        is_compressed: bool,
        /// Optional statistics for this page
        statistics: Option<Statistics>,
    },
    /// Dictionary page.
    DictionaryPage {
        /// The underlying data buffer
        buf: Bytes,
        /// Number of values in this page
        num_values: u32,
        /// Encoding for values in this page
        encoding: Encoding,
        /// Is dictionary page sorted
        is_sorted: bool,
    },
}
84
85impl Page {
86 /// Returns [`PageType`] for this page.
87 pub fn page_type(&self) -> PageType {
88 match self {
89 Page::DataPage { .. } => PageType::DATA_PAGE,
90 Page::DataPageV2 { .. } => PageType::DATA_PAGE_V2,
91 Page::DictionaryPage { .. } => PageType::DICTIONARY_PAGE,
92 }
93 }
94
95 /// Returns whether this page is any version of a data page
96 pub fn is_data_page(&self) -> bool {
97 matches!(self, Page::DataPage { .. } | Page::DataPageV2 { .. })
98 }
99
100 /// Returns whether this page is a dictionary page
101 pub fn is_dictionary_page(&self) -> bool {
102 matches!(self, Page::DictionaryPage { .. })
103 }
104
105 /// Returns internal byte buffer reference for this page.
106 pub fn buffer(&self) -> &Bytes {
107 match self {
108 Page::DataPage { buf, .. } => buf,
109 Page::DataPageV2 { buf, .. } => buf,
110 Page::DictionaryPage { buf, .. } => buf,
111 }
112 }
113
114 /// Returns number of values in this page.
115 pub fn num_values(&self) -> u32 {
116 match self {
117 Page::DataPage { num_values, .. } => *num_values,
118 Page::DataPageV2 { num_values, .. } => *num_values,
119 Page::DictionaryPage { num_values, .. } => *num_values,
120 }
121 }
122
123 /// Returns this page [`Encoding`].
124 pub fn encoding(&self) -> Encoding {
125 match self {
126 Page::DataPage { encoding, .. } => *encoding,
127 Page::DataPageV2 { encoding, .. } => *encoding,
128 Page::DictionaryPage { encoding, .. } => *encoding,
129 }
130 }
131
132 /// Returns optional [`Statistics`].
133 pub fn statistics(&self) -> Option<&Statistics> {
134 match self {
135 Page::DataPage { statistics, .. } => statistics.as_ref(),
136 Page::DataPageV2 { statistics, .. } => statistics.as_ref(),
137 Page::DictionaryPage { .. } => None,
138 }
139 }
140}
141
/// Helper struct to represent pages with a potentially compressed buffer.
///
/// * For data page v1 and dictionary pages, the buffer is the (potentially)
///   compressed page body.
/// * For data page v2, the buffer is a concatenation of repetition levels, then
///   definition levels, then the (potentially) compressed values.
///
/// The difference with `Page` is that a `Page` buffer is always uncompressed.
pub struct CompressedPage {
    // The wrapped page; its `buf` holds the potentially compressed bytes.
    compressed_page: Page,
    // Size in bytes of the page body before compression, as supplied by the creator.
    uncompressed_size: usize,
}
151
152impl CompressedPage {
153 /// Creates `CompressedPage` from a page with potentially compressed buffer and
154 /// uncompressed size.
155 pub fn new(compressed_page: Page, uncompressed_size: usize) -> Self {
156 Self {
157 compressed_page,
158 uncompressed_size,
159 }
160 }
161
162 /// Returns page type.
163 pub fn page_type(&self) -> PageType {
164 self.compressed_page.page_type()
165 }
166
167 /// Returns underlying page with potentially compressed buffer.
168 pub fn compressed_page(&self) -> &Page {
169 &self.compressed_page
170 }
171
172 /// Returns uncompressed size in bytes.
173 pub fn uncompressed_size(&self) -> usize {
174 self.uncompressed_size
175 }
176
177 /// Returns compressed size in bytes.
178 ///
179 /// Note that it is assumed that buffer is compressed, but it may not be. In this
180 /// case compressed size will be equal to uncompressed size.
181 pub fn compressed_size(&self) -> usize {
182 self.compressed_page.buffer().len()
183 }
184
185 /// Number of values in page.
186 pub fn num_values(&self) -> u32 {
187 self.compressed_page.num_values()
188 }
189
190 /// Returns encoding for values in page.
191 pub fn encoding(&self) -> Encoding {
192 self.compressed_page.encoding()
193 }
194
195 /// Returns slice of compressed buffer in the page.
196 pub fn data(&self) -> &[u8] {
197 self.compressed_page.buffer()
198 }
199
200 /// Returns the number of heap bytes this page currently holds.
201 ///
202 /// This is the page's compressed buffer (the embedded [`Bytes`]); use it to
203 /// account for a buffered page's memory footprint rather than reaching for
204 /// `data().len()` at each call site.
205 pub fn memory_usage(&self) -> usize {
206 self.compressed_page.buffer().len()
207 }
208
209 /// Returns the thrift page header
210 pub(crate) fn to_thrift_header(&self) -> Result<PageHeader> {
211 let uncompressed_size = self.uncompressed_size();
212 let compressed_size = self.compressed_size();
213 if uncompressed_size > i32::MAX as usize {
214 return Err(general_err!(
215 "Page uncompressed size overflow: {}",
216 uncompressed_size
217 ));
218 }
219 if compressed_size > i32::MAX as usize {
220 return Err(general_err!(
221 "Page compressed size overflow: {}",
222 compressed_size
223 ));
224 }
225 let num_values = self.num_values();
226 let encoding = self.encoding();
227 let page_type = self.page_type();
228
229 let mut page_header = PageHeader {
230 r#type: page_type,
231 uncompressed_page_size: uncompressed_size as i32,
232 compressed_page_size: compressed_size as i32,
233 // TODO: Add support for crc checksum
234 crc: None,
235 data_page_header: None,
236 index_page_header: None,
237 dictionary_page_header: None,
238 data_page_header_v2: None,
239 };
240
241 match self.compressed_page {
242 Page::DataPage {
243 def_level_encoding,
244 rep_level_encoding,
245 ref statistics,
246 ..
247 } => {
248 let data_page_header = DataPageHeader {
249 num_values: num_values as i32,
250 encoding,
251 definition_level_encoding: def_level_encoding,
252 repetition_level_encoding: rep_level_encoding,
253 statistics: page_stats_to_thrift(statistics.as_ref()),
254 };
255 page_header.data_page_header = Some(data_page_header);
256 }
257 Page::DataPageV2 {
258 num_nulls,
259 num_rows,
260 def_levels_byte_len,
261 rep_levels_byte_len,
262 is_compressed,
263 ref statistics,
264 ..
265 } => {
266 let data_page_header_v2 = DataPageHeaderV2 {
267 num_values: num_values as i32,
268 num_nulls: num_nulls as i32,
269 num_rows: num_rows as i32,
270 encoding,
271 definition_levels_byte_length: def_levels_byte_len as i32,
272 repetition_levels_byte_length: rep_levels_byte_len as i32,
273 is_compressed: Some(is_compressed),
274 statistics: page_stats_to_thrift(statistics.as_ref()),
275 };
276 page_header.data_page_header_v2 = Some(data_page_header_v2);
277 }
278 Page::DictionaryPage { is_sorted, .. } => {
279 let dictionary_page_header = DictionaryPageHeader {
280 num_values: num_values as i32,
281 encoding,
282 is_sorted: Some(is_sorted),
283 };
284 page_header.dictionary_page_header = Some(dictionary_page_header);
285 }
286 }
287 Ok(page_header)
288 }
289
290 /// Update the compressed buffer for a page.
291 /// This might be required when encrypting page data for example.
292 /// The size of uncompressed data must not change.
293 #[cfg(feature = "encryption")]
294 pub(crate) fn with_new_compressed_buffer(mut self, new_buffer: Bytes) -> Self {
295 match &mut self.compressed_page {
296 Page::DataPage { buf, .. } => {
297 *buf = new_buffer;
298 }
299 Page::DataPageV2 { buf, .. } => {
300 *buf = new_buffer;
301 }
302 Page::DictionaryPage { buf, .. } => {
303 *buf = new_buffer;
304 }
305 }
306 self
307 }
308}
309
/// Contains page write metrics.
///
/// Returned by [`PageWriter::write_page`] to describe the page that was written.
pub struct PageWriteSpec {
    /// The type of page being written
    pub page_type: PageType,
    /// The total size of the page, before compression
    pub uncompressed_size: usize,
    /// The compressed size of the page
    pub compressed_size: usize,
    /// The number of values in the page
    pub num_values: u32,
    /// The offset of the page in the column chunk
    pub offset: u64,
    /// The number of bytes written to the underlying sink
    pub bytes_written: u64,
}
325
326impl Default for PageWriteSpec {
327 fn default() -> Self {
328 Self::new()
329 }
330}
331
332impl PageWriteSpec {
333 /// Creates new spec with default page write metrics.
334 pub fn new() -> Self {
335 Self {
336 page_type: PageType::DATA_PAGE,
337 uncompressed_size: 0,
338 compressed_size: 0,
339 num_values: 0,
340 offset: 0,
341 bytes_written: 0,
342 }
343 }
344}
345
/// Contains metadata for a page, as returned by [`PageReader::peek_next_page`].
#[derive(Clone)]
pub struct PageMetadata {
    /// The number of rows within the page if known
    pub num_rows: Option<usize>,
    /// The number of levels within the page if known
    pub num_levels: Option<usize>,
    /// `true` if the page is a dictionary page
    pub is_dict: bool,
}
356
357impl TryFrom<&crate::file::metadata::thrift::PageHeader> for PageMetadata {
358 type Error = ParquetError;
359
360 fn try_from(
361 value: &crate::file::metadata::thrift::PageHeader,
362 ) -> std::result::Result<Self, Self::Error> {
363 match value.r#type {
364 PageType::DATA_PAGE => {
365 let header = value.data_page_header.as_ref().unwrap();
366 Ok(PageMetadata {
367 num_rows: None,
368 num_levels: Some(header.num_values as _),
369 is_dict: false,
370 })
371 }
372 PageType::DICTIONARY_PAGE => Ok(PageMetadata {
373 num_rows: None,
374 num_levels: None,
375 is_dict: true,
376 }),
377 PageType::DATA_PAGE_V2 => {
378 let header = value.data_page_header_v2.as_ref().unwrap();
379 Ok(PageMetadata {
380 num_rows: Some(header.num_rows as _),
381 num_levels: Some(header.num_values as _),
382 is_dict: false,
383 })
384 }
385 other => Err(ParquetError::General(format!(
386 "page type {other:?} cannot be converted to PageMetadata"
387 ))),
388 }
389 }
390}
391
392/// API for reading pages from a column chunk.
393/// This offers a iterator like API to get the next page.
394pub trait PageReader: Iterator<Item = Result<Page>> + Send {
395 /// Gets the next page in the column chunk associated with this reader.
396 /// Returns `None` if there are no pages left.
397 fn get_next_page(&mut self) -> Result<Option<Page>>;
398
399 /// Gets metadata about the next page, returns an error if no
400 /// column index information
401 fn peek_next_page(&mut self) -> Result<Option<PageMetadata>>;
402
403 /// Skips reading the next page, returns an error if no
404 /// column index information
405 fn skip_next_page(&mut self) -> Result<()>;
406
407 /// Returns `true` if the next page can be assumed to contain the start of a new record
408 ///
409 /// Prior to parquet V2 the specification was ambiguous as to whether a single record
410 /// could be split across multiple pages, and prior to [(#4327)] the Rust writer would do
411 /// this in certain situations. However, correctly interpreting the offset index relies on
412 /// this assumption holding [(#4943)], and so this mechanism is provided for a [`PageReader`]
413 /// to signal this to the calling context
414 ///
415 /// [(#4327)]: https://github.com/apache/arrow-rs/pull/4327
416 /// [(#4943)]: https://github.com/apache/arrow-rs/pull/4943
417 fn at_record_boundary(&mut self) -> Result<bool> {
418 match self.peek_next_page()? {
419 // Last page in the column chunk - always a record boundary
420 None => Ok(true),
421 // A V2 data page is required by the parquet spec to start at a
422 // record boundary, so the current page ends at one. V2 pages
423 // are identified by having `num_rows` set in their header.
424 Some(metadata) => Ok(metadata.num_rows.is_some()),
425 }
426 }
427}
428
/// API for writing pages in a column chunk.
///
/// It is reasonable to assume that all pages will be written in the correct order, e.g.
/// dictionary page followed by data pages, or a set of data pages, etc. The exception
/// is a writer that returns `true` from `defers_dictionary_ordering`: it may receive
/// dictionary-encoded data pages before the dictionary page and must order them itself.
pub trait PageWriter: Send {
    /// Writes a page into the output stream/sink.
    /// Returns `PageWriteSpec` that contains information about written page metrics,
    /// including number of bytes, size, number of values, offset, etc.
    ///
    /// This method is called for every compressed page we write into underlying buffer,
    /// either data page or dictionary page.
    fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec>;

    /// **Unstable, not public API.** This is an internal protocol between
    /// [`GenericColumnWriter`] and its in-crate page writers; it is hidden from
    /// the rendered docs and may change or be removed without a major version
    /// bump. External `PageWriter` implementations should not override it. See
    /// the page-spilling cleanup tracked in
    /// <https://github.com/apache/arrow-rs/pull/10020>.
    ///
    /// Whether this writer resolves the final page layout itself (at flush)
    /// rather than committing bytes to their final position as pages arrive.
    ///
    /// The dictionary page of a column chunk must be written *first*, but it is
    /// not finalized until every value has been seen. A writer that commits
    /// bytes live (e.g. straight to a file) therefore relies on the column
    /// writer buffering the dictionary-encoded data pages in memory until the
    /// dictionary page is ready — see [`GenericColumnWriter`]'s `data_pages`.
    ///
    /// A writer that instead buffers the whole chunk and splices it later (the
    /// [`ArrowWriter`] path) can accept data pages *before* the dictionary page
    /// and order them itself at flush. Returning `true` tells the column writer
    /// to skip that in-memory buffering and stream dictionary-column data pages
    /// straight through, bounding the column writer's memory.
    ///
    /// The default returns `false`.
    ///
    /// [`GenericColumnWriter`]: crate::column::writer::GenericColumnWriter
    /// [`ArrowWriter`]: crate::arrow::arrow_writer::ArrowWriter
    #[doc(hidden)]
    fn defers_dictionary_ordering(&self) -> bool {
        false
    }

    /// **Unstable, not public API.** Companion to
    /// [`defers_dictionary_ordering`](Self::defers_dictionary_ordering): an
    /// internal hook for the column writer's memory accounting, hidden from the
    /// rendered docs and subject to change or removal without a major version
    /// bump. External `PageWriter` implementations should not override it.
    ///
    /// The number of bytes this writer is currently holding **in memory** for
    /// pages it has been handed (i.e. completed pages not yet committed to their
    /// final destination).
    ///
    /// Used by the column writer to report its memory footprint. The default is
    /// `0`: a writer that streams pages straight to their destination retains
    /// nothing. A writer that buffers pages should report what it actually holds
    /// on the heap — which, when it spills to a backing store, can be far less
    /// than the bytes written.
    #[doc(hidden)]
    fn buffered_memory_size(&self) -> usize {
        0
    }

    /// Closes resources and flushes underlying sink.
    /// Page writer should not be used after this method is called.
    fn close(&mut self) -> Result<()>;
}
495
/// An iterator over the pages of one specific column in a parquet file.
///
/// Each item is a boxed [`PageReader`] (or an error) rather than a single page.
/// NOTE(review): presumably one reader per column chunk (i.e. per row group) — confirm
/// against implementors.
pub trait PageIterator: Iterator<Item = Result<Box<dyn PageReader>>> + Send {}
498
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_page() {
        let data_page = Page::DataPage {
            buf: Bytes::from(vec![0, 1, 2]),
            num_values: 10,
            encoding: Encoding::PLAIN,
            def_level_encoding: Encoding::RLE,
            rep_level_encoding: Encoding::RLE,
            statistics: Some(Statistics::int32(Some(1), Some(2), None, Some(1), true)),
        };
        assert_eq!(data_page.page_type(), PageType::DATA_PAGE);
        assert_eq!(data_page.buffer(), vec![0, 1, 2].as_slice());
        assert_eq!(data_page.num_values(), 10);
        assert_eq!(data_page.encoding(), Encoding::PLAIN);
        assert_eq!(
            data_page.statistics(),
            Some(&Statistics::int32(Some(1), Some(2), None, Some(1), true))
        );
        assert!(data_page.is_data_page());
        assert!(!data_page.is_dictionary_page());

        let data_page_v2 = Page::DataPageV2 {
            buf: Bytes::from(vec![0, 1, 2]),
            num_values: 10,
            encoding: Encoding::PLAIN,
            num_nulls: 5,
            num_rows: 20,
            def_levels_byte_len: 30,
            rep_levels_byte_len: 40,
            is_compressed: false,
            statistics: Some(Statistics::int32(Some(1), Some(2), None, Some(1), true)),
        };
        assert_eq!(data_page_v2.page_type(), PageType::DATA_PAGE_V2);
        assert_eq!(data_page_v2.buffer(), vec![0, 1, 2].as_slice());
        assert_eq!(data_page_v2.num_values(), 10);
        assert_eq!(data_page_v2.encoding(), Encoding::PLAIN);
        assert_eq!(
            data_page_v2.statistics(),
            Some(&Statistics::int32(Some(1), Some(2), None, Some(1), true))
        );
        assert!(data_page_v2.is_data_page());
        assert!(!data_page_v2.is_dictionary_page());

        let dict_page = Page::DictionaryPage {
            buf: Bytes::from(vec![0, 1, 2]),
            num_values: 10,
            encoding: Encoding::PLAIN,
            is_sorted: false,
        };
        assert_eq!(dict_page.page_type(), PageType::DICTIONARY_PAGE);
        assert_eq!(dict_page.buffer(), vec![0, 1, 2].as_slice());
        assert_eq!(dict_page.num_values(), 10);
        assert_eq!(dict_page.encoding(), Encoding::PLAIN);
        assert_eq!(dict_page.statistics(), None);
        assert!(!dict_page.is_data_page());
        assert!(dict_page.is_dictionary_page());
    }

    #[test]
    fn test_compressed_page() {
        let data_page = Page::DataPage {
            buf: Bytes::from(vec![0, 1, 2]),
            num_values: 10,
            encoding: Encoding::PLAIN,
            def_level_encoding: Encoding::RLE,
            rep_level_encoding: Encoding::RLE,
            statistics: Some(Statistics::int32(Some(1), Some(2), None, Some(1), true)),
        };

        let cpage = CompressedPage::new(data_page, 5);

        assert_eq!(cpage.page_type(), PageType::DATA_PAGE);
        assert_eq!(cpage.uncompressed_size(), 5);
        assert_eq!(cpage.compressed_size(), 3);
        assert_eq!(cpage.num_values(), 10);
        assert_eq!(cpage.encoding(), Encoding::PLAIN);
        assert_eq!(cpage.data(), &[0, 1, 2]);
        assert_eq!(cpage.memory_usage(), 3);
    }

    #[test]
    fn test_compressed_page_to_thrift_header_data_page() {
        let data_page = Page::DataPage {
            buf: Bytes::from(vec![0, 1, 2]),
            num_values: 10,
            encoding: Encoding::PLAIN,
            def_level_encoding: Encoding::RLE,
            rep_level_encoding: Encoding::RLE,
            statistics: None,
        };
        let header = CompressedPage::new(data_page, 5)
            .to_thrift_header()
            .unwrap();

        assert_eq!(header.r#type, PageType::DATA_PAGE);
        assert_eq!(header.uncompressed_page_size, 5);
        assert_eq!(header.compressed_page_size, 3);
        assert!(header.crc.is_none());
        assert!(header.data_page_header_v2.is_none());
        assert!(header.dictionary_page_header.is_none());

        let v1 = header.data_page_header.as_ref().unwrap();
        assert_eq!(v1.num_values, 10);
        assert_eq!(v1.encoding, Encoding::PLAIN);
        assert_eq!(v1.definition_level_encoding, Encoding::RLE);
        assert_eq!(v1.repetition_level_encoding, Encoding::RLE);

        let metadata = PageMetadata::try_from(&header).unwrap();
        assert_eq!(metadata.num_rows, None);
        assert_eq!(metadata.num_levels, Some(10));
        assert!(!metadata.is_dict);
    }

    #[test]
    fn test_compressed_page_to_thrift_header_data_page_v2() {
        let data_page_v2 = Page::DataPageV2 {
            buf: Bytes::from(vec![0, 1, 2, 3]),
            num_values: 10,
            encoding: Encoding::PLAIN,
            num_nulls: 2,
            num_rows: 8,
            def_levels_byte_len: 1,
            rep_levels_byte_len: 1,
            is_compressed: true,
            statistics: None,
        };
        let header = CompressedPage::new(data_page_v2, 7)
            .to_thrift_header()
            .unwrap();

        assert_eq!(header.r#type, PageType::DATA_PAGE_V2);
        assert_eq!(header.uncompressed_page_size, 7);
        assert_eq!(header.compressed_page_size, 4);
        assert!(header.data_page_header.is_none());
        assert!(header.dictionary_page_header.is_none());

        let v2 = header.data_page_header_v2.as_ref().unwrap();
        assert_eq!(v2.num_values, 10);
        assert_eq!(v2.num_nulls, 2);
        assert_eq!(v2.num_rows, 8);
        assert_eq!(v2.encoding, Encoding::PLAIN);
        assert_eq!(v2.definition_levels_byte_length, 1);
        assert_eq!(v2.repetition_levels_byte_length, 1);
        assert_eq!(v2.is_compressed, Some(true));

        let metadata = PageMetadata::try_from(&header).unwrap();
        assert_eq!(metadata.num_rows, Some(8));
        assert_eq!(metadata.num_levels, Some(10));
        assert!(!metadata.is_dict);
    }

    #[test]
    fn test_compressed_page_to_thrift_header_dictionary_page() {
        let dict_page = Page::DictionaryPage {
            buf: Bytes::from(vec![0, 1, 2]),
            num_values: 5,
            encoding: Encoding::PLAIN,
            is_sorted: true,
        };
        let header = CompressedPage::new(dict_page, 3)
            .to_thrift_header()
            .unwrap();

        assert_eq!(header.r#type, PageType::DICTIONARY_PAGE);
        assert!(header.data_page_header.is_none());
        assert!(header.data_page_header_v2.is_none());

        let dict = header.dictionary_page_header.as_ref().unwrap();
        assert_eq!(dict.num_values, 5);
        assert_eq!(dict.encoding, Encoding::PLAIN);
        assert_eq!(dict.is_sorted, Some(true));

        let metadata = PageMetadata::try_from(&header).unwrap();
        assert_eq!(metadata.num_rows, None);
        assert_eq!(metadata.num_levels, None);
        assert!(metadata.is_dict);
    }

    #[test]
    fn test_compressed_page_uncompressed_size_overflow() {
        // Test that to_thrift_header fails when uncompressed size exceeds i32::MAX
        let data_page = Page::DataPage {
            buf: Bytes::from(vec![0, 1, 2]),
            num_values: 10,
            encoding: Encoding::PLAIN,
            def_level_encoding: Encoding::RLE,
            rep_level_encoding: Encoding::RLE,
            statistics: None,
        };

        // Create a CompressedPage with uncompressed size larger than i32::MAX
        let uncompressed_size = (i32::MAX as usize) + 1;
        let cpage = CompressedPage::new(data_page, uncompressed_size);

        // Verify that to_thrift_header returns an error
        let result = cpage.to_thrift_header();
        assert!(result.is_err());

        let error_msg = result.unwrap_err().to_string();
        assert!(error_msg.contains("Page uncompressed size overflow"));
    }
}