1use bytes::Bytes;
21
22use crate::basic::{Encoding, PageType};
23use crate::errors::{ParquetError, Result};
24use crate::file::statistics::Statistics;
25use crate::format::PageHeader;
26
/// A single Parquet page in one of the three layouts handled by this module.
///
/// Every variant stores its payload in `buf`; the remaining fields are the
/// values that `CompressedPage::to_thrift_header` copies into the matching
/// Thrift page header.
#[derive(Clone)]
pub enum Page {
    /// Data page, reported as `PageType::DATA_PAGE`.
    DataPage {
        /// Bytes of the page payload.
        buf: Bytes,
        /// Number of values in the page (header field `num_values`).
        num_values: u32,
        /// Encoding of the values in the page.
        encoding: Encoding,
        /// Encoding of the definition levels.
        def_level_encoding: Encoding,
        /// Encoding of the repetition levels.
        rep_level_encoding: Encoding,
        /// Statistics for the page, if any.
        statistics: Option<Statistics>,
    },
    /// Data page, reported as `PageType::DATA_PAGE_V2`.
    DataPageV2 {
        /// Bytes of the page payload.
        buf: Bytes,
        /// Number of values in the page (header field `num_values`).
        num_values: u32,
        /// Encoding of the values in the page.
        encoding: Encoding,
        /// Number of nulls in the page (header field `num_nulls`).
        num_nulls: u32,
        /// Number of rows in the page (header field `num_rows`).
        num_rows: u32,
        /// Byte length of the definition levels
        /// (header field `definition_levels_byte_length`).
        def_levels_byte_len: u32,
        /// Byte length of the repetition levels
        /// (header field `repetition_levels_byte_length`).
        rep_levels_byte_len: u32,
        /// Compression flag written to the header as `is_compressed`.
        is_compressed: bool,
        /// Statistics for the page, if any.
        statistics: Option<Statistics>,
    },
    /// Dictionary page, reported as `PageType::DICTIONARY_PAGE`.
    DictionaryPage {
        /// Bytes of the page payload.
        buf: Bytes,
        /// Number of values in the page (header field `num_values`).
        num_values: u32,
        /// Encoding of the dictionary values.
        encoding: Encoding,
        /// Sortedness flag written to the header as `is_sorted`.
        is_sorted: bool,
    },
}
82
83impl Page {
84 pub fn page_type(&self) -> PageType {
86 match self {
87 Page::DataPage { .. } => PageType::DATA_PAGE,
88 Page::DataPageV2 { .. } => PageType::DATA_PAGE_V2,
89 Page::DictionaryPage { .. } => PageType::DICTIONARY_PAGE,
90 }
91 }
92
93 pub fn is_data_page(&self) -> bool {
95 matches!(self, Page::DataPage { .. } | Page::DataPageV2 { .. })
96 }
97
98 pub fn is_dictionary_page(&self) -> bool {
100 matches!(self, Page::DictionaryPage { .. })
101 }
102
103 pub fn buffer(&self) -> &Bytes {
105 match self {
106 Page::DataPage { ref buf, .. } => buf,
107 Page::DataPageV2 { ref buf, .. } => buf,
108 Page::DictionaryPage { ref buf, .. } => buf,
109 }
110 }
111
112 pub fn num_values(&self) -> u32 {
114 match self {
115 Page::DataPage { num_values, .. } => *num_values,
116 Page::DataPageV2 { num_values, .. } => *num_values,
117 Page::DictionaryPage { num_values, .. } => *num_values,
118 }
119 }
120
121 pub fn encoding(&self) -> Encoding {
123 match self {
124 Page::DataPage { encoding, .. } => *encoding,
125 Page::DataPageV2 { encoding, .. } => *encoding,
126 Page::DictionaryPage { encoding, .. } => *encoding,
127 }
128 }
129
130 pub fn statistics(&self) -> Option<&Statistics> {
132 match self {
133 Page::DataPage { ref statistics, .. } => statistics.as_ref(),
134 Page::DataPageV2 { ref statistics, .. } => statistics.as_ref(),
135 Page::DictionaryPage { .. } => None,
136 }
137 }
138}
139
/// A [`Page`] paired with the size its data had before compression.
///
/// The wrapped page's buffer length is what `compressed_size` reports.
pub struct CompressedPage {
    /// The wrapped page; its buffer is exposed through `data`.
    compressed_page: Page,
    /// Size in bytes recorded as the page's uncompressed size.
    uncompressed_size: usize,
}
149
150impl CompressedPage {
151 pub fn new(compressed_page: Page, uncompressed_size: usize) -> Self {
154 Self {
155 compressed_page,
156 uncompressed_size,
157 }
158 }
159
160 pub fn page_type(&self) -> PageType {
162 self.compressed_page.page_type()
163 }
164
165 pub fn compressed_page(&self) -> &Page {
167 &self.compressed_page
168 }
169
170 pub fn uncompressed_size(&self) -> usize {
172 self.uncompressed_size
173 }
174
175 pub fn compressed_size(&self) -> usize {
180 self.compressed_page.buffer().len()
181 }
182
183 pub fn num_values(&self) -> u32 {
185 self.compressed_page.num_values()
186 }
187
188 pub fn encoding(&self) -> Encoding {
190 self.compressed_page.encoding()
191 }
192
193 pub fn data(&self) -> &[u8] {
195 self.compressed_page.buffer()
196 }
197
198 pub(crate) fn to_thrift_header(&self) -> Result<PageHeader> {
200 let uncompressed_size = self.uncompressed_size();
201 let compressed_size = self.compressed_size();
202 if uncompressed_size > i32::MAX as usize {
203 return Err(general_err!(
204 "Page uncompressed size overflow: {}",
205 uncompressed_size
206 ));
207 }
208 if compressed_size > i32::MAX as usize {
209 return Err(general_err!(
210 "Page compressed size overflow: {}",
211 compressed_size
212 ));
213 }
214 let num_values = self.num_values();
215 let encoding = self.encoding();
216 let page_type = self.page_type();
217
218 let mut page_header = PageHeader {
219 type_: page_type.into(),
220 uncompressed_page_size: uncompressed_size as i32,
221 compressed_page_size: compressed_size as i32,
222 crc: None,
224 data_page_header: None,
225 index_page_header: None,
226 dictionary_page_header: None,
227 data_page_header_v2: None,
228 };
229
230 match self.compressed_page {
231 Page::DataPage {
232 def_level_encoding,
233 rep_level_encoding,
234 ref statistics,
235 ..
236 } => {
237 let data_page_header = crate::format::DataPageHeader {
238 num_values: num_values as i32,
239 encoding: encoding.into(),
240 definition_level_encoding: def_level_encoding.into(),
241 repetition_level_encoding: rep_level_encoding.into(),
242 statistics: crate::file::statistics::to_thrift(statistics.as_ref()),
243 };
244 page_header.data_page_header = Some(data_page_header);
245 }
246 Page::DataPageV2 {
247 num_nulls,
248 num_rows,
249 def_levels_byte_len,
250 rep_levels_byte_len,
251 is_compressed,
252 ref statistics,
253 ..
254 } => {
255 let data_page_header_v2 = crate::format::DataPageHeaderV2 {
256 num_values: num_values as i32,
257 num_nulls: num_nulls as i32,
258 num_rows: num_rows as i32,
259 encoding: encoding.into(),
260 definition_levels_byte_length: def_levels_byte_len as i32,
261 repetition_levels_byte_length: rep_levels_byte_len as i32,
262 is_compressed: Some(is_compressed),
263 statistics: crate::file::statistics::to_thrift(statistics.as_ref()),
264 };
265 page_header.data_page_header_v2 = Some(data_page_header_v2);
266 }
267 Page::DictionaryPage { is_sorted, .. } => {
268 let dictionary_page_header = crate::format::DictionaryPageHeader {
269 num_values: num_values as i32,
270 encoding: encoding.into(),
271 is_sorted: Some(is_sorted),
272 };
273 page_header.dictionary_page_header = Some(dictionary_page_header);
274 }
275 }
276 Ok(page_header)
277 }
278
279 #[cfg(feature = "encryption")]
283 pub(crate) fn with_new_compressed_buffer(mut self, new_buffer: Bytes) -> Self {
284 match &mut self.compressed_page {
285 Page::DataPage { buf, .. } => {
286 *buf = new_buffer;
287 }
288 Page::DataPageV2 { buf, .. } => {
289 *buf = new_buffer;
290 }
291 Page::DictionaryPage { buf, .. } => {
292 *buf = new_buffer;
293 }
294 }
295 self
296 }
297}
298
/// Summary of a page write, as returned by [`PageWriter::write_page`].
pub struct PageWriteSpec {
    /// Type of the page that was written.
    pub page_type: PageType,
    /// Uncompressed size of the page, in bytes.
    pub uncompressed_size: usize,
    /// Compressed size of the page, in bytes.
    pub compressed_size: usize,
    /// Number of values in the page.
    pub num_values: u32,
    /// Offset at which the page was written.
    // NOTE(review): presumably measured from the start of the file — confirm against the writer.
    pub offset: u64,
    /// Number of bytes written for the page.
    // NOTE(review): presumably includes the serialized page header — confirm against the writer.
    pub bytes_written: u64,
}
314
315impl Default for PageWriteSpec {
316 fn default() -> Self {
317 Self::new()
318 }
319}
320
321impl PageWriteSpec {
322 pub fn new() -> Self {
324 Self {
325 page_type: PageType::DATA_PAGE,
326 uncompressed_size: 0,
327 compressed_size: 0,
328 num_values: 0,
329 offset: 0,
330 bytes_written: 0,
331 }
332 }
333}
334
/// Lightweight description of a page, derivable from its [`PageHeader`].
#[derive(Clone)]
pub struct PageMetadata {
    /// Number of rows in the page, when known.
    ///
    /// The `TryFrom<&PageHeader>` conversion only fills this in for
    /// `DATA_PAGE_V2` headers.
    pub num_rows: Option<usize>,
    /// Number of levels in the page, when known.
    ///
    /// The `TryFrom<&PageHeader>` conversion takes this from the data page
    /// header's `num_values` and leaves it `None` for dictionary pages.
    pub num_levels: Option<usize>,
    /// `true` if the page is a dictionary page.
    pub is_dict: bool,
}
345
346impl TryFrom<&PageHeader> for PageMetadata {
347 type Error = ParquetError;
348
349 fn try_from(value: &PageHeader) -> std::result::Result<Self, Self::Error> {
350 match value.type_ {
351 crate::format::PageType::DATA_PAGE => {
352 let header = value.data_page_header.as_ref().unwrap();
353 Ok(PageMetadata {
354 num_rows: None,
355 num_levels: Some(header.num_values as _),
356 is_dict: false,
357 })
358 }
359 crate::format::PageType::DICTIONARY_PAGE => Ok(PageMetadata {
360 num_rows: None,
361 num_levels: None,
362 is_dict: true,
363 }),
364 crate::format::PageType::DATA_PAGE_V2 => {
365 let header = value.data_page_header_v2.as_ref().unwrap();
366 Ok(PageMetadata {
367 num_rows: Some(header.num_rows as _),
368 num_levels: Some(header.num_values as _),
369 is_dict: false,
370 })
371 }
372 other => Err(ParquetError::General(format!(
373 "page type {other:?} cannot be converted to PageMetadata"
374 ))),
375 }
376 }
377}
378
379pub trait PageReader: Iterator<Item = Result<Page>> + Send {
382 fn get_next_page(&mut self) -> Result<Option<Page>>;
385
386 fn peek_next_page(&mut self) -> Result<Option<PageMetadata>>;
389
390 fn skip_next_page(&mut self) -> Result<()>;
393
394 fn at_record_boundary(&mut self) -> Result<bool> {
405 Ok(self.peek_next_page()?.is_none())
406 }
407}
408
/// Sink for [`CompressedPage`]s; implementors must be `Send`.
pub trait PageWriter: Send {
    /// Writes `page` and returns a [`PageWriteSpec`] describing the write.
    fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec>;

    /// Closes the writer.
    // NOTE(review): presumably flushes the underlying sink and the writer must not
    // be used afterwards — confirm against implementors.
    fn close(&mut self) -> Result<()>;
}
426
/// Iterator that yields boxed [`PageReader`]s and is `Send`.
///
/// The trait adds no methods of its own.
pub trait PageIterator: Iterator<Item = Result<Box<dyn PageReader>>> + Send {}
429
#[cfg(test)]
mod tests {
    use super::*;

    /// Three-byte payload used by every page built in these tests.
    fn sample_buf() -> Bytes {
        Bytes::from(vec![0, 1, 2])
    }

    /// Int32 statistics attached to the data pages under test.
    fn sample_stats() -> Statistics {
        Statistics::int32(Some(1), Some(2), None, Some(1), true)
    }

    /// Builds a PLAIN-encoded data page with 10 values and the given statistics.
    fn sample_data_page(statistics: Option<Statistics>) -> Page {
        Page::DataPage {
            buf: sample_buf(),
            num_values: 10,
            encoding: Encoding::PLAIN,
            def_level_encoding: Encoding::RLE,
            rep_level_encoding: Encoding::RLE,
            statistics,
        }
    }

    #[test]
    fn test_page() {
        let expected_bytes: &[u8] = &[0, 1, 2];
        let expected_stats = sample_stats();

        // Data page
        let data_page = sample_data_page(Some(sample_stats()));
        assert_eq!(data_page.page_type(), PageType::DATA_PAGE);
        assert_eq!(data_page.buffer(), expected_bytes);
        assert_eq!(data_page.num_values(), 10);
        assert_eq!(data_page.encoding(), Encoding::PLAIN);
        assert_eq!(data_page.statistics(), Some(&expected_stats));

        // Data page V2
        let data_page_v2 = Page::DataPageV2 {
            buf: sample_buf(),
            num_values: 10,
            encoding: Encoding::PLAIN,
            num_nulls: 5,
            num_rows: 20,
            def_levels_byte_len: 30,
            rep_levels_byte_len: 40,
            is_compressed: false,
            statistics: Some(sample_stats()),
        };
        assert_eq!(data_page_v2.page_type(), PageType::DATA_PAGE_V2);
        assert_eq!(data_page_v2.buffer(), expected_bytes);
        assert_eq!(data_page_v2.num_values(), 10);
        assert_eq!(data_page_v2.encoding(), Encoding::PLAIN);
        assert_eq!(data_page_v2.statistics(), Some(&expected_stats));

        // Dictionary page
        let dict_page = Page::DictionaryPage {
            buf: sample_buf(),
            num_values: 10,
            encoding: Encoding::PLAIN,
            is_sorted: false,
        };
        assert_eq!(dict_page.page_type(), PageType::DICTIONARY_PAGE);
        assert_eq!(dict_page.buffer(), expected_bytes);
        assert_eq!(dict_page.num_values(), 10);
        assert_eq!(dict_page.encoding(), Encoding::PLAIN);
        assert_eq!(dict_page.statistics(), None);
    }

    #[test]
    fn test_compressed_page() {
        let cpage = CompressedPage::new(sample_data_page(Some(sample_stats())), 5);

        assert_eq!(cpage.page_type(), PageType::DATA_PAGE);
        assert_eq!(cpage.uncompressed_size(), 5);
        assert_eq!(cpage.compressed_size(), 3);
        assert_eq!(cpage.num_values(), 10);
        assert_eq!(cpage.encoding(), Encoding::PLAIN);
        assert_eq!(cpage.data(), &[0, 1, 2]);
    }

    #[test]
    fn test_compressed_page_uncompressed_size_overflow() {
        // One byte more than an `i32` can represent.
        let too_large = (i32::MAX as usize) + 1;
        let cpage = CompressedPage::new(sample_data_page(None), too_large);

        // Building the Thrift header must fail rather than truncate the size.
        let result = cpage.to_thrift_header();
        assert!(result.is_err());

        let error_msg = result.unwrap_err().to_string();
        assert!(error_msg.contains("Page uncompressed size overflow"));
    }
}