1use crate::basic::{PageType, Type};
22use crate::bloom_filter::Sbbf;
23use crate::column::page::{Page, PageMetadata, PageReader};
24use crate::compression::{Codec, create_codec};
25#[cfg(feature = "encryption")]
26use crate::encryption::decrypt::{CryptoContext, read_and_decrypt};
27use crate::errors::{ParquetError, Result};
28use crate::file::metadata::thrift::PageHeader;
29use crate::file::page_index::offset_index::{OffsetIndexMetaData, PageLocation};
30use crate::file::statistics;
31use crate::file::{
32 metadata::*,
33 properties::{ReaderProperties, ReaderPropertiesPtr},
34 reader::*,
35};
36#[cfg(feature = "encryption")]
37use crate::parquet_thrift::ThriftSliceInputProtocol;
38use crate::parquet_thrift::{ReadThrift, ThriftReadInputProtocol};
39use crate::record::Row;
40use crate::record::reader::RowIter;
41use crate::schema::types::{SchemaDescPtr, Type as SchemaType};
42use bytes::Bytes;
43use std::collections::VecDeque;
44use std::{fs::File, io::Read, path::Path, sync::Arc};
45
46impl TryFrom<File> for SerializedFileReader<File> {
47 type Error = ParquetError;
48
49 fn try_from(file: File) -> Result<Self> {
50 Self::new(file)
51 }
52}
53
54impl TryFrom<&Path> for SerializedFileReader<File> {
55 type Error = ParquetError;
56
57 fn try_from(path: &Path) -> Result<Self> {
58 let file = File::open(path)?;
59 Self::try_from(file)
60 }
61}
62
63impl TryFrom<String> for SerializedFileReader<File> {
64 type Error = ParquetError;
65
66 fn try_from(path: String) -> Result<Self> {
67 Self::try_from(Path::new(&path))
68 }
69}
70
71impl TryFrom<&str> for SerializedFileReader<File> {
72 type Error = ParquetError;
73
74 fn try_from(path: &str) -> Result<Self> {
75 Self::try_from(Path::new(&path))
76 }
77}
78
79impl IntoIterator for SerializedFileReader<File> {
82 type Item = Result<Row>;
83 type IntoIter = RowIter<'static>;
84
85 fn into_iter(self) -> Self::IntoIter {
86 RowIter::from_file_into(Box::new(self))
87 }
88}
89
/// A Parquet file reader backed by any [`ChunkReader`].
///
/// Holds the parsed file metadata together with the reader properties that are
/// handed to the row-group and page readers it creates.
pub struct SerializedFileReader<R: ChunkReader> {
    /// Shared handle to the underlying byte source.
    chunk_reader: Arc<R>,
    /// Parsed file metadata (possibly filtered by read-option predicates).
    metadata: Arc<ParquetMetaData>,
    /// Reader properties shared with the row-group readers.
    props: ReaderPropertiesPtr,
}
99
/// A row-group filter: called with the row group's metadata and its index in the
/// file; the row group is kept only if the predicate returns `true`.
pub type ReadGroupPredicate = Box<dyn FnMut(&RowGroupMetaData, usize) -> bool>;
104
/// Builder for [`ReadOptions`].
///
/// All fields start at their `Default` values; use the `with_*` methods to
/// customize them and [`ReadOptionsBuilder::build`] to finish.
#[derive(Default)]
pub struct ReadOptionsBuilder {
    /// Row-group predicates; a row group must pass all of them to be kept.
    predicates: Vec<ReadGroupPredicate>,
    /// Whether the page indexes should be read when the file is opened.
    enable_page_index: bool,
    /// Reader properties; `None` means "use the properties builder's defaults".
    props: Option<ReaderProperties>,
    /// Options forwarded to the metadata reader.
    metadata_options: ParquetMetaDataOptions,
}
115
116impl ReadOptionsBuilder {
117 pub fn new() -> Self {
119 Self::default()
120 }
121
122 pub fn with_predicate(mut self, predicate: ReadGroupPredicate) -> Self {
125 self.predicates.push(predicate);
126 self
127 }
128
129 pub fn with_range(mut self, start: i64, end: i64) -> Self {
132 assert!(start < end);
133 let predicate = move |rg: &RowGroupMetaData, _: usize| {
134 let mid = get_midpoint_offset(rg);
135 mid >= start && mid < end
136 };
137 self.predicates.push(Box::new(predicate));
138 self
139 }
140
141 pub fn with_page_index(mut self) -> Self {
146 self.enable_page_index = true;
147 self
148 }
149
150 pub fn with_reader_properties(mut self, properties: ReaderProperties) -> Self {
152 self.props = Some(properties);
153 self
154 }
155
156 pub fn with_parquet_schema(mut self, schema: SchemaDescPtr) -> Self {
159 self.metadata_options.set_schema(schema);
160 self
161 }
162
163 pub fn with_encoding_stats_as_mask(mut self, val: bool) -> Self {
172 self.metadata_options.set_encoding_stats_as_mask(val);
173 self
174 }
175
176 pub fn with_encoding_stats_policy(mut self, policy: ParquetStatisticsPolicy) -> Self {
181 self.metadata_options.set_encoding_stats_policy(policy);
182 self
183 }
184
185 pub fn with_column_stats_policy(mut self, policy: ParquetStatisticsPolicy) -> Self {
190 self.metadata_options.set_column_stats_policy(policy);
191 self
192 }
193
194 pub fn with_size_stats_policy(mut self, policy: ParquetStatisticsPolicy) -> Self {
199 self.metadata_options.set_size_stats_policy(policy);
200 self
201 }
202
203 pub fn build(self) -> ReadOptions {
205 let props = self
206 .props
207 .unwrap_or_else(|| ReaderProperties::builder().build());
208 ReadOptions {
209 predicates: self.predicates,
210 enable_page_index: self.enable_page_index,
211 props,
212 metadata_options: self.metadata_options,
213 }
214 }
215}
216
/// Options consumed by [`SerializedFileReader::new_with_options`].
///
/// Created through [`ReadOptionsBuilder`].
pub struct ReadOptions {
    /// Row-group predicates; a row group must pass all of them to be kept.
    predicates: Vec<ReadGroupPredicate>,
    /// Whether the page indexes are read when the file is opened.
    enable_page_index: bool,
    /// Reader properties shared with the readers created from the file.
    props: ReaderProperties,
    /// Options forwarded to the metadata reader.
    metadata_options: ParquetMetaDataOptions,
}
227
228impl<R: 'static + ChunkReader> SerializedFileReader<R> {
229 pub fn new(chunk_reader: R) -> Result<Self> {
232 let metadata = ParquetMetaDataReader::new().parse_and_finish(&chunk_reader)?;
233 let props = Arc::new(ReaderProperties::builder().build());
234 Ok(Self {
235 chunk_reader: Arc::new(chunk_reader),
236 metadata: Arc::new(metadata),
237 props,
238 })
239 }
240
241 #[allow(deprecated)]
244 pub fn new_with_options(chunk_reader: R, options: ReadOptions) -> Result<Self> {
245 let mut metadata_builder = ParquetMetaDataReader::new()
246 .with_metadata_options(Some(options.metadata_options.clone()))
247 .parse_and_finish(&chunk_reader)?
248 .into_builder();
249 let mut predicates = options.predicates;
250
251 for (i, rg_meta) in metadata_builder.take_row_groups().into_iter().enumerate() {
253 let mut keep = true;
254 for predicate in &mut predicates {
255 if !predicate(&rg_meta, i) {
256 keep = false;
257 break;
258 }
259 }
260 if keep {
261 metadata_builder = metadata_builder.add_row_group(rg_meta);
262 }
263 }
264
265 let mut metadata = metadata_builder.build();
266
267 if options.enable_page_index {
269 let mut reader =
270 ParquetMetaDataReader::new_with_metadata(metadata).with_page_indexes(true);
271 reader.read_page_indexes(&chunk_reader)?;
272 metadata = reader.finish()?;
273 }
274
275 Ok(Self {
276 chunk_reader: Arc::new(chunk_reader),
277 metadata: Arc::new(metadata),
278 props: Arc::new(options.props),
279 })
280 }
281}
282
283fn get_midpoint_offset(meta: &RowGroupMetaData) -> i64 {
285 let col = meta.column(0);
286 let mut offset = col.data_page_offset();
287 if let Some(dic_offset) = col.dictionary_page_offset() {
288 if offset > dic_offset {
289 offset = dic_offset
290 }
291 };
292 offset + meta.compressed_size() / 2
293}
294
295impl<R: 'static + ChunkReader> FileReader for SerializedFileReader<R> {
296 fn metadata(&self) -> &ParquetMetaData {
297 &self.metadata
298 }
299
300 fn num_row_groups(&self) -> usize {
301 self.metadata.num_row_groups()
302 }
303
304 fn get_row_group(&self, i: usize) -> Result<Box<dyn RowGroupReader + '_>> {
305 let row_group_metadata = self.metadata.row_group(i);
306 let props = Arc::clone(&self.props);
308 let f = Arc::clone(&self.chunk_reader);
309 Ok(Box::new(SerializedRowGroupReader::new(
310 f,
311 row_group_metadata,
312 self.metadata.offset_index().map(|x| x[i].as_slice()),
313 props,
314 )?))
315 }
316
317 fn get_row_iter(&self, projection: Option<SchemaType>) -> Result<RowIter<'_>> {
318 RowIter::from_file(projection, self)
319 }
320}
321
/// A row-group reader backed by a [`ChunkReader`], borrowing its metadata.
pub struct SerializedRowGroupReader<'a, R: ChunkReader> {
    /// Shared handle to the underlying byte source.
    chunk_reader: Arc<R>,
    /// Metadata of the row group being read.
    metadata: &'a RowGroupMetaData,
    /// Offset index of this row group, indexed by column, if available.
    offset_index: Option<&'a [OffsetIndexMetaData]>,
    /// Reader properties handed to the page readers.
    props: ReaderPropertiesPtr,
    /// One entry per column; `None` when no bloom filter was loaded for it.
    bloom_filters: Vec<Option<Sbbf>>,
}
330
331impl<'a, R: ChunkReader> SerializedRowGroupReader<'a, R> {
332 pub fn new(
334 chunk_reader: Arc<R>,
335 metadata: &'a RowGroupMetaData,
336 offset_index: Option<&'a [OffsetIndexMetaData]>,
337 props: ReaderPropertiesPtr,
338 ) -> Result<Self> {
339 let bloom_filters = if props.read_bloom_filter() {
340 metadata
341 .columns()
342 .iter()
343 .map(|col| Sbbf::read_from_column_chunk(col, &*chunk_reader))
344 .collect::<Result<Vec<_>>>()?
345 } else {
346 std::iter::repeat_n(None, metadata.columns().len()).collect()
347 };
348 Ok(Self {
349 chunk_reader,
350 metadata,
351 offset_index,
352 props,
353 bloom_filters,
354 })
355 }
356}
357
358impl<R: 'static + ChunkReader> RowGroupReader for SerializedRowGroupReader<'_, R> {
359 fn metadata(&self) -> &RowGroupMetaData {
360 self.metadata
361 }
362
363 fn num_columns(&self) -> usize {
364 self.metadata.num_columns()
365 }
366
367 fn get_column_page_reader(&self, i: usize) -> Result<Box<dyn PageReader>> {
369 let col = self.metadata.column(i);
370
371 let page_locations = self.offset_index.map(|x| x[i].page_locations.clone());
372
373 let props = Arc::clone(&self.props);
374 Ok(Box::new(SerializedPageReader::new_with_properties(
375 Arc::clone(&self.chunk_reader),
376 col,
377 usize::try_from(self.metadata.num_rows())?,
378 page_locations,
379 props,
380 )?))
381 }
382
383 fn get_column_bloom_filter(&self, i: usize) -> Option<&Sbbf> {
385 self.bloom_filters[i].as_ref()
386 }
387
388 fn get_row_iter(&self, projection: Option<SchemaType>) -> Result<RowIter<'_>> {
389 RowIter::from_row_group(projection, self)
390 }
391}
392
393pub(crate) fn decode_page(
395 page_header: PageHeader,
396 buffer: Bytes,
397 physical_type: Type,
398 decompressor: Option<&mut Box<dyn Codec>>,
399) -> Result<Page> {
400 #[cfg(feature = "crc")]
402 if let Some(expected_crc) = page_header.crc {
403 let crc = crc32fast::hash(&buffer);
404 if crc != expected_crc as u32 {
405 return Err(general_err!("Page CRC checksum mismatch"));
406 }
407 }
408
409 let mut offset: usize = 0;
416 let mut can_decompress = true;
417
418 if let Some(ref header_v2) = page_header.data_page_header_v2 {
419 if header_v2.definition_levels_byte_length < 0
420 || header_v2.repetition_levels_byte_length < 0
421 || header_v2.definition_levels_byte_length + header_v2.repetition_levels_byte_length
422 > page_header.uncompressed_page_size
423 {
424 return Err(general_err!(
425 "DataPage v2 header contains implausible values \
426 for definition_levels_byte_length ({}) \
427 and repetition_levels_byte_length ({}) \
428 given DataPage header provides uncompressed_page_size ({})",
429 header_v2.definition_levels_byte_length,
430 header_v2.repetition_levels_byte_length,
431 page_header.uncompressed_page_size
432 ));
433 }
434 offset = usize::try_from(
435 header_v2.definition_levels_byte_length + header_v2.repetition_levels_byte_length,
436 )?;
437 can_decompress = header_v2.is_compressed.unwrap_or(true);
439 }
440
441 let buffer = match decompressor {
442 Some(decompressor) if can_decompress => {
443 let uncompressed_page_size = usize::try_from(page_header.uncompressed_page_size)?;
444 if offset > buffer.len() || offset > uncompressed_page_size {
445 return Err(general_err!("Invalid page header"));
446 }
447 let decompressed_size = uncompressed_page_size - offset;
448 let mut decompressed = Vec::with_capacity(uncompressed_page_size);
449 decompressed.extend_from_slice(&buffer[..offset]);
450 if decompressed_size > 0 {
453 let compressed = &buffer[offset..];
454 decompressor.decompress(compressed, &mut decompressed, Some(decompressed_size))?;
455 }
456
457 if decompressed.len() != uncompressed_page_size {
458 return Err(general_err!(
459 "Actual decompressed size doesn't match the expected one ({} vs {})",
460 decompressed.len(),
461 uncompressed_page_size
462 ));
463 }
464
465 Bytes::from(decompressed)
466 }
467 _ => buffer,
468 };
469
470 let result = match page_header.r#type {
471 PageType::DICTIONARY_PAGE => {
472 let dict_header = page_header.dictionary_page_header.as_ref().ok_or_else(|| {
473 ParquetError::General("Missing dictionary page header".to_string())
474 })?;
475 let is_sorted = dict_header.is_sorted.unwrap_or(false);
476 Page::DictionaryPage {
477 buf: buffer,
478 num_values: dict_header.num_values.try_into()?,
479 encoding: dict_header.encoding,
480 is_sorted,
481 }
482 }
483 PageType::DATA_PAGE => {
484 let header = page_header
485 .data_page_header
486 .ok_or_else(|| ParquetError::General("Missing V1 data page header".to_string()))?;
487 Page::DataPage {
488 buf: buffer,
489 num_values: header.num_values.try_into()?,
490 encoding: header.encoding,
491 def_level_encoding: header.definition_level_encoding,
492 rep_level_encoding: header.repetition_level_encoding,
493 statistics: statistics::from_thrift_page_stats(physical_type, header.statistics)?,
494 }
495 }
496 PageType::DATA_PAGE_V2 => {
497 let header = page_header
498 .data_page_header_v2
499 .ok_or_else(|| ParquetError::General("Missing V2 data page header".to_string()))?;
500 let is_compressed = header.is_compressed.unwrap_or(true);
501 Page::DataPageV2 {
502 buf: buffer,
503 num_values: header.num_values.try_into()?,
504 encoding: header.encoding,
505 num_nulls: header.num_nulls.try_into()?,
506 num_rows: header.num_rows.try_into()?,
507 def_levels_byte_len: header.definition_levels_byte_length.try_into()?,
508 rep_levels_byte_len: header.repetition_levels_byte_length.try_into()?,
509 is_compressed,
510 statistics: statistics::from_thrift_page_stats(physical_type, header.statistics)?,
511 }
512 }
513 _ => {
514 return Err(general_err!(
516 "Page type {:?} is not supported",
517 page_header.r#type
518 ));
519 }
520 };
521
522 Ok(result)
523}
524
/// Internal cursor of a [`SerializedPageReader`].
enum SerializedPageReaderState {
    /// Pages are discovered sequentially by parsing page headers from the
    /// column chunk (no offset index available).
    Values {
        /// Offset in the chunk reader at which the next read starts.
        offset: u64,

        /// Bytes of the column chunk that have not been consumed yet.
        remaining_bytes: u64,

        /// Header buffered by a peek; consumed by the next read or skip.
        next_page_header: Option<Box<PageHeader>>,

        /// Index of the next data page; advanced as data pages are read or skipped.
        page_index: usize,

        /// `true` while a dictionary page is still expected; cleared once a
        /// dictionary page has been read or a page has been skipped.
        require_dictionary: bool,
    },
    /// Page positions are known up front from the offset index.
    Pages {
        /// Locations of the data pages that have not been read or skipped yet.
        page_locations: VecDeque<PageLocation>,
        /// Location of the dictionary page, when one precedes the first data page.
        dictionary_page: Option<PageLocation>,
        /// Total number of rows, used to size the last page when peeking.
        total_rows: usize,
        /// Index of the next data page; advanced as data pages are read or skipped.
        page_index: usize,
    },
}
555
/// Settings that influence how page headers and page data are read.
#[derive(Default)]
struct SerializedPageReaderContext {
    /// Whether page-level statistics are decoded when parsing page headers.
    read_stats: bool,
    /// Decryption context for this column chunk; `None` means no decryption.
    #[cfg(feature = "encryption")]
    crypto_context: Option<Arc<CryptoContext>>,
}
564
/// A page reader for a single column chunk, backed by a [`ChunkReader`].
pub struct SerializedPageReader<R: ChunkReader> {
    /// Shared handle to the underlying byte source.
    reader: Arc<R>,

    /// Codec used to decompress pages, if the column chunk is compressed.
    decompressor: Option<Box<dyn Codec>>,

    /// Physical type of the column, passed to page decoding.
    physical_type: Type,

    /// Cursor describing which page comes next.
    state: SerializedPageReaderState,

    /// Header-parsing and decryption settings.
    context: SerializedPageReaderContext,
}
580
581impl<R: ChunkReader> SerializedPageReader<R> {
582 pub fn new(
584 reader: Arc<R>,
585 column_chunk_metadata: &ColumnChunkMetaData,
586 total_rows: usize,
587 page_locations: Option<Vec<PageLocation>>,
588 ) -> Result<Self> {
589 let props = Arc::new(ReaderProperties::builder().build());
590 SerializedPageReader::new_with_properties(
591 reader,
592 column_chunk_metadata,
593 total_rows,
594 page_locations,
595 props,
596 )
597 }
598
599 #[cfg(all(feature = "arrow", not(feature = "encryption")))]
601 pub(crate) fn add_crypto_context(
602 self,
603 _rg_idx: usize,
604 _column_idx: usize,
605 _parquet_meta_data: &ParquetMetaData,
606 _column_chunk_metadata: &ColumnChunkMetaData,
607 ) -> Result<SerializedPageReader<R>> {
608 Ok(self)
609 }
610
611 #[cfg(feature = "encryption")]
613 pub(crate) fn add_crypto_context(
614 mut self,
615 rg_idx: usize,
616 column_idx: usize,
617 parquet_meta_data: &ParquetMetaData,
618 column_chunk_metadata: &ColumnChunkMetaData,
619 ) -> Result<SerializedPageReader<R>> {
620 let Some(file_decryptor) = parquet_meta_data.file_decryptor() else {
621 return Ok(self);
622 };
623 let Some(crypto_metadata) = column_chunk_metadata.crypto_metadata() else {
624 return Ok(self);
625 };
626 let crypto_context =
627 CryptoContext::for_column(file_decryptor, crypto_metadata, rg_idx, column_idx)?;
628 self.context.crypto_context = Some(Arc::new(crypto_context));
629 Ok(self)
630 }
631
632 pub fn new_with_properties(
634 reader: Arc<R>,
635 meta: &ColumnChunkMetaData,
636 total_rows: usize,
637 page_locations: Option<Vec<PageLocation>>,
638 props: ReaderPropertiesPtr,
639 ) -> Result<Self> {
640 let decompressor = create_codec(meta.compression(), props.codec_options())?;
641 let (start, len) = meta.byte_range();
642
643 let state = match page_locations {
644 Some(locations) => {
645 let dictionary_page = match locations.first() {
648 Some(dict_offset) if dict_offset.offset as u64 != start => Some(PageLocation {
649 offset: start as i64,
650 compressed_page_size: (dict_offset.offset as u64 - start) as i32,
651 first_row_index: 0,
652 }),
653 _ => None,
654 };
655
656 SerializedPageReaderState::Pages {
657 page_locations: locations.into(),
658 dictionary_page,
659 total_rows,
660 page_index: 0,
661 }
662 }
663 None => SerializedPageReaderState::Values {
664 offset: start,
665 remaining_bytes: len,
666 next_page_header: None,
667 page_index: 0,
668 require_dictionary: meta.dictionary_page_offset().is_some(),
669 },
670 };
671 let mut context = SerializedPageReaderContext::default();
672 if props.read_page_stats() {
673 context.read_stats = true;
674 }
675 Ok(Self {
676 reader,
677 decompressor,
678 state,
679 physical_type: meta.column_type(),
680 context,
681 })
682 }
683
684 #[cfg(test)]
690 fn peek_next_page_offset(&mut self) -> Result<Option<u64>> {
691 match &mut self.state {
692 SerializedPageReaderState::Values {
693 offset,
694 remaining_bytes,
695 next_page_header,
696 page_index,
697 require_dictionary,
698 } => {
699 loop {
700 if *remaining_bytes == 0 {
701 return Ok(None);
702 }
703 return if let Some(header) = next_page_header.as_ref() {
704 if let Ok(_page_meta) = PageMetadata::try_from(&**header) {
705 Ok(Some(*offset))
706 } else {
707 *next_page_header = None;
709 continue;
710 }
711 } else {
712 let mut read = self.reader.get_read(*offset)?;
713 let (header_len, header) = Self::read_page_header_len(
714 &self.context,
715 &mut read,
716 *page_index,
717 *require_dictionary,
718 )?;
719 *offset += header_len as u64;
720 *remaining_bytes -= header_len as u64;
721 let page_meta = if let Ok(_page_meta) = PageMetadata::try_from(&header) {
722 Ok(Some(*offset))
723 } else {
724 continue;
726 };
727 *next_page_header = Some(Box::new(header));
728 page_meta
729 };
730 }
731 }
732 SerializedPageReaderState::Pages {
733 page_locations,
734 dictionary_page,
735 ..
736 } => {
737 if let Some(page) = dictionary_page {
738 Ok(Some(page.offset as u64))
739 } else if let Some(page) = page_locations.front() {
740 Ok(Some(page.offset as u64))
741 } else {
742 Ok(None)
743 }
744 }
745 }
746 }
747
748 fn read_page_header_len<T: Read>(
749 context: &SerializedPageReaderContext,
750 input: &mut T,
751 page_index: usize,
752 dictionary_page: bool,
753 ) -> Result<(usize, PageHeader)> {
754 struct TrackedRead<R> {
756 inner: R,
757 bytes_read: usize,
758 }
759
760 impl<R: Read> Read for TrackedRead<R> {
761 fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
762 let v = self.inner.read(buf)?;
763 self.bytes_read += v;
764 Ok(v)
765 }
766 }
767
768 let mut tracked = TrackedRead {
769 inner: input,
770 bytes_read: 0,
771 };
772 let header = context.read_page_header(&mut tracked, page_index, dictionary_page)?;
773 Ok((tracked.bytes_read, header))
774 }
775
776 fn read_page_header_len_from_bytes(
777 context: &SerializedPageReaderContext,
778 buffer: &[u8],
779 page_index: usize,
780 dictionary_page: bool,
781 ) -> Result<(usize, PageHeader)> {
782 let mut input = std::io::Cursor::new(buffer);
783 let header = context.read_page_header(&mut input, page_index, dictionary_page)?;
784 let header_len = input.position() as usize;
785 Ok((header_len, header))
786 }
787}
788
789#[cfg(not(feature = "encryption"))]
790impl SerializedPageReaderContext {
791 fn read_page_header<T: Read>(
792 &self,
793 input: &mut T,
794 _page_index: usize,
795 _dictionary_page: bool,
796 ) -> Result<PageHeader> {
797 let mut prot = ThriftReadInputProtocol::new(input);
798 if self.read_stats {
799 Ok(PageHeader::read_thrift(&mut prot)?)
800 } else {
801 Ok(PageHeader::read_thrift_without_stats(&mut prot)?)
802 }
803 }
804
805 fn decrypt_page_data<T>(
806 &self,
807 buffer: T,
808 _page_index: usize,
809 _dictionary_page: bool,
810 ) -> Result<T> {
811 Ok(buffer)
812 }
813}
814
#[cfg(feature = "encryption")]
impl SerializedPageReaderContext {
    /// Parses a page header from `input`, decrypting it first when a crypto
    /// context is attached. Page statistics are decoded only when `read_stats`
    /// is enabled.
    fn read_page_header<T: Read>(
        &self,
        input: &mut T,
        page_index: usize,
        dictionary_page: bool,
    ) -> Result<PageHeader> {
        // Without a crypto context the header is parsed straight from `input`.
        let Some(page_crypto_context) = self.page_crypto_context(page_index, dictionary_page)
        else {
            let mut prot = ThriftReadInputProtocol::new(input);
            let header = match self.read_stats {
                true => PageHeader::read_thrift(&mut prot)?,
                false => PageHeader::read_thrift_without_stats(&mut prot)?,
            };
            return Ok(header);
        };

        let data_decryptor = page_crypto_context.data_decryptor();
        let aad = page_crypto_context.create_page_header_aad()?;

        // Any decryption failure is reported with a hint about the key.
        let buf = read_and_decrypt(data_decryptor, input, aad.as_ref()).map_err(|_| {
            ParquetError::General(format!(
                "Error decrypting page header for column {}, decryption key may be wrong",
                page_crypto_context.column_ordinal
            ))
        })?;

        let mut prot = ThriftSliceInputProtocol::new(buf.as_slice());
        let header = match self.read_stats {
            true => PageHeader::read_thrift(&mut prot)?,
            false => PageHeader::read_thrift_without_stats(&mut prot)?,
        };
        Ok(header)
    }

    /// Decrypts the page bytes when a crypto context is attached; otherwise
    /// returns `buffer` untouched.
    fn decrypt_page_data<T>(&self, buffer: T, page_index: usize, dictionary_page: bool) -> Result<T>
    where
        T: AsRef<[u8]>,
        T: From<Vec<u8>>,
    {
        let Some(page_crypto_context) = self.page_crypto_context(page_index, dictionary_page)
        else {
            return Ok(buffer);
        };
        let decryptor = page_crypto_context.data_decryptor();
        let aad = page_crypto_context.create_page_aad()?;
        let decrypted = decryptor.decrypt(buffer.as_ref(), &aad)?;
        Ok(T::from(decrypted))
    }

    /// Derives the crypto context for one page: the dictionary-page variant
    /// when `dictionary_page` is set, otherwise one carrying `page_index` as
    /// the page ordinal. Returns `None` when no crypto context is attached.
    fn page_crypto_context(
        &self,
        page_index: usize,
        dictionary_page: bool,
    ) -> Option<Arc<CryptoContext>> {
        let column_context = self.crypto_context.as_ref()?;
        let page_context = if dictionary_page {
            column_context.for_dictionary_page()
        } else {
            column_context.with_page_ordinal(page_index)
        };
        Some(Arc::new(page_context))
    }
}
885
886impl<R: ChunkReader> Iterator for SerializedPageReader<R> {
887 type Item = Result<Page>;
888
889 fn next(&mut self) -> Option<Self::Item> {
890 self.get_next_page().transpose()
891 }
892}
893
894fn verify_page_header_len(header_len: usize, remaining_bytes: u64) -> Result<()> {
895 if header_len as u64 > remaining_bytes {
896 return Err(eof_err!("Invalid page header"));
897 }
898 Ok(())
899}
900
901fn verify_page_size(
902 compressed_size: i32,
903 uncompressed_size: i32,
904 remaining_bytes: u64,
905) -> Result<()> {
906 if compressed_size < 0 || compressed_size as u64 > remaining_bytes || uncompressed_size < 0 {
910 return Err(eof_err!("Invalid page header"));
911 }
912 Ok(())
913}
914
915impl<R: ChunkReader> PageReader for SerializedPageReader<R> {
916 fn get_next_page(&mut self) -> Result<Option<Page>> {
917 loop {
918 let page = match &mut self.state {
919 SerializedPageReaderState::Values {
920 offset,
921 remaining_bytes: remaining,
922 next_page_header,
923 page_index,
924 require_dictionary,
925 } => {
926 if *remaining == 0 {
927 return Ok(None);
928 }
929
930 let mut read = self.reader.get_read(*offset)?;
931 let header = if let Some(header) = next_page_header.take() {
932 *header
933 } else {
934 let (header_len, header) = Self::read_page_header_len(
935 &self.context,
936 &mut read,
937 *page_index,
938 *require_dictionary,
939 )?;
940 verify_page_header_len(header_len, *remaining)?;
941 *offset += header_len as u64;
942 *remaining -= header_len as u64;
943 header
944 };
945 verify_page_size(
946 header.compressed_page_size,
947 header.uncompressed_page_size,
948 *remaining,
949 )?;
950 let data_len = header.compressed_page_size as usize;
951 let data_start = *offset;
952 *offset += data_len as u64;
953 *remaining -= data_len as u64;
954
955 if header.r#type == PageType::INDEX_PAGE {
956 continue;
957 }
958
959 let buffer = self.reader.get_bytes(data_start, data_len)?;
960
961 let buffer =
962 self.context
963 .decrypt_page_data(buffer, *page_index, *require_dictionary)?;
964
965 let page = decode_page(
966 header,
967 buffer,
968 self.physical_type,
969 self.decompressor.as_mut(),
970 )?;
971 if page.is_data_page() {
972 *page_index += 1;
973 } else if page.is_dictionary_page() {
974 *require_dictionary = false;
975 }
976 page
977 }
978 SerializedPageReaderState::Pages {
979 page_locations,
980 dictionary_page,
981 page_index,
982 ..
983 } => {
984 let (front, is_dictionary_page) = match dictionary_page.take() {
985 Some(front) => (front, true),
986 None => match page_locations.pop_front() {
987 Some(front) => (front, false),
988 None => return Ok(None),
989 },
990 };
991
992 let page_len = usize::try_from(front.compressed_page_size)?;
993 let buffer = self.reader.get_bytes(front.offset as u64, page_len)?;
994
995 let (offset, header) = Self::read_page_header_len_from_bytes(
996 &self.context,
997 buffer.as_ref(),
998 *page_index,
999 is_dictionary_page,
1000 )?;
1001 let bytes = buffer.slice(offset..);
1002 let bytes =
1003 self.context
1004 .decrypt_page_data(bytes, *page_index, is_dictionary_page)?;
1005
1006 if !is_dictionary_page {
1007 *page_index += 1;
1008 }
1009 decode_page(
1010 header,
1011 bytes,
1012 self.physical_type,
1013 self.decompressor.as_mut(),
1014 )?
1015 }
1016 };
1017
1018 return Ok(Some(page));
1019 }
1020 }
1021
1022 fn peek_next_page(&mut self) -> Result<Option<PageMetadata>> {
1023 match &mut self.state {
1024 SerializedPageReaderState::Values {
1025 offset,
1026 remaining_bytes,
1027 next_page_header,
1028 page_index,
1029 require_dictionary,
1030 } => {
1031 loop {
1032 if *remaining_bytes == 0 {
1033 return Ok(None);
1034 }
1035 return if let Some(header) = next_page_header.as_ref() {
1036 if let Ok(page_meta) = (&**header).try_into() {
1037 Ok(Some(page_meta))
1038 } else {
1039 *next_page_header = None;
1041 continue;
1042 }
1043 } else {
1044 let mut read = self.reader.get_read(*offset)?;
1045 let (header_len, header) = Self::read_page_header_len(
1046 &self.context,
1047 &mut read,
1048 *page_index,
1049 *require_dictionary,
1050 )?;
1051 verify_page_header_len(header_len, *remaining_bytes)?;
1052 *offset += header_len as u64;
1053 *remaining_bytes -= header_len as u64;
1054 let page_meta = if let Ok(page_meta) = (&header).try_into() {
1055 Ok(Some(page_meta))
1056 } else {
1057 continue;
1059 };
1060 *next_page_header = Some(Box::new(header));
1061 page_meta
1062 };
1063 }
1064 }
1065 SerializedPageReaderState::Pages {
1066 page_locations,
1067 dictionary_page,
1068 total_rows,
1069 page_index: _,
1070 } => {
1071 if dictionary_page.is_some() {
1072 Ok(Some(PageMetadata {
1073 num_rows: None,
1074 num_levels: None,
1075 is_dict: true,
1076 }))
1077 } else if let Some(page) = page_locations.front() {
1078 let next_rows = page_locations
1079 .get(1)
1080 .map(|x| x.first_row_index as usize)
1081 .unwrap_or(*total_rows);
1082
1083 Ok(Some(PageMetadata {
1084 num_rows: Some(next_rows - page.first_row_index as usize),
1085 num_levels: None,
1086 is_dict: false,
1087 }))
1088 } else {
1089 Ok(None)
1090 }
1091 }
1092 }
1093 }
1094
1095 fn skip_next_page(&mut self) -> Result<()> {
1096 match &mut self.state {
1097 SerializedPageReaderState::Values {
1098 offset,
1099 remaining_bytes,
1100 next_page_header,
1101 page_index,
1102 require_dictionary,
1103 } => {
1104 if let Some(buffered_header) = next_page_header.take() {
1105 verify_page_size(
1106 buffered_header.compressed_page_size,
1107 buffered_header.uncompressed_page_size,
1108 *remaining_bytes,
1109 )?;
1110 *offset += buffered_header.compressed_page_size as u64;
1112 *remaining_bytes -= buffered_header.compressed_page_size as u64;
1113 } else {
1114 let mut read = self.reader.get_read(*offset)?;
1115 let (header_len, header) = Self::read_page_header_len(
1116 &self.context,
1117 &mut read,
1118 *page_index,
1119 *require_dictionary,
1120 )?;
1121 verify_page_header_len(header_len, *remaining_bytes)?;
1122 verify_page_size(
1123 header.compressed_page_size,
1124 header.uncompressed_page_size,
1125 *remaining_bytes,
1126 )?;
1127 let data_page_size = header.compressed_page_size as u64;
1128 *offset += header_len as u64 + data_page_size;
1129 *remaining_bytes -= header_len as u64 + data_page_size;
1130 }
1131 if *require_dictionary {
1132 *require_dictionary = false;
1133 } else {
1134 *page_index += 1;
1135 }
1136 Ok(())
1137 }
1138 SerializedPageReaderState::Pages {
1139 page_locations,
1140 dictionary_page,
1141 page_index,
1142 ..
1143 } => {
1144 if dictionary_page.is_some() {
1145 dictionary_page.take();
1147 } else {
1148 if page_locations.pop_front().is_some() {
1150 *page_index += 1;
1151 }
1152 }
1153
1154 Ok(())
1155 }
1156 }
1157 }
1158
1159 fn at_record_boundary(&mut self) -> Result<bool> {
1160 match &mut self.state {
1161 SerializedPageReaderState::Values { .. } => Ok(self.peek_next_page()?.is_none()),
1162 SerializedPageReaderState::Pages { .. } => Ok(true),
1163 }
1164 }
1165}
1166
1167#[cfg(test)]
1168mod tests {
1169 use std::collections::HashSet;
1170
1171 use bytes::Buf;
1172
1173 use crate::file::page_index::column_index::{
1174 ByteArrayColumnIndex, ColumnIndexMetaData, PrimitiveColumnIndex,
1175 };
1176 use crate::file::properties::{EnabledStatistics, WriterProperties};
1177
1178 use crate::basic::{self, BoundaryOrder, ColumnOrder, Encoding, SortOrder};
1179 use crate::column::reader::ColumnReader;
1180 use crate::data_type::private::ParquetValueType;
1181 use crate::data_type::{AsBytes, FixedLenByteArrayType, Int32Type};
1182 use crate::file::metadata::thrift::DataPageHeaderV2;
1183 #[allow(deprecated)]
1184 use crate::file::page_index::index_reader::{read_columns_indexes, read_offset_indexes};
1185 use crate::file::writer::SerializedFileWriter;
1186 use crate::record::RowAccessor;
1187 use crate::schema::parser::parse_message_type;
1188 use crate::util::test_common::file_util::{get_test_file, get_test_path};
1189
1190 use super::*;
1191
1192 #[test]
1193 fn test_decode_page_invalid_offset() {
1194 let page_header = PageHeader {
1195 r#type: PageType::DATA_PAGE_V2,
1196 uncompressed_page_size: 10,
1197 compressed_page_size: 10,
1198 data_page_header: None,
1199 index_page_header: None,
1200 dictionary_page_header: None,
1201 crc: None,
1202 data_page_header_v2: Some(DataPageHeaderV2 {
1203 num_nulls: 0,
1204 num_rows: 0,
1205 num_values: 0,
1206 encoding: Encoding::PLAIN,
1207 definition_levels_byte_length: 11,
1208 repetition_levels_byte_length: 0,
1209 is_compressed: None,
1210 statistics: None,
1211 }),
1212 };
1213
1214 let buffer = Bytes::new();
1215 let err = decode_page(page_header, buffer, Type::INT32, None).unwrap_err();
1216 assert!(
1217 err.to_string()
1218 .contains("DataPage v2 header contains implausible values")
1219 );
1220 }
1221
1222 #[test]
1223 fn test_decode_unsupported_page() {
1224 let mut page_header = PageHeader {
1225 r#type: PageType::INDEX_PAGE,
1226 uncompressed_page_size: 10,
1227 compressed_page_size: 10,
1228 data_page_header: None,
1229 index_page_header: None,
1230 dictionary_page_header: None,
1231 crc: None,
1232 data_page_header_v2: None,
1233 };
1234 let buffer = Bytes::new();
1235 let err = decode_page(page_header.clone(), buffer.clone(), Type::INT32, None).unwrap_err();
1236 assert_eq!(
1237 err.to_string(),
1238 "Parquet error: Page type INDEX_PAGE is not supported"
1239 );
1240
1241 page_header.data_page_header_v2 = Some(DataPageHeaderV2 {
1242 num_nulls: 0,
1243 num_rows: 0,
1244 num_values: 0,
1245 encoding: Encoding::PLAIN,
1246 definition_levels_byte_length: 11,
1247 repetition_levels_byte_length: 0,
1248 is_compressed: None,
1249 statistics: None,
1250 });
1251 let err = decode_page(page_header, buffer, Type::INT32, None).unwrap_err();
1252 assert!(
1253 err.to_string()
1254 .contains("DataPage v2 header contains implausible values")
1255 );
1256 }
1257
/// Reading `alltypes_plain.parquet` from an in-memory `Bytes` buffer and from
/// a file handle must yield exactly the same rows.
#[test]
fn test_cursor_and_file_has_the_same_behaviour() {
    let mut buf: Vec<u8> = Vec::new();
    get_test_file("alltypes_plain.parquet")
        .read_to_end(&mut buf)
        .unwrap();
    let cursor = Bytes::from(buf);
    let read_from_cursor = SerializedFileReader::new(cursor).unwrap();

    let test_file = get_test_file("alltypes_plain.parquet");
    let read_from_file = SerializedFileReader::new(test_file).unwrap();

    let file_rows: Vec<_> = read_from_file
        .get_row_iter(None)
        .unwrap()
        .map(Result::unwrap)
        .collect();
    let cursor_rows: Vec<_> = read_from_cursor
        .get_row_iter(None)
        .unwrap()
        .map(Result::unwrap)
        .collect();

    // `zip` alone stops at the shorter side, which would hide a reader that
    // yields too few (or too many) rows, so compare the counts explicitly.
    assert_eq!(file_rows.len(), cursor_rows.len());
    for (from_file, from_cursor) in file_rows.iter().zip(cursor_rows.iter()) {
        assert_eq!(from_file, from_cursor);
    }
}
1277
/// Exercises every `TryFrom` conversion into a `SerializedFileReader`
/// (`File`, `&Path`, `&str`, `String`), first with an existing test file and
/// then with the path `invalid.parquet`, which must fail.
#[test]
fn test_file_reader_try_from() {
    let existing_file = get_test_file("alltypes_plain.parquet");
    let existing_path_buf = get_test_path("alltypes_plain.parquet");
    let existing_path = existing_path_buf.as_path();
    let existing_str = existing_path.to_str().unwrap();

    // All four source types succeed for an existing file.
    let from_file = SerializedFileReader::try_from(existing_file);
    assert!(from_file.is_ok());
    let from_path = SerializedFileReader::try_from(existing_path);
    assert!(from_path.is_ok());
    let from_str = SerializedFileReader::try_from(existing_str);
    assert!(from_str.is_ok());
    let from_string = SerializedFileReader::try_from(existing_str.to_string());
    assert!(from_string.is_ok());

    // The path-based conversions fail for `invalid.parquet`.
    let missing_path = Path::new("invalid.parquet");
    let missing_str = missing_path.to_str().unwrap();

    let from_path = SerializedFileReader::try_from(missing_path);
    assert!(from_path.is_err());
    let from_str = SerializedFileReader::try_from(missing_str);
    assert!(from_str.is_err());
    let from_string = SerializedFileReader::try_from(missing_str.to_string());
    assert!(from_string.is_err());
}
1311
/// Consuming the reader with `into_iter` yields the rows of
/// `alltypes_plain.parquet`; field 0 of each row is checked.
#[test]
fn test_file_reader_into_iter() {
    let path = get_test_path("alltypes_plain.parquet");
    let reader = SerializedFileReader::try_from(path.as_path()).unwrap();

    // Rows whose field 0 cannot be read as an int are dropped, not reported.
    let ids: Vec<_> = reader
        .into_iter()
        .filter_map(|row| row.unwrap().get_int(0).ok())
        .collect();

    assert_eq!(ids, &[4, 5, 6, 7, 2, 3, 0, 1]);
}
1321
/// Same as the plain `into_iter` test, but with the row iterator projected
/// onto a schema containing only the `id` column.
#[test]
fn test_file_reader_into_iter_project() {
    let path = get_test_path("alltypes_plain.parquet");
    let reader = SerializedFileReader::try_from(path.as_path()).unwrap();

    let projection = parse_message_type("message schema { OPTIONAL INT32 id; }").ok();
    let projected_rows = reader.into_iter().project(projection).unwrap();

    // Rows whose field 0 cannot be read as an int are dropped, not reported.
    let ids: Vec<_> = projected_rows
        .filter_map(|row| row.unwrap().get_int(0).ok())
        .collect();

    assert_eq!(ids, &[4, 5, 6, 7, 2, 3, 0, 1]);
}
1333
/// Creates a page reader for every column of row group 0 up front and only
/// then reads one page from each of them.
#[test]
fn test_reuse_file_chunk() {
    let reader = SerializedFileReader::new(get_test_file("alltypes_plain.parquet")).unwrap();
    let row_group = reader.get_row_group(0).unwrap();

    // Build all page readers before touching any of them.
    let page_readers: Vec<_> = (0..row_group.num_columns())
        .map(|column| row_group.get_column_page_reader(column).unwrap())
        .collect();

    for mut page_reader in page_readers {
        let first_page = page_reader.get_next_page();
        assert!(first_page.is_ok());
    }
}
1354
/// Opens `alltypes_plain.parquet` and checks the file metadata, the metadata
/// of row group 0, and both pages of column 0.
#[test]
fn test_file_reader() {
    let reader = SerializedFileReader::new(get_test_file("alltypes_plain.parquet"))
        .expect("alltypes_plain.parquet should open");

    // File-level metadata.
    let metadata = reader.metadata();
    assert_eq!(metadata.num_row_groups(), 1);

    let file_metadata = metadata.file_metadata();
    assert_eq!(
        file_metadata.created_by().unwrap(),
        "impala version 1.3.0-INTERNAL (build 8a48ddb1eff84592b3fc06bc6f51ec120e1fffc9)"
    );
    assert!(file_metadata.key_value_metadata().is_none());
    assert_eq!(file_metadata.num_rows(), 8);
    assert_eq!(file_metadata.version(), 1);
    assert_eq!(file_metadata.column_orders(), None);

    // Row group metadata.
    let row_group_metadata = metadata.row_group(0);
    assert_eq!(row_group_metadata.num_columns(), 11);
    assert_eq!(row_group_metadata.num_rows(), 8);
    assert_eq!(row_group_metadata.total_byte_size(), 671);
    assert!(
        (0..row_group_metadata.num_columns())
            .all(|column| file_metadata.column_order(column) == ColumnOrder::UNDEFINED)
    );

    // The row group reader must agree with the metadata it was built from.
    let row_group_reader = reader
        .get_row_group(0)
        .expect("row group 0 should be readable");
    assert_eq!(
        row_group_reader.num_columns(),
        row_group_metadata.num_columns()
    );
    assert_eq!(
        row_group_reader.metadata().total_byte_size(),
        row_group_metadata.total_byte_size()
    );

    // Column 0: exactly one dictionary page followed by / alongside one v1 data page.
    let mut column_0_pages = row_group_reader
        .get_column_page_reader(0)
        .expect("column 0 should have a page reader");
    let mut pages_seen = 0;
    while let Some(page) = column_0_pages.get_next_page().unwrap() {
        match page {
            Page::DictionaryPage {
                buf,
                num_values,
                encoding,
                is_sorted,
            } => {
                assert_eq!(buf.len(), 32);
                assert_eq!(num_values, 8);
                assert_eq!(encoding, Encoding::PLAIN_DICTIONARY);
                assert!(!is_sorted);
            }
            Page::DataPage {
                buf,
                num_values,
                encoding,
                def_level_encoding,
                rep_level_encoding,
                statistics,
            } => {
                assert_eq!(buf.len(), 11);
                assert_eq!(num_values, 8);
                assert_eq!(encoding, Encoding::PLAIN_DICTIONARY);
                assert_eq!(def_level_encoding, Encoding::RLE);
                #[allow(deprecated)]
                let expected_rep_level_encoding = Encoding::BIT_PACKED;
                assert_eq!(rep_level_encoding, expected_rep_level_encoding);
                assert!(statistics.is_none());
            }
            _ => panic!("unexpected page kind in column 0"),
        }
        pages_seen += 1;
    }
    assert_eq!(pages_seen, 2);
}
1446
/// Opens `datapage_v2.snappy.parquet` and checks the file metadata, the row
/// group reader, and the dictionary page plus v2 data page of column 0.
#[test]
fn test_file_reader_datapage_v2() {
    let reader = SerializedFileReader::new(get_test_file("datapage_v2.snappy.parquet"))
        .expect("datapage_v2.snappy.parquet should open");

    // File-level metadata.
    let metadata = reader.metadata();
    assert_eq!(metadata.num_row_groups(), 1);

    let file_metadata = metadata.file_metadata();
    assert_eq!(
        file_metadata.created_by().unwrap(),
        "parquet-mr version 1.8.1 (build 4aba4dae7bb0d4edbcf7923ae1339f28fd3f7fcf)"
    );
    assert_eq!(
        file_metadata.key_value_metadata().map(|entries| entries.len()),
        Some(1)
    );
    assert_eq!(file_metadata.num_rows(), 5);
    assert_eq!(file_metadata.version(), 1);
    assert_eq!(file_metadata.column_orders(), None);

    let row_group_metadata = metadata.row_group(0);
    assert!(
        (0..row_group_metadata.num_columns())
            .all(|column| file_metadata.column_order(column) == ColumnOrder::UNDEFINED)
    );

    // The row group reader must agree with the metadata it was built from.
    let row_group_reader = reader
        .get_row_group(0)
        .expect("row group 0 should be readable");
    assert_eq!(
        row_group_reader.num_columns(),
        row_group_metadata.num_columns()
    );
    assert_eq!(
        row_group_reader.metadata().total_byte_size(),
        row_group_metadata.total_byte_size()
    );

    // Column 0: one dictionary page and one v2 data page.
    let mut column_0_pages = row_group_reader
        .get_column_page_reader(0)
        .expect("column 0 should have a page reader");
    let mut pages_seen = 0;
    while let Some(page) = column_0_pages.get_next_page().unwrap() {
        match page {
            Page::DictionaryPage {
                buf,
                num_values,
                encoding,
                is_sorted,
            } => {
                assert_eq!(buf.len(), 7);
                assert_eq!(num_values, 1);
                assert_eq!(encoding, Encoding::PLAIN);
                assert!(!is_sorted);
            }
            Page::DataPageV2 {
                buf,
                num_values,
                encoding,
                num_nulls,
                num_rows,
                def_levels_byte_len,
                rep_levels_byte_len,
                is_compressed,
                statistics,
            } => {
                assert_eq!(buf.len(), 4);
                assert_eq!(num_values, 5);
                assert_eq!(encoding, Encoding::RLE_DICTIONARY);
                assert_eq!(num_nulls, 1);
                assert_eq!(num_rows, 5);
                assert_eq!(def_levels_byte_len, 2);
                assert_eq!(rep_levels_byte_len, 0);
                assert!(is_compressed);
                assert!(statistics.is_none());
            }
            _ => panic!("unexpected page kind in column 0"),
        }
        pages_seen += 1;
    }
    assert_eq!(pages_seen, 2);
}
1544
/// Opens `page_v2_empty_compressed.parquet` and checks the file metadata, the
/// row group reader, and the (empty) dictionary page plus the all-null v2 data
/// page of column 0.
#[test]
fn test_file_reader_empty_compressed_datapage_v2() {
    let reader = SerializedFileReader::new(get_test_file("page_v2_empty_compressed.parquet"))
        .expect("page_v2_empty_compressed.parquet should open");

    // File-level metadata.
    let metadata = reader.metadata();
    assert_eq!(metadata.num_row_groups(), 1);

    let file_metadata = metadata.file_metadata();
    assert_eq!(
        file_metadata.created_by().unwrap(),
        "parquet-cpp-arrow version 14.0.2"
    );
    assert_eq!(
        file_metadata.key_value_metadata().map(|entries| entries.len()),
        Some(1)
    );
    assert_eq!(file_metadata.num_rows(), 10);
    assert_eq!(file_metadata.version(), 2);

    let expected_order = ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED);
    assert_eq!(
        file_metadata.column_orders(),
        Some(vec![expected_order].as_ref())
    );

    let row_group_metadata = metadata.row_group(0);
    assert!(
        (0..row_group_metadata.num_columns())
            .all(|column| file_metadata.column_order(column) == expected_order)
    );

    // The row group reader must agree with the metadata it was built from.
    let row_group_reader = reader
        .get_row_group(0)
        .expect("row group 0 should be readable");
    assert_eq!(
        row_group_reader.num_columns(),
        row_group_metadata.num_columns()
    );
    assert_eq!(
        row_group_reader.metadata().total_byte_size(),
        row_group_metadata.total_byte_size()
    );

    // Column 0: an empty dictionary page and a v2 data page holding 10 nulls.
    let mut column_0_pages = row_group_reader
        .get_column_page_reader(0)
        .expect("column 0 should have a page reader");
    let mut pages_seen = 0;
    while let Some(page) = column_0_pages.get_next_page().unwrap() {
        match page {
            Page::DictionaryPage {
                buf,
                num_values,
                encoding,
                is_sorted,
            } => {
                assert_eq!(buf.len(), 0);
                assert_eq!(num_values, 0);
                assert_eq!(encoding, Encoding::PLAIN);
                assert!(!is_sorted);
            }
            Page::DataPageV2 {
                buf,
                num_values,
                encoding,
                num_nulls,
                num_rows,
                def_levels_byte_len,
                rep_levels_byte_len,
                is_compressed,
                statistics,
            } => {
                assert_eq!(buf.len(), 3);
                assert_eq!(num_values, 10);
                assert_eq!(encoding, Encoding::RLE_DICTIONARY);
                assert_eq!(num_nulls, 10);
                assert_eq!(num_rows, 10);
                assert_eq!(def_levels_byte_len, 2);
                assert_eq!(rep_levels_byte_len, 0);
                assert!(is_compressed);
                assert!(statistics.is_none());
            }
            _ => panic!("unexpected page kind in column 0"),
        }
        pages_seen += 1;
    }
    assert_eq!(pages_seen, 2);
}
1646
/// Opens `datapage_v2_empty_datapage.snappy.parquet` and checks the file
/// metadata, the row group reader, and the single v2 data page of column 0.
#[test]
fn test_file_reader_empty_datapage_v2() {
    let reader =
        SerializedFileReader::new(get_test_file("datapage_v2_empty_datapage.snappy.parquet"))
            .expect("datapage_v2_empty_datapage.snappy.parquet should open");

    // File-level metadata.
    let metadata = reader.metadata();
    assert_eq!(metadata.num_row_groups(), 1);

    let file_metadata = metadata.file_metadata();
    assert_eq!(
        file_metadata.created_by().unwrap(),
        "parquet-mr version 1.13.1 (build db4183109d5b734ec5930d870cdae161e408ddba)"
    );
    assert_eq!(
        file_metadata.key_value_metadata().map(|entries| entries.len()),
        Some(2)
    );
    assert_eq!(file_metadata.num_rows(), 1);
    assert_eq!(file_metadata.version(), 1);

    let expected_order = ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED);
    assert_eq!(
        file_metadata.column_orders(),
        Some(vec![expected_order].as_ref())
    );

    let row_group_metadata = metadata.row_group(0);
    assert!(
        (0..row_group_metadata.num_columns())
            .all(|column| file_metadata.column_order(column) == expected_order)
    );

    // The row group reader must agree with the metadata it was built from.
    let row_group_reader = reader
        .get_row_group(0)
        .expect("row group 0 should be readable");
    assert_eq!(
        row_group_reader.num_columns(),
        row_group_metadata.num_columns()
    );
    assert_eq!(
        row_group_reader.metadata().total_byte_size(),
        row_group_metadata.total_byte_size()
    );

    // Column 0: a single v2 data page and nothing else.
    let mut column_0_pages = row_group_reader
        .get_column_page_reader(0)
        .expect("column 0 should have a page reader");
    let mut pages_seen = 0;
    while let Some(page) = column_0_pages.get_next_page().unwrap() {
        match page {
            Page::DataPageV2 {
                buf,
                num_values,
                encoding,
                num_nulls,
                num_rows,
                def_levels_byte_len,
                rep_levels_byte_len,
                is_compressed,
                statistics,
            } => {
                assert_eq!(buf.len(), 2);
                assert_eq!(num_values, 1);
                assert_eq!(encoding, Encoding::PLAIN);
                assert_eq!(num_nulls, 1);
                assert_eq!(num_rows, 1);
                assert_eq!(def_levels_byte_len, 2);
                assert_eq!(rep_levels_byte_len, 0);
                assert!(is_compressed);
                assert!(statistics.is_none());
            }
            _ => panic!("unexpected page kind in column 0"),
        }
        pages_seen += 1;
    }
    assert_eq!(pages_seen, 1);
}
1736
1737 fn get_serialized_page_reader<R: ChunkReader>(
1738 file_reader: &SerializedFileReader<R>,
1739 row_group: usize,
1740 column: usize,
1741 ) -> Result<SerializedPageReader<R>> {
1742 let row_group = {
1743 let row_group_metadata = file_reader.metadata.row_group(row_group);
1744 let props = Arc::clone(&file_reader.props);
1745 let f = Arc::clone(&file_reader.chunk_reader);
1746 SerializedRowGroupReader::new(
1747 f,
1748 row_group_metadata,
1749 file_reader
1750 .metadata
1751 .offset_index()
1752 .map(|x| x[row_group].as_slice()),
1753 props,
1754 )?
1755 };
1756
1757 let col = row_group.metadata.column(column);
1758
1759 let page_locations = row_group
1760 .offset_index
1761 .map(|x| x[column].page_locations.clone());
1762
1763 let props = Arc::clone(&row_group.props);
1764 SerializedPageReader::new_with_properties(
1765 Arc::clone(&row_group.chunk_reader),
1766 col,
1767 usize::try_from(row_group.metadata.num_rows())?,
1768 page_locations,
1769 props,
1770 )
1771 }
1772
/// For every column of every row group in `alltypes_plain.parquet`, the offset
/// reported by `peek_next_page_offset` must match the reader's internal state,
/// must be followed by an actual page, and must never repeat across the file.
///
/// Errors from `peek_next_page_offset` and `get_next_page` fail the test
/// instead of silently ending the per-column loop.
#[test]
fn test_peek_next_page_offset_matches_actual() -> Result<()> {
    let test_file = get_test_file("alltypes_plain.parquet");
    let reader = SerializedFileReader::new(test_file)?;

    // Every page offset seen so far, used to detect duplicates.
    let mut offset_set = HashSet::new();
    let num_row_groups = reader.metadata.num_row_groups();
    for row_group in 0..num_row_groups {
        let num_columns = reader.metadata.row_group(row_group).num_columns();
        for column in 0..num_columns {
            let mut page_reader = get_serialized_page_reader(&reader, row_group, column)?;

            // `?` propagates a peek failure; only `Ok(None)` ends the column.
            while let Some(page_offset) = page_reader.peek_next_page_offset()? {
                match &page_reader.state {
                    SerializedPageReaderState::Pages {
                        page_locations,
                        dictionary_page,
                        ..
                    } => {
                        // A pending dictionary page takes precedence over the
                        // first remaining data page location.
                        if let Some(page) = dictionary_page {
                            assert_eq!(page.offset as u64, page_offset);
                        } else if let Some(page) = page_locations.front() {
                            assert_eq!(page.offset as u64, page_offset);
                        } else {
                            unreachable!()
                        }
                    }
                    SerializedPageReaderState::Values {
                        offset,
                        next_page_header,
                        ..
                    } => {
                        assert!(next_page_header.is_some());
                        assert_eq!(*offset, page_offset);
                    }
                }
                let page = page_reader.get_next_page()?;
                assert!(page.is_some());
                let newly_inserted = offset_set.insert(page_offset);
                assert!(newly_inserted);
            }
        }
    }

    // Guard against a vacuous pass in which no page was ever inspected.
    assert!(!offset_set.is_empty());

    Ok(())
}
1819
/// A `FilePageIterator` over column 0 of `alltypes_plain.parquet` yields
/// exactly one successful item, both when built with `new` and when built
/// with an explicit row group range of `0..1`.
#[test]
fn test_page_iterator() {
    let file_reader = Arc::new(
        SerializedFileReader::new(get_test_file("alltypes_plain.parquet")).unwrap(),
    );

    // Iterator over all row groups.
    let mut all_row_groups = FilePageIterator::new(0, Arc::clone(&file_reader)).unwrap();
    assert!(matches!(all_row_groups.next(), Some(Ok(_))));
    assert!(all_row_groups.next().is_none());

    // Iterator restricted to an explicit set of row groups.
    let row_group_indices = Box::new(0..1);
    let mut selected_row_groups =
        FilePageIterator::with_row_groups(0, row_group_indices, file_reader).unwrap();
    assert!(matches!(selected_row_groups.next(), Some(Ok(_))));
    assert!(selected_row_groups.next().is_none());
}
1849
/// Checks the three key/value metadata entries stored in `binary.parquet`.
#[test]
fn test_file_reader_key_value_metadata() {
    let file_reader = SerializedFileReader::new(get_test_file("binary.parquet")).unwrap();

    let entries = file_reader
        .metadata
        .file_metadata()
        .key_value_metadata()
        .unwrap();
    assert_eq!(entries.len(), 3);

    let (descriptor, writer_model, proto_class) = (&entries[0], &entries[1], &entries[2]);

    // Only the key of the descriptor entry is checked, not its value.
    assert_eq!(descriptor.key, "parquet.proto.descriptor");

    assert_eq!(writer_model.key, "writer.model.name");
    assert_eq!(writer_model.value.as_deref(), Some("protobuf"));

    assert_eq!(proto_class.key, "parquet.proto.class");
    assert_eq!(proto_class.value.as_deref(), Some("foo.baz.Foobaz$Event"));
}
1871
/// Reads `data_index_bloom_encoding_stats.parquet` with encoding stats kept in
/// full (not as a mask) and checks the optional column-chunk metadata of
/// column 0: bloom filter offset, first page encoding stats entry, and the
/// column/offset index locations.
#[test]
fn test_file_reader_optional_metadata() {
    let options = ReadOptionsBuilder::new()
        .with_encoding_stats_as_mask(false)
        .build();
    let file_reader = SerializedFileReader::new_with_options(
        get_test_file("data_index_bloom_encoding_stats.parquet"),
        options,
    )
    .unwrap();

    let column_0 = file_reader.metadata.row_group(0).column(0);

    // Bloom filter location.
    assert_eq!(column_0.bloom_filter_offset().unwrap(), 192);

    // First entry of the page encoding statistics.
    let first_stats = &column_0.page_encoding_stats().unwrap()[0];
    assert_eq!(first_stats.page_type, basic::PageType::DATA_PAGE);
    assert_eq!(first_stats.encoding, Encoding::PLAIN);
    assert_eq!(first_stats.count, 1);

    // Column index location.
    assert_eq!(column_0.column_index_offset().unwrap(), 156);
    assert_eq!(column_0.column_index_length().unwrap(), 25);

    // Offset index location.
    assert_eq!(column_0.offset_index_offset().unwrap(), 181);
    assert_eq!(column_0.offset_index_length().unwrap(), 11);
}
1902
/// With encoding stats read as a mask, column 0 of
/// `alltypes_tiny_pages.parquet` must report only `PLAIN` and column 2 only
/// `PLAIN_DICTIONARY`.
#[test]
fn test_file_reader_page_stats_mask() {
    let options = ReadOptionsBuilder::new()
        .with_encoding_stats_as_mask(true)
        .build();
    let file_reader = SerializedFileReader::new_with_options(
        get_test_file("alltypes_tiny_pages.parquet"),
        options,
    )
    .unwrap();

    let row_group_metadata = file_reader.metadata.row_group(0);

    // (column index, the single encoding its mask must contain)
    let expectations = [(0, Encoding::PLAIN), (2, Encoding::PLAIN_DICTIONARY)];
    for (column, only_encoding) in expectations {
        let mask = row_group_metadata
            .column(column)
            .page_encoding_stats_mask()
            .unwrap();
        assert!(mask.is_only(only_encoding));
    }
}
1925
/// Checks the encoding-stats and column-stats skip policies on
/// `alltypes_tiny_pages.parquet`: first skipping everything, then skipping
/// everything except column 0 (with encoding stats read as a mask).
#[test]
fn test_file_reader_page_stats_skipped() {
    let file = get_test_file("alltypes_tiny_pages.parquet");

    // Pass 1: skip all encoding and column statistics.
    let skip_everything = ReadOptionsBuilder::new()
        .with_encoding_stats_policy(ParquetStatisticsPolicy::SkipAll)
        .with_column_stats_policy(ParquetStatisticsPolicy::SkipAll)
        .build();
    let reader =
        SerializedFileReader::new_with_options(file.try_clone().unwrap(), skip_everything)
            .unwrap();

    for column in reader.metadata.row_group(0).columns() {
        assert!(column.page_encoding_stats().is_none());
        assert!(column.page_encoding_stats_mask().is_none());
        assert!(column.statistics().is_none());
    }

    // Pass 2: keep statistics for column 0 only, encoding stats as a mask.
    let keep_first_column = ReadOptionsBuilder::new()
        .with_encoding_stats_as_mask(true)
        .with_encoding_stats_policy(ParquetStatisticsPolicy::skip_except(&[0]))
        .with_column_stats_policy(ParquetStatisticsPolicy::skip_except(&[0]))
        .build();
    let reader =
        SerializedFileReader::new_with_options(file.try_clone().unwrap(), keep_first_column)
            .unwrap();

    for (idx, column) in reader.metadata.row_group(0).columns().iter().enumerate() {
        let is_kept = idx == 0;
        assert!(column.page_encoding_stats().is_none());
        assert_eq!(column.page_encoding_stats_mask().is_some(), is_kept);
        assert_eq!(column.statistics().is_some(), is_kept);
    }
}
1963
/// Checks the size-statistics skip policy on
/// `repeated_primitive_no_list.parquet`: first skipping everything, then
/// skipping everything except column 1.
#[test]
fn test_file_reader_size_stats_skipped() {
    let file = get_test_file("repeated_primitive_no_list.parquet");

    // Pass 1: skip all size statistics.
    let skip_everything = ReadOptionsBuilder::new()
        .with_size_stats_policy(ParquetStatisticsPolicy::SkipAll)
        .build();
    let reader =
        SerializedFileReader::new_with_options(file.try_clone().unwrap(), skip_everything)
            .unwrap();

    for column in reader.metadata.row_group(0).columns() {
        assert!(column.repetition_level_histogram().is_none());
        assert!(column.definition_level_histogram().is_none());
        assert!(column.unencoded_byte_array_data_bytes().is_none());
    }

    // Pass 2: keep size statistics for column 1 only.
    let keep_second_column = ReadOptionsBuilder::new()
        .with_encoding_stats_as_mask(true)
        .with_size_stats_policy(ParquetStatisticsPolicy::skip_except(&[1]))
        .build();
    let reader =
        SerializedFileReader::new_with_options(file.try_clone().unwrap(), keep_second_column)
            .unwrap();

    for (idx, column) in reader.metadata.row_group(0).columns().iter().enumerate() {
        let is_kept = idx == 1;
        assert_eq!(column.repetition_level_histogram().is_some(), is_kept);
        assert_eq!(column.definition_level_histogram().is_some(), is_kept);
        assert_eq!(column.unencoded_byte_array_data_bytes().is_some(), is_kept);
    }
}
1999
/// Without any read options, `alltypes_plain.parquet` exposes its single row
/// group.
#[test]
fn test_file_reader_with_no_filter() -> Result<()> {
    let unfiltered = SerializedFileReader::new(get_test_file("alltypes_plain.parquet"))?;
    assert_eq!(unfiltered.metadata().num_row_groups(), 1);
    Ok(())
}
2009
/// A predicate that rejects every row group leaves the reader with no row
/// groups at all.
#[test]
fn test_file_reader_filter_row_groups_with_predicate() -> Result<()> {
    let reject_all = ReadOptionsBuilder::new()
        .with_predicate(Box::new(|_, _| false))
        .build();
    let filtered = SerializedFileReader::new_with_options(
        get_test_file("alltypes_plain.parquet"),
        reject_all,
    )?;
    assert_eq!(filtered.metadata().num_row_groups(), 0);
    Ok(())
}
2021
/// Filters the single row group of `alltypes_plain.parquet` by byte range: a
/// range ending at `mid + 1` keeps the row group, a range ending at `mid`
/// drops it, where `mid` is the value returned by `get_midpoint_offset`.
#[test]
fn test_file_reader_filter_row_groups_with_range() -> Result<()> {
    let unfiltered = SerializedFileReader::new(get_test_file("alltypes_plain.parquet"))?;
    let unfiltered_metadata = unfiltered.metadata();
    assert_eq!(unfiltered_metadata.num_row_groups(), 1);
    let mid = get_midpoint_offset(unfiltered_metadata.row_group(0));

    // (range end, number of row groups expected to survive)
    let cases = [(mid + 1, 1), (mid, 0)];
    for (range_end, expected_row_groups) in cases {
        let read_options = ReadOptionsBuilder::new().with_range(0, range_end).build();
        let filtered = SerializedFileReader::new_with_options(
            get_test_file("alltypes_plain.parquet"),
            read_options,
        )?;
        assert_eq!(filtered.metadata().num_row_groups(), expected_row_groups);
    }
    Ok(())
}
2044
/// Combines a row group predicate with a byte range (page index enabled) on
/// `alltypes_tiny_pages.parquet`.
///
/// Only the combination "predicate accepts" + "range `mid..mid + 1`" keeps the
/// row group, in which case the column and offset indexes have one entry each;
/// in every other combination no row group survives and both indexes are
/// absent.
#[test]
fn test_file_reader_filter_row_groups_and_range() -> Result<()> {
    let unfiltered = SerializedFileReader::new(get_test_file("alltypes_tiny_pages.parquet"))?;
    let mid = get_midpoint_offset(unfiltered.metadata().row_group(0));

    // (predicate result, range start, range end, surviving row groups)
    let cases = [
        (true, mid, mid + 1, 1),
        (true, 0, mid, 0),
        (false, mid, mid + 1, 0),
        (false, 0, mid, 0),
    ];

    for (predicate_result, range_start, range_end, expected_row_groups) in cases {
        let read_options = ReadOptionsBuilder::new()
            .with_page_index()
            .with_predicate(Box::new(move |_, _| predicate_result))
            .with_range(range_start, range_end)
            .build();
        let filtered = SerializedFileReader::new_with_options(
            get_test_file("alltypes_tiny_pages.parquet"),
            read_options,
        )?;

        let metadata = filtered.metadata();
        assert_eq!(metadata.num_row_groups(), expected_row_groups);
        if expected_row_groups == 0 {
            assert!(metadata.column_index().is_none());
            assert!(metadata.offset_index().is_none());
        } else {
            assert_eq!(metadata.column_index().unwrap().len(), expected_row_groups);
            assert_eq!(metadata.offset_index().unwrap().len(), expected_row_groups);
        }
    }
    Ok(())
}
2105
/// Opening a hand-crafted 26-byte buffer ending in the `PAR1` magic must fail
/// with the "empty union" `ColumnOrder` error.
#[test]
fn test_file_reader_invalid_metadata() {
    let corrupt_file = [
        255, 172, 1, 0, 50, 82, 65, 73, 1, 0, 0, 0, 169, 168, 168, 162, 87, 255, 16, 0, 0, 0,
        80, 65, 82, 49,
    ];
    let error = SerializedFileReader::new(Bytes::copy_from_slice(&corrupt_file))
        .err()
        .expect("opening the corrupt buffer should fail");
    assert_eq!(
        error.to_string(),
        "Parquet error: Received empty union from remote ColumnOrder"
    );
}
2118
/// Reads `data_index_bloom_encoding_stats.parquet` with the page index
/// enabled and checks the byte-array column index and the first page location
/// of column 0 in row group 0.
#[test]
fn test_page_index_reader() {
    let options = ReadOptionsBuilder::new().with_page_index().build();
    let reader = SerializedFileReader::new_with_options(
        get_test_file("data_index_bloom_encoding_stats.parquet"),
        options,
    )
    .unwrap();

    let metadata = reader.metadata();
    assert_eq!(metadata.num_row_groups(), 1);

    // Column index: one row group, whose first column is a BYTE_ARRAY index.
    let column_indexes = metadata.column_index().unwrap();
    assert_eq!(column_indexes.len(), 1);
    let byte_array_index = match &column_indexes[0][0] {
        ColumnIndexMetaData::BYTE_ARRAY(index) => index,
        _ => unreachable!(),
    };

    assert_eq!(byte_array_index.boundary_order, BoundaryOrder::ASCENDING);
    assert_eq!(byte_array_index.num_pages(), 1);

    let page_min = byte_array_index.min_value(0).unwrap();
    let page_max = byte_array_index.max_value(0).unwrap();
    assert_eq!(b"Hello", page_min.as_bytes());
    assert_eq!(b"today", page_max.as_bytes());

    // Offset index: location of the first page of that same column.
    let offset_indexes = metadata.offset_index().unwrap();
    assert_eq!(offset_indexes.len(), 1);
    let first_page = &offset_indexes[0][0].page_locations()[0];

    assert_eq!(4, first_page.offset);
    assert_eq!(152, first_page.compressed_page_size);
    assert_eq!(0, first_page.first_row_index);
}
2178
/// Reading column and offset indexes for the columns of
/// `alltypes_tiny_pages_plain.parquet` in reversed order must give the same
/// entries as reading them in file order, once the result is reversed back.
#[test]
#[allow(deprecated)]
fn test_page_index_reader_out_of_order() {
    let options = ReadOptionsBuilder::new().with_page_index().build();
    let reader = SerializedFileReader::new_with_options(
        get_test_file("alltypes_tiny_pages_plain.parquet"),
        options,
    )
    .unwrap();
    let metadata = reader.metadata();

    // A second handle on the same file, used for the direct index reads below.
    let index_source = get_test_file("alltypes_tiny_pages_plain.parquet");
    let in_order = metadata.row_group(0).columns();
    let reversed: Vec<_> = in_order.iter().cloned().rev().collect();

    // Column indexes.
    let expected = read_columns_indexes(&index_source, in_order)
        .unwrap()
        .unwrap();
    let mut actual = read_columns_indexes(&index_source, &reversed)
        .unwrap()
        .unwrap();
    actual.reverse();
    assert_eq!(expected, actual);

    // Offset indexes.
    let expected = read_offset_indexes(&index_source, in_order)
        .unwrap()
        .unwrap();
    let mut actual = read_offset_indexes(&index_source, &reversed)
        .unwrap()
        .unwrap();
    actual.reverse();
    assert_eq!(expected, actual);
}
2203
2204 #[test]
2205 fn test_page_index_reader_all_type() {
2206 let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
2207 let builder = ReadOptionsBuilder::new();
2208 let options = builder.with_page_index().build();
2210 let reader_result = SerializedFileReader::new_with_options(test_file, options);
2211 let reader = reader_result.unwrap();
2212
2213 let metadata = reader.metadata();
2215 assert_eq!(metadata.num_row_groups(), 1);
2216
2217 let column_index = metadata.column_index().unwrap();
2218 let row_group_offset_indexes = &metadata.offset_index().unwrap()[0];
2219
2220 assert_eq!(column_index.len(), 1);
2222 let row_group_metadata = metadata.row_group(0);
2223
2224 assert!(!&column_index[0][0].is_sorted());
2226 let boundary_order = &column_index[0][0].get_boundary_order();
2227 assert!(boundary_order.is_some());
2228 matches!(boundary_order.unwrap(), BoundaryOrder::UNORDERED);
2229 if let ColumnIndexMetaData::INT32(index) = &column_index[0][0] {
2230 check_native_page_index(
2231 index,
2232 325,
2233 get_row_group_min_max_bytes(row_group_metadata, 0),
2234 BoundaryOrder::UNORDERED,
2235 );
2236 assert_eq!(row_group_offset_indexes[0].page_locations.len(), 325);
2237 } else {
2238 unreachable!()
2239 };
2240 assert!(&column_index[0][1].is_sorted());
2242 if let ColumnIndexMetaData::BOOLEAN(index) = &column_index[0][1] {
2243 assert_eq!(index.num_pages(), 82);
2244 assert_eq!(row_group_offset_indexes[1].page_locations.len(), 82);
2245 } else {
2246 unreachable!()
2247 };
2248 assert!(&column_index[0][2].is_sorted());
2250 if let ColumnIndexMetaData::INT32(index) = &column_index[0][2] {
2251 check_native_page_index(
2252 index,
2253 325,
2254 get_row_group_min_max_bytes(row_group_metadata, 2),
2255 BoundaryOrder::ASCENDING,
2256 );
2257 assert_eq!(row_group_offset_indexes[2].page_locations.len(), 325);
2258 } else {
2259 unreachable!()
2260 };
2261 assert!(&column_index[0][3].is_sorted());
2263 if let ColumnIndexMetaData::INT32(index) = &column_index[0][3] {
2264 check_native_page_index(
2265 index,
2266 325,
2267 get_row_group_min_max_bytes(row_group_metadata, 3),
2268 BoundaryOrder::ASCENDING,
2269 );
2270 assert_eq!(row_group_offset_indexes[3].page_locations.len(), 325);
2271 } else {
2272 unreachable!()
2273 };
2274 assert!(&column_index[0][4].is_sorted());
2276 if let ColumnIndexMetaData::INT32(index) = &column_index[0][4] {
2277 check_native_page_index(
2278 index,
2279 325,
2280 get_row_group_min_max_bytes(row_group_metadata, 4),
2281 BoundaryOrder::ASCENDING,
2282 );
2283 assert_eq!(row_group_offset_indexes[4].page_locations.len(), 325);
2284 } else {
2285 unreachable!()
2286 };
2287 assert!(!&column_index[0][5].is_sorted());
2289 if let ColumnIndexMetaData::INT64(index) = &column_index[0][5] {
2290 check_native_page_index(
2291 index,
2292 528,
2293 get_row_group_min_max_bytes(row_group_metadata, 5),
2294 BoundaryOrder::UNORDERED,
2295 );
2296 assert_eq!(row_group_offset_indexes[5].page_locations.len(), 528);
2297 } else {
2298 unreachable!()
2299 };
2300 assert!(&column_index[0][6].is_sorted());
2302 if let ColumnIndexMetaData::FLOAT(index) = &column_index[0][6] {
2303 check_native_page_index(
2304 index,
2305 325,
2306 get_row_group_min_max_bytes(row_group_metadata, 6),
2307 BoundaryOrder::ASCENDING,
2308 );
2309 assert_eq!(row_group_offset_indexes[6].page_locations.len(), 325);
2310 } else {
2311 unreachable!()
2312 };
2313 assert!(!&column_index[0][7].is_sorted());
2315 if let ColumnIndexMetaData::DOUBLE(index) = &column_index[0][7] {
2316 check_native_page_index(
2317 index,
2318 528,
2319 get_row_group_min_max_bytes(row_group_metadata, 7),
2320 BoundaryOrder::UNORDERED,
2321 );
2322 assert_eq!(row_group_offset_indexes[7].page_locations.len(), 528);
2323 } else {
2324 unreachable!()
2325 };
2326 assert!(!&column_index[0][8].is_sorted());
2328 if let ColumnIndexMetaData::BYTE_ARRAY(index) = &column_index[0][8] {
2329 check_byte_array_page_index(
2330 index,
2331 974,
2332 get_row_group_min_max_bytes(row_group_metadata, 8),
2333 BoundaryOrder::UNORDERED,
2334 );
2335 assert_eq!(row_group_offset_indexes[8].page_locations.len(), 974);
2336 } else {
2337 unreachable!()
2338 };
2339 assert!(&column_index[0][9].is_sorted());
2341 if let ColumnIndexMetaData::BYTE_ARRAY(index) = &column_index[0][9] {
2342 check_byte_array_page_index(
2343 index,
2344 352,
2345 get_row_group_min_max_bytes(row_group_metadata, 9),
2346 BoundaryOrder::ASCENDING,
2347 );
2348 assert_eq!(row_group_offset_indexes[9].page_locations.len(), 352);
2349 } else {
2350 unreachable!()
2351 };
2352 assert!(!&column_index[0][10].is_sorted());
2355 if let ColumnIndexMetaData::NONE = &column_index[0][10] {
2356 assert_eq!(row_group_offset_indexes[10].page_locations.len(), 974);
2357 } else {
2358 unreachable!()
2359 };
2360 assert!(&column_index[0][11].is_sorted());
2362 if let ColumnIndexMetaData::INT32(index) = &column_index[0][11] {
2363 check_native_page_index(
2364 index,
2365 325,
2366 get_row_group_min_max_bytes(row_group_metadata, 11),
2367 BoundaryOrder::ASCENDING,
2368 );
2369 assert_eq!(row_group_offset_indexes[11].page_locations.len(), 325);
2370 } else {
2371 unreachable!()
2372 };
2373 assert!(!&column_index[0][12].is_sorted());
2375 if let ColumnIndexMetaData::INT32(index) = &column_index[0][12] {
2376 check_native_page_index(
2377 index,
2378 325,
2379 get_row_group_min_max_bytes(row_group_metadata, 12),
2380 BoundaryOrder::UNORDERED,
2381 );
2382 assert_eq!(row_group_offset_indexes[12].page_locations.len(), 325);
2383 } else {
2384 unreachable!()
2385 };
2386 }
2387
2388 fn check_native_page_index<T: ParquetValueType>(
2389 row_group_index: &PrimitiveColumnIndex<T>,
2390 page_size: usize,
2391 min_max: (&[u8], &[u8]),
2392 boundary_order: BoundaryOrder,
2393 ) {
2394 assert_eq!(row_group_index.num_pages() as usize, page_size);
2395 assert_eq!(row_group_index.boundary_order, boundary_order);
2396 assert!(row_group_index.min_values().iter().all(|x| {
2397 x >= &T::try_from_le_slice(min_max.0).unwrap()
2398 && x <= &T::try_from_le_slice(min_max.1).unwrap()
2399 }));
2400 }
2401
2402 fn check_byte_array_page_index(
2403 row_group_index: &ByteArrayColumnIndex,
2404 page_size: usize,
2405 min_max: (&[u8], &[u8]),
2406 boundary_order: BoundaryOrder,
2407 ) {
2408 assert_eq!(row_group_index.num_pages() as usize, page_size);
2409 assert_eq!(row_group_index.boundary_order, boundary_order);
2410 for i in 0..row_group_index.num_pages() as usize {
2411 let x = row_group_index.min_value(i).unwrap();
2412 assert!(x >= min_max.0 && x <= min_max.1);
2413 }
2414 }
2415
2416 fn get_row_group_min_max_bytes(r: &RowGroupMetaData, col_num: usize) -> (&[u8], &[u8]) {
2417 let statistics = r.column(col_num).statistics().unwrap();
2418 (
2419 statistics.min_bytes_opt().unwrap_or_default(),
2420 statistics.max_bytes_opt().unwrap_or_default(),
2421 )
2422 }
2423
/// Skips the leading dictionary page of a column chunk (page index enabled)
/// and then reads every remaining data page.
#[test]
fn test_skip_next_page_with_dictionary_page() {
    let test_file = get_test_file("alltypes_tiny_pages.parquet");
    let options = ReadOptionsBuilder::new().with_page_index().build();
    let reader = SerializedFileReader::new_with_options(test_file, options).unwrap();

    let row_group_reader = reader.get_row_group(0).unwrap();
    let mut pages = row_group_reader.get_column_page_reader(9).unwrap();

    let mut data_page_metas = Vec::new();

    // The first page reported by the reader must be the dictionary page.
    let dict_meta = pages.peek_next_page().unwrap().unwrap();
    assert!(dict_meta.is_dict);

    // Skip over the dictionary page.
    pages.skip_next_page().unwrap();

    // The page that follows is read directly and must be a data page.
    let first_data_page = pages.get_next_page().unwrap().unwrap();
    assert!(matches!(
        first_data_page.page_type(),
        basic::PageType::DATA_PAGE
    ));

    // Peek and then read each of the remaining 351 data pages.
    for _ in 0..351 {
        let meta = pages.peek_next_page().unwrap().unwrap();
        assert!(!meta.is_dict);
        data_page_metas.push(meta);

        let page = pages.get_next_page().unwrap().unwrap();
        assert!(matches!(page.page_type(), basic::PageType::DATA_PAGE));
    }

    // The reader must now be exhausted for both peek and read.
    assert!(pages.peek_next_page().unwrap().is_none());
    assert!(pages.get_next_page().unwrap().is_none());

    assert_eq!(data_page_metas.len(), 351);
}
2469
/// Alternates between reading and skipping pages with the page index
/// loaded, and checks how many pages were actually read.
#[test]
fn test_skip_page_with_offset_index() {
    let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
    let options = ReadOptionsBuilder::new().with_page_index().build();
    let reader = SerializedFileReader::new_with_options(test_file, options).unwrap();

    let row_group_reader = reader.get_row_group(0).unwrap();
    let mut pages = row_group_reader.get_column_page_reader(4).unwrap();

    let mut kept_pages = Vec::new();

    // Even positions are read, odd positions are skipped.
    for i in 0..325 {
        let keep = i % 2 == 0;
        if !keep {
            pages.skip_next_page().unwrap();
            continue;
        }
        kept_pages.push(pages.get_next_page().unwrap().unwrap());
    }

    // The reader must now be exhausted for both peek and read.
    assert!(pages.peek_next_page().unwrap().is_none());
    assert!(pages.get_next_page().unwrap().is_none());

    assert_eq!(kept_pages.len(), 163);
}
2499
/// Alternates between reading and skipping pages on a reader created with
/// default options, peeking before each skip.
#[test]
fn test_skip_page_without_offset_index() {
    let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");

    // Reader built with `new`, i.e. without explicit read options.
    let reader = SerializedFileReader::new(test_file).unwrap();

    let row_group_reader = reader.get_row_group(0).unwrap();
    let mut pages = row_group_reader.get_column_page_reader(4).unwrap();

    let mut kept_pages = Vec::new();

    // Even positions are read; odd positions are peeked and then skipped.
    for i in 0..325 {
        let keep = i % 2 == 0;
        if !keep {
            pages.peek_next_page().unwrap().unwrap();
            pages.skip_next_page().unwrap();
            continue;
        }
        kept_pages.push(pages.get_next_page().unwrap().unwrap());
    }

    // The reader must now be exhausted for both peek and read.
    assert!(pages.peek_next_page().unwrap().is_none());
    assert!(pages.get_next_page().unwrap().is_none());

    assert_eq!(kept_pages.len(), 163);
}
2529
/// Peeks at and then reads every page of a dictionary-encoded column chunk
/// with the page index loaded, checking the row count reported per page.
#[test]
fn test_peek_page_with_dictionary_page() {
    let test_file = get_test_file("alltypes_tiny_pages.parquet");
    let options = ReadOptionsBuilder::new().with_page_index().build();
    let reader = SerializedFileReader::new_with_options(test_file, options).unwrap();
    let row_group_reader = reader.get_row_group(0).unwrap();

    let mut pages = row_group_reader.get_column_page_reader(9).unwrap();

    let mut data_page_metas = Vec::new();

    // The dictionary page comes first, for both peek and read.
    let dict_meta = pages.peek_next_page().unwrap().unwrap();
    assert!(dict_meta.is_dict);
    let dict_page = pages.get_next_page().unwrap().unwrap();
    assert!(matches!(
        dict_page.page_type(),
        basic::PageType::DICTIONARY_PAGE
    ));

    for i in 0..352 {
        let meta = pages.peek_next_page().unwrap().unwrap();
        if i == 351 {
            // The final page is expected to report 10 rows.
            assert_eq!(meta.num_rows, Some(10));
        } else {
            // Every other page is expected to report 20 or 21 rows.
            assert!(matches!(meta.num_rows, Some(21) | Some(20)));
        }
        assert!(!meta.is_dict);
        data_page_metas.push(meta);

        let page = pages.get_next_page().unwrap().unwrap();
        assert!(matches!(page.page_type(), basic::PageType::DATA_PAGE));
    }

    // The reader must now be exhausted for both peek and read.
    assert!(pages.peek_next_page().unwrap().is_none());
    assert!(pages.get_next_page().unwrap().is_none());

    assert_eq!(data_page_metas.len(), 352);
}
2573
/// Peeks at and then reads every page of a dictionary-encoded column chunk
/// on a reader created with default options, checking the level count
/// reported per page.
#[test]
fn test_peek_page_with_dictionary_page_without_offset_index() {
    let test_file = get_test_file("alltypes_tiny_pages.parquet");

    let reader = SerializedFileReader::new(test_file).unwrap();
    let row_group_reader = reader.get_row_group(0).unwrap();

    let mut pages = row_group_reader.get_column_page_reader(9).unwrap();

    let mut data_page_metas = Vec::new();

    // The dictionary page comes first, for both peek and read.
    let dict_meta = pages.peek_next_page().unwrap().unwrap();
    assert!(dict_meta.is_dict);
    let dict_page = pages.get_next_page().unwrap().unwrap();
    assert!(matches!(
        dict_page.page_type(),
        basic::PageType::DICTIONARY_PAGE
    ));

    for i in 0..352 {
        let meta = pages.peek_next_page().unwrap().unwrap();
        if i == 351 {
            // The final page is expected to report 10 levels.
            assert_eq!(meta.num_levels, Some(10));
        } else {
            // Every other page is expected to report 20 or 21 levels.
            assert!(matches!(meta.num_levels, Some(21) | Some(20)));
        }
        assert!(!meta.is_dict);
        data_page_metas.push(meta);

        let page = pages.get_next_page().unwrap().unwrap();
        assert!(matches!(page.page_type(), basic::PageType::DATA_PAGE));
    }

    // The reader must now be exhausted for both peek and read.
    assert!(pages.peek_next_page().unwrap().is_none());
    assert!(pages.get_next_page().unwrap().is_none());

    assert_eq!(data_page_metas.len(), 352);
}
2615
/// Writes a single optional FIXED_LEN_BYTE_ARRAY column to an in-memory
/// buffer and verifies the column index read back from it.
#[test]
fn test_fixed_length_index() {
    let message_type = "
 message test_schema {
 OPTIONAL FIXED_LEN_BYTE_ARRAY (11) value (DECIMAL(25,2));
 }
 ";

    let schema = Arc::new(parse_message_type(message_type).unwrap());
    let mut out = Vec::with_capacity(1024);
    let mut writer = SerializedFileWriter::new(&mut out, schema, Default::default()).unwrap();

    // One row group, one column: three values and four definition levels,
    // one of which is 0.
    let mut row_group_writer = writer.next_row_group().unwrap();
    let mut column_writer = row_group_writer.next_column().unwrap().unwrap();
    column_writer
        .typed::<FixedLenByteArrayType>()
        .write_batch(
            &[vec![0; 11].into(), vec![5; 11].into(), vec![3; 11].into()],
            Some(&[1, 1, 0, 1]),
            None,
        )
        .unwrap();
    column_writer.close().unwrap();
    row_group_writer.close().unwrap();
    writer.close().unwrap();

    // Read the buffer back with the page index enabled.
    let file_bytes = Bytes::from(out);
    let options = ReadOptionsBuilder::new().with_page_index().build();
    let reader = SerializedFileReader::new_with_options(file_bytes, options).unwrap();
    let index = reader.metadata().column_index().unwrap();

    // Exactly one row group ...
    assert_eq!(index.len(), 1);
    let row_group_indexes = &index[0];
    // ... containing exactly one column.
    assert_eq!(row_group_indexes.len(), 1);

    if let ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(v) = &row_group_indexes[0] {
        assert_eq!(v.num_pages(), 1);
        assert_eq!(v.null_count(0).unwrap(), 1);
        assert_eq!(v.min_value(0).unwrap(), &[0; 11]);
        assert_eq!(v.max_value(0).unwrap(), &[5; 11]);
    } else {
        unreachable!()
    }
}
2663
/// Reads the INT64 column of `concatenated_gzip_members.parquet` and checks
/// that all 513 values come back as the sequence `1..=513`.
#[test]
fn test_multi_gz() {
    let file = get_test_file("concatenated_gzip_members.parquet");
    let reader = SerializedFileReader::new(file).unwrap();
    let row_group_reader = reader.get_row_group(0).unwrap();

    if let ColumnReader::Int64ColumnReader(mut int64_reader) =
        row_group_reader.get_column_reader(0).unwrap()
    {
        let mut buffer = Vec::with_capacity(1024);
        let mut def_levels = Vec::with_capacity(1024);
        let (num_records, num_values, num_levels) = int64_reader
            .read_records(1024, Some(&mut def_levels), None, &mut buffer)
            .unwrap();

        assert_eq!(num_records, 513);
        assert_eq!(num_values, 513);
        assert_eq!(num_levels, 513);

        let expected: Vec<i64> = (1..514).collect();
        assert_eq!(&buffer, &expected);
    } else {
        unreachable!()
    }
}
2687
/// Reads every row of `byte_stream_split_extended.gzip.parquet` and checks
/// that each pair of adjacent columns (0/1, 2/3, ... 12/13) holds equal
/// values.
///
/// The number of rows produced by the iterator is also compared against the
/// row count in the file metadata, so a reader that stops early fails the
/// test instead of passing silently.
#[test]
fn test_byte_stream_split_extended() {
    let path = format!(
        "{}/byte_stream_split_extended.gzip.parquet",
        arrow::util::test_util::parquet_test_data(),
    );
    let file = File::open(path).unwrap();
    let reader = SerializedFileReader::new(file).expect("Failed to create reader");

    let iter = reader
        .get_row_iter(None)
        .expect("Failed to create row iterator");

    let expected_rows = reader.metadata().file_metadata().num_rows();

    // Each row must decode successfully and carry identical values in every
    // pair of adjacent columns.
    let check_row = |row: Result<Row, ParquetError>| {
        let r = row.expect("Failed to read row");
        assert_eq!(r.get_float16(0).unwrap(), r.get_float16(1).unwrap());
        assert_eq!(r.get_float(2).unwrap(), r.get_float(3).unwrap());
        assert_eq!(r.get_double(4).unwrap(), r.get_double(5).unwrap());
        assert_eq!(r.get_int(6).unwrap(), r.get_int(7).unwrap());
        assert_eq!(r.get_long(8).unwrap(), r.get_long(9).unwrap());
        assert_eq!(r.get_bytes(10).unwrap(), r.get_bytes(11).unwrap());
        assert_eq!(r.get_decimal(12).unwrap(), r.get_decimal(13).unwrap());
    };

    // Drain the iterator completely, counting the rows that were checked.
    let mut rows_read = 0;
    for row in iter {
        check_row(row);
        rows_read += 1;
    }

    // A short (or over-long) iteration is a failure, not a silent pass.
    assert_eq!(rows_read, expected_rows);
}
2725
/// Writes five single-column row groups to a temporary file, then reads the
/// file back with row-group predicates and checks that the returned
/// metadata, column index and offset index only cover the selected groups.
#[test]
fn test_filtered_rowgroup_metadata() {
    let message_type = "
 message test_schema {
 REQUIRED INT32 a;
 }
 ";
    let schema = Arc::new(parse_message_type(message_type).unwrap());
    let props = Arc::new(
        WriterProperties::builder()
            .set_statistics_enabled(EnabledStatistics::Page)
            .build(),
    );
    let mut file: File = tempfile::tempfile().unwrap();
    let mut file_writer = SerializedFileWriter::new(&mut file, schema, props).unwrap();
    let data = [1, 2, 3, 4, 5];

    // Five row groups; group `idx` holds `data` scaled by `idx + 1`.
    for idx in 0..5 {
        let scaled: Vec<i32> = data.iter().map(|x| x * (idx + 1)).collect();
        let mut row_group_writer = file_writer.next_row_group().unwrap();
        if let Some(mut column_writer) = row_group_writer.next_column().unwrap() {
            column_writer
                .typed::<Int32Type>()
                .write_batch(scaled.as_slice(), None, None)
                .unwrap();
            column_writer.close().unwrap();
        }
        row_group_writer.close().unwrap();
        file_writer.flushed_row_groups();
    }
    let written_metadata = file_writer.close().unwrap();

    assert_eq!(written_metadata.file_metadata().num_rows(), 25);
    assert_eq!(written_metadata.num_row_groups(), 5);

    // --- Select only the row group with ordinal 2 ---
    let read_options = ReadOptionsBuilder::new()
        .with_page_index()
        .with_predicate(Box::new(|rgmeta, _| rgmeta.ordinal().unwrap_or(0) == 2))
        .build();
    let reader =
        SerializedFileReader::new_with_options(file.try_clone().unwrap(), read_options).unwrap();
    let metadata = reader.metadata();

    // Exactly one row group survives, and it is the requested one.
    assert_eq!(metadata.num_row_groups(), 1);
    assert_eq!(metadata.row_group(0).ordinal(), Some(2));

    // Both page indexes are present and sized to the filtered row groups.
    assert!(metadata.column_index().is_some());
    assert!(metadata.offset_index().is_some());
    assert_eq!(metadata.column_index().unwrap().len(), 1);
    assert_eq!(metadata.offset_index().unwrap().len(), 1);
    let col_idx = metadata.column_index().unwrap();
    let off_idx = metadata.offset_index().unwrap();
    let col_stats = metadata.row_group(0).column(0).statistics().unwrap();
    let pg_idx = &col_idx[0][0];
    let off_idx_i = &off_idx[0][0];

    // The first page's min/max must equal the column chunk statistics.
    if let ColumnIndexMetaData::INT32(int_idx) = pg_idx {
        let min = col_stats.min_bytes_opt().unwrap().get_i32_le();
        let max = col_stats.max_bytes_opt().unwrap().get_i32_le();
        assert_eq!(int_idx.min_value(0), Some(min).as_ref());
        assert_eq!(int_idx.max_value(0), Some(max).as_ref());
    } else {
        panic!("wrong stats type")
    }

    // The first page location must match the column's data page offset.
    assert_eq!(
        off_idx_i.page_locations[0].offset,
        metadata.row_group(0).column(0).data_page_offset()
    );

    // --- Select the row groups with odd ordinals ---
    let read_options = ReadOptionsBuilder::new()
        .with_page_index()
        .with_predicate(Box::new(|rgmeta, _| rgmeta.ordinal().unwrap_or(0) % 2 == 1))
        .build();
    let reader =
        SerializedFileReader::new_with_options(file.try_clone().unwrap(), read_options).unwrap();
    let metadata = reader.metadata();

    // Two row groups survive: ordinals 1 and 3.
    assert_eq!(metadata.num_row_groups(), 2);
    assert_eq!(metadata.row_group(0).ordinal(), Some(1));
    assert_eq!(metadata.row_group(1).ordinal(), Some(3));

    // Both page indexes are present and sized to the filtered row groups.
    assert!(metadata.column_index().is_some());
    assert!(metadata.offset_index().is_some());
    assert_eq!(metadata.column_index().unwrap().len(), 2);
    assert_eq!(metadata.offset_index().unwrap().len(), 2);
    let col_idx = metadata.column_index().unwrap();
    let off_idx = metadata.offset_index().unwrap();

    // Repeat the per-row-group checks for each surviving row group.
    for (i, (col_idx_i, off_idx_rg)) in col_idx
        .iter()
        .zip(off_idx.iter())
        .enumerate()
        .take(metadata.num_row_groups())
    {
        let col_stats = metadata.row_group(i).column(0).statistics().unwrap();
        let pg_idx = &col_idx_i[0];
        let off_idx_i = &off_idx_rg[0];

        if let ColumnIndexMetaData::INT32(int_idx) = pg_idx {
            let min = col_stats.min_bytes_opt().unwrap().get_i32_le();
            let max = col_stats.max_bytes_opt().unwrap().get_i32_le();
            assert_eq!(int_idx.min_value(0), Some(min).as_ref());
            assert_eq!(int_idx.max_value(0), Some(max).as_ref());
        } else {
            panic!("wrong stats type")
        }

        assert_eq!(
            off_idx_i.page_locations[0].offset,
            metadata.row_group(i).column(0).data_page_offset()
        );
    }
}
2850
/// Opens the same file twice, passing the first reader's schema descriptor
/// to the second via read options, and checks that the metadata is equal
/// and the schema descriptor is the very same allocation.
#[test]
fn test_reuse_schema() {
    let file = get_test_file("alltypes_plain.parquet");

    // First pass: read the file normally and keep its schema and metadata.
    let first_reader = SerializedFileReader::new(file.try_clone().unwrap()).unwrap();
    let schema = first_reader.metadata().file_metadata().schema_descr_ptr();
    let expected = first_reader.metadata;

    // Second pass: supply the schema obtained above.
    let options = ReadOptionsBuilder::new()
        .with_parquet_schema(schema)
        .build();
    let second_reader = SerializedFileReader::new_with_options(file, options).unwrap();

    assert_eq!(expected.as_ref(), second_reader.metadata.as_ref());

    // Pointer equality: both readers share one schema descriptor.
    let expected_schema = expected.file_metadata().schema_descr_ptr();
    let actual_schema = second_reader.metadata.file_metadata().schema_descr_ptr();
    assert!(Arc::ptr_eq(&expected_schema, &actual_schema));
}
2870
/// Opens a file whose second column carries a logical type unknown to this
/// reader and checks that the schema is still reported and that all rows
/// can be iterated.
#[test]
fn test_read_unknown_logical_type() {
    let file = get_test_file("unknown-logical-type.parquet");
    let reader = SerializedFileReader::new(file).expect("Error opening file");

    let schema = reader.metadata().file_metadata().schema_descr();

    // Column 0 has a known logical type.
    let known_column = schema.column(0);
    assert_eq!(
        known_column.logical_type_ref(),
        Some(&basic::LogicalType::String)
    );

    // Column 1 is surfaced as `_Unknown` with its field id, and its
    // physical type is still reported.
    let unknown_column = schema.column(1);
    assert_eq!(
        unknown_column.logical_type_ref(),
        Some(&basic::LogicalType::_Unknown { field_id: 2555 })
    );
    assert_eq!(unknown_column.physical_type(), Type::BYTE_ARRAY);

    let row_iter = reader
        .get_row_iter(None)
        .expect("Failed to create row iterator");

    // Count every item the iterator yields.
    let mut num_rows = 0;
    for _row in row_iter {
        num_rows += 1;
    }
    assert_eq!(num_rows, reader.metadata().file_metadata().num_rows());
}
2897}