1use bytes::Bytes;
21
22use crate::basic::{Encoding, PageType};
23use crate::errors::{ParquetError, Result};
24use crate::file::statistics::Statistics;
25use crate::format::PageHeader;
26
/// An in-memory Parquet page.
///
/// Each variant corresponds to one [`PageType`] (see [`Page::page_type`]) and
/// carries the page bytes in `buf` together with the header fields that
/// `CompressedPage::to_thrift_header` copies into the Thrift page header.
#[derive(Clone)]
pub enum Page {
    /// Data page, reported as `PageType::DATA_PAGE`.
    DataPage {
        /// Bytes of the page, returned by [`Page::buffer`].
        buf: Bytes,
        /// Value count, serialized as `num_values` in the Thrift header.
        num_values: u32,
        /// Encoding returned by [`Page::encoding`] and serialized as `encoding`.
        encoding: Encoding,
        /// Serialized as `definition_level_encoding` in the Thrift header.
        def_level_encoding: Encoding,
        /// Serialized as `repetition_level_encoding` in the Thrift header.
        rep_level_encoding: Encoding,
        /// Optional page statistics, returned by [`Page::statistics`].
        statistics: Option<Statistics>,
    },
    /// Version 2 data page, reported as `PageType::DATA_PAGE_V2`.
    DataPageV2 {
        /// Bytes of the page, returned by [`Page::buffer`].
        buf: Bytes,
        /// Value count, serialized as `num_values` in the Thrift header.
        num_values: u32,
        /// Encoding returned by [`Page::encoding`] and serialized as `encoding`.
        encoding: Encoding,
        /// Serialized as `num_nulls` in the Thrift header.
        num_nulls: u32,
        /// Serialized as `num_rows` in the Thrift header.
        num_rows: u32,
        /// Serialized as `definition_levels_byte_length` in the Thrift header.
        def_levels_byte_len: u32,
        /// Serialized as `repetition_levels_byte_length` in the Thrift header.
        rep_levels_byte_len: u32,
        /// Serialized as `is_compressed` in the Thrift header.
        is_compressed: bool,
        /// Optional page statistics, returned by [`Page::statistics`].
        statistics: Option<Statistics>,
    },
    /// Dictionary page, reported as `PageType::DICTIONARY_PAGE`.
    DictionaryPage {
        /// Bytes of the page, returned by [`Page::buffer`].
        buf: Bytes,
        /// Value count, serialized as `num_values` in the Thrift header.
        num_values: u32,
        /// Encoding returned by [`Page::encoding`] and serialized as `encoding`.
        encoding: Encoding,
        /// Serialized as `is_sorted` in the Thrift header.
        is_sorted: bool,
    },
}
82
83impl Page {
84 pub fn page_type(&self) -> PageType {
86 match self {
87 Page::DataPage { .. } => PageType::DATA_PAGE,
88 Page::DataPageV2 { .. } => PageType::DATA_PAGE_V2,
89 Page::DictionaryPage { .. } => PageType::DICTIONARY_PAGE,
90 }
91 }
92
93 pub fn buffer(&self) -> &Bytes {
95 match self {
96 Page::DataPage { ref buf, .. } => buf,
97 Page::DataPageV2 { ref buf, .. } => buf,
98 Page::DictionaryPage { ref buf, .. } => buf,
99 }
100 }
101
102 pub fn num_values(&self) -> u32 {
104 match self {
105 Page::DataPage { num_values, .. } => *num_values,
106 Page::DataPageV2 { num_values, .. } => *num_values,
107 Page::DictionaryPage { num_values, .. } => *num_values,
108 }
109 }
110
111 pub fn encoding(&self) -> Encoding {
113 match self {
114 Page::DataPage { encoding, .. } => *encoding,
115 Page::DataPageV2 { encoding, .. } => *encoding,
116 Page::DictionaryPage { encoding, .. } => *encoding,
117 }
118 }
119
120 pub fn statistics(&self) -> Option<&Statistics> {
122 match self {
123 Page::DataPage { ref statistics, .. } => statistics.as_ref(),
124 Page::DataPageV2 { ref statistics, .. } => statistics.as_ref(),
125 Page::DictionaryPage { .. } => None,
126 }
127 }
128}
129
/// Pairs a [`Page`] with a separately tracked uncompressed size.
///
/// The wrapped page's buffer length is what `compressed_size()` reports.
/// NOTE(review): presumably the buffer is already compressed when the wrapper
/// is built; nothing in this type enforces that, so confirm against callers.
pub struct CompressedPage {
    /// Wrapped page; its buffer length is reported as the compressed size.
    compressed_page: Page,
    /// Value returned by `uncompressed_size()`, as supplied to `new`.
    uncompressed_size: usize,
}
139
140impl CompressedPage {
141 pub fn new(compressed_page: Page, uncompressed_size: usize) -> Self {
144 Self {
145 compressed_page,
146 uncompressed_size,
147 }
148 }
149
150 pub fn page_type(&self) -> PageType {
152 self.compressed_page.page_type()
153 }
154
155 pub fn compressed_page(&self) -> &Page {
157 &self.compressed_page
158 }
159
160 pub fn uncompressed_size(&self) -> usize {
162 self.uncompressed_size
163 }
164
165 pub fn compressed_size(&self) -> usize {
170 self.compressed_page.buffer().len()
171 }
172
173 pub fn num_values(&self) -> u32 {
175 self.compressed_page.num_values()
176 }
177
178 pub fn encoding(&self) -> Encoding {
180 self.compressed_page.encoding()
181 }
182
183 pub fn data(&self) -> &[u8] {
185 self.compressed_page.buffer()
186 }
187
188 pub(crate) fn to_thrift_header(&self) -> PageHeader {
190 let uncompressed_size = self.uncompressed_size();
191 let compressed_size = self.compressed_size();
192 let num_values = self.num_values();
193 let encoding = self.encoding();
194 let page_type = self.page_type();
195
196 let mut page_header = PageHeader {
197 type_: page_type.into(),
198 uncompressed_page_size: uncompressed_size as i32,
199 compressed_page_size: compressed_size as i32,
200 crc: None,
202 data_page_header: None,
203 index_page_header: None,
204 dictionary_page_header: None,
205 data_page_header_v2: None,
206 };
207
208 match self.compressed_page {
209 Page::DataPage {
210 def_level_encoding,
211 rep_level_encoding,
212 ref statistics,
213 ..
214 } => {
215 let data_page_header = crate::format::DataPageHeader {
216 num_values: num_values as i32,
217 encoding: encoding.into(),
218 definition_level_encoding: def_level_encoding.into(),
219 repetition_level_encoding: rep_level_encoding.into(),
220 statistics: crate::file::statistics::to_thrift(statistics.as_ref()),
221 };
222 page_header.data_page_header = Some(data_page_header);
223 }
224 Page::DataPageV2 {
225 num_nulls,
226 num_rows,
227 def_levels_byte_len,
228 rep_levels_byte_len,
229 is_compressed,
230 ref statistics,
231 ..
232 } => {
233 let data_page_header_v2 = crate::format::DataPageHeaderV2 {
234 num_values: num_values as i32,
235 num_nulls: num_nulls as i32,
236 num_rows: num_rows as i32,
237 encoding: encoding.into(),
238 definition_levels_byte_length: def_levels_byte_len as i32,
239 repetition_levels_byte_length: rep_levels_byte_len as i32,
240 is_compressed: Some(is_compressed),
241 statistics: crate::file::statistics::to_thrift(statistics.as_ref()),
242 };
243 page_header.data_page_header_v2 = Some(data_page_header_v2);
244 }
245 Page::DictionaryPage { is_sorted, .. } => {
246 let dictionary_page_header = crate::format::DictionaryPageHeader {
247 num_values: num_values as i32,
248 encoding: encoding.into(),
249 is_sorted: Some(is_sorted),
250 };
251 page_header.dictionary_page_header = Some(dictionary_page_header);
252 }
253 }
254 page_header
255 }
256}
257
/// Summary of a page write, as returned by `PageWriter::write_page`.
///
/// NOTE(review): the field descriptions below follow the field names; how
/// each value is populated depends on the `PageWriter` implementation, which
/// is not visible in this file.
pub struct PageWriteSpec {
    /// Type of the page this spec describes.
    pub page_type: PageType,
    /// Page size before compression (presumably in bytes; confirm).
    pub uncompressed_size: usize,
    /// Page size after compression (presumably in bytes; confirm).
    pub compressed_size: usize,
    /// Number of values in the page.
    pub num_values: u32,
    /// Offset associated with the page (presumably where it was written; confirm).
    pub offset: u64,
    /// Number of bytes written for the page.
    pub bytes_written: u64,
}
273
274impl Default for PageWriteSpec {
275 fn default() -> Self {
276 Self::new()
277 }
278}
279
280impl PageWriteSpec {
281 pub fn new() -> Self {
283 Self {
284 page_type: PageType::DATA_PAGE,
285 uncompressed_size: 0,
286 compressed_size: 0,
287 num_values: 0,
288 offset: 0,
289 bytes_written: 0,
290 }
291 }
292}
293
/// Summary of a page derived from its Thrift [`PageHeader`] (see the
/// `TryFrom<&PageHeader>` implementation in this file).
#[derive(Clone)]
pub struct PageMetadata {
    /// Row count; populated from `num_rows` for `DATA_PAGE_V2` headers and
    /// `None` for every other page type.
    pub num_rows: Option<usize>,
    /// Populated from the header's `num_values` for both data page versions;
    /// `None` for dictionary pages.
    pub num_levels: Option<usize>,
    /// `true` only when the header describes a dictionary page.
    pub is_dict: bool,
}
304
305impl TryFrom<&PageHeader> for PageMetadata {
306 type Error = ParquetError;
307
308 fn try_from(value: &PageHeader) -> std::result::Result<Self, Self::Error> {
309 match value.type_ {
310 crate::format::PageType::DATA_PAGE => {
311 let header = value.data_page_header.as_ref().unwrap();
312 Ok(PageMetadata {
313 num_rows: None,
314 num_levels: Some(header.num_values as _),
315 is_dict: false,
316 })
317 }
318 crate::format::PageType::DICTIONARY_PAGE => Ok(PageMetadata {
319 num_rows: None,
320 num_levels: None,
321 is_dict: true,
322 }),
323 crate::format::PageType::DATA_PAGE_V2 => {
324 let header = value.data_page_header_v2.as_ref().unwrap();
325 Ok(PageMetadata {
326 num_rows: Some(header.num_rows as _),
327 num_levels: Some(header.num_values as _),
328 is_dict: false,
329 })
330 }
331 other => Err(ParquetError::General(format!(
332 "page type {other:?} cannot be converted to PageMetadata"
333 ))),
334 }
335 }
336}
337
338pub trait PageReader: Iterator<Item = Result<Page>> + Send {
341 fn get_next_page(&mut self) -> Result<Option<Page>>;
344
345 fn peek_next_page(&mut self) -> Result<Option<PageMetadata>>;
348
349 fn skip_next_page(&mut self) -> Result<()>;
352
353 fn at_record_boundary(&mut self) -> Result<bool> {
364 Ok(self.peek_next_page()?.is_none())
365 }
366}
367
/// Sink that accepts [`CompressedPage`]s.
pub trait PageWriter: Send {
    /// Writes `page`, taking ownership of it, and returns a [`PageWriteSpec`]
    /// describing the write.
    ///
    /// NOTE(review): which spec fields are filled in, and how, is up to the
    /// implementor; confirm against the concrete writers.
    fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec>;

    /// Closes the writer.
    ///
    /// NOTE(review): presumably flushes any buffered output; confirm against
    /// implementors.
    fn close(&mut self) -> Result<()>;
}
385
/// Marker trait for iterators that yield boxed [`PageReader`]s and can be
/// sent across threads; it adds no methods of its own.
pub trait PageIterator: Iterator<Item = Result<Box<dyn PageReader>>> + Send {}
388
#[cfg(test)]
mod tests {
    use super::*;

    /// Three-byte payload used as the buffer of every fixture page.
    fn sample_bytes() -> Bytes {
        Bytes::from(vec![0, 1, 2])
    }

    /// Int32 statistics fixture shared by the data page fixtures.
    fn sample_statistics() -> Statistics {
        Statistics::int32(Some(1), Some(2), None, Some(1), true)
    }

    /// Version 1 data page fixture.
    fn sample_data_page() -> Page {
        Page::DataPage {
            buf: sample_bytes(),
            num_values: 10,
            encoding: Encoding::PLAIN,
            def_level_encoding: Encoding::RLE,
            rep_level_encoding: Encoding::RLE,
            statistics: Some(sample_statistics()),
        }
    }

    // Checks every accessor of `Page` against each of the three variants.
    #[test]
    fn test_page() {
        let data_page_v2 = Page::DataPageV2 {
            buf: sample_bytes(),
            num_values: 10,
            encoding: Encoding::PLAIN,
            num_nulls: 5,
            num_rows: 20,
            def_levels_byte_len: 30,
            rep_levels_byte_len: 40,
            is_compressed: false,
            statistics: Some(sample_statistics()),
        };
        let dict_page = Page::DictionaryPage {
            buf: sample_bytes(),
            num_values: 10,
            encoding: Encoding::PLAIN,
            is_sorted: false,
        };

        let expected_payload: &[u8] = &[0, 1, 2];
        let cases = [
            (
                sample_data_page(),
                PageType::DATA_PAGE,
                Some(sample_statistics()),
            ),
            (
                data_page_v2,
                PageType::DATA_PAGE_V2,
                Some(sample_statistics()),
            ),
            (dict_page, PageType::DICTIONARY_PAGE, None),
        ];

        for (page, expected_type, expected_stats) in cases {
            assert_eq!(page.page_type(), expected_type);
            assert_eq!(page.buffer(), expected_payload);
            assert_eq!(page.num_values(), 10);
            assert_eq!(page.encoding(), Encoding::PLAIN);
            assert_eq!(page.statistics(), expected_stats.as_ref());
        }
    }

    // Checks that `CompressedPage` forwards to the wrapped page and reports
    // the buffer length as the compressed size.
    #[test]
    fn test_compressed_page() {
        let wrapped = CompressedPage::new(sample_data_page(), 5);

        assert_eq!(wrapped.page_type(), PageType::DATA_PAGE);
        assert_eq!(wrapped.uncompressed_size(), 5);
        assert_eq!(wrapped.compressed_size(), 3);
        assert_eq!(wrapped.num_values(), 10);
        assert_eq!(wrapped.encoding(), Encoding::PLAIN);
        assert_eq!(wrapped.data(), &[0, 1, 2]);
    }
}