1use crate::basic::{PageType, Type};
22use crate::bloom_filter::Sbbf;
23use crate::column::page::{Page, PageMetadata, PageReader};
24use crate::compression::{Codec, create_codec};
25#[cfg(feature = "encryption")]
26use crate::encryption::decrypt::{CryptoContext, read_and_decrypt};
27use crate::errors::{ParquetError, Result};
28use crate::file::metadata::thrift::PageHeader;
29use crate::file::page_index::offset_index::{OffsetIndexMetaData, PageLocation};
30use crate::file::statistics;
31use crate::file::{
32 metadata::*,
33 properties::{ReaderProperties, ReaderPropertiesPtr},
34 reader::*,
35};
36#[cfg(feature = "encryption")]
37use crate::parquet_thrift::ThriftSliceInputProtocol;
38use crate::parquet_thrift::{ReadThrift, ThriftReadInputProtocol};
39use crate::record::Row;
40use crate::record::reader::RowIter;
41use crate::schema::types::{SchemaDescPtr, Type as SchemaType};
42use bytes::Bytes;
43use std::collections::VecDeque;
44use std::{fs::File, io::Read, path::Path, sync::Arc};
45
46impl TryFrom<File> for SerializedFileReader<File> {
47 type Error = ParquetError;
48
49 fn try_from(file: File) -> Result<Self> {
50 Self::new(file)
51 }
52}
53
54impl TryFrom<&Path> for SerializedFileReader<File> {
55 type Error = ParquetError;
56
57 fn try_from(path: &Path) -> Result<Self> {
58 let file = File::open(path)?;
59 Self::try_from(file)
60 }
61}
62
63impl TryFrom<String> for SerializedFileReader<File> {
64 type Error = ParquetError;
65
66 fn try_from(path: String) -> Result<Self> {
67 Self::try_from(Path::new(&path))
68 }
69}
70
71impl TryFrom<&str> for SerializedFileReader<File> {
72 type Error = ParquetError;
73
74 fn try_from(path: &str) -> Result<Self> {
75 Self::try_from(Path::new(&path))
76 }
77}
78
79impl IntoIterator for SerializedFileReader<File> {
82 type Item = Result<Row>;
83 type IntoIter = RowIter<'static>;
84
85 fn into_iter(self) -> Self::IntoIter {
86 RowIter::from_file_into(Box::new(self))
87 }
88}
89
/// A serialized implementation of the Parquet [`FileReader`] backed by any
/// [`ChunkReader`].
pub struct SerializedFileReader<R: ChunkReader> {
    // Shared handle to the underlying byte source (file, in-memory buffer, ...).
    chunk_reader: Arc<R>,
    // Parsed footer metadata for the whole file.
    metadata: Arc<ParquetMetaData>,
    // Reader configuration, shared with row-group and page readers.
    props: ReaderPropertiesPtr,
}
99
/// A predicate for filtering row groups. It is invoked with each row group's
/// metadata and its index in the file; only row groups for which every
/// predicate returns `true` are kept.
pub type ReadGroupPredicate = Box<dyn FnMut(&RowGroupMetaData, usize) -> bool>;
104
/// A builder for [`ReadOptions`].
#[derive(Default)]
pub struct ReadOptionsBuilder {
    // Row-group predicates; a row group is kept only if all of them pass.
    predicates: Vec<ReadGroupPredicate>,
    // When true, the page index is also read when building the reader.
    enable_page_index: bool,
    // Optional reader properties; defaults are used when `None`.
    props: Option<ReaderProperties>,
    // Options controlling how the footer metadata is decoded.
    metadata_options: ParquetMetaDataOptions,
}
115
116impl ReadOptionsBuilder {
117 pub fn new() -> Self {
119 Self::default()
120 }
121
122 pub fn with_predicate(mut self, predicate: ReadGroupPredicate) -> Self {
125 self.predicates.push(predicate);
126 self
127 }
128
129 pub fn with_range(mut self, start: i64, end: i64) -> Self {
132 assert!(start < end);
133 let predicate = move |rg: &RowGroupMetaData, _: usize| {
134 let mid = get_midpoint_offset(rg);
135 mid >= start && mid < end
136 };
137 self.predicates.push(Box::new(predicate));
138 self
139 }
140
141 pub fn with_page_index(mut self) -> Self {
146 self.enable_page_index = true;
147 self
148 }
149
150 pub fn with_reader_properties(mut self, properties: ReaderProperties) -> Self {
152 self.props = Some(properties);
153 self
154 }
155
156 pub fn with_parquet_schema(mut self, schema: SchemaDescPtr) -> Self {
159 self.metadata_options.set_schema(schema);
160 self
161 }
162
163 pub fn build(self) -> ReadOptions {
165 let props = self
166 .props
167 .unwrap_or_else(|| ReaderProperties::builder().build());
168 ReadOptions {
169 predicates: self.predicates,
170 enable_page_index: self.enable_page_index,
171 props,
172 metadata_options: self.metadata_options,
173 }
174 }
175}
176
/// A collection of options for reading a Parquet file; build one with
/// [`ReadOptionsBuilder`].
pub struct ReadOptions {
    // Row-group predicates; a group is read only if every predicate passes.
    predicates: Vec<ReadGroupPredicate>,
    // When true, the page index is also loaded.
    enable_page_index: bool,
    // Properties forwarded to the row-group and page readers.
    props: ReaderProperties,
    // Options controlling footer metadata decoding.
    metadata_options: ParquetMetaDataOptions,
}
187
188impl<R: 'static + ChunkReader> SerializedFileReader<R> {
189 pub fn new(chunk_reader: R) -> Result<Self> {
192 let metadata = ParquetMetaDataReader::new().parse_and_finish(&chunk_reader)?;
193 let props = Arc::new(ReaderProperties::builder().build());
194 Ok(Self {
195 chunk_reader: Arc::new(chunk_reader),
196 metadata: Arc::new(metadata),
197 props,
198 })
199 }
200
201 #[allow(deprecated)]
204 pub fn new_with_options(chunk_reader: R, options: ReadOptions) -> Result<Self> {
205 let mut metadata_builder = ParquetMetaDataReader::new()
206 .with_metadata_options(Some(options.metadata_options.clone()))
207 .parse_and_finish(&chunk_reader)?
208 .into_builder();
209 let mut predicates = options.predicates;
210
211 for (i, rg_meta) in metadata_builder.take_row_groups().into_iter().enumerate() {
213 let mut keep = true;
214 for predicate in &mut predicates {
215 if !predicate(&rg_meta, i) {
216 keep = false;
217 break;
218 }
219 }
220 if keep {
221 metadata_builder = metadata_builder.add_row_group(rg_meta);
222 }
223 }
224
225 let mut metadata = metadata_builder.build();
226
227 if options.enable_page_index {
229 let mut reader =
230 ParquetMetaDataReader::new_with_metadata(metadata).with_page_indexes(true);
231 reader.read_page_indexes(&chunk_reader)?;
232 metadata = reader.finish()?;
233 }
234
235 Ok(Self {
236 chunk_reader: Arc::new(chunk_reader),
237 metadata: Arc::new(metadata),
238 props: Arc::new(options.props),
239 })
240 }
241}
242
243fn get_midpoint_offset(meta: &RowGroupMetaData) -> i64 {
245 let col = meta.column(0);
246 let mut offset = col.data_page_offset();
247 if let Some(dic_offset) = col.dictionary_page_offset() {
248 if offset > dic_offset {
249 offset = dic_offset
250 }
251 };
252 offset + meta.compressed_size() / 2
253}
254
255impl<R: 'static + ChunkReader> FileReader for SerializedFileReader<R> {
256 fn metadata(&self) -> &ParquetMetaData {
257 &self.metadata
258 }
259
260 fn num_row_groups(&self) -> usize {
261 self.metadata.num_row_groups()
262 }
263
264 fn get_row_group(&self, i: usize) -> Result<Box<dyn RowGroupReader + '_>> {
265 let row_group_metadata = self.metadata.row_group(i);
266 let props = Arc::clone(&self.props);
268 let f = Arc::clone(&self.chunk_reader);
269 Ok(Box::new(SerializedRowGroupReader::new(
270 f,
271 row_group_metadata,
272 self.metadata.offset_index().map(|x| x[i].as_slice()),
273 props,
274 )?))
275 }
276
277 fn get_row_iter(&self, projection: Option<SchemaType>) -> Result<RowIter<'_>> {
278 RowIter::from_file(projection, self)
279 }
280}
281
/// A serialized implementation of the Parquet [`RowGroupReader`].
pub struct SerializedRowGroupReader<'a, R: ChunkReader> {
    // Shared handle to the underlying byte source.
    chunk_reader: Arc<R>,
    // Metadata for this row group, borrowed from the file metadata.
    metadata: &'a RowGroupMetaData,
    // Offset index entries for this row group's columns, if loaded.
    offset_index: Option<&'a [OffsetIndexMetaData]>,
    // Reader configuration forwarded to the page readers.
    props: ReaderPropertiesPtr,
    // Per-column bloom filters; populated in `new` only when
    // `props.read_bloom_filter()` is set.
    bloom_filters: Vec<Option<Sbbf>>,
}
290
291impl<'a, R: ChunkReader> SerializedRowGroupReader<'a, R> {
292 pub fn new(
294 chunk_reader: Arc<R>,
295 metadata: &'a RowGroupMetaData,
296 offset_index: Option<&'a [OffsetIndexMetaData]>,
297 props: ReaderPropertiesPtr,
298 ) -> Result<Self> {
299 let bloom_filters = if props.read_bloom_filter() {
300 metadata
301 .columns()
302 .iter()
303 .map(|col| Sbbf::read_from_column_chunk(col, &*chunk_reader))
304 .collect::<Result<Vec<_>>>()?
305 } else {
306 std::iter::repeat_n(None, metadata.columns().len()).collect()
307 };
308 Ok(Self {
309 chunk_reader,
310 metadata,
311 offset_index,
312 props,
313 bloom_filters,
314 })
315 }
316}
317
318impl<R: 'static + ChunkReader> RowGroupReader for SerializedRowGroupReader<'_, R> {
319 fn metadata(&self) -> &RowGroupMetaData {
320 self.metadata
321 }
322
323 fn num_columns(&self) -> usize {
324 self.metadata.num_columns()
325 }
326
327 fn get_column_page_reader(&self, i: usize) -> Result<Box<dyn PageReader>> {
329 let col = self.metadata.column(i);
330
331 let page_locations = self.offset_index.map(|x| x[i].page_locations.clone());
332
333 let props = Arc::clone(&self.props);
334 Ok(Box::new(SerializedPageReader::new_with_properties(
335 Arc::clone(&self.chunk_reader),
336 col,
337 usize::try_from(self.metadata.num_rows())?,
338 page_locations,
339 props,
340 )?))
341 }
342
343 fn get_column_bloom_filter(&self, i: usize) -> Option<&Sbbf> {
345 self.bloom_filters[i].as_ref()
346 }
347
348 fn get_row_iter(&self, projection: Option<SchemaType>) -> Result<RowIter<'_>> {
349 RowIter::from_row_group(projection, self)
350 }
351}
352
/// Decodes a thrift [`PageHeader`] plus its raw page `buffer` into a [`Page`].
///
/// If `decompressor` is provided and the page is compressed, the page body is
/// decompressed. For data page v2, the leading repetition/definition level
/// bytes are never compressed and are copied through verbatim.
///
/// # Errors
///
/// Returns an error if the header fields are implausible, the CRC does not
/// match (with the `crc` feature), decompression fails or produces the wrong
/// size, or the page type cannot be decoded into a [`Page`].
pub(crate) fn decode_page(
    page_header: PageHeader,
    buffer: Bytes,
    physical_type: Type,
    decompressor: Option<&mut Box<dyn Codec>>,
) -> Result<Page> {
    // When the `crc` feature is enabled, verify the checksum before doing any
    // further work with the buffer.
    #[cfg(feature = "crc")]
    if let Some(expected_crc) = page_header.crc {
        let crc = crc32fast::hash(&buffer);
        if crc != expected_crc as u32 {
            return Err(general_err!("Page CRC checksum mismatch"));
        }
    }

    // `offset` counts leading bytes (v2 level data) that must not be passed to
    // the decompressor; `can_decompress` turns false when a v2 page declares
    // its body uncompressed.
    let mut offset: usize = 0;
    let mut can_decompress = true;

    if let Some(ref header_v2) = page_header.data_page_header_v2 {
        // Sanity-check the level byte lengths before trusting them.
        if header_v2.definition_levels_byte_length < 0
            || header_v2.repetition_levels_byte_length < 0
            || header_v2.definition_levels_byte_length + header_v2.repetition_levels_byte_length
                > page_header.uncompressed_page_size
        {
            return Err(general_err!(
                "DataPage v2 header contains implausible values \
                for definition_levels_byte_length ({}) \
                and repetition_levels_byte_length ({}) \
                given DataPage header provides uncompressed_page_size ({})",
                header_v2.definition_levels_byte_length,
                header_v2.repetition_levels_byte_length,
                page_header.uncompressed_page_size
            ));
        }
        offset = usize::try_from(
            header_v2.definition_levels_byte_length + header_v2.repetition_levels_byte_length,
        )?;
        can_decompress = header_v2.is_compressed.unwrap_or(true);
    }

    let buffer = match decompressor {
        Some(decompressor) if can_decompress => {
            let uncompressed_page_size = usize::try_from(page_header.uncompressed_page_size)?;
            if offset > buffer.len() || offset > uncompressed_page_size {
                return Err(general_err!("Invalid page header"));
            }
            let decompressed_size = uncompressed_page_size - offset;
            let mut decompressed = Vec::with_capacity(uncompressed_page_size);
            // Copy the uncompressed level bytes through, then decompress the
            // remainder after them.
            decompressed.extend_from_slice(&buffer[..offset]);
            if decompressed_size > 0 {
                let compressed = &buffer[offset..];
                decompressor.decompress(compressed, &mut decompressed, Some(decompressed_size))?;
            }

            if decompressed.len() != uncompressed_page_size {
                return Err(general_err!(
                    "Actual decompressed size doesn't match the expected one ({} vs {})",
                    decompressed.len(),
                    uncompressed_page_size
                ));
            }

            Bytes::from(decompressed)
        }
        // No decompressor, or a v2 page flagged uncompressed: use as-is.
        _ => buffer,
    };

    let result = match page_header.r#type {
        PageType::DICTIONARY_PAGE => {
            let dict_header = page_header.dictionary_page_header.as_ref().ok_or_else(|| {
                ParquetError::General("Missing dictionary page header".to_string())
            })?;
            let is_sorted = dict_header.is_sorted.unwrap_or(false);
            Page::DictionaryPage {
                buf: buffer,
                num_values: dict_header.num_values.try_into()?,
                encoding: dict_header.encoding,
                is_sorted,
            }
        }
        PageType::DATA_PAGE => {
            let header = page_header
                .data_page_header
                .ok_or_else(|| ParquetError::General("Missing V1 data page header".to_string()))?;
            Page::DataPage {
                buf: buffer,
                num_values: header.num_values.try_into()?,
                encoding: header.encoding,
                def_level_encoding: header.definition_level_encoding,
                rep_level_encoding: header.repetition_level_encoding,
                statistics: statistics::from_thrift_page_stats(physical_type, header.statistics)?,
            }
        }
        PageType::DATA_PAGE_V2 => {
            let header = page_header
                .data_page_header_v2
                .ok_or_else(|| ParquetError::General("Missing V2 data page header".to_string()))?;
            let is_compressed = header.is_compressed.unwrap_or(true);
            Page::DataPageV2 {
                buf: buffer,
                num_values: header.num_values.try_into()?,
                encoding: header.encoding,
                num_nulls: header.num_nulls.try_into()?,
                num_rows: header.num_rows.try_into()?,
                def_levels_byte_len: header.definition_levels_byte_length.try_into()?,
                rep_levels_byte_len: header.repetition_levels_byte_length.try_into()?,
                is_compressed,
                statistics: statistics::from_thrift_page_stats(physical_type, header.statistics)?,
            }
        }
        _ => {
            // e.g. INDEX_PAGE: not decodable into a `Page`.
            return Err(general_err!(
                "Page type {:?} is not supported",
                page_header.r#type
            ));
        }
    };

    Ok(result)
}
484
/// Reading state of a [`SerializedPageReader`].
enum SerializedPageReaderState {
    /// Sequentially scanning a byte range for pages; used when no offset
    /// index (page locations) is available.
    Values {
        /// Current byte offset into the file.
        offset: u64,
        /// Bytes remaining in the column chunk from `offset` onwards.
        remaining_bytes: u64,
        /// A page header that has been read (by a peek) but whose page body
        /// has not been consumed yet.
        next_page_header: Option<Box<PageHeader>>,
        /// Index of the current data page within the chunk.
        page_index: usize,
        /// True while a dictionary page is still expected at the front of
        /// the chunk; cleared once it has been read or skipped.
        require_dictionary: bool,
    },
    /// Addressing pages directly via locations from the offset index.
    Pages {
        /// Remaining data-page locations, consumed from the front.
        page_locations: VecDeque<PageLocation>,
        /// Location of the dictionary page, if it has not been consumed yet.
        dictionary_page: Option<PageLocation>,
        /// Total number of rows in the column chunk (used to size the last
        /// page when peeking).
        total_rows: usize,
        /// Index of the current data page within the chunk.
        page_index: usize,
    },
}
515
/// Ancillary state shared by page-header and page-data decoding.
#[derive(Default)]
struct SerializedPageReaderContext {
    /// When true, page-level statistics are decoded from page headers;
    /// otherwise they are skipped during thrift decoding.
    read_stats: bool,
    /// Crypto context for decrypting page headers and bodies, set via
    /// `add_crypto_context` when the column is encrypted.
    #[cfg(feature = "encryption")]
    crypto_context: Option<Arc<CryptoContext>>,
}
524
/// A serialized implementation of the Parquet [`PageReader`] for one column
/// chunk.
pub struct SerializedPageReader<R: ChunkReader> {
    /// Shared handle to the underlying byte source.
    reader: Arc<R>,

    /// Decompressor for the chunk's codec, if the codec requires one.
    decompressor: Option<Box<dyn Codec>>,

    /// Physical type of the column, forwarded to page decoding.
    physical_type: Type,

    /// Current reading state: sequential scan or offset-index driven.
    state: SerializedPageReaderState,

    /// Statistics/decryption context used while decoding headers and pages.
    context: SerializedPageReaderContext,
}
540
impl<R: ChunkReader> SerializedPageReader<R> {
    /// Creates a new serialized page reader from a chunk reader and column
    /// chunk metadata, using default [`ReaderProperties`].
    pub fn new(
        reader: Arc<R>,
        column_chunk_metadata: &ColumnChunkMetaData,
        total_rows: usize,
        page_locations: Option<Vec<PageLocation>>,
    ) -> Result<Self> {
        let props = Arc::new(ReaderProperties::builder().build());
        SerializedPageReader::new_with_properties(
            reader,
            column_chunk_metadata,
            total_rows,
            page_locations,
            props,
        )
    }

    /// No-op variant used when the `encryption` feature is disabled: returns
    /// `self` unchanged.
    #[cfg(all(feature = "arrow", not(feature = "encryption")))]
    pub(crate) fn add_crypto_context(
        self,
        _rg_idx: usize,
        _column_idx: usize,
        _parquet_meta_data: &ParquetMetaData,
        _column_chunk_metadata: &ColumnChunkMetaData,
    ) -> Result<SerializedPageReader<R>> {
        Ok(self)
    }

    /// Attaches a [`CryptoContext`] for this column so page headers and page
    /// data can be decrypted. Returns `self` unchanged when the file has no
    /// decryptor or the column carries no crypto metadata.
    #[cfg(feature = "encryption")]
    pub(crate) fn add_crypto_context(
        mut self,
        rg_idx: usize,
        column_idx: usize,
        parquet_meta_data: &ParquetMetaData,
        column_chunk_metadata: &ColumnChunkMetaData,
    ) -> Result<SerializedPageReader<R>> {
        let Some(file_decryptor) = parquet_meta_data.file_decryptor() else {
            return Ok(self);
        };
        let Some(crypto_metadata) = column_chunk_metadata.crypto_metadata() else {
            return Ok(self);
        };
        let crypto_context =
            CryptoContext::for_column(file_decryptor, crypto_metadata, rg_idx, column_idx)?;
        self.context.crypto_context = Some(Arc::new(crypto_context));
        Ok(self)
    }

    /// Creates a new serialized page reader with the given
    /// [`ReaderProperties`].
    ///
    /// When `page_locations` (from the offset index) are supplied, pages are
    /// addressed directly (`Pages` state); otherwise the chunk's byte range
    /// is scanned sequentially (`Values` state).
    pub fn new_with_properties(
        reader: Arc<R>,
        meta: &ColumnChunkMetaData,
        total_rows: usize,
        page_locations: Option<Vec<PageLocation>>,
        props: ReaderPropertiesPtr,
    ) -> Result<Self> {
        let decompressor = create_codec(meta.compression(), props.codec_options())?;
        let (start, len) = meta.byte_range();

        let state = match page_locations {
            Some(locations) => {
                // The offset index lists data pages only. If the chunk starts
                // before the first listed page, the gap at the front is the
                // dictionary page; synthesize a location for it.
                let dictionary_page = match locations.first() {
                    Some(dict_offset) if dict_offset.offset as u64 != start => Some(PageLocation {
                        offset: start as i64,
                        compressed_page_size: (dict_offset.offset as u64 - start) as i32,
                        first_row_index: 0,
                    }),
                    _ => None,
                };

                SerializedPageReaderState::Pages {
                    page_locations: locations.into(),
                    dictionary_page,
                    total_rows,
                    page_index: 0,
                }
            }
            None => SerializedPageReaderState::Values {
                offset: start,
                remaining_bytes: len,
                next_page_header: None,
                page_index: 0,
                require_dictionary: meta.dictionary_page_offset().is_some(),
            },
        };
        let mut context = SerializedPageReaderContext::default();
        if props.read_page_stats() {
            context.read_stats = true;
        }
        Ok(Self {
            reader,
            decompressor,
            state,
            physical_type: meta.column_type(),
            context,
        })
    }

    /// Returns the file offset of the next page's body (test helper).
    ///
    /// In `Values` mode this may read and buffer the next page header,
    /// skipping headers that do not convert into [`PageMetadata`].
    #[cfg(test)]
    fn peek_next_page_offset(&mut self) -> Result<Option<u64>> {
        match &mut self.state {
            SerializedPageReaderState::Values {
                offset,
                remaining_bytes,
                next_page_header,
                page_index,
                require_dictionary,
            } => {
                loop {
                    if *remaining_bytes == 0 {
                        return Ok(None);
                    }
                    return if let Some(header) = next_page_header.as_ref() {
                        if let Ok(_page_meta) = PageMetadata::try_from(&**header) {
                            Ok(Some(*offset))
                        } else {
                            // Buffered header is not convertible; drop it and
                            // read the next one.
                            *next_page_header = None;
                            continue;
                        }
                    } else {
                        let mut read = self.reader.get_read(*offset)?;
                        let (header_len, header) = Self::read_page_header_len(
                            &self.context,
                            &mut read,
                            *page_index,
                            *require_dictionary,
                        )?;
                        // Account for the header bytes just consumed.
                        *offset += header_len as u64;
                        *remaining_bytes -= header_len as u64;
                        let page_meta = if let Ok(_page_meta) = PageMetadata::try_from(&header) {
                            Ok(Some(*offset))
                        } else {
                            // Not a convertible page header; keep scanning.
                            continue;
                        };
                        *next_page_header = Some(Box::new(header));
                        page_meta
                    };
                }
            }
            SerializedPageReaderState::Pages {
                page_locations,
                dictionary_page,
                ..
            } => {
                // The dictionary page (if still pending) comes first,
                // otherwise the next data-page location.
                if let Some(page) = dictionary_page {
                    Ok(Some(page.offset as u64))
                } else if let Some(page) = page_locations.front() {
                    Ok(Some(page.offset as u64))
                } else {
                    Ok(None)
                }
            }
        }
    }

    /// Reads a page header from `input`, returning the header together with
    /// the number of bytes consumed from the reader.
    fn read_page_header_len<T: Read>(
        context: &SerializedPageReaderContext,
        input: &mut T,
        page_index: usize,
        dictionary_page: bool,
    ) -> Result<(usize, PageHeader)> {
        /// Wraps a [`Read`], counting the bytes read through it.
        struct TrackedRead<R> {
            inner: R,
            bytes_read: usize,
        }

        impl<R: Read> Read for TrackedRead<R> {
            fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
                let v = self.inner.read(buf)?;
                self.bytes_read += v;
                Ok(v)
            }
        }

        let mut tracked = TrackedRead {
            inner: input,
            bytes_read: 0,
        };
        let header = context.read_page_header(&mut tracked, page_index, dictionary_page)?;
        Ok((tracked.bytes_read, header))
    }

    /// Reads a page header from an in-memory buffer, returning the header and
    /// its encoded length in bytes (the cursor position after decoding).
    fn read_page_header_len_from_bytes(
        context: &SerializedPageReaderContext,
        buffer: &[u8],
        page_index: usize,
        dictionary_page: bool,
    ) -> Result<(usize, PageHeader)> {
        let mut input = std::io::Cursor::new(buffer);
        let header = context.read_page_header(&mut input, page_index, dictionary_page)?;
        let header_len = input.position() as usize;
        Ok((header_len, header))
    }
}
748
749#[cfg(not(feature = "encryption"))]
750impl SerializedPageReaderContext {
751 fn read_page_header<T: Read>(
752 &self,
753 input: &mut T,
754 _page_index: usize,
755 _dictionary_page: bool,
756 ) -> Result<PageHeader> {
757 let mut prot = ThriftReadInputProtocol::new(input);
758 if self.read_stats {
759 Ok(PageHeader::read_thrift(&mut prot)?)
760 } else {
761 Ok(PageHeader::read_thrift_without_stats(&mut prot)?)
762 }
763 }
764
765 fn decrypt_page_data<T>(
766 &self,
767 buffer: T,
768 _page_index: usize,
769 _dictionary_page: bool,
770 ) -> Result<T> {
771 Ok(buffer)
772 }
773}
774
#[cfg(feature = "encryption")]
impl SerializedPageReaderContext {
    /// Reads a thrift-encoded page header from `input`, decrypting it first
    /// when this column has a crypto context; page statistics are decoded
    /// only when `read_stats` is set.
    fn read_page_header<T: Read>(
        &self,
        input: &mut T,
        page_index: usize,
        dictionary_page: bool,
    ) -> Result<PageHeader> {
        match self.page_crypto_context(page_index, dictionary_page) {
            None => {
                let mut prot = ThriftReadInputProtocol::new(input);
                if self.read_stats {
                    Ok(PageHeader::read_thrift(&mut prot)?)
                } else {
                    // Fix: removed a redundant local
                    // `use crate::file::metadata::thrift::PageHeader;` that
                    // shadowed the top-level import and was inconsistent with
                    // the other branches of this method.
                    Ok(PageHeader::read_thrift_without_stats(&mut prot)?)
                }
            }
            Some(page_crypto_context) => {
                let data_decryptor = page_crypto_context.data_decryptor();
                let aad = page_crypto_context.create_page_header_aad()?;

                let buf = read_and_decrypt(data_decryptor, input, aad.as_ref()).map_err(|_| {
                    ParquetError::General(format!(
                        "Error decrypting page header for column {}, decryption key may be wrong",
                        page_crypto_context.column_ordinal
                    ))
                })?;

                // The header was decrypted into an in-memory buffer, so a
                // slice protocol is used here rather than the streaming one.
                let mut prot = ThriftSliceInputProtocol::new(buf.as_slice());
                if self.read_stats {
                    Ok(PageHeader::read_thrift(&mut prot)?)
                } else {
                    Ok(PageHeader::read_thrift_without_stats(&mut prot)?)
                }
            }
        }
    }

    /// Decrypts page data when a crypto context is present for this page;
    /// otherwise returns the buffer unchanged.
    fn decrypt_page_data<T>(&self, buffer: T, page_index: usize, dictionary_page: bool) -> Result<T>
    where
        T: AsRef<[u8]>,
        T: From<Vec<u8>>,
    {
        let page_crypto_context = self.page_crypto_context(page_index, dictionary_page);
        if let Some(page_crypto_context) = page_crypto_context {
            let decryptor = page_crypto_context.data_decryptor();
            let aad = page_crypto_context.create_page_aad()?;
            let decrypted = decryptor.decrypt(buffer.as_ref(), &aad)?;
            Ok(T::from(decrypted))
        } else {
            Ok(buffer)
        }
    }

    /// Returns the crypto context specialized for the given page (dictionary
    /// page vs. data page ordinal), if column decryption is configured.
    fn page_crypto_context(
        &self,
        page_index: usize,
        dictionary_page: bool,
    ) -> Option<Arc<CryptoContext>> {
        self.crypto_context.as_ref().map(|c| {
            Arc::new(if dictionary_page {
                c.for_dictionary_page()
            } else {
                c.with_page_ordinal(page_index)
            })
        })
    }
}
845
846impl<R: ChunkReader> Iterator for SerializedPageReader<R> {
847 type Item = Result<Page>;
848
849 fn next(&mut self) -> Option<Self::Item> {
850 self.get_next_page().transpose()
851 }
852}
853
854fn verify_page_header_len(header_len: usize, remaining_bytes: u64) -> Result<()> {
855 if header_len as u64 > remaining_bytes {
856 return Err(eof_err!("Invalid page header"));
857 }
858 Ok(())
859}
860
861fn verify_page_size(
862 compressed_size: i32,
863 uncompressed_size: i32,
864 remaining_bytes: u64,
865) -> Result<()> {
866 if compressed_size < 0 || compressed_size as u64 > remaining_bytes || uncompressed_size < 0 {
870 return Err(eof_err!("Invalid page header"));
871 }
872 Ok(())
873}
874
impl<R: ChunkReader> PageReader for SerializedPageReader<R> {
    fn get_next_page(&mut self) -> Result<Option<Page>> {
        // Loop so that non-decodable pages (index pages) can be skipped.
        loop {
            let page = match &mut self.state {
                SerializedPageReaderState::Values {
                    offset,
                    remaining_bytes: remaining,
                    next_page_header,
                    page_index,
                    require_dictionary,
                } => {
                    if *remaining == 0 {
                        return Ok(None);
                    }

                    let mut read = self.reader.get_read(*offset)?;
                    // Use the header buffered by `peek_next_page`, if any
                    // (its bytes were already accounted for); otherwise read
                    // a fresh one and advance past it.
                    let header = if let Some(header) = next_page_header.take() {
                        *header
                    } else {
                        let (header_len, header) = Self::read_page_header_len(
                            &self.context,
                            &mut read,
                            *page_index,
                            *require_dictionary,
                        )?;
                        verify_page_header_len(header_len, *remaining)?;
                        *offset += header_len as u64;
                        *remaining -= header_len as u64;
                        header
                    };
                    verify_page_size(
                        header.compressed_page_size,
                        header.uncompressed_page_size,
                        *remaining,
                    )?;
                    let data_len = header.compressed_page_size as usize;
                    let data_start = *offset;
                    // Advance past the page body regardless of page type.
                    *offset += data_len as u64;
                    *remaining -= data_len as u64;

                    // Index pages cannot be decoded into a `Page`; skip.
                    if header.r#type == PageType::INDEX_PAGE {
                        continue;
                    }

                    let buffer = self.reader.get_bytes(data_start, data_len)?;

                    let buffer =
                        self.context
                            .decrypt_page_data(buffer, *page_index, *require_dictionary)?;

                    let page = decode_page(
                        header,
                        buffer,
                        self.physical_type,
                        self.decompressor.as_mut(),
                    )?;
                    if page.is_data_page() {
                        *page_index += 1;
                    } else if page.is_dictionary_page() {
                        // Once the dictionary page is read, only data pages
                        // remain in the chunk.
                        *require_dictionary = false;
                    }
                    page
                }
                SerializedPageReaderState::Pages {
                    page_locations,
                    dictionary_page,
                    page_index,
                    ..
                } => {
                    // Serve the pending dictionary page first, then data
                    // pages in offset-index order.
                    let (front, is_dictionary_page) = match dictionary_page.take() {
                        Some(front) => (front, true),
                        None => match page_locations.pop_front() {
                            Some(front) => (front, false),
                            None => return Ok(None),
                        },
                    };

                    let page_len = usize::try_from(front.compressed_page_size)?;
                    // The location's size covers the header plus the body, so
                    // one read fetches both.
                    let buffer = self.reader.get_bytes(front.offset as u64, page_len)?;

                    let (offset, header) = Self::read_page_header_len_from_bytes(
                        &self.context,
                        buffer.as_ref(),
                        *page_index,
                        is_dictionary_page,
                    )?;
                    // The page body follows the encoded header.
                    let bytes = buffer.slice(offset..);
                    let bytes =
                        self.context
                            .decrypt_page_data(bytes, *page_index, is_dictionary_page)?;

                    if !is_dictionary_page {
                        *page_index += 1;
                    }
                    decode_page(
                        header,
                        bytes,
                        self.physical_type,
                        self.decompressor.as_mut(),
                    )?
                }
            };

            return Ok(Some(page));
        }
    }

    fn peek_next_page(&mut self) -> Result<Option<PageMetadata>> {
        match &mut self.state {
            SerializedPageReaderState::Values {
                offset,
                remaining_bytes,
                next_page_header,
                page_index,
                require_dictionary,
            } => {
                // Loop until a convertible header is found or the chunk ends.
                loop {
                    if *remaining_bytes == 0 {
                        return Ok(None);
                    }
                    return if let Some(header) = next_page_header.as_ref() {
                        if let Ok(page_meta) = (&**header).try_into() {
                            Ok(Some(page_meta))
                        } else {
                            // Buffered header is not convertible; discard it
                            // and read the next one.
                            *next_page_header = None;
                            continue;
                        }
                    } else {
                        let mut read = self.reader.get_read(*offset)?;
                        let (header_len, header) = Self::read_page_header_len(
                            &self.context,
                            &mut read,
                            *page_index,
                            *require_dictionary,
                        )?;
                        verify_page_header_len(header_len, *remaining_bytes)?;
                        // Account for the header bytes just consumed.
                        *offset += header_len as u64;
                        *remaining_bytes -= header_len as u64;
                        let page_meta = if let Ok(page_meta) = (&header).try_into() {
                            Ok(Some(page_meta))
                        } else {
                            // Not a convertible page header; keep scanning.
                            continue;
                        };
                        // Buffer the header so `get_next_page` does not
                        // re-read it.
                        *next_page_header = Some(Box::new(header));
                        page_meta
                    };
                }
            }
            SerializedPageReaderState::Pages {
                page_locations,
                dictionary_page,
                total_rows,
                page_index: _,
            } => {
                if dictionary_page.is_some() {
                    Ok(Some(PageMetadata {
                        num_rows: None,
                        num_levels: None,
                        is_dict: true,
                    }))
                } else if let Some(page) = page_locations.front() {
                    // Rows in this page = first row of the next page (or the
                    // chunk total for the last page) minus this page's first
                    // row index.
                    let next_rows = page_locations
                        .get(1)
                        .map(|x| x.first_row_index as usize)
                        .unwrap_or(*total_rows);

                    Ok(Some(PageMetadata {
                        num_rows: Some(next_rows - page.first_row_index as usize),
                        num_levels: None,
                        is_dict: false,
                    }))
                } else {
                    Ok(None)
                }
            }
        }
    }

    fn skip_next_page(&mut self) -> Result<()> {
        match &mut self.state {
            SerializedPageReaderState::Values {
                offset,
                remaining_bytes,
                next_page_header,
                page_index,
                require_dictionary,
            } => {
                if let Some(buffered_header) = next_page_header.take() {
                    verify_page_size(
                        buffered_header.compressed_page_size,
                        buffered_header.uncompressed_page_size,
                        *remaining_bytes,
                    )?;
                    // The header bytes were already accounted for when the
                    // header was buffered; only skip the page body here.
                    *offset += buffered_header.compressed_page_size as u64;
                    *remaining_bytes -= buffered_header.compressed_page_size as u64;
                } else {
                    let mut read = self.reader.get_read(*offset)?;
                    let (header_len, header) = Self::read_page_header_len(
                        &self.context,
                        &mut read,
                        *page_index,
                        *require_dictionary,
                    )?;
                    verify_page_header_len(header_len, *remaining_bytes)?;
                    verify_page_size(
                        header.compressed_page_size,
                        header.uncompressed_page_size,
                        *remaining_bytes,
                    )?;
                    // Skip both the freshly read header and the page body.
                    let data_page_size = header.compressed_page_size as u64;
                    *offset += header_len as u64 + data_page_size;
                    *remaining_bytes -= header_len as u64 + data_page_size;
                }
                if *require_dictionary {
                    // Skipping the dictionary page does not advance the data
                    // page index.
                    *require_dictionary = false;
                } else {
                    *page_index += 1;
                }
                Ok(())
            }
            SerializedPageReaderState::Pages {
                page_locations,
                dictionary_page,
                page_index,
                ..
            } => {
                if dictionary_page.is_some() {
                    // Consume the pending dictionary page without advancing
                    // `page_index`.
                    dictionary_page.take();
                } else {
                    if page_locations.pop_front().is_some() {
                        *page_index += 1;
                    }
                }

                Ok(())
            }
        }
    }

    fn at_record_boundary(&mut self) -> Result<bool> {
        match &mut self.state {
            // Sequential scan: only known to be at a boundary once the chunk
            // is exhausted.
            SerializedPageReaderState::Values { .. } => Ok(self.peek_next_page()?.is_none()),
            // Offset-index mode always reports a record boundary.
            SerializedPageReaderState::Pages { .. } => Ok(true),
        }
    }
}
1126
1127#[cfg(test)]
1128mod tests {
1129 use std::collections::HashSet;
1130
1131 use bytes::Buf;
1132
1133 use crate::file::page_index::column_index::{
1134 ByteArrayColumnIndex, ColumnIndexMetaData, PrimitiveColumnIndex,
1135 };
1136 use crate::file::properties::{EnabledStatistics, WriterProperties};
1137
1138 use crate::basic::{self, BoundaryOrder, ColumnOrder, Encoding, SortOrder};
1139 use crate::column::reader::ColumnReader;
1140 use crate::data_type::private::ParquetValueType;
1141 use crate::data_type::{AsBytes, FixedLenByteArrayType, Int32Type};
1142 use crate::file::metadata::thrift::DataPageHeaderV2;
1143 #[allow(deprecated)]
1144 use crate::file::page_index::index_reader::{read_columns_indexes, read_offset_indexes};
1145 use crate::file::writer::SerializedFileWriter;
1146 use crate::record::RowAccessor;
1147 use crate::schema::parser::parse_message_type;
1148 use crate::util::test_common::file_util::{get_test_file, get_test_path};
1149
1150 use super::*;
1151
1152 #[test]
1153 fn test_decode_page_invalid_offset() {
1154 let page_header = PageHeader {
1155 r#type: PageType::DATA_PAGE_V2,
1156 uncompressed_page_size: 10,
1157 compressed_page_size: 10,
1158 data_page_header: None,
1159 index_page_header: None,
1160 dictionary_page_header: None,
1161 crc: None,
1162 data_page_header_v2: Some(DataPageHeaderV2 {
1163 num_nulls: 0,
1164 num_rows: 0,
1165 num_values: 0,
1166 encoding: Encoding::PLAIN,
1167 definition_levels_byte_length: 11,
1168 repetition_levels_byte_length: 0,
1169 is_compressed: None,
1170 statistics: None,
1171 }),
1172 };
1173
1174 let buffer = Bytes::new();
1175 let err = decode_page(page_header, buffer, Type::INT32, None).unwrap_err();
1176 assert!(
1177 err.to_string()
1178 .contains("DataPage v2 header contains implausible values")
1179 );
1180 }
1181
1182 #[test]
1183 fn test_decode_unsupported_page() {
1184 let mut page_header = PageHeader {
1185 r#type: PageType::INDEX_PAGE,
1186 uncompressed_page_size: 10,
1187 compressed_page_size: 10,
1188 data_page_header: None,
1189 index_page_header: None,
1190 dictionary_page_header: None,
1191 crc: None,
1192 data_page_header_v2: None,
1193 };
1194 let buffer = Bytes::new();
1195 let err = decode_page(page_header.clone(), buffer.clone(), Type::INT32, None).unwrap_err();
1196 assert_eq!(
1197 err.to_string(),
1198 "Parquet error: Page type INDEX_PAGE is not supported"
1199 );
1200
1201 page_header.data_page_header_v2 = Some(DataPageHeaderV2 {
1202 num_nulls: 0,
1203 num_rows: 0,
1204 num_values: 0,
1205 encoding: Encoding::PLAIN,
1206 definition_levels_byte_length: 11,
1207 repetition_levels_byte_length: 0,
1208 is_compressed: None,
1209 statistics: None,
1210 });
1211 let err = decode_page(page_header, buffer, Type::INT32, None).unwrap_err();
1212 assert!(
1213 err.to_string()
1214 .contains("DataPage v2 header contains implausible values")
1215 );
1216 }
1217
1218 #[test]
1219 fn test_cursor_and_file_has_the_same_behaviour() {
1220 let mut buf: Vec<u8> = Vec::new();
1221 get_test_file("alltypes_plain.parquet")
1222 .read_to_end(&mut buf)
1223 .unwrap();
1224 let cursor = Bytes::from(buf);
1225 let read_from_cursor = SerializedFileReader::new(cursor).unwrap();
1226
1227 let test_file = get_test_file("alltypes_plain.parquet");
1228 let read_from_file = SerializedFileReader::new(test_file).unwrap();
1229
1230 let file_iter = read_from_file.get_row_iter(None).unwrap();
1231 let cursor_iter = read_from_cursor.get_row_iter(None).unwrap();
1232
1233 for (a, b) in file_iter.zip(cursor_iter) {
1234 assert_eq!(a.unwrap(), b.unwrap())
1235 }
1236 }
1237
1238 #[test]
1239 fn test_file_reader_try_from() {
1240 let test_file = get_test_file("alltypes_plain.parquet");
1242 let test_path_buf = get_test_path("alltypes_plain.parquet");
1243 let test_path = test_path_buf.as_path();
1244 let test_path_str = test_path.to_str().unwrap();
1245
1246 let reader = SerializedFileReader::try_from(test_file);
1247 assert!(reader.is_ok());
1248
1249 let reader = SerializedFileReader::try_from(test_path);
1250 assert!(reader.is_ok());
1251
1252 let reader = SerializedFileReader::try_from(test_path_str);
1253 assert!(reader.is_ok());
1254
1255 let reader = SerializedFileReader::try_from(test_path_str.to_string());
1256 assert!(reader.is_ok());
1257
1258 let test_path = Path::new("invalid.parquet");
1260 let test_path_str = test_path.to_str().unwrap();
1261
1262 let reader = SerializedFileReader::try_from(test_path);
1263 assert!(reader.is_err());
1264
1265 let reader = SerializedFileReader::try_from(test_path_str);
1266 assert!(reader.is_err());
1267
1268 let reader = SerializedFileReader::try_from(test_path_str.to_string());
1269 assert!(reader.is_err());
1270 }
1271
1272 #[test]
1273 fn test_file_reader_into_iter() {
1274 let path = get_test_path("alltypes_plain.parquet");
1275 let reader = SerializedFileReader::try_from(path.as_path()).unwrap();
1276 let iter = reader.into_iter();
1277 let values: Vec<_> = iter.flat_map(|x| x.unwrap().get_int(0)).collect();
1278
1279 assert_eq!(values, &[4, 5, 6, 7, 2, 3, 0, 1]);
1280 }
1281
1282 #[test]
1283 fn test_file_reader_into_iter_project() {
1284 let path = get_test_path("alltypes_plain.parquet");
1285 let reader = SerializedFileReader::try_from(path.as_path()).unwrap();
1286 let schema = "message schema { OPTIONAL INT32 id; }";
1287 let proj = parse_message_type(schema).ok();
1288 let iter = reader.into_iter().project(proj).unwrap();
1289 let values: Vec<_> = iter.flat_map(|x| x.unwrap().get_int(0)).collect();
1290
1291 assert_eq!(values, &[4, 5, 6, 7, 2, 3, 0, 1]);
1292 }
1293
1294 #[test]
1295 fn test_reuse_file_chunk() {
1296 let test_file = get_test_file("alltypes_plain.parquet");
1300 let reader = SerializedFileReader::new(test_file).unwrap();
1301 let row_group = reader.get_row_group(0).unwrap();
1302
1303 let mut page_readers = Vec::new();
1304 for i in 0..row_group.num_columns() {
1305 page_readers.push(row_group.get_column_page_reader(i).unwrap());
1306 }
1307
1308 for mut page_reader in page_readers {
1311 assert!(page_reader.get_next_page().is_ok());
1312 }
1313 }
1314
    #[test]
    fn test_file_reader() {
        // End-to-end smoke test over alltypes_plain.parquet: file metadata,
        // row group metadata, and the pages of the first column chunk.
        let test_file = get_test_file("alltypes_plain.parquet");
        let reader_result = SerializedFileReader::new(test_file);
        assert!(reader_result.is_ok());
        let reader = reader_result.unwrap();

        // File-level metadata: one row group.
        let metadata = reader.metadata();
        assert_eq!(metadata.num_row_groups(), 1);

        // File metadata as written by impala.
        let file_metadata = metadata.file_metadata();
        assert!(file_metadata.created_by().is_some());
        assert_eq!(
            file_metadata.created_by().unwrap(),
            "impala version 1.3.0-INTERNAL (build 8a48ddb1eff84592b3fc06bc6f51ec120e1fffc9)"
        );
        assert!(file_metadata.key_value_metadata().is_none());
        assert_eq!(file_metadata.num_rows(), 8);
        assert_eq!(file_metadata.version(), 1);
        // No column orders are recorded in this file.
        assert_eq!(file_metadata.column_orders(), None);

        // Row group metadata.
        let row_group_metadata = metadata.row_group(0);
        assert_eq!(row_group_metadata.num_columns(), 11);
        assert_eq!(row_group_metadata.num_rows(), 8);
        assert_eq!(row_group_metadata.total_byte_size(), 671);
        // With no file-level orders, every column reports UNDEFINED.
        for i in 0..row_group_metadata.num_columns() {
            assert_eq!(file_metadata.column_order(i), ColumnOrder::UNDEFINED);
        }

        // Row group reader must agree with the metadata it was built from.
        let row_group_reader_result = reader.get_row_group(0);
        assert!(row_group_reader_result.is_ok());
        let row_group_reader: Box<dyn RowGroupReader> = row_group_reader_result.unwrap();
        assert_eq!(
            row_group_reader.num_columns(),
            row_group_metadata.num_columns()
        );
        assert_eq!(
            row_group_reader.metadata().total_byte_size(),
            row_group_metadata.total_byte_size()
        );

        // Column 0 must contain a dictionary page followed by one data page.
        let page_reader_0_result = row_group_reader.get_column_page_reader(0);
        assert!(page_reader_0_result.is_ok());
        let mut page_reader_0: Box<dyn PageReader> = page_reader_0_result.unwrap();
        let mut page_count = 0;
        while let Some(page) = page_reader_0.get_next_page().unwrap() {
            let is_expected_page = match page {
                Page::DictionaryPage {
                    buf,
                    num_values,
                    encoding,
                    is_sorted,
                } => {
                    assert_eq!(buf.len(), 32);
                    assert_eq!(num_values, 8);
                    assert_eq!(encoding, Encoding::PLAIN_DICTIONARY);
                    assert!(!is_sorted);
                    true
                }
                Page::DataPage {
                    buf,
                    num_values,
                    encoding,
                    def_level_encoding,
                    rep_level_encoding,
                    statistics,
                } => {
                    assert_eq!(buf.len(), 11);
                    assert_eq!(num_values, 8);
                    assert_eq!(encoding, Encoding::PLAIN_DICTIONARY);
                    assert_eq!(def_level_encoding, Encoding::RLE);
                    // BIT_PACKED is deprecated, but it is what this legacy
                    // writer produced for repetition levels.
                    #[allow(deprecated)]
                    let expected_rep_level_encoding = Encoding::BIT_PACKED;
                    assert_eq!(rep_level_encoding, expected_rep_level_encoding);
                    assert!(statistics.is_none());
                    true
                }
                _ => false,
            };
            assert!(is_expected_page);
            page_count += 1;
        }
        assert_eq!(page_count, 2);
    }
1406
    #[test]
    fn test_file_reader_datapage_v2() {
        // Reads a snappy-compressed file written with DataPage v2 and checks
        // file metadata plus the page structure of the first column chunk.
        let test_file = get_test_file("datapage_v2.snappy.parquet");
        let reader_result = SerializedFileReader::new(test_file);
        assert!(reader_result.is_ok());
        let reader = reader_result.unwrap();

        // One row group.
        let metadata = reader.metadata();
        assert_eq!(metadata.num_row_groups(), 1);

        // File metadata as written by parquet-mr.
        let file_metadata = metadata.file_metadata();
        assert!(file_metadata.created_by().is_some());
        assert_eq!(
            file_metadata.created_by().unwrap(),
            "parquet-mr version 1.8.1 (build 4aba4dae7bb0d4edbcf7923ae1339f28fd3f7fcf)"
        );
        assert!(file_metadata.key_value_metadata().is_some());
        assert_eq!(
            file_metadata.key_value_metadata().to_owned().unwrap().len(),
            1
        );

        assert_eq!(file_metadata.num_rows(), 5);
        assert_eq!(file_metadata.version(), 1);
        // No column orders are recorded in this file.
        assert_eq!(file_metadata.column_orders(), None);

        let row_group_metadata = metadata.row_group(0);

        // With no file-level orders, every column reports UNDEFINED.
        for i in 0..row_group_metadata.num_columns() {
            assert_eq!(file_metadata.column_order(i), ColumnOrder::UNDEFINED);
        }

        // Row group reader must agree with the metadata it was built from.
        let row_group_reader_result = reader.get_row_group(0);
        assert!(row_group_reader_result.is_ok());
        let row_group_reader: Box<dyn RowGroupReader> = row_group_reader_result.unwrap();
        assert_eq!(
            row_group_reader.num_columns(),
            row_group_metadata.num_columns()
        );
        assert_eq!(
            row_group_reader.metadata().total_byte_size(),
            row_group_metadata.total_byte_size()
        );

        // Column 0: a dictionary page followed by one DataPage v2.
        let page_reader_0_result = row_group_reader.get_column_page_reader(0);
        assert!(page_reader_0_result.is_ok());
        let mut page_reader_0: Box<dyn PageReader> = page_reader_0_result.unwrap();
        let mut page_count = 0;
        while let Some(page) = page_reader_0.get_next_page().unwrap() {
            let is_expected_page = match page {
                Page::DictionaryPage {
                    buf,
                    num_values,
                    encoding,
                    is_sorted,
                } => {
                    assert_eq!(buf.len(), 7);
                    assert_eq!(num_values, 1);
                    assert_eq!(encoding, Encoding::PLAIN);
                    assert!(!is_sorted);
                    true
                }
                Page::DataPageV2 {
                    buf,
                    num_values,
                    encoding,
                    num_nulls,
                    num_rows,
                    def_levels_byte_len,
                    rep_levels_byte_len,
                    is_compressed,
                    statistics,
                } => {
                    assert_eq!(buf.len(), 4);
                    assert_eq!(num_values, 5);
                    assert_eq!(encoding, Encoding::RLE_DICTIONARY);
                    assert_eq!(num_nulls, 1);
                    assert_eq!(num_rows, 5);
                    assert_eq!(def_levels_byte_len, 2);
                    assert_eq!(rep_levels_byte_len, 0);
                    assert!(is_compressed);
                    assert!(statistics.is_none());
                    true
                }
                _ => false,
            };
            assert!(is_expected_page);
            page_count += 1;
        }
        assert_eq!(page_count, 2);
    }
1504
    #[test]
    fn test_file_reader_empty_compressed_datapage_v2() {
        // A file whose compressed DataPage v2 carries 10 values that are all
        // null; the reader must still decode the page headers correctly.
        let test_file = get_test_file("page_v2_empty_compressed.parquet");
        let reader_result = SerializedFileReader::new(test_file);
        assert!(reader_result.is_ok());
        let reader = reader_result.unwrap();

        // One row group.
        let metadata = reader.metadata();
        assert_eq!(metadata.num_row_groups(), 1);

        // File metadata as written by parquet-cpp-arrow.
        let file_metadata = metadata.file_metadata();
        assert!(file_metadata.created_by().is_some());
        assert_eq!(
            file_metadata.created_by().unwrap(),
            "parquet-cpp-arrow version 14.0.2"
        );
        assert!(file_metadata.key_value_metadata().is_some());
        assert_eq!(
            file_metadata.key_value_metadata().to_owned().unwrap().len(),
            1
        );

        assert_eq!(file_metadata.num_rows(), 10);
        assert_eq!(file_metadata.version(), 2);
        // This writer records a type-defined (signed) column order.
        let expected_order = ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED);
        assert_eq!(
            file_metadata.column_orders(),
            Some(vec![expected_order].as_ref())
        );

        let row_group_metadata = metadata.row_group(0);

        // Every column must report the file-level order.
        for i in 0..row_group_metadata.num_columns() {
            assert_eq!(file_metadata.column_order(i), expected_order);
        }

        // Row group reader must agree with the metadata it was built from.
        let row_group_reader_result = reader.get_row_group(0);
        assert!(row_group_reader_result.is_ok());
        let row_group_reader: Box<dyn RowGroupReader> = row_group_reader_result.unwrap();
        assert_eq!(
            row_group_reader.num_columns(),
            row_group_metadata.num_columns()
        );
        assert_eq!(
            row_group_reader.metadata().total_byte_size(),
            row_group_metadata.total_byte_size()
        );

        // Column 0: an empty dictionary page followed by one DataPage v2.
        let page_reader_0_result = row_group_reader.get_column_page_reader(0);
        assert!(page_reader_0_result.is_ok());
        let mut page_reader_0: Box<dyn PageReader> = page_reader_0_result.unwrap();
        let mut page_count = 0;
        while let Some(page) = page_reader_0.get_next_page().unwrap() {
            let is_expected_page = match page {
                Page::DictionaryPage {
                    buf,
                    num_values,
                    encoding,
                    is_sorted,
                } => {
                    // Dictionary is empty since no non-null value exists.
                    assert_eq!(buf.len(), 0);
                    assert_eq!(num_values, 0);
                    assert_eq!(encoding, Encoding::PLAIN);
                    assert!(!is_sorted);
                    true
                }
                Page::DataPageV2 {
                    buf,
                    num_values,
                    encoding,
                    num_nulls,
                    num_rows,
                    def_levels_byte_len,
                    rep_levels_byte_len,
                    is_compressed,
                    statistics,
                } => {
                    // All 10 values are null.
                    assert_eq!(buf.len(), 3);
                    assert_eq!(num_values, 10);
                    assert_eq!(encoding, Encoding::RLE_DICTIONARY);
                    assert_eq!(num_nulls, 10);
                    assert_eq!(num_rows, 10);
                    assert_eq!(def_levels_byte_len, 2);
                    assert_eq!(rep_levels_byte_len, 0);
                    assert!(is_compressed);
                    assert!(statistics.is_none());
                    true
                }
                _ => false,
            };
            assert!(is_expected_page);
            page_count += 1;
        }
        assert_eq!(page_count, 2);
    }
1606
    #[test]
    fn test_file_reader_empty_datapage_v2() {
        // A file whose only page is a DataPage v2 holding a single null
        // value — no dictionary page is present at all.
        let test_file = get_test_file("datapage_v2_empty_datapage.snappy.parquet");
        let reader_result = SerializedFileReader::new(test_file);
        assert!(reader_result.is_ok());
        let reader = reader_result.unwrap();

        // One row group.
        let metadata = reader.metadata();
        assert_eq!(metadata.num_row_groups(), 1);

        // File metadata as written by parquet-mr.
        let file_metadata = metadata.file_metadata();
        assert!(file_metadata.created_by().is_some());
        assert_eq!(
            file_metadata.created_by().unwrap(),
            "parquet-mr version 1.13.1 (build db4183109d5b734ec5930d870cdae161e408ddba)"
        );
        assert!(file_metadata.key_value_metadata().is_some());
        assert_eq!(
            file_metadata.key_value_metadata().to_owned().unwrap().len(),
            2
        );

        assert_eq!(file_metadata.num_rows(), 1);
        assert_eq!(file_metadata.version(), 1);
        // This writer records a type-defined (signed) column order.
        let expected_order = ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED);
        assert_eq!(
            file_metadata.column_orders(),
            Some(vec![expected_order].as_ref())
        );

        let row_group_metadata = metadata.row_group(0);

        // Every column must report the file-level order.
        for i in 0..row_group_metadata.num_columns() {
            assert_eq!(file_metadata.column_order(i), expected_order);
        }

        // Row group reader must agree with the metadata it was built from.
        let row_group_reader_result = reader.get_row_group(0);
        assert!(row_group_reader_result.is_ok());
        let row_group_reader: Box<dyn RowGroupReader> = row_group_reader_result.unwrap();
        assert_eq!(
            row_group_reader.num_columns(),
            row_group_metadata.num_columns()
        );
        assert_eq!(
            row_group_reader.metadata().total_byte_size(),
            row_group_metadata.total_byte_size()
        );

        // Column 0: exactly one DataPage v2 holding the single null value.
        let page_reader_0_result = row_group_reader.get_column_page_reader(0);
        assert!(page_reader_0_result.is_ok());
        let mut page_reader_0: Box<dyn PageReader> = page_reader_0_result.unwrap();
        let mut page_count = 0;
        while let Some(page) = page_reader_0.get_next_page().unwrap() {
            let is_expected_page = match page {
                Page::DataPageV2 {
                    buf,
                    num_values,
                    encoding,
                    num_nulls,
                    num_rows,
                    def_levels_byte_len,
                    rep_levels_byte_len,
                    is_compressed,
                    statistics,
                } => {
                    // One value, and it is null.
                    assert_eq!(buf.len(), 2);
                    assert_eq!(num_values, 1);
                    assert_eq!(encoding, Encoding::PLAIN);
                    assert_eq!(num_nulls, 1);
                    assert_eq!(num_rows, 1);
                    assert_eq!(def_levels_byte_len, 2);
                    assert_eq!(rep_levels_byte_len, 0);
                    assert!(is_compressed);
                    assert!(statistics.is_none());
                    true
                }
                _ => false,
            };
            assert!(is_expected_page);
            page_count += 1;
        }
        // Only the single data page — no dictionary page.
        assert_eq!(page_count, 1);
    }
1696
1697 fn get_serialized_page_reader<R: ChunkReader>(
1698 file_reader: &SerializedFileReader<R>,
1699 row_group: usize,
1700 column: usize,
1701 ) -> Result<SerializedPageReader<R>> {
1702 let row_group = {
1703 let row_group_metadata = file_reader.metadata.row_group(row_group);
1704 let props = Arc::clone(&file_reader.props);
1705 let f = Arc::clone(&file_reader.chunk_reader);
1706 SerializedRowGroupReader::new(
1707 f,
1708 row_group_metadata,
1709 file_reader
1710 .metadata
1711 .offset_index()
1712 .map(|x| x[row_group].as_slice()),
1713 props,
1714 )?
1715 };
1716
1717 let col = row_group.metadata.column(column);
1718
1719 let page_locations = row_group
1720 .offset_index
1721 .map(|x| x[column].page_locations.clone());
1722
1723 let props = Arc::clone(&row_group.props);
1724 SerializedPageReader::new_with_properties(
1725 Arc::clone(&row_group.chunk_reader),
1726 col,
1727 usize::try_from(row_group.metadata.num_rows())?,
1728 page_locations,
1729 props,
1730 )
1731 }
1732
    #[test]
    fn test_peek_next_page_offset_matches_actual() -> Result<()> {
        // For every column chunk in the file, repeatedly peek the next page
        // offset and check it against the reader's internal state, then
        // consume the page. Every page offset must be unique file-wide.
        let test_file = get_test_file("alltypes_plain.parquet");
        let reader = SerializedFileReader::new(test_file)?;

        let mut offset_set = HashSet::new();
        let num_row_groups = reader.metadata.num_row_groups();
        for row_group in 0..num_row_groups {
            let num_columns = reader.metadata.row_group(row_group).num_columns();
            for column in 0..num_columns {
                let mut page_reader = get_serialized_page_reader(&reader, row_group, column)?;

                while let Ok(Some(page_offset)) = page_reader.peek_next_page_offset() {
                    match &page_reader.state {
                        SerializedPageReaderState::Pages {
                            page_locations,
                            dictionary_page,
                            ..
                        } => {
                            // Offset-index path: the peeked offset is either
                            // the pending dictionary page or the first queued
                            // data page location.
                            if let Some(page) = dictionary_page {
                                assert_eq!(page.offset as u64, page_offset);
                            } else if let Some(page) = page_locations.front() {
                                assert_eq!(page.offset as u64, page_offset);
                            } else {
                                unreachable!()
                            }
                        }
                        SerializedPageReaderState::Values {
                            offset,
                            next_page_header,
                            ..
                        } => {
                            // Sequential path: peeking must have buffered a
                            // page header and left `offset` at the peeked
                            // position.
                            assert!(next_page_header.is_some());
                            assert_eq!(*offset, page_offset);
                        }
                    }
                    // Consuming must succeed, and each offset is seen once.
                    let page = page_reader.get_next_page()?;
                    assert!(page.is_some());
                    let newly_inserted = offset_set.insert(page_offset);
                    assert!(newly_inserted);
                }
            }
        }

        Ok(())
    }
1779
1780 #[test]
1781 fn test_page_iterator() {
1782 let file = get_test_file("alltypes_plain.parquet");
1783 let file_reader = Arc::new(SerializedFileReader::new(file).unwrap());
1784
1785 let mut page_iterator = FilePageIterator::new(0, file_reader.clone()).unwrap();
1786
1787 let page = page_iterator.next();
1789 assert!(page.is_some());
1790 assert!(page.unwrap().is_ok());
1791
1792 let page = page_iterator.next();
1794 assert!(page.is_none());
1795
1796 let row_group_indices = Box::new(0..1);
1797 let mut page_iterator =
1798 FilePageIterator::with_row_groups(0, row_group_indices, file_reader).unwrap();
1799
1800 let page = page_iterator.next();
1802 assert!(page.is_some());
1803 assert!(page.unwrap().is_ok());
1804
1805 let page = page_iterator.next();
1807 assert!(page.is_none());
1808 }
1809
1810 #[test]
1811 fn test_file_reader_key_value_metadata() {
1812 let file = get_test_file("binary.parquet");
1813 let file_reader = Arc::new(SerializedFileReader::new(file).unwrap());
1814
1815 let metadata = file_reader
1816 .metadata
1817 .file_metadata()
1818 .key_value_metadata()
1819 .unwrap();
1820
1821 assert_eq!(metadata.len(), 3);
1822
1823 assert_eq!(metadata[0].key, "parquet.proto.descriptor");
1824
1825 assert_eq!(metadata[1].key, "writer.model.name");
1826 assert_eq!(metadata[1].value, Some("protobuf".to_owned()));
1827
1828 assert_eq!(metadata[2].key, "parquet.proto.class");
1829 assert_eq!(metadata[2].value, Some("foo.baz.Foobaz$Event".to_owned()));
1830 }
1831
1832 #[test]
1833 fn test_file_reader_optional_metadata() {
1834 let file = get_test_file("data_index_bloom_encoding_stats.parquet");
1836 let file_reader = Arc::new(SerializedFileReader::new(file).unwrap());
1837
1838 let row_group_metadata = file_reader.metadata.row_group(0);
1839 let col0_metadata = row_group_metadata.column(0);
1840
1841 assert_eq!(col0_metadata.bloom_filter_offset().unwrap(), 192);
1843
1844 let page_encoding_stats = &col0_metadata.page_encoding_stats().unwrap()[0];
1846
1847 assert_eq!(page_encoding_stats.page_type, basic::PageType::DATA_PAGE);
1848 assert_eq!(page_encoding_stats.encoding, Encoding::PLAIN);
1849 assert_eq!(page_encoding_stats.count, 1);
1850
1851 assert_eq!(col0_metadata.column_index_offset().unwrap(), 156);
1853 assert_eq!(col0_metadata.column_index_length().unwrap(), 25);
1854
1855 assert_eq!(col0_metadata.offset_index_offset().unwrap(), 181);
1857 assert_eq!(col0_metadata.offset_index_length().unwrap(), 11);
1858 }
1859
1860 #[test]
1861 fn test_file_reader_with_no_filter() -> Result<()> {
1862 let test_file = get_test_file("alltypes_plain.parquet");
1863 let origin_reader = SerializedFileReader::new(test_file)?;
1864 let metadata = origin_reader.metadata();
1866 assert_eq!(metadata.num_row_groups(), 1);
1867 Ok(())
1868 }
1869
1870 #[test]
1871 fn test_file_reader_filter_row_groups_with_predicate() -> Result<()> {
1872 let test_file = get_test_file("alltypes_plain.parquet");
1873 let read_options = ReadOptionsBuilder::new()
1874 .with_predicate(Box::new(|_, _| false))
1875 .build();
1876 let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
1877 let metadata = reader.metadata();
1878 assert_eq!(metadata.num_row_groups(), 0);
1879 Ok(())
1880 }
1881
1882 #[test]
1883 fn test_file_reader_filter_row_groups_with_range() -> Result<()> {
1884 let test_file = get_test_file("alltypes_plain.parquet");
1885 let origin_reader = SerializedFileReader::new(test_file)?;
1886 let metadata = origin_reader.metadata();
1888 assert_eq!(metadata.num_row_groups(), 1);
1889 let mid = get_midpoint_offset(metadata.row_group(0));
1890
1891 let test_file = get_test_file("alltypes_plain.parquet");
1892 let read_options = ReadOptionsBuilder::new().with_range(0, mid + 1).build();
1893 let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
1894 let metadata = reader.metadata();
1895 assert_eq!(metadata.num_row_groups(), 1);
1896
1897 let test_file = get_test_file("alltypes_plain.parquet");
1898 let read_options = ReadOptionsBuilder::new().with_range(0, mid).build();
1899 let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
1900 let metadata = reader.metadata();
1901 assert_eq!(metadata.num_row_groups(), 0);
1902 Ok(())
1903 }
1904
    #[test]
    fn test_file_reader_filter_row_groups_and_range() -> Result<()> {
        // When both a predicate and a byte range are supplied, a row group
        // is kept only if it passes BOTH filters; page indexes are loaded
        // only for row groups that survive.
        let test_file = get_test_file("alltypes_tiny_pages.parquet");
        let origin_reader = SerializedFileReader::new(test_file)?;
        let metadata = origin_reader.metadata();
        let mid = get_midpoint_offset(metadata.row_group(0));

        // Predicate passes and range covers the midpoint: row group kept,
        // page indexes loaded.
        let test_file = get_test_file("alltypes_tiny_pages.parquet");
        let read_options = ReadOptionsBuilder::new()
            .with_page_index()
            .with_predicate(Box::new(|_, _| true))
            .with_range(mid, mid + 1)
            .build();
        let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
        let metadata = reader.metadata();
        assert_eq!(metadata.num_row_groups(), 1);
        assert_eq!(metadata.column_index().unwrap().len(), 1);
        assert_eq!(metadata.offset_index().unwrap().len(), 1);

        // Predicate passes but range misses: dropped, no indexes.
        let test_file = get_test_file("alltypes_tiny_pages.parquet");
        let read_options = ReadOptionsBuilder::new()
            .with_page_index()
            .with_predicate(Box::new(|_, _| true))
            .with_range(0, mid)
            .build();
        let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
        let metadata = reader.metadata();
        assert_eq!(metadata.num_row_groups(), 0);
        assert!(metadata.column_index().is_none());
        assert!(metadata.offset_index().is_none());

        // Range matches but predicate rejects: dropped, no indexes.
        let test_file = get_test_file("alltypes_tiny_pages.parquet");
        let read_options = ReadOptionsBuilder::new()
            .with_page_index()
            .with_predicate(Box::new(|_, _| false))
            .with_range(mid, mid + 1)
            .build();
        let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
        let metadata = reader.metadata();
        assert_eq!(metadata.num_row_groups(), 0);
        assert!(metadata.column_index().is_none());
        assert!(metadata.offset_index().is_none());

        // Both filters reject: dropped, no indexes.
        let test_file = get_test_file("alltypes_tiny_pages.parquet");
        let read_options = ReadOptionsBuilder::new()
            .with_page_index()
            .with_predicate(Box::new(|_, _| false))
            .with_range(0, mid)
            .build();
        let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
        let metadata = reader.metadata();
        assert_eq!(metadata.num_row_groups(), 0);
        assert!(metadata.column_index().is_none());
        assert!(metadata.offset_index().is_none());
        Ok(())
    }
1965
1966 #[test]
1967 fn test_file_reader_invalid_metadata() {
1968 let data = [
1969 255, 172, 1, 0, 50, 82, 65, 73, 1, 0, 0, 0, 169, 168, 168, 162, 87, 255, 16, 0, 0, 0,
1970 80, 65, 82, 49,
1971 ];
1972 let ret = SerializedFileReader::new(Bytes::copy_from_slice(&data));
1973 assert_eq!(
1974 ret.err().unwrap().to_string(),
1975 "Parquet error: Received empty union from remote ColumnOrder"
1976 );
1977 }
1978
    #[test]
    fn test_page_index_reader() {
        // Reads the page index of a single-column file and checks the decoded
        // column index (min/max, ordering) and offset index (page location).
        let test_file = get_test_file("data_index_bloom_encoding_stats.parquet");
        let builder = ReadOptionsBuilder::new();
        let options = builder.with_page_index().build();
        let reader_result = SerializedFileReader::new_with_options(test_file, options);
        let reader = reader_result.unwrap();

        let metadata = reader.metadata();
        assert_eq!(metadata.num_row_groups(), 1);

        let column_index = metadata.column_index().unwrap();

        // One row group; its only column is a BYTE_ARRAY.
        assert_eq!(column_index.len(), 1);
        let index = if let ColumnIndexMetaData::BYTE_ARRAY(index) = &column_index[0][0] {
            index
        } else {
            unreachable!()
        };

        assert_eq!(index.boundary_order, BoundaryOrder::ASCENDING);

        // A single page whose min/max values are known.
        assert_eq!(index.num_pages(), 1);

        let min = index.min_value(0).unwrap();
        let max = index.max_value(0).unwrap();
        assert_eq!(b"Hello", min.as_bytes());
        assert_eq!(b"today", max.as_bytes());

        // Offset index: one page starting at byte offset 4.
        let offset_indexes = metadata.offset_index().unwrap();
        assert_eq!(offset_indexes.len(), 1);
        let offset_index = &offset_indexes[0];
        let page_offset = &offset_index[0].page_locations()[0];

        assert_eq!(4, page_offset.offset);
        assert_eq!(152, page_offset.compressed_page_size);
        assert_eq!(0, page_offset.first_row_index);
    }
2038
    #[test]
    #[allow(deprecated)]
    fn test_page_index_reader_out_of_order() {
        // Reading page indexes must be insensitive to the order in which the
        // column chunks are passed: reading with a reversed column list and
        // then reversing the result must match the in-order read.
        let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
        let options = ReadOptionsBuilder::new().with_page_index().build();
        let reader = SerializedFileReader::new_with_options(test_file, options).unwrap();
        let metadata = reader.metadata();

        let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
        let columns = metadata.row_group(0).columns();
        let reversed: Vec<_> = columns.iter().cloned().rev().collect();

        // Column indexes: in-order vs reversed.
        let a = read_columns_indexes(&test_file, columns).unwrap().unwrap();
        let mut b = read_columns_indexes(&test_file, &reversed)
            .unwrap()
            .unwrap();
        b.reverse();
        assert_eq!(a, b);

        // Offset indexes: in-order vs reversed.
        let a = read_offset_indexes(&test_file, columns).unwrap().unwrap();
        let mut b = read_offset_indexes(&test_file, &reversed).unwrap().unwrap();
        b.reverse();
        assert_eq!(a, b);
    }
2063
2064 #[test]
2065 fn test_page_index_reader_all_type() {
2066 let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
2067 let builder = ReadOptionsBuilder::new();
2068 let options = builder.with_page_index().build();
2070 let reader_result = SerializedFileReader::new_with_options(test_file, options);
2071 let reader = reader_result.unwrap();
2072
2073 let metadata = reader.metadata();
2075 assert_eq!(metadata.num_row_groups(), 1);
2076
2077 let column_index = metadata.column_index().unwrap();
2078 let row_group_offset_indexes = &metadata.offset_index().unwrap()[0];
2079
2080 assert_eq!(column_index.len(), 1);
2082 let row_group_metadata = metadata.row_group(0);
2083
2084 assert!(!&column_index[0][0].is_sorted());
2086 let boundary_order = &column_index[0][0].get_boundary_order();
2087 assert!(boundary_order.is_some());
2088 matches!(boundary_order.unwrap(), BoundaryOrder::UNORDERED);
2089 if let ColumnIndexMetaData::INT32(index) = &column_index[0][0] {
2090 check_native_page_index(
2091 index,
2092 325,
2093 get_row_group_min_max_bytes(row_group_metadata, 0),
2094 BoundaryOrder::UNORDERED,
2095 );
2096 assert_eq!(row_group_offset_indexes[0].page_locations.len(), 325);
2097 } else {
2098 unreachable!()
2099 };
2100 assert!(&column_index[0][1].is_sorted());
2102 if let ColumnIndexMetaData::BOOLEAN(index) = &column_index[0][1] {
2103 assert_eq!(index.num_pages(), 82);
2104 assert_eq!(row_group_offset_indexes[1].page_locations.len(), 82);
2105 } else {
2106 unreachable!()
2107 };
2108 assert!(&column_index[0][2].is_sorted());
2110 if let ColumnIndexMetaData::INT32(index) = &column_index[0][2] {
2111 check_native_page_index(
2112 index,
2113 325,
2114 get_row_group_min_max_bytes(row_group_metadata, 2),
2115 BoundaryOrder::ASCENDING,
2116 );
2117 assert_eq!(row_group_offset_indexes[2].page_locations.len(), 325);
2118 } else {
2119 unreachable!()
2120 };
2121 assert!(&column_index[0][3].is_sorted());
2123 if let ColumnIndexMetaData::INT32(index) = &column_index[0][3] {
2124 check_native_page_index(
2125 index,
2126 325,
2127 get_row_group_min_max_bytes(row_group_metadata, 3),
2128 BoundaryOrder::ASCENDING,
2129 );
2130 assert_eq!(row_group_offset_indexes[3].page_locations.len(), 325);
2131 } else {
2132 unreachable!()
2133 };
2134 assert!(&column_index[0][4].is_sorted());
2136 if let ColumnIndexMetaData::INT32(index) = &column_index[0][4] {
2137 check_native_page_index(
2138 index,
2139 325,
2140 get_row_group_min_max_bytes(row_group_metadata, 4),
2141 BoundaryOrder::ASCENDING,
2142 );
2143 assert_eq!(row_group_offset_indexes[4].page_locations.len(), 325);
2144 } else {
2145 unreachable!()
2146 };
2147 assert!(!&column_index[0][5].is_sorted());
2149 if let ColumnIndexMetaData::INT64(index) = &column_index[0][5] {
2150 check_native_page_index(
2151 index,
2152 528,
2153 get_row_group_min_max_bytes(row_group_metadata, 5),
2154 BoundaryOrder::UNORDERED,
2155 );
2156 assert_eq!(row_group_offset_indexes[5].page_locations.len(), 528);
2157 } else {
2158 unreachable!()
2159 };
2160 assert!(&column_index[0][6].is_sorted());
2162 if let ColumnIndexMetaData::FLOAT(index) = &column_index[0][6] {
2163 check_native_page_index(
2164 index,
2165 325,
2166 get_row_group_min_max_bytes(row_group_metadata, 6),
2167 BoundaryOrder::ASCENDING,
2168 );
2169 assert_eq!(row_group_offset_indexes[6].page_locations.len(), 325);
2170 } else {
2171 unreachable!()
2172 };
2173 assert!(!&column_index[0][7].is_sorted());
2175 if let ColumnIndexMetaData::DOUBLE(index) = &column_index[0][7] {
2176 check_native_page_index(
2177 index,
2178 528,
2179 get_row_group_min_max_bytes(row_group_metadata, 7),
2180 BoundaryOrder::UNORDERED,
2181 );
2182 assert_eq!(row_group_offset_indexes[7].page_locations.len(), 528);
2183 } else {
2184 unreachable!()
2185 };
2186 assert!(!&column_index[0][8].is_sorted());
2188 if let ColumnIndexMetaData::BYTE_ARRAY(index) = &column_index[0][8] {
2189 check_byte_array_page_index(
2190 index,
2191 974,
2192 get_row_group_min_max_bytes(row_group_metadata, 8),
2193 BoundaryOrder::UNORDERED,
2194 );
2195 assert_eq!(row_group_offset_indexes[8].page_locations.len(), 974);
2196 } else {
2197 unreachable!()
2198 };
2199 assert!(&column_index[0][9].is_sorted());
2201 if let ColumnIndexMetaData::BYTE_ARRAY(index) = &column_index[0][9] {
2202 check_byte_array_page_index(
2203 index,
2204 352,
2205 get_row_group_min_max_bytes(row_group_metadata, 9),
2206 BoundaryOrder::ASCENDING,
2207 );
2208 assert_eq!(row_group_offset_indexes[9].page_locations.len(), 352);
2209 } else {
2210 unreachable!()
2211 };
2212 assert!(!&column_index[0][10].is_sorted());
2215 if let ColumnIndexMetaData::NONE = &column_index[0][10] {
2216 assert_eq!(row_group_offset_indexes[10].page_locations.len(), 974);
2217 } else {
2218 unreachable!()
2219 };
2220 assert!(&column_index[0][11].is_sorted());
2222 if let ColumnIndexMetaData::INT32(index) = &column_index[0][11] {
2223 check_native_page_index(
2224 index,
2225 325,
2226 get_row_group_min_max_bytes(row_group_metadata, 11),
2227 BoundaryOrder::ASCENDING,
2228 );
2229 assert_eq!(row_group_offset_indexes[11].page_locations.len(), 325);
2230 } else {
2231 unreachable!()
2232 };
2233 assert!(!&column_index[0][12].is_sorted());
2235 if let ColumnIndexMetaData::INT32(index) = &column_index[0][12] {
2236 check_native_page_index(
2237 index,
2238 325,
2239 get_row_group_min_max_bytes(row_group_metadata, 12),
2240 BoundaryOrder::UNORDERED,
2241 );
2242 assert_eq!(row_group_offset_indexes[12].page_locations.len(), 325);
2243 } else {
2244 unreachable!()
2245 };
2246 }
2247
2248 fn check_native_page_index<T: ParquetValueType>(
2249 row_group_index: &PrimitiveColumnIndex<T>,
2250 page_size: usize,
2251 min_max: (&[u8], &[u8]),
2252 boundary_order: BoundaryOrder,
2253 ) {
2254 assert_eq!(row_group_index.num_pages() as usize, page_size);
2255 assert_eq!(row_group_index.boundary_order, boundary_order);
2256 assert!(row_group_index.min_values().iter().all(|x| {
2257 x >= &T::try_from_le_slice(min_max.0).unwrap()
2258 && x <= &T::try_from_le_slice(min_max.1).unwrap()
2259 }));
2260 }
2261
2262 fn check_byte_array_page_index(
2263 row_group_index: &ByteArrayColumnIndex,
2264 page_size: usize,
2265 min_max: (&[u8], &[u8]),
2266 boundary_order: BoundaryOrder,
2267 ) {
2268 assert_eq!(row_group_index.num_pages() as usize, page_size);
2269 assert_eq!(row_group_index.boundary_order, boundary_order);
2270 for i in 0..row_group_index.num_pages() as usize {
2271 let x = row_group_index.min_value(i).unwrap();
2272 assert!(x >= min_max.0 && x <= min_max.1);
2273 }
2274 }
2275
2276 fn get_row_group_min_max_bytes(r: &RowGroupMetaData, col_num: usize) -> (&[u8], &[u8]) {
2277 let statistics = r.column(col_num).statistics().unwrap();
2278 (
2279 statistics.min_bytes_opt().unwrap_or_default(),
2280 statistics.max_bytes_opt().unwrap_or_default(),
2281 )
2282 }
2283
    #[test]
    fn test_skip_next_page_with_dictionary_page() {
        // Column 9 starts with a dictionary page: verify `peek_next_page`
        // reports it, that it can be skipped, and that the remaining data
        // pages can then be peeked and read one by one.
        let test_file = get_test_file("alltypes_tiny_pages.parquet");
        let builder = ReadOptionsBuilder::new();
        let options = builder.with_page_index().build();
        let reader_result = SerializedFileReader::new_with_options(test_file, options);
        let reader = reader_result.unwrap();

        let row_group_reader = reader.get_row_group(0).unwrap();

        let mut column_page_reader = row_group_reader.get_column_page_reader(9).unwrap();

        let mut vec = vec![];

        // The first page is the dictionary page ...
        let meta = column_page_reader.peek_next_page().unwrap().unwrap();
        assert!(meta.is_dict);

        // ... which we skip without decoding.
        column_page_reader.skip_next_page().unwrap();

        // The next page read must therefore be a data page.
        let page = column_page_reader.get_next_page().unwrap().unwrap();
        assert!(matches!(page.page_type(), basic::PageType::DATA_PAGE));

        // Peek then read each of the remaining 351 data pages.
        for _i in 0..351 {
            let meta = column_page_reader.peek_next_page().unwrap().unwrap();
            assert!(!meta.is_dict);
            vec.push(meta);

            let page = column_page_reader.get_next_page().unwrap().unwrap();
            assert!(matches!(page.page_type(), basic::PageType::DATA_PAGE));
        }

        // The reader must be exhausted afterwards.
        assert!(column_page_reader.peek_next_page().unwrap().is_none());
        assert!(column_page_reader.get_next_page().unwrap().is_none());

        assert_eq!(vec.len(), 351);
    }
2329
2330 #[test]
2331 fn test_skip_page_with_offset_index() {
2332 let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
2333 let builder = ReadOptionsBuilder::new();
2334 let options = builder.with_page_index().build();
2336 let reader_result = SerializedFileReader::new_with_options(test_file, options);
2337 let reader = reader_result.unwrap();
2338
2339 let row_group_reader = reader.get_row_group(0).unwrap();
2340
2341 let mut column_page_reader = row_group_reader.get_column_page_reader(4).unwrap();
2343
2344 let mut vec = vec![];
2345
2346 for i in 0..325 {
2347 if i % 2 == 0 {
2348 vec.push(column_page_reader.get_next_page().unwrap().unwrap());
2349 } else {
2350 column_page_reader.skip_next_page().unwrap();
2351 }
2352 }
2353 assert!(column_page_reader.peek_next_page().unwrap().is_none());
2355 assert!(column_page_reader.get_next_page().unwrap().is_none());
2356
2357 assert_eq!(vec.len(), 163);
2358 }
2359
2360 #[test]
2361 fn test_skip_page_without_offset_index() {
2362 let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
2363
2364 let reader_result = SerializedFileReader::new(test_file);
2366 let reader = reader_result.unwrap();
2367
2368 let row_group_reader = reader.get_row_group(0).unwrap();
2369
2370 let mut column_page_reader = row_group_reader.get_column_page_reader(4).unwrap();
2372
2373 let mut vec = vec![];
2374
2375 for i in 0..325 {
2376 if i % 2 == 0 {
2377 vec.push(column_page_reader.get_next_page().unwrap().unwrap());
2378 } else {
2379 column_page_reader.peek_next_page().unwrap().unwrap();
2380 column_page_reader.skip_next_page().unwrap();
2381 }
2382 }
2383 assert!(column_page_reader.peek_next_page().unwrap().is_none());
2385 assert!(column_page_reader.get_next_page().unwrap().is_none());
2386
2387 assert_eq!(vec.len(), 163);
2388 }
2389
    #[test]
    fn test_peek_page_with_dictionary_page() {
        // Peeking must not consume pages: every peeked metadata entry is
        // followed by reading the same page. Opened with the page index, so
        // page metadata exposes num_rows (contrast with the companion test
        // below, which only sees num_levels without the index).
        let test_file = get_test_file("alltypes_tiny_pages.parquet");
        let builder = ReadOptionsBuilder::new();
        let options = builder.with_page_index().build();
        let reader_result = SerializedFileReader::new_with_options(test_file, options);
        let reader = reader_result.unwrap();
        let row_group_reader = reader.get_row_group(0).unwrap();

        let mut column_page_reader = row_group_reader.get_column_page_reader(9).unwrap();

        let mut vec = vec![];

        // The first page is the dictionary page.
        let meta = column_page_reader.peek_next_page().unwrap().unwrap();
        assert!(meta.is_dict);
        let page = column_page_reader.get_next_page().unwrap().unwrap();
        assert!(matches!(page.page_type(), basic::PageType::DICTIONARY_PAGE));

        // 352 data pages follow: 351 pages of 20-21 rows, then a final
        // short page of 10 rows.
        for i in 0..352 {
            let meta = column_page_reader.peek_next_page().unwrap().unwrap();
            if i != 351 {
                assert!((meta.num_rows == Some(21)) || (meta.num_rows == Some(20)));
            } else {
                assert_eq!(meta.num_rows, Some(10));
            }
            assert!(!meta.is_dict);
            vec.push(meta);
            let page = column_page_reader.get_next_page().unwrap().unwrap();
            assert!(matches!(page.page_type(), basic::PageType::DATA_PAGE));
        }

        // Exhausted: both peek and read return None.
        assert!(column_page_reader.peek_next_page().unwrap().is_none());
        assert!(column_page_reader.get_next_page().unwrap().is_none());

        assert_eq!(vec.len(), 352);
    }
2433
    #[test]
    fn test_peek_page_with_dictionary_page_without_offset_index() {
        // Same flow as test_peek_page_with_dictionary_page, but without the
        // page index: peeked metadata reports num_levels here rather than
        // num_rows (presumably taken from the page headers — the index that
        // would supply row counts was not loaded).
        let test_file = get_test_file("alltypes_tiny_pages.parquet");

        let reader_result = SerializedFileReader::new(test_file);
        let reader = reader_result.unwrap();
        let row_group_reader = reader.get_row_group(0).unwrap();

        let mut column_page_reader = row_group_reader.get_column_page_reader(9).unwrap();

        let mut vec = vec![];

        // The first page is the dictionary page.
        let meta = column_page_reader.peek_next_page().unwrap().unwrap();
        assert!(meta.is_dict);
        let page = column_page_reader.get_next_page().unwrap().unwrap();
        assert!(matches!(page.page_type(), basic::PageType::DICTIONARY_PAGE));

        // 352 data pages: 351 pages of 20-21 levels, the last has 10.
        for i in 0..352 {
            let meta = column_page_reader.peek_next_page().unwrap().unwrap();
            if i != 351 {
                assert!((meta.num_levels == Some(21)) || (meta.num_levels == Some(20)));
            } else {
                assert_eq!(meta.num_levels, Some(10));
            }
            assert!(!meta.is_dict);
            vec.push(meta);
            let page = column_page_reader.get_next_page().unwrap().unwrap();
            assert!(matches!(page.page_type(), basic::PageType::DATA_PAGE));
        }

        // Exhausted: both peek and read return None.
        assert!(column_page_reader.peek_next_page().unwrap().is_none());
        assert!(column_page_reader.get_next_page().unwrap().is_none());

        assert_eq!(vec.len(), 352);
    }
2475
    #[test]
    fn test_fixed_length_index() {
        // Round-trips a single optional FIXED_LEN_BYTE_ARRAY(11) decimal
        // column and verifies the column index written for it.
        let message_type = "
        message test_schema {
          OPTIONAL FIXED_LEN_BYTE_ARRAY (11) value (DECIMAL(25,2));
        }
        ";

        let schema = parse_message_type(message_type).unwrap();
        let mut out = Vec::with_capacity(1024);
        let mut writer =
            SerializedFileWriter::new(&mut out, Arc::new(schema), Default::default()).unwrap();

        // Write three values with one null interleaved (def levels 1,1,0,1):
        // [0;11], [5;11], null, [3;11].
        let mut r = writer.next_row_group().unwrap();
        let mut c = r.next_column().unwrap().unwrap();
        c.typed::<FixedLenByteArrayType>()
            .write_batch(
                &[vec![0; 11].into(), vec![5; 11].into(), vec![3; 11].into()],
                Some(&[1, 1, 0, 1]),
                None,
            )
            .unwrap();
        c.close().unwrap();
        r.close().unwrap();
        writer.close().unwrap();

        // Read the in-memory file back with the page index enabled.
        let b = Bytes::from(out);
        let options = ReadOptionsBuilder::new().with_page_index().build();
        let reader = SerializedFileReader::new_with_options(b, options).unwrap();
        let index = reader.metadata().column_index().unwrap();

        // One row group containing one column.
        assert_eq!(index.len(), 1);
        let c = &index[0];
        assert_eq!(c.len(), 1);

        // The single page records the null count and the min/max byte values
        // ([0;11] and [5;11] — the null does not participate).
        match &c[0] {
            ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(v) => {
                assert_eq!(v.num_pages(), 1);
                assert_eq!(v.null_count(0).unwrap(), 1);
                assert_eq!(v.min_value(0).unwrap(), &[0; 11]);
                assert_eq!(v.max_value(0).unwrap(), &[5; 11]);
            }
            _ => unreachable!(),
        }
    }
2523
2524 #[test]
2525 fn test_multi_gz() {
2526 let file = get_test_file("concatenated_gzip_members.parquet");
2527 let reader = SerializedFileReader::new(file).unwrap();
2528 let row_group_reader = reader.get_row_group(0).unwrap();
2529 match row_group_reader.get_column_reader(0).unwrap() {
2530 ColumnReader::Int64ColumnReader(mut reader) => {
2531 let mut buffer = Vec::with_capacity(1024);
2532 let mut def_levels = Vec::with_capacity(1024);
2533 let (num_records, num_values, num_levels) = reader
2534 .read_records(1024, Some(&mut def_levels), None, &mut buffer)
2535 .unwrap();
2536
2537 assert_eq!(num_records, 513);
2538 assert_eq!(num_values, 513);
2539 assert_eq!(num_levels, 513);
2540
2541 let expected: Vec<i64> = (1..514).collect();
2542 assert_eq!(&buffer, &expected);
2543 }
2544 _ => unreachable!(),
2545 }
2546 }
2547
2548 #[test]
2549 fn test_byte_stream_split_extended() {
2550 let path = format!(
2551 "{}/byte_stream_split_extended.gzip.parquet",
2552 arrow::util::test_util::parquet_test_data(),
2553 );
2554 let file = File::open(path).unwrap();
2555 let reader = Box::new(SerializedFileReader::new(file).expect("Failed to create reader"));
2556
2557 let mut iter = reader
2559 .get_row_iter(None)
2560 .expect("Failed to create row iterator");
2561
2562 let mut start = 0;
2563 let end = reader.metadata().file_metadata().num_rows();
2564
2565 let check_row = |row: Result<Row, ParquetError>| {
2566 assert!(row.is_ok());
2567 let r = row.unwrap();
2568 assert_eq!(r.get_float16(0).unwrap(), r.get_float16(1).unwrap());
2569 assert_eq!(r.get_float(2).unwrap(), r.get_float(3).unwrap());
2570 assert_eq!(r.get_double(4).unwrap(), r.get_double(5).unwrap());
2571 assert_eq!(r.get_int(6).unwrap(), r.get_int(7).unwrap());
2572 assert_eq!(r.get_long(8).unwrap(), r.get_long(9).unwrap());
2573 assert_eq!(r.get_bytes(10).unwrap(), r.get_bytes(11).unwrap());
2574 assert_eq!(r.get_decimal(12).unwrap(), r.get_decimal(13).unwrap());
2575 };
2576
2577 while start < end {
2578 match iter.next() {
2579 Some(row) => check_row(row),
2580 None => break,
2581 };
2582 start += 1;
2583 }
2584 }
2585
    #[test]
    fn test_filtered_rowgroup_metadata() {
        // Writes a 5-row-group file with page-level statistics, then reads
        // it back with row-group predicates and checks that the surviving
        // metadata (ordinals, column indexes, offset indexes) is filtered
        // consistently.
        let message_type = "
            message test_schema {
                REQUIRED INT32 a;
            }
        ";
        let schema = Arc::new(parse_message_type(message_type).unwrap());
        let props = Arc::new(
            WriterProperties::builder()
                .set_statistics_enabled(EnabledStatistics::Page)
                .build(),
        );
        let mut file: File = tempfile::tempfile().unwrap();
        let mut file_writer = SerializedFileWriter::new(&mut file, schema, props).unwrap();
        let data = [1, 2, 3, 4, 5];

        // One row group per iteration, each holding the 5 base values
        // scaled by (idx + 1) so every group has distinct min/max stats.
        for idx in 0..5 {
            let data_i: Vec<i32> = data.iter().map(|x| x * (idx + 1)).collect();
            let mut row_group_writer = file_writer.next_row_group().unwrap();
            if let Some(mut writer) = row_group_writer.next_column().unwrap() {
                writer
                    .typed::<Int32Type>()
                    .write_batch(data_i.as_slice(), None, None)
                    .unwrap();
                writer.close().unwrap();
            }
            row_group_writer.close().unwrap();
            // NOTE(review): return value ignored — presumably called only to
            // exercise the accessor between row groups; confirm intent.
            file_writer.flushed_row_groups();
        }
        let file_metadata = file_writer.close().unwrap();

        assert_eq!(file_metadata.file_metadata().num_rows(), 25);
        assert_eq!(file_metadata.num_row_groups(), 5);

        // Case 1: predicate selects the single row group with ordinal == 2.
        let read_options = ReadOptionsBuilder::new()
            .with_page_index()
            .with_predicate(Box::new(|rgmeta, _| rgmeta.ordinal().unwrap_or(0) == 2))
            .build();
        let reader =
            SerializedFileReader::new_with_options(file.try_clone().unwrap(), read_options)
                .unwrap();
        let metadata = reader.metadata();

        // Only one row group remains, and it keeps its original ordinal.
        assert_eq!(metadata.num_row_groups(), 1);
        assert_eq!(metadata.row_group(0).ordinal(), Some(2));

        // The page indexes must be filtered down alongside the row groups.
        assert!(metadata.column_index().is_some());
        assert!(metadata.offset_index().is_some());
        assert_eq!(metadata.column_index().unwrap().len(), 1);
        assert_eq!(metadata.offset_index().unwrap().len(), 1);
        let col_idx = metadata.column_index().unwrap();
        let off_idx = metadata.offset_index().unwrap();
        let col_stats = metadata.row_group(0).column(0).statistics().unwrap();
        let pg_idx = &col_idx[0][0];
        let off_idx_i = &off_idx[0][0];

        // Column-index min/max must agree with the chunk statistics of the
        // surviving row group (proves the index rows were not shifted).
        match pg_idx {
            ColumnIndexMetaData::INT32(int_idx) => {
                let min = col_stats.min_bytes_opt().unwrap().get_i32_le();
                let max = col_stats.max_bytes_opt().unwrap().get_i32_le();
                assert_eq!(int_idx.min_value(0), Some(min).as_ref());
                assert_eq!(int_idx.max_value(0), Some(max).as_ref());
            }
            _ => panic!("wrong stats type"),
        }

        // Offset index must point at the surviving row group's data pages.
        assert_eq!(
            off_idx_i.page_locations[0].offset,
            metadata.row_group(0).column(0).data_page_offset()
        );

        // Case 2: predicate selects the two row groups with odd ordinals.
        let read_options = ReadOptionsBuilder::new()
            .with_page_index()
            .with_predicate(Box::new(|rgmeta, _| rgmeta.ordinal().unwrap_or(0) % 2 == 1))
            .build();
        let reader =
            SerializedFileReader::new_with_options(file.try_clone().unwrap(), read_options)
                .unwrap();
        let metadata = reader.metadata();

        assert_eq!(metadata.num_row_groups(), 2);
        assert_eq!(metadata.row_group(0).ordinal(), Some(1));
        assert_eq!(metadata.row_group(1).ordinal(), Some(3));

        assert!(metadata.column_index().is_some());
        assert!(metadata.offset_index().is_some());
        assert_eq!(metadata.column_index().unwrap().len(), 2);
        assert_eq!(metadata.offset_index().unwrap().len(), 2);
        let col_idx = metadata.column_index().unwrap();
        let off_idx = metadata.offset_index().unwrap();

        // Each surviving row group's index entries must line up with its
        // own chunk statistics and data page offsets.
        for (i, col_idx_i) in col_idx.iter().enumerate().take(metadata.num_row_groups()) {
            let col_stats = metadata.row_group(i).column(0).statistics().unwrap();
            let pg_idx = &col_idx_i[0];
            let off_idx_i = &off_idx[i][0];

            match pg_idx {
                ColumnIndexMetaData::INT32(int_idx) => {
                    let min = col_stats.min_bytes_opt().unwrap().get_i32_le();
                    let max = col_stats.max_bytes_opt().unwrap().get_i32_le();
                    assert_eq!(int_idx.min_value(0), Some(min).as_ref());
                    assert_eq!(int_idx.max_value(0), Some(max).as_ref());
                }
                _ => panic!("wrong stats type"),
            }

            assert_eq!(
                off_idx_i.page_locations[0].offset,
                metadata.row_group(i).column(0).data_page_offset()
            );
        }
    }
2710
    #[test]
    fn test_reuse_schema() {
        // Read the file once to obtain its parsed schema descriptor.
        let file = get_test_file("alltypes_plain.parquet");
        let file_reader = SerializedFileReader::new(file.try_clone().unwrap()).unwrap();
        let schema = file_reader.metadata().file_metadata().schema_descr_ptr();
        let expected = file_reader.metadata;

        // Re-open the same file, this time supplying the already-parsed
        // schema through the read options.
        let options = ReadOptionsBuilder::new()
            .with_parquet_schema(schema)
            .build();
        let file_reader = SerializedFileReader::new_with_options(file, options).unwrap();

        // The metadata is equal, and the schema Arc is the very same
        // allocation (pointer equality) — i.e. the supplied schema was
        // reused rather than parsed again.
        assert_eq!(expected.as_ref(), file_reader.metadata.as_ref());
        assert!(Arc::ptr_eq(
            &expected.file_metadata().schema_descr_ptr(),
            &file_reader.metadata.file_metadata().schema_descr_ptr()
        ));
    }
2730
2731 #[test]
2732 fn test_read_unknown_logical_type() {
2733 let file = get_test_file("unknown-logical-type.parquet");
2734 let reader = SerializedFileReader::new(file).expect("Error opening file");
2735
2736 let schema = reader.metadata().file_metadata().schema_descr();
2737 assert_eq!(
2738 schema.column(0).logical_type_ref(),
2739 Some(&basic::LogicalType::String)
2740 );
2741 assert_eq!(
2742 schema.column(1).logical_type_ref(),
2743 Some(&basic::LogicalType::_Unknown { field_id: 2555 })
2744 );
2745 assert_eq!(schema.column(1).physical_type(), Type::BYTE_ARRAY);
2746
2747 let mut iter = reader
2748 .get_row_iter(None)
2749 .expect("Failed to create row iterator");
2750
2751 let mut num_rows = 0;
2752 while iter.next().is_some() {
2753 num_rows += 1;
2754 }
2755 assert_eq!(num_rows, reader.metadata().file_metadata().num_rows());
2756 }
2757}