1use crate::basic::{PageType, Type};
22use crate::bloom_filter::Sbbf;
23use crate::column::page::{Page, PageMetadata, PageReader};
24use crate::compression::{Codec, create_codec};
25#[cfg(feature = "encryption")]
26use crate::encryption::decrypt::{CryptoContext, read_and_decrypt};
27use crate::errors::{ParquetError, Result};
28use crate::file::metadata::thrift::PageHeader;
29use crate::file::page_index::offset_index::{OffsetIndexMetaData, PageLocation};
30use crate::file::statistics;
31use crate::file::{
32 metadata::*,
33 properties::{ReaderProperties, ReaderPropertiesPtr},
34 reader::*,
35};
36#[cfg(feature = "encryption")]
37use crate::parquet_thrift::ThriftSliceInputProtocol;
38use crate::parquet_thrift::{ReadThrift, ThriftReadInputProtocol};
39use crate::record::Row;
40use crate::record::reader::RowIter;
41use crate::schema::types::Type as SchemaType;
42use bytes::Bytes;
43use std::collections::VecDeque;
44use std::{fs::File, io::Read, path::Path, sync::Arc};
45
46impl TryFrom<File> for SerializedFileReader<File> {
47 type Error = ParquetError;
48
49 fn try_from(file: File) -> Result<Self> {
50 Self::new(file)
51 }
52}
53
54impl TryFrom<&Path> for SerializedFileReader<File> {
55 type Error = ParquetError;
56
57 fn try_from(path: &Path) -> Result<Self> {
58 let file = File::open(path)?;
59 Self::try_from(file)
60 }
61}
62
63impl TryFrom<String> for SerializedFileReader<File> {
64 type Error = ParquetError;
65
66 fn try_from(path: String) -> Result<Self> {
67 Self::try_from(Path::new(&path))
68 }
69}
70
71impl TryFrom<&str> for SerializedFileReader<File> {
72 type Error = ParquetError;
73
74 fn try_from(path: &str) -> Result<Self> {
75 Self::try_from(Path::new(&path))
76 }
77}
78
/// Converts the reader into an owning iterator over every row in the file.
impl IntoIterator for SerializedFileReader<File> {
    type Item = Result<Row>;
    type IntoIter = RowIter<'static>;

    fn into_iter(self) -> Self::IntoIter {
        // Box the reader so the iterator owns it with a 'static lifetime.
        RowIter::from_file_into(Box::new(self))
    }
}
89
/// A serialized implementation of a Parquet file reader backed by any
/// [`ChunkReader`] (e.g. a [`std::fs::File`] or in-memory buffer).
pub struct SerializedFileReader<R: ChunkReader> {
    // Shared handle to the underlying byte source.
    chunk_reader: Arc<R>,
    // Parsed footer metadata for the whole file.
    metadata: Arc<ParquetMetaData>,
    // Reader configuration shared with row-group and page readers.
    props: ReaderPropertiesPtr,
}
99
/// A predicate for filtering row groups, invoked with the metadata and index
/// of each row group in the file. Only row groups for which it returns `true`
/// are retained.
pub type ReadGroupPredicate = Box<dyn FnMut(&RowGroupMetaData, usize) -> bool>;
104
/// A builder for [`ReadOptions`].
#[derive(Default)]
pub struct ReadOptionsBuilder {
    // Row-group filters; a group is kept only if *all* predicates pass.
    predicates: Vec<ReadGroupPredicate>,
    // Whether to also load the page index when the file is opened.
    enable_page_index: bool,
    // Custom reader properties; defaults are built when `None`.
    props: Option<ReaderProperties>,
}
114
115impl ReadOptionsBuilder {
116 pub fn new() -> Self {
118 Self::default()
119 }
120
121 pub fn with_predicate(mut self, predicate: ReadGroupPredicate) -> Self {
124 self.predicates.push(predicate);
125 self
126 }
127
128 pub fn with_range(mut self, start: i64, end: i64) -> Self {
131 assert!(start < end);
132 let predicate = move |rg: &RowGroupMetaData, _: usize| {
133 let mid = get_midpoint_offset(rg);
134 mid >= start && mid < end
135 };
136 self.predicates.push(Box::new(predicate));
137 self
138 }
139
140 pub fn with_page_index(mut self) -> Self {
145 self.enable_page_index = true;
146 self
147 }
148
149 pub fn with_reader_properties(mut self, properties: ReaderProperties) -> Self {
151 self.props = Some(properties);
152 self
153 }
154
155 pub fn build(self) -> ReadOptions {
157 let props = self
158 .props
159 .unwrap_or_else(|| ReaderProperties::builder().build());
160 ReadOptions {
161 predicates: self.predicates,
162 enable_page_index: self.enable_page_index,
163 props,
164 }
165 }
166}
167
/// Options controlling how a Parquet file is read.
/// Constructed via [`ReadOptionsBuilder`].
pub struct ReadOptions {
    // Row-group filters; a group is kept only if all predicates pass.
    predicates: Vec<ReadGroupPredicate>,
    // Whether the page index should be loaded alongside the footer.
    enable_page_index: bool,
    // Properties forwarded to row-group and page readers.
    props: ReaderProperties,
}
177
impl<R: 'static + ChunkReader> SerializedFileReader<R> {
    /// Creates a file reader from the given [`ChunkReader`], eagerly parsing
    /// the footer metadata and using default [`ReaderProperties`].
    pub fn new(chunk_reader: R) -> Result<Self> {
        let metadata = ParquetMetaDataReader::new().parse_and_finish(&chunk_reader)?;
        let props = Arc::new(ReaderProperties::builder().build());
        Ok(Self {
            chunk_reader: Arc::new(chunk_reader),
            metadata: Arc::new(metadata),
            props,
        })
    }

    /// Creates a file reader honoring the supplied [`ReadOptions`]: row groups
    /// are filtered through the predicates, and the page index is loaded when
    /// requested.
    #[allow(deprecated)]
    pub fn new_with_options(chunk_reader: R, options: ReadOptions) -> Result<Self> {
        let mut metadata_builder = ParquetMetaDataReader::new()
            .parse_and_finish(&chunk_reader)?
            .into_builder();
        let mut predicates = options.predicates;

        // Keep only row groups for which *every* predicate returns true.
        // `take_row_groups` empties the builder so kept groups can be re-added.
        for (i, rg_meta) in metadata_builder.take_row_groups().into_iter().enumerate() {
            let mut keep = true;
            for predicate in &mut predicates {
                if !predicate(&rg_meta, i) {
                    keep = false;
                    break;
                }
            }
            if keep {
                metadata_builder = metadata_builder.add_row_group(rg_meta);
            }
        }

        let mut metadata = metadata_builder.build();

        // The page index is read *after* filtering so only surviving row
        // groups incur the extra I/O.
        if options.enable_page_index {
            let mut reader =
                ParquetMetaDataReader::new_with_metadata(metadata).with_page_indexes(true);
            reader.read_page_indexes(&chunk_reader)?;
            metadata = reader.finish()?;
        }

        Ok(Self {
            chunk_reader: Arc::new(chunk_reader),
            metadata: Arc::new(metadata),
            props: Arc::new(options.props),
        })
    }
}
231
232fn get_midpoint_offset(meta: &RowGroupMetaData) -> i64 {
234 let col = meta.column(0);
235 let mut offset = col.data_page_offset();
236 if let Some(dic_offset) = col.dictionary_page_offset() {
237 if offset > dic_offset {
238 offset = dic_offset
239 }
240 };
241 offset + meta.compressed_size() / 2
242}
243
244impl<R: 'static + ChunkReader> FileReader for SerializedFileReader<R> {
245 fn metadata(&self) -> &ParquetMetaData {
246 &self.metadata
247 }
248
249 fn num_row_groups(&self) -> usize {
250 self.metadata.num_row_groups()
251 }
252
253 fn get_row_group(&self, i: usize) -> Result<Box<dyn RowGroupReader + '_>> {
254 let row_group_metadata = self.metadata.row_group(i);
255 let props = Arc::clone(&self.props);
257 let f = Arc::clone(&self.chunk_reader);
258 Ok(Box::new(SerializedRowGroupReader::new(
259 f,
260 row_group_metadata,
261 self.metadata.offset_index().map(|x| x[i].as_slice()),
262 props,
263 )?))
264 }
265
266 fn get_row_iter(&self, projection: Option<SchemaType>) -> Result<RowIter<'_>> {
267 RowIter::from_file(projection, self)
268 }
269}
270
/// A serialized implementation of a Parquet row-group reader.
pub struct SerializedRowGroupReader<'a, R: ChunkReader> {
    // Shared handle to the underlying byte source.
    chunk_reader: Arc<R>,
    // Metadata for this row group, borrowed from the file metadata.
    metadata: &'a RowGroupMetaData,
    // Per-column offset index for this row group, if loaded.
    offset_index: Option<&'a [OffsetIndexMetaData]>,
    // Reader configuration shared with page readers.
    props: ReaderPropertiesPtr,
    // One optional bloom filter per column; populated only when
    // `props.read_bloom_filter()` is enabled.
    bloom_filters: Vec<Option<Sbbf>>,
}
279
280impl<'a, R: ChunkReader> SerializedRowGroupReader<'a, R> {
281 pub fn new(
283 chunk_reader: Arc<R>,
284 metadata: &'a RowGroupMetaData,
285 offset_index: Option<&'a [OffsetIndexMetaData]>,
286 props: ReaderPropertiesPtr,
287 ) -> Result<Self> {
288 let bloom_filters = if props.read_bloom_filter() {
289 metadata
290 .columns()
291 .iter()
292 .map(|col| Sbbf::read_from_column_chunk(col, &*chunk_reader))
293 .collect::<Result<Vec<_>>>()?
294 } else {
295 std::iter::repeat_n(None, metadata.columns().len()).collect()
296 };
297 Ok(Self {
298 chunk_reader,
299 metadata,
300 offset_index,
301 props,
302 bloom_filters,
303 })
304 }
305}
306
307impl<R: 'static + ChunkReader> RowGroupReader for SerializedRowGroupReader<'_, R> {
308 fn metadata(&self) -> &RowGroupMetaData {
309 self.metadata
310 }
311
312 fn num_columns(&self) -> usize {
313 self.metadata.num_columns()
314 }
315
316 fn get_column_page_reader(&self, i: usize) -> Result<Box<dyn PageReader>> {
318 let col = self.metadata.column(i);
319
320 let page_locations = self.offset_index.map(|x| x[i].page_locations.clone());
321
322 let props = Arc::clone(&self.props);
323 Ok(Box::new(SerializedPageReader::new_with_properties(
324 Arc::clone(&self.chunk_reader),
325 col,
326 usize::try_from(self.metadata.num_rows())?,
327 page_locations,
328 props,
329 )?))
330 }
331
332 fn get_column_bloom_filter(&self, i: usize) -> Option<&Sbbf> {
334 self.bloom_filters[i].as_ref()
335 }
336
337 fn get_row_iter(&self, projection: Option<SchemaType>) -> Result<RowIter<'_>> {
338 RowIter::from_row_group(projection, self)
339 }
340}
341
342pub(crate) fn decode_page(
344 page_header: PageHeader,
345 buffer: Bytes,
346 physical_type: Type,
347 decompressor: Option<&mut Box<dyn Codec>>,
348) -> Result<Page> {
349 #[cfg(feature = "crc")]
351 if let Some(expected_crc) = page_header.crc {
352 let crc = crc32fast::hash(&buffer);
353 if crc != expected_crc as u32 {
354 return Err(general_err!("Page CRC checksum mismatch"));
355 }
356 }
357
358 let mut offset: usize = 0;
365 let mut can_decompress = true;
366
367 if let Some(ref header_v2) = page_header.data_page_header_v2 {
368 if header_v2.definition_levels_byte_length < 0
369 || header_v2.repetition_levels_byte_length < 0
370 || header_v2.definition_levels_byte_length + header_v2.repetition_levels_byte_length
371 > page_header.uncompressed_page_size
372 {
373 return Err(general_err!(
374 "DataPage v2 header contains implausible values \
375 for definition_levels_byte_length ({}) \
376 and repetition_levels_byte_length ({}) \
377 given DataPage header provides uncompressed_page_size ({})",
378 header_v2.definition_levels_byte_length,
379 header_v2.repetition_levels_byte_length,
380 page_header.uncompressed_page_size
381 ));
382 }
383 offset = usize::try_from(
384 header_v2.definition_levels_byte_length + header_v2.repetition_levels_byte_length,
385 )?;
386 can_decompress = header_v2.is_compressed.unwrap_or(true);
388 }
389
390 let buffer = match decompressor {
393 Some(decompressor) if can_decompress => {
394 let uncompressed_page_size = usize::try_from(page_header.uncompressed_page_size)?;
395 if offset > buffer.len() || offset > uncompressed_page_size {
396 return Err(general_err!("Invalid page header"));
397 }
398 let decompressed_size = uncompressed_page_size - offset;
399 let mut decompressed = Vec::with_capacity(uncompressed_page_size);
400 decompressed.extend_from_slice(&buffer.as_ref()[..offset]);
401 if decompressed_size > 0 {
402 let compressed = &buffer.as_ref()[offset..];
403 decompressor.decompress(compressed, &mut decompressed, Some(decompressed_size))?;
404 }
405
406 if decompressed.len() != uncompressed_page_size {
407 return Err(general_err!(
408 "Actual decompressed size doesn't match the expected one ({} vs {})",
409 decompressed.len(),
410 uncompressed_page_size
411 ));
412 }
413
414 Bytes::from(decompressed)
415 }
416 _ => buffer,
417 };
418
419 let result = match page_header.r#type {
420 PageType::DICTIONARY_PAGE => {
421 let dict_header = page_header.dictionary_page_header.as_ref().ok_or_else(|| {
422 ParquetError::General("Missing dictionary page header".to_string())
423 })?;
424 let is_sorted = dict_header.is_sorted.unwrap_or(false);
425 Page::DictionaryPage {
426 buf: buffer,
427 num_values: dict_header.num_values.try_into()?,
428 encoding: dict_header.encoding,
429 is_sorted,
430 }
431 }
432 PageType::DATA_PAGE => {
433 let header = page_header
434 .data_page_header
435 .ok_or_else(|| ParquetError::General("Missing V1 data page header".to_string()))?;
436 Page::DataPage {
437 buf: buffer,
438 num_values: header.num_values.try_into()?,
439 encoding: header.encoding,
440 def_level_encoding: header.definition_level_encoding,
441 rep_level_encoding: header.repetition_level_encoding,
442 statistics: statistics::from_thrift_page_stats(physical_type, header.statistics)?,
443 }
444 }
445 PageType::DATA_PAGE_V2 => {
446 let header = page_header
447 .data_page_header_v2
448 .ok_or_else(|| ParquetError::General("Missing V2 data page header".to_string()))?;
449 let is_compressed = header.is_compressed.unwrap_or(true);
450 Page::DataPageV2 {
451 buf: buffer,
452 num_values: header.num_values.try_into()?,
453 encoding: header.encoding,
454 num_nulls: header.num_nulls.try_into()?,
455 num_rows: header.num_rows.try_into()?,
456 def_levels_byte_len: header.definition_levels_byte_length.try_into()?,
457 rep_levels_byte_len: header.repetition_levels_byte_length.try_into()?,
458 is_compressed,
459 statistics: statistics::from_thrift_page_stats(physical_type, header.statistics)?,
460 }
461 }
462 _ => {
463 return Err(general_err!(
465 "Page type {:?} is not supported",
466 page_header.r#type
467 ));
468 }
469 };
470
471 Ok(result)
472}
473
// Internal cursor state of a `SerializedPageReader`.
enum SerializedPageReaderState {
    /// Sequential scan of the column chunk; used when no offset index is
    /// available.
    Values {
        /// Current byte offset in the file.
        offset: u64,

        /// Bytes of the chunk remaining to be read.
        remaining_bytes: u64,

        // Cached page header from a previous "peek", if any.
        next_page_header: Option<Box<PageHeader>>,

        /// Index of the current data page within this column chunk.
        page_index: usize,

        /// Whether the next page is expected to be a dictionary page.
        require_dictionary: bool,
    },
    /// Offset-index-driven scan with known page locations.
    Pages {
        /// Remaining data-page locations, consumed front-to-back.
        page_locations: VecDeque<PageLocation>,
        /// Remaining (synthesized) dictionary page location, if any.
        dictionary_page: Option<PageLocation>,
        /// Total number of rows in this column chunk.
        total_rows: usize,
        /// Index of the current data page within this column chunk.
        page_index: usize,
    },
}
504
// Per-reader configuration shared by page-header parsing and page decoding.
#[derive(Default)]
struct SerializedPageReaderContext {
    // Whether page-level statistics should be decoded from page headers.
    read_stats: bool,
    // Decryption context for this column chunk, when encryption is enabled.
    #[cfg(feature = "encryption")]
    crypto_context: Option<Arc<CryptoContext>>,
}
513
/// A serialized implementation of a Parquet page reader for one column chunk.
pub struct SerializedPageReader<R: ChunkReader> {
    // Shared handle to the underlying byte source.
    reader: Arc<R>,

    // Decompressor matching the chunk's codec, if the chunk is compressed.
    decompressor: Option<Box<dyn Codec>>,

    // Physical type of the column, needed to decode page statistics.
    physical_type: Type,

    // Cursor state: sequential scan or offset-index-driven.
    state: SerializedPageReaderState,

    // Shared header-parsing / decryption configuration.
    context: SerializedPageReaderContext,
}
529
impl<R: ChunkReader> SerializedPageReader<R> {
    /// Creates a new serialized page reader with default reader properties.
    pub fn new(
        reader: Arc<R>,
        column_chunk_metadata: &ColumnChunkMetaData,
        total_rows: usize,
        page_locations: Option<Vec<PageLocation>>,
    ) -> Result<Self> {
        let props = Arc::new(ReaderProperties::builder().build());
        SerializedPageReader::new_with_properties(
            reader,
            column_chunk_metadata,
            total_rows,
            page_locations,
            props,
        )
    }

    /// No-op stub when the `encryption` feature is disabled: returns the
    /// reader unchanged.
    #[cfg(all(feature = "arrow", not(feature = "encryption")))]
    pub(crate) fn add_crypto_context(
        self,
        _rg_idx: usize,
        _column_idx: usize,
        _parquet_meta_data: &ParquetMetaData,
        _column_chunk_metadata: &ColumnChunkMetaData,
    ) -> Result<SerializedPageReader<R>> {
        Ok(self)
    }

    /// Attaches a decryption context for this column chunk when both the file
    /// and the column carry crypto metadata; otherwise returns the reader
    /// unchanged.
    #[cfg(feature = "encryption")]
    pub(crate) fn add_crypto_context(
        mut self,
        rg_idx: usize,
        column_idx: usize,
        parquet_meta_data: &ParquetMetaData,
        column_chunk_metadata: &ColumnChunkMetaData,
    ) -> Result<SerializedPageReader<R>> {
        let Some(file_decryptor) = parquet_meta_data.file_decryptor() else {
            return Ok(self);
        };
        let Some(crypto_metadata) = column_chunk_metadata.crypto_metadata() else {
            return Ok(self);
        };
        let crypto_context =
            CryptoContext::for_column(file_decryptor, crypto_metadata, rg_idx, column_idx)?;
        self.context.crypto_context = Some(Arc::new(crypto_context));
        Ok(self)
    }

    /// Creates a new serialized page reader using the supplied properties.
    ///
    /// When `page_locations` is provided (offset index available), pages are
    /// read by location; otherwise the chunk is scanned sequentially.
    pub fn new_with_properties(
        reader: Arc<R>,
        meta: &ColumnChunkMetaData,
        total_rows: usize,
        page_locations: Option<Vec<PageLocation>>,
        props: ReaderPropertiesPtr,
    ) -> Result<Self> {
        let decompressor = create_codec(meta.compression(), props.codec_options())?;
        let (start, len) = meta.byte_range();

        let state = match page_locations {
            Some(locations) => {
                // The offset index lists data pages only. If the first data
                // page does not start at the chunk start, the leading gap must
                // hold the dictionary page: synthesize a location for it.
                let dictionary_page = match locations.first() {
                    Some(dict_offset) if dict_offset.offset as u64 != start => Some(PageLocation {
                        offset: start as i64,
                        compressed_page_size: (dict_offset.offset as u64 - start) as i32,
                        first_row_index: 0,
                    }),
                    _ => None,
                };

                SerializedPageReaderState::Pages {
                    page_locations: locations.into(),
                    dictionary_page,
                    total_rows,
                    page_index: 0,
                }
            }
            None => SerializedPageReaderState::Values {
                offset: start,
                remaining_bytes: len,
                next_page_header: None,
                page_index: 0,
                require_dictionary: meta.dictionary_page_offset().is_some(),
            },
        };
        let mut context = SerializedPageReaderContext::default();
        if props.read_page_stats() {
            context.read_stats = true;
        }
        Ok(Self {
            reader,
            decompressor,
            state,
            physical_type: meta.column_type(),
            context,
        })
    }

    /// Returns the file offset of the next page's header, or `None` when the
    /// chunk is exhausted (test-only helper).
    #[cfg(test)]
    fn peek_next_page_offset(&mut self) -> Result<Option<u64>> {
        match &mut self.state {
            SerializedPageReaderState::Values {
                offset,
                remaining_bytes,
                next_page_header,
                page_index,
                require_dictionary,
            } => {
                loop {
                    if *remaining_bytes == 0 {
                        return Ok(None);
                    }
                    return if let Some(header) = next_page_header.as_ref() {
                        // A header was already peeked: report the current
                        // offset if it converts to a known page kind.
                        if let Ok(_page_meta) = PageMetadata::try_from(&**header) {
                            Ok(Some(*offset))
                        } else {
                            // Unknown page type: discard and read the next one.
                            *next_page_header = None;
                            continue;
                        }
                    } else {
                        let mut read = self.reader.get_read(*offset)?;
                        let (header_len, header) = Self::read_page_header_len(
                            &self.context,
                            &mut read,
                            *page_index,
                            *require_dictionary,
                        )?;
                        // Advance past the header so `offset` points at the
                        // page body (what this method reports).
                        *offset += header_len as u64;
                        *remaining_bytes -= header_len as u64;
                        let page_meta = if let Ok(_page_meta) = PageMetadata::try_from(&header) {
                            Ok(Some(*offset))
                        } else {
                            // Unknown page type: skip it and keep scanning.
                            continue;
                        };
                        // Cache the header so the next read reuses it.
                        *next_page_header = Some(Box::new(header));
                        page_meta
                    };
                }
            }
            SerializedPageReaderState::Pages {
                page_locations,
                dictionary_page,
                ..
            } => {
                // The dictionary page (if still pending) precedes data pages.
                if let Some(page) = dictionary_page {
                    Ok(Some(page.offset as u64))
                } else if let Some(page) = page_locations.front() {
                    Ok(Some(page.offset as u64))
                } else {
                    Ok(None)
                }
            }
        }
    }

    /// Reads a page header from `input`, returning the number of bytes
    /// consumed alongside the header; the count is needed to advance the
    /// sequential-scan cursor.
    fn read_page_header_len<T: Read>(
        context: &SerializedPageReaderContext,
        input: &mut T,
        page_index: usize,
        dictionary_page: bool,
    ) -> Result<(usize, PageHeader)> {
        // Wraps a reader and counts every byte read through it.
        struct TrackedRead<R> {
            inner: R,
            bytes_read: usize,
        }

        impl<R: Read> Read for TrackedRead<R> {
            fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
                let v = self.inner.read(buf)?;
                self.bytes_read += v;
                Ok(v)
            }
        }

        let mut tracked = TrackedRead {
            inner: input,
            bytes_read: 0,
        };
        let header = context.read_page_header(&mut tracked, page_index, dictionary_page)?;
        Ok((tracked.bytes_read, header))
    }

    /// Like [`Self::read_page_header_len`], but parses from an in-memory
    /// buffer, deriving the header length from the cursor position.
    fn read_page_header_len_from_bytes(
        context: &SerializedPageReaderContext,
        buffer: &[u8],
        page_index: usize,
        dictionary_page: bool,
    ) -> Result<(usize, PageHeader)> {
        let mut input = std::io::Cursor::new(buffer);
        let header = context.read_page_header(&mut input, page_index, dictionary_page)?;
        let header_len = input.position() as usize;
        Ok((header_len, header))
    }
}
737
738#[cfg(not(feature = "encryption"))]
739impl SerializedPageReaderContext {
740 fn read_page_header<T: Read>(
741 &self,
742 input: &mut T,
743 _page_index: usize,
744 _dictionary_page: bool,
745 ) -> Result<PageHeader> {
746 let mut prot = ThriftReadInputProtocol::new(input);
747 if self.read_stats {
748 Ok(PageHeader::read_thrift(&mut prot)?)
749 } else {
750 Ok(PageHeader::read_thrift_without_stats(&mut prot)?)
751 }
752 }
753
754 fn decrypt_page_data<T>(
755 &self,
756 buffer: T,
757 _page_index: usize,
758 _dictionary_page: bool,
759 ) -> Result<T> {
760 Ok(buffer)
761 }
762}
763
#[cfg(feature = "encryption")]
impl SerializedPageReaderContext {
    /// Reads a thrift page header, first decrypting it when this page has an
    /// associated crypto context.
    fn read_page_header<T: Read>(
        &self,
        input: &mut T,
        page_index: usize,
        dictionary_page: bool,
    ) -> Result<PageHeader> {
        match self.page_crypto_context(page_index, dictionary_page) {
            None => {
                // Plaintext header: parse straight from the input stream.
                let mut prot = ThriftReadInputProtocol::new(input);
                if self.read_stats {
                    Ok(PageHeader::read_thrift(&mut prot)?)
                } else {
                    use crate::file::metadata::thrift::PageHeader;

                    Ok(PageHeader::read_thrift_without_stats(&mut prot)?)
                }
            }
            Some(page_crypto_context) => {
                let data_decryptor = page_crypto_context.data_decryptor();
                // AAD binds the ciphertext to this specific page header.
                let aad = page_crypto_context.create_page_header_aad()?;

                let buf = read_and_decrypt(data_decryptor, input, aad.as_ref()).map_err(|_| {
                    ParquetError::General(format!(
                        "Error decrypting page header for column {}, decryption key may be wrong",
                        page_crypto_context.column_ordinal
                    ))
                })?;

                // Parse the decrypted header bytes from an in-memory slice.
                let mut prot = ThriftSliceInputProtocol::new(buf.as_slice());
                if self.read_stats {
                    Ok(PageHeader::read_thrift(&mut prot)?)
                } else {
                    Ok(PageHeader::read_thrift_without_stats(&mut prot)?)
                }
            }
        }
    }

    /// Decrypts a page body when a crypto context is attached; otherwise
    /// returns the buffer unchanged.
    fn decrypt_page_data<T>(&self, buffer: T, page_index: usize, dictionary_page: bool) -> Result<T>
    where
        T: AsRef<[u8]>,
        T: From<Vec<u8>>,
    {
        let page_crypto_context = self.page_crypto_context(page_index, dictionary_page);
        if let Some(page_crypto_context) = page_crypto_context {
            let decryptor = page_crypto_context.data_decryptor();
            let aad = page_crypto_context.create_page_aad()?;
            let decrypted = decryptor.decrypt(buffer.as_ref(), &aad)?;
            Ok(T::from(decrypted))
        } else {
            Ok(buffer)
        }
    }

    /// Builds the per-page crypto context (dictionary-page vs. ordinal data
    /// page), or `None` when this column chunk is not encrypted.
    fn page_crypto_context(
        &self,
        page_index: usize,
        dictionary_page: bool,
    ) -> Option<Arc<CryptoContext>> {
        self.crypto_context.as_ref().map(|c| {
            Arc::new(if dictionary_page {
                c.for_dictionary_page()
            } else {
                c.with_page_ordinal(page_index)
            })
        })
    }
}
834
impl<R: ChunkReader> Iterator for SerializedPageReader<R> {
    type Item = Result<Page>;

    fn next(&mut self) -> Option<Self::Item> {
        // `transpose` maps Ok(None) (chunk exhausted) to iterator exhaustion.
        self.get_next_page().transpose()
    }
}
842
843fn verify_page_header_len(header_len: usize, remaining_bytes: u64) -> Result<()> {
844 if header_len as u64 > remaining_bytes {
845 return Err(eof_err!("Invalid page header"));
846 }
847 Ok(())
848}
849
850fn verify_page_size(
851 compressed_size: i32,
852 uncompressed_size: i32,
853 remaining_bytes: u64,
854) -> Result<()> {
855 if compressed_size < 0 || compressed_size as u64 > remaining_bytes || uncompressed_size < 0 {
859 return Err(eof_err!("Invalid page header"));
860 }
861 Ok(())
862}
863
impl<R: ChunkReader> PageReader for SerializedPageReader<R> {
    fn get_next_page(&mut self) -> Result<Option<Page>> {
        loop {
            let page = match &mut self.state {
                SerializedPageReaderState::Values {
                    offset,
                    remaining_bytes: remaining,
                    next_page_header,
                    page_index,
                    require_dictionary,
                } => {
                    if *remaining == 0 {
                        return Ok(None);
                    }

                    let mut read = self.reader.get_read(*offset)?;
                    let header = if let Some(header) = next_page_header.take() {
                        // Reuse the header cached by a previous peek; the
                        // cursor was already advanced past it.
                        *header
                    } else {
                        let (header_len, header) = Self::read_page_header_len(
                            &self.context,
                            &mut read,
                            *page_index,
                            *require_dictionary,
                        )?;
                        verify_page_header_len(header_len, *remaining)?;
                        *offset += header_len as u64;
                        *remaining -= header_len as u64;
                        header
                    };
                    verify_page_size(
                        header.compressed_page_size,
                        header.uncompressed_page_size,
                        *remaining,
                    )?;
                    let data_len = header.compressed_page_size as usize;
                    // Advance past the page body up-front so unsupported page
                    // types can simply be skipped via `continue`.
                    *offset += data_len as u64;
                    *remaining -= data_len as u64;

                    if header.r#type == PageType::INDEX_PAGE {
                        continue;
                    }

                    let mut buffer = Vec::with_capacity(data_len);
                    let read = read.take(data_len as u64).read_to_end(&mut buffer)?;

                    if read != data_len {
                        return Err(eof_err!(
                            "Expected to read {} bytes of page, read only {}",
                            data_len,
                            read
                        ));
                    }

                    let buffer =
                        self.context
                            .decrypt_page_data(buffer, *page_index, *require_dictionary)?;

                    let page = decode_page(
                        header,
                        Bytes::from(buffer),
                        self.physical_type,
                        self.decompressor.as_mut(),
                    )?;
                    if page.is_data_page() {
                        *page_index += 1;
                    } else if page.is_dictionary_page() {
                        // At most one dictionary page per chunk.
                        *require_dictionary = false;
                    }
                    page
                }
                SerializedPageReaderState::Pages {
                    page_locations,
                    dictionary_page,
                    page_index,
                    ..
                } => {
                    // The synthesized dictionary page (if any) is served
                    // before the indexed data pages.
                    let (front, is_dictionary_page) = match dictionary_page.take() {
                        Some(front) => (front, true),
                        None => match page_locations.pop_front() {
                            Some(front) => (front, false),
                            None => return Ok(None),
                        },
                    };

                    let page_len = usize::try_from(front.compressed_page_size)?;
                    // The location's size covers header + body; fetch both.
                    let buffer = self.reader.get_bytes(front.offset as u64, page_len)?;

                    let (offset, header) = Self::read_page_header_len_from_bytes(
                        &self.context,
                        buffer.as_ref(),
                        *page_index,
                        is_dictionary_page,
                    )?;
                    let bytes = buffer.slice(offset..);
                    let bytes =
                        self.context
                            .decrypt_page_data(bytes, *page_index, is_dictionary_page)?;

                    if !is_dictionary_page {
                        *page_index += 1;
                    }
                    decode_page(
                        header,
                        bytes,
                        self.physical_type,
                        self.decompressor.as_mut(),
                    )?
                }
            };

            return Ok(Some(page));
        }
    }

    fn peek_next_page(&mut self) -> Result<Option<PageMetadata>> {
        match &mut self.state {
            SerializedPageReaderState::Values {
                offset,
                remaining_bytes,
                next_page_header,
                page_index,
                require_dictionary,
            } => {
                loop {
                    if *remaining_bytes == 0 {
                        return Ok(None);
                    }
                    return if let Some(header) = next_page_header.as_ref() {
                        if let Ok(page_meta) = (&**header).try_into() {
                            Ok(Some(page_meta))
                        } else {
                            // Unknown page type: discard the cached header and
                            // read the next one.
                            *next_page_header = None;
                            continue;
                        }
                    } else {
                        let mut read = self.reader.get_read(*offset)?;
                        let (header_len, header) = Self::read_page_header_len(
                            &self.context,
                            &mut read,
                            *page_index,
                            *require_dictionary,
                        )?;
                        verify_page_header_len(header_len, *remaining_bytes)?;
                        *offset += header_len as u64;
                        *remaining_bytes -= header_len as u64;
                        let page_meta = if let Ok(page_meta) = (&header).try_into() {
                            Ok(Some(page_meta))
                        } else {
                            // Unknown page type: skip and keep scanning.
                            continue;
                        };
                        // Cache the header so the next read/skip reuses it.
                        *next_page_header = Some(Box::new(header));
                        page_meta
                    };
                }
            }
            SerializedPageReaderState::Pages {
                page_locations,
                dictionary_page,
                total_rows,
                page_index: _,
            } => {
                if dictionary_page.is_some() {
                    Ok(Some(PageMetadata {
                        num_rows: None,
                        num_levels: None,
                        is_dict: true,
                    }))
                } else if let Some(page) = page_locations.front() {
                    // Row count = first row of the next page (or the chunk
                    // total for the last page) minus this page's first row.
                    let next_rows = page_locations
                        .get(1)
                        .map(|x| x.first_row_index as usize)
                        .unwrap_or(*total_rows);

                    Ok(Some(PageMetadata {
                        num_rows: Some(next_rows - page.first_row_index as usize),
                        num_levels: None,
                        is_dict: false,
                    }))
                } else {
                    Ok(None)
                }
            }
        }
    }

    fn skip_next_page(&mut self) -> Result<()> {
        match &mut self.state {
            SerializedPageReaderState::Values {
                offset,
                remaining_bytes,
                next_page_header,
                page_index,
                require_dictionary,
            } => {
                if let Some(buffered_header) = next_page_header.take() {
                    verify_page_size(
                        buffered_header.compressed_page_size,
                        buffered_header.uncompressed_page_size,
                        *remaining_bytes,
                    )?;
                    // A previous peek already consumed the header bytes; only
                    // the page body remains to be skipped.
                    *offset += buffered_header.compressed_page_size as u64;
                    *remaining_bytes -= buffered_header.compressed_page_size as u64;
                } else {
                    let mut read = self.reader.get_read(*offset)?;
                    let (header_len, header) = Self::read_page_header_len(
                        &self.context,
                        &mut read,
                        *page_index,
                        *require_dictionary,
                    )?;
                    verify_page_header_len(header_len, *remaining_bytes)?;
                    verify_page_size(
                        header.compressed_page_size,
                        header.uncompressed_page_size,
                        *remaining_bytes,
                    )?;
                    // Skip header and body in one step.
                    let data_page_size = header.compressed_page_size as u64;
                    *offset += header_len as u64 + data_page_size;
                    *remaining_bytes -= header_len as u64 + data_page_size;
                }
                if *require_dictionary {
                    *require_dictionary = false;
                } else {
                    *page_index += 1;
                }
                Ok(())
            }
            SerializedPageReaderState::Pages {
                page_locations,
                dictionary_page,
                page_index,
                ..
            } => {
                if dictionary_page.is_some() {
                    // Consume the pending dictionary page location; it does
                    // not count toward the data-page index.
                    dictionary_page.take();
                } else {
                    if page_locations.pop_front().is_some() {
                        *page_index += 1;
                    }
                }

                Ok(())
            }
        }
    }

    fn at_record_boundary(&mut self) -> Result<bool> {
        match &mut self.state {
            // Without an offset index we only know we are at a record
            // boundary once the chunk is exhausted.
            SerializedPageReaderState::Values { .. } => Ok(self.peek_next_page()?.is_none()),
            // Offset-index pages always start on record boundaries.
            SerializedPageReaderState::Pages { .. } => Ok(true),
        }
    }
}
1123
1124#[cfg(test)]
1125mod tests {
1126 use std::collections::HashSet;
1127
1128 use bytes::Buf;
1129
1130 use crate::file::page_index::column_index::{
1131 ByteArrayColumnIndex, ColumnIndexMetaData, PrimitiveColumnIndex,
1132 };
1133 use crate::file::properties::{EnabledStatistics, WriterProperties};
1134
1135 use crate::basic::{self, BoundaryOrder, ColumnOrder, Encoding, SortOrder};
1136 use crate::column::reader::ColumnReader;
1137 use crate::data_type::private::ParquetValueType;
1138 use crate::data_type::{AsBytes, FixedLenByteArrayType, Int32Type};
1139 use crate::file::metadata::thrift::DataPageHeaderV2;
1140 #[allow(deprecated)]
1141 use crate::file::page_index::index_reader::{read_columns_indexes, read_offset_indexes};
1142 use crate::file::writer::SerializedFileWriter;
1143 use crate::record::RowAccessor;
1144 use crate::schema::parser::parse_message_type;
1145 use crate::util::test_common::file_util::{get_test_file, get_test_path};
1146
1147 use super::*;
1148
    // A V2 data page whose level-byte lengths exceed the declared
    // uncompressed size must be rejected by `decode_page`.
    #[test]
    fn test_decode_page_invalid_offset() {
        let page_header = PageHeader {
            r#type: PageType::DATA_PAGE_V2,
            uncompressed_page_size: 10,
            compressed_page_size: 10,
            data_page_header: None,
            index_page_header: None,
            dictionary_page_header: None,
            crc: None,
            data_page_header_v2: Some(DataPageHeaderV2 {
                num_nulls: 0,
                num_rows: 0,
                num_values: 0,
                encoding: Encoding::PLAIN,
                definition_levels_byte_length: 11,
                repetition_levels_byte_length: 0,
                is_compressed: None,
                statistics: None,
            }),
        };

        let buffer = Bytes::new();
        let err = decode_page(page_header, buffer, Type::INT32, None).unwrap_err();
        assert!(
            err.to_string()
                .contains("DataPage v2 header contains implausible values")
        );
    }

    // INDEX_PAGE is not materialized by `decode_page`; the V2 plausibility
    // check fires before the unsupported-type check once a V2 header exists.
    #[test]
    fn test_decode_unsupported_page() {
        let mut page_header = PageHeader {
            r#type: PageType::INDEX_PAGE,
            uncompressed_page_size: 10,
            compressed_page_size: 10,
            data_page_header: None,
            index_page_header: None,
            dictionary_page_header: None,
            crc: None,
            data_page_header_v2: None,
        };
        let buffer = Bytes::new();
        let err = decode_page(page_header.clone(), buffer.clone(), Type::INT32, None).unwrap_err();
        assert_eq!(
            err.to_string(),
            "Parquet error: Page type INDEX_PAGE is not supported"
        );

        page_header.data_page_header_v2 = Some(DataPageHeaderV2 {
            num_nulls: 0,
            num_rows: 0,
            num_values: 0,
            encoding: Encoding::PLAIN,
            definition_levels_byte_length: 11,
            repetition_levels_byte_length: 0,
            is_compressed: None,
            statistics: None,
        });
        let err = decode_page(page_header, buffer, Type::INT32, None).unwrap_err();
        assert!(
            err.to_string()
                .contains("DataPage v2 header contains implausible values")
        );
    }

    // Reading via an in-memory cursor and via a file handle must yield
    // identical rows.
    #[test]
    fn test_cursor_and_file_has_the_same_behaviour() {
        let mut buf: Vec<u8> = Vec::new();
        get_test_file("alltypes_plain.parquet")
            .read_to_end(&mut buf)
            .unwrap();
        let cursor = Bytes::from(buf);
        let read_from_cursor = SerializedFileReader::new(cursor).unwrap();

        let test_file = get_test_file("alltypes_plain.parquet");
        let read_from_file = SerializedFileReader::new(test_file).unwrap();

        let file_iter = read_from_file.get_row_iter(None).unwrap();
        let cursor_iter = read_from_cursor.get_row_iter(None).unwrap();

        for (a, b) in file_iter.zip(cursor_iter) {
            assert_eq!(a.unwrap(), b.unwrap())
        }
    }

    // Exercises every `TryFrom` conversion (File, &Path, &str, String) for
    // both an existing and a missing file.
    #[test]
    fn test_file_reader_try_from() {
        let test_file = get_test_file("alltypes_plain.parquet");
        let test_path_buf = get_test_path("alltypes_plain.parquet");
        let test_path = test_path_buf.as_path();
        let test_path_str = test_path.to_str().unwrap();

        let reader = SerializedFileReader::try_from(test_file);
        assert!(reader.is_ok());

        let reader = SerializedFileReader::try_from(test_path);
        assert!(reader.is_ok());

        let reader = SerializedFileReader::try_from(test_path_str);
        assert!(reader.is_ok());

        let reader = SerializedFileReader::try_from(test_path_str.to_string());
        assert!(reader.is_ok());

        // A non-existent path must fail for every conversion.
        let test_path = Path::new("invalid.parquet");
        let test_path_str = test_path.to_str().unwrap();

        let reader = SerializedFileReader::try_from(test_path);
        assert!(reader.is_err());

        let reader = SerializedFileReader::try_from(test_path_str);
        assert!(reader.is_err());

        let reader = SerializedFileReader::try_from(test_path_str.to_string());
        assert!(reader.is_err());
    }

    // `IntoIterator` yields all rows of the file in row-group order.
    #[test]
    fn test_file_reader_into_iter() {
        let path = get_test_path("alltypes_plain.parquet");
        let reader = SerializedFileReader::try_from(path.as_path()).unwrap();
        let iter = reader.into_iter();
        let values: Vec<_> = iter.flat_map(|x| x.unwrap().get_int(0)).collect();

        assert_eq!(values, &[4, 5, 6, 7, 2, 3, 0, 1]);
    }

    // Projection to a single column produces the same `id` values.
    #[test]
    fn test_file_reader_into_iter_project() {
        let path = get_test_path("alltypes_plain.parquet");
        let reader = SerializedFileReader::try_from(path.as_path()).unwrap();
        let schema = "message schema { OPTIONAL INT32 id; }";
        let proj = parse_message_type(schema).ok();
        let iter = reader.into_iter().project(proj).unwrap();
        let values: Vec<_> = iter.flat_map(|x| x.unwrap().get_int(0)).collect();

        assert_eq!(values, &[4, 5, 6, 7, 2, 3, 0, 1]);
    }

    // Multiple page readers over the same underlying chunk may coexist and
    // each read its first page successfully.
    #[test]
    fn test_reuse_file_chunk() {
        let test_file = get_test_file("alltypes_plain.parquet");
        let reader = SerializedFileReader::new(test_file).unwrap();
        let row_group = reader.get_row_group(0).unwrap();

        let mut page_readers = Vec::new();
        for i in 0..row_group.num_columns() {
            page_readers.push(row_group.get_column_page_reader(i).unwrap());
        }

        for mut page_reader in page_readers {
            assert!(page_reader.get_next_page().is_ok());
        }
    }
1311
    /// End-to-end smoke test against `alltypes_plain.parquet`: validates the
    /// file metadata, the single row group's metadata, and walks every page
    /// of column 0 (one dictionary page followed by one data page).
    #[test]
    fn test_file_reader() {
        let test_file = get_test_file("alltypes_plain.parquet");
        let reader_result = SerializedFileReader::new(test_file);
        assert!(reader_result.is_ok());
        let reader = reader_result.unwrap();

        // File-level metadata: one row group, written by impala, no extra
        // key/value metadata and no explicit column orders.
        let metadata = reader.metadata();
        assert_eq!(metadata.num_row_groups(), 1);

        let file_metadata = metadata.file_metadata();
        assert!(file_metadata.created_by().is_some());
        assert_eq!(
            file_metadata.created_by().unwrap(),
            "impala version 1.3.0-INTERNAL (build 8a48ddb1eff84592b3fc06bc6f51ec120e1fffc9)"
        );
        assert!(file_metadata.key_value_metadata().is_none());
        assert_eq!(file_metadata.num_rows(), 8);
        assert_eq!(file_metadata.version(), 1);
        assert_eq!(file_metadata.column_orders(), None);

        // Row-group metadata; without recorded column orders every column
        // reports ColumnOrder::UNDEFINED.
        let row_group_metadata = metadata.row_group(0);
        assert_eq!(row_group_metadata.num_columns(), 11);
        assert_eq!(row_group_metadata.num_rows(), 8);
        assert_eq!(row_group_metadata.total_byte_size(), 671);
        for i in 0..row_group_metadata.num_columns() {
            assert_eq!(file_metadata.column_order(i), ColumnOrder::UNDEFINED);
        }

        // The row-group reader must agree with the metadata it came from.
        let row_group_reader_result = reader.get_row_group(0);
        assert!(row_group_reader_result.is_ok());
        let row_group_reader: Box<dyn RowGroupReader> = row_group_reader_result.unwrap();
        assert_eq!(
            row_group_reader.num_columns(),
            row_group_metadata.num_columns()
        );
        assert_eq!(
            row_group_reader.metadata().total_byte_size(),
            row_group_metadata.total_byte_size()
        );

        // Walk all pages of column 0: expect a PLAIN_DICTIONARY dictionary
        // page followed by a single data page, and nothing else.
        let page_reader_0_result = row_group_reader.get_column_page_reader(0);
        assert!(page_reader_0_result.is_ok());
        let mut page_reader_0: Box<dyn PageReader> = page_reader_0_result.unwrap();
        let mut page_count = 0;
        while let Some(page) = page_reader_0.get_next_page().unwrap() {
            let is_expected_page = match page {
                Page::DictionaryPage {
                    buf,
                    num_values,
                    encoding,
                    is_sorted,
                } => {
                    assert_eq!(buf.len(), 32);
                    assert_eq!(num_values, 8);
                    assert_eq!(encoding, Encoding::PLAIN_DICTIONARY);
                    assert!(!is_sorted);
                    true
                }
                Page::DataPage {
                    buf,
                    num_values,
                    encoding,
                    def_level_encoding,
                    rep_level_encoding,
                    statistics,
                } => {
                    assert_eq!(buf.len(), 11);
                    assert_eq!(num_values, 8);
                    assert_eq!(encoding, Encoding::PLAIN_DICTIONARY);
                    assert_eq!(def_level_encoding, Encoding::RLE);
                    // BIT_PACKED is deprecated, but it is what this legacy
                    // file actually contains.
                    #[allow(deprecated)]
                    let expected_rep_level_encoding = Encoding::BIT_PACKED;
                    assert_eq!(rep_level_encoding, expected_rep_level_encoding);
                    assert!(statistics.is_none());
                    true
                }
                _ => false,
            };
            assert!(is_expected_page);
            page_count += 1;
        }
        assert_eq!(page_count, 2);
    }
1403
    /// Same smoke test as `test_file_reader` but for a Snappy-compressed file
    /// using DataPage v2: checks file/row-group metadata and walks the pages
    /// of column 0 (a dictionary page plus one `DataPageV2`).
    #[test]
    fn test_file_reader_datapage_v2() {
        let test_file = get_test_file("datapage_v2.snappy.parquet");
        let reader_result = SerializedFileReader::new(test_file);
        assert!(reader_result.is_ok());
        let reader = reader_result.unwrap();

        // File-level metadata: written by parquet-mr, one key/value entry.
        let metadata = reader.metadata();
        assert_eq!(metadata.num_row_groups(), 1);

        let file_metadata = metadata.file_metadata();
        assert!(file_metadata.created_by().is_some());
        assert_eq!(
            file_metadata.created_by().unwrap(),
            "parquet-mr version 1.8.1 (build 4aba4dae7bb0d4edbcf7923ae1339f28fd3f7fcf)"
        );
        assert!(file_metadata.key_value_metadata().is_some());
        assert_eq!(
            file_metadata.key_value_metadata().to_owned().unwrap().len(),
            1
        );

        assert_eq!(file_metadata.num_rows(), 5);
        assert_eq!(file_metadata.version(), 1);
        assert_eq!(file_metadata.column_orders(), None);

        let row_group_metadata = metadata.row_group(0);

        // No column orders recorded, so every column is UNDEFINED.
        for i in 0..row_group_metadata.num_columns() {
            assert_eq!(file_metadata.column_order(i), ColumnOrder::UNDEFINED);
        }

        // The row-group reader must agree with its metadata.
        let row_group_reader_result = reader.get_row_group(0);
        assert!(row_group_reader_result.is_ok());
        let row_group_reader: Box<dyn RowGroupReader> = row_group_reader_result.unwrap();
        assert_eq!(
            row_group_reader.num_columns(),
            row_group_metadata.num_columns()
        );
        assert_eq!(
            row_group_reader.metadata().total_byte_size(),
            row_group_metadata.total_byte_size()
        );

        // Walk all pages of column 0: one PLAIN dictionary page, then one
        // compressed DataPageV2 with one null out of 5 values.
        let page_reader_0_result = row_group_reader.get_column_page_reader(0);
        assert!(page_reader_0_result.is_ok());
        let mut page_reader_0: Box<dyn PageReader> = page_reader_0_result.unwrap();
        let mut page_count = 0;
        while let Some(page) = page_reader_0.get_next_page().unwrap() {
            let is_expected_page = match page {
                Page::DictionaryPage {
                    buf,
                    num_values,
                    encoding,
                    is_sorted,
                } => {
                    assert_eq!(buf.len(), 7);
                    assert_eq!(num_values, 1);
                    assert_eq!(encoding, Encoding::PLAIN);
                    assert!(!is_sorted);
                    true
                }
                Page::DataPageV2 {
                    buf,
                    num_values,
                    encoding,
                    num_nulls,
                    num_rows,
                    def_levels_byte_len,
                    rep_levels_byte_len,
                    is_compressed,
                    statistics,
                } => {
                    assert_eq!(buf.len(), 4);
                    assert_eq!(num_values, 5);
                    assert_eq!(encoding, Encoding::RLE_DICTIONARY);
                    assert_eq!(num_nulls, 1);
                    assert_eq!(num_rows, 5);
                    assert_eq!(def_levels_byte_len, 2);
                    assert_eq!(rep_levels_byte_len, 0);
                    assert!(is_compressed);
                    assert!(statistics.is_none()); true
                }
                _ => false,
            };
            assert!(is_expected_page);
            page_count += 1;
        }
        assert_eq!(page_count, 2);
    }
1501
    /// Reads a parquet-cpp-arrow file whose DataPage v2 column is entirely
    /// null (empty compressed data section): metadata checks plus a full page
    /// walk of column 0.
    #[test]
    fn test_file_reader_empty_compressed_datapage_v2() {
        // this file has a compressed datapage that un-compresses to 0 bytes
        let test_file = get_test_file("page_v2_empty_compressed.parquet");
        let reader_result = SerializedFileReader::new(test_file);
        assert!(reader_result.is_ok());
        let reader = reader_result.unwrap();

        // File-level metadata: one key/value entry, format version 2.
        let metadata = reader.metadata();
        assert_eq!(metadata.num_row_groups(), 1);

        let file_metadata = metadata.file_metadata();
        assert!(file_metadata.created_by().is_some());
        assert_eq!(
            file_metadata.created_by().unwrap(),
            "parquet-cpp-arrow version 14.0.2"
        );
        assert!(file_metadata.key_value_metadata().is_some());
        assert_eq!(
            file_metadata.key_value_metadata().to_owned().unwrap().len(),
            1
        );

        assert_eq!(file_metadata.num_rows(), 10);
        assert_eq!(file_metadata.version(), 2);
        // This writer records a TYPE_DEFINED_ORDER(SIGNED) order per column.
        let expected_order = ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED);
        assert_eq!(
            file_metadata.column_orders(),
            Some(vec![expected_order].as_ref())
        );

        let row_group_metadata = metadata.row_group(0);

        for i in 0..row_group_metadata.num_columns() {
            assert_eq!(file_metadata.column_order(i), expected_order);
        }

        // The row-group reader must agree with its metadata.
        let row_group_reader_result = reader.get_row_group(0);
        assert!(row_group_reader_result.is_ok());
        let row_group_reader: Box<dyn RowGroupReader> = row_group_reader_result.unwrap();
        assert_eq!(
            row_group_reader.num_columns(),
            row_group_metadata.num_columns()
        );
        assert_eq!(
            row_group_reader.metadata().total_byte_size(),
            row_group_metadata.total_byte_size()
        );

        // Walk all pages of column 0: an empty dictionary page, then one
        // DataPageV2 where all 10 values are null (only level bytes remain).
        let page_reader_0_result = row_group_reader.get_column_page_reader(0);
        assert!(page_reader_0_result.is_ok());
        let mut page_reader_0: Box<dyn PageReader> = page_reader_0_result.unwrap();
        let mut page_count = 0;
        while let Some(page) = page_reader_0.get_next_page().unwrap() {
            let is_expected_page = match page {
                Page::DictionaryPage {
                    buf,
                    num_values,
                    encoding,
                    is_sorted,
                } => {
                    assert_eq!(buf.len(), 0);
                    assert_eq!(num_values, 0);
                    assert_eq!(encoding, Encoding::PLAIN);
                    assert!(!is_sorted);
                    true
                }
                Page::DataPageV2 {
                    buf,
                    num_values,
                    encoding,
                    num_nulls,
                    num_rows,
                    def_levels_byte_len,
                    rep_levels_byte_len,
                    is_compressed,
                    statistics,
                } => {
                    assert_eq!(buf.len(), 3);
                    assert_eq!(num_values, 10);
                    assert_eq!(encoding, Encoding::RLE_DICTIONARY);
                    assert_eq!(num_nulls, 10);
                    assert_eq!(num_rows, 10);
                    assert_eq!(def_levels_byte_len, 2);
                    assert_eq!(rep_levels_byte_len, 0);
                    assert!(is_compressed);
                    assert!(statistics.is_none()); true
                }
                _ => false,
            };
            assert!(is_expected_page);
            page_count += 1;
        }
        assert_eq!(page_count, 2);
    }
1603
    /// Reads a parquet-mr file containing a single all-null DataPage v2 (no
    /// dictionary page at all): metadata checks plus a page walk of column 0.
    #[test]
    fn test_file_reader_empty_datapage_v2() {
        let test_file = get_test_file("datapage_v2_empty_datapage.snappy.parquet");
        let reader_result = SerializedFileReader::new(test_file);
        assert!(reader_result.is_ok());
        let reader = reader_result.unwrap();

        // File-level metadata: two key/value entries, one row.
        let metadata = reader.metadata();
        assert_eq!(metadata.num_row_groups(), 1);

        let file_metadata = metadata.file_metadata();
        assert!(file_metadata.created_by().is_some());
        assert_eq!(
            file_metadata.created_by().unwrap(),
            "parquet-mr version 1.13.1 (build db4183109d5b734ec5930d870cdae161e408ddba)"
        );
        assert!(file_metadata.key_value_metadata().is_some());
        assert_eq!(
            file_metadata.key_value_metadata().to_owned().unwrap().len(),
            2
        );

        assert_eq!(file_metadata.num_rows(), 1);
        assert_eq!(file_metadata.version(), 1);
        // This writer records a TYPE_DEFINED_ORDER(SIGNED) order per column.
        let expected_order = ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED);
        assert_eq!(
            file_metadata.column_orders(),
            Some(vec![expected_order].as_ref())
        );

        let row_group_metadata = metadata.row_group(0);

        for i in 0..row_group_metadata.num_columns() {
            assert_eq!(file_metadata.column_order(i), expected_order);
        }

        // The row-group reader must agree with its metadata.
        let row_group_reader_result = reader.get_row_group(0);
        assert!(row_group_reader_result.is_ok());
        let row_group_reader: Box<dyn RowGroupReader> = row_group_reader_result.unwrap();
        assert_eq!(
            row_group_reader.num_columns(),
            row_group_metadata.num_columns()
        );
        assert_eq!(
            row_group_reader.metadata().total_byte_size(),
            row_group_metadata.total_byte_size()
        );

        // Walk all pages of column 0: exactly one DataPageV2 whose single
        // value is null (only the 2 definition-level bytes remain).
        let page_reader_0_result = row_group_reader.get_column_page_reader(0);
        assert!(page_reader_0_result.is_ok());
        let mut page_reader_0: Box<dyn PageReader> = page_reader_0_result.unwrap();
        let mut page_count = 0;
        while let Some(page) = page_reader_0.get_next_page().unwrap() {
            let is_expected_page = match page {
                Page::DataPageV2 {
                    buf,
                    num_values,
                    encoding,
                    num_nulls,
                    num_rows,
                    def_levels_byte_len,
                    rep_levels_byte_len,
                    is_compressed,
                    statistics,
                } => {
                    assert_eq!(buf.len(), 2);
                    assert_eq!(num_values, 1);
                    assert_eq!(encoding, Encoding::PLAIN);
                    assert_eq!(num_nulls, 1);
                    assert_eq!(num_rows, 1);
                    assert_eq!(def_levels_byte_len, 2);
                    assert_eq!(rep_levels_byte_len, 0);
                    assert!(is_compressed);
                    assert!(statistics.is_none());
                    true
                }
                _ => false,
            };
            assert!(is_expected_page);
            page_count += 1;
        }
        assert_eq!(page_count, 1);
    }
1693
    /// Builds a standalone `SerializedPageReader` for one column chunk of the
    /// given row group, reusing the file reader's chunk reader, properties
    /// and (if loaded) the offset-index page locations for that column.
    fn get_serialized_page_reader<R: ChunkReader>(
        file_reader: &SerializedFileReader<R>,
        row_group: usize,
        column: usize,
    ) -> Result<SerializedPageReader<R>> {
        // Build a row-group reader first so we can borrow its metadata,
        // chunk reader and properties below.
        let row_group = {
            let row_group_metadata = file_reader.metadata.row_group(row_group);
            let props = Arc::clone(&file_reader.props);
            let f = Arc::clone(&file_reader.chunk_reader);
            SerializedRowGroupReader::new(
                f,
                row_group_metadata,
                file_reader
                    .metadata
                    .offset_index()
                    .map(|x| x[row_group].as_slice()),
                props,
            )?
        };

        let col = row_group.metadata.column(column);

        // Page locations are only present when the offset index was read.
        let page_locations = row_group
            .offset_index
            .map(|x| x[column].page_locations.clone());

        let props = Arc::clone(&row_group.props);
        SerializedPageReader::new_with_properties(
            Arc::clone(&row_group.chunk_reader),
            col,
            usize::try_from(row_group.metadata.num_rows())?,
            page_locations,
            props,
        )
    }
1729
    /// For every column of every row group, `peek_next_page_offset` must
    /// report the same offset the reader's internal state points at, and each
    /// page offset must be unique across the whole file.
    #[test]
    fn test_peek_next_page_offset_matches_actual() -> Result<()> {
        let test_file = get_test_file("alltypes_plain.parquet");
        let reader = SerializedFileReader::new(test_file)?;

        let mut offset_set = HashSet::new();
        let num_row_groups = reader.metadata.num_row_groups();
        for row_group in 0..num_row_groups {
            let num_columns = reader.metadata.row_group(row_group).num_columns();
            for column in 0..num_columns {
                let mut page_reader = get_serialized_page_reader(&reader, row_group, column)?;

                while let Ok(Some(page_offset)) = page_reader.peek_next_page_offset() {
                    // Cross-check the peeked offset against the reader state.
                    match &page_reader.state {
                        SerializedPageReaderState::Pages {
                            page_locations,
                            dictionary_page,
                            ..
                        } => {
                            // With an offset index the next page is either the
                            // pending dictionary page or the first queued
                            // page location.
                            if let Some(page) = dictionary_page {
                                assert_eq!(page.offset as u64, page_offset);
                            } else if let Some(page) = page_locations.front() {
                                assert_eq!(page.offset as u64, page_offset);
                            } else {
                                unreachable!()
                            }
                        }
                        SerializedPageReaderState::Values {
                            offset,
                            next_page_header,
                            ..
                        } => {
                            // Peeking must have buffered the next page header.
                            assert!(next_page_header.is_some());
                            assert_eq!(*offset, page_offset);
                        }
                    }
                    let page = page_reader.get_next_page()?;
                    assert!(page.is_some());
                    // Every page in the file starts at a distinct offset.
                    let newly_inserted = offset_set.insert(page_offset);
                    assert!(newly_inserted);
                }
            }
        }

        Ok(())
    }
1776
1777 #[test]
1778 fn test_page_iterator() {
1779 let file = get_test_file("alltypes_plain.parquet");
1780 let file_reader = Arc::new(SerializedFileReader::new(file).unwrap());
1781
1782 let mut page_iterator = FilePageIterator::new(0, file_reader.clone()).unwrap();
1783
1784 let page = page_iterator.next();
1786 assert!(page.is_some());
1787 assert!(page.unwrap().is_ok());
1788
1789 let page = page_iterator.next();
1791 assert!(page.is_none());
1792
1793 let row_group_indices = Box::new(0..1);
1794 let mut page_iterator =
1795 FilePageIterator::with_row_groups(0, row_group_indices, file_reader).unwrap();
1796
1797 let page = page_iterator.next();
1799 assert!(page.is_some());
1800 assert!(page.unwrap().is_ok());
1801
1802 let page = page_iterator.next();
1804 assert!(page.is_none());
1805 }
1806
1807 #[test]
1808 fn test_file_reader_key_value_metadata() {
1809 let file = get_test_file("binary.parquet");
1810 let file_reader = Arc::new(SerializedFileReader::new(file).unwrap());
1811
1812 let metadata = file_reader
1813 .metadata
1814 .file_metadata()
1815 .key_value_metadata()
1816 .unwrap();
1817
1818 assert_eq!(metadata.len(), 3);
1819
1820 assert_eq!(metadata[0].key, "parquet.proto.descriptor");
1821
1822 assert_eq!(metadata[1].key, "writer.model.name");
1823 assert_eq!(metadata[1].value, Some("protobuf".to_owned()));
1824
1825 assert_eq!(metadata[2].key, "parquet.proto.class");
1826 assert_eq!(metadata[2].value, Some("foo.baz.Foobaz$Event".to_owned()));
1827 }
1828
1829 #[test]
1830 fn test_file_reader_optional_metadata() {
1831 let file = get_test_file("data_index_bloom_encoding_stats.parquet");
1833 let file_reader = Arc::new(SerializedFileReader::new(file).unwrap());
1834
1835 let row_group_metadata = file_reader.metadata.row_group(0);
1836 let col0_metadata = row_group_metadata.column(0);
1837
1838 assert_eq!(col0_metadata.bloom_filter_offset().unwrap(), 192);
1840
1841 let page_encoding_stats = &col0_metadata.page_encoding_stats().unwrap()[0];
1843
1844 assert_eq!(page_encoding_stats.page_type, basic::PageType::DATA_PAGE);
1845 assert_eq!(page_encoding_stats.encoding, Encoding::PLAIN);
1846 assert_eq!(page_encoding_stats.count, 1);
1847
1848 assert_eq!(col0_metadata.column_index_offset().unwrap(), 156);
1850 assert_eq!(col0_metadata.column_index_length().unwrap(), 25);
1851
1852 assert_eq!(col0_metadata.offset_index_offset().unwrap(), 181);
1854 assert_eq!(col0_metadata.offset_index_length().unwrap(), 11);
1855 }
1856
1857 #[test]
1858 fn test_file_reader_with_no_filter() -> Result<()> {
1859 let test_file = get_test_file("alltypes_plain.parquet");
1860 let origin_reader = SerializedFileReader::new(test_file)?;
1861 let metadata = origin_reader.metadata();
1863 assert_eq!(metadata.num_row_groups(), 1);
1864 Ok(())
1865 }
1866
1867 #[test]
1868 fn test_file_reader_filter_row_groups_with_predicate() -> Result<()> {
1869 let test_file = get_test_file("alltypes_plain.parquet");
1870 let read_options = ReadOptionsBuilder::new()
1871 .with_predicate(Box::new(|_, _| false))
1872 .build();
1873 let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
1874 let metadata = reader.metadata();
1875 assert_eq!(metadata.num_row_groups(), 0);
1876 Ok(())
1877 }
1878
1879 #[test]
1880 fn test_file_reader_filter_row_groups_with_range() -> Result<()> {
1881 let test_file = get_test_file("alltypes_plain.parquet");
1882 let origin_reader = SerializedFileReader::new(test_file)?;
1883 let metadata = origin_reader.metadata();
1885 assert_eq!(metadata.num_row_groups(), 1);
1886 let mid = get_midpoint_offset(metadata.row_group(0));
1887
1888 let test_file = get_test_file("alltypes_plain.parquet");
1889 let read_options = ReadOptionsBuilder::new().with_range(0, mid + 1).build();
1890 let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
1891 let metadata = reader.metadata();
1892 assert_eq!(metadata.num_row_groups(), 1);
1893
1894 let test_file = get_test_file("alltypes_plain.parquet");
1895 let read_options = ReadOptionsBuilder::new().with_range(0, mid).build();
1896 let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
1897 let metadata = reader.metadata();
1898 assert_eq!(metadata.num_row_groups(), 0);
1899 Ok(())
1900 }
1901
    /// Combines a row-group predicate with a byte-range filter while the page
    /// index is enabled: a row group survives only when the predicate accepts
    /// it AND its midpoint falls in the range, and the loaded column/offset
    /// indexes shrink to match the surviving row groups.
    #[test]
    fn test_file_reader_filter_row_groups_and_range() -> Result<()> {
        let test_file = get_test_file("alltypes_tiny_pages.parquet");
        let origin_reader = SerializedFileReader::new(test_file)?;
        let metadata = origin_reader.metadata();
        let mid = get_midpoint_offset(metadata.row_group(0));

        // Accepting predicate + range covering the midpoint: the row group is
        // kept and its page indexes are loaded.
        let test_file = get_test_file("alltypes_tiny_pages.parquet");
        let read_options = ReadOptionsBuilder::new()
            .with_page_index()
            .with_predicate(Box::new(|_, _| true))
            .with_range(mid, mid + 1)
            .build();
        let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
        let metadata = reader.metadata();
        assert_eq!(metadata.num_row_groups(), 1);
        assert_eq!(metadata.column_index().unwrap().len(), 1);
        assert_eq!(metadata.offset_index().unwrap().len(), 1);

        // Accepting predicate but range missing the midpoint: everything is
        // filtered out, so no page indexes are loaded.
        let test_file = get_test_file("alltypes_tiny_pages.parquet");
        let read_options = ReadOptionsBuilder::new()
            .with_page_index()
            .with_predicate(Box::new(|_, _| true))
            .with_range(0, mid)
            .build();
        let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
        let metadata = reader.metadata();
        assert_eq!(metadata.num_row_groups(), 0);
        assert!(metadata.column_index().is_none());
        assert!(metadata.offset_index().is_none());

        // Rejecting predicate even with a covering range: still filtered out.
        let test_file = get_test_file("alltypes_tiny_pages.parquet");
        let read_options = ReadOptionsBuilder::new()
            .with_page_index()
            .with_predicate(Box::new(|_, _| false))
            .with_range(mid, mid + 1)
            .build();
        let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
        let metadata = reader.metadata();
        assert_eq!(metadata.num_row_groups(), 0);
        assert!(metadata.column_index().is_none());
        assert!(metadata.offset_index().is_none());

        // Rejecting predicate and non-covering range: filtered out as well.
        let test_file = get_test_file("alltypes_tiny_pages.parquet");
        let read_options = ReadOptionsBuilder::new()
            .with_page_index()
            .with_predicate(Box::new(|_, _| false))
            .with_range(0, mid)
            .build();
        let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
        let metadata = reader.metadata();
        assert_eq!(metadata.num_row_groups(), 0);
        assert!(metadata.column_index().is_none());
        assert!(metadata.offset_index().is_none());
        Ok(())
    }
1962
1963 #[test]
1964 fn test_file_reader_invalid_metadata() {
1965 let data = [
1966 255, 172, 1, 0, 50, 82, 65, 73, 1, 0, 0, 0, 169, 168, 168, 162, 87, 255, 16, 0, 0, 0,
1967 80, 65, 82, 49,
1968 ];
1969 let ret = SerializedFileReader::new(Bytes::copy_from_slice(&data));
1970 assert_eq!(
1971 ret.err().unwrap().to_string(),
1972 "Parquet error: Received empty union from remote ColumnOrder"
1973 );
1974 }
1975
1976 #[test]
1977 fn test_page_index_reader() {
1994 let test_file = get_test_file("data_index_bloom_encoding_stats.parquet");
1995 let builder = ReadOptionsBuilder::new();
1996 let options = builder.with_page_index().build();
1998 let reader_result = SerializedFileReader::new_with_options(test_file, options);
1999 let reader = reader_result.unwrap();
2000
2001 let metadata = reader.metadata();
2003 assert_eq!(metadata.num_row_groups(), 1);
2004
2005 let column_index = metadata.column_index().unwrap();
2006
2007 assert_eq!(column_index.len(), 1);
2009 let index = if let ColumnIndexMetaData::BYTE_ARRAY(index) = &column_index[0][0] {
2010 index
2011 } else {
2012 unreachable!()
2013 };
2014
2015 assert_eq!(index.boundary_order, BoundaryOrder::ASCENDING);
2016
2017 assert_eq!(index.num_pages(), 1);
2019
2020 let min = index.min_value(0).unwrap();
2021 let max = index.max_value(0).unwrap();
2022 assert_eq!(b"Hello", min.as_bytes());
2023 assert_eq!(b"today", max.as_bytes());
2024
2025 let offset_indexes = metadata.offset_index().unwrap();
2026 assert_eq!(offset_indexes.len(), 1);
2028 let offset_index = &offset_indexes[0];
2029 let page_offset = &offset_index[0].page_locations()[0];
2030
2031 assert_eq!(4, page_offset.offset);
2032 assert_eq!(152, page_offset.compressed_page_size);
2033 assert_eq!(0, page_offset.first_row_index);
2034 }
2035
2036 #[test]
2037 #[allow(deprecated)]
2038 fn test_page_index_reader_out_of_order() {
2039 let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
2040 let options = ReadOptionsBuilder::new().with_page_index().build();
2041 let reader = SerializedFileReader::new_with_options(test_file, options).unwrap();
2042 let metadata = reader.metadata();
2043
2044 let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
2045 let columns = metadata.row_group(0).columns();
2046 let reversed: Vec<_> = columns.iter().cloned().rev().collect();
2047
2048 let a = read_columns_indexes(&test_file, columns).unwrap().unwrap();
2049 let mut b = read_columns_indexes(&test_file, &reversed)
2050 .unwrap()
2051 .unwrap();
2052 b.reverse();
2053 assert_eq!(a, b);
2054
2055 let a = read_offset_indexes(&test_file, columns).unwrap().unwrap();
2056 let mut b = read_offset_indexes(&test_file, &reversed).unwrap().unwrap();
2057 b.reverse();
2058 assert_eq!(a, b);
2059 }
2060
2061 #[test]
2062 fn test_page_index_reader_all_type() {
2063 let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
2064 let builder = ReadOptionsBuilder::new();
2065 let options = builder.with_page_index().build();
2067 let reader_result = SerializedFileReader::new_with_options(test_file, options);
2068 let reader = reader_result.unwrap();
2069
2070 let metadata = reader.metadata();
2072 assert_eq!(metadata.num_row_groups(), 1);
2073
2074 let column_index = metadata.column_index().unwrap();
2075 let row_group_offset_indexes = &metadata.offset_index().unwrap()[0];
2076
2077 assert_eq!(column_index.len(), 1);
2079 let row_group_metadata = metadata.row_group(0);
2080
2081 assert!(!&column_index[0][0].is_sorted());
2083 let boundary_order = &column_index[0][0].get_boundary_order();
2084 assert!(boundary_order.is_some());
2085 matches!(boundary_order.unwrap(), BoundaryOrder::UNORDERED);
2086 if let ColumnIndexMetaData::INT32(index) = &column_index[0][0] {
2087 check_native_page_index(
2088 index,
2089 325,
2090 get_row_group_min_max_bytes(row_group_metadata, 0),
2091 BoundaryOrder::UNORDERED,
2092 );
2093 assert_eq!(row_group_offset_indexes[0].page_locations.len(), 325);
2094 } else {
2095 unreachable!()
2096 };
2097 assert!(&column_index[0][1].is_sorted());
2099 if let ColumnIndexMetaData::BOOLEAN(index) = &column_index[0][1] {
2100 assert_eq!(index.num_pages(), 82);
2101 assert_eq!(row_group_offset_indexes[1].page_locations.len(), 82);
2102 } else {
2103 unreachable!()
2104 };
2105 assert!(&column_index[0][2].is_sorted());
2107 if let ColumnIndexMetaData::INT32(index) = &column_index[0][2] {
2108 check_native_page_index(
2109 index,
2110 325,
2111 get_row_group_min_max_bytes(row_group_metadata, 2),
2112 BoundaryOrder::ASCENDING,
2113 );
2114 assert_eq!(row_group_offset_indexes[2].page_locations.len(), 325);
2115 } else {
2116 unreachable!()
2117 };
2118 assert!(&column_index[0][3].is_sorted());
2120 if let ColumnIndexMetaData::INT32(index) = &column_index[0][3] {
2121 check_native_page_index(
2122 index,
2123 325,
2124 get_row_group_min_max_bytes(row_group_metadata, 3),
2125 BoundaryOrder::ASCENDING,
2126 );
2127 assert_eq!(row_group_offset_indexes[3].page_locations.len(), 325);
2128 } else {
2129 unreachable!()
2130 };
2131 assert!(&column_index[0][4].is_sorted());
2133 if let ColumnIndexMetaData::INT32(index) = &column_index[0][4] {
2134 check_native_page_index(
2135 index,
2136 325,
2137 get_row_group_min_max_bytes(row_group_metadata, 4),
2138 BoundaryOrder::ASCENDING,
2139 );
2140 assert_eq!(row_group_offset_indexes[4].page_locations.len(), 325);
2141 } else {
2142 unreachable!()
2143 };
2144 assert!(!&column_index[0][5].is_sorted());
2146 if let ColumnIndexMetaData::INT64(index) = &column_index[0][5] {
2147 check_native_page_index(
2148 index,
2149 528,
2150 get_row_group_min_max_bytes(row_group_metadata, 5),
2151 BoundaryOrder::UNORDERED,
2152 );
2153 assert_eq!(row_group_offset_indexes[5].page_locations.len(), 528);
2154 } else {
2155 unreachable!()
2156 };
2157 assert!(&column_index[0][6].is_sorted());
2159 if let ColumnIndexMetaData::FLOAT(index) = &column_index[0][6] {
2160 check_native_page_index(
2161 index,
2162 325,
2163 get_row_group_min_max_bytes(row_group_metadata, 6),
2164 BoundaryOrder::ASCENDING,
2165 );
2166 assert_eq!(row_group_offset_indexes[6].page_locations.len(), 325);
2167 } else {
2168 unreachable!()
2169 };
2170 assert!(!&column_index[0][7].is_sorted());
2172 if let ColumnIndexMetaData::DOUBLE(index) = &column_index[0][7] {
2173 check_native_page_index(
2174 index,
2175 528,
2176 get_row_group_min_max_bytes(row_group_metadata, 7),
2177 BoundaryOrder::UNORDERED,
2178 );
2179 assert_eq!(row_group_offset_indexes[7].page_locations.len(), 528);
2180 } else {
2181 unreachable!()
2182 };
2183 assert!(!&column_index[0][8].is_sorted());
2185 if let ColumnIndexMetaData::BYTE_ARRAY(index) = &column_index[0][8] {
2186 check_byte_array_page_index(
2187 index,
2188 974,
2189 get_row_group_min_max_bytes(row_group_metadata, 8),
2190 BoundaryOrder::UNORDERED,
2191 );
2192 assert_eq!(row_group_offset_indexes[8].page_locations.len(), 974);
2193 } else {
2194 unreachable!()
2195 };
2196 assert!(&column_index[0][9].is_sorted());
2198 if let ColumnIndexMetaData::BYTE_ARRAY(index) = &column_index[0][9] {
2199 check_byte_array_page_index(
2200 index,
2201 352,
2202 get_row_group_min_max_bytes(row_group_metadata, 9),
2203 BoundaryOrder::ASCENDING,
2204 );
2205 assert_eq!(row_group_offset_indexes[9].page_locations.len(), 352);
2206 } else {
2207 unreachable!()
2208 };
2209 assert!(!&column_index[0][10].is_sorted());
2212 if let ColumnIndexMetaData::NONE = &column_index[0][10] {
2213 assert_eq!(row_group_offset_indexes[10].page_locations.len(), 974);
2214 } else {
2215 unreachable!()
2216 };
2217 assert!(&column_index[0][11].is_sorted());
2219 if let ColumnIndexMetaData::INT32(index) = &column_index[0][11] {
2220 check_native_page_index(
2221 index,
2222 325,
2223 get_row_group_min_max_bytes(row_group_metadata, 11),
2224 BoundaryOrder::ASCENDING,
2225 );
2226 assert_eq!(row_group_offset_indexes[11].page_locations.len(), 325);
2227 } else {
2228 unreachable!()
2229 };
2230 assert!(!&column_index[0][12].is_sorted());
2232 if let ColumnIndexMetaData::INT32(index) = &column_index[0][12] {
2233 check_native_page_index(
2234 index,
2235 325,
2236 get_row_group_min_max_bytes(row_group_metadata, 12),
2237 BoundaryOrder::UNORDERED,
2238 );
2239 assert_eq!(row_group_offset_indexes[12].page_locations.len(), 325);
2240 } else {
2241 unreachable!()
2242 };
2243 }
2244
2245 fn check_native_page_index<T: ParquetValueType>(
2246 row_group_index: &PrimitiveColumnIndex<T>,
2247 page_size: usize,
2248 min_max: (&[u8], &[u8]),
2249 boundary_order: BoundaryOrder,
2250 ) {
2251 assert_eq!(row_group_index.num_pages() as usize, page_size);
2252 assert_eq!(row_group_index.boundary_order, boundary_order);
2253 assert!(row_group_index.min_values().iter().all(|x| {
2254 x >= &T::try_from_le_slice(min_max.0).unwrap()
2255 && x <= &T::try_from_le_slice(min_max.1).unwrap()
2256 }));
2257 }
2258
2259 fn check_byte_array_page_index(
2260 row_group_index: &ByteArrayColumnIndex,
2261 page_size: usize,
2262 min_max: (&[u8], &[u8]),
2263 boundary_order: BoundaryOrder,
2264 ) {
2265 assert_eq!(row_group_index.num_pages() as usize, page_size);
2266 assert_eq!(row_group_index.boundary_order, boundary_order);
2267 for i in 0..row_group_index.num_pages() as usize {
2268 let x = row_group_index.min_value(i).unwrap();
2269 assert!(x >= min_max.0 && x <= min_max.1);
2270 }
2271 }
2272
2273 fn get_row_group_min_max_bytes(r: &RowGroupMetaData, col_num: usize) -> (&[u8], &[u8]) {
2274 let statistics = r.column(col_num).statistics().unwrap();
2275 (
2276 statistics.min_bytes_opt().unwrap_or_default(),
2277 statistics.max_bytes_opt().unwrap_or_default(),
2278 )
2279 }
2280
2281 #[test]
2282 fn test_skip_next_page_with_dictionary_page() {
2283 let test_file = get_test_file("alltypes_tiny_pages.parquet");
2284 let builder = ReadOptionsBuilder::new();
2285 let options = builder.with_page_index().build();
2287 let reader_result = SerializedFileReader::new_with_options(test_file, options);
2288 let reader = reader_result.unwrap();
2289
2290 let row_group_reader = reader.get_row_group(0).unwrap();
2291
2292 let mut column_page_reader = row_group_reader.get_column_page_reader(9).unwrap();
2294
2295 let mut vec = vec![];
2296
2297 let meta = column_page_reader.peek_next_page().unwrap().unwrap();
2299 assert!(meta.is_dict);
2300
2301 column_page_reader.skip_next_page().unwrap();
2303
2304 let page = column_page_reader.get_next_page().unwrap().unwrap();
2306 assert!(matches!(page.page_type(), basic::PageType::DATA_PAGE));
2307
2308 for _i in 0..351 {
2310 let meta = column_page_reader.peek_next_page().unwrap().unwrap();
2312 assert!(!meta.is_dict); vec.push(meta);
2314
2315 let page = column_page_reader.get_next_page().unwrap().unwrap();
2316 assert!(matches!(page.page_type(), basic::PageType::DATA_PAGE));
2317 }
2318
2319 assert!(column_page_reader.peek_next_page().unwrap().is_none());
2321 assert!(column_page_reader.get_next_page().unwrap().is_none());
2322
2323 assert_eq!(vec.len(), 351);
2325 }
2326
2327 #[test]
2328 fn test_skip_page_with_offset_index() {
2329 let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
2330 let builder = ReadOptionsBuilder::new();
2331 let options = builder.with_page_index().build();
2333 let reader_result = SerializedFileReader::new_with_options(test_file, options);
2334 let reader = reader_result.unwrap();
2335
2336 let row_group_reader = reader.get_row_group(0).unwrap();
2337
2338 let mut column_page_reader = row_group_reader.get_column_page_reader(4).unwrap();
2340
2341 let mut vec = vec![];
2342
2343 for i in 0..325 {
2344 if i % 2 == 0 {
2345 vec.push(column_page_reader.get_next_page().unwrap().unwrap());
2346 } else {
2347 column_page_reader.skip_next_page().unwrap();
2348 }
2349 }
2350 assert!(column_page_reader.peek_next_page().unwrap().is_none());
2352 assert!(column_page_reader.get_next_page().unwrap().is_none());
2353
2354 assert_eq!(vec.len(), 163);
2355 }
2356
2357 #[test]
2358 fn test_skip_page_without_offset_index() {
2359 let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
2360
2361 let reader_result = SerializedFileReader::new(test_file);
2363 let reader = reader_result.unwrap();
2364
2365 let row_group_reader = reader.get_row_group(0).unwrap();
2366
2367 let mut column_page_reader = row_group_reader.get_column_page_reader(4).unwrap();
2369
2370 let mut vec = vec![];
2371
2372 for i in 0..325 {
2373 if i % 2 == 0 {
2374 vec.push(column_page_reader.get_next_page().unwrap().unwrap());
2375 } else {
2376 column_page_reader.peek_next_page().unwrap().unwrap();
2377 column_page_reader.skip_next_page().unwrap();
2378 }
2379 }
2380 assert!(column_page_reader.peek_next_page().unwrap().is_none());
2382 assert!(column_page_reader.get_next_page().unwrap().is_none());
2383
2384 assert_eq!(vec.len(), 163);
2385 }
2386
2387 #[test]
2388 fn test_peek_page_with_dictionary_page() {
2389 let test_file = get_test_file("alltypes_tiny_pages.parquet");
2390 let builder = ReadOptionsBuilder::new();
2391 let options = builder.with_page_index().build();
2393 let reader_result = SerializedFileReader::new_with_options(test_file, options);
2394 let reader = reader_result.unwrap();
2395 let row_group_reader = reader.get_row_group(0).unwrap();
2396
2397 let mut column_page_reader = row_group_reader.get_column_page_reader(9).unwrap();
2399
2400 let mut vec = vec![];
2401
2402 let meta = column_page_reader.peek_next_page().unwrap().unwrap();
2403 assert!(meta.is_dict);
2404 let page = column_page_reader.get_next_page().unwrap().unwrap();
2405 assert!(matches!(page.page_type(), basic::PageType::DICTIONARY_PAGE));
2406
2407 for i in 0..352 {
2408 let meta = column_page_reader.peek_next_page().unwrap().unwrap();
2409 if i != 351 {
2412 assert!((meta.num_rows == Some(21)) || (meta.num_rows == Some(20)));
2413 } else {
2414 assert_eq!(meta.num_rows, Some(10));
2417 }
2418 assert!(!meta.is_dict);
2419 vec.push(meta);
2420 let page = column_page_reader.get_next_page().unwrap().unwrap();
2421 assert!(matches!(page.page_type(), basic::PageType::DATA_PAGE));
2422 }
2423
2424 assert!(column_page_reader.peek_next_page().unwrap().is_none());
2426 assert!(column_page_reader.get_next_page().unwrap().is_none());
2427
2428 assert_eq!(vec.len(), 352);
2429 }
2430
2431 #[test]
2432 fn test_peek_page_with_dictionary_page_without_offset_index() {
2433 let test_file = get_test_file("alltypes_tiny_pages.parquet");
2434
2435 let reader_result = SerializedFileReader::new(test_file);
2436 let reader = reader_result.unwrap();
2437 let row_group_reader = reader.get_row_group(0).unwrap();
2438
2439 let mut column_page_reader = row_group_reader.get_column_page_reader(9).unwrap();
2441
2442 let mut vec = vec![];
2443
2444 let meta = column_page_reader.peek_next_page().unwrap().unwrap();
2445 assert!(meta.is_dict);
2446 let page = column_page_reader.get_next_page().unwrap().unwrap();
2447 assert!(matches!(page.page_type(), basic::PageType::DICTIONARY_PAGE));
2448
2449 for i in 0..352 {
2450 let meta = column_page_reader.peek_next_page().unwrap().unwrap();
2451 if i != 351 {
2454 assert!((meta.num_levels == Some(21)) || (meta.num_levels == Some(20)));
2455 } else {
2456 assert_eq!(meta.num_levels, Some(10));
2459 }
2460 assert!(!meta.is_dict);
2461 vec.push(meta);
2462 let page = column_page_reader.get_next_page().unwrap().unwrap();
2463 assert!(matches!(page.page_type(), basic::PageType::DATA_PAGE));
2464 }
2465
2466 assert!(column_page_reader.peek_next_page().unwrap().is_none());
2468 assert!(column_page_reader.get_next_page().unwrap().is_none());
2469
2470 assert_eq!(vec.len(), 352);
2471 }
2472
2473 #[test]
2474 fn test_fixed_length_index() {
2475 let message_type = "
2476 message test_schema {
2477 OPTIONAL FIXED_LEN_BYTE_ARRAY (11) value (DECIMAL(25,2));
2478 }
2479 ";
2480
2481 let schema = parse_message_type(message_type).unwrap();
2482 let mut out = Vec::with_capacity(1024);
2483 let mut writer =
2484 SerializedFileWriter::new(&mut out, Arc::new(schema), Default::default()).unwrap();
2485
2486 let mut r = writer.next_row_group().unwrap();
2487 let mut c = r.next_column().unwrap().unwrap();
2488 c.typed::<FixedLenByteArrayType>()
2489 .write_batch(
2490 &[vec![0; 11].into(), vec![5; 11].into(), vec![3; 11].into()],
2491 Some(&[1, 1, 0, 1]),
2492 None,
2493 )
2494 .unwrap();
2495 c.close().unwrap();
2496 r.close().unwrap();
2497 writer.close().unwrap();
2498
2499 let b = Bytes::from(out);
2500 let options = ReadOptionsBuilder::new().with_page_index().build();
2501 let reader = SerializedFileReader::new_with_options(b, options).unwrap();
2502 let index = reader.metadata().column_index().unwrap();
2503
2504 assert_eq!(index.len(), 1);
2506 let c = &index[0];
2507 assert_eq!(c.len(), 1);
2509
2510 match &c[0] {
2511 ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(v) => {
2512 assert_eq!(v.num_pages(), 1);
2513 assert_eq!(v.null_count(0).unwrap(), 1);
2514 assert_eq!(v.min_value(0).unwrap(), &[0; 11]);
2515 assert_eq!(v.max_value(0).unwrap(), &[5; 11]);
2516 }
2517 _ => unreachable!(),
2518 }
2519 }
2520
2521 #[test]
2522 fn test_multi_gz() {
2523 let file = get_test_file("concatenated_gzip_members.parquet");
2524 let reader = SerializedFileReader::new(file).unwrap();
2525 let row_group_reader = reader.get_row_group(0).unwrap();
2526 match row_group_reader.get_column_reader(0).unwrap() {
2527 ColumnReader::Int64ColumnReader(mut reader) => {
2528 let mut buffer = Vec::with_capacity(1024);
2529 let mut def_levels = Vec::with_capacity(1024);
2530 let (num_records, num_values, num_levels) = reader
2531 .read_records(1024, Some(&mut def_levels), None, &mut buffer)
2532 .unwrap();
2533
2534 assert_eq!(num_records, 513);
2535 assert_eq!(num_values, 513);
2536 assert_eq!(num_levels, 513);
2537
2538 let expected: Vec<i64> = (1..514).collect();
2539 assert_eq!(&buffer, &expected);
2540 }
2541 _ => unreachable!(),
2542 }
2543 }
2544
2545 #[test]
2546 fn test_byte_stream_split_extended() {
2547 let path = format!(
2548 "{}/byte_stream_split_extended.gzip.parquet",
2549 arrow::util::test_util::parquet_test_data(),
2550 );
2551 let file = File::open(path).unwrap();
2552 let reader = Box::new(SerializedFileReader::new(file).expect("Failed to create reader"));
2553
2554 let mut iter = reader
2556 .get_row_iter(None)
2557 .expect("Failed to create row iterator");
2558
2559 let mut start = 0;
2560 let end = reader.metadata().file_metadata().num_rows();
2561
2562 let check_row = |row: Result<Row, ParquetError>| {
2563 assert!(row.is_ok());
2564 let r = row.unwrap();
2565 assert_eq!(r.get_float16(0).unwrap(), r.get_float16(1).unwrap());
2566 assert_eq!(r.get_float(2).unwrap(), r.get_float(3).unwrap());
2567 assert_eq!(r.get_double(4).unwrap(), r.get_double(5).unwrap());
2568 assert_eq!(r.get_int(6).unwrap(), r.get_int(7).unwrap());
2569 assert_eq!(r.get_long(8).unwrap(), r.get_long(9).unwrap());
2570 assert_eq!(r.get_bytes(10).unwrap(), r.get_bytes(11).unwrap());
2571 assert_eq!(r.get_decimal(12).unwrap(), r.get_decimal(13).unwrap());
2572 };
2573
2574 while start < end {
2575 match iter.next() {
2576 Some(row) => check_row(row),
2577 None => break,
2578 };
2579 start += 1;
2580 }
2581 }
2582
    /// End-to-end check that a row-group predicate in `ReadOptions` prunes both
    /// the row-group metadata and the corresponding column/offset page indexes.
    ///
    /// Writes five single-column row groups of five INT32 rows each, then reads
    /// the same file back twice with different ordinal-based predicates.
    #[test]
    fn test_filtered_rowgroup_metadata() {
        let message_type = "
            message test_schema {
                REQUIRED INT32 a;
            }
        ";
        let schema = Arc::new(parse_message_type(message_type).unwrap());
        // Page-level statistics are needed so the column index is populated.
        let props = Arc::new(
            WriterProperties::builder()
                .set_statistics_enabled(EnabledStatistics::Page)
                .build(),
        );
        let mut file: File = tempfile::tempfile().unwrap();
        let mut file_writer = SerializedFileWriter::new(&mut file, schema, props).unwrap();
        let data = [1, 2, 3, 4, 5];

        // Write 5 row groups, each holding `data` scaled by (ordinal + 1) so
        // every group has distinct min/max statistics.
        for idx in 0..5 {
            let data_i: Vec<i32> = data.iter().map(|x| x * (idx + 1)).collect();
            let mut row_group_writer = file_writer.next_row_group().unwrap();
            if let Some(mut writer) = row_group_writer.next_column().unwrap() {
                writer
                    .typed::<Int32Type>()
                    .write_batch(data_i.as_slice(), None, None)
                    .unwrap();
                writer.close().unwrap();
            }
            row_group_writer.close().unwrap();
            file_writer.flushed_row_groups();
        }
        let file_metadata = file_writer.close().unwrap();

        assert_eq!(file_metadata.file_metadata().num_rows(), 25);
        assert_eq!(file_metadata.num_row_groups(), 5);

        // First read: keep only the row group whose ordinal is 2.
        let read_options = ReadOptionsBuilder::new()
            .with_page_index()
            .with_predicate(Box::new(|rgmeta, _| rgmeta.ordinal().unwrap_or(0) == 2))
            .build();
        let reader =
            SerializedFileReader::new_with_options(file.try_clone().unwrap(), read_options)
                .unwrap();
        let metadata = reader.metadata();

        // Exactly one row group survives, and it keeps its original ordinal.
        assert_eq!(metadata.num_row_groups(), 1);
        assert_eq!(metadata.row_group(0).ordinal(), Some(2));

        // The page indexes are filtered down to the surviving row group too.
        assert!(metadata.column_index().is_some());
        assert!(metadata.offset_index().is_some());
        assert_eq!(metadata.column_index().unwrap().len(), 1);
        assert_eq!(metadata.offset_index().unwrap().len(), 1);
        let col_idx = metadata.column_index().unwrap();
        let off_idx = metadata.offset_index().unwrap();
        let col_stats = metadata.row_group(0).column(0).statistics().unwrap();
        let pg_idx = &col_idx[0][0];
        let off_idx_i = &off_idx[0][0];

        // The column index min/max must agree with the chunk statistics.
        match pg_idx {
            ColumnIndexMetaData::INT32(int_idx) => {
                let min = col_stats.min_bytes_opt().unwrap().get_i32_le();
                let max = col_stats.max_bytes_opt().unwrap().get_i32_le();
                assert_eq!(int_idx.min_value(0), Some(min).as_ref());
                assert_eq!(int_idx.max_value(0), Some(max).as_ref());
            }
            _ => panic!("wrong stats type"),
        }

        // The offset index's first page location must match the chunk metadata.
        assert_eq!(
            off_idx_i.page_locations[0].offset,
            metadata.row_group(0).column(0).data_page_offset()
        );

        // Second read: keep row groups with odd ordinals (1 and 3).
        let read_options = ReadOptionsBuilder::new()
            .with_page_index()
            .with_predicate(Box::new(|rgmeta, _| rgmeta.ordinal().unwrap_or(0) % 2 == 1))
            .build();
        let reader =
            SerializedFileReader::new_with_options(file.try_clone().unwrap(), read_options)
                .unwrap();
        let metadata = reader.metadata();

        assert_eq!(metadata.num_row_groups(), 2);
        assert_eq!(metadata.row_group(0).ordinal(), Some(1));
        assert_eq!(metadata.row_group(1).ordinal(), Some(3));

        assert!(metadata.column_index().is_some());
        assert!(metadata.offset_index().is_some());
        assert_eq!(metadata.column_index().unwrap().len(), 2);
        assert_eq!(metadata.offset_index().unwrap().len(), 2);
        let col_idx = metadata.column_index().unwrap();
        let off_idx = metadata.offset_index().unwrap();

        // Repeat the per-row-group index/statistics consistency checks.
        for (i, col_idx_i) in col_idx.iter().enumerate().take(metadata.num_row_groups()) {
            let col_stats = metadata.row_group(i).column(0).statistics().unwrap();
            let pg_idx = &col_idx_i[0];
            let off_idx_i = &off_idx[i][0];

            match pg_idx {
                ColumnIndexMetaData::INT32(int_idx) => {
                    let min = col_stats.min_bytes_opt().unwrap().get_i32_le();
                    let max = col_stats.max_bytes_opt().unwrap().get_i32_le();
                    assert_eq!(int_idx.min_value(0), Some(min).as_ref());
                    assert_eq!(int_idx.max_value(0), Some(max).as_ref());
                }
                _ => panic!("wrong stats type"),
            }

            assert_eq!(
                off_idx_i.page_locations[0].offset,
                metadata.row_group(i).column(0).data_page_offset()
            );
        }
    }
2707}