1use crate::basic::{PageType, Type};
22use crate::bloom_filter::Sbbf;
23use crate::column::page::{Page, PageMetadata, PageReader};
24use crate::compression::{Codec, create_codec};
25#[cfg(feature = "encryption")]
26use crate::encryption::decrypt::{CryptoContext, read_and_decrypt};
27use crate::errors::{ParquetError, Result};
28use crate::file::metadata::thrift::PageHeader;
29use crate::file::page_index::offset_index::{OffsetIndexMetaData, PageLocation};
30use crate::file::statistics;
31use crate::file::{
32 metadata::*,
33 properties::{ReaderProperties, ReaderPropertiesPtr},
34 reader::*,
35};
36#[cfg(feature = "encryption")]
37use crate::parquet_thrift::ThriftSliceInputProtocol;
38use crate::parquet_thrift::{ReadThrift, ThriftReadInputProtocol};
39use crate::record::Row;
40use crate::record::reader::RowIter;
41use crate::schema::types::{SchemaDescPtr, Type as SchemaType};
42use bytes::Bytes;
43use std::collections::VecDeque;
44use std::{fs::File, io::Read, path::Path, sync::Arc};
45
46impl TryFrom<File> for SerializedFileReader<File> {
47 type Error = ParquetError;
48
49 fn try_from(file: File) -> Result<Self> {
50 Self::new(file)
51 }
52}
53
54impl TryFrom<&Path> for SerializedFileReader<File> {
55 type Error = ParquetError;
56
57 fn try_from(path: &Path) -> Result<Self> {
58 let file = File::open(path)?;
59 Self::try_from(file)
60 }
61}
62
63impl TryFrom<String> for SerializedFileReader<File> {
64 type Error = ParquetError;
65
66 fn try_from(path: String) -> Result<Self> {
67 Self::try_from(Path::new(&path))
68 }
69}
70
71impl TryFrom<&str> for SerializedFileReader<File> {
72 type Error = ParquetError;
73
74 fn try_from(path: &str) -> Result<Self> {
75 Self::try_from(Path::new(&path))
76 }
77}
78
/// Converts the reader into an owning iterator over the file's [`Row`]s.
impl IntoIterator for SerializedFileReader<File> {
    type Item = Result<Row>;
    type IntoIter = RowIter<'static>;

    fn into_iter(self) -> Self::IntoIter {
        // Box the reader so the iterator can own it for the 'static lifetime
        RowIter::from_file_into(Box::new(self))
    }
}
89
/// A serialized implementation of the Parquet [`FileReader`] trait, backed by
/// any [`ChunkReader`] (e.g. a [`File`] or in-memory [`Bytes`]).
pub struct SerializedFileReader<R: ChunkReader> {
    // Shared handle to the underlying file / byte source
    chunk_reader: Arc<R>,
    // Footer metadata, parsed eagerly at construction
    metadata: Arc<ParquetMetaData>,
    // Reader configuration, shared with row-group and page readers
    props: ReaderPropertiesPtr,
}
99
/// A predicate for filtering row groups; invoked with each row group's
/// metadata and its index in the file. Only row groups for which every
/// registered predicate returns `true` are retained.
pub type ReadGroupPredicate = Box<dyn FnMut(&RowGroupMetaData, usize) -> bool>;
104
/// A builder for [`ReadOptions`].
#[derive(Default)]
pub struct ReadOptionsBuilder {
    // Row-group filters applied while assembling the metadata
    predicates: Vec<ReadGroupPredicate>,
    // Whether to also load the offset/column page indexes
    enable_page_index: bool,
    // Custom reader properties; defaults are built in `build` if absent
    props: Option<ReaderProperties>,
    // Options controlling how the footer metadata is decoded
    metadata_options: ParquetMetaDataOptions,
}
115
116impl ReadOptionsBuilder {
117 pub fn new() -> Self {
119 Self::default()
120 }
121
122 pub fn with_predicate(mut self, predicate: ReadGroupPredicate) -> Self {
125 self.predicates.push(predicate);
126 self
127 }
128
129 pub fn with_range(mut self, start: i64, end: i64) -> Self {
132 assert!(start < end);
133 let predicate = move |rg: &RowGroupMetaData, _: usize| {
134 let mid = get_midpoint_offset(rg);
135 mid >= start && mid < end
136 };
137 self.predicates.push(Box::new(predicate));
138 self
139 }
140
141 pub fn with_page_index(mut self) -> Self {
146 self.enable_page_index = true;
147 self
148 }
149
150 pub fn with_reader_properties(mut self, properties: ReaderProperties) -> Self {
152 self.props = Some(properties);
153 self
154 }
155
156 pub fn with_parquet_schema(mut self, schema: SchemaDescPtr) -> Self {
159 self.metadata_options.set_schema(schema);
160 self
161 }
162
163 pub fn with_encoding_stats_as_mask(mut self, val: bool) -> Self {
172 self.metadata_options.set_encoding_stats_as_mask(val);
173 self
174 }
175
176 pub fn with_encoding_stats_policy(mut self, policy: ParquetStatisticsPolicy) -> Self {
181 self.metadata_options.set_encoding_stats_policy(policy);
182 self
183 }
184
185 pub fn with_column_stats_policy(mut self, policy: ParquetStatisticsPolicy) -> Self {
190 self.metadata_options.set_column_stats_policy(policy);
191 self
192 }
193
194 pub fn with_size_stats_policy(mut self, policy: ParquetStatisticsPolicy) -> Self {
199 self.metadata_options.set_size_stats_policy(policy);
200 self
201 }
202
203 pub fn build(self) -> ReadOptions {
205 let props = self
206 .props
207 .unwrap_or_else(|| ReaderProperties::builder().build());
208 ReadOptions {
209 predicates: self.predicates,
210 enable_page_index: self.enable_page_index,
211 props,
212 metadata_options: self.metadata_options,
213 }
214 }
215}
216
/// A collection of options for reading a Parquet file; constructed via
/// [`ReadOptionsBuilder`] and consumed by
/// [`SerializedFileReader::new_with_options`].
pub struct ReadOptions {
    // Row-group filters applied while assembling the metadata
    predicates: Vec<ReadGroupPredicate>,
    // Whether to also load the page indexes
    enable_page_index: bool,
    // Reader configuration to use for the file
    props: ReaderProperties,
    // Options controlling how the footer metadata is decoded
    metadata_options: ParquetMetaDataOptions,
}
227
impl<R: 'static + ChunkReader> SerializedFileReader<R> {
    /// Creates a file reader from the given chunk reader.
    ///
    /// Eagerly reads and parses the footer metadata, using default reader
    /// properties.
    pub fn new(chunk_reader: R) -> Result<Self> {
        let metadata = ParquetMetaDataReader::new().parse_and_finish(&chunk_reader)?;
        let props = Arc::new(ReaderProperties::builder().build());
        Ok(Self {
            chunk_reader: Arc::new(chunk_reader),
            metadata: Arc::new(metadata),
            props,
        })
    }

    /// Creates a file reader with custom [`ReadOptions`].
    ///
    /// Row groups are filtered through the configured predicates, and page
    /// indexes are loaded in a second pass if `enable_page_index` was set.
    #[allow(deprecated)]
    pub fn new_with_options(chunk_reader: R, options: ReadOptions) -> Result<Self> {
        let mut metadata_builder = ParquetMetaDataReader::new()
            .with_metadata_options(Some(options.metadata_options.clone()))
            .parse_and_finish(&chunk_reader)?
            .into_builder();
        let mut predicates = options.predicates;

        // Keep only the row groups for which every predicate returns true
        for (i, rg_meta) in metadata_builder.take_row_groups().into_iter().enumerate() {
            let mut keep = true;
            for predicate in &mut predicates {
                if !predicate(&rg_meta, i) {
                    keep = false;
                    break;
                }
            }
            if keep {
                metadata_builder = metadata_builder.add_row_group(rg_meta);
            }
        }

        let mut metadata = metadata_builder.build();

        // Page indexes are read only for the row groups that survived
        // filtering, so this must happen after the predicate pass
        if options.enable_page_index {
            let mut reader =
                ParquetMetaDataReader::new_with_metadata(metadata).with_page_indexes(true);
            reader.read_page_indexes(&chunk_reader)?;
            metadata = reader.finish()?;
        }

        Ok(Self {
            chunk_reader: Arc::new(chunk_reader),
            metadata: Arc::new(metadata),
            props: Arc::new(options.props),
        })
    }
}
282
283fn get_midpoint_offset(meta: &RowGroupMetaData) -> i64 {
285 let col = meta.column(0);
286 let mut offset = col.data_page_offset();
287 if let Some(dic_offset) = col.dictionary_page_offset() {
288 if offset > dic_offset {
289 offset = dic_offset
290 }
291 };
292 offset + meta.compressed_size() / 2
293}
294
impl<R: 'static + ChunkReader> FileReader for SerializedFileReader<R> {
    fn metadata(&self) -> &ParquetMetaData {
        &self.metadata
    }

    fn num_row_groups(&self) -> usize {
        self.metadata.num_row_groups()
    }

    fn get_row_group(&self, i: usize) -> Result<Box<dyn RowGroupReader + '_>> {
        let row_group_metadata = self.metadata.row_group(i);
        let props = Arc::clone(&self.props);
        let f = Arc::clone(&self.chunk_reader);
        // Pass this row group's slice of the offset index, when one was loaded
        Ok(Box::new(SerializedRowGroupReader::new(
            f,
            row_group_metadata,
            self.metadata.offset_index().map(|x| x[i].as_slice()),
            props,
        )?))
    }

    fn get_row_iter(&self, projection: Option<SchemaType>) -> Result<RowIter<'_>> {
        RowIter::from_file(projection, self)
    }
}
321
/// A serialized implementation of the Parquet [`RowGroupReader`] trait.
pub struct SerializedRowGroupReader<'a, R: ChunkReader> {
    // Shared handle to the underlying file / byte source
    chunk_reader: Arc<R>,
    // Metadata for this row group, borrowed from the file metadata
    metadata: &'a RowGroupMetaData,
    // Per-column offset index for this row group, if loaded
    offset_index: Option<&'a [OffsetIndexMetaData]>,
    // Reader configuration shared with page readers
    props: ReaderPropertiesPtr,
    // One entry per column; populated eagerly when bloom filter
    // reading is enabled in the properties
    bloom_filters: Vec<Option<Sbbf>>,
}
330
impl<'a, R: ChunkReader> SerializedRowGroupReader<'a, R> {
    /// Creates a new row group reader.
    ///
    /// When the reader properties enable bloom filter reading, the bloom
    /// filter of every column chunk is read eagerly here.
    pub fn new(
        chunk_reader: Arc<R>,
        metadata: &'a RowGroupMetaData,
        offset_index: Option<&'a [OffsetIndexMetaData]>,
        props: ReaderPropertiesPtr,
    ) -> Result<Self> {
        let bloom_filters = if props.read_bloom_filter() {
            metadata
                .columns()
                .iter()
                .map(|col| Sbbf::read_from_column_chunk(col, &*chunk_reader))
                .collect::<Result<Vec<_>>>()?
        } else {
            // One `None` placeholder per column so indexing by column stays valid
            std::iter::repeat_n(None, metadata.columns().len()).collect()
        };
        Ok(Self {
            chunk_reader,
            metadata,
            offset_index,
            props,
            bloom_filters,
        })
    }
}
357
impl<R: 'static + ChunkReader> RowGroupReader for SerializedRowGroupReader<'_, R> {
    fn metadata(&self) -> &RowGroupMetaData {
        self.metadata
    }

    fn num_columns(&self) -> usize {
        self.metadata.num_columns()
    }

    fn get_column_page_reader(&self, i: usize) -> Result<Box<dyn PageReader>> {
        let col = self.metadata.column(i);

        // Clone this column's page locations (if an offset index was loaded)
        // so the page reader can own them
        let page_locations = self.offset_index.map(|x| x[i].page_locations.clone());

        let props = Arc::clone(&self.props);
        Ok(Box::new(SerializedPageReader::new_with_properties(
            Arc::clone(&self.chunk_reader),
            col,
            usize::try_from(self.metadata.num_rows())?,
            page_locations,
            props,
        )?))
    }

    // Returns the eagerly-read bloom filter for column `i`, if any
    fn get_column_bloom_filter(&self, i: usize) -> Option<&Sbbf> {
        self.bloom_filters[i].as_ref()
    }

    fn get_row_iter(&self, projection: Option<SchemaType>) -> Result<RowIter<'_>> {
        RowIter::from_row_group(projection, self)
    }
}
392
/// Decodes a [`Page`] from `buffer`, the page body as stored in the file
/// (the thrift page header has already been read and is passed in as
/// `page_header`).
///
/// `decompressor` is used to inflate compressed page bodies; when it is
/// `None`, or a V2 page is marked uncompressed, the buffer is used as-is.
pub(crate) fn decode_page(
    page_header: PageHeader,
    buffer: Bytes,
    physical_type: Type,
    decompressor: Option<&mut Box<dyn Codec>>,
) -> Result<Page> {
    // Verify the page checksum when both the `crc` feature and the header
    // field are present
    #[cfg(feature = "crc")]
    if let Some(expected_crc) = page_header.crc {
        let crc = crc32fast::hash(&buffer);
        if crc != expected_crc as u32 {
            return Err(general_err!("Page CRC checksum mismatch"));
        }
    }

    // For V2 data pages the level data at the front of the page is stored
    // uncompressed; `offset` marks where the (possibly compressed) values begin
    let mut offset: usize = 0;
    let mut can_decompress = true;

    if let Some(ref header_v2) = page_header.data_page_header_v2 {
        // Sanity-check the level byte lengths before using them as offsets
        if header_v2.definition_levels_byte_length < 0
            || header_v2.repetition_levels_byte_length < 0
            || header_v2.definition_levels_byte_length + header_v2.repetition_levels_byte_length
                > page_header.uncompressed_page_size
        {
            return Err(general_err!(
                "DataPage v2 header contains implausible values \
                 for definition_levels_byte_length ({}) \
                 and repetition_levels_byte_length ({}) \
                 given DataPage header provides uncompressed_page_size ({})",
                header_v2.definition_levels_byte_length,
                header_v2.repetition_levels_byte_length,
                page_header.uncompressed_page_size
            ));
        }
        offset = usize::try_from(
            header_v2.definition_levels_byte_length + header_v2.repetition_levels_byte_length,
        )?;
        can_decompress = header_v2.is_compressed.unwrap_or(true);
    }

    let buffer = match decompressor {
        Some(decompressor) if can_decompress => {
            let uncompressed_page_size = usize::try_from(page_header.uncompressed_page_size)?;
            if offset > buffer.len() || offset > uncompressed_page_size {
                return Err(general_err!("Invalid page header"));
            }
            let decompressed_size = uncompressed_page_size - offset;
            let mut decompressed = Vec::with_capacity(uncompressed_page_size);
            // Copy the uncompressed level data, then inflate the values after it
            decompressed.extend_from_slice(&buffer[..offset]);
            if decompressed_size > 0 {
                let compressed = &buffer[offset..];
                decompressor.decompress(compressed, &mut decompressed, Some(decompressed_size))?;
            }

            if decompressed.len() != uncompressed_page_size {
                return Err(general_err!(
                    "Actual decompressed size doesn't match the expected one ({} vs {})",
                    decompressed.len(),
                    uncompressed_page_size
                ));
            }

            Bytes::from(decompressed)
        }
        _ => buffer,
    };

    let result = match page_header.r#type {
        PageType::DICTIONARY_PAGE => {
            let dict_header = page_header.dictionary_page_header.as_ref().ok_or_else(|| {
                ParquetError::General("Missing dictionary page header".to_string())
            })?;
            let is_sorted = dict_header.is_sorted.unwrap_or(false);
            Page::DictionaryPage {
                buf: buffer,
                num_values: dict_header.num_values.try_into()?,
                encoding: dict_header.encoding,
                is_sorted,
            }
        }
        PageType::DATA_PAGE => {
            let header = page_header
                .data_page_header
                .ok_or_else(|| ParquetError::General("Missing V1 data page header".to_string()))?;
            Page::DataPage {
                buf: buffer,
                num_values: header.num_values.try_into()?,
                encoding: header.encoding,
                def_level_encoding: header.definition_level_encoding,
                rep_level_encoding: header.repetition_level_encoding,
                statistics: statistics::from_thrift_page_stats(physical_type, header.statistics)?,
            }
        }
        PageType::DATA_PAGE_V2 => {
            let header = page_header
                .data_page_header_v2
                .ok_or_else(|| ParquetError::General("Missing V2 data page header".to_string()))?;
            let is_compressed = header.is_compressed.unwrap_or(true);
            Page::DataPageV2 {
                buf: buffer,
                num_values: header.num_values.try_into()?,
                encoding: header.encoding,
                num_nulls: header.num_nulls.try_into()?,
                num_rows: header.num_rows.try_into()?,
                def_levels_byte_len: header.definition_levels_byte_length.try_into()?,
                rep_levels_byte_len: header.repetition_levels_byte_length.try_into()?,
                is_compressed,
                statistics: statistics::from_thrift_page_stats(physical_type, header.statistics)?,
            }
        }
        _ => {
            // INDEX_PAGE and any unknown page types are not decodable here
            return Err(general_err!(
                "Page type {:?} is not supported",
                page_header.r#type
            ));
        }
    };

    Ok(result)
}
524
/// Internal state machine for [`SerializedPageReader`].
enum SerializedPageReaderState {
    /// Sequential scan over the column chunk's byte range; used when no
    /// page locations (offset index) are available.
    Values {
        /// Current byte offset within the file
        offset: u64,

        /// Bytes of the chunk remaining to be read
        remaining_bytes: u64,

        /// A page header that was peeked but not yet consumed
        next_page_header: Option<Box<PageHeader>>,

        /// Index of the current data page within this column chunk
        page_index: usize,

        /// Whether a dictionary page is still expected at the front of
        /// the chunk (set from the chunk metadata, cleared once read)
        require_dictionary: bool,
    },
    /// Scan driven by known page locations from the offset index.
    Pages {
        /// Remaining page locations, consumed front-to-back
        page_locations: VecDeque<PageLocation>,
        /// Location of the dictionary page, if one precedes the data pages
        dictionary_page: Option<PageLocation>,
        /// Total number of rows in this column chunk
        total_rows: usize,
        /// Index of the current data page within this column chunk
        page_index: usize,
    },
}
555
/// Per-reader configuration consulted while decoding page headers and data.
#[derive(Default)]
struct SerializedPageReaderContext {
    // Whether page-level statistics should be decoded from page headers
    read_stats: bool,
    // Decryption state for this column chunk, when the file is encrypted
    #[cfg(feature = "encryption")]
    crypto_context: Option<Arc<CryptoContext>>,
}
564
/// A serialized implementation of the Parquet [`PageReader`] trait.
pub struct SerializedPageReader<R: ChunkReader> {
    /// The chunk reader supplying the raw bytes
    reader: Arc<R>,

    /// Decompressor for this column chunk's codec, if any
    decompressor: Option<Box<dyn Codec>>,

    /// Physical type of the column whose pages are being decoded
    physical_type: Type,

    /// Current scan state (sequential vs. offset-index driven)
    state: SerializedPageReaderState,

    /// Statistics/decryption configuration used while decoding
    context: SerializedPageReaderContext,
}
580
impl<R: ChunkReader> SerializedPageReader<R> {
    /// Creates a new serialized page reader from a chunk reader and column
    /// chunk metadata, using default reader properties.
    pub fn new(
        reader: Arc<R>,
        column_chunk_metadata: &ColumnChunkMetaData,
        total_rows: usize,
        page_locations: Option<Vec<PageLocation>>,
    ) -> Result<Self> {
        let props = Arc::new(ReaderProperties::builder().build());
        SerializedPageReader::new_with_properties(
            reader,
            column_chunk_metadata,
            total_rows,
            page_locations,
            props,
        )
    }

    // No-op stub used when the `encryption` feature is disabled
    #[cfg(all(feature = "arrow", not(feature = "encryption")))]
    pub(crate) fn add_crypto_context(
        self,
        _rg_idx: usize,
        _column_idx: usize,
        _parquet_meta_data: &ParquetMetaData,
        _column_chunk_metadata: &ColumnChunkMetaData,
    ) -> Result<SerializedPageReader<R>> {
        Ok(self)
    }

    // Attaches a `CryptoContext` for this column's pages when both the file
    // and the column carry crypto metadata; otherwise returns self unchanged
    #[cfg(feature = "encryption")]
    pub(crate) fn add_crypto_context(
        mut self,
        rg_idx: usize,
        column_idx: usize,
        parquet_meta_data: &ParquetMetaData,
        column_chunk_metadata: &ColumnChunkMetaData,
    ) -> Result<SerializedPageReader<R>> {
        let Some(file_decryptor) = parquet_meta_data.file_decryptor() else {
            return Ok(self);
        };
        let Some(crypto_metadata) = column_chunk_metadata.crypto_metadata() else {
            return Ok(self);
        };
        let crypto_context =
            CryptoContext::for_column(file_decryptor, crypto_metadata, rg_idx, column_idx)?;
        self.context.crypto_context = Some(Arc::new(crypto_context));
        Ok(self)
    }

    /// Creates a new serialized page reader using the provided reader
    /// properties.
    pub fn new_with_properties(
        reader: Arc<R>,
        meta: &ColumnChunkMetaData,
        total_rows: usize,
        page_locations: Option<Vec<PageLocation>>,
        props: ReaderPropertiesPtr,
    ) -> Result<Self> {
        let decompressor = create_codec(meta.compression(), props.codec_options())?;
        let (start, len) = meta.byte_range();

        let state = match page_locations {
            Some(locations) => {
                // If the first listed page does not start at the beginning of
                // the chunk, the gap before it holds the dictionary page;
                // synthesize a location covering that gap
                let dictionary_page = match locations.first() {
                    Some(dict_offset) if dict_offset.offset as u64 != start => Some(PageLocation {
                        offset: start as i64,
                        compressed_page_size: (dict_offset.offset as u64 - start) as i32,
                        first_row_index: 0,
                    }),
                    _ => None,
                };

                SerializedPageReaderState::Pages {
                    page_locations: locations.into(),
                    dictionary_page,
                    total_rows,
                    page_index: 0,
                }
            }
            None => SerializedPageReaderState::Values {
                offset: start,
                remaining_bytes: len,
                next_page_header: None,
                page_index: 0,
                require_dictionary: meta.dictionary_page_offset().is_some(),
            },
        };
        let mut context = SerializedPageReaderContext::default();
        if props.read_page_stats() {
            context.read_stats = true;
        }
        Ok(Self {
            reader,
            decompressor,
            state,
            physical_type: meta.column_type(),
            context,
        })
    }

    // Test-only helper: returns the file offset of the next page without
    // consuming it, buffering the parsed header for the next read
    #[cfg(test)]
    fn peek_next_page_offset(&mut self) -> Result<Option<u64>> {
        match &mut self.state {
            SerializedPageReaderState::Values {
                offset,
                remaining_bytes,
                next_page_header,
                page_index,
                require_dictionary,
            } => {
                loop {
                    if *remaining_bytes == 0 {
                        return Ok(None);
                    }
                    return if let Some(header) = next_page_header.as_ref() {
                        if let Ok(_page_meta) = PageMetadata::try_from(&**header) {
                            Ok(Some(*offset))
                        } else {
                            // Buffered header is for an unsupported page;
                            // discard it and read the next header
                            *next_page_header = None;
                            continue;
                        }
                    } else {
                        let mut read = self.reader.get_read(*offset)?;
                        let (header_len, header) = Self::read_page_header_len(
                            &self.context,
                            &mut read,
                            *page_index,
                            *require_dictionary,
                        )?;
                        *offset += header_len as u64;
                        *remaining_bytes -= header_len as u64;
                        let page_meta = if let Ok(_page_meta) = PageMetadata::try_from(&header) {
                            Ok(Some(*offset))
                        } else {
                            // Unsupported page type: skip its header and retry
                            continue;
                        };
                        *next_page_header = Some(Box::new(header));
                        page_meta
                    };
                }
            }
            SerializedPageReaderState::Pages {
                page_locations,
                dictionary_page,
                ..
            } => {
                // The dictionary page, if still pending, always comes first
                if let Some(page) = dictionary_page {
                    Ok(Some(page.offset as u64))
                } else if let Some(page) = page_locations.front() {
                    Ok(Some(page.offset as u64))
                } else {
                    Ok(None)
                }
            }
        }
    }

    // Reads a page header from `input`, returning the header together with
    // the number of bytes consumed from the stream
    fn read_page_header_len<T: Read>(
        context: &SerializedPageReaderContext,
        input: &mut T,
        page_index: usize,
        dictionary_page: bool,
    ) -> Result<(usize, PageHeader)> {
        // Wraps a reader and counts how many bytes have been read through it
        struct TrackedRead<R> {
            inner: R,
            bytes_read: usize,
        }

        impl<R: Read> Read for TrackedRead<R> {
            fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
                let v = self.inner.read(buf)?;
                self.bytes_read += v;
                Ok(v)
            }
        }

        let mut tracked = TrackedRead {
            inner: input,
            bytes_read: 0,
        };
        let header = context.read_page_header(&mut tracked, page_index, dictionary_page)?;
        Ok((tracked.bytes_read, header))
    }

    // Reads a page header from an in-memory buffer, returning the header
    // and its encoded length in bytes
    fn read_page_header_len_from_bytes(
        context: &SerializedPageReaderContext,
        buffer: &[u8],
        page_index: usize,
        dictionary_page: bool,
    ) -> Result<(usize, PageHeader)> {
        let mut input = std::io::Cursor::new(buffer);
        let header = context.read_page_header(&mut input, page_index, dictionary_page)?;
        let header_len = input.position() as usize;
        Ok((header_len, header))
    }
}
788
789#[cfg(not(feature = "encryption"))]
790impl SerializedPageReaderContext {
791 fn read_page_header<T: Read>(
792 &self,
793 input: &mut T,
794 _page_index: usize,
795 _dictionary_page: bool,
796 ) -> Result<PageHeader> {
797 let mut prot = ThriftReadInputProtocol::new(input);
798 if self.read_stats {
799 Ok(PageHeader::read_thrift(&mut prot)?)
800 } else {
801 Ok(PageHeader::read_thrift_without_stats(&mut prot)?)
802 }
803 }
804
805 fn decrypt_page_data<T>(
806 &self,
807 buffer: T,
808 _page_index: usize,
809 _dictionary_page: bool,
810 ) -> Result<T> {
811 Ok(buffer)
812 }
813}
814
#[cfg(feature = "encryption")]
impl SerializedPageReaderContext {
    /// Reads a thrift-encoded page header from `input`, first decrypting it
    /// when a crypto context applies to this page.
    ///
    /// Page statistics are skipped during decoding unless `read_stats` is set.
    fn read_page_header<T: Read>(
        &self,
        input: &mut T,
        page_index: usize,
        dictionary_page: bool,
    ) -> Result<PageHeader> {
        match self.page_crypto_context(page_index, dictionary_page) {
            None => {
                // Plaintext header: decode straight from the input stream.
                // `PageHeader` is already imported at the top of this file, so
                // no local `use` is needed here.
                let mut prot = ThriftReadInputProtocol::new(input);
                if self.read_stats {
                    Ok(PageHeader::read_thrift(&mut prot)?)
                } else {
                    Ok(PageHeader::read_thrift_without_stats(&mut prot)?)
                }
            }
            Some(page_crypto_context) => {
                let data_decryptor = page_crypto_context.data_decryptor();
                let aad = page_crypto_context.create_page_header_aad()?;

                let buf = read_and_decrypt(data_decryptor, input, aad.as_ref()).map_err(|_| {
                    ParquetError::General(format!(
                        "Error decrypting page header for column {}, decryption key may be wrong",
                        page_crypto_context.column_ordinal
                    ))
                })?;

                // Decode the header from the decrypted plaintext buffer
                let mut prot = ThriftSliceInputProtocol::new(buf.as_slice());
                if self.read_stats {
                    Ok(PageHeader::read_thrift(&mut prot)?)
                } else {
                    Ok(PageHeader::read_thrift_without_stats(&mut prot)?)
                }
            }
        }
    }

    /// Decrypts page data when a crypto context applies to this page;
    /// otherwise returns the buffer unchanged.
    fn decrypt_page_data<T>(&self, buffer: T, page_index: usize, dictionary_page: bool) -> Result<T>
    where
        T: AsRef<[u8]>,
        T: From<Vec<u8>>,
    {
        let page_crypto_context = self.page_crypto_context(page_index, dictionary_page);
        if let Some(page_crypto_context) = page_crypto_context {
            let decryptor = page_crypto_context.data_decryptor();
            let aad = page_crypto_context.create_page_aad()?;
            let decrypted = decryptor.decrypt(buffer.as_ref(), &aad)?;
            Ok(T::from(decrypted))
        } else {
            Ok(buffer)
        }
    }

    /// Returns the crypto context specialized for the given page (dictionary
    /// vs. data page ordinal), if this column chunk is encrypted.
    fn page_crypto_context(
        &self,
        page_index: usize,
        dictionary_page: bool,
    ) -> Option<Arc<CryptoContext>> {
        self.crypto_context.as_ref().map(|c| {
            Arc::new(if dictionary_page {
                c.for_dictionary_page()
            } else {
                c.with_page_ordinal(page_index)
            })
        })
    }
}
885
886impl<R: ChunkReader> Iterator for SerializedPageReader<R> {
887 type Item = Result<Page>;
888
889 fn next(&mut self) -> Option<Self::Item> {
890 self.get_next_page().transpose()
891 }
892}
893
894fn verify_page_header_len(header_len: usize, remaining_bytes: u64) -> Result<()> {
895 if header_len as u64 > remaining_bytes {
896 return Err(eof_err!("Invalid page header"));
897 }
898 Ok(())
899}
900
901fn verify_page_size(
902 compressed_size: i32,
903 uncompressed_size: i32,
904 remaining_bytes: u64,
905) -> Result<()> {
906 if compressed_size < 0 || compressed_size as u64 > remaining_bytes || uncompressed_size < 0 {
910 return Err(eof_err!("Invalid page header"));
911 }
912 Ok(())
913}
914
impl<R: ChunkReader> PageReader for SerializedPageReader<R> {
    fn get_next_page(&mut self) -> Result<Option<Page>> {
        // Loop so unsupported pages (e.g. index pages) can be skipped
        loop {
            let page = match &mut self.state {
                SerializedPageReaderState::Values {
                    offset,
                    remaining_bytes: remaining,
                    next_page_header,
                    page_index,
                    require_dictionary,
                } => {
                    if *remaining == 0 {
                        return Ok(None);
                    }

                    let mut read = self.reader.get_read(*offset)?;
                    // Use a header buffered by `peek_next_page`, if present;
                    // its length was already accounted for at peek time
                    let header = if let Some(header) = next_page_header.take() {
                        *header
                    } else {
                        let (header_len, header) = Self::read_page_header_len(
                            &self.context,
                            &mut read,
                            *page_index,
                            *require_dictionary,
                        )?;
                        verify_page_header_len(header_len, *remaining)?;
                        *offset += header_len as u64;
                        *remaining -= header_len as u64;
                        header
                    };
                    verify_page_size(
                        header.compressed_page_size,
                        header.uncompressed_page_size,
                        *remaining,
                    )?;
                    let data_len = header.compressed_page_size as usize;
                    let data_start = *offset;
                    // Advance past the page body before any skip/decode
                    *offset += data_len as u64;
                    *remaining -= data_len as u64;

                    if header.r#type == PageType::INDEX_PAGE {
                        // Index pages are not decoded; move on to the next page
                        continue;
                    }

                    let buffer = self.reader.get_bytes(data_start, data_len)?;

                    let buffer =
                        self.context
                            .decrypt_page_data(buffer, *page_index, *require_dictionary)?;

                    let page = decode_page(
                        header,
                        buffer,
                        self.physical_type,
                        self.decompressor.as_mut(),
                    )?;
                    // Only data pages advance the page index; reading the
                    // dictionary page clears the pending-dictionary flag
                    if page.is_data_page() {
                        *page_index += 1;
                    } else if page.is_dictionary_page() {
                        *require_dictionary = false;
                    }
                    page
                }
                SerializedPageReaderState::Pages {
                    page_locations,
                    dictionary_page,
                    page_index,
                    ..
                } => {
                    // The dictionary page (if any) is served before data pages
                    let (front, is_dictionary_page) = match dictionary_page.take() {
                        Some(front) => (front, true),
                        None => match page_locations.pop_front() {
                            Some(front) => (front, false),
                            None => return Ok(None),
                        },
                    };

                    let page_len = usize::try_from(front.compressed_page_size)?;
                    let buffer = self.reader.get_bytes(front.offset as u64, page_len)?;

                    // The fetched range includes the page header; split it off
                    let (offset, header) = Self::read_page_header_len_from_bytes(
                        &self.context,
                        buffer.as_ref(),
                        *page_index,
                        is_dictionary_page,
                    )?;
                    let bytes = buffer.slice(offset..);
                    let bytes =
                        self.context
                            .decrypt_page_data(bytes, *page_index, is_dictionary_page)?;

                    if !is_dictionary_page {
                        *page_index += 1;
                    }
                    decode_page(
                        header,
                        bytes,
                        self.physical_type,
                        self.decompressor.as_mut(),
                    )?
                }
            };

            return Ok(Some(page));
        }
    }

    fn peek_next_page(&mut self) -> Result<Option<PageMetadata>> {
        match &mut self.state {
            SerializedPageReaderState::Values {
                offset,
                remaining_bytes,
                next_page_header,
                page_index,
                require_dictionary,
            } => {
                loop {
                    if *remaining_bytes == 0 {
                        return Ok(None);
                    }
                    return if let Some(header) = next_page_header.as_ref() {
                        if let Ok(page_meta) = (&**header).try_into() {
                            Ok(Some(page_meta))
                        } else {
                            // Buffered header is for an unsupported page;
                            // discard it and read the next header
                            *next_page_header = None;
                            continue;
                        }
                    } else {
                        let mut read = self.reader.get_read(*offset)?;
                        let (header_len, header) = Self::read_page_header_len(
                            &self.context,
                            &mut read,
                            *page_index,
                            *require_dictionary,
                        )?;
                        verify_page_header_len(header_len, *remaining_bytes)?;
                        *offset += header_len as u64;
                        *remaining_bytes -= header_len as u64;
                        let page_meta = if let Ok(page_meta) = (&header).try_into() {
                            Ok(Some(page_meta))
                        } else {
                            // Unsupported page type: skip its header and retry
                            continue;
                        };
                        // Buffer the header so `get_next_page`/`skip_next_page`
                        // do not re-read it from the file
                        *next_page_header = Some(Box::new(header));
                        page_meta
                    };
                }
            }
            SerializedPageReaderState::Pages {
                page_locations,
                dictionary_page,
                total_rows,
                page_index: _,
            } => {
                if dictionary_page.is_some() {
                    Ok(Some(PageMetadata {
                        num_rows: None,
                        num_levels: None,
                        is_dict: true,
                    }))
                } else if let Some(page) = page_locations.front() {
                    // Row count of a page is the distance from its first row
                    // to the next page's first row (or the chunk's total)
                    let next_rows = page_locations
                        .get(1)
                        .map(|x| x.first_row_index as usize)
                        .unwrap_or(*total_rows);

                    Ok(Some(PageMetadata {
                        num_rows: Some(next_rows - page.first_row_index as usize),
                        num_levels: None,
                        is_dict: false,
                    }))
                } else {
                    Ok(None)
                }
            }
        }
    }

    fn skip_next_page(&mut self) -> Result<()> {
        match &mut self.state {
            SerializedPageReaderState::Values {
                offset,
                remaining_bytes,
                next_page_header,
                page_index,
                require_dictionary,
            } => {
                if let Some(buffered_header) = next_page_header.take() {
                    verify_page_size(
                        buffered_header.compressed_page_size,
                        buffered_header.uncompressed_page_size,
                        *remaining_bytes,
                    )?;
                    // The header was already consumed at peek time; only the
                    // page body needs to be skipped here
                    *offset += buffered_header.compressed_page_size as u64;
                    *remaining_bytes -= buffered_header.compressed_page_size as u64;
                } else {
                    let mut read = self.reader.get_read(*offset)?;
                    let (header_len, header) = Self::read_page_header_len(
                        &self.context,
                        &mut read,
                        *page_index,
                        *require_dictionary,
                    )?;
                    verify_page_header_len(header_len, *remaining_bytes)?;
                    verify_page_size(
                        header.compressed_page_size,
                        header.uncompressed_page_size,
                        *remaining_bytes,
                    )?;
                    let data_page_size = header.compressed_page_size as u64;
                    // Skip header and body together
                    *offset += header_len as u64 + data_page_size;
                    *remaining_bytes -= header_len as u64 + data_page_size;
                }
                // Skipping the dictionary page does not advance `page_index`
                if *require_dictionary {
                    *require_dictionary = false;
                } else {
                    *page_index += 1;
                }
                Ok(())
            }
            SerializedPageReaderState::Pages {
                page_locations,
                dictionary_page,
                page_index,
                ..
            } => {
                if dictionary_page.is_some() {
                    // Skipping the dictionary page does not advance `page_index`
                    dictionary_page.take();
                } else {
                    if page_locations.pop_front().is_some() {
                        *page_index += 1;
                    }
                }

                Ok(())
            }
        }
    }

    fn at_record_boundary(&mut self) -> Result<bool> {
        match &mut self.state {
            SerializedPageReaderState::Values { .. } => match self.peek_next_page()? {
                None => Ok(true),
                // Only pages that expose a row count are known to start on a
                // record boundary
                Some(metadata) => Ok(metadata.num_rows.is_some()),
            },
            // Offset-index pages always start on record boundaries
            SerializedPageReaderState::Pages { .. } => Ok(true),
        }
    }
}
1171
1172#[cfg(test)]
1173mod tests {
1174 use std::collections::HashSet;
1175
1176 use bytes::Buf;
1177
1178 use crate::file::page_index::column_index::{
1179 ByteArrayColumnIndex, ColumnIndexMetaData, PrimitiveColumnIndex,
1180 };
1181 use crate::file::properties::{EnabledStatistics, WriterProperties};
1182
1183 use crate::basic::{self, BoundaryOrder, ColumnOrder, Encoding, SortOrder};
1184 use crate::column::reader::ColumnReader;
1185 use crate::data_type::private::ParquetValueType;
1186 use crate::data_type::{AsBytes, FixedLenByteArrayType, Int32Type};
1187 use crate::file::metadata::thrift::DataPageHeaderV2;
1188 #[allow(deprecated)]
1189 use crate::file::page_index::index_reader::{read_columns_indexes, read_offset_indexes};
1190 use crate::file::writer::SerializedFileWriter;
1191 use crate::record::RowAccessor;
1192 use crate::schema::parser::parse_message_type;
1193 use crate::util::test_common::file_util::{get_test_file, get_test_path};
1194
1195 use super::*;
1196
1197 #[test]
1198 fn test_decode_page_invalid_offset() {
1199 let page_header = PageHeader {
1200 r#type: PageType::DATA_PAGE_V2,
1201 uncompressed_page_size: 10,
1202 compressed_page_size: 10,
1203 data_page_header: None,
1204 index_page_header: None,
1205 dictionary_page_header: None,
1206 crc: None,
1207 data_page_header_v2: Some(DataPageHeaderV2 {
1208 num_nulls: 0,
1209 num_rows: 0,
1210 num_values: 0,
1211 encoding: Encoding::PLAIN,
1212 definition_levels_byte_length: 11,
1213 repetition_levels_byte_length: 0,
1214 is_compressed: None,
1215 statistics: None,
1216 }),
1217 };
1218
1219 let buffer = Bytes::new();
1220 let err = decode_page(page_header, buffer, Type::INT32, None).unwrap_err();
1221 assert!(
1222 err.to_string()
1223 .contains("DataPage v2 header contains implausible values")
1224 );
1225 }
1226
1227 #[test]
1228 fn test_decode_unsupported_page() {
1229 let mut page_header = PageHeader {
1230 r#type: PageType::INDEX_PAGE,
1231 uncompressed_page_size: 10,
1232 compressed_page_size: 10,
1233 data_page_header: None,
1234 index_page_header: None,
1235 dictionary_page_header: None,
1236 crc: None,
1237 data_page_header_v2: None,
1238 };
1239 let buffer = Bytes::new();
1240 let err = decode_page(page_header.clone(), buffer.clone(), Type::INT32, None).unwrap_err();
1241 assert_eq!(
1242 err.to_string(),
1243 "Parquet error: Page type INDEX_PAGE is not supported"
1244 );
1245
1246 page_header.data_page_header_v2 = Some(DataPageHeaderV2 {
1247 num_nulls: 0,
1248 num_rows: 0,
1249 num_values: 0,
1250 encoding: Encoding::PLAIN,
1251 definition_levels_byte_length: 11,
1252 repetition_levels_byte_length: 0,
1253 is_compressed: None,
1254 statistics: None,
1255 });
1256 let err = decode_page(page_header, buffer, Type::INT32, None).unwrap_err();
1257 assert!(
1258 err.to_string()
1259 .contains("DataPage v2 header contains implausible values")
1260 );
1261 }
1262
1263 #[test]
1264 fn test_cursor_and_file_has_the_same_behaviour() {
1265 let mut buf: Vec<u8> = Vec::new();
1266 get_test_file("alltypes_plain.parquet")
1267 .read_to_end(&mut buf)
1268 .unwrap();
1269 let cursor = Bytes::from(buf);
1270 let read_from_cursor = SerializedFileReader::new(cursor).unwrap();
1271
1272 let test_file = get_test_file("alltypes_plain.parquet");
1273 let read_from_file = SerializedFileReader::new(test_file).unwrap();
1274
1275 let file_iter = read_from_file.get_row_iter(None).unwrap();
1276 let cursor_iter = read_from_cursor.get_row_iter(None).unwrap();
1277
1278 for (a, b) in file_iter.zip(cursor_iter) {
1279 assert_eq!(a.unwrap(), b.unwrap())
1280 }
1281 }
1282
1283 #[test]
1284 fn test_file_reader_try_from() {
1285 let test_file = get_test_file("alltypes_plain.parquet");
1287 let test_path_buf = get_test_path("alltypes_plain.parquet");
1288 let test_path = test_path_buf.as_path();
1289 let test_path_str = test_path.to_str().unwrap();
1290
1291 let reader = SerializedFileReader::try_from(test_file);
1292 assert!(reader.is_ok());
1293
1294 let reader = SerializedFileReader::try_from(test_path);
1295 assert!(reader.is_ok());
1296
1297 let reader = SerializedFileReader::try_from(test_path_str);
1298 assert!(reader.is_ok());
1299
1300 let reader = SerializedFileReader::try_from(test_path_str.to_string());
1301 assert!(reader.is_ok());
1302
1303 let test_path = Path::new("invalid.parquet");
1305 let test_path_str = test_path.to_str().unwrap();
1306
1307 let reader = SerializedFileReader::try_from(test_path);
1308 assert!(reader.is_err());
1309
1310 let reader = SerializedFileReader::try_from(test_path_str);
1311 assert!(reader.is_err());
1312
1313 let reader = SerializedFileReader::try_from(test_path_str.to_string());
1314 assert!(reader.is_err());
1315 }
1316
1317 #[test]
1318 fn test_file_reader_into_iter() {
1319 let path = get_test_path("alltypes_plain.parquet");
1320 let reader = SerializedFileReader::try_from(path.as_path()).unwrap();
1321 let iter = reader.into_iter();
1322 let values: Vec<_> = iter.flat_map(|x| x.unwrap().get_int(0)).collect();
1323
1324 assert_eq!(values, &[4, 5, 6, 7, 2, 3, 0, 1]);
1325 }
1326
1327 #[test]
1328 fn test_file_reader_into_iter_project() {
1329 let path = get_test_path("alltypes_plain.parquet");
1330 let reader = SerializedFileReader::try_from(path.as_path()).unwrap();
1331 let schema = "message schema { OPTIONAL INT32 id; }";
1332 let proj = parse_message_type(schema).ok();
1333 let iter = reader.into_iter().project(proj).unwrap();
1334 let values: Vec<_> = iter.flat_map(|x| x.unwrap().get_int(0)).collect();
1335
1336 assert_eq!(values, &[4, 5, 6, 7, 2, 3, 0, 1]);
1337 }
1338
1339 #[test]
1340 fn test_reuse_file_chunk() {
1341 let test_file = get_test_file("alltypes_plain.parquet");
1345 let reader = SerializedFileReader::new(test_file).unwrap();
1346 let row_group = reader.get_row_group(0).unwrap();
1347
1348 let mut page_readers = Vec::new();
1349 for i in 0..row_group.num_columns() {
1350 page_readers.push(row_group.get_column_page_reader(i).unwrap());
1351 }
1352
1353 for mut page_reader in page_readers {
1356 assert!(page_reader.get_next_page().is_ok());
1357 }
1358 }
1359
    #[test]
    fn test_file_reader() {
        // End-to-end check of alltypes_plain.parquet: file metadata,
        // row-group metadata, and the pages of the first column chunk.
        let test_file = get_test_file("alltypes_plain.parquet");
        let reader_result = SerializedFileReader::new(test_file);
        assert!(reader_result.is_ok());
        let reader = reader_result.unwrap();

        // File-level metadata.
        let metadata = reader.metadata();
        assert_eq!(metadata.num_row_groups(), 1);

        let file_metadata = metadata.file_metadata();
        assert!(file_metadata.created_by().is_some());
        assert_eq!(
            file_metadata.created_by().unwrap(),
            "impala version 1.3.0-INTERNAL (build 8a48ddb1eff84592b3fc06bc6f51ec120e1fffc9)"
        );
        assert!(file_metadata.key_value_metadata().is_none());
        assert_eq!(file_metadata.num_rows(), 8);
        assert_eq!(file_metadata.version(), 1);
        assert_eq!(file_metadata.column_orders(), None);

        // Row-group metadata; with no column orders in the file every column
        // reports ColumnOrder::UNDEFINED.
        let row_group_metadata = metadata.row_group(0);
        assert_eq!(row_group_metadata.num_columns(), 11);
        assert_eq!(row_group_metadata.num_rows(), 8);
        assert_eq!(row_group_metadata.total_byte_size(), 671);
        for i in 0..row_group_metadata.num_columns() {
            assert_eq!(file_metadata.column_order(i), ColumnOrder::UNDEFINED);
        }

        // The row-group reader must mirror the metadata it was built from.
        let row_group_reader_result = reader.get_row_group(0);
        assert!(row_group_reader_result.is_ok());
        let row_group_reader: Box<dyn RowGroupReader> = row_group_reader_result.unwrap();
        assert_eq!(
            row_group_reader.num_columns(),
            row_group_metadata.num_columns()
        );
        assert_eq!(
            row_group_reader.metadata().total_byte_size(),
            row_group_metadata.total_byte_size()
        );

        // Walk the pages of column 0: expect a dictionary page followed by a
        // single v1 data page (2 pages total).
        let page_reader_0_result = row_group_reader.get_column_page_reader(0);
        assert!(page_reader_0_result.is_ok());
        let mut page_reader_0: Box<dyn PageReader> = page_reader_0_result.unwrap();
        let mut page_count = 0;
        while let Some(page) = page_reader_0.get_next_page().unwrap() {
            let is_expected_page = match page {
                Page::DictionaryPage {
                    buf,
                    num_values,
                    encoding,
                    is_sorted,
                } => {
                    assert_eq!(buf.len(), 32);
                    assert_eq!(num_values, 8);
                    assert_eq!(encoding, Encoding::PLAIN_DICTIONARY);
                    assert!(!is_sorted);
                    true
                }
                Page::DataPage {
                    buf,
                    num_values,
                    encoding,
                    def_level_encoding,
                    rep_level_encoding,
                    statistics,
                } => {
                    assert_eq!(buf.len(), 11);
                    assert_eq!(num_values, 8);
                    assert_eq!(encoding, Encoding::PLAIN_DICTIONARY);
                    assert_eq!(def_level_encoding, Encoding::RLE);
                    // BIT_PACKED is deprecated, but it is what this file uses
                    // for repetition levels.
                    #[allow(deprecated)]
                    let expected_rep_level_encoding = Encoding::BIT_PACKED;
                    assert_eq!(rep_level_encoding, expected_rep_level_encoding);
                    assert!(statistics.is_none());
                    true
                }
                _ => false,
            };
            assert!(is_expected_page);
            page_count += 1;
        }
        assert_eq!(page_count, 2);
    }
1451
1452 #[test]
1453 fn test_file_reader_datapage_v2() {
1454 let test_file = get_test_file("datapage_v2.snappy.parquet");
1455 let reader_result = SerializedFileReader::new(test_file);
1456 assert!(reader_result.is_ok());
1457 let reader = reader_result.unwrap();
1458
1459 let metadata = reader.metadata();
1461 assert_eq!(metadata.num_row_groups(), 1);
1462
1463 let file_metadata = metadata.file_metadata();
1465 assert!(file_metadata.created_by().is_some());
1466 assert_eq!(
1467 file_metadata.created_by().unwrap(),
1468 "parquet-mr version 1.8.1 (build 4aba4dae7bb0d4edbcf7923ae1339f28fd3f7fcf)"
1469 );
1470 assert!(file_metadata.key_value_metadata().is_some());
1471 assert_eq!(
1472 file_metadata.key_value_metadata().to_owned().unwrap().len(),
1473 1
1474 );
1475
1476 assert_eq!(file_metadata.num_rows(), 5);
1477 assert_eq!(file_metadata.version(), 1);
1478 assert_eq!(file_metadata.column_orders(), None);
1479
1480 let row_group_metadata = metadata.row_group(0);
1481
1482 for i in 0..row_group_metadata.num_columns() {
1484 assert_eq!(file_metadata.column_order(i), ColumnOrder::UNDEFINED);
1485 }
1486
1487 let row_group_reader_result = reader.get_row_group(0);
1489 assert!(row_group_reader_result.is_ok());
1490 let row_group_reader: Box<dyn RowGroupReader> = row_group_reader_result.unwrap();
1491 assert_eq!(
1492 row_group_reader.num_columns(),
1493 row_group_metadata.num_columns()
1494 );
1495 assert_eq!(
1496 row_group_reader.metadata().total_byte_size(),
1497 row_group_metadata.total_byte_size()
1498 );
1499
1500 let page_reader_0_result = row_group_reader.get_column_page_reader(0);
1503 assert!(page_reader_0_result.is_ok());
1504 let mut page_reader_0: Box<dyn PageReader> = page_reader_0_result.unwrap();
1505 let mut page_count = 0;
1506 while let Some(page) = page_reader_0.get_next_page().unwrap() {
1507 let is_expected_page = match page {
1508 Page::DictionaryPage {
1509 buf,
1510 num_values,
1511 encoding,
1512 is_sorted,
1513 } => {
1514 assert_eq!(buf.len(), 7);
1515 assert_eq!(num_values, 1);
1516 assert_eq!(encoding, Encoding::PLAIN);
1517 assert!(!is_sorted);
1518 true
1519 }
1520 Page::DataPageV2 {
1521 buf,
1522 num_values,
1523 encoding,
1524 num_nulls,
1525 num_rows,
1526 def_levels_byte_len,
1527 rep_levels_byte_len,
1528 is_compressed,
1529 statistics,
1530 } => {
1531 assert_eq!(buf.len(), 4);
1532 assert_eq!(num_values, 5);
1533 assert_eq!(encoding, Encoding::RLE_DICTIONARY);
1534 assert_eq!(num_nulls, 1);
1535 assert_eq!(num_rows, 5);
1536 assert_eq!(def_levels_byte_len, 2);
1537 assert_eq!(rep_levels_byte_len, 0);
1538 assert!(is_compressed);
1539 assert!(statistics.is_none()); true
1541 }
1542 _ => false,
1543 };
1544 assert!(is_expected_page);
1545 page_count += 1;
1546 }
1547 assert_eq!(page_count, 2);
1548 }
1549
1550 #[test]
1551 fn test_file_reader_empty_compressed_datapage_v2() {
1552 let test_file = get_test_file("page_v2_empty_compressed.parquet");
1554 let reader_result = SerializedFileReader::new(test_file);
1555 assert!(reader_result.is_ok());
1556 let reader = reader_result.unwrap();
1557
1558 let metadata = reader.metadata();
1560 assert_eq!(metadata.num_row_groups(), 1);
1561
1562 let file_metadata = metadata.file_metadata();
1564 assert!(file_metadata.created_by().is_some());
1565 assert_eq!(
1566 file_metadata.created_by().unwrap(),
1567 "parquet-cpp-arrow version 14.0.2"
1568 );
1569 assert!(file_metadata.key_value_metadata().is_some());
1570 assert_eq!(
1571 file_metadata.key_value_metadata().to_owned().unwrap().len(),
1572 1
1573 );
1574
1575 assert_eq!(file_metadata.num_rows(), 10);
1576 assert_eq!(file_metadata.version(), 2);
1577 let expected_order = ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED);
1578 assert_eq!(
1579 file_metadata.column_orders(),
1580 Some(vec![expected_order].as_ref())
1581 );
1582
1583 let row_group_metadata = metadata.row_group(0);
1584
1585 for i in 0..row_group_metadata.num_columns() {
1587 assert_eq!(file_metadata.column_order(i), expected_order);
1588 }
1589
1590 let row_group_reader_result = reader.get_row_group(0);
1592 assert!(row_group_reader_result.is_ok());
1593 let row_group_reader: Box<dyn RowGroupReader> = row_group_reader_result.unwrap();
1594 assert_eq!(
1595 row_group_reader.num_columns(),
1596 row_group_metadata.num_columns()
1597 );
1598 assert_eq!(
1599 row_group_reader.metadata().total_byte_size(),
1600 row_group_metadata.total_byte_size()
1601 );
1602
1603 let page_reader_0_result = row_group_reader.get_column_page_reader(0);
1605 assert!(page_reader_0_result.is_ok());
1606 let mut page_reader_0: Box<dyn PageReader> = page_reader_0_result.unwrap();
1607 let mut page_count = 0;
1608 while let Some(page) = page_reader_0.get_next_page().unwrap() {
1609 let is_expected_page = match page {
1610 Page::DictionaryPage {
1611 buf,
1612 num_values,
1613 encoding,
1614 is_sorted,
1615 } => {
1616 assert_eq!(buf.len(), 0);
1617 assert_eq!(num_values, 0);
1618 assert_eq!(encoding, Encoding::PLAIN);
1619 assert!(!is_sorted);
1620 true
1621 }
1622 Page::DataPageV2 {
1623 buf,
1624 num_values,
1625 encoding,
1626 num_nulls,
1627 num_rows,
1628 def_levels_byte_len,
1629 rep_levels_byte_len,
1630 is_compressed,
1631 statistics,
1632 } => {
1633 assert_eq!(buf.len(), 3);
1634 assert_eq!(num_values, 10);
1635 assert_eq!(encoding, Encoding::RLE_DICTIONARY);
1636 assert_eq!(num_nulls, 10);
1637 assert_eq!(num_rows, 10);
1638 assert_eq!(def_levels_byte_len, 2);
1639 assert_eq!(rep_levels_byte_len, 0);
1640 assert!(is_compressed);
1641 assert!(statistics.is_none()); true
1643 }
1644 _ => false,
1645 };
1646 assert!(is_expected_page);
1647 page_count += 1;
1648 }
1649 assert_eq!(page_count, 2);
1650 }
1651
    #[test]
    fn test_file_reader_empty_datapage_v2() {
        // datapage_v2_empty_datapage.snappy.parquet: a single-row file whose
        // only page is a v2 data page containing one null value.
        let test_file = get_test_file("datapage_v2_empty_datapage.snappy.parquet");
        let reader_result = SerializedFileReader::new(test_file);
        assert!(reader_result.is_ok());
        let reader = reader_result.unwrap();

        // File-level metadata.
        let metadata = reader.metadata();
        assert_eq!(metadata.num_row_groups(), 1);

        let file_metadata = metadata.file_metadata();
        assert!(file_metadata.created_by().is_some());
        assert_eq!(
            file_metadata.created_by().unwrap(),
            "parquet-mr version 1.13.1 (build db4183109d5b734ec5930d870cdae161e408ddba)"
        );
        assert!(file_metadata.key_value_metadata().is_some());
        assert_eq!(
            file_metadata.key_value_metadata().to_owned().unwrap().len(),
            2
        );

        assert_eq!(file_metadata.num_rows(), 1);
        assert_eq!(file_metadata.version(), 1);
        // The writer recorded a column order; every column must report it.
        let expected_order = ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED);
        assert_eq!(
            file_metadata.column_orders(),
            Some(vec![expected_order].as_ref())
        );

        let row_group_metadata = metadata.row_group(0);

        for i in 0..row_group_metadata.num_columns() {
            assert_eq!(file_metadata.column_order(i), expected_order);
        }

        // The row-group reader must mirror the metadata it was built from.
        let row_group_reader_result = reader.get_row_group(0);
        assert!(row_group_reader_result.is_ok());
        let row_group_reader: Box<dyn RowGroupReader> = row_group_reader_result.unwrap();
        assert_eq!(
            row_group_reader.num_columns(),
            row_group_metadata.num_columns()
        );
        assert_eq!(
            row_group_reader.metadata().total_byte_size(),
            row_group_metadata.total_byte_size()
        );

        // Column 0 contains exactly one page: a v2 data page with a single
        // null (no dictionary page, unlike the other v2 tests above).
        let page_reader_0_result = row_group_reader.get_column_page_reader(0);
        assert!(page_reader_0_result.is_ok());
        let mut page_reader_0: Box<dyn PageReader> = page_reader_0_result.unwrap();
        let mut page_count = 0;
        while let Some(page) = page_reader_0.get_next_page().unwrap() {
            let is_expected_page = match page {
                Page::DataPageV2 {
                    buf,
                    num_values,
                    encoding,
                    num_nulls,
                    num_rows,
                    def_levels_byte_len,
                    rep_levels_byte_len,
                    is_compressed,
                    statistics,
                } => {
                    assert_eq!(buf.len(), 2);
                    assert_eq!(num_values, 1);
                    assert_eq!(encoding, Encoding::PLAIN);
                    assert_eq!(num_nulls, 1);
                    assert_eq!(num_rows, 1);
                    assert_eq!(def_levels_byte_len, 2);
                    assert_eq!(rep_levels_byte_len, 0);
                    assert!(is_compressed);
                    assert!(statistics.is_none());
                    true
                }
                _ => false,
            };
            assert!(is_expected_page);
            page_count += 1;
        }
        assert_eq!(page_count, 1);
    }
1741
    /// Builds a page reader for `column` of `row_group` in `file_reader`,
    /// returning the concrete `SerializedPageReader` type (rather than a
    /// boxed trait object) so tests can inspect its internals such as `state`.
    fn get_serialized_page_reader<R: ChunkReader>(
        file_reader: &SerializedFileReader<R>,
        row_group: usize,
        column: usize,
    ) -> Result<SerializedPageReader<R>> {
        // Construct a row-group reader so its chunk reader, metadata, offset
        // index, and properties can be reused below.
        let row_group = {
            let row_group_metadata = file_reader.metadata.row_group(row_group);
            let props = Arc::clone(&file_reader.props);
            let f = Arc::clone(&file_reader.chunk_reader);
            SerializedRowGroupReader::new(
                f,
                row_group_metadata,
                file_reader
                    .metadata
                    .offset_index()
                    .map(|x| x[row_group].as_slice()),
                props,
            )?
        };

        let col = row_group.metadata.column(column);

        // Page locations are only available when an offset index was loaded.
        let page_locations = row_group
            .offset_index
            .map(|x| x[column].page_locations.clone());

        let props = Arc::clone(&row_group.props);
        SerializedPageReader::new_with_properties(
            Arc::clone(&row_group.chunk_reader),
            col,
            usize::try_from(row_group.metadata.num_rows())?,
            page_locations,
            props,
        )
    }
1777
    #[test]
    fn test_peek_next_page_offset_matches_actual() -> Result<()> {
        // For every column chunk in the file, `peek_next_page_offset` must
        // agree with the reader's internal state, and each returned offset
        // must be unique across the whole file.
        let test_file = get_test_file("alltypes_plain.parquet");
        let reader = SerializedFileReader::new(test_file)?;

        let mut offset_set = HashSet::new();
        let num_row_groups = reader.metadata.num_row_groups();
        for row_group in 0..num_row_groups {
            let num_columns = reader.metadata.row_group(row_group).num_columns();
            for column in 0..num_columns {
                let mut page_reader = get_serialized_page_reader(&reader, row_group, column)?;

                while let Ok(Some(page_offset)) = page_reader.peek_next_page_offset() {
                    // The peeked offset must match whichever page the reader
                    // would serve next, in either state representation.
                    match &page_reader.state {
                        SerializedPageReaderState::Pages {
                            page_locations,
                            dictionary_page,
                            ..
                        } => {
                            if let Some(page) = dictionary_page {
                                assert_eq!(page.offset as u64, page_offset);
                            } else if let Some(page) = page_locations.front() {
                                assert_eq!(page.offset as u64, page_offset);
                            } else {
                                // A successful peek implies a pending page.
                                unreachable!()
                            }
                        }
                        SerializedPageReaderState::Values {
                            offset,
                            next_page_header,
                            ..
                        } => {
                            assert!(next_page_header.is_some());
                            assert_eq!(*offset, page_offset);
                        }
                    }
                    // Consuming the page must succeed, and the peeked offset
                    // must not have been seen before.
                    let page = page_reader.get_next_page()?;
                    assert!(page.is_some());
                    let newly_inserted = offset_set.insert(page_offset);
                    assert!(newly_inserted);
                }
            }
        }

        Ok(())
    }
1824
1825 #[test]
1826 fn test_page_iterator() {
1827 let file = get_test_file("alltypes_plain.parquet");
1828 let file_reader = Arc::new(SerializedFileReader::new(file).unwrap());
1829
1830 let mut page_iterator = FilePageIterator::new(0, file_reader.clone()).unwrap();
1831
1832 let page = page_iterator.next();
1834 assert!(page.is_some());
1835 assert!(page.unwrap().is_ok());
1836
1837 let page = page_iterator.next();
1839 assert!(page.is_none());
1840
1841 let row_group_indices = Box::new(0..1);
1842 let mut page_iterator =
1843 FilePageIterator::with_row_groups(0, row_group_indices, file_reader).unwrap();
1844
1845 let page = page_iterator.next();
1847 assert!(page.is_some());
1848 assert!(page.unwrap().is_ok());
1849
1850 let page = page_iterator.next();
1852 assert!(page.is_none());
1853 }
1854
1855 #[test]
1856 fn test_file_reader_key_value_metadata() {
1857 let file = get_test_file("binary.parquet");
1858 let file_reader = Arc::new(SerializedFileReader::new(file).unwrap());
1859
1860 let metadata = file_reader
1861 .metadata
1862 .file_metadata()
1863 .key_value_metadata()
1864 .unwrap();
1865
1866 assert_eq!(metadata.len(), 3);
1867
1868 assert_eq!(metadata[0].key, "parquet.proto.descriptor");
1869
1870 assert_eq!(metadata[1].key, "writer.model.name");
1871 assert_eq!(metadata[1].value, Some("protobuf".to_owned()));
1872
1873 assert_eq!(metadata[2].key, "parquet.proto.class");
1874 assert_eq!(metadata[2].value, Some("foo.baz.Foobaz$Event".to_owned()));
1875 }
1876
1877 #[test]
1878 fn test_file_reader_optional_metadata() {
1879 let file = get_test_file("data_index_bloom_encoding_stats.parquet");
1881 let options = ReadOptionsBuilder::new()
1882 .with_encoding_stats_as_mask(false)
1883 .build();
1884 let file_reader = Arc::new(SerializedFileReader::new_with_options(file, options).unwrap());
1885
1886 let row_group_metadata = file_reader.metadata.row_group(0);
1887 let col0_metadata = row_group_metadata.column(0);
1888
1889 assert_eq!(col0_metadata.bloom_filter_offset().unwrap(), 192);
1891
1892 let page_encoding_stats = &col0_metadata.page_encoding_stats().unwrap()[0];
1894
1895 assert_eq!(page_encoding_stats.page_type, basic::PageType::DATA_PAGE);
1896 assert_eq!(page_encoding_stats.encoding, Encoding::PLAIN);
1897 assert_eq!(page_encoding_stats.count, 1);
1898
1899 assert_eq!(col0_metadata.column_index_offset().unwrap(), 156);
1901 assert_eq!(col0_metadata.column_index_length().unwrap(), 25);
1902
1903 assert_eq!(col0_metadata.offset_index_offset().unwrap(), 181);
1905 assert_eq!(col0_metadata.offset_index_length().unwrap(), 11);
1906 }
1907
1908 #[test]
1909 fn test_file_reader_page_stats_mask() {
1910 let file = get_test_file("alltypes_tiny_pages.parquet");
1911 let options = ReadOptionsBuilder::new()
1912 .with_encoding_stats_as_mask(true)
1913 .build();
1914 let file_reader = Arc::new(SerializedFileReader::new_with_options(file, options).unwrap());
1915
1916 let row_group_metadata = file_reader.metadata.row_group(0);
1917
1918 let page_encoding_stats = row_group_metadata
1920 .column(0)
1921 .page_encoding_stats_mask()
1922 .unwrap();
1923 assert!(page_encoding_stats.is_only(Encoding::PLAIN));
1924 let page_encoding_stats = row_group_metadata
1925 .column(2)
1926 .page_encoding_stats_mask()
1927 .unwrap();
1928 assert!(page_encoding_stats.is_only(Encoding::PLAIN_DICTIONARY));
1929 }
1930
1931 #[test]
1932 fn test_file_reader_page_stats_skipped() {
1933 let file = get_test_file("alltypes_tiny_pages.parquet");
1934
1935 let options = ReadOptionsBuilder::new()
1937 .with_encoding_stats_policy(ParquetStatisticsPolicy::SkipAll)
1938 .with_column_stats_policy(ParquetStatisticsPolicy::SkipAll)
1939 .build();
1940 let file_reader = Arc::new(
1941 SerializedFileReader::new_with_options(file.try_clone().unwrap(), options).unwrap(),
1942 );
1943
1944 let row_group_metadata = file_reader.metadata.row_group(0);
1945 for column in row_group_metadata.columns() {
1946 assert!(column.page_encoding_stats().is_none());
1947 assert!(column.page_encoding_stats_mask().is_none());
1948 assert!(column.statistics().is_none());
1949 }
1950
1951 let options = ReadOptionsBuilder::new()
1953 .with_encoding_stats_as_mask(true)
1954 .with_encoding_stats_policy(ParquetStatisticsPolicy::skip_except(&[0]))
1955 .with_column_stats_policy(ParquetStatisticsPolicy::skip_except(&[0]))
1956 .build();
1957 let file_reader = Arc::new(
1958 SerializedFileReader::new_with_options(file.try_clone().unwrap(), options).unwrap(),
1959 );
1960
1961 let row_group_metadata = file_reader.metadata.row_group(0);
1962 for (idx, column) in row_group_metadata.columns().iter().enumerate() {
1963 assert!(column.page_encoding_stats().is_none());
1964 assert_eq!(column.page_encoding_stats_mask().is_some(), idx == 0);
1965 assert_eq!(column.statistics().is_some(), idx == 0);
1966 }
1967 }
1968
1969 #[test]
1970 fn test_file_reader_size_stats_skipped() {
1971 let file = get_test_file("repeated_primitive_no_list.parquet");
1972
1973 let options = ReadOptionsBuilder::new()
1975 .with_size_stats_policy(ParquetStatisticsPolicy::SkipAll)
1976 .build();
1977 let file_reader = Arc::new(
1978 SerializedFileReader::new_with_options(file.try_clone().unwrap(), options).unwrap(),
1979 );
1980
1981 let row_group_metadata = file_reader.metadata.row_group(0);
1982 for column in row_group_metadata.columns() {
1983 assert!(column.repetition_level_histogram().is_none());
1984 assert!(column.definition_level_histogram().is_none());
1985 assert!(column.unencoded_byte_array_data_bytes().is_none());
1986 }
1987
1988 let options = ReadOptionsBuilder::new()
1990 .with_encoding_stats_as_mask(true)
1991 .with_size_stats_policy(ParquetStatisticsPolicy::skip_except(&[1]))
1992 .build();
1993 let file_reader = Arc::new(
1994 SerializedFileReader::new_with_options(file.try_clone().unwrap(), options).unwrap(),
1995 );
1996
1997 let row_group_metadata = file_reader.metadata.row_group(0);
1998 for (idx, column) in row_group_metadata.columns().iter().enumerate() {
1999 assert_eq!(column.repetition_level_histogram().is_some(), idx == 1);
2000 assert_eq!(column.definition_level_histogram().is_some(), idx == 1);
2001 assert_eq!(column.unencoded_byte_array_data_bytes().is_some(), idx == 1);
2002 }
2003 }
2004
2005 #[test]
2006 fn test_file_reader_with_no_filter() -> Result<()> {
2007 let test_file = get_test_file("alltypes_plain.parquet");
2008 let origin_reader = SerializedFileReader::new(test_file)?;
2009 let metadata = origin_reader.metadata();
2011 assert_eq!(metadata.num_row_groups(), 1);
2012 Ok(())
2013 }
2014
2015 #[test]
2016 fn test_file_reader_filter_row_groups_with_predicate() -> Result<()> {
2017 let test_file = get_test_file("alltypes_plain.parquet");
2018 let read_options = ReadOptionsBuilder::new()
2019 .with_predicate(Box::new(|_, _| false))
2020 .build();
2021 let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
2022 let metadata = reader.metadata();
2023 assert_eq!(metadata.num_row_groups(), 0);
2024 Ok(())
2025 }
2026
2027 #[test]
2028 fn test_file_reader_filter_row_groups_with_range() -> Result<()> {
2029 let test_file = get_test_file("alltypes_plain.parquet");
2030 let origin_reader = SerializedFileReader::new(test_file)?;
2031 let metadata = origin_reader.metadata();
2033 assert_eq!(metadata.num_row_groups(), 1);
2034 let mid = get_midpoint_offset(metadata.row_group(0));
2035
2036 let test_file = get_test_file("alltypes_plain.parquet");
2037 let read_options = ReadOptionsBuilder::new().with_range(0, mid + 1).build();
2038 let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
2039 let metadata = reader.metadata();
2040 assert_eq!(metadata.num_row_groups(), 1);
2041
2042 let test_file = get_test_file("alltypes_plain.parquet");
2043 let read_options = ReadOptionsBuilder::new().with_range(0, mid).build();
2044 let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
2045 let metadata = reader.metadata();
2046 assert_eq!(metadata.num_row_groups(), 0);
2047 Ok(())
2048 }
2049
2050 #[test]
2051 fn test_file_reader_filter_row_groups_and_range() -> Result<()> {
2052 let test_file = get_test_file("alltypes_tiny_pages.parquet");
2053 let origin_reader = SerializedFileReader::new(test_file)?;
2054 let metadata = origin_reader.metadata();
2055 let mid = get_midpoint_offset(metadata.row_group(0));
2056
2057 let test_file = get_test_file("alltypes_tiny_pages.parquet");
2059 let read_options = ReadOptionsBuilder::new()
2060 .with_page_index()
2061 .with_predicate(Box::new(|_, _| true))
2062 .with_range(mid, mid + 1)
2063 .build();
2064 let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
2065 let metadata = reader.metadata();
2066 assert_eq!(metadata.num_row_groups(), 1);
2067 assert_eq!(metadata.column_index().unwrap().len(), 1);
2068 assert_eq!(metadata.offset_index().unwrap().len(), 1);
2069
2070 let test_file = get_test_file("alltypes_tiny_pages.parquet");
2072 let read_options = ReadOptionsBuilder::new()
2073 .with_page_index()
2074 .with_predicate(Box::new(|_, _| true))
2075 .with_range(0, mid)
2076 .build();
2077 let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
2078 let metadata = reader.metadata();
2079 assert_eq!(metadata.num_row_groups(), 0);
2080 assert!(metadata.column_index().is_none());
2081 assert!(metadata.offset_index().is_none());
2082
2083 let test_file = get_test_file("alltypes_tiny_pages.parquet");
2085 let read_options = ReadOptionsBuilder::new()
2086 .with_page_index()
2087 .with_predicate(Box::new(|_, _| false))
2088 .with_range(mid, mid + 1)
2089 .build();
2090 let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
2091 let metadata = reader.metadata();
2092 assert_eq!(metadata.num_row_groups(), 0);
2093 assert!(metadata.column_index().is_none());
2094 assert!(metadata.offset_index().is_none());
2095
2096 let test_file = get_test_file("alltypes_tiny_pages.parquet");
2098 let read_options = ReadOptionsBuilder::new()
2099 .with_page_index()
2100 .with_predicate(Box::new(|_, _| false))
2101 .with_range(0, mid)
2102 .build();
2103 let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
2104 let metadata = reader.metadata();
2105 assert_eq!(metadata.num_row_groups(), 0);
2106 assert!(metadata.column_index().is_none());
2107 assert!(metadata.offset_index().is_none());
2108 Ok(())
2109 }
2110
2111 #[test]
2112 fn test_file_reader_invalid_metadata() {
2113 let data = [
2114 255, 172, 1, 0, 50, 82, 65, 73, 1, 0, 0, 0, 169, 168, 168, 162, 87, 255, 16, 0, 0, 0,
2115 80, 65, 82, 49,
2116 ];
2117 let ret = SerializedFileReader::new(Bytes::copy_from_slice(&data));
2118 assert_eq!(
2119 ret.err().unwrap().to_string(),
2120 "Parquet error: Received empty union from remote ColumnOrder"
2121 );
2122 }
2123
2124 #[test]
2125 fn test_page_index_reader() {
2142 let test_file = get_test_file("data_index_bloom_encoding_stats.parquet");
2143 let builder = ReadOptionsBuilder::new();
2144 let options = builder.with_page_index().build();
2146 let reader_result = SerializedFileReader::new_with_options(test_file, options);
2147 let reader = reader_result.unwrap();
2148
2149 let metadata = reader.metadata();
2151 assert_eq!(metadata.num_row_groups(), 1);
2152
2153 let column_index = metadata.column_index().unwrap();
2154
2155 assert_eq!(column_index.len(), 1);
2157 let index = if let ColumnIndexMetaData::BYTE_ARRAY(index) = &column_index[0][0] {
2158 index
2159 } else {
2160 unreachable!()
2161 };
2162
2163 assert_eq!(index.boundary_order, BoundaryOrder::ASCENDING);
2164
2165 assert_eq!(index.num_pages(), 1);
2167
2168 let min = index.min_value(0).unwrap();
2169 let max = index.max_value(0).unwrap();
2170 assert_eq!(b"Hello", min.as_bytes());
2171 assert_eq!(b"today", max.as_bytes());
2172
2173 let offset_indexes = metadata.offset_index().unwrap();
2174 assert_eq!(offset_indexes.len(), 1);
2176 let offset_index = &offset_indexes[0];
2177 let page_offset = &offset_index[0].page_locations()[0];
2178
2179 assert_eq!(4, page_offset.offset);
2180 assert_eq!(152, page_offset.compressed_page_size);
2181 assert_eq!(0, page_offset.first_row_index);
2182 }
2183
2184 #[test]
2185 #[allow(deprecated)]
2186 fn test_page_index_reader_out_of_order() {
2187 let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
2188 let options = ReadOptionsBuilder::new().with_page_index().build();
2189 let reader = SerializedFileReader::new_with_options(test_file, options).unwrap();
2190 let metadata = reader.metadata();
2191
2192 let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
2193 let columns = metadata.row_group(0).columns();
2194 let reversed: Vec<_> = columns.iter().cloned().rev().collect();
2195
2196 let a = read_columns_indexes(&test_file, columns).unwrap().unwrap();
2197 let mut b = read_columns_indexes(&test_file, &reversed)
2198 .unwrap()
2199 .unwrap();
2200 b.reverse();
2201 assert_eq!(a, b);
2202
2203 let a = read_offset_indexes(&test_file, columns).unwrap().unwrap();
2204 let mut b = read_offset_indexes(&test_file, &reversed).unwrap().unwrap();
2205 b.reverse();
2206 assert_eq!(a, b);
2207 }
2208
2209 #[test]
2210 fn test_page_index_reader_all_type() {
2211 let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
2212 let builder = ReadOptionsBuilder::new();
2213 let options = builder.with_page_index().build();
2215 let reader_result = SerializedFileReader::new_with_options(test_file, options);
2216 let reader = reader_result.unwrap();
2217
2218 let metadata = reader.metadata();
2220 assert_eq!(metadata.num_row_groups(), 1);
2221
2222 let column_index = metadata.column_index().unwrap();
2223 let row_group_offset_indexes = &metadata.offset_index().unwrap()[0];
2224
2225 assert_eq!(column_index.len(), 1);
2227 let row_group_metadata = metadata.row_group(0);
2228
2229 assert!(!&column_index[0][0].is_sorted());
2231 let boundary_order = &column_index[0][0].get_boundary_order();
2232 assert!(boundary_order.is_some());
2233 matches!(boundary_order.unwrap(), BoundaryOrder::UNORDERED);
2234 if let ColumnIndexMetaData::INT32(index) = &column_index[0][0] {
2235 check_native_page_index(
2236 index,
2237 325,
2238 get_row_group_min_max_bytes(row_group_metadata, 0),
2239 BoundaryOrder::UNORDERED,
2240 );
2241 assert_eq!(row_group_offset_indexes[0].page_locations.len(), 325);
2242 } else {
2243 unreachable!()
2244 };
2245 assert!(&column_index[0][1].is_sorted());
2247 if let ColumnIndexMetaData::BOOLEAN(index) = &column_index[0][1] {
2248 assert_eq!(index.num_pages(), 82);
2249 assert_eq!(row_group_offset_indexes[1].page_locations.len(), 82);
2250 } else {
2251 unreachable!()
2252 };
2253 assert!(&column_index[0][2].is_sorted());
2255 if let ColumnIndexMetaData::INT32(index) = &column_index[0][2] {
2256 check_native_page_index(
2257 index,
2258 325,
2259 get_row_group_min_max_bytes(row_group_metadata, 2),
2260 BoundaryOrder::ASCENDING,
2261 );
2262 assert_eq!(row_group_offset_indexes[2].page_locations.len(), 325);
2263 } else {
2264 unreachable!()
2265 };
2266 assert!(&column_index[0][3].is_sorted());
2268 if let ColumnIndexMetaData::INT32(index) = &column_index[0][3] {
2269 check_native_page_index(
2270 index,
2271 325,
2272 get_row_group_min_max_bytes(row_group_metadata, 3),
2273 BoundaryOrder::ASCENDING,
2274 );
2275 assert_eq!(row_group_offset_indexes[3].page_locations.len(), 325);
2276 } else {
2277 unreachable!()
2278 };
2279 assert!(&column_index[0][4].is_sorted());
2281 if let ColumnIndexMetaData::INT32(index) = &column_index[0][4] {
2282 check_native_page_index(
2283 index,
2284 325,
2285 get_row_group_min_max_bytes(row_group_metadata, 4),
2286 BoundaryOrder::ASCENDING,
2287 );
2288 assert_eq!(row_group_offset_indexes[4].page_locations.len(), 325);
2289 } else {
2290 unreachable!()
2291 };
2292 assert!(!&column_index[0][5].is_sorted());
2294 if let ColumnIndexMetaData::INT64(index) = &column_index[0][5] {
2295 check_native_page_index(
2296 index,
2297 528,
2298 get_row_group_min_max_bytes(row_group_metadata, 5),
2299 BoundaryOrder::UNORDERED,
2300 );
2301 assert_eq!(row_group_offset_indexes[5].page_locations.len(), 528);
2302 } else {
2303 unreachable!()
2304 };
2305 assert!(&column_index[0][6].is_sorted());
2307 if let ColumnIndexMetaData::FLOAT(index) = &column_index[0][6] {
2308 check_native_page_index(
2309 index,
2310 325,
2311 get_row_group_min_max_bytes(row_group_metadata, 6),
2312 BoundaryOrder::ASCENDING,
2313 );
2314 assert_eq!(row_group_offset_indexes[6].page_locations.len(), 325);
2315 } else {
2316 unreachable!()
2317 };
2318 assert!(!&column_index[0][7].is_sorted());
2320 if let ColumnIndexMetaData::DOUBLE(index) = &column_index[0][7] {
2321 check_native_page_index(
2322 index,
2323 528,
2324 get_row_group_min_max_bytes(row_group_metadata, 7),
2325 BoundaryOrder::UNORDERED,
2326 );
2327 assert_eq!(row_group_offset_indexes[7].page_locations.len(), 528);
2328 } else {
2329 unreachable!()
2330 };
2331 assert!(!&column_index[0][8].is_sorted());
2333 if let ColumnIndexMetaData::BYTE_ARRAY(index) = &column_index[0][8] {
2334 check_byte_array_page_index(
2335 index,
2336 974,
2337 get_row_group_min_max_bytes(row_group_metadata, 8),
2338 BoundaryOrder::UNORDERED,
2339 );
2340 assert_eq!(row_group_offset_indexes[8].page_locations.len(), 974);
2341 } else {
2342 unreachable!()
2343 };
2344 assert!(&column_index[0][9].is_sorted());
2346 if let ColumnIndexMetaData::BYTE_ARRAY(index) = &column_index[0][9] {
2347 check_byte_array_page_index(
2348 index,
2349 352,
2350 get_row_group_min_max_bytes(row_group_metadata, 9),
2351 BoundaryOrder::ASCENDING,
2352 );
2353 assert_eq!(row_group_offset_indexes[9].page_locations.len(), 352);
2354 } else {
2355 unreachable!()
2356 };
2357 assert!(!&column_index[0][10].is_sorted());
2360 if let ColumnIndexMetaData::NONE = &column_index[0][10] {
2361 assert_eq!(row_group_offset_indexes[10].page_locations.len(), 974);
2362 } else {
2363 unreachable!()
2364 };
2365 assert!(&column_index[0][11].is_sorted());
2367 if let ColumnIndexMetaData::INT32(index) = &column_index[0][11] {
2368 check_native_page_index(
2369 index,
2370 325,
2371 get_row_group_min_max_bytes(row_group_metadata, 11),
2372 BoundaryOrder::ASCENDING,
2373 );
2374 assert_eq!(row_group_offset_indexes[11].page_locations.len(), 325);
2375 } else {
2376 unreachable!()
2377 };
2378 assert!(!&column_index[0][12].is_sorted());
2380 if let ColumnIndexMetaData::INT32(index) = &column_index[0][12] {
2381 check_native_page_index(
2382 index,
2383 325,
2384 get_row_group_min_max_bytes(row_group_metadata, 12),
2385 BoundaryOrder::UNORDERED,
2386 );
2387 assert_eq!(row_group_offset_indexes[12].page_locations.len(), 325);
2388 } else {
2389 unreachable!()
2390 };
2391 }
2392
2393 fn check_native_page_index<T: ParquetValueType>(
2394 row_group_index: &PrimitiveColumnIndex<T>,
2395 page_size: usize,
2396 min_max: (&[u8], &[u8]),
2397 boundary_order: BoundaryOrder,
2398 ) {
2399 assert_eq!(row_group_index.num_pages() as usize, page_size);
2400 assert_eq!(row_group_index.boundary_order, boundary_order);
2401 assert!(row_group_index.min_values().iter().all(|x| {
2402 x >= &T::try_from_le_slice(min_max.0).unwrap()
2403 && x <= &T::try_from_le_slice(min_max.1).unwrap()
2404 }));
2405 }
2406
2407 fn check_byte_array_page_index(
2408 row_group_index: &ByteArrayColumnIndex,
2409 page_size: usize,
2410 min_max: (&[u8], &[u8]),
2411 boundary_order: BoundaryOrder,
2412 ) {
2413 assert_eq!(row_group_index.num_pages() as usize, page_size);
2414 assert_eq!(row_group_index.boundary_order, boundary_order);
2415 for i in 0..row_group_index.num_pages() as usize {
2416 let x = row_group_index.min_value(i).unwrap();
2417 assert!(x >= min_max.0 && x <= min_max.1);
2418 }
2419 }
2420
2421 fn get_row_group_min_max_bytes(r: &RowGroupMetaData, col_num: usize) -> (&[u8], &[u8]) {
2422 let statistics = r.column(col_num).statistics().unwrap();
2423 (
2424 statistics.min_bytes_opt().unwrap_or_default(),
2425 statistics.max_bytes_opt().unwrap_or_default(),
2426 )
2427 }
2428
2429 #[test]
2430 fn test_skip_next_page_with_dictionary_page() {
2431 let test_file = get_test_file("alltypes_tiny_pages.parquet");
2432 let builder = ReadOptionsBuilder::new();
2433 let options = builder.with_page_index().build();
2435 let reader_result = SerializedFileReader::new_with_options(test_file, options);
2436 let reader = reader_result.unwrap();
2437
2438 let row_group_reader = reader.get_row_group(0).unwrap();
2439
2440 let mut column_page_reader = row_group_reader.get_column_page_reader(9).unwrap();
2442
2443 let mut vec = vec![];
2444
2445 let meta = column_page_reader.peek_next_page().unwrap().unwrap();
2447 assert!(meta.is_dict);
2448
2449 column_page_reader.skip_next_page().unwrap();
2451
2452 let page = column_page_reader.get_next_page().unwrap().unwrap();
2454 assert!(matches!(page.page_type(), basic::PageType::DATA_PAGE));
2455
2456 for _i in 0..351 {
2458 let meta = column_page_reader.peek_next_page().unwrap().unwrap();
2460 assert!(!meta.is_dict); vec.push(meta);
2462
2463 let page = column_page_reader.get_next_page().unwrap().unwrap();
2464 assert!(matches!(page.page_type(), basic::PageType::DATA_PAGE));
2465 }
2466
2467 assert!(column_page_reader.peek_next_page().unwrap().is_none());
2469 assert!(column_page_reader.get_next_page().unwrap().is_none());
2470
2471 assert_eq!(vec.len(), 351);
2473 }
2474
2475 #[test]
2476 fn test_skip_page_with_offset_index() {
2477 let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
2478 let builder = ReadOptionsBuilder::new();
2479 let options = builder.with_page_index().build();
2481 let reader_result = SerializedFileReader::new_with_options(test_file, options);
2482 let reader = reader_result.unwrap();
2483
2484 let row_group_reader = reader.get_row_group(0).unwrap();
2485
2486 let mut column_page_reader = row_group_reader.get_column_page_reader(4).unwrap();
2488
2489 let mut vec = vec![];
2490
2491 for i in 0..325 {
2492 if i % 2 == 0 {
2493 vec.push(column_page_reader.get_next_page().unwrap().unwrap());
2494 } else {
2495 column_page_reader.skip_next_page().unwrap();
2496 }
2497 }
2498 assert!(column_page_reader.peek_next_page().unwrap().is_none());
2500 assert!(column_page_reader.get_next_page().unwrap().is_none());
2501
2502 assert_eq!(vec.len(), 163);
2503 }
2504
2505 #[test]
2506 fn test_skip_page_without_offset_index() {
2507 let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
2508
2509 let reader_result = SerializedFileReader::new(test_file);
2511 let reader = reader_result.unwrap();
2512
2513 let row_group_reader = reader.get_row_group(0).unwrap();
2514
2515 let mut column_page_reader = row_group_reader.get_column_page_reader(4).unwrap();
2517
2518 let mut vec = vec![];
2519
2520 for i in 0..325 {
2521 if i % 2 == 0 {
2522 vec.push(column_page_reader.get_next_page().unwrap().unwrap());
2523 } else {
2524 column_page_reader.peek_next_page().unwrap().unwrap();
2525 column_page_reader.skip_next_page().unwrap();
2526 }
2527 }
2528 assert!(column_page_reader.peek_next_page().unwrap().is_none());
2530 assert!(column_page_reader.get_next_page().unwrap().is_none());
2531
2532 assert_eq!(vec.len(), 163);
2533 }
2534
2535 #[test]
2536 fn test_peek_page_with_dictionary_page() {
2537 let test_file = get_test_file("alltypes_tiny_pages.parquet");
2538 let builder = ReadOptionsBuilder::new();
2539 let options = builder.with_page_index().build();
2541 let reader_result = SerializedFileReader::new_with_options(test_file, options);
2542 let reader = reader_result.unwrap();
2543 let row_group_reader = reader.get_row_group(0).unwrap();
2544
2545 let mut column_page_reader = row_group_reader.get_column_page_reader(9).unwrap();
2547
2548 let mut vec = vec![];
2549
2550 let meta = column_page_reader.peek_next_page().unwrap().unwrap();
2551 assert!(meta.is_dict);
2552 let page = column_page_reader.get_next_page().unwrap().unwrap();
2553 assert!(matches!(page.page_type(), basic::PageType::DICTIONARY_PAGE));
2554
2555 for i in 0..352 {
2556 let meta = column_page_reader.peek_next_page().unwrap().unwrap();
2557 if i != 351 {
2560 assert!((meta.num_rows == Some(21)) || (meta.num_rows == Some(20)));
2561 } else {
2562 assert_eq!(meta.num_rows, Some(10));
2565 }
2566 assert!(!meta.is_dict);
2567 vec.push(meta);
2568 let page = column_page_reader.get_next_page().unwrap().unwrap();
2569 assert!(matches!(page.page_type(), basic::PageType::DATA_PAGE));
2570 }
2571
2572 assert!(column_page_reader.peek_next_page().unwrap().is_none());
2574 assert!(column_page_reader.get_next_page().unwrap().is_none());
2575
2576 assert_eq!(vec.len(), 352);
2577 }
2578
2579 #[test]
2580 fn test_peek_page_with_dictionary_page_without_offset_index() {
2581 let test_file = get_test_file("alltypes_tiny_pages.parquet");
2582
2583 let reader_result = SerializedFileReader::new(test_file);
2584 let reader = reader_result.unwrap();
2585 let row_group_reader = reader.get_row_group(0).unwrap();
2586
2587 let mut column_page_reader = row_group_reader.get_column_page_reader(9).unwrap();
2589
2590 let mut vec = vec![];
2591
2592 let meta = column_page_reader.peek_next_page().unwrap().unwrap();
2593 assert!(meta.is_dict);
2594 let page = column_page_reader.get_next_page().unwrap().unwrap();
2595 assert!(matches!(page.page_type(), basic::PageType::DICTIONARY_PAGE));
2596
2597 for i in 0..352 {
2598 let meta = column_page_reader.peek_next_page().unwrap().unwrap();
2599 if i != 351 {
2602 assert!((meta.num_levels == Some(21)) || (meta.num_levels == Some(20)));
2603 } else {
2604 assert_eq!(meta.num_levels, Some(10));
2607 }
2608 assert!(!meta.is_dict);
2609 vec.push(meta);
2610 let page = column_page_reader.get_next_page().unwrap().unwrap();
2611 assert!(matches!(page.page_type(), basic::PageType::DATA_PAGE));
2612 }
2613
2614 assert!(column_page_reader.peek_next_page().unwrap().is_none());
2616 assert!(column_page_reader.get_next_page().unwrap().is_none());
2617
2618 assert_eq!(vec.len(), 352);
2619 }
2620
2621 #[test]
2622 fn test_fixed_length_index() {
2623 let message_type = "
2624 message test_schema {
2625 OPTIONAL FIXED_LEN_BYTE_ARRAY (11) value (DECIMAL(25,2));
2626 }
2627 ";
2628
2629 let schema = parse_message_type(message_type).unwrap();
2630 let mut out = Vec::with_capacity(1024);
2631 let mut writer =
2632 SerializedFileWriter::new(&mut out, Arc::new(schema), Default::default()).unwrap();
2633
2634 let mut r = writer.next_row_group().unwrap();
2635 let mut c = r.next_column().unwrap().unwrap();
2636 c.typed::<FixedLenByteArrayType>()
2637 .write_batch(
2638 &[vec![0; 11].into(), vec![5; 11].into(), vec![3; 11].into()],
2639 Some(&[1, 1, 0, 1]),
2640 None,
2641 )
2642 .unwrap();
2643 c.close().unwrap();
2644 r.close().unwrap();
2645 writer.close().unwrap();
2646
2647 let b = Bytes::from(out);
2648 let options = ReadOptionsBuilder::new().with_page_index().build();
2649 let reader = SerializedFileReader::new_with_options(b, options).unwrap();
2650 let index = reader.metadata().column_index().unwrap();
2651
2652 assert_eq!(index.len(), 1);
2654 let c = &index[0];
2655 assert_eq!(c.len(), 1);
2657
2658 match &c[0] {
2659 ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(v) => {
2660 assert_eq!(v.num_pages(), 1);
2661 assert_eq!(v.null_count(0).unwrap(), 1);
2662 assert_eq!(v.min_value(0).unwrap(), &[0; 11]);
2663 assert_eq!(v.max_value(0).unwrap(), &[5; 11]);
2664 }
2665 _ => unreachable!(),
2666 }
2667 }
2668
2669 #[test]
2670 fn test_multi_gz() {
2671 let file = get_test_file("concatenated_gzip_members.parquet");
2672 let reader = SerializedFileReader::new(file).unwrap();
2673 let row_group_reader = reader.get_row_group(0).unwrap();
2674 match row_group_reader.get_column_reader(0).unwrap() {
2675 ColumnReader::Int64ColumnReader(mut reader) => {
2676 let mut buffer = Vec::with_capacity(1024);
2677 let mut def_levels = Vec::with_capacity(1024);
2678 let (num_records, num_values, num_levels) = reader
2679 .read_records(1024, Some(&mut def_levels), None, &mut buffer)
2680 .unwrap();
2681
2682 assert_eq!(num_records, 513);
2683 assert_eq!(num_values, 513);
2684 assert_eq!(num_levels, 513);
2685
2686 let expected: Vec<i64> = (1..514).collect();
2687 assert_eq!(&buffer, &expected);
2688 }
2689 _ => unreachable!(),
2690 }
2691 }
2692
2693 #[test]
2694 fn test_byte_stream_split_extended() {
2695 let path = format!(
2696 "{}/byte_stream_split_extended.gzip.parquet",
2697 arrow::util::test_util::parquet_test_data(),
2698 );
2699 let file = File::open(path).unwrap();
2700 let reader = Box::new(SerializedFileReader::new(file).expect("Failed to create reader"));
2701
2702 let mut iter = reader
2704 .get_row_iter(None)
2705 .expect("Failed to create row iterator");
2706
2707 let mut start = 0;
2708 let end = reader.metadata().file_metadata().num_rows();
2709
2710 let check_row = |row: Result<Row, ParquetError>| {
2711 assert!(row.is_ok());
2712 let r = row.unwrap();
2713 assert_eq!(r.get_float16(0).unwrap(), r.get_float16(1).unwrap());
2714 assert_eq!(r.get_float(2).unwrap(), r.get_float(3).unwrap());
2715 assert_eq!(r.get_double(4).unwrap(), r.get_double(5).unwrap());
2716 assert_eq!(r.get_int(6).unwrap(), r.get_int(7).unwrap());
2717 assert_eq!(r.get_long(8).unwrap(), r.get_long(9).unwrap());
2718 assert_eq!(r.get_bytes(10).unwrap(), r.get_bytes(11).unwrap());
2719 assert_eq!(r.get_decimal(12).unwrap(), r.get_decimal(13).unwrap());
2720 };
2721
2722 while start < end {
2723 match iter.next() {
2724 Some(row) => check_row(row),
2725 None => break,
2726 };
2727 start += 1;
2728 }
2729 }
2730
2731 #[test]
2732 fn test_filtered_rowgroup_metadata() {
2733 let message_type = "
2734 message test_schema {
2735 REQUIRED INT32 a;
2736 }
2737 ";
2738 let schema = Arc::new(parse_message_type(message_type).unwrap());
2739 let props = Arc::new(
2740 WriterProperties::builder()
2741 .set_statistics_enabled(EnabledStatistics::Page)
2742 .build(),
2743 );
2744 let mut file: File = tempfile::tempfile().unwrap();
2745 let mut file_writer = SerializedFileWriter::new(&mut file, schema, props).unwrap();
2746 let data = [1, 2, 3, 4, 5];
2747
2748 for idx in 0..5 {
2750 let data_i: Vec<i32> = data.iter().map(|x| x * (idx + 1)).collect();
2751 let mut row_group_writer = file_writer.next_row_group().unwrap();
2752 if let Some(mut writer) = row_group_writer.next_column().unwrap() {
2753 writer
2754 .typed::<Int32Type>()
2755 .write_batch(data_i.as_slice(), None, None)
2756 .unwrap();
2757 writer.close().unwrap();
2758 }
2759 row_group_writer.close().unwrap();
2760 file_writer.flushed_row_groups();
2761 }
2762 let file_metadata = file_writer.close().unwrap();
2763
2764 assert_eq!(file_metadata.file_metadata().num_rows(), 25);
2765 assert_eq!(file_metadata.num_row_groups(), 5);
2766
2767 let read_options = ReadOptionsBuilder::new()
2769 .with_page_index()
2770 .with_predicate(Box::new(|rgmeta, _| rgmeta.ordinal().unwrap_or(0) == 2))
2771 .build();
2772 let reader =
2773 SerializedFileReader::new_with_options(file.try_clone().unwrap(), read_options)
2774 .unwrap();
2775 let metadata = reader.metadata();
2776
2777 assert_eq!(metadata.num_row_groups(), 1);
2779 assert_eq!(metadata.row_group(0).ordinal(), Some(2));
2780
2781 assert!(metadata.column_index().is_some());
2783 assert!(metadata.offset_index().is_some());
2784 assert_eq!(metadata.column_index().unwrap().len(), 1);
2785 assert_eq!(metadata.offset_index().unwrap().len(), 1);
2786 let col_idx = metadata.column_index().unwrap();
2787 let off_idx = metadata.offset_index().unwrap();
2788 let col_stats = metadata.row_group(0).column(0).statistics().unwrap();
2789 let pg_idx = &col_idx[0][0];
2790 let off_idx_i = &off_idx[0][0];
2791
2792 match pg_idx {
2794 ColumnIndexMetaData::INT32(int_idx) => {
2795 let min = col_stats.min_bytes_opt().unwrap().get_i32_le();
2796 let max = col_stats.max_bytes_opt().unwrap().get_i32_le();
2797 assert_eq!(int_idx.min_value(0), Some(min).as_ref());
2798 assert_eq!(int_idx.max_value(0), Some(max).as_ref());
2799 }
2800 _ => panic!("wrong stats type"),
2801 }
2802
2803 assert_eq!(
2805 off_idx_i.page_locations[0].offset,
2806 metadata.row_group(0).column(0).data_page_offset()
2807 );
2808
2809 let read_options = ReadOptionsBuilder::new()
2811 .with_page_index()
2812 .with_predicate(Box::new(|rgmeta, _| rgmeta.ordinal().unwrap_or(0) % 2 == 1))
2813 .build();
2814 let reader =
2815 SerializedFileReader::new_with_options(file.try_clone().unwrap(), read_options)
2816 .unwrap();
2817 let metadata = reader.metadata();
2818
2819 assert_eq!(metadata.num_row_groups(), 2);
2821 assert_eq!(metadata.row_group(0).ordinal(), Some(1));
2822 assert_eq!(metadata.row_group(1).ordinal(), Some(3));
2823
2824 assert!(metadata.column_index().is_some());
2826 assert!(metadata.offset_index().is_some());
2827 assert_eq!(metadata.column_index().unwrap().len(), 2);
2828 assert_eq!(metadata.offset_index().unwrap().len(), 2);
2829 let col_idx = metadata.column_index().unwrap();
2830 let off_idx = metadata.offset_index().unwrap();
2831
2832 for (i, col_idx_i) in col_idx.iter().enumerate().take(metadata.num_row_groups()) {
2833 let col_stats = metadata.row_group(i).column(0).statistics().unwrap();
2834 let pg_idx = &col_idx_i[0];
2835 let off_idx_i = &off_idx[i][0];
2836
2837 match pg_idx {
2839 ColumnIndexMetaData::INT32(int_idx) => {
2840 let min = col_stats.min_bytes_opt().unwrap().get_i32_le();
2841 let max = col_stats.max_bytes_opt().unwrap().get_i32_le();
2842 assert_eq!(int_idx.min_value(0), Some(min).as_ref());
2843 assert_eq!(int_idx.max_value(0), Some(max).as_ref());
2844 }
2845 _ => panic!("wrong stats type"),
2846 }
2847
2848 assert_eq!(
2850 off_idx_i.page_locations[0].offset,
2851 metadata.row_group(i).column(0).data_page_offset()
2852 );
2853 }
2854 }
2855
2856 #[test]
2857 fn test_reuse_schema() {
2858 let file = get_test_file("alltypes_plain.parquet");
2859 let file_reader = SerializedFileReader::new(file.try_clone().unwrap()).unwrap();
2860 let schema = file_reader.metadata().file_metadata().schema_descr_ptr();
2861 let expected = file_reader.metadata;
2862
2863 let options = ReadOptionsBuilder::new()
2864 .with_parquet_schema(schema)
2865 .build();
2866 let file_reader = SerializedFileReader::new_with_options(file, options).unwrap();
2867
2868 assert_eq!(expected.as_ref(), file_reader.metadata.as_ref());
2869 assert!(Arc::ptr_eq(
2871 &expected.file_metadata().schema_descr_ptr(),
2872 &file_reader.metadata.file_metadata().schema_descr_ptr()
2873 ));
2874 }
2875
2876 #[test]
2877 fn test_read_unknown_logical_type() {
2878 let file = get_test_file("unknown-logical-type.parquet");
2879 let reader = SerializedFileReader::new(file).expect("Error opening file");
2880
2881 let schema = reader.metadata().file_metadata().schema_descr();
2882 assert_eq!(
2883 schema.column(0).logical_type_ref(),
2884 Some(&basic::LogicalType::String)
2885 );
2886 assert_eq!(
2887 schema.column(1).logical_type_ref(),
2888 Some(&basic::LogicalType::_Unknown { field_id: 2555 })
2889 );
2890 assert_eq!(schema.column(1).physical_type(), Type::BYTE_ARRAY);
2891
2892 let mut iter = reader
2893 .get_row_iter(None)
2894 .expect("Failed to create row iterator");
2895
2896 let mut num_rows = 0;
2897 while iter.next().is_some() {
2898 num_rows += 1;
2899 }
2900 assert_eq!(num_rows, reader.metadata().file_metadata().num_rows());
2901 }
2902}