1use bytes::Bytes;
21
22use crate::basic::{Encoding, PageType};
23use crate::errors::{ParquetError, Result};
24use crate::file::metadata::thrift::{
25 DataPageHeader, DataPageHeaderV2, DictionaryPageHeader, PageHeader,
26};
27use crate::file::statistics::{Statistics, page_stats_to_thrift};
28
/// An in-memory parquet page: one of the V1 data page, the V2 data page, or the dictionary
/// page layouts.
///
/// The type itself does not say whether `buf` is compressed; [`CompressedPage`] wraps a
/// `Page` whose buffer is treated as the compressed payload.
#[derive(Clone)]
pub enum Page {
    /// Data page in the V1 layout (thrift page type `DATA_PAGE`).
    DataPage {
        /// Page payload bytes.
        buf: Bytes,
        /// Value count, written to the header's `num_values`.
        num_values: u32,
        /// Encoding of the values.
        encoding: Encoding,
        /// Encoding of the definition levels (header `definition_level_encoding`).
        def_level_encoding: Encoding,
        /// Encoding of the repetition levels (header `repetition_level_encoding`).
        rep_level_encoding: Encoding,
        /// Optional page-level statistics.
        statistics: Option<Statistics>,
    },
    /// Data page in the V2 layout (thrift page type `DATA_PAGE_V2`).
    DataPageV2 {
        /// Page payload bytes.
        buf: Bytes,
        /// Value count, written to the header's `num_values`.
        num_values: u32,
        /// Encoding of the values.
        encoding: Encoding,
        /// Written to the header's `num_nulls`.
        num_nulls: u32,
        /// Written to the header's `num_rows`.
        num_rows: u32,
        /// Written to the header's `definition_levels_byte_length`.
        def_levels_byte_len: u32,
        /// Written to the header's `repetition_levels_byte_length`.
        rep_levels_byte_len: u32,
        /// Written to the header's `is_compressed`.
        is_compressed: bool,
        /// Optional page-level statistics.
        statistics: Option<Statistics>,
    },
    /// Dictionary page (thrift page type `DICTIONARY_PAGE`).
    DictionaryPage {
        /// Page payload bytes.
        buf: Bytes,
        /// Number of dictionary entries, written to the header's `num_values`.
        num_values: u32,
        /// Encoding of the dictionary entries.
        encoding: Encoding,
        /// Written to the header's `is_sorted`.
        is_sorted: bool,
    },
}
84
85impl Page {
86 pub fn page_type(&self) -> PageType {
88 match self {
89 Page::DataPage { .. } => PageType::DATA_PAGE,
90 Page::DataPageV2 { .. } => PageType::DATA_PAGE_V2,
91 Page::DictionaryPage { .. } => PageType::DICTIONARY_PAGE,
92 }
93 }
94
95 pub fn is_data_page(&self) -> bool {
97 matches!(self, Page::DataPage { .. } | Page::DataPageV2 { .. })
98 }
99
100 pub fn is_dictionary_page(&self) -> bool {
102 matches!(self, Page::DictionaryPage { .. })
103 }
104
105 pub fn buffer(&self) -> &Bytes {
107 match self {
108 Page::DataPage { buf, .. } => buf,
109 Page::DataPageV2 { buf, .. } => buf,
110 Page::DictionaryPage { buf, .. } => buf,
111 }
112 }
113
114 pub fn num_values(&self) -> u32 {
116 match self {
117 Page::DataPage { num_values, .. } => *num_values,
118 Page::DataPageV2 { num_values, .. } => *num_values,
119 Page::DictionaryPage { num_values, .. } => *num_values,
120 }
121 }
122
123 pub fn encoding(&self) -> Encoding {
125 match self {
126 Page::DataPage { encoding, .. } => *encoding,
127 Page::DataPageV2 { encoding, .. } => *encoding,
128 Page::DictionaryPage { encoding, .. } => *encoding,
129 }
130 }
131
132 pub fn statistics(&self) -> Option<&Statistics> {
134 match self {
135 Page::DataPage { statistics, .. } => statistics.as_ref(),
136 Page::DataPageV2 { statistics, .. } => statistics.as_ref(),
137 Page::DictionaryPage { .. } => None,
138 }
139 }
140}
141
/// A [`Page`] whose buffer is treated as the compressed payload, paired with the page's size
/// before compression.
pub struct CompressedPage {
    /// The wrapped page; its buffer length is reported as the compressed size.
    compressed_page: Page,
    /// Size supplied at construction, reported as `uncompressed_page_size` in the header.
    uncompressed_size: usize,
}
151
152impl CompressedPage {
153 pub fn new(compressed_page: Page, uncompressed_size: usize) -> Self {
156 Self {
157 compressed_page,
158 uncompressed_size,
159 }
160 }
161
162 pub fn page_type(&self) -> PageType {
164 self.compressed_page.page_type()
165 }
166
167 pub fn compressed_page(&self) -> &Page {
169 &self.compressed_page
170 }
171
172 pub fn uncompressed_size(&self) -> usize {
174 self.uncompressed_size
175 }
176
177 pub fn compressed_size(&self) -> usize {
182 self.compressed_page.buffer().len()
183 }
184
185 pub fn num_values(&self) -> u32 {
187 self.compressed_page.num_values()
188 }
189
190 pub fn encoding(&self) -> Encoding {
192 self.compressed_page.encoding()
193 }
194
195 pub fn data(&self) -> &[u8] {
197 self.compressed_page.buffer()
198 }
199
200 pub(crate) fn to_thrift_header(&self) -> Result<PageHeader> {
202 let uncompressed_size = self.uncompressed_size();
203 let compressed_size = self.compressed_size();
204 if uncompressed_size > i32::MAX as usize {
205 return Err(general_err!(
206 "Page uncompressed size overflow: {}",
207 uncompressed_size
208 ));
209 }
210 if compressed_size > i32::MAX as usize {
211 return Err(general_err!(
212 "Page compressed size overflow: {}",
213 compressed_size
214 ));
215 }
216 let num_values = self.num_values();
217 let encoding = self.encoding();
218 let page_type = self.page_type();
219
220 let mut page_header = PageHeader {
221 r#type: page_type,
222 uncompressed_page_size: uncompressed_size as i32,
223 compressed_page_size: compressed_size as i32,
224 crc: None,
226 data_page_header: None,
227 index_page_header: None,
228 dictionary_page_header: None,
229 data_page_header_v2: None,
230 };
231
232 match self.compressed_page {
233 Page::DataPage {
234 def_level_encoding,
235 rep_level_encoding,
236 ref statistics,
237 ..
238 } => {
239 let data_page_header = DataPageHeader {
240 num_values: num_values as i32,
241 encoding,
242 definition_level_encoding: def_level_encoding,
243 repetition_level_encoding: rep_level_encoding,
244 statistics: page_stats_to_thrift(statistics.as_ref()),
245 };
246 page_header.data_page_header = Some(data_page_header);
247 }
248 Page::DataPageV2 {
249 num_nulls,
250 num_rows,
251 def_levels_byte_len,
252 rep_levels_byte_len,
253 is_compressed,
254 ref statistics,
255 ..
256 } => {
257 let data_page_header_v2 = DataPageHeaderV2 {
258 num_values: num_values as i32,
259 num_nulls: num_nulls as i32,
260 num_rows: num_rows as i32,
261 encoding,
262 definition_levels_byte_length: def_levels_byte_len as i32,
263 repetition_levels_byte_length: rep_levels_byte_len as i32,
264 is_compressed: Some(is_compressed),
265 statistics: page_stats_to_thrift(statistics.as_ref()),
266 };
267 page_header.data_page_header_v2 = Some(data_page_header_v2);
268 }
269 Page::DictionaryPage { is_sorted, .. } => {
270 let dictionary_page_header = DictionaryPageHeader {
271 num_values: num_values as i32,
272 encoding,
273 is_sorted: Some(is_sorted),
274 };
275 page_header.dictionary_page_header = Some(dictionary_page_header);
276 }
277 }
278 Ok(page_header)
279 }
280
281 #[cfg(feature = "encryption")]
285 pub(crate) fn with_new_compressed_buffer(mut self, new_buffer: Bytes) -> Self {
286 match &mut self.compressed_page {
287 Page::DataPage { buf, .. } => {
288 *buf = new_buffer;
289 }
290 Page::DataPageV2 { buf, .. } => {
291 *buf = new_buffer;
292 }
293 Page::DictionaryPage { buf, .. } => {
294 *buf = new_buffer;
295 }
296 }
297 self
298 }
299}
300
/// Description of a written page, as returned by [`PageWriter::write_page`].
pub struct PageWriteSpec {
    /// Type of the page that was written.
    pub page_type: PageType,
    /// Uncompressed size of the page in bytes.
    pub uncompressed_size: usize,
    /// Compressed size of the page in bytes.
    pub compressed_size: usize,
    /// Number of values in the page.
    pub num_values: u32,
    /// NOTE(review): presumably the byte offset in the sink at which the page starts;
    /// confirm against `PageWriter` implementations.
    pub offset: u64,
    /// NOTE(review): presumably total bytes emitted for the page, possibly including its
    /// header; confirm against `PageWriter` implementations.
    pub bytes_written: u64,
}
316
317impl Default for PageWriteSpec {
318 fn default() -> Self {
319 Self::new()
320 }
321}
322
323impl PageWriteSpec {
324 pub fn new() -> Self {
326 Self {
327 page_type: PageType::DATA_PAGE,
328 uncompressed_size: 0,
329 compressed_size: 0,
330 num_values: 0,
331 offset: 0,
332 bytes_written: 0,
333 }
334 }
335}
336
/// Summary of a page, as returned by [`PageReader::peek_next_page`].
#[derive(Clone)]
pub struct PageMetadata {
    /// Row count when known. The `TryFrom<&PageHeader>` conversion in this module sets it
    /// only for `DATA_PAGE_V2`; other constructors may differ.
    pub num_rows: Option<usize>,
    /// Level count (the header's `num_values`) for data pages; `None` for dictionary pages
    /// when built via `TryFrom<&PageHeader>`.
    pub num_levels: Option<usize>,
    /// `true` when the page is a dictionary page.
    pub is_dict: bool,
}
347
348impl TryFrom<&crate::file::metadata::thrift::PageHeader> for PageMetadata {
349 type Error = ParquetError;
350
351 fn try_from(
352 value: &crate::file::metadata::thrift::PageHeader,
353 ) -> std::result::Result<Self, Self::Error> {
354 match value.r#type {
355 PageType::DATA_PAGE => {
356 let header = value.data_page_header.as_ref().unwrap();
357 Ok(PageMetadata {
358 num_rows: None,
359 num_levels: Some(header.num_values as _),
360 is_dict: false,
361 })
362 }
363 PageType::DICTIONARY_PAGE => Ok(PageMetadata {
364 num_rows: None,
365 num_levels: None,
366 is_dict: true,
367 }),
368 PageType::DATA_PAGE_V2 => {
369 let header = value.data_page_header_v2.as_ref().unwrap();
370 Ok(PageMetadata {
371 num_rows: Some(header.num_rows as _),
372 num_levels: Some(header.num_values as _),
373 is_dict: false,
374 })
375 }
376 other => Err(ParquetError::General(format!(
377 "page type {other:?} cannot be converted to PageMetadata"
378 ))),
379 }
380 }
381}
382
383pub trait PageReader: Iterator<Item = Result<Page>> + Send {
386 fn get_next_page(&mut self) -> Result<Option<Page>>;
389
390 fn peek_next_page(&mut self) -> Result<Option<PageMetadata>>;
393
394 fn skip_next_page(&mut self) -> Result<()>;
397
398 fn at_record_boundary(&mut self) -> Result<bool> {
409 Ok(self.peek_next_page()?.is_none())
410 }
411}
412
/// Sink that accepts [`CompressedPage`]s.
pub trait PageWriter: Send {
    /// Writes `page` and returns a [`PageWriteSpec`] describing what was written.
    fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec>;

    /// Closes the writer.
    ///
    /// NOTE(review): presumably flushes any buffered output; confirm with implementations.
    fn close(&mut self) -> Result<()>;
}
430
/// Iterator yielding boxed [`PageReader`]s (or errors).
///
/// NOTE(review): presumably one reader per column chunk; confirm with implementers.
pub trait PageIterator: Iterator<Item = Result<Box<dyn PageReader>>> + Send {}
433
#[cfg(test)]
mod tests {
    use super::*;

    /// Int32 statistics attached to the data-page fixtures.
    fn int32_stats() -> Statistics {
        Statistics::int32(Some(1), Some(2), None, Some(1), true)
    }

    /// Three-byte payload shared by every page fixture.
    fn three_bytes() -> Bytes {
        Bytes::from(vec![0, 1, 2])
    }

    /// V1 data page over `three_bytes()`: 10 PLAIN values, RLE levels.
    fn v1_data_page(statistics: Option<Statistics>) -> Page {
        Page::DataPage {
            buf: three_bytes(),
            num_values: 10,
            encoding: Encoding::PLAIN,
            def_level_encoding: Encoding::RLE,
            rep_level_encoding: Encoding::RLE,
            statistics,
        }
    }

    #[test]
    fn test_page() {
        let expected_stats = int32_stats();

        let data_page = v1_data_page(Some(int32_stats()));
        assert_eq!(data_page.page_type(), PageType::DATA_PAGE);
        assert_eq!(data_page.buffer(), vec![0, 1, 2].as_slice());
        assert_eq!(data_page.num_values(), 10);
        assert_eq!(data_page.encoding(), Encoding::PLAIN);
        assert_eq!(data_page.statistics(), Some(&expected_stats));

        let data_page_v2 = Page::DataPageV2 {
            buf: three_bytes(),
            num_values: 10,
            encoding: Encoding::PLAIN,
            num_nulls: 5,
            num_rows: 20,
            def_levels_byte_len: 30,
            rep_levels_byte_len: 40,
            is_compressed: false,
            statistics: Some(int32_stats()),
        };
        assert_eq!(data_page_v2.page_type(), PageType::DATA_PAGE_V2);
        assert_eq!(data_page_v2.buffer(), vec![0, 1, 2].as_slice());
        assert_eq!(data_page_v2.num_values(), 10);
        assert_eq!(data_page_v2.encoding(), Encoding::PLAIN);
        assert_eq!(data_page_v2.statistics(), Some(&expected_stats));

        let dict_page = Page::DictionaryPage {
            buf: three_bytes(),
            num_values: 10,
            encoding: Encoding::PLAIN,
            is_sorted: false,
        };
        assert_eq!(dict_page.page_type(), PageType::DICTIONARY_PAGE);
        assert_eq!(dict_page.buffer(), vec![0, 1, 2].as_slice());
        assert_eq!(dict_page.num_values(), 10);
        assert_eq!(dict_page.encoding(), Encoding::PLAIN);
        assert_eq!(dict_page.statistics(), None);
    }

    #[test]
    fn test_compressed_page() {
        let cpage = CompressedPage::new(v1_data_page(Some(int32_stats())), 5);

        assert_eq!(cpage.page_type(), PageType::DATA_PAGE);
        assert_eq!(cpage.uncompressed_size(), 5);
        // The compressed size is just the buffer length.
        assert_eq!(cpage.compressed_size(), 3);
        assert_eq!(cpage.num_values(), 10);
        assert_eq!(cpage.encoding(), Encoding::PLAIN);
        assert_eq!(cpage.data(), &[0, 1, 2]);
    }

    #[test]
    fn test_compressed_page_uncompressed_size_overflow() {
        // The buffer is tiny; only the declared uncompressed size exceeds i32::MAX.
        let too_big = (i32::MAX as usize) + 1;
        let cpage = CompressedPage::new(v1_data_page(None), too_big);

        let err = cpage
            .to_thrift_header()
            .expect_err("an uncompressed size above i32::MAX must be rejected");
        assert!(err.to_string().contains("Page uncompressed size overflow"));
    }
}
534}