1use crate::basic::{PageType, Type};
22use crate::bloom_filter::Sbbf;
23use crate::column::page::{Page, PageMetadata, PageReader};
24use crate::compression::{Codec, create_codec};
25#[cfg(feature = "encryption")]
26use crate::encryption::decrypt::{CryptoContext, read_and_decrypt};
27use crate::errors::{ParquetError, Result};
28use crate::file::metadata::thrift::PageHeader;
29use crate::file::page_index::offset_index::{OffsetIndexMetaData, PageLocation};
30use crate::file::statistics;
31use crate::file::{
32 metadata::*,
33 properties::{ReaderProperties, ReaderPropertiesPtr},
34 reader::*,
35};
36#[cfg(feature = "encryption")]
37use crate::parquet_thrift::ThriftSliceInputProtocol;
38use crate::parquet_thrift::{ReadThrift, ThriftReadInputProtocol};
39use crate::record::Row;
40use crate::record::reader::RowIter;
41use crate::schema::types::{SchemaDescPtr, Type as SchemaType};
42use bytes::Bytes;
43use std::collections::VecDeque;
44use std::{fs::File, io::Read, path::Path, sync::Arc};
45
46impl TryFrom<File> for SerializedFileReader<File> {
47 type Error = ParquetError;
48
49 fn try_from(file: File) -> Result<Self> {
50 Self::new(file)
51 }
52}
53
54impl TryFrom<&Path> for SerializedFileReader<File> {
55 type Error = ParquetError;
56
57 fn try_from(path: &Path) -> Result<Self> {
58 let file = File::open(path)?;
59 Self::try_from(file)
60 }
61}
62
63impl TryFrom<String> for SerializedFileReader<File> {
64 type Error = ParquetError;
65
66 fn try_from(path: String) -> Result<Self> {
67 Self::try_from(Path::new(&path))
68 }
69}
70
71impl TryFrom<&str> for SerializedFileReader<File> {
72 type Error = ParquetError;
73
74 fn try_from(path: &str) -> Result<Self> {
75 Self::try_from(Path::new(&path))
76 }
77}
78
79impl IntoIterator for SerializedFileReader<File> {
82 type Item = Result<Row>;
83 type IntoIter = RowIter<'static>;
84
85 fn into_iter(self) -> Self::IntoIter {
86 RowIter::from_file_into(Box::new(self))
87 }
88}
89
90pub struct SerializedFileReader<R: ChunkReader> {
95 chunk_reader: Arc<R>,
96 metadata: Arc<ParquetMetaData>,
97 props: ReaderPropertiesPtr,
98}
99
100pub type ReadGroupPredicate = Box<dyn FnMut(&RowGroupMetaData, usize) -> bool>;
104
105#[derive(Default)]
109pub struct ReadOptionsBuilder {
110 predicates: Vec<ReadGroupPredicate>,
111 enable_page_index: bool,
112 props: Option<ReaderProperties>,
113 metadata_options: ParquetMetaDataOptions,
114}
115
116impl ReadOptionsBuilder {
117 pub fn new() -> Self {
119 Self::default()
120 }
121
122 pub fn with_predicate(mut self, predicate: ReadGroupPredicate) -> Self {
125 self.predicates.push(predicate);
126 self
127 }
128
129 pub fn with_range(mut self, start: i64, end: i64) -> Self {
132 assert!(start < end);
133 let predicate = move |rg: &RowGroupMetaData, _: usize| {
134 let mid = get_midpoint_offset(rg);
135 mid >= start && mid < end
136 };
137 self.predicates.push(Box::new(predicate));
138 self
139 }
140
141 pub fn with_page_index(mut self) -> Self {
146 self.enable_page_index = true;
147 self
148 }
149
150 pub fn with_reader_properties(mut self, properties: ReaderProperties) -> Self {
152 self.props = Some(properties);
153 self
154 }
155
156 pub fn with_parquet_schema(mut self, schema: SchemaDescPtr) -> Self {
159 self.metadata_options.set_schema(schema);
160 self
161 }
162
163 pub fn with_encoding_stats_as_mask(mut self, val: bool) -> Self {
172 self.metadata_options.set_encoding_stats_as_mask(val);
173 self
174 }
175
176 pub fn with_encoding_stats_policy(mut self, policy: ParquetStatisticsPolicy) -> Self {
181 self.metadata_options.set_encoding_stats_policy(policy);
182 self
183 }
184
185 pub fn with_column_stats_policy(mut self, policy: ParquetStatisticsPolicy) -> Self {
190 self.metadata_options.set_column_stats_policy(policy);
191 self
192 }
193
194 pub fn with_size_stats_policy(mut self, policy: ParquetStatisticsPolicy) -> Self {
199 self.metadata_options.set_size_stats_policy(policy);
200 self
201 }
202
203 pub fn build(self) -> ReadOptions {
205 let props = self
206 .props
207 .unwrap_or_else(|| ReaderProperties::builder().build());
208 ReadOptions {
209 predicates: self.predicates,
210 enable_page_index: self.enable_page_index,
211 props,
212 metadata_options: self.metadata_options,
213 }
214 }
215}
216
217pub struct ReadOptions {
222 predicates: Vec<ReadGroupPredicate>,
223 enable_page_index: bool,
224 props: ReaderProperties,
225 metadata_options: ParquetMetaDataOptions,
226}
227
228impl<R: 'static + ChunkReader> SerializedFileReader<R> {
229 pub fn new(chunk_reader: R) -> Result<Self> {
232 let metadata = ParquetMetaDataReader::new().parse_and_finish(&chunk_reader)?;
233 let props = Arc::new(ReaderProperties::builder().build());
234 Ok(Self {
235 chunk_reader: Arc::new(chunk_reader),
236 metadata: Arc::new(metadata),
237 props,
238 })
239 }
240
241 pub fn new_with_options(chunk_reader: R, options: ReadOptions) -> Result<Self> {
244 let mut metadata_builder = ParquetMetaDataReader::new()
245 .with_metadata_options(Some(options.metadata_options.clone()))
246 .parse_and_finish(&chunk_reader)?
247 .into_builder();
248 let mut predicates = options.predicates;
249
250 for (i, rg_meta) in metadata_builder.take_row_groups().into_iter().enumerate() {
252 let mut keep = true;
253 for predicate in &mut predicates {
254 if !predicate(&rg_meta, i) {
255 keep = false;
256 break;
257 }
258 }
259 if keep {
260 metadata_builder = metadata_builder.add_row_group(rg_meta);
261 }
262 }
263
264 let mut metadata = metadata_builder.build();
265
266 if options.enable_page_index {
268 let mut reader = ParquetMetaDataReader::new_with_metadata(metadata)
269 .with_page_index_policy(PageIndexPolicy::Required);
270 reader.read_page_indexes(&chunk_reader)?;
271 metadata = reader.finish()?;
272 }
273
274 Ok(Self {
275 chunk_reader: Arc::new(chunk_reader),
276 metadata: Arc::new(metadata),
277 props: Arc::new(options.props),
278 })
279 }
280}
281
282fn get_midpoint_offset(meta: &RowGroupMetaData) -> i64 {
284 let col = meta.column(0);
285 let mut offset = col.data_page_offset();
286 if let Some(dic_offset) = col.dictionary_page_offset() {
287 if offset > dic_offset {
288 offset = dic_offset
289 }
290 };
291 offset + meta.compressed_size() / 2
292}
293
294impl<R: 'static + ChunkReader> FileReader for SerializedFileReader<R> {
295 fn metadata(&self) -> &ParquetMetaData {
296 &self.metadata
297 }
298
299 fn num_row_groups(&self) -> usize {
300 self.metadata.num_row_groups()
301 }
302
303 fn get_row_group(&self, i: usize) -> Result<Box<dyn RowGroupReader + '_>> {
304 let row_group_metadata = self.metadata.row_group(i);
305 let props = Arc::clone(&self.props);
307 let f = Arc::clone(&self.chunk_reader);
308 Ok(Box::new(SerializedRowGroupReader::new(
309 f,
310 row_group_metadata,
311 self.metadata.offset_index().map(|x| x[i].as_slice()),
312 props,
313 )?))
314 }
315
316 fn get_row_iter(&self, projection: Option<SchemaType>) -> Result<RowIter<'_>> {
317 RowIter::from_file(projection, self)
318 }
319}
320
321pub struct SerializedRowGroupReader<'a, R: ChunkReader> {
323 chunk_reader: Arc<R>,
324 metadata: &'a RowGroupMetaData,
325 offset_index: Option<&'a [OffsetIndexMetaData]>,
326 props: ReaderPropertiesPtr,
327 bloom_filters: Vec<Option<Sbbf>>,
328}
329
330impl<'a, R: ChunkReader> SerializedRowGroupReader<'a, R> {
331 pub fn new(
333 chunk_reader: Arc<R>,
334 metadata: &'a RowGroupMetaData,
335 offset_index: Option<&'a [OffsetIndexMetaData]>,
336 props: ReaderPropertiesPtr,
337 ) -> Result<Self> {
338 let bloom_filters = if props.read_bloom_filter() {
339 metadata
340 .columns()
341 .iter()
342 .map(|col| Sbbf::read_from_column_chunk(col, &*chunk_reader))
343 .collect::<Result<Vec<_>>>()?
344 } else {
345 std::iter::repeat_n(None, metadata.columns().len()).collect()
346 };
347 Ok(Self {
348 chunk_reader,
349 metadata,
350 offset_index,
351 props,
352 bloom_filters,
353 })
354 }
355}
356
357impl<R: 'static + ChunkReader> RowGroupReader for SerializedRowGroupReader<'_, R> {
358 fn metadata(&self) -> &RowGroupMetaData {
359 self.metadata
360 }
361
362 fn num_columns(&self) -> usize {
363 self.metadata.num_columns()
364 }
365
366 fn get_column_page_reader(&self, i: usize) -> Result<Box<dyn PageReader>> {
368 let col = self.metadata.column(i);
369
370 let page_locations = self.offset_index.map(|x| x[i].page_locations.clone());
371
372 let props = Arc::clone(&self.props);
373 Ok(Box::new(SerializedPageReader::new_with_properties(
374 Arc::clone(&self.chunk_reader),
375 col,
376 usize::try_from(self.metadata.num_rows())?,
377 page_locations,
378 props,
379 )?))
380 }
381
382 fn get_column_bloom_filter(&self, i: usize) -> Option<&Sbbf> {
384 self.bloom_filters[i].as_ref()
385 }
386
387 fn get_row_iter(&self, projection: Option<SchemaType>) -> Result<RowIter<'_>> {
388 RowIter::from_row_group(projection, self)
389 }
390}
391
392pub(crate) fn decode_page(
394 page_header: PageHeader,
395 buffer: Bytes,
396 physical_type: Type,
397 decompressor: Option<&mut Box<dyn Codec>>,
398) -> Result<Page> {
399 #[cfg(feature = "crc")]
401 if let Some(expected_crc) = page_header.crc {
402 let crc = crc32fast::hash(&buffer);
403 if crc != expected_crc as u32 {
404 return Err(general_err!("Page CRC checksum mismatch"));
405 }
406 }
407
408 let mut offset: usize = 0;
415 let mut can_decompress = true;
416
417 if let Some(ref header_v2) = page_header.data_page_header_v2 {
418 if header_v2.definition_levels_byte_length < 0
419 || header_v2.repetition_levels_byte_length < 0
420 || header_v2.definition_levels_byte_length + header_v2.repetition_levels_byte_length
421 > page_header.uncompressed_page_size
422 {
423 return Err(general_err!(
424 "DataPage v2 header contains implausible values \
425 for definition_levels_byte_length ({}) \
426 and repetition_levels_byte_length ({}) \
427 given DataPage header provides uncompressed_page_size ({})",
428 header_v2.definition_levels_byte_length,
429 header_v2.repetition_levels_byte_length,
430 page_header.uncompressed_page_size
431 ));
432 }
433 offset = usize::try_from(
434 header_v2.definition_levels_byte_length + header_v2.repetition_levels_byte_length,
435 )?;
436 can_decompress = header_v2.is_compressed.unwrap_or(true);
438 }
439
440 let buffer = match decompressor {
441 Some(decompressor) if can_decompress => {
442 let uncompressed_page_size = usize::try_from(page_header.uncompressed_page_size)?;
443 if offset > buffer.len() || offset > uncompressed_page_size {
444 return Err(general_err!("Invalid page header"));
445 }
446 let decompressed_size = uncompressed_page_size - offset;
447 let mut decompressed = Vec::with_capacity(uncompressed_page_size);
448 decompressed.extend_from_slice(&buffer[..offset]);
449 if decompressed_size > 0 {
452 let compressed = &buffer[offset..];
453 decompressor.decompress(compressed, &mut decompressed, Some(decompressed_size))?;
454 }
455
456 if decompressed.len() != uncompressed_page_size {
457 return Err(general_err!(
458 "Actual decompressed size doesn't match the expected one ({} vs {})",
459 decompressed.len(),
460 uncompressed_page_size
461 ));
462 }
463
464 Bytes::from(decompressed)
465 }
466 _ => buffer,
467 };
468
469 let result = match page_header.r#type {
470 PageType::DICTIONARY_PAGE => {
471 let dict_header = page_header.dictionary_page_header.as_ref().ok_or_else(|| {
472 ParquetError::General("Missing dictionary page header".to_string())
473 })?;
474 let is_sorted = dict_header.is_sorted.unwrap_or(false);
475 Page::DictionaryPage {
476 buf: buffer,
477 num_values: dict_header.num_values.try_into()?,
478 encoding: dict_header.encoding,
479 is_sorted,
480 }
481 }
482 PageType::DATA_PAGE => {
483 let header = page_header
484 .data_page_header
485 .ok_or_else(|| ParquetError::General("Missing V1 data page header".to_string()))?;
486 Page::DataPage {
487 buf: buffer,
488 num_values: header.num_values.try_into()?,
489 encoding: header.encoding,
490 def_level_encoding: header.definition_level_encoding,
491 rep_level_encoding: header.repetition_level_encoding,
492 statistics: statistics::from_thrift_page_stats(physical_type, header.statistics)?,
493 }
494 }
495 PageType::DATA_PAGE_V2 => {
496 let header = page_header
497 .data_page_header_v2
498 .ok_or_else(|| ParquetError::General("Missing V2 data page header".to_string()))?;
499 let is_compressed = header.is_compressed.unwrap_or(true);
500 Page::DataPageV2 {
501 buf: buffer,
502 num_values: header.num_values.try_into()?,
503 encoding: header.encoding,
504 num_nulls: header.num_nulls.try_into()?,
505 num_rows: header.num_rows.try_into()?,
506 def_levels_byte_len: header.definition_levels_byte_length.try_into()?,
507 rep_levels_byte_len: header.repetition_levels_byte_length.try_into()?,
508 is_compressed,
509 statistics: statistics::from_thrift_page_stats(physical_type, header.statistics)?,
510 }
511 }
512 _ => {
513 return Err(general_err!(
515 "Page type {:?} is not supported",
516 page_header.r#type
517 ));
518 }
519 };
520
521 Ok(result)
522}
523
524enum SerializedPageReaderState {
525 Values {
526 offset: u64,
529
530 remaining_bytes: u64,
533
534 next_page_header: Option<Box<PageHeader>>,
536
537 page_index: usize,
539
540 require_dictionary: bool,
542 },
543 Pages {
544 page_locations: VecDeque<PageLocation>,
546 dictionary_page: Option<PageLocation>,
548 total_rows: usize,
550 page_index: usize,
552 },
553}
554
/// Settings applied to every page read from one column chunk.
#[derive(Default)]
struct SerializedPageReaderContext {
    /// Whether page headers are decoded together with their statistics.
    read_stats: bool,
    /// Decryption context for the column; `None` when the column is not encrypted.
    #[cfg(feature = "encryption")]
    crypto_context: Option<Arc<CryptoContext>>,
}
563
564pub struct SerializedPageReader<R: ChunkReader> {
566 reader: Arc<R>,
568
569 decompressor: Option<Box<dyn Codec>>,
571
572 physical_type: Type,
574
575 state: SerializedPageReaderState,
576
577 context: SerializedPageReaderContext,
578}
579
580impl<R: ChunkReader> SerializedPageReader<R> {
581 pub fn new(
583 reader: Arc<R>,
584 column_chunk_metadata: &ColumnChunkMetaData,
585 total_rows: usize,
586 page_locations: Option<Vec<PageLocation>>,
587 ) -> Result<Self> {
588 let props = Arc::new(ReaderProperties::builder().build());
589 SerializedPageReader::new_with_properties(
590 reader,
591 column_chunk_metadata,
592 total_rows,
593 page_locations,
594 props,
595 )
596 }
597
598 #[cfg(all(feature = "arrow", not(feature = "encryption")))]
600 pub(crate) fn add_crypto_context(
601 self,
602 _rg_idx: usize,
603 _column_idx: usize,
604 _parquet_meta_data: &ParquetMetaData,
605 _column_chunk_metadata: &ColumnChunkMetaData,
606 ) -> Result<SerializedPageReader<R>> {
607 Ok(self)
608 }
609
610 #[cfg(feature = "encryption")]
612 pub(crate) fn add_crypto_context(
613 mut self,
614 rg_idx: usize,
615 column_idx: usize,
616 parquet_meta_data: &ParquetMetaData,
617 column_chunk_metadata: &ColumnChunkMetaData,
618 ) -> Result<SerializedPageReader<R>> {
619 let Some(file_decryptor) = parquet_meta_data.file_decryptor() else {
620 return Ok(self);
621 };
622 let Some(crypto_metadata) = column_chunk_metadata.crypto_metadata() else {
623 return Ok(self);
624 };
625 let crypto_context =
626 CryptoContext::for_column(file_decryptor, crypto_metadata, rg_idx, column_idx)?;
627 self.context.crypto_context = Some(Arc::new(crypto_context));
628 Ok(self)
629 }
630
631 pub fn new_with_properties(
633 reader: Arc<R>,
634 meta: &ColumnChunkMetaData,
635 total_rows: usize,
636 page_locations: Option<Vec<PageLocation>>,
637 props: ReaderPropertiesPtr,
638 ) -> Result<Self> {
639 let decompressor = create_codec(meta.compression(), props.codec_options())?;
640 let (start, len) = meta.byte_range();
641
642 let state = match page_locations {
643 Some(locations) => {
644 let dictionary_page = match locations.first() {
647 Some(dict_offset) if dict_offset.offset as u64 != start => Some(PageLocation {
648 offset: start as i64,
649 compressed_page_size: (dict_offset.offset as u64 - start) as i32,
650 first_row_index: 0,
651 }),
652 _ => None,
653 };
654
655 SerializedPageReaderState::Pages {
656 page_locations: locations.into(),
657 dictionary_page,
658 total_rows,
659 page_index: 0,
660 }
661 }
662 None => SerializedPageReaderState::Values {
663 offset: start,
664 remaining_bytes: len,
665 next_page_header: None,
666 page_index: 0,
667 require_dictionary: meta.dictionary_page_offset().is_some(),
668 },
669 };
670 let mut context = SerializedPageReaderContext::default();
671 if props.read_page_stats() {
672 context.read_stats = true;
673 }
674 Ok(Self {
675 reader,
676 decompressor,
677 state,
678 physical_type: meta.column_type(),
679 context,
680 })
681 }
682
683 #[cfg(test)]
689 fn peek_next_page_offset(&mut self) -> Result<Option<u64>> {
690 match &mut self.state {
691 SerializedPageReaderState::Values {
692 offset,
693 remaining_bytes,
694 next_page_header,
695 page_index,
696 require_dictionary,
697 } => {
698 loop {
699 if *remaining_bytes == 0 {
700 return Ok(None);
701 }
702 return if let Some(header) = next_page_header.as_ref() {
703 if let Ok(_page_meta) = PageMetadata::try_from(&**header) {
704 Ok(Some(*offset))
705 } else {
706 *next_page_header = None;
708 continue;
709 }
710 } else {
711 let mut read = self.reader.get_read(*offset)?;
712 let (header_len, header) = Self::read_page_header_len(
713 &self.context,
714 &mut read,
715 *page_index,
716 *require_dictionary,
717 )?;
718 *offset += header_len as u64;
719 *remaining_bytes -= header_len as u64;
720 let page_meta = if let Ok(_page_meta) = PageMetadata::try_from(&header) {
721 Ok(Some(*offset))
722 } else {
723 continue;
725 };
726 *next_page_header = Some(Box::new(header));
727 page_meta
728 };
729 }
730 }
731 SerializedPageReaderState::Pages {
732 page_locations,
733 dictionary_page,
734 ..
735 } => {
736 if let Some(page) = dictionary_page {
737 Ok(Some(page.offset as u64))
738 } else if let Some(page) = page_locations.front() {
739 Ok(Some(page.offset as u64))
740 } else {
741 Ok(None)
742 }
743 }
744 }
745 }
746
747 fn read_page_header_len<T: Read>(
748 context: &SerializedPageReaderContext,
749 input: &mut T,
750 page_index: usize,
751 dictionary_page: bool,
752 ) -> Result<(usize, PageHeader)> {
753 struct TrackedRead<R> {
755 inner: R,
756 bytes_read: usize,
757 }
758
759 impl<R: Read> Read for TrackedRead<R> {
760 fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
761 let v = self.inner.read(buf)?;
762 self.bytes_read += v;
763 Ok(v)
764 }
765 }
766
767 let mut tracked = TrackedRead {
768 inner: input,
769 bytes_read: 0,
770 };
771 let header = context.read_page_header(&mut tracked, page_index, dictionary_page)?;
772 Ok((tracked.bytes_read, header))
773 }
774
775 fn read_page_header_len_from_bytes(
776 context: &SerializedPageReaderContext,
777 buffer: &[u8],
778 page_index: usize,
779 dictionary_page: bool,
780 ) -> Result<(usize, PageHeader)> {
781 let mut input = std::io::Cursor::new(buffer);
782 let header = context.read_page_header(&mut input, page_index, dictionary_page)?;
783 let header_len = input.position() as usize;
784 Ok((header_len, header))
785 }
786}
787
788#[cfg(not(feature = "encryption"))]
789impl SerializedPageReaderContext {
790 fn read_page_header<T: Read>(
791 &self,
792 input: &mut T,
793 _page_index: usize,
794 _dictionary_page: bool,
795 ) -> Result<PageHeader> {
796 let mut prot = ThriftReadInputProtocol::new(input);
797 if self.read_stats {
798 Ok(PageHeader::read_thrift(&mut prot)?)
799 } else {
800 Ok(PageHeader::read_thrift_without_stats(&mut prot)?)
801 }
802 }
803
804 fn decrypt_page_data<T>(
805 &self,
806 buffer: T,
807 _page_index: usize,
808 _dictionary_page: bool,
809 ) -> Result<T> {
810 Ok(buffer)
811 }
812}
813
#[cfg(feature = "encryption")]
impl SerializedPageReaderContext {
    /// Reads a [`PageHeader`] from `input`, decrypting it first if the column has a
    /// crypto context.
    ///
    /// `page_index` and `dictionary_page` select the page-specific crypto context
    /// used to build the header AAD. Statistics are decoded only when `read_stats`
    /// is set.
    ///
    /// # Errors
    /// Returns an error if the header cannot be decrypted (reported as a possibly wrong
    /// key) or decoded.
    fn read_page_header<T: Read>(
        &self,
        input: &mut T,
        page_index: usize,
        dictionary_page: bool,
    ) -> Result<PageHeader> {
        match self.page_crypto_context(page_index, dictionary_page) {
            None => {
                // Plaintext header: decode straight from the stream. `PageHeader` is
                // the module-level import; no local re-import is needed.
                let mut prot = ThriftReadInputProtocol::new(input);
                if self.read_stats {
                    Ok(PageHeader::read_thrift(&mut prot)?)
                } else {
                    Ok(PageHeader::read_thrift_without_stats(&mut prot)?)
                }
            }
            Some(page_crypto_context) => {
                let data_decryptor = page_crypto_context.data_decryptor();
                let aad = page_crypto_context.create_page_header_aad()?;

                let buf = read_and_decrypt(data_decryptor, input, aad.as_ref()).map_err(|_| {
                    ParquetError::General(format!(
                        "Error decrypting page header for column {}, decryption key may be wrong",
                        page_crypto_context.column_ordinal
                    ))
                })?;

                let mut prot = ThriftSliceInputProtocol::new(buf.as_slice());
                if self.read_stats {
                    Ok(PageHeader::read_thrift(&mut prot)?)
                } else {
                    Ok(PageHeader::read_thrift_without_stats(&mut prot)?)
                }
            }
        }
    }

    /// Decrypts a page body when the column has a crypto context; otherwise returns
    /// `buffer` unchanged.
    ///
    /// # Errors
    /// Propagates AAD construction and decryption failures.
    fn decrypt_page_data<T>(&self, buffer: T, page_index: usize, dictionary_page: bool) -> Result<T>
    where
        T: AsRef<[u8]>,
        T: From<Vec<u8>>,
    {
        let page_crypto_context = self.page_crypto_context(page_index, dictionary_page);
        if let Some(page_crypto_context) = page_crypto_context {
            let decryptor = page_crypto_context.data_decryptor();
            let aad = page_crypto_context.create_page_aad()?;
            let decrypted = decryptor.decrypt(buffer.as_ref(), &aad)?;
            Ok(T::from(decrypted))
        } else {
            Ok(buffer)
        }
    }

    /// Derives the crypto context for one page: the dictionary-page context when
    /// `dictionary_page` is set, otherwise the context for data page `page_index`.
    /// Returns `None` when the column has no crypto context.
    fn page_crypto_context(
        &self,
        page_index: usize,
        dictionary_page: bool,
    ) -> Option<Arc<CryptoContext>> {
        self.crypto_context.as_ref().map(|c| {
            Arc::new(if dictionary_page {
                c.for_dictionary_page()
            } else {
                c.with_page_ordinal(page_index)
            })
        })
    }
}
884
885impl<R: ChunkReader> Iterator for SerializedPageReader<R> {
886 type Item = Result<Page>;
887
888 fn next(&mut self) -> Option<Self::Item> {
889 self.get_next_page().transpose()
890 }
891}
892
893fn verify_page_header_len(header_len: usize, remaining_bytes: u64) -> Result<()> {
894 if header_len as u64 > remaining_bytes {
895 return Err(eof_err!("Invalid page header"));
896 }
897 Ok(())
898}
899
900fn verify_page_size(
901 compressed_size: i32,
902 uncompressed_size: i32,
903 remaining_bytes: u64,
904) -> Result<()> {
905 if compressed_size < 0 || compressed_size as u64 > remaining_bytes || uncompressed_size < 0 {
909 return Err(eof_err!("Invalid page header"));
910 }
911 Ok(())
912}
913
914impl<R: ChunkReader> PageReader for SerializedPageReader<R> {
915 fn get_next_page(&mut self) -> Result<Option<Page>> {
916 loop {
917 let page = match &mut self.state {
918 SerializedPageReaderState::Values {
919 offset,
920 remaining_bytes: remaining,
921 next_page_header,
922 page_index,
923 require_dictionary,
924 } => {
925 if *remaining == 0 {
926 return Ok(None);
927 }
928
929 let mut read = self.reader.get_read(*offset)?;
930 let header = if let Some(header) = next_page_header.take() {
931 *header
932 } else {
933 let (header_len, header) = Self::read_page_header_len(
934 &self.context,
935 &mut read,
936 *page_index,
937 *require_dictionary,
938 )?;
939 verify_page_header_len(header_len, *remaining)?;
940 *offset += header_len as u64;
941 *remaining -= header_len as u64;
942 header
943 };
944 verify_page_size(
945 header.compressed_page_size,
946 header.uncompressed_page_size,
947 *remaining,
948 )?;
949 let data_len = header.compressed_page_size as usize;
950 let data_start = *offset;
951 *offset += data_len as u64;
952 *remaining -= data_len as u64;
953
954 if header.r#type == PageType::INDEX_PAGE {
955 continue;
956 }
957
958 let buffer = self.reader.get_bytes(data_start, data_len)?;
959
960 let buffer =
961 self.context
962 .decrypt_page_data(buffer, *page_index, *require_dictionary)?;
963
964 let page = decode_page(
965 header,
966 buffer,
967 self.physical_type,
968 self.decompressor.as_mut(),
969 )?;
970 if page.is_data_page() {
971 *page_index += 1;
972 } else if page.is_dictionary_page() {
973 *require_dictionary = false;
974 }
975 page
976 }
977 SerializedPageReaderState::Pages {
978 page_locations,
979 dictionary_page,
980 page_index,
981 ..
982 } => {
983 let (front, is_dictionary_page) = match dictionary_page.take() {
984 Some(front) => (front, true),
985 None => match page_locations.pop_front() {
986 Some(front) => (front, false),
987 None => return Ok(None),
988 },
989 };
990
991 let page_len = usize::try_from(front.compressed_page_size)?;
992 let buffer = self.reader.get_bytes(front.offset as u64, page_len)?;
993
994 let (offset, header) = Self::read_page_header_len_from_bytes(
995 &self.context,
996 buffer.as_ref(),
997 *page_index,
998 is_dictionary_page,
999 )?;
1000 let bytes = buffer.slice(offset..);
1001 let bytes =
1002 self.context
1003 .decrypt_page_data(bytes, *page_index, is_dictionary_page)?;
1004
1005 if !is_dictionary_page {
1006 *page_index += 1;
1007 }
1008 decode_page(
1009 header,
1010 bytes,
1011 self.physical_type,
1012 self.decompressor.as_mut(),
1013 )?
1014 }
1015 };
1016
1017 return Ok(Some(page));
1018 }
1019 }
1020
1021 fn peek_next_page(&mut self) -> Result<Option<PageMetadata>> {
1022 match &mut self.state {
1023 SerializedPageReaderState::Values {
1024 offset,
1025 remaining_bytes,
1026 next_page_header,
1027 page_index,
1028 require_dictionary,
1029 } => {
1030 loop {
1031 if *remaining_bytes == 0 {
1032 return Ok(None);
1033 }
1034 return if let Some(header) = next_page_header.as_ref() {
1035 if let Ok(page_meta) = (&**header).try_into() {
1036 Ok(Some(page_meta))
1037 } else {
1038 *next_page_header = None;
1040 continue;
1041 }
1042 } else {
1043 let mut read = self.reader.get_read(*offset)?;
1044 let (header_len, header) = Self::read_page_header_len(
1045 &self.context,
1046 &mut read,
1047 *page_index,
1048 *require_dictionary,
1049 )?;
1050 verify_page_header_len(header_len, *remaining_bytes)?;
1051 *offset += header_len as u64;
1052 *remaining_bytes -= header_len as u64;
1053 let page_meta = if let Ok(page_meta) = (&header).try_into() {
1054 Ok(Some(page_meta))
1055 } else {
1056 continue;
1058 };
1059 *next_page_header = Some(Box::new(header));
1060 page_meta
1061 };
1062 }
1063 }
1064 SerializedPageReaderState::Pages {
1065 page_locations,
1066 dictionary_page,
1067 total_rows,
1068 page_index: _,
1069 } => {
1070 if dictionary_page.is_some() {
1071 Ok(Some(PageMetadata {
1072 num_rows: None,
1073 num_levels: None,
1074 is_dict: true,
1075 }))
1076 } else if let Some(page) = page_locations.front() {
1077 let next_rows = page_locations
1078 .get(1)
1079 .map(|x| x.first_row_index as usize)
1080 .unwrap_or(*total_rows);
1081
1082 Ok(Some(PageMetadata {
1083 num_rows: Some(next_rows - page.first_row_index as usize),
1084 num_levels: None,
1085 is_dict: false,
1086 }))
1087 } else {
1088 Ok(None)
1089 }
1090 }
1091 }
1092 }
1093
1094 fn skip_next_page(&mut self) -> Result<()> {
1095 match &mut self.state {
1096 SerializedPageReaderState::Values {
1097 offset,
1098 remaining_bytes,
1099 next_page_header,
1100 page_index,
1101 require_dictionary,
1102 } => {
1103 if let Some(buffered_header) = next_page_header.take() {
1104 verify_page_size(
1105 buffered_header.compressed_page_size,
1106 buffered_header.uncompressed_page_size,
1107 *remaining_bytes,
1108 )?;
1109 *offset += buffered_header.compressed_page_size as u64;
1111 *remaining_bytes -= buffered_header.compressed_page_size as u64;
1112 } else {
1113 let mut read = self.reader.get_read(*offset)?;
1114 let (header_len, header) = Self::read_page_header_len(
1115 &self.context,
1116 &mut read,
1117 *page_index,
1118 *require_dictionary,
1119 )?;
1120 verify_page_header_len(header_len, *remaining_bytes)?;
1121 verify_page_size(
1122 header.compressed_page_size,
1123 header.uncompressed_page_size,
1124 *remaining_bytes,
1125 )?;
1126 let data_page_size = header.compressed_page_size as u64;
1127 *offset += header_len as u64 + data_page_size;
1128 *remaining_bytes -= header_len as u64 + data_page_size;
1129 }
1130 if *require_dictionary {
1131 *require_dictionary = false;
1132 } else {
1133 *page_index += 1;
1134 }
1135 Ok(())
1136 }
1137 SerializedPageReaderState::Pages {
1138 page_locations,
1139 dictionary_page,
1140 page_index,
1141 ..
1142 } => {
1143 if dictionary_page.is_some() {
1144 dictionary_page.take();
1146 } else {
1147 if page_locations.pop_front().is_some() {
1149 *page_index += 1;
1150 }
1151 }
1152
1153 Ok(())
1154 }
1155 }
1156 }
1157
1158 fn at_record_boundary(&mut self) -> Result<bool> {
1159 match &mut self.state {
1160 SerializedPageReaderState::Values { .. } => match self.peek_next_page()? {
1161 None => Ok(true),
1162 Some(metadata) => Ok(metadata.num_rows.is_some()),
1165 },
1166 SerializedPageReaderState::Pages { .. } => Ok(true),
1167 }
1168 }
1169}
1170
1171#[cfg(test)]
1172mod tests {
1173 use std::collections::HashSet;
1174
1175 use bytes::Buf;
1176
1177 use crate::file::page_index::column_index::{
1178 ByteArrayColumnIndex, ColumnIndexMetaData, PrimitiveColumnIndex,
1179 };
1180 use crate::file::properties::{EnabledStatistics, WriterProperties};
1181
1182 use crate::basic::{self, BoundaryOrder, ColumnOrder, Encoding, SortOrder};
1183 use crate::column::reader::ColumnReader;
1184 use crate::data_type::private::ParquetValueType;
1185 use crate::data_type::{AsBytes, FixedLenByteArrayType, Int32Type};
1186 use crate::file::metadata::thrift::DataPageHeaderV2;
1187 use crate::file::writer::SerializedFileWriter;
1188 use crate::record::RowAccessor;
1189 use crate::schema::parser::parse_message_type;
1190 use crate::util::test_common::file_util::{get_test_file, get_test_path};
1191
1192 use super::*;
1193
1194 #[test]
1195 fn test_decode_page_invalid_offset() {
1196 let page_header = PageHeader {
1197 r#type: PageType::DATA_PAGE_V2,
1198 uncompressed_page_size: 10,
1199 compressed_page_size: 10,
1200 data_page_header: None,
1201 index_page_header: None,
1202 dictionary_page_header: None,
1203 crc: None,
1204 data_page_header_v2: Some(DataPageHeaderV2 {
1205 num_nulls: 0,
1206 num_rows: 0,
1207 num_values: 0,
1208 encoding: Encoding::PLAIN,
1209 definition_levels_byte_length: 11,
1210 repetition_levels_byte_length: 0,
1211 is_compressed: None,
1212 statistics: None,
1213 }),
1214 };
1215
1216 let buffer = Bytes::new();
1217 let err = decode_page(page_header, buffer, Type::INT32, None).unwrap_err();
1218 assert!(
1219 err.to_string()
1220 .contains("DataPage v2 header contains implausible values")
1221 );
1222 }
1223
1224 #[test]
1225 fn test_decode_unsupported_page() {
1226 let mut page_header = PageHeader {
1227 r#type: PageType::INDEX_PAGE,
1228 uncompressed_page_size: 10,
1229 compressed_page_size: 10,
1230 data_page_header: None,
1231 index_page_header: None,
1232 dictionary_page_header: None,
1233 crc: None,
1234 data_page_header_v2: None,
1235 };
1236 let buffer = Bytes::new();
1237 let err = decode_page(page_header.clone(), buffer.clone(), Type::INT32, None).unwrap_err();
1238 assert_eq!(
1239 err.to_string(),
1240 "Parquet error: Page type INDEX_PAGE is not supported"
1241 );
1242
1243 page_header.data_page_header_v2 = Some(DataPageHeaderV2 {
1244 num_nulls: 0,
1245 num_rows: 0,
1246 num_values: 0,
1247 encoding: Encoding::PLAIN,
1248 definition_levels_byte_length: 11,
1249 repetition_levels_byte_length: 0,
1250 is_compressed: None,
1251 statistics: None,
1252 });
1253 let err = decode_page(page_header, buffer, Type::INT32, None).unwrap_err();
1254 assert!(
1255 err.to_string()
1256 .contains("DataPage v2 header contains implausible values")
1257 );
1258 }
1259
1260 #[test]
1261 fn test_cursor_and_file_has_the_same_behaviour() {
1262 let mut buf: Vec<u8> = Vec::new();
1263 get_test_file("alltypes_plain.parquet")
1264 .read_to_end(&mut buf)
1265 .unwrap();
1266 let cursor = Bytes::from(buf);
1267 let read_from_cursor = SerializedFileReader::new(cursor).unwrap();
1268
1269 let test_file = get_test_file("alltypes_plain.parquet");
1270 let read_from_file = SerializedFileReader::new(test_file).unwrap();
1271
1272 let file_iter = read_from_file.get_row_iter(None).unwrap();
1273 let cursor_iter = read_from_cursor.get_row_iter(None).unwrap();
1274
1275 for (a, b) in file_iter.zip(cursor_iter) {
1276 assert_eq!(a.unwrap(), b.unwrap())
1277 }
1278 }
1279
1280 #[test]
1281 fn test_file_reader_try_from() {
1282 let test_file = get_test_file("alltypes_plain.parquet");
1284 let test_path_buf = get_test_path("alltypes_plain.parquet");
1285 let test_path = test_path_buf.as_path();
1286 let test_path_str = test_path.to_str().unwrap();
1287
1288 let reader = SerializedFileReader::try_from(test_file);
1289 assert!(reader.is_ok());
1290
1291 let reader = SerializedFileReader::try_from(test_path);
1292 assert!(reader.is_ok());
1293
1294 let reader = SerializedFileReader::try_from(test_path_str);
1295 assert!(reader.is_ok());
1296
1297 let reader = SerializedFileReader::try_from(test_path_str.to_string());
1298 assert!(reader.is_ok());
1299
1300 let test_path = Path::new("invalid.parquet");
1302 let test_path_str = test_path.to_str().unwrap();
1303
1304 let reader = SerializedFileReader::try_from(test_path);
1305 assert!(reader.is_err());
1306
1307 let reader = SerializedFileReader::try_from(test_path_str);
1308 assert!(reader.is_err());
1309
1310 let reader = SerializedFileReader::try_from(test_path_str.to_string());
1311 assert!(reader.is_err());
1312 }
1313
1314 #[test]
1315 fn test_file_reader_into_iter() {
1316 let path = get_test_path("alltypes_plain.parquet");
1317 let reader = SerializedFileReader::try_from(path.as_path()).unwrap();
1318 let iter = reader.into_iter();
1319 let values: Vec<_> = iter.flat_map(|x| x.unwrap().get_int(0)).collect();
1320
1321 assert_eq!(values, &[4, 5, 6, 7, 2, 3, 0, 1]);
1322 }
1323
1324 #[test]
1325 fn test_file_reader_into_iter_project() {
1326 let path = get_test_path("alltypes_plain.parquet");
1327 let reader = SerializedFileReader::try_from(path.as_path()).unwrap();
1328 let schema = "message schema { OPTIONAL INT32 id; }";
1329 let proj = parse_message_type(schema).ok();
1330 let iter = reader.into_iter().project(proj).unwrap();
1331 let values: Vec<_> = iter.flat_map(|x| x.unwrap().get_int(0)).collect();
1332
1333 assert_eq!(values, &[4, 5, 6, 7, 2, 3, 0, 1]);
1334 }
1335
1336 #[test]
1337 fn test_reuse_file_chunk() {
1338 let test_file = get_test_file("alltypes_plain.parquet");
1342 let reader = SerializedFileReader::new(test_file).unwrap();
1343 let row_group = reader.get_row_group(0).unwrap();
1344
1345 let mut page_readers = Vec::new();
1346 for i in 0..row_group.num_columns() {
1347 page_readers.push(row_group.get_column_page_reader(i).unwrap());
1348 }
1349
1350 for mut page_reader in page_readers {
1353 assert!(page_reader.get_next_page().is_ok());
1354 }
1355 }
1356
1357 #[test]
1358 fn test_file_reader() {
1359 let test_file = get_test_file("alltypes_plain.parquet");
1360 let reader_result = SerializedFileReader::new(test_file);
1361 assert!(reader_result.is_ok());
1362 let reader = reader_result.unwrap();
1363
1364 let metadata = reader.metadata();
1366 assert_eq!(metadata.num_row_groups(), 1);
1367
1368 let file_metadata = metadata.file_metadata();
1370 assert!(file_metadata.created_by().is_some());
1371 assert_eq!(
1372 file_metadata.created_by().unwrap(),
1373 "impala version 1.3.0-INTERNAL (build 8a48ddb1eff84592b3fc06bc6f51ec120e1fffc9)"
1374 );
1375 assert!(file_metadata.key_value_metadata().is_none());
1376 assert_eq!(file_metadata.num_rows(), 8);
1377 assert_eq!(file_metadata.version(), 1);
1378 assert_eq!(file_metadata.column_orders(), None);
1379
1380 let row_group_metadata = metadata.row_group(0);
1382 assert_eq!(row_group_metadata.num_columns(), 11);
1383 assert_eq!(row_group_metadata.num_rows(), 8);
1384 assert_eq!(row_group_metadata.total_byte_size(), 671);
1385 for i in 0..row_group_metadata.num_columns() {
1387 assert_eq!(file_metadata.column_order(i), ColumnOrder::UNDEFINED);
1388 }
1389
1390 let row_group_reader_result = reader.get_row_group(0);
1392 assert!(row_group_reader_result.is_ok());
1393 let row_group_reader: Box<dyn RowGroupReader> = row_group_reader_result.unwrap();
1394 assert_eq!(
1395 row_group_reader.num_columns(),
1396 row_group_metadata.num_columns()
1397 );
1398 assert_eq!(
1399 row_group_reader.metadata().total_byte_size(),
1400 row_group_metadata.total_byte_size()
1401 );
1402
1403 let page_reader_0_result = row_group_reader.get_column_page_reader(0);
1406 assert!(page_reader_0_result.is_ok());
1407 let mut page_reader_0: Box<dyn PageReader> = page_reader_0_result.unwrap();
1408 let mut page_count = 0;
1409 while let Some(page) = page_reader_0.get_next_page().unwrap() {
1410 let is_expected_page = match page {
1411 Page::DictionaryPage {
1412 buf,
1413 num_values,
1414 encoding,
1415 is_sorted,
1416 } => {
1417 assert_eq!(buf.len(), 32);
1418 assert_eq!(num_values, 8);
1419 assert_eq!(encoding, Encoding::PLAIN_DICTIONARY);
1420 assert!(!is_sorted);
1421 true
1422 }
1423 Page::DataPage {
1424 buf,
1425 num_values,
1426 encoding,
1427 def_level_encoding,
1428 rep_level_encoding,
1429 statistics,
1430 } => {
1431 assert_eq!(buf.len(), 11);
1432 assert_eq!(num_values, 8);
1433 assert_eq!(encoding, Encoding::PLAIN_DICTIONARY);
1434 assert_eq!(def_level_encoding, Encoding::RLE);
1435 #[allow(deprecated)]
1436 let expected_rep_level_encoding = Encoding::BIT_PACKED;
1437 assert_eq!(rep_level_encoding, expected_rep_level_encoding);
1438 assert!(statistics.is_none());
1439 true
1440 }
1441 _ => false,
1442 };
1443 assert!(is_expected_page);
1444 page_count += 1;
1445 }
1446 assert_eq!(page_count, 2);
1447 }
1448
1449 #[test]
1450 fn test_file_reader_datapage_v2() {
1451 let test_file = get_test_file("datapage_v2.snappy.parquet");
1452 let reader_result = SerializedFileReader::new(test_file);
1453 assert!(reader_result.is_ok());
1454 let reader = reader_result.unwrap();
1455
1456 let metadata = reader.metadata();
1458 assert_eq!(metadata.num_row_groups(), 1);
1459
1460 let file_metadata = metadata.file_metadata();
1462 assert!(file_metadata.created_by().is_some());
1463 assert_eq!(
1464 file_metadata.created_by().unwrap(),
1465 "parquet-mr version 1.8.1 (build 4aba4dae7bb0d4edbcf7923ae1339f28fd3f7fcf)"
1466 );
1467 assert!(file_metadata.key_value_metadata().is_some());
1468 assert_eq!(
1469 file_metadata.key_value_metadata().to_owned().unwrap().len(),
1470 1
1471 );
1472
1473 assert_eq!(file_metadata.num_rows(), 5);
1474 assert_eq!(file_metadata.version(), 1);
1475 assert_eq!(file_metadata.column_orders(), None);
1476
1477 let row_group_metadata = metadata.row_group(0);
1478
1479 for i in 0..row_group_metadata.num_columns() {
1481 assert_eq!(file_metadata.column_order(i), ColumnOrder::UNDEFINED);
1482 }
1483
1484 let row_group_reader_result = reader.get_row_group(0);
1486 assert!(row_group_reader_result.is_ok());
1487 let row_group_reader: Box<dyn RowGroupReader> = row_group_reader_result.unwrap();
1488 assert_eq!(
1489 row_group_reader.num_columns(),
1490 row_group_metadata.num_columns()
1491 );
1492 assert_eq!(
1493 row_group_reader.metadata().total_byte_size(),
1494 row_group_metadata.total_byte_size()
1495 );
1496
1497 let page_reader_0_result = row_group_reader.get_column_page_reader(0);
1500 assert!(page_reader_0_result.is_ok());
1501 let mut page_reader_0: Box<dyn PageReader> = page_reader_0_result.unwrap();
1502 let mut page_count = 0;
1503 while let Some(page) = page_reader_0.get_next_page().unwrap() {
1504 let is_expected_page = match page {
1505 Page::DictionaryPage {
1506 buf,
1507 num_values,
1508 encoding,
1509 is_sorted,
1510 } => {
1511 assert_eq!(buf.len(), 7);
1512 assert_eq!(num_values, 1);
1513 assert_eq!(encoding, Encoding::PLAIN);
1514 assert!(!is_sorted);
1515 true
1516 }
1517 Page::DataPageV2 {
1518 buf,
1519 num_values,
1520 encoding,
1521 num_nulls,
1522 num_rows,
1523 def_levels_byte_len,
1524 rep_levels_byte_len,
1525 is_compressed,
1526 statistics,
1527 } => {
1528 assert_eq!(buf.len(), 4);
1529 assert_eq!(num_values, 5);
1530 assert_eq!(encoding, Encoding::RLE_DICTIONARY);
1531 assert_eq!(num_nulls, 1);
1532 assert_eq!(num_rows, 5);
1533 assert_eq!(def_levels_byte_len, 2);
1534 assert_eq!(rep_levels_byte_len, 0);
1535 assert!(is_compressed);
1536 assert!(statistics.is_none()); true
1538 }
1539 _ => false,
1540 };
1541 assert!(is_expected_page);
1542 page_count += 1;
1543 }
1544 assert_eq!(page_count, 2);
1545 }
1546
1547 #[test]
1548 fn test_file_reader_empty_compressed_datapage_v2() {
1549 let test_file = get_test_file("page_v2_empty_compressed.parquet");
1551 let reader_result = SerializedFileReader::new(test_file);
1552 assert!(reader_result.is_ok());
1553 let reader = reader_result.unwrap();
1554
1555 let metadata = reader.metadata();
1557 assert_eq!(metadata.num_row_groups(), 1);
1558
1559 let file_metadata = metadata.file_metadata();
1561 assert!(file_metadata.created_by().is_some());
1562 assert_eq!(
1563 file_metadata.created_by().unwrap(),
1564 "parquet-cpp-arrow version 14.0.2"
1565 );
1566 assert!(file_metadata.key_value_metadata().is_some());
1567 assert_eq!(
1568 file_metadata.key_value_metadata().to_owned().unwrap().len(),
1569 1
1570 );
1571
1572 assert_eq!(file_metadata.num_rows(), 10);
1573 assert_eq!(file_metadata.version(), 2);
1574 let expected_order = ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED);
1575 assert_eq!(
1576 file_metadata.column_orders(),
1577 Some(vec![expected_order].as_ref())
1578 );
1579
1580 let row_group_metadata = metadata.row_group(0);
1581
1582 for i in 0..row_group_metadata.num_columns() {
1584 assert_eq!(file_metadata.column_order(i), expected_order);
1585 }
1586
1587 let row_group_reader_result = reader.get_row_group(0);
1589 assert!(row_group_reader_result.is_ok());
1590 let row_group_reader: Box<dyn RowGroupReader> = row_group_reader_result.unwrap();
1591 assert_eq!(
1592 row_group_reader.num_columns(),
1593 row_group_metadata.num_columns()
1594 );
1595 assert_eq!(
1596 row_group_reader.metadata().total_byte_size(),
1597 row_group_metadata.total_byte_size()
1598 );
1599
1600 let page_reader_0_result = row_group_reader.get_column_page_reader(0);
1602 assert!(page_reader_0_result.is_ok());
1603 let mut page_reader_0: Box<dyn PageReader> = page_reader_0_result.unwrap();
1604 let mut page_count = 0;
1605 while let Some(page) = page_reader_0.get_next_page().unwrap() {
1606 let is_expected_page = match page {
1607 Page::DictionaryPage {
1608 buf,
1609 num_values,
1610 encoding,
1611 is_sorted,
1612 } => {
1613 assert_eq!(buf.len(), 0);
1614 assert_eq!(num_values, 0);
1615 assert_eq!(encoding, Encoding::PLAIN);
1616 assert!(!is_sorted);
1617 true
1618 }
1619 Page::DataPageV2 {
1620 buf,
1621 num_values,
1622 encoding,
1623 num_nulls,
1624 num_rows,
1625 def_levels_byte_len,
1626 rep_levels_byte_len,
1627 is_compressed,
1628 statistics,
1629 } => {
1630 assert_eq!(buf.len(), 3);
1631 assert_eq!(num_values, 10);
1632 assert_eq!(encoding, Encoding::RLE_DICTIONARY);
1633 assert_eq!(num_nulls, 10);
1634 assert_eq!(num_rows, 10);
1635 assert_eq!(def_levels_byte_len, 2);
1636 assert_eq!(rep_levels_byte_len, 0);
1637 assert!(is_compressed);
1638 assert!(statistics.is_none()); true
1640 }
1641 _ => false,
1642 };
1643 assert!(is_expected_page);
1644 page_count += 1;
1645 }
1646 assert_eq!(page_count, 2);
1647 }
1648
1649 #[test]
1650 fn test_file_reader_empty_datapage_v2() {
1651 let test_file = get_test_file("datapage_v2_empty_datapage.snappy.parquet");
1653 let reader_result = SerializedFileReader::new(test_file);
1654 assert!(reader_result.is_ok());
1655 let reader = reader_result.unwrap();
1656
1657 let metadata = reader.metadata();
1659 assert_eq!(metadata.num_row_groups(), 1);
1660
1661 let file_metadata = metadata.file_metadata();
1663 assert!(file_metadata.created_by().is_some());
1664 assert_eq!(
1665 file_metadata.created_by().unwrap(),
1666 "parquet-mr version 1.13.1 (build db4183109d5b734ec5930d870cdae161e408ddba)"
1667 );
1668 assert!(file_metadata.key_value_metadata().is_some());
1669 assert_eq!(
1670 file_metadata.key_value_metadata().to_owned().unwrap().len(),
1671 2
1672 );
1673
1674 assert_eq!(file_metadata.num_rows(), 1);
1675 assert_eq!(file_metadata.version(), 1);
1676 let expected_order = ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED);
1677 assert_eq!(
1678 file_metadata.column_orders(),
1679 Some(vec![expected_order].as_ref())
1680 );
1681
1682 let row_group_metadata = metadata.row_group(0);
1683
1684 for i in 0..row_group_metadata.num_columns() {
1686 assert_eq!(file_metadata.column_order(i), expected_order);
1687 }
1688
1689 let row_group_reader_result = reader.get_row_group(0);
1691 assert!(row_group_reader_result.is_ok());
1692 let row_group_reader: Box<dyn RowGroupReader> = row_group_reader_result.unwrap();
1693 assert_eq!(
1694 row_group_reader.num_columns(),
1695 row_group_metadata.num_columns()
1696 );
1697 assert_eq!(
1698 row_group_reader.metadata().total_byte_size(),
1699 row_group_metadata.total_byte_size()
1700 );
1701
1702 let page_reader_0_result = row_group_reader.get_column_page_reader(0);
1704 assert!(page_reader_0_result.is_ok());
1705 let mut page_reader_0: Box<dyn PageReader> = page_reader_0_result.unwrap();
1706 let mut page_count = 0;
1707 while let Some(page) = page_reader_0.get_next_page().unwrap() {
1708 let is_expected_page = match page {
1709 Page::DataPageV2 {
1710 buf,
1711 num_values,
1712 encoding,
1713 num_nulls,
1714 num_rows,
1715 def_levels_byte_len,
1716 rep_levels_byte_len,
1717 is_compressed,
1718 statistics,
1719 } => {
1720 assert_eq!(buf.len(), 2);
1721 assert_eq!(num_values, 1);
1722 assert_eq!(encoding, Encoding::PLAIN);
1723 assert_eq!(num_nulls, 1);
1724 assert_eq!(num_rows, 1);
1725 assert_eq!(def_levels_byte_len, 2);
1726 assert_eq!(rep_levels_byte_len, 0);
1727 assert!(is_compressed);
1728 assert!(statistics.is_none());
1729 true
1730 }
1731 _ => false,
1732 };
1733 assert!(is_expected_page);
1734 page_count += 1;
1735 }
1736 assert_eq!(page_count, 1);
1737 }
1738
1739 fn get_serialized_page_reader<R: ChunkReader>(
1740 file_reader: &SerializedFileReader<R>,
1741 row_group: usize,
1742 column: usize,
1743 ) -> Result<SerializedPageReader<R>> {
1744 let row_group = {
1745 let row_group_metadata = file_reader.metadata.row_group(row_group);
1746 let props = Arc::clone(&file_reader.props);
1747 let f = Arc::clone(&file_reader.chunk_reader);
1748 SerializedRowGroupReader::new(
1749 f,
1750 row_group_metadata,
1751 file_reader
1752 .metadata
1753 .offset_index()
1754 .map(|x| x[row_group].as_slice()),
1755 props,
1756 )?
1757 };
1758
1759 let col = row_group.metadata.column(column);
1760
1761 let page_locations = row_group
1762 .offset_index
1763 .map(|x| x[column].page_locations.clone());
1764
1765 let props = Arc::clone(&row_group.props);
1766 SerializedPageReader::new_with_properties(
1767 Arc::clone(&row_group.chunk_reader),
1768 col,
1769 usize::try_from(row_group.metadata.num_rows())?,
1770 page_locations,
1771 props,
1772 )
1773 }
1774
1775 #[test]
1776 fn test_peek_next_page_offset_matches_actual() -> Result<()> {
1777 let test_file = get_test_file("alltypes_plain.parquet");
1778 let reader = SerializedFileReader::new(test_file)?;
1779
1780 let mut offset_set = HashSet::new();
1781 let num_row_groups = reader.metadata.num_row_groups();
1782 for row_group in 0..num_row_groups {
1783 let num_columns = reader.metadata.row_group(row_group).num_columns();
1784 for column in 0..num_columns {
1785 let mut page_reader = get_serialized_page_reader(&reader, row_group, column)?;
1786
1787 while let Ok(Some(page_offset)) = page_reader.peek_next_page_offset() {
1788 match &page_reader.state {
1789 SerializedPageReaderState::Pages {
1790 page_locations,
1791 dictionary_page,
1792 ..
1793 } => {
1794 if let Some(page) = dictionary_page {
1795 assert_eq!(page.offset as u64, page_offset);
1796 } else if let Some(page) = page_locations.front() {
1797 assert_eq!(page.offset as u64, page_offset);
1798 } else {
1799 unreachable!()
1800 }
1801 }
1802 SerializedPageReaderState::Values {
1803 offset,
1804 next_page_header,
1805 ..
1806 } => {
1807 assert!(next_page_header.is_some());
1808 assert_eq!(*offset, page_offset);
1809 }
1810 }
1811 let page = page_reader.get_next_page()?;
1812 assert!(page.is_some());
1813 let newly_inserted = offset_set.insert(page_offset);
1814 assert!(newly_inserted);
1815 }
1816 }
1817 }
1818
1819 Ok(())
1820 }
1821
1822 #[test]
1823 fn test_page_iterator() {
1824 let file = get_test_file("alltypes_plain.parquet");
1825 let file_reader = Arc::new(SerializedFileReader::new(file).unwrap());
1826
1827 let mut page_iterator = FilePageIterator::new(0, file_reader.clone()).unwrap();
1828
1829 let page = page_iterator.next();
1831 assert!(page.is_some());
1832 assert!(page.unwrap().is_ok());
1833
1834 let page = page_iterator.next();
1836 assert!(page.is_none());
1837
1838 let row_group_indices = Box::new(0..1);
1839 let mut page_iterator =
1840 FilePageIterator::with_row_groups(0, row_group_indices, file_reader).unwrap();
1841
1842 let page = page_iterator.next();
1844 assert!(page.is_some());
1845 assert!(page.unwrap().is_ok());
1846
1847 let page = page_iterator.next();
1849 assert!(page.is_none());
1850 }
1851
1852 #[test]
1853 fn test_file_reader_key_value_metadata() {
1854 let file = get_test_file("binary.parquet");
1855 let file_reader = Arc::new(SerializedFileReader::new(file).unwrap());
1856
1857 let metadata = file_reader
1858 .metadata
1859 .file_metadata()
1860 .key_value_metadata()
1861 .unwrap();
1862
1863 assert_eq!(metadata.len(), 3);
1864
1865 assert_eq!(metadata[0].key, "parquet.proto.descriptor");
1866
1867 assert_eq!(metadata[1].key, "writer.model.name");
1868 assert_eq!(metadata[1].value, Some("protobuf".to_owned()));
1869
1870 assert_eq!(metadata[2].key, "parquet.proto.class");
1871 assert_eq!(metadata[2].value, Some("foo.baz.Foobaz$Event".to_owned()));
1872 }
1873
1874 #[test]
1875 fn test_file_reader_optional_metadata() {
1876 let file = get_test_file("data_index_bloom_encoding_stats.parquet");
1878 let options = ReadOptionsBuilder::new()
1879 .with_encoding_stats_as_mask(false)
1880 .build();
1881 let file_reader = Arc::new(SerializedFileReader::new_with_options(file, options).unwrap());
1882
1883 let row_group_metadata = file_reader.metadata.row_group(0);
1884 let col0_metadata = row_group_metadata.column(0);
1885
1886 assert_eq!(col0_metadata.bloom_filter_offset().unwrap(), 192);
1888
1889 let page_encoding_stats = &col0_metadata.page_encoding_stats().unwrap()[0];
1891
1892 assert_eq!(page_encoding_stats.page_type, basic::PageType::DATA_PAGE);
1893 assert_eq!(page_encoding_stats.encoding, Encoding::PLAIN);
1894 assert_eq!(page_encoding_stats.count, 1);
1895
1896 assert_eq!(col0_metadata.column_index_offset().unwrap(), 156);
1898 assert_eq!(col0_metadata.column_index_length().unwrap(), 25);
1899
1900 assert_eq!(col0_metadata.offset_index_offset().unwrap(), 181);
1902 assert_eq!(col0_metadata.offset_index_length().unwrap(), 11);
1903 }
1904
1905 #[test]
1906 fn test_file_reader_page_stats_mask() {
1907 let file = get_test_file("alltypes_tiny_pages.parquet");
1908 let options = ReadOptionsBuilder::new()
1909 .with_encoding_stats_as_mask(true)
1910 .build();
1911 let file_reader = Arc::new(SerializedFileReader::new_with_options(file, options).unwrap());
1912
1913 let row_group_metadata = file_reader.metadata.row_group(0);
1914
1915 let page_encoding_stats = row_group_metadata
1917 .column(0)
1918 .page_encoding_stats_mask()
1919 .unwrap();
1920 assert!(page_encoding_stats.is_only(Encoding::PLAIN));
1921 let page_encoding_stats = row_group_metadata
1922 .column(2)
1923 .page_encoding_stats_mask()
1924 .unwrap();
1925 assert!(page_encoding_stats.is_only(Encoding::PLAIN_DICTIONARY));
1926 }
1927
1928 #[test]
1929 fn test_file_reader_page_stats_skipped() {
1930 let file = get_test_file("alltypes_tiny_pages.parquet");
1931
1932 let options = ReadOptionsBuilder::new()
1934 .with_encoding_stats_policy(ParquetStatisticsPolicy::SkipAll)
1935 .with_column_stats_policy(ParquetStatisticsPolicy::SkipAll)
1936 .build();
1937 let file_reader = Arc::new(
1938 SerializedFileReader::new_with_options(file.try_clone().unwrap(), options).unwrap(),
1939 );
1940
1941 let row_group_metadata = file_reader.metadata.row_group(0);
1942 for column in row_group_metadata.columns() {
1943 assert!(column.page_encoding_stats().is_none());
1944 assert!(column.page_encoding_stats_mask().is_none());
1945 assert!(column.statistics().is_none());
1946 }
1947
1948 let options = ReadOptionsBuilder::new()
1950 .with_encoding_stats_as_mask(true)
1951 .with_encoding_stats_policy(ParquetStatisticsPolicy::skip_except(&[0]))
1952 .with_column_stats_policy(ParquetStatisticsPolicy::skip_except(&[0]))
1953 .build();
1954 let file_reader = Arc::new(
1955 SerializedFileReader::new_with_options(file.try_clone().unwrap(), options).unwrap(),
1956 );
1957
1958 let row_group_metadata = file_reader.metadata.row_group(0);
1959 for (idx, column) in row_group_metadata.columns().iter().enumerate() {
1960 assert!(column.page_encoding_stats().is_none());
1961 assert_eq!(column.page_encoding_stats_mask().is_some(), idx == 0);
1962 assert_eq!(column.statistics().is_some(), idx == 0);
1963 }
1964 }
1965
1966 #[test]
1967 fn test_file_reader_size_stats_skipped() {
1968 let file = get_test_file("repeated_primitive_no_list.parquet");
1969
1970 let options = ReadOptionsBuilder::new()
1972 .with_size_stats_policy(ParquetStatisticsPolicy::SkipAll)
1973 .build();
1974 let file_reader = Arc::new(
1975 SerializedFileReader::new_with_options(file.try_clone().unwrap(), options).unwrap(),
1976 );
1977
1978 let row_group_metadata = file_reader.metadata.row_group(0);
1979 for column in row_group_metadata.columns() {
1980 assert!(column.repetition_level_histogram().is_none());
1981 assert!(column.definition_level_histogram().is_none());
1982 assert!(column.unencoded_byte_array_data_bytes().is_none());
1983 }
1984
1985 let options = ReadOptionsBuilder::new()
1987 .with_encoding_stats_as_mask(true)
1988 .with_size_stats_policy(ParquetStatisticsPolicy::skip_except(&[1]))
1989 .build();
1990 let file_reader = Arc::new(
1991 SerializedFileReader::new_with_options(file.try_clone().unwrap(), options).unwrap(),
1992 );
1993
1994 let row_group_metadata = file_reader.metadata.row_group(0);
1995 for (idx, column) in row_group_metadata.columns().iter().enumerate() {
1996 assert_eq!(column.repetition_level_histogram().is_some(), idx == 1);
1997 assert_eq!(column.definition_level_histogram().is_some(), idx == 1);
1998 assert_eq!(column.unencoded_byte_array_data_bytes().is_some(), idx == 1);
1999 }
2000 }
2001
2002 #[test]
2003 fn test_file_reader_with_no_filter() -> Result<()> {
2004 let test_file = get_test_file("alltypes_plain.parquet");
2005 let origin_reader = SerializedFileReader::new(test_file)?;
2006 let metadata = origin_reader.metadata();
2008 assert_eq!(metadata.num_row_groups(), 1);
2009 Ok(())
2010 }
2011
2012 #[test]
2013 fn test_file_reader_filter_row_groups_with_predicate() -> Result<()> {
2014 let test_file = get_test_file("alltypes_plain.parquet");
2015 let read_options = ReadOptionsBuilder::new()
2016 .with_predicate(Box::new(|_, _| false))
2017 .build();
2018 let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
2019 let metadata = reader.metadata();
2020 assert_eq!(metadata.num_row_groups(), 0);
2021 Ok(())
2022 }
2023
2024 #[test]
2025 fn test_file_reader_filter_row_groups_with_range() -> Result<()> {
2026 let test_file = get_test_file("alltypes_plain.parquet");
2027 let origin_reader = SerializedFileReader::new(test_file)?;
2028 let metadata = origin_reader.metadata();
2030 assert_eq!(metadata.num_row_groups(), 1);
2031 let mid = get_midpoint_offset(metadata.row_group(0));
2032
2033 let test_file = get_test_file("alltypes_plain.parquet");
2034 let read_options = ReadOptionsBuilder::new().with_range(0, mid + 1).build();
2035 let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
2036 let metadata = reader.metadata();
2037 assert_eq!(metadata.num_row_groups(), 1);
2038
2039 let test_file = get_test_file("alltypes_plain.parquet");
2040 let read_options = ReadOptionsBuilder::new().with_range(0, mid).build();
2041 let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
2042 let metadata = reader.metadata();
2043 assert_eq!(metadata.num_row_groups(), 0);
2044 Ok(())
2045 }
2046
2047 #[test]
2048 fn test_file_reader_filter_row_groups_and_range() -> Result<()> {
2049 let test_file = get_test_file("alltypes_tiny_pages.parquet");
2050 let origin_reader = SerializedFileReader::new(test_file)?;
2051 let metadata = origin_reader.metadata();
2052 let mid = get_midpoint_offset(metadata.row_group(0));
2053
2054 let test_file = get_test_file("alltypes_tiny_pages.parquet");
2056 let read_options = ReadOptionsBuilder::new()
2057 .with_page_index()
2058 .with_predicate(Box::new(|_, _| true))
2059 .with_range(mid, mid + 1)
2060 .build();
2061 let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
2062 let metadata = reader.metadata();
2063 assert_eq!(metadata.num_row_groups(), 1);
2064 assert_eq!(metadata.column_index().unwrap().len(), 1);
2065 assert_eq!(metadata.offset_index().unwrap().len(), 1);
2066
2067 let test_file = get_test_file("alltypes_tiny_pages.parquet");
2069 let read_options = ReadOptionsBuilder::new()
2070 .with_page_index()
2071 .with_predicate(Box::new(|_, _| true))
2072 .with_range(0, mid)
2073 .build();
2074 let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
2075 let metadata = reader.metadata();
2076 assert_eq!(metadata.num_row_groups(), 0);
2077 assert!(metadata.column_index().is_none());
2078 assert!(metadata.offset_index().is_none());
2079
2080 let test_file = get_test_file("alltypes_tiny_pages.parquet");
2082 let read_options = ReadOptionsBuilder::new()
2083 .with_page_index()
2084 .with_predicate(Box::new(|_, _| false))
2085 .with_range(mid, mid + 1)
2086 .build();
2087 let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
2088 let metadata = reader.metadata();
2089 assert_eq!(metadata.num_row_groups(), 0);
2090 assert!(metadata.column_index().is_none());
2091 assert!(metadata.offset_index().is_none());
2092
2093 let test_file = get_test_file("alltypes_tiny_pages.parquet");
2095 let read_options = ReadOptionsBuilder::new()
2096 .with_page_index()
2097 .with_predicate(Box::new(|_, _| false))
2098 .with_range(0, mid)
2099 .build();
2100 let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
2101 let metadata = reader.metadata();
2102 assert_eq!(metadata.num_row_groups(), 0);
2103 assert!(metadata.column_index().is_none());
2104 assert!(metadata.offset_index().is_none());
2105 Ok(())
2106 }
2107
2108 #[test]
2109 fn test_file_reader_invalid_metadata() {
2110 let data = [
2111 255, 172, 1, 0, 50, 82, 65, 73, 1, 0, 0, 0, 169, 168, 168, 162, 87, 255, 16, 0, 0, 0,
2112 80, 65, 82, 49,
2113 ];
2114 let ret = SerializedFileReader::new(Bytes::copy_from_slice(&data));
2115 assert_eq!(
2116 ret.err().unwrap().to_string(),
2117 "Parquet error: Expected list element type of Struct but got List"
2118 );
2119 }
2120
2121 #[test]
2122 fn test_page_index_reader() {
2139 let test_file = get_test_file("data_index_bloom_encoding_stats.parquet");
2140 let builder = ReadOptionsBuilder::new();
2141 let options = builder.with_page_index().build();
2143 let reader_result = SerializedFileReader::new_with_options(test_file, options);
2144 let reader = reader_result.unwrap();
2145
2146 let metadata = reader.metadata();
2148 assert_eq!(metadata.num_row_groups(), 1);
2149
2150 let column_index = metadata.column_index().unwrap();
2151
2152 assert_eq!(column_index.len(), 1);
2154 let index = if let ColumnIndexMetaData::BYTE_ARRAY(index) = &column_index[0][0] {
2155 index
2156 } else {
2157 unreachable!()
2158 };
2159
2160 assert_eq!(index.boundary_order, BoundaryOrder::ASCENDING);
2161
2162 assert_eq!(index.num_pages(), 1);
2164
2165 let min = index.min_value(0).unwrap();
2166 let max = index.max_value(0).unwrap();
2167 assert_eq!(b"Hello", min.as_bytes());
2168 assert_eq!(b"today", max.as_bytes());
2169
2170 let offset_indexes = metadata.offset_index().unwrap();
2171 assert_eq!(offset_indexes.len(), 1);
2173 let offset_index = &offset_indexes[0];
2174 let page_offset = &offset_index[0].page_locations()[0];
2175
2176 assert_eq!(4, page_offset.offset);
2177 assert_eq!(152, page_offset.compressed_page_size);
2178 assert_eq!(0, page_offset.first_row_index);
2179 }
2180
2181 #[test]
2182 fn test_page_index_reader_all_type() {
2183 let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
2184 let builder = ReadOptionsBuilder::new();
2185 let options = builder.with_page_index().build();
2187 let reader_result = SerializedFileReader::new_with_options(test_file, options);
2188 let reader = reader_result.unwrap();
2189
2190 let metadata = reader.metadata();
2192 assert_eq!(metadata.num_row_groups(), 1);
2193
2194 let column_index = metadata.column_index().unwrap();
2195 let row_group_offset_indexes = &metadata.offset_index().unwrap()[0];
2196
2197 assert_eq!(column_index.len(), 1);
2199 let row_group_metadata = metadata.row_group(0);
2200
2201 assert!(!&column_index[0][0].is_sorted());
2203 let boundary_order = &column_index[0][0].get_boundary_order();
2204 assert!(boundary_order.is_some());
2205 matches!(boundary_order.unwrap(), BoundaryOrder::UNORDERED);
2206 if let ColumnIndexMetaData::INT32(index) = &column_index[0][0] {
2207 check_native_page_index(
2208 index,
2209 325,
2210 get_row_group_min_max_bytes(row_group_metadata, 0),
2211 BoundaryOrder::UNORDERED,
2212 );
2213 assert_eq!(row_group_offset_indexes[0].page_locations.len(), 325);
2214 } else {
2215 unreachable!()
2216 };
2217 assert!(&column_index[0][1].is_sorted());
2219 if let ColumnIndexMetaData::BOOLEAN(index) = &column_index[0][1] {
2220 assert_eq!(index.num_pages(), 82);
2221 assert_eq!(row_group_offset_indexes[1].page_locations.len(), 82);
2222 } else {
2223 unreachable!()
2224 };
2225 assert!(&column_index[0][2].is_sorted());
2227 if let ColumnIndexMetaData::INT32(index) = &column_index[0][2] {
2228 check_native_page_index(
2229 index,
2230 325,
2231 get_row_group_min_max_bytes(row_group_metadata, 2),
2232 BoundaryOrder::ASCENDING,
2233 );
2234 assert_eq!(row_group_offset_indexes[2].page_locations.len(), 325);
2235 } else {
2236 unreachable!()
2237 };
2238 assert!(&column_index[0][3].is_sorted());
2240 if let ColumnIndexMetaData::INT32(index) = &column_index[0][3] {
2241 check_native_page_index(
2242 index,
2243 325,
2244 get_row_group_min_max_bytes(row_group_metadata, 3),
2245 BoundaryOrder::ASCENDING,
2246 );
2247 assert_eq!(row_group_offset_indexes[3].page_locations.len(), 325);
2248 } else {
2249 unreachable!()
2250 };
2251 assert!(&column_index[0][4].is_sorted());
2253 if let ColumnIndexMetaData::INT32(index) = &column_index[0][4] {
2254 check_native_page_index(
2255 index,
2256 325,
2257 get_row_group_min_max_bytes(row_group_metadata, 4),
2258 BoundaryOrder::ASCENDING,
2259 );
2260 assert_eq!(row_group_offset_indexes[4].page_locations.len(), 325);
2261 } else {
2262 unreachable!()
2263 };
2264 assert!(!&column_index[0][5].is_sorted());
2266 if let ColumnIndexMetaData::INT64(index) = &column_index[0][5] {
2267 check_native_page_index(
2268 index,
2269 528,
2270 get_row_group_min_max_bytes(row_group_metadata, 5),
2271 BoundaryOrder::UNORDERED,
2272 );
2273 assert_eq!(row_group_offset_indexes[5].page_locations.len(), 528);
2274 } else {
2275 unreachable!()
2276 };
2277 assert!(&column_index[0][6].is_sorted());
2279 if let ColumnIndexMetaData::FLOAT(index) = &column_index[0][6] {
2280 check_native_page_index(
2281 index,
2282 325,
2283 get_row_group_min_max_bytes(row_group_metadata, 6),
2284 BoundaryOrder::ASCENDING,
2285 );
2286 assert_eq!(row_group_offset_indexes[6].page_locations.len(), 325);
2287 } else {
2288 unreachable!()
2289 };
2290 assert!(!&column_index[0][7].is_sorted());
2292 if let ColumnIndexMetaData::DOUBLE(index) = &column_index[0][7] {
2293 check_native_page_index(
2294 index,
2295 528,
2296 get_row_group_min_max_bytes(row_group_metadata, 7),
2297 BoundaryOrder::UNORDERED,
2298 );
2299 assert_eq!(row_group_offset_indexes[7].page_locations.len(), 528);
2300 } else {
2301 unreachable!()
2302 };
2303 assert!(!&column_index[0][8].is_sorted());
2305 if let ColumnIndexMetaData::BYTE_ARRAY(index) = &column_index[0][8] {
2306 check_byte_array_page_index(
2307 index,
2308 974,
2309 get_row_group_min_max_bytes(row_group_metadata, 8),
2310 BoundaryOrder::UNORDERED,
2311 );
2312 assert_eq!(row_group_offset_indexes[8].page_locations.len(), 974);
2313 } else {
2314 unreachable!()
2315 };
2316 assert!(&column_index[0][9].is_sorted());
2318 if let ColumnIndexMetaData::BYTE_ARRAY(index) = &column_index[0][9] {
2319 check_byte_array_page_index(
2320 index,
2321 352,
2322 get_row_group_min_max_bytes(row_group_metadata, 9),
2323 BoundaryOrder::ASCENDING,
2324 );
2325 assert_eq!(row_group_offset_indexes[9].page_locations.len(), 352);
2326 } else {
2327 unreachable!()
2328 };
2329 assert!(!&column_index[0][10].is_sorted());
2332 if let ColumnIndexMetaData::NONE = &column_index[0][10] {
2333 assert_eq!(row_group_offset_indexes[10].page_locations.len(), 974);
2334 } else {
2335 unreachable!()
2336 };
2337 assert!(&column_index[0][11].is_sorted());
2339 if let ColumnIndexMetaData::INT32(index) = &column_index[0][11] {
2340 check_native_page_index(
2341 index,
2342 325,
2343 get_row_group_min_max_bytes(row_group_metadata, 11),
2344 BoundaryOrder::ASCENDING,
2345 );
2346 assert_eq!(row_group_offset_indexes[11].page_locations.len(), 325);
2347 } else {
2348 unreachable!()
2349 };
2350 assert!(!&column_index[0][12].is_sorted());
2352 if let ColumnIndexMetaData::INT32(index) = &column_index[0][12] {
2353 check_native_page_index(
2354 index,
2355 325,
2356 get_row_group_min_max_bytes(row_group_metadata, 12),
2357 BoundaryOrder::UNORDERED,
2358 );
2359 assert_eq!(row_group_offset_indexes[12].page_locations.len(), 325);
2360 } else {
2361 unreachable!()
2362 };
2363 }
2364
2365 fn check_native_page_index<T: ParquetValueType>(
2366 row_group_index: &PrimitiveColumnIndex<T>,
2367 page_size: usize,
2368 min_max: (&[u8], &[u8]),
2369 boundary_order: BoundaryOrder,
2370 ) {
2371 assert_eq!(row_group_index.num_pages() as usize, page_size);
2372 assert_eq!(row_group_index.boundary_order, boundary_order);
2373 assert!(row_group_index.min_values().iter().all(|x| {
2374 x >= &T::try_from_le_slice(min_max.0).unwrap()
2375 && x <= &T::try_from_le_slice(min_max.1).unwrap()
2376 }));
2377 }
2378
2379 fn check_byte_array_page_index(
2380 row_group_index: &ByteArrayColumnIndex,
2381 page_size: usize,
2382 min_max: (&[u8], &[u8]),
2383 boundary_order: BoundaryOrder,
2384 ) {
2385 assert_eq!(row_group_index.num_pages() as usize, page_size);
2386 assert_eq!(row_group_index.boundary_order, boundary_order);
2387 for i in 0..row_group_index.num_pages() as usize {
2388 let x = row_group_index.min_value(i).unwrap();
2389 assert!(x >= min_max.0 && x <= min_max.1);
2390 }
2391 }
2392
2393 fn get_row_group_min_max_bytes(r: &RowGroupMetaData, col_num: usize) -> (&[u8], &[u8]) {
2394 let statistics = r.column(col_num).statistics().unwrap();
2395 (
2396 statistics.min_bytes_opt().unwrap_or_default(),
2397 statistics.max_bytes_opt().unwrap_or_default(),
2398 )
2399 }
2400
2401 #[test]
2402 fn test_skip_next_page_with_dictionary_page() {
2403 let test_file = get_test_file("alltypes_tiny_pages.parquet");
2404 let builder = ReadOptionsBuilder::new();
2405 let options = builder.with_page_index().build();
2407 let reader_result = SerializedFileReader::new_with_options(test_file, options);
2408 let reader = reader_result.unwrap();
2409
2410 let row_group_reader = reader.get_row_group(0).unwrap();
2411
2412 let mut column_page_reader = row_group_reader.get_column_page_reader(9).unwrap();
2414
2415 let mut vec = vec![];
2416
2417 let meta = column_page_reader.peek_next_page().unwrap().unwrap();
2419 assert!(meta.is_dict);
2420
2421 column_page_reader.skip_next_page().unwrap();
2423
2424 let page = column_page_reader.get_next_page().unwrap().unwrap();
2426 assert!(matches!(page.page_type(), basic::PageType::DATA_PAGE));
2427
2428 for _i in 0..351 {
2430 let meta = column_page_reader.peek_next_page().unwrap().unwrap();
2432 assert!(!meta.is_dict); vec.push(meta);
2434
2435 let page = column_page_reader.get_next_page().unwrap().unwrap();
2436 assert!(matches!(page.page_type(), basic::PageType::DATA_PAGE));
2437 }
2438
2439 assert!(column_page_reader.peek_next_page().unwrap().is_none());
2441 assert!(column_page_reader.get_next_page().unwrap().is_none());
2442
2443 assert_eq!(vec.len(), 351);
2445 }
2446
2447 #[test]
2448 fn test_skip_page_with_offset_index() {
2449 let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
2450 let builder = ReadOptionsBuilder::new();
2451 let options = builder.with_page_index().build();
2453 let reader_result = SerializedFileReader::new_with_options(test_file, options);
2454 let reader = reader_result.unwrap();
2455
2456 let row_group_reader = reader.get_row_group(0).unwrap();
2457
2458 let mut column_page_reader = row_group_reader.get_column_page_reader(4).unwrap();
2460
2461 let mut vec = vec![];
2462
2463 for i in 0..325 {
2464 if i % 2 == 0 {
2465 vec.push(column_page_reader.get_next_page().unwrap().unwrap());
2466 } else {
2467 column_page_reader.skip_next_page().unwrap();
2468 }
2469 }
2470 assert!(column_page_reader.peek_next_page().unwrap().is_none());
2472 assert!(column_page_reader.get_next_page().unwrap().is_none());
2473
2474 assert_eq!(vec.len(), 163);
2475 }
2476
2477 #[test]
2478 fn test_skip_page_without_offset_index() {
2479 let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
2480
2481 let reader_result = SerializedFileReader::new(test_file);
2483 let reader = reader_result.unwrap();
2484
2485 let row_group_reader = reader.get_row_group(0).unwrap();
2486
2487 let mut column_page_reader = row_group_reader.get_column_page_reader(4).unwrap();
2489
2490 let mut vec = vec![];
2491
2492 for i in 0..325 {
2493 if i % 2 == 0 {
2494 vec.push(column_page_reader.get_next_page().unwrap().unwrap());
2495 } else {
2496 column_page_reader.peek_next_page().unwrap().unwrap();
2497 column_page_reader.skip_next_page().unwrap();
2498 }
2499 }
2500 assert!(column_page_reader.peek_next_page().unwrap().is_none());
2502 assert!(column_page_reader.get_next_page().unwrap().is_none());
2503
2504 assert_eq!(vec.len(), 163);
2505 }
2506
2507 #[test]
2508 fn test_peek_page_with_dictionary_page() {
2509 let test_file = get_test_file("alltypes_tiny_pages.parquet");
2510 let builder = ReadOptionsBuilder::new();
2511 let options = builder.with_page_index().build();
2513 let reader_result = SerializedFileReader::new_with_options(test_file, options);
2514 let reader = reader_result.unwrap();
2515 let row_group_reader = reader.get_row_group(0).unwrap();
2516
2517 let mut column_page_reader = row_group_reader.get_column_page_reader(9).unwrap();
2519
2520 let mut vec = vec![];
2521
2522 let meta = column_page_reader.peek_next_page().unwrap().unwrap();
2523 assert!(meta.is_dict);
2524 let page = column_page_reader.get_next_page().unwrap().unwrap();
2525 assert!(matches!(page.page_type(), basic::PageType::DICTIONARY_PAGE));
2526
2527 for i in 0..352 {
2528 let meta = column_page_reader.peek_next_page().unwrap().unwrap();
2529 if i != 351 {
2532 assert!((meta.num_rows == Some(21)) || (meta.num_rows == Some(20)));
2533 } else {
2534 assert_eq!(meta.num_rows, Some(10));
2537 }
2538 assert!(!meta.is_dict);
2539 vec.push(meta);
2540 let page = column_page_reader.get_next_page().unwrap().unwrap();
2541 assert!(matches!(page.page_type(), basic::PageType::DATA_PAGE));
2542 }
2543
2544 assert!(column_page_reader.peek_next_page().unwrap().is_none());
2546 assert!(column_page_reader.get_next_page().unwrap().is_none());
2547
2548 assert_eq!(vec.len(), 352);
2549 }
2550
2551 #[test]
2552 fn test_peek_page_with_dictionary_page_without_offset_index() {
2553 let test_file = get_test_file("alltypes_tiny_pages.parquet");
2554
2555 let reader_result = SerializedFileReader::new(test_file);
2556 let reader = reader_result.unwrap();
2557 let row_group_reader = reader.get_row_group(0).unwrap();
2558
2559 let mut column_page_reader = row_group_reader.get_column_page_reader(9).unwrap();
2561
2562 let mut vec = vec![];
2563
2564 let meta = column_page_reader.peek_next_page().unwrap().unwrap();
2565 assert!(meta.is_dict);
2566 let page = column_page_reader.get_next_page().unwrap().unwrap();
2567 assert!(matches!(page.page_type(), basic::PageType::DICTIONARY_PAGE));
2568
2569 for i in 0..352 {
2570 let meta = column_page_reader.peek_next_page().unwrap().unwrap();
2571 if i != 351 {
2574 assert!((meta.num_levels == Some(21)) || (meta.num_levels == Some(20)));
2575 } else {
2576 assert_eq!(meta.num_levels, Some(10));
2579 }
2580 assert!(!meta.is_dict);
2581 vec.push(meta);
2582 let page = column_page_reader.get_next_page().unwrap().unwrap();
2583 assert!(matches!(page.page_type(), basic::PageType::DATA_PAGE));
2584 }
2585
2586 assert!(column_page_reader.peek_next_page().unwrap().is_none());
2588 assert!(column_page_reader.get_next_page().unwrap().is_none());
2589
2590 assert_eq!(vec.len(), 352);
2591 }
2592
2593 #[test]
2594 fn test_fixed_length_index() {
2595 let message_type = "
2596 message test_schema {
2597 OPTIONAL FIXED_LEN_BYTE_ARRAY (11) value (DECIMAL(25,2));
2598 }
2599 ";
2600
2601 let schema = parse_message_type(message_type).unwrap();
2602 let mut out = Vec::with_capacity(1024);
2603 let mut writer =
2604 SerializedFileWriter::new(&mut out, Arc::new(schema), Default::default()).unwrap();
2605
2606 let mut r = writer.next_row_group().unwrap();
2607 let mut c = r.next_column().unwrap().unwrap();
2608 c.typed::<FixedLenByteArrayType>()
2609 .write_batch(
2610 &[vec![0; 11].into(), vec![5; 11].into(), vec![3; 11].into()],
2611 Some(&[1, 1, 0, 1]),
2612 None,
2613 )
2614 .unwrap();
2615 c.close().unwrap();
2616 r.close().unwrap();
2617 writer.close().unwrap();
2618
2619 let b = Bytes::from(out);
2620 let options = ReadOptionsBuilder::new().with_page_index().build();
2621 let reader = SerializedFileReader::new_with_options(b, options).unwrap();
2622 let index = reader.metadata().column_index().unwrap();
2623
2624 assert_eq!(index.len(), 1);
2626 let c = &index[0];
2627 assert_eq!(c.len(), 1);
2629
2630 match &c[0] {
2631 ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(v) => {
2632 assert_eq!(v.num_pages(), 1);
2633 assert_eq!(v.null_count(0).unwrap(), 1);
2634 assert_eq!(v.min_value(0).unwrap(), &[0; 11]);
2635 assert_eq!(v.max_value(0).unwrap(), &[5; 11]);
2636 }
2637 _ => unreachable!(),
2638 }
2639 }
2640
2641 #[test]
2642 fn test_multi_gz() {
2643 let file = get_test_file("concatenated_gzip_members.parquet");
2644 let reader = SerializedFileReader::new(file).unwrap();
2645 let row_group_reader = reader.get_row_group(0).unwrap();
2646 match row_group_reader.get_column_reader(0).unwrap() {
2647 ColumnReader::Int64ColumnReader(mut reader) => {
2648 let mut buffer = Vec::with_capacity(1024);
2649 let mut def_levels = Vec::with_capacity(1024);
2650 let (num_records, num_values, num_levels) = reader
2651 .read_records(1024, Some(&mut def_levels), None, &mut buffer)
2652 .unwrap();
2653
2654 assert_eq!(num_records, 513);
2655 assert_eq!(num_values, 513);
2656 assert_eq!(num_levels, 513);
2657
2658 let expected: Vec<i64> = (1..514).collect();
2659 assert_eq!(&buffer, &expected);
2660 }
2661 _ => unreachable!(),
2662 }
2663 }
2664
2665 #[test]
2666 fn test_byte_stream_split_extended() {
2667 let path = format!(
2668 "{}/byte_stream_split_extended.gzip.parquet",
2669 arrow::util::test_util::parquet_test_data(),
2670 );
2671 let file = File::open(path).unwrap();
2672 let reader = Box::new(SerializedFileReader::new(file).expect("Failed to create reader"));
2673
2674 let mut iter = reader
2676 .get_row_iter(None)
2677 .expect("Failed to create row iterator");
2678
2679 let mut start = 0;
2680 let end = reader.metadata().file_metadata().num_rows();
2681
2682 let check_row = |row: Result<Row, ParquetError>| {
2683 assert!(row.is_ok());
2684 let r = row.unwrap();
2685 assert_eq!(r.get_float16(0).unwrap(), r.get_float16(1).unwrap());
2686 assert_eq!(r.get_float(2).unwrap(), r.get_float(3).unwrap());
2687 assert_eq!(r.get_double(4).unwrap(), r.get_double(5).unwrap());
2688 assert_eq!(r.get_int(6).unwrap(), r.get_int(7).unwrap());
2689 assert_eq!(r.get_long(8).unwrap(), r.get_long(9).unwrap());
2690 assert_eq!(r.get_bytes(10).unwrap(), r.get_bytes(11).unwrap());
2691 assert_eq!(r.get_decimal(12).unwrap(), r.get_decimal(13).unwrap());
2692 };
2693
2694 while start < end {
2695 match iter.next() {
2696 Some(row) => check_row(row),
2697 None => break,
2698 };
2699 start += 1;
2700 }
2701 }
2702
2703 #[test]
2704 fn test_filtered_rowgroup_metadata() {
2705 let message_type = "
2706 message test_schema {
2707 REQUIRED INT32 a;
2708 }
2709 ";
2710 let schema = Arc::new(parse_message_type(message_type).unwrap());
2711 let props = Arc::new(
2712 WriterProperties::builder()
2713 .set_statistics_enabled(EnabledStatistics::Page)
2714 .build(),
2715 );
2716 let mut file: File = tempfile::tempfile().unwrap();
2717 let mut file_writer = SerializedFileWriter::new(&mut file, schema, props).unwrap();
2718 let data = [1, 2, 3, 4, 5];
2719
2720 for idx in 0..5 {
2722 let data_i: Vec<i32> = data.iter().map(|x| x * (idx + 1)).collect();
2723 let mut row_group_writer = file_writer.next_row_group().unwrap();
2724 if let Some(mut writer) = row_group_writer.next_column().unwrap() {
2725 writer
2726 .typed::<Int32Type>()
2727 .write_batch(data_i.as_slice(), None, None)
2728 .unwrap();
2729 writer.close().unwrap();
2730 }
2731 row_group_writer.close().unwrap();
2732 file_writer.flushed_row_groups();
2733 }
2734 let file_metadata = file_writer.close().unwrap();
2735
2736 assert_eq!(file_metadata.file_metadata().num_rows(), 25);
2737 assert_eq!(file_metadata.num_row_groups(), 5);
2738
2739 let read_options = ReadOptionsBuilder::new()
2741 .with_page_index()
2742 .with_predicate(Box::new(|rgmeta, _| rgmeta.ordinal().unwrap_or(0) == 2))
2743 .build();
2744 let reader =
2745 SerializedFileReader::new_with_options(file.try_clone().unwrap(), read_options)
2746 .unwrap();
2747 let metadata = reader.metadata();
2748
2749 assert_eq!(metadata.num_row_groups(), 1);
2751 assert_eq!(metadata.row_group(0).ordinal(), Some(2));
2752
2753 assert!(metadata.column_index().is_some());
2755 assert!(metadata.offset_index().is_some());
2756 assert_eq!(metadata.column_index().unwrap().len(), 1);
2757 assert_eq!(metadata.offset_index().unwrap().len(), 1);
2758 let col_idx = metadata.column_index().unwrap();
2759 let off_idx = metadata.offset_index().unwrap();
2760 let col_stats = metadata.row_group(0).column(0).statistics().unwrap();
2761 let pg_idx = &col_idx[0][0];
2762 let off_idx_i = &off_idx[0][0];
2763
2764 match pg_idx {
2766 ColumnIndexMetaData::INT32(int_idx) => {
2767 let min = col_stats.min_bytes_opt().unwrap().get_i32_le();
2768 let max = col_stats.max_bytes_opt().unwrap().get_i32_le();
2769 assert_eq!(int_idx.min_value(0), Some(min).as_ref());
2770 assert_eq!(int_idx.max_value(0), Some(max).as_ref());
2771 }
2772 _ => panic!("wrong stats type"),
2773 }
2774
2775 assert_eq!(
2777 off_idx_i.page_locations[0].offset,
2778 metadata.row_group(0).column(0).data_page_offset()
2779 );
2780
2781 let read_options = ReadOptionsBuilder::new()
2783 .with_page_index()
2784 .with_predicate(Box::new(|rgmeta, _| rgmeta.ordinal().unwrap_or(0) % 2 == 1))
2785 .build();
2786 let reader =
2787 SerializedFileReader::new_with_options(file.try_clone().unwrap(), read_options)
2788 .unwrap();
2789 let metadata = reader.metadata();
2790
2791 assert_eq!(metadata.num_row_groups(), 2);
2793 assert_eq!(metadata.row_group(0).ordinal(), Some(1));
2794 assert_eq!(metadata.row_group(1).ordinal(), Some(3));
2795
2796 assert!(metadata.column_index().is_some());
2798 assert!(metadata.offset_index().is_some());
2799 assert_eq!(metadata.column_index().unwrap().len(), 2);
2800 assert_eq!(metadata.offset_index().unwrap().len(), 2);
2801 let col_idx = metadata.column_index().unwrap();
2802 let off_idx = metadata.offset_index().unwrap();
2803
2804 for (i, col_idx_i) in col_idx.iter().enumerate().take(metadata.num_row_groups()) {
2805 let col_stats = metadata.row_group(i).column(0).statistics().unwrap();
2806 let pg_idx = &col_idx_i[0];
2807 let off_idx_i = &off_idx[i][0];
2808
2809 match pg_idx {
2811 ColumnIndexMetaData::INT32(int_idx) => {
2812 let min = col_stats.min_bytes_opt().unwrap().get_i32_le();
2813 let max = col_stats.max_bytes_opt().unwrap().get_i32_le();
2814 assert_eq!(int_idx.min_value(0), Some(min).as_ref());
2815 assert_eq!(int_idx.max_value(0), Some(max).as_ref());
2816 }
2817 _ => panic!("wrong stats type"),
2818 }
2819
2820 assert_eq!(
2822 off_idx_i.page_locations[0].offset,
2823 metadata.row_group(i).column(0).data_page_offset()
2824 );
2825 }
2826 }
2827
2828 #[test]
2829 fn test_reuse_schema() {
2830 let file = get_test_file("alltypes_plain.parquet");
2831 let file_reader = SerializedFileReader::new(file.try_clone().unwrap()).unwrap();
2832 let schema = file_reader.metadata().file_metadata().schema_descr_ptr();
2833 let expected = file_reader.metadata;
2834
2835 let options = ReadOptionsBuilder::new()
2836 .with_parquet_schema(schema)
2837 .build();
2838 let file_reader = SerializedFileReader::new_with_options(file, options).unwrap();
2839
2840 assert_eq!(expected.as_ref(), file_reader.metadata.as_ref());
2841 assert!(Arc::ptr_eq(
2843 &expected.file_metadata().schema_descr_ptr(),
2844 &file_reader.metadata.file_metadata().schema_descr_ptr()
2845 ));
2846 }
2847
2848 #[test]
2849 fn test_read_unknown_logical_type() {
2850 let file = get_test_file("unknown-logical-type.parquet");
2851 let reader = SerializedFileReader::new(file).expect("Error opening file");
2852
2853 let schema = reader.metadata().file_metadata().schema_descr();
2854 assert_eq!(
2855 schema.column(0).logical_type_ref(),
2856 Some(&basic::LogicalType::String)
2857 );
2858 assert_eq!(
2859 schema.column(1).logical_type_ref(),
2860 Some(&basic::LogicalType::_Unknown { field_id: 2555 })
2861 );
2862 assert_eq!(schema.column(1).physical_type(), Type::BYTE_ARRAY);
2863
2864 let mut iter = reader
2865 .get_row_iter(None)
2866 .expect("Failed to create row iterator");
2867
2868 let mut num_rows = 0;
2869 while iter.next().is_some() {
2870 num_rows += 1;
2871 }
2872 assert_eq!(num_rows, reader.metadata().file_metadata().num_rows());
2873 }
2874}