1use bytes::Bytes;
21
22use crate::basic::{Encoding, PageType};
23use crate::errors::{ParquetError, Result};
24use crate::file::statistics::Statistics;
25use crate::format::PageHeader;
26
27#[derive(Clone)]
33pub enum Page {
34 DataPage {
36 buf: Bytes,
38 num_values: u32,
40 encoding: Encoding,
42 def_level_encoding: Encoding,
44 rep_level_encoding: Encoding,
46 statistics: Option<Statistics>,
48 },
49 DataPageV2 {
51 buf: Bytes,
53 num_values: u32,
55 encoding: Encoding,
57 num_nulls: u32,
59 num_rows: u32,
61 def_levels_byte_len: u32,
63 rep_levels_byte_len: u32,
65 is_compressed: bool,
67 statistics: Option<Statistics>,
69 },
70 DictionaryPage {
72 buf: Bytes,
74 num_values: u32,
76 encoding: Encoding,
78 is_sorted: bool,
80 },
81}
82
83impl Page {
84 pub fn page_type(&self) -> PageType {
86 match self {
87 Page::DataPage { .. } => PageType::DATA_PAGE,
88 Page::DataPageV2 { .. } => PageType::DATA_PAGE_V2,
89 Page::DictionaryPage { .. } => PageType::DICTIONARY_PAGE,
90 }
91 }
92
93 pub fn is_data_page(&self) -> bool {
95 matches!(self, Page::DataPage { .. } | Page::DataPageV2 { .. })
96 }
97
98 pub fn is_dictionary_page(&self) -> bool {
100 matches!(self, Page::DictionaryPage { .. })
101 }
102
103 pub fn buffer(&self) -> &Bytes {
105 match self {
106 Page::DataPage { ref buf, .. } => buf,
107 Page::DataPageV2 { ref buf, .. } => buf,
108 Page::DictionaryPage { ref buf, .. } => buf,
109 }
110 }
111
112 pub fn num_values(&self) -> u32 {
114 match self {
115 Page::DataPage { num_values, .. } => *num_values,
116 Page::DataPageV2 { num_values, .. } => *num_values,
117 Page::DictionaryPage { num_values, .. } => *num_values,
118 }
119 }
120
121 pub fn encoding(&self) -> Encoding {
123 match self {
124 Page::DataPage { encoding, .. } => *encoding,
125 Page::DataPageV2 { encoding, .. } => *encoding,
126 Page::DictionaryPage { encoding, .. } => *encoding,
127 }
128 }
129
130 pub fn statistics(&self) -> Option<&Statistics> {
132 match self {
133 Page::DataPage { ref statistics, .. } => statistics.as_ref(),
134 Page::DataPageV2 { ref statistics, .. } => statistics.as_ref(),
135 Page::DictionaryPage { .. } => None,
136 }
137 }
138}
139
140pub struct CompressedPage {
146 compressed_page: Page,
147 uncompressed_size: usize,
148}
149
150impl CompressedPage {
151 pub fn new(compressed_page: Page, uncompressed_size: usize) -> Self {
154 Self {
155 compressed_page,
156 uncompressed_size,
157 }
158 }
159
160 pub fn page_type(&self) -> PageType {
162 self.compressed_page.page_type()
163 }
164
165 pub fn compressed_page(&self) -> &Page {
167 &self.compressed_page
168 }
169
170 pub fn uncompressed_size(&self) -> usize {
172 self.uncompressed_size
173 }
174
175 pub fn compressed_size(&self) -> usize {
180 self.compressed_page.buffer().len()
181 }
182
183 pub fn num_values(&self) -> u32 {
185 self.compressed_page.num_values()
186 }
187
188 pub fn encoding(&self) -> Encoding {
190 self.compressed_page.encoding()
191 }
192
193 pub fn data(&self) -> &[u8] {
195 self.compressed_page.buffer()
196 }
197
198 pub(crate) fn to_thrift_header(&self) -> PageHeader {
200 let uncompressed_size = self.uncompressed_size();
201 let compressed_size = self.compressed_size();
202 let num_values = self.num_values();
203 let encoding = self.encoding();
204 let page_type = self.page_type();
205
206 let mut page_header = PageHeader {
207 type_: page_type.into(),
208 uncompressed_page_size: uncompressed_size as i32,
209 compressed_page_size: compressed_size as i32,
210 crc: None,
212 data_page_header: None,
213 index_page_header: None,
214 dictionary_page_header: None,
215 data_page_header_v2: None,
216 };
217
218 match self.compressed_page {
219 Page::DataPage {
220 def_level_encoding,
221 rep_level_encoding,
222 ref statistics,
223 ..
224 } => {
225 let data_page_header = crate::format::DataPageHeader {
226 num_values: num_values as i32,
227 encoding: encoding.into(),
228 definition_level_encoding: def_level_encoding.into(),
229 repetition_level_encoding: rep_level_encoding.into(),
230 statistics: crate::file::statistics::to_thrift(statistics.as_ref()),
231 };
232 page_header.data_page_header = Some(data_page_header);
233 }
234 Page::DataPageV2 {
235 num_nulls,
236 num_rows,
237 def_levels_byte_len,
238 rep_levels_byte_len,
239 is_compressed,
240 ref statistics,
241 ..
242 } => {
243 let data_page_header_v2 = crate::format::DataPageHeaderV2 {
244 num_values: num_values as i32,
245 num_nulls: num_nulls as i32,
246 num_rows: num_rows as i32,
247 encoding: encoding.into(),
248 definition_levels_byte_length: def_levels_byte_len as i32,
249 repetition_levels_byte_length: rep_levels_byte_len as i32,
250 is_compressed: Some(is_compressed),
251 statistics: crate::file::statistics::to_thrift(statistics.as_ref()),
252 };
253 page_header.data_page_header_v2 = Some(data_page_header_v2);
254 }
255 Page::DictionaryPage { is_sorted, .. } => {
256 let dictionary_page_header = crate::format::DictionaryPageHeader {
257 num_values: num_values as i32,
258 encoding: encoding.into(),
259 is_sorted: Some(is_sorted),
260 };
261 page_header.dictionary_page_header = Some(dictionary_page_header);
262 }
263 }
264 page_header
265 }
266
267 #[cfg(feature = "encryption")]
271 pub(crate) fn with_new_compressed_buffer(mut self, new_buffer: Bytes) -> Self {
272 match &mut self.compressed_page {
273 Page::DataPage { buf, .. } => {
274 *buf = new_buffer;
275 }
276 Page::DataPageV2 { buf, .. } => {
277 *buf = new_buffer;
278 }
279 Page::DictionaryPage { buf, .. } => {
280 *buf = new_buffer;
281 }
282 }
283 self
284 }
285}
286
287pub struct PageWriteSpec {
289 pub page_type: PageType,
291 pub uncompressed_size: usize,
293 pub compressed_size: usize,
295 pub num_values: u32,
297 pub offset: u64,
299 pub bytes_written: u64,
301}
302
303impl Default for PageWriteSpec {
304 fn default() -> Self {
305 Self::new()
306 }
307}
308
309impl PageWriteSpec {
310 pub fn new() -> Self {
312 Self {
313 page_type: PageType::DATA_PAGE,
314 uncompressed_size: 0,
315 compressed_size: 0,
316 num_values: 0,
317 offset: 0,
318 bytes_written: 0,
319 }
320 }
321}
322
/// Lightweight description of a page, obtainable without decoding it.
#[derive(Clone)]
pub struct PageMetadata {
    /// Row count of the page, when known; the `TryFrom<&PageHeader>`
    /// conversion sets it only for `DATA_PAGE_V2` headers.
    pub num_rows: Option<usize>,
    /// Value count taken from the data page header's `num_values`; `None`
    /// for dictionary pages.
    pub num_levels: Option<usize>,
    /// `true` when the page is a dictionary page.
    pub is_dict: bool,
}
333
334impl TryFrom<&PageHeader> for PageMetadata {
335 type Error = ParquetError;
336
337 fn try_from(value: &PageHeader) -> std::result::Result<Self, Self::Error> {
338 match value.type_ {
339 crate::format::PageType::DATA_PAGE => {
340 let header = value.data_page_header.as_ref().unwrap();
341 Ok(PageMetadata {
342 num_rows: None,
343 num_levels: Some(header.num_values as _),
344 is_dict: false,
345 })
346 }
347 crate::format::PageType::DICTIONARY_PAGE => Ok(PageMetadata {
348 num_rows: None,
349 num_levels: None,
350 is_dict: true,
351 }),
352 crate::format::PageType::DATA_PAGE_V2 => {
353 let header = value.data_page_header_v2.as_ref().unwrap();
354 Ok(PageMetadata {
355 num_rows: Some(header.num_rows as _),
356 num_levels: Some(header.num_values as _),
357 is_dict: false,
358 })
359 }
360 other => Err(ParquetError::General(format!(
361 "page type {other:?} cannot be converted to PageMetadata"
362 ))),
363 }
364 }
365}
366
367pub trait PageReader: Iterator<Item = Result<Page>> + Send {
370 fn get_next_page(&mut self) -> Result<Option<Page>>;
373
374 fn peek_next_page(&mut self) -> Result<Option<PageMetadata>>;
377
378 fn skip_next_page(&mut self) -> Result<()>;
381
382 fn at_record_boundary(&mut self) -> Result<bool> {
393 Ok(self.peek_next_page()?.is_none())
394 }
395}
396
397pub trait PageWriter: Send {
402 fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec>;
409
410 fn close(&mut self) -> Result<()>;
413}
414
415pub trait PageIterator: Iterator<Item = Result<Box<dyn PageReader>>> + Send {}
417
#[cfg(test)]
mod tests {
    use super::*;

    /// Statistics fixture shared by the data page tests.
    fn sample_statistics() -> Statistics {
        Statistics::int32(Some(1), Some(2), None, Some(1), true)
    }

    /// Three-byte payload fixture shared by all pages.
    fn sample_buffer() -> Bytes {
        Bytes::from(vec![0, 1, 2])
    }

    /// Version 1 data page fixture built from the shared payload and statistics.
    fn sample_data_page() -> Page {
        Page::DataPage {
            buf: sample_buffer(),
            num_values: 10,
            encoding: Encoding::PLAIN,
            def_level_encoding: Encoding::RLE,
            rep_level_encoding: Encoding::RLE,
            statistics: Some(sample_statistics()),
        }
    }

    #[test]
    fn test_page() {
        // Version 1 data page.
        let v1 = sample_data_page();
        assert_eq!(v1.page_type(), PageType::DATA_PAGE);
        assert_eq!(v1.buffer(), vec![0, 1, 2].as_slice());
        assert_eq!(v1.num_values(), 10);
        assert_eq!(v1.encoding(), Encoding::PLAIN);
        assert_eq!(v1.statistics(), Some(&sample_statistics()));

        // Version 2 data page.
        let v2 = Page::DataPageV2 {
            buf: sample_buffer(),
            num_values: 10,
            encoding: Encoding::PLAIN,
            num_nulls: 5,
            num_rows: 20,
            def_levels_byte_len: 30,
            rep_levels_byte_len: 40,
            is_compressed: false,
            statistics: Some(sample_statistics()),
        };
        assert_eq!(v2.page_type(), PageType::DATA_PAGE_V2);
        assert_eq!(v2.buffer(), vec![0, 1, 2].as_slice());
        assert_eq!(v2.num_values(), 10);
        assert_eq!(v2.encoding(), Encoding::PLAIN);
        assert_eq!(v2.statistics(), Some(&sample_statistics()));

        // Dictionary page: never reports statistics.
        let dict = Page::DictionaryPage {
            buf: sample_buffer(),
            num_values: 10,
            encoding: Encoding::PLAIN,
            is_sorted: false,
        };
        assert_eq!(dict.page_type(), PageType::DICTIONARY_PAGE);
        assert_eq!(dict.buffer(), vec![0, 1, 2].as_slice());
        assert_eq!(dict.num_values(), 10);
        assert_eq!(dict.encoding(), Encoding::PLAIN);
        assert_eq!(dict.statistics(), None);
    }

    #[test]
    fn test_compressed_page() {
        let cpage = CompressedPage::new(sample_data_page(), 5);

        assert_eq!(cpage.page_type(), PageType::DATA_PAGE);
        assert_eq!(cpage.uncompressed_size(), 5);
        // The compressed size is the length of the wrapped buffer.
        assert_eq!(cpage.compressed_size(), 3);
        assert_eq!(cpage.num_values(), 10);
        assert_eq!(cpage.encoding(), Encoding::PLAIN);
        assert_eq!(cpage.data(), &[0, 1, 2]);
    }
}