1use crate::basic::{PageType, Type};
22use crate::bloom_filter::Sbbf;
23use crate::column::page::{Page, PageMetadata, PageReader};
24use crate::compression::{Codec, create_codec};
25#[cfg(feature = "encryption")]
26use crate::encryption::decrypt::{CryptoContext, read_and_decrypt};
27use crate::errors::{ParquetError, Result};
28use crate::file::metadata::thrift::PageHeader;
29use crate::file::page_index::offset_index::{OffsetIndexMetaData, PageLocation};
30use crate::file::statistics;
31use crate::file::{
32 metadata::*,
33 properties::{ReaderProperties, ReaderPropertiesPtr},
34 reader::*,
35};
36#[cfg(feature = "encryption")]
37use crate::parquet_thrift::ThriftSliceInputProtocol;
38use crate::parquet_thrift::{ReadThrift, ThriftReadInputProtocol};
39use crate::record::Row;
40use crate::record::reader::RowIter;
41use crate::schema::types::{SchemaDescPtr, Type as SchemaType};
42use bytes::Bytes;
43use std::collections::VecDeque;
44use std::{fs::File, io::Read, path::Path, sync::Arc};
45
46impl TryFrom<File> for SerializedFileReader<File> {
47 type Error = ParquetError;
48
49 fn try_from(file: File) -> Result<Self> {
50 Self::new(file)
51 }
52}
53
54impl TryFrom<&Path> for SerializedFileReader<File> {
55 type Error = ParquetError;
56
57 fn try_from(path: &Path) -> Result<Self> {
58 let file = File::open(path)?;
59 Self::try_from(file)
60 }
61}
62
63impl TryFrom<String> for SerializedFileReader<File> {
64 type Error = ParquetError;
65
66 fn try_from(path: String) -> Result<Self> {
67 Self::try_from(Path::new(&path))
68 }
69}
70
71impl TryFrom<&str> for SerializedFileReader<File> {
72 type Error = ParquetError;
73
74 fn try_from(path: &str) -> Result<Self> {
75 Self::try_from(Path::new(&path))
76 }
77}
78
/// Iterates over every [`Row`] in the file, consuming the reader.
impl IntoIterator for SerializedFileReader<File> {
    type Item = Result<Row>;
    type IntoIter = RowIter<'static>;

    fn into_iter(self) -> Self::IntoIter {
        // Boxing the reader lets the iterator own it, giving the iterator a
        // 'static lifetime independent of this (consumed) value.
        RowIter::from_file_into(Box::new(self))
    }
}
89
/// A synchronous Parquet file reader over any [`ChunkReader`] source
/// (e.g. a [`File`] or in-memory [`Bytes`]).
pub struct SerializedFileReader<R: ChunkReader> {
    /// Shared handle to the underlying storage; cloned into row-group readers.
    chunk_reader: Arc<R>,
    /// Parsed footer metadata (schema, row groups, optional page indexes).
    metadata: Arc<ParquetMetaData>,
    /// Reader configuration forwarded to row-group and page readers.
    props: ReaderPropertiesPtr,
}
99
/// Predicate over a row group and its ordinal in the file; row groups for
/// which any predicate returns `false` are dropped by
/// [`SerializedFileReader::new_with_options`].
pub type ReadGroupPredicate = Box<dyn FnMut(&RowGroupMetaData, usize) -> bool>;
104
/// Builder for [`ReadOptions`]: row-group filtering, page-index loading,
/// reader properties and metadata decoding options.
#[derive(Default)]
pub struct ReadOptionsBuilder {
    /// Row-group filters; all must accept a group for it to be kept.
    predicates: Vec<ReadGroupPredicate>,
    /// Whether to also load the page/offset indexes with the footer.
    enable_page_index: bool,
    /// Explicit reader properties; defaults are built when `None`.
    props: Option<ReaderProperties>,
    /// Options forwarded to footer metadata decoding.
    metadata_options: ParquetMetaDataOptions,
}
115
impl ReadOptionsBuilder {
    /// Creates a builder with all options at their defaults.
    pub fn new() -> Self {
        Self::default()
    }

    /// Adds a row-group predicate; a group is read only if every registered
    /// predicate accepts it.
    pub fn with_predicate(mut self, predicate: ReadGroupPredicate) -> Self {
        self.predicates.push(predicate);
        self
    }

    /// Keeps only row groups whose byte midpoint falls in `[start, end)`.
    ///
    /// # Panics
    /// Panics if `start >= end`.
    pub fn with_range(mut self, start: i64, end: i64) -> Self {
        assert!(start < end);
        let predicate = move |rg: &RowGroupMetaData, _: usize| {
            let mid = get_midpoint_offset(rg);
            mid >= start && mid < end
        };
        self.predicates.push(Box::new(predicate));
        self
    }

    /// Requests that the page/offset indexes be read along with the footer.
    pub fn with_page_index(mut self) -> Self {
        self.enable_page_index = true;
        self
    }

    /// Overrides the default [`ReaderProperties`].
    pub fn with_reader_properties(mut self, properties: ReaderProperties) -> Self {
        self.props = Some(properties);
        self
    }

    /// Supplies an already-parsed schema for metadata decoding
    /// (forwards to [`ParquetMetaDataOptions::set_schema`]).
    pub fn with_parquet_schema(mut self, schema: SchemaDescPtr) -> Self {
        self.metadata_options.set_schema(schema);
        self
    }

    /// Forwards `val` to [`ParquetMetaDataOptions::set_encoding_stats_as_mask`].
    pub fn with_encoding_stats_as_mask(mut self, val: bool) -> Self {
        self.metadata_options.set_encoding_stats_as_mask(val);
        self
    }

    /// Forwards `policy` to [`ParquetMetaDataOptions::set_encoding_stats_policy`].
    pub fn with_encoding_stats_policy(mut self, policy: ParquetStatisticsPolicy) -> Self {
        self.metadata_options.set_encoding_stats_policy(policy);
        self
    }

    /// Consumes the builder, producing the final [`ReadOptions`]; missing
    /// reader properties fall back to their defaults.
    pub fn build(self) -> ReadOptions {
        let props = self
            .props
            .unwrap_or_else(|| ReaderProperties::builder().build());
        ReadOptions {
            predicates: self.predicates,
            enable_page_index: self.enable_page_index,
            props,
            metadata_options: self.metadata_options,
        }
    }
}
198
/// Options consumed by [`SerializedFileReader::new_with_options`]; construct
/// via [`ReadOptionsBuilder`].
pub struct ReadOptions {
    /// Row-group filters; a group is kept only if all predicates accept it.
    predicates: Vec<ReadGroupPredicate>,
    /// Whether to also load the page/offset indexes.
    enable_page_index: bool,
    /// Reader properties to apply.
    props: ReaderProperties,
    /// Options controlling footer metadata decoding.
    metadata_options: ParquetMetaDataOptions,
}
209
impl<R: 'static + ChunkReader> SerializedFileReader<R> {
    /// Creates a reader with default properties, eagerly parsing the footer
    /// metadata from `chunk_reader`.
    pub fn new(chunk_reader: R) -> Result<Self> {
        let metadata = ParquetMetaDataReader::new().parse_and_finish(&chunk_reader)?;
        let props = Arc::new(ReaderProperties::builder().build());
        Ok(Self {
            chunk_reader: Arc::new(chunk_reader),
            metadata: Arc::new(metadata),
            props,
        })
    }

    /// Creates a reader configured by `options`: row groups rejected by any
    /// predicate are removed from the metadata, and page indexes are loaded
    /// afterwards when requested.
    #[allow(deprecated)]
    pub fn new_with_options(chunk_reader: R, options: ReadOptions) -> Result<Self> {
        let mut metadata_builder = ParquetMetaDataReader::new()
            .with_metadata_options(Some(options.metadata_options.clone()))
            .parse_and_finish(&chunk_reader)?
            .into_builder();
        let mut predicates = options.predicates;

        // Take all row groups out of the builder and re-add only those that
        // every predicate accepts, preserving original order.
        for (i, rg_meta) in metadata_builder.take_row_groups().into_iter().enumerate() {
            let mut keep = true;
            for predicate in &mut predicates {
                if !predicate(&rg_meta, i) {
                    keep = false;
                    break;
                }
            }
            if keep {
                metadata_builder = metadata_builder.add_row_group(rg_meta);
            }
        }

        let mut metadata = metadata_builder.build();

        // Page indexes are loaded in a second pass, after filtering, so only
        // the surviving row groups pay the cost.
        if options.enable_page_index {
            let mut reader =
                ParquetMetaDataReader::new_with_metadata(metadata).with_page_indexes(true);
            reader.read_page_indexes(&chunk_reader)?;
            metadata = reader.finish()?;
        }

        Ok(Self {
            chunk_reader: Arc::new(chunk_reader),
            metadata: Arc::new(metadata),
            props: Arc::new(options.props),
        })
    }
}
264
265fn get_midpoint_offset(meta: &RowGroupMetaData) -> i64 {
267 let col = meta.column(0);
268 let mut offset = col.data_page_offset();
269 if let Some(dic_offset) = col.dictionary_page_offset() {
270 if offset > dic_offset {
271 offset = dic_offset
272 }
273 };
274 offset + meta.compressed_size() / 2
275}
276
impl<R: 'static + ChunkReader> FileReader for SerializedFileReader<R> {
    fn metadata(&self) -> &ParquetMetaData {
        &self.metadata
    }

    fn num_row_groups(&self) -> usize {
        self.metadata.num_row_groups()
    }

    /// Builds a reader for row group `i`, sharing the chunk reader and
    /// properties and forwarding this row group's slice of the offset index
    /// (when one was loaded).
    fn get_row_group(&self, i: usize) -> Result<Box<dyn RowGroupReader + '_>> {
        let row_group_metadata = self.metadata.row_group(i);
        let props = Arc::clone(&self.props);
        let f = Arc::clone(&self.chunk_reader);
        Ok(Box::new(SerializedRowGroupReader::new(
            f,
            row_group_metadata,
            // Only the i-th row group's offset index entries are passed down.
            self.metadata.offset_index().map(|x| x[i].as_slice()),
            props,
        )?))
    }

    fn get_row_iter(&self, projection: Option<SchemaType>) -> Result<RowIter<'_>> {
        RowIter::from_file(projection, self)
    }
}
303
/// Reads the columns (and optional bloom filters) of a single row group.
pub struct SerializedRowGroupReader<'a, R: ChunkReader> {
    /// Shared handle to the underlying file/bytes.
    chunk_reader: Arc<R>,
    /// Metadata of the row group being read.
    metadata: &'a RowGroupMetaData,
    /// This row group's offset index entries, when loaded with the footer.
    offset_index: Option<&'a [OffsetIndexMetaData]>,
    /// Reader configuration propagated to per-column page readers.
    props: ReaderPropertiesPtr,
    /// One optional bloom filter per column, loaded eagerly in `new`.
    bloom_filters: Vec<Option<Sbbf>>,
}
312
313impl<'a, R: ChunkReader> SerializedRowGroupReader<'a, R> {
314 pub fn new(
316 chunk_reader: Arc<R>,
317 metadata: &'a RowGroupMetaData,
318 offset_index: Option<&'a [OffsetIndexMetaData]>,
319 props: ReaderPropertiesPtr,
320 ) -> Result<Self> {
321 let bloom_filters = if props.read_bloom_filter() {
322 metadata
323 .columns()
324 .iter()
325 .map(|col| Sbbf::read_from_column_chunk(col, &*chunk_reader))
326 .collect::<Result<Vec<_>>>()?
327 } else {
328 std::iter::repeat_n(None, metadata.columns().len()).collect()
329 };
330 Ok(Self {
331 chunk_reader,
332 metadata,
333 offset_index,
334 props,
335 bloom_filters,
336 })
337 }
338}
339
impl<R: 'static + ChunkReader> RowGroupReader for SerializedRowGroupReader<'_, R> {
    fn metadata(&self) -> &RowGroupMetaData {
        self.metadata
    }

    fn num_columns(&self) -> usize {
        self.metadata.num_columns()
    }

    /// Builds a page reader for column `i`, passing the column's page
    /// locations from the offset index when available.
    fn get_column_page_reader(&self, i: usize) -> Result<Box<dyn PageReader>> {
        let col = self.metadata.column(i);

        let page_locations = self.offset_index.map(|x| x[i].page_locations.clone());

        let props = Arc::clone(&self.props);
        Ok(Box::new(SerializedPageReader::new_with_properties(
            Arc::clone(&self.chunk_reader),
            col,
            usize::try_from(self.metadata.num_rows())?,
            page_locations,
            props,
        )?))
    }

    /// Bloom filter for column `i`, if one was loaded at construction time.
    fn get_column_bloom_filter(&self, i: usize) -> Option<&Sbbf> {
        self.bloom_filters[i].as_ref()
    }

    fn get_row_iter(&self, projection: Option<SchemaType>) -> Result<RowIter<'_>> {
        RowIter::from_row_group(projection, self)
    }
}
374
/// Decodes a [`Page`] from a raw (possibly compressed) page buffer.
///
/// Optionally verifies the CRC (with the `crc` feature), sanity-checks
/// DataPage v2 level lengths, decompresses the body when a decompressor is
/// supplied, and finally builds the typed [`Page`] variant from the header.
pub(crate) fn decode_page(
    page_header: PageHeader,
    buffer: Bytes,
    physical_type: Type,
    decompressor: Option<&mut Box<dyn Codec>>,
) -> Result<Page> {
    // Verify the checksum (over the raw page bytes) before anything else.
    #[cfg(feature = "crc")]
    if let Some(expected_crc) = page_header.crc {
        let crc = crc32fast::hash(&buffer);
        if crc != expected_crc as u32 {
            return Err(general_err!("Page CRC checksum mismatch"));
        }
    }

    // For v2 data pages, the rep/def levels at the front of the page are
    // stored uncompressed; `offset` marks where the (possibly compressed)
    // values section begins. V1 pages are compressed in their entirety.
    let mut offset: usize = 0;
    let mut can_decompress = true;

    if let Some(ref header_v2) = page_header.data_page_header_v2 {
        // Reject negative or oversized level lengths from a corrupt header
        // before they can cause bad slicing below.
        if header_v2.definition_levels_byte_length < 0
            || header_v2.repetition_levels_byte_length < 0
            || header_v2.definition_levels_byte_length + header_v2.repetition_levels_byte_length
                > page_header.uncompressed_page_size
        {
            return Err(general_err!(
                "DataPage v2 header contains implausible values \
                    for definition_levels_byte_length ({}) \
                    and repetition_levels_byte_length ({}) \
                    given DataPage header provides uncompressed_page_size ({})",
                header_v2.definition_levels_byte_length,
                header_v2.repetition_levels_byte_length,
                page_header.uncompressed_page_size
            ));
        }
        offset = usize::try_from(
            header_v2.definition_levels_byte_length + header_v2.repetition_levels_byte_length,
        )?;
        // A v2 page may declare its values section as uncompressed even when
        // the column chunk as a whole has a codec.
        can_decompress = header_v2.is_compressed.unwrap_or(true);
    }

    let buffer = match decompressor {
        Some(decompressor) if can_decompress => {
            let uncompressed_page_size = usize::try_from(page_header.uncompressed_page_size)?;
            if offset > buffer.len() || offset > uncompressed_page_size {
                return Err(general_err!("Invalid page header"));
            }
            let decompressed_size = uncompressed_page_size - offset;
            let mut decompressed = Vec::with_capacity(uncompressed_page_size);
            // Copy the uncompressed level bytes first, then decompress the
            // values section after them.
            decompressed.extend_from_slice(&buffer[..offset]);
            if decompressed_size > 0 {
                let compressed = &buffer[offset..];
                decompressor.decompress(compressed, &mut decompressed, Some(decompressed_size))?;
            }

            if decompressed.len() != uncompressed_page_size {
                return Err(general_err!(
                    "Actual decompressed size doesn't match the expected one ({} vs {})",
                    decompressed.len(),
                    uncompressed_page_size
                ));
            }

            Bytes::from(decompressed)
        }
        // No decompressor, or a v2 page marked uncompressed: use bytes as-is.
        _ => buffer,
    };

    let result = match page_header.r#type {
        PageType::DICTIONARY_PAGE => {
            let dict_header = page_header.dictionary_page_header.as_ref().ok_or_else(|| {
                ParquetError::General("Missing dictionary page header".to_string())
            })?;
            let is_sorted = dict_header.is_sorted.unwrap_or(false);
            Page::DictionaryPage {
                buf: buffer,
                num_values: dict_header.num_values.try_into()?,
                encoding: dict_header.encoding,
                is_sorted,
            }
        }
        PageType::DATA_PAGE => {
            let header = page_header
                .data_page_header
                .ok_or_else(|| ParquetError::General("Missing V1 data page header".to_string()))?;
            Page::DataPage {
                buf: buffer,
                num_values: header.num_values.try_into()?,
                encoding: header.encoding,
                def_level_encoding: header.definition_level_encoding,
                rep_level_encoding: header.repetition_level_encoding,
                statistics: statistics::from_thrift_page_stats(physical_type, header.statistics)?,
            }
        }
        PageType::DATA_PAGE_V2 => {
            let header = page_header
                .data_page_header_v2
                .ok_or_else(|| ParquetError::General("Missing V2 data page header".to_string()))?;
            let is_compressed = header.is_compressed.unwrap_or(true);
            Page::DataPageV2 {
                buf: buffer,
                num_values: header.num_values.try_into()?,
                encoding: header.encoding,
                num_nulls: header.num_nulls.try_into()?,
                num_rows: header.num_rows.try_into()?,
                def_levels_byte_len: header.definition_levels_byte_length.try_into()?,
                rep_levels_byte_len: header.repetition_levels_byte_length.try_into()?,
                is_compressed,
                statistics: statistics::from_thrift_page_stats(physical_type, header.statistics)?,
            }
        }
        _ => {
            // INDEX_PAGE (and any future types) are not decodable here;
            // callers skip them before reaching this function.
            return Err(general_err!(
                "Page type {:?} is not supported",
                page_header.r#type
            ));
        }
    };

    Ok(result)
}
506
/// Scanning state of a [`SerializedPageReader`].
enum SerializedPageReaderState {
    /// No offset index: page headers are discovered by reading the column
    /// chunk's byte stream sequentially.
    Values {
        /// Absolute file offset of the next byte to read.
        offset: u64,

        /// Bytes of the column chunk not yet consumed.
        remaining_bytes: u64,

        /// Header read ahead by `peek_next_page`, whose body has not been
        /// consumed yet.
        next_page_header: Option<Box<PageHeader>>,

        /// Ordinal of the next data page (fed to the decryption context).
        page_index: usize,

        /// True until the leading dictionary page (if any) has been read.
        require_dictionary: bool,
    },
    /// Offset index available: page boundaries are known up front.
    Pages {
        /// Remaining page locations, consumed front-to-back.
        page_locations: VecDeque<PageLocation>,
        /// Synthesized location of a dictionary page preceding the first
        /// indexed page, if the chunk starts before the index does.
        dictionary_page: Option<PageLocation>,
        /// Total rows in the row group; sizes the last page in
        /// `peek_next_page`.
        total_rows: usize,
        /// Ordinal of the next data page (fed to the decryption context).
        page_index: usize,
    },
}
537
/// Per-reader flags shared by all page header/body decode paths.
#[derive(Default)]
struct SerializedPageReaderContext {
    /// Whether to decode page-level statistics embedded in page headers.
    read_stats: bool,
    /// Decryption context for this column chunk, when the file is encrypted.
    #[cfg(feature = "encryption")]
    crypto_context: Option<Arc<CryptoContext>>,
}
546
/// A [`PageReader`] over a single column chunk, scanning sequentially or
/// following the offset index when one is provided.
pub struct SerializedPageReader<R: ChunkReader> {
    /// Source of header and page bytes.
    reader: Arc<R>,

    /// Codec for the chunk's compression, or `None` when uncompressed.
    decompressor: Option<Box<dyn Codec>>,

    /// Physical type of the column; needed to decode page statistics.
    physical_type: Type,

    /// Current scan position/state.
    state: SerializedPageReaderState,

    /// Stats/decryption flags applied while decoding headers and bodies.
    context: SerializedPageReaderContext,
}
562
impl<R: ChunkReader> SerializedPageReader<R> {
    /// Creates a page reader with default properties for the given column
    /// chunk, reading at most `total_rows` rows.
    pub fn new(
        reader: Arc<R>,
        column_chunk_metadata: &ColumnChunkMetaData,
        total_rows: usize,
        page_locations: Option<Vec<PageLocation>>,
    ) -> Result<Self> {
        let props = Arc::new(ReaderProperties::builder().build());
        SerializedPageReader::new_with_properties(
            reader,
            column_chunk_metadata,
            total_rows,
            page_locations,
            props,
        )
    }

    /// No-op stand-in when the `encryption` feature is disabled, so callers
    /// can invoke this unconditionally.
    #[cfg(all(feature = "arrow", not(feature = "encryption")))]
    pub(crate) fn add_crypto_context(
        self,
        _rg_idx: usize,
        _column_idx: usize,
        _parquet_meta_data: &ParquetMetaData,
        _column_chunk_metadata: &ColumnChunkMetaData,
    ) -> Result<SerializedPageReader<R>> {
        Ok(self)
    }

    /// Attaches a column-level decryption context when both the file and
    /// this column chunk carry crypto metadata; otherwise returns `self`
    /// unchanged.
    #[cfg(feature = "encryption")]
    pub(crate) fn add_crypto_context(
        mut self,
        rg_idx: usize,
        column_idx: usize,
        parquet_meta_data: &ParquetMetaData,
        column_chunk_metadata: &ColumnChunkMetaData,
    ) -> Result<SerializedPageReader<R>> {
        let Some(file_decryptor) = parquet_meta_data.file_decryptor() else {
            return Ok(self);
        };
        let Some(crypto_metadata) = column_chunk_metadata.crypto_metadata() else {
            return Ok(self);
        };
        let crypto_context =
            CryptoContext::for_column(file_decryptor, crypto_metadata, rg_idx, column_idx)?;
        self.context.crypto_context = Some(Arc::new(crypto_context));
        Ok(self)
    }

    /// Creates a page reader with explicit properties.
    ///
    /// With `page_locations` (offset index) the reader uses `Pages` state; a
    /// dictionary page location is synthesized when the chunk starts before
    /// the first indexed page. Without it the reader falls back to the
    /// sequential `Values` state.
    pub fn new_with_properties(
        reader: Arc<R>,
        meta: &ColumnChunkMetaData,
        total_rows: usize,
        page_locations: Option<Vec<PageLocation>>,
        props: ReaderPropertiesPtr,
    ) -> Result<Self> {
        let decompressor = create_codec(meta.compression(), props.codec_options())?;
        let (start, len) = meta.byte_range();

        let state = match page_locations {
            Some(locations) => {
                // The offset index only lists data pages. If the chunk's
                // byte range begins before the first indexed page, the gap
                // is the dictionary page.
                let dictionary_page = match locations.first() {
                    Some(dict_offset) if dict_offset.offset as u64 != start => Some(PageLocation {
                        offset: start as i64,
                        compressed_page_size: (dict_offset.offset as u64 - start) as i32,
                        first_row_index: 0,
                    }),
                    _ => None,
                };

                SerializedPageReaderState::Pages {
                    page_locations: locations.into(),
                    dictionary_page,
                    total_rows,
                    page_index: 0,
                }
            }
            None => SerializedPageReaderState::Values {
                offset: start,
                remaining_bytes: len,
                next_page_header: None,
                page_index: 0,
                require_dictionary: meta.dictionary_page_offset().is_some(),
            },
        };
        let mut context = SerializedPageReaderContext::default();
        if props.read_page_stats() {
            context.read_stats = true;
        }
        Ok(Self {
            reader,
            decompressor,
            state,
            physical_type: meta.column_type(),
            context,
        })
    }

    /// Test helper: returns the file offset of the next page body (after its
    /// header), buffering the header for the subsequent read.
    #[cfg(test)]
    fn peek_next_page_offset(&mut self) -> Result<Option<u64>> {
        match &mut self.state {
            SerializedPageReaderState::Values {
                offset,
                remaining_bytes,
                next_page_header,
                page_index,
                require_dictionary,
            } => {
                // Loop so headers that don't convert to PageMetadata are
                // skipped rather than returned.
                loop {
                    if *remaining_bytes == 0 {
                        return Ok(None);
                    }
                    return if let Some(header) = next_page_header.as_ref() {
                        if let Ok(_page_meta) = PageMetadata::try_from(&**header) {
                            Ok(Some(*offset))
                        } else {
                            // Discard an unconvertible buffered header and
                            // read the next one.
                            *next_page_header = None;
                            continue;
                        }
                    } else {
                        let mut read = self.reader.get_read(*offset)?;
                        let (header_len, header) = Self::read_page_header_len(
                            &self.context,
                            &mut read,
                            *page_index,
                            *require_dictionary,
                        )?;
                        *offset += header_len as u64;
                        *remaining_bytes -= header_len as u64;
                        let page_meta = if let Ok(_page_meta) = PageMetadata::try_from(&header) {
                            Ok(Some(*offset))
                        } else {
                            continue;
                        };
                        // Buffer the header so the next read doesn't repeat it.
                        *next_page_header = Some(Box::new(header));
                        page_meta
                    };
                }
            }
            SerializedPageReaderState::Pages {
                page_locations,
                dictionary_page,
                ..
            } => {
                // Offsets come straight from the (synthesized) locations.
                if let Some(page) = dictionary_page {
                    Ok(Some(page.offset as u64))
                } else if let Some(page) = page_locations.front() {
                    Ok(Some(page.offset as u64))
                } else {
                    Ok(None)
                }
            }
        }
    }

    /// Reads one page header from `input`, returning the header and the
    /// number of bytes it occupied (needed to advance the stream offset).
    fn read_page_header_len<T: Read>(
        context: &SerializedPageReaderContext,
        input: &mut T,
        page_index: usize,
        dictionary_page: bool,
    ) -> Result<(usize, PageHeader)> {
        /// Wrapper that counts how many bytes the thrift decoder consumes.
        struct TrackedRead<R> {
            inner: R,
            bytes_read: usize,
        }

        impl<R: Read> Read for TrackedRead<R> {
            fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
                let v = self.inner.read(buf)?;
                self.bytes_read += v;
                Ok(v)
            }
        }

        let mut tracked = TrackedRead {
            inner: input,
            bytes_read: 0,
        };
        let header = context.read_page_header(&mut tracked, page_index, dictionary_page)?;
        Ok((tracked.bytes_read, header))
    }

    /// Like [`Self::read_page_header_len`] but over an in-memory buffer,
    /// using the cursor position to compute the header length.
    fn read_page_header_len_from_bytes(
        context: &SerializedPageReaderContext,
        buffer: &[u8],
        page_index: usize,
        dictionary_page: bool,
    ) -> Result<(usize, PageHeader)> {
        let mut input = std::io::Cursor::new(buffer);
        let header = context.read_page_header(&mut input, page_index, dictionary_page)?;
        let header_len = input.position() as usize;
        Ok((header_len, header))
    }
}
770
#[cfg(not(feature = "encryption"))]
impl SerializedPageReaderContext {
    /// Reads a (plaintext) page header from `input`, skipping embedded
    /// statistics unless `read_stats` is set.
    fn read_page_header<T: Read>(
        &self,
        input: &mut T,
        _page_index: usize,
        _dictionary_page: bool,
    ) -> Result<PageHeader> {
        let mut prot = ThriftReadInputProtocol::new(input);
        if self.read_stats {
            Ok(PageHeader::read_thrift(&mut prot)?)
        } else {
            Ok(PageHeader::read_thrift_without_stats(&mut prot)?)
        }
    }

    /// Pass-through without the `encryption` feature: page data is returned
    /// unchanged.
    fn decrypt_page_data<T>(
        &self,
        buffer: T,
        _page_index: usize,
        _dictionary_page: bool,
    ) -> Result<T> {
        Ok(buffer)
    }
}
796
#[cfg(feature = "encryption")]
impl SerializedPageReaderContext {
    /// Reads a page header from `input`, decrypting it first when a crypto
    /// context applies to this page. Statistics embedded in the header are
    /// skipped unless `read_stats` is set.
    fn read_page_header<T: Read>(
        &self,
        input: &mut T,
        page_index: usize,
        dictionary_page: bool,
    ) -> Result<PageHeader> {
        match self.page_crypto_context(page_index, dictionary_page) {
            None => {
                // Plaintext header: decode thrift straight from the stream.
                let mut prot = ThriftReadInputProtocol::new(input);
                if self.read_stats {
                    Ok(PageHeader::read_thrift(&mut prot)?)
                } else {
                    Ok(PageHeader::read_thrift_without_stats(&mut prot)?)
                }
            }
            Some(page_crypto_context) => {
                let data_decryptor = page_crypto_context.data_decryptor();
                let aad = page_crypto_context.create_page_header_aad()?;

                let buf = read_and_decrypt(data_decryptor, input, aad.as_ref()).map_err(|_| {
                    ParquetError::General(format!(
                        "Error decrypting page header for column {}, decryption key may be wrong",
                        page_crypto_context.column_ordinal
                    ))
                })?;

                // Decryption yields a complete plaintext buffer, so a slice
                // protocol is used instead of the streaming one.
                let mut prot = ThriftSliceInputProtocol::new(buf.as_slice());
                if self.read_stats {
                    Ok(PageHeader::read_thrift(&mut prot)?)
                } else {
                    Ok(PageHeader::read_thrift_without_stats(&mut prot)?)
                }
            }
        }
    }

    /// Decrypts a page body, returning a new buffer, when encryption is
    /// configured for this column; otherwise returns `buffer` unchanged.
    fn decrypt_page_data<T>(&self, buffer: T, page_index: usize, dictionary_page: bool) -> Result<T>
    where
        T: AsRef<[u8]>,
        T: From<Vec<u8>>,
    {
        let page_crypto_context = self.page_crypto_context(page_index, dictionary_page);
        if let Some(page_crypto_context) = page_crypto_context {
            let decryptor = page_crypto_context.data_decryptor();
            let aad = page_crypto_context.create_page_aad()?;
            let decrypted = decryptor.decrypt(buffer.as_ref(), &aad)?;
            Ok(T::from(decrypted))
        } else {
            Ok(buffer)
        }
    }

    /// Derives the per-page crypto context (dictionary-page vs. ordinal page
    /// AAD) from the column-level context, if any.
    fn page_crypto_context(
        &self,
        page_index: usize,
        dictionary_page: bool,
    ) -> Option<Arc<CryptoContext>> {
        self.crypto_context.as_ref().map(|c| {
            Arc::new(if dictionary_page {
                c.for_dictionary_page()
            } else {
                c.with_page_ordinal(page_index)
            })
        })
    }
}
867
/// Yields pages until the column chunk is exhausted, surfacing decode errors
/// as `Err` items.
impl<R: ChunkReader> Iterator for SerializedPageReader<R> {
    type Item = Result<Page>;

    fn next(&mut self) -> Option<Self::Item> {
        // `transpose` maps Ok(None) (no more pages) to iterator exhaustion.
        self.get_next_page().transpose()
    }
}
875
876fn verify_page_header_len(header_len: usize, remaining_bytes: u64) -> Result<()> {
877 if header_len as u64 > remaining_bytes {
878 return Err(eof_err!("Invalid page header"));
879 }
880 Ok(())
881}
882
883fn verify_page_size(
884 compressed_size: i32,
885 uncompressed_size: i32,
886 remaining_bytes: u64,
887) -> Result<()> {
888 if compressed_size < 0 || compressed_size as u64 > remaining_bytes || uncompressed_size < 0 {
892 return Err(eof_err!("Invalid page header"));
893 }
894 Ok(())
895}
896
impl<R: ChunkReader> PageReader for SerializedPageReader<R> {
    /// Reads and decodes the next page, or `Ok(None)` at end of chunk.
    fn get_next_page(&mut self) -> Result<Option<Page>> {
        // Loop so that pages carrying no row data (INDEX_PAGE) can be
        // skipped transparently.
        loop {
            let page = match &mut self.state {
                SerializedPageReaderState::Values {
                    offset,
                    remaining_bytes: remaining,
                    next_page_header,
                    page_index,
                    require_dictionary,
                } => {
                    if *remaining == 0 {
                        return Ok(None);
                    }

                    let mut read = self.reader.get_read(*offset)?;
                    // Prefer a header buffered by peek_next_page; otherwise
                    // decode one from the stream and advance past it.
                    let header = if let Some(header) = next_page_header.take() {
                        *header
                    } else {
                        let (header_len, header) = Self::read_page_header_len(
                            &self.context,
                            &mut read,
                            *page_index,
                            *require_dictionary,
                        )?;
                        verify_page_header_len(header_len, *remaining)?;
                        *offset += header_len as u64;
                        *remaining -= header_len as u64;
                        header
                    };
                    verify_page_size(
                        header.compressed_page_size,
                        header.uncompressed_page_size,
                        *remaining,
                    )?;
                    let data_len = header.compressed_page_size as usize;
                    let data_start = *offset;
                    // Advance past the body before decoding so a decode error
                    // doesn't leave the offset mid-page.
                    *offset += data_len as u64;
                    *remaining -= data_len as u64;

                    if header.r#type == PageType::INDEX_PAGE {
                        // Index pages carry no values; skip to the next page.
                        continue;
                    }

                    let buffer = self.reader.get_bytes(data_start, data_len)?;

                    let buffer =
                        self.context
                            .decrypt_page_data(buffer, *page_index, *require_dictionary)?;

                    let page = decode_page(
                        header,
                        buffer,
                        self.physical_type,
                        self.decompressor.as_mut(),
                    )?;
                    // Only data pages advance the page ordinal; consuming the
                    // dictionary page clears the dictionary requirement.
                    if page.is_data_page() {
                        *page_index += 1;
                    } else if page.is_dictionary_page() {
                        *require_dictionary = false;
                    }
                    page
                }
                SerializedPageReaderState::Pages {
                    page_locations,
                    dictionary_page,
                    page_index,
                    ..
                } => {
                    // The dictionary page (if any) is served before the
                    // indexed data pages.
                    let (front, is_dictionary_page) = match dictionary_page.take() {
                        Some(front) => (front, true),
                        None => match page_locations.pop_front() {
                            Some(front) => (front, false),
                            None => return Ok(None),
                        },
                    };

                    let page_len = usize::try_from(front.compressed_page_size)?;
                    // The location covers header + body; the header length is
                    // recovered by decoding it from the buffer.
                    let buffer = self.reader.get_bytes(front.offset as u64, page_len)?;

                    let (offset, header) = Self::read_page_header_len_from_bytes(
                        &self.context,
                        buffer.as_ref(),
                        *page_index,
                        is_dictionary_page,
                    )?;
                    let bytes = buffer.slice(offset..);
                    let bytes =
                        self.context
                            .decrypt_page_data(bytes, *page_index, is_dictionary_page)?;

                    if !is_dictionary_page {
                        *page_index += 1;
                    }
                    decode_page(
                        header,
                        bytes,
                        self.physical_type,
                        self.decompressor.as_mut(),
                    )?
                }
            };

            return Ok(Some(page));
        }
    }

    /// Returns metadata for the next page without consuming its body.
    fn peek_next_page(&mut self) -> Result<Option<PageMetadata>> {
        match &mut self.state {
            SerializedPageReaderState::Values {
                offset,
                remaining_bytes,
                next_page_header,
                page_index,
                require_dictionary,
            } => {
                // Loop past headers that don't convert to PageMetadata.
                loop {
                    if *remaining_bytes == 0 {
                        return Ok(None);
                    }
                    return if let Some(header) = next_page_header.as_ref() {
                        if let Ok(page_meta) = (&**header).try_into() {
                            Ok(Some(page_meta))
                        } else {
                            // Discard an unconvertible buffered header and
                            // read the next one.
                            *next_page_header = None;
                            continue;
                        }
                    } else {
                        let mut read = self.reader.get_read(*offset)?;
                        let (header_len, header) = Self::read_page_header_len(
                            &self.context,
                            &mut read,
                            *page_index,
                            *require_dictionary,
                        )?;
                        verify_page_header_len(header_len, *remaining_bytes)?;
                        *offset += header_len as u64;
                        *remaining_bytes -= header_len as u64;
                        let page_meta = if let Ok(page_meta) = (&header).try_into() {
                            Ok(Some(page_meta))
                        } else {
                            continue;
                        };
                        // Buffer the header so get_next_page won't re-read it.
                        *next_page_header = Some(Box::new(header));
                        page_meta
                    };
                }
            }
            SerializedPageReaderState::Pages {
                page_locations,
                dictionary_page,
                total_rows,
                page_index: _,
            } => {
                if dictionary_page.is_some() {
                    Ok(Some(PageMetadata {
                        num_rows: None,
                        num_levels: None,
                        is_dict: true,
                    }))
                } else if let Some(page) = page_locations.front() {
                    // Row count is the distance to the next page's first row
                    // index, or to the end of the row group for the last page.
                    let next_rows = page_locations
                        .get(1)
                        .map(|x| x.first_row_index as usize)
                        .unwrap_or(*total_rows);

                    Ok(Some(PageMetadata {
                        num_rows: Some(next_rows - page.first_row_index as usize),
                        num_levels: None,
                        is_dict: false,
                    }))
                } else {
                    Ok(None)
                }
            }
        }
    }

    /// Advances past the next page without decoding its body.
    fn skip_next_page(&mut self) -> Result<()> {
        match &mut self.state {
            SerializedPageReaderState::Values {
                offset,
                remaining_bytes,
                next_page_header,
                page_index,
                require_dictionary,
            } => {
                if let Some(buffered_header) = next_page_header.take() {
                    verify_page_size(
                        buffered_header.compressed_page_size,
                        buffered_header.uncompressed_page_size,
                        *remaining_bytes,
                    )?;
                    // The header bytes were already consumed when it was
                    // buffered; skip only the body.
                    *offset += buffered_header.compressed_page_size as u64;
                    *remaining_bytes -= buffered_header.compressed_page_size as u64;
                } else {
                    let mut read = self.reader.get_read(*offset)?;
                    let (header_len, header) = Self::read_page_header_len(
                        &self.context,
                        &mut read,
                        *page_index,
                        *require_dictionary,
                    )?;
                    verify_page_header_len(header_len, *remaining_bytes)?;
                    verify_page_size(
                        header.compressed_page_size,
                        header.uncompressed_page_size,
                        *remaining_bytes,
                    )?;
                    let data_page_size = header.compressed_page_size as u64;
                    // Skip header and body together.
                    *offset += header_len as u64 + data_page_size;
                    *remaining_bytes -= header_len as u64 + data_page_size;
                }
                // Mirror get_next_page's ordinal bookkeeping: the first
                // skipped page may be the dictionary page.
                if *require_dictionary {
                    *require_dictionary = false;
                } else {
                    *page_index += 1;
                }
                Ok(())
            }
            SerializedPageReaderState::Pages {
                page_locations,
                dictionary_page,
                page_index,
                ..
            } => {
                if dictionary_page.is_some() {
                    // Skipping the dictionary page does not advance the data
                    // page ordinal.
                    dictionary_page.take();
                } else {
                    if page_locations.pop_front().is_some() {
                        *page_index += 1;
                    }
                }

                Ok(())
            }
        }
    }

    fn at_record_boundary(&mut self) -> Result<bool> {
        match &mut self.state {
            // Sequential scan: only certain at a boundary once the chunk is
            // exhausted.
            SerializedPageReaderState::Values { .. } => Ok(self.peek_next_page()?.is_none()),
            // Offset-index scan: this reader treats each indexed page start
            // as a record boundary.
            SerializedPageReaderState::Pages { .. } => Ok(true),
        }
    }
}
1148
1149#[cfg(test)]
1150mod tests {
1151 use std::collections::HashSet;
1152
1153 use bytes::Buf;
1154
1155 use crate::file::page_index::column_index::{
1156 ByteArrayColumnIndex, ColumnIndexMetaData, PrimitiveColumnIndex,
1157 };
1158 use crate::file::properties::{EnabledStatistics, WriterProperties};
1159
1160 use crate::basic::{self, BoundaryOrder, ColumnOrder, Encoding, SortOrder};
1161 use crate::column::reader::ColumnReader;
1162 use crate::data_type::private::ParquetValueType;
1163 use crate::data_type::{AsBytes, FixedLenByteArrayType, Int32Type};
1164 use crate::file::metadata::thrift::DataPageHeaderV2;
1165 #[allow(deprecated)]
1166 use crate::file::page_index::index_reader::{read_columns_indexes, read_offset_indexes};
1167 use crate::file::writer::SerializedFileWriter;
1168 use crate::record::RowAccessor;
1169 use crate::schema::parser::parse_message_type;
1170 use crate::util::test_common::file_util::{get_test_file, get_test_path};
1171
1172 use super::*;
1173
    // decode_page must reject a v2 header whose level byte lengths exceed the
    // declared uncompressed page size, before attempting any decompression.
    #[test]
    fn test_decode_page_invalid_offset() {
        let page_header = PageHeader {
            r#type: PageType::DATA_PAGE_V2,
            uncompressed_page_size: 10,
            compressed_page_size: 10,
            data_page_header: None,
            index_page_header: None,
            dictionary_page_header: None,
            crc: None,
            data_page_header_v2: Some(DataPageHeaderV2 {
                num_nulls: 0,
                num_rows: 0,
                num_values: 0,
                encoding: Encoding::PLAIN,
                // 11 > uncompressed_page_size (10): implausible by design.
                definition_levels_byte_length: 11,
                repetition_levels_byte_length: 0,
                is_compressed: None,
                statistics: None,
            }),
        };

        let buffer = Bytes::new();
        let err = decode_page(page_header, buffer, Type::INT32, None).unwrap_err();
        assert!(
            err.to_string()
                .contains("DataPage v2 header contains implausible values")
        );
    }
1203
    // An INDEX_PAGE must be rejected as unsupported by decode_page, and the
    // v2 sanity check must still fire even for an unsupported page type.
    #[test]
    fn test_decode_unsupported_page() {
        let mut page_header = PageHeader {
            r#type: PageType::INDEX_PAGE,
            uncompressed_page_size: 10,
            compressed_page_size: 10,
            data_page_header: None,
            index_page_header: None,
            dictionary_page_header: None,
            crc: None,
            data_page_header_v2: None,
        };
        let buffer = Bytes::new();
        let err = decode_page(page_header.clone(), buffer.clone(), Type::INT32, None).unwrap_err();
        assert_eq!(
            err.to_string(),
            "Parquet error: Page type INDEX_PAGE is not supported"
        );

        // Attaching an implausible v2 header makes the earlier validation
        // win over the unsupported-type error.
        page_header.data_page_header_v2 = Some(DataPageHeaderV2 {
            num_nulls: 0,
            num_rows: 0,
            num_values: 0,
            encoding: Encoding::PLAIN,
            definition_levels_byte_length: 11,
            repetition_levels_byte_length: 0,
            is_compressed: None,
            statistics: None,
        });
        let err = decode_page(page_header, buffer, Type::INT32, None).unwrap_err();
        assert!(
            err.to_string()
                .contains("DataPage v2 header contains implausible values")
        );
    }
1239
    // Reading from an in-memory Bytes cursor and from a File must produce
    // identical rows for the same Parquet data.
    #[test]
    fn test_cursor_and_file_has_the_same_behaviour() {
        let mut buf: Vec<u8> = Vec::new();
        get_test_file("alltypes_plain.parquet")
            .read_to_end(&mut buf)
            .unwrap();
        let cursor = Bytes::from(buf);
        let read_from_cursor = SerializedFileReader::new(cursor).unwrap();

        let test_file = get_test_file("alltypes_plain.parquet");
        let read_from_file = SerializedFileReader::new(test_file).unwrap();

        let file_iter = read_from_file.get_row_iter(None).unwrap();
        let cursor_iter = read_from_cursor.get_row_iter(None).unwrap();

        for (a, b) in file_iter.zip(cursor_iter) {
            assert_eq!(a.unwrap(), b.unwrap())
        }
    }
1259
    // The TryFrom conversions (File, &Path, &str, String) must all succeed on
    // a valid file and all fail on a nonexistent path.
    #[test]
    fn test_file_reader_try_from() {
        let test_file = get_test_file("alltypes_plain.parquet");
        let test_path_buf = get_test_path("alltypes_plain.parquet");
        let test_path = test_path_buf.as_path();
        let test_path_str = test_path.to_str().unwrap();

        let reader = SerializedFileReader::try_from(test_file);
        assert!(reader.is_ok());

        let reader = SerializedFileReader::try_from(test_path);
        assert!(reader.is_ok());

        let reader = SerializedFileReader::try_from(test_path_str);
        assert!(reader.is_ok());

        let reader = SerializedFileReader::try_from(test_path_str.to_string());
        assert!(reader.is_ok());

        // Nonexistent path: every conversion flavor must report an error.
        let test_path = Path::new("invalid.parquet");
        let test_path_str = test_path.to_str().unwrap();

        let reader = SerializedFileReader::try_from(test_path);
        assert!(reader.is_err());

        let reader = SerializedFileReader::try_from(test_path_str);
        assert!(reader.is_err());

        let reader = SerializedFileReader::try_from(test_path_str.to_string());
        assert!(reader.is_err());
    }
1293
#[test]
fn test_file_reader_into_iter() {
    // Consuming the reader via `IntoIterator` yields the `id` column values.
    let path = get_test_path("alltypes_plain.parquet");
    let ids: Vec<_> = SerializedFileReader::try_from(path.as_path())
        .unwrap()
        .into_iter()
        .flat_map(|row| row.unwrap().get_int(0))
        .collect();

    assert_eq!(ids, &[4, 5, 6, 7, 2, 3, 0, 1]);
}
1303
#[test]
fn test_file_reader_into_iter_project() {
    // Projecting onto a single-column schema still yields the `id` values.
    let path = get_test_path("alltypes_plain.parquet");
    let reader = SerializedFileReader::try_from(path.as_path()).unwrap();
    let projection = parse_message_type("message schema { OPTIONAL INT32 id; }").ok();
    let ids: Vec<_> = reader
        .into_iter()
        .project(projection)
        .unwrap()
        .flat_map(|row| row.unwrap().get_int(0))
        .collect();

    assert_eq!(ids, &[4, 5, 6, 7, 2, 3, 0, 1]);
}
1315
#[test]
fn test_reuse_file_chunk() {
    // All column page readers share the same underlying file chunk; each of
    // them must still be readable after all have been created.
    let reader = SerializedFileReader::new(get_test_file("alltypes_plain.parquet")).unwrap();
    let row_group = reader.get_row_group(0).unwrap();

    let page_readers: Vec<_> = (0..row_group.num_columns())
        .map(|i| row_group.get_column_page_reader(i).unwrap())
        .collect();

    for mut page_reader in page_readers {
        assert!(page_reader.get_next_page().is_ok());
    }
}
1336
#[test]
fn test_file_reader() {
    let test_file = get_test_file("alltypes_plain.parquet");
    let reader_result = SerializedFileReader::new(test_file);
    assert!(reader_result.is_ok());
    let reader = reader_result.unwrap();

    // Verify file-level metadata parsed from the footer.
    let metadata = reader.metadata();
    assert_eq!(metadata.num_row_groups(), 1);

    let file_metadata = metadata.file_metadata();
    assert!(file_metadata.created_by().is_some());
    assert_eq!(
        file_metadata.created_by().unwrap(),
        "impala version 1.3.0-INTERNAL (build 8a48ddb1eff84592b3fc06bc6f51ec120e1fffc9)"
    );
    assert!(file_metadata.key_value_metadata().is_none());
    assert_eq!(file_metadata.num_rows(), 8);
    assert_eq!(file_metadata.version(), 1);
    assert_eq!(file_metadata.column_orders(), None);

    // Row-group metadata: a single group with 11 columns and 8 rows.
    let row_group_metadata = metadata.row_group(0);
    assert_eq!(row_group_metadata.num_columns(), 11);
    assert_eq!(row_group_metadata.num_rows(), 8);
    assert_eq!(row_group_metadata.total_byte_size(), 671);
    // With no column orders in the footer, every column order is UNDEFINED.
    for i in 0..row_group_metadata.num_columns() {
        assert_eq!(file_metadata.column_order(i), ColumnOrder::UNDEFINED);
    }

    // The row-group reader must agree with the metadata it was built from.
    let row_group_reader_result = reader.get_row_group(0);
    assert!(row_group_reader_result.is_ok());
    let row_group_reader: Box<dyn RowGroupReader> = row_group_reader_result.unwrap();
    assert_eq!(
        row_group_reader.num_columns(),
        row_group_metadata.num_columns()
    );
    assert_eq!(
        row_group_reader.metadata().total_byte_size(),
        row_group_metadata.total_byte_size()
    );

    // Column 0 should contain exactly one dictionary page followed by one
    // PLAIN_DICTIONARY-encoded v1 data page.
    let page_reader_0_result = row_group_reader.get_column_page_reader(0);
    assert!(page_reader_0_result.is_ok());
    let mut page_reader_0: Box<dyn PageReader> = page_reader_0_result.unwrap();
    let mut page_count = 0;
    while let Some(page) = page_reader_0.get_next_page().unwrap() {
        let is_expected_page = match page {
            Page::DictionaryPage {
                buf,
                num_values,
                encoding,
                is_sorted,
            } => {
                assert_eq!(buf.len(), 32);
                assert_eq!(num_values, 8);
                assert_eq!(encoding, Encoding::PLAIN_DICTIONARY);
                assert!(!is_sorted);
                true
            }
            Page::DataPage {
                buf,
                num_values,
                encoding,
                def_level_encoding,
                rep_level_encoding,
                statistics,
            } => {
                assert_eq!(buf.len(), 11);
                assert_eq!(num_values, 8);
                assert_eq!(encoding, Encoding::PLAIN_DICTIONARY);
                assert_eq!(def_level_encoding, Encoding::RLE);
                // BIT_PACKED is deprecated but is what this legacy file uses.
                #[allow(deprecated)]
                let expected_rep_level_encoding = Encoding::BIT_PACKED;
                assert_eq!(rep_level_encoding, expected_rep_level_encoding);
                assert!(statistics.is_none());
                true
            }
            _ => false,
        };
        assert!(is_expected_page);
        page_count += 1;
    }
    assert_eq!(page_count, 2);
}
1428
#[test]
fn test_file_reader_datapage_v2() {
    let test_file = get_test_file("datapage_v2.snappy.parquet");
    let reader_result = SerializedFileReader::new(test_file);
    assert!(reader_result.is_ok());
    let reader = reader_result.unwrap();

    // Verify file-level metadata parsed from the footer.
    let metadata = reader.metadata();
    assert_eq!(metadata.num_row_groups(), 1);

    let file_metadata = metadata.file_metadata();
    assert!(file_metadata.created_by().is_some());
    assert_eq!(
        file_metadata.created_by().unwrap(),
        "parquet-mr version 1.8.1 (build 4aba4dae7bb0d4edbcf7923ae1339f28fd3f7fcf)"
    );
    assert!(file_metadata.key_value_metadata().is_some());
    assert_eq!(
        file_metadata.key_value_metadata().to_owned().unwrap().len(),
        1
    );

    assert_eq!(file_metadata.num_rows(), 5);
    assert_eq!(file_metadata.version(), 1);
    assert_eq!(file_metadata.column_orders(), None);

    let row_group_metadata = metadata.row_group(0);

    // With no column orders in the footer, every column order is UNDEFINED.
    for i in 0..row_group_metadata.num_columns() {
        assert_eq!(file_metadata.column_order(i), ColumnOrder::UNDEFINED);
    }

    // The row-group reader must agree with the metadata it was built from.
    let row_group_reader_result = reader.get_row_group(0);
    assert!(row_group_reader_result.is_ok());
    let row_group_reader: Box<dyn RowGroupReader> = row_group_reader_result.unwrap();
    assert_eq!(
        row_group_reader.num_columns(),
        row_group_metadata.num_columns()
    );
    assert_eq!(
        row_group_reader.metadata().total_byte_size(),
        row_group_metadata.total_byte_size()
    );

    // Column 0 should contain one dictionary page and one v2 data page.
    let page_reader_0_result = row_group_reader.get_column_page_reader(0);
    assert!(page_reader_0_result.is_ok());
    let mut page_reader_0: Box<dyn PageReader> = page_reader_0_result.unwrap();
    let mut page_count = 0;
    while let Some(page) = page_reader_0.get_next_page().unwrap() {
        let is_expected_page = match page {
            Page::DictionaryPage {
                buf,
                num_values,
                encoding,
                is_sorted,
            } => {
                assert_eq!(buf.len(), 7);
                assert_eq!(num_values, 1);
                assert_eq!(encoding, Encoding::PLAIN);
                assert!(!is_sorted);
                true
            }
            Page::DataPageV2 {
                buf,
                num_values,
                encoding,
                num_nulls,
                num_rows,
                def_levels_byte_len,
                rep_levels_byte_len,
                is_compressed,
                statistics,
            } => {
                assert_eq!(buf.len(), 4);
                assert_eq!(num_values, 5);
                assert_eq!(encoding, Encoding::RLE_DICTIONARY);
                assert_eq!(num_nulls, 1);
                assert_eq!(num_rows, 5);
                assert_eq!(def_levels_byte_len, 2);
                assert_eq!(rep_levels_byte_len, 0);
                assert!(is_compressed);
                assert!(statistics.is_none());
                true
            }
            _ => false,
        };
        assert!(is_expected_page);
        page_count += 1;
    }
    assert_eq!(page_count, 2);
}
1526
#[test]
fn test_file_reader_empty_compressed_datapage_v2() {
    // A file whose v2 data page holds only nulls (empty compressed payload).
    let test_file = get_test_file("page_v2_empty_compressed.parquet");
    let reader_result = SerializedFileReader::new(test_file);
    assert!(reader_result.is_ok());
    let reader = reader_result.unwrap();

    // Verify file-level metadata parsed from the footer.
    let metadata = reader.metadata();
    assert_eq!(metadata.num_row_groups(), 1);

    let file_metadata = metadata.file_metadata();
    assert!(file_metadata.created_by().is_some());
    assert_eq!(
        file_metadata.created_by().unwrap(),
        "parquet-cpp-arrow version 14.0.2"
    );
    assert!(file_metadata.key_value_metadata().is_some());
    assert_eq!(
        file_metadata.key_value_metadata().to_owned().unwrap().len(),
        1
    );

    assert_eq!(file_metadata.num_rows(), 10);
    assert_eq!(file_metadata.version(), 2);
    // This writer records an explicit TYPE_DEFINED_ORDER for the column.
    let expected_order = ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED);
    assert_eq!(
        file_metadata.column_orders(),
        Some(vec![expected_order].as_ref())
    );

    let row_group_metadata = metadata.row_group(0);

    for i in 0..row_group_metadata.num_columns() {
        assert_eq!(file_metadata.column_order(i), expected_order);
    }

    // The row-group reader must agree with the metadata it was built from.
    let row_group_reader_result = reader.get_row_group(0);
    assert!(row_group_reader_result.is_ok());
    let row_group_reader: Box<dyn RowGroupReader> = row_group_reader_result.unwrap();
    assert_eq!(
        row_group_reader.num_columns(),
        row_group_metadata.num_columns()
    );
    assert_eq!(
        row_group_reader.metadata().total_byte_size(),
        row_group_metadata.total_byte_size()
    );

    // Column 0: an empty dictionary page plus one all-null v2 data page.
    let page_reader_0_result = row_group_reader.get_column_page_reader(0);
    assert!(page_reader_0_result.is_ok());
    let mut page_reader_0: Box<dyn PageReader> = page_reader_0_result.unwrap();
    let mut page_count = 0;
    while let Some(page) = page_reader_0.get_next_page().unwrap() {
        let is_expected_page = match page {
            Page::DictionaryPage {
                buf,
                num_values,
                encoding,
                is_sorted,
            } => {
                assert_eq!(buf.len(), 0);
                assert_eq!(num_values, 0);
                assert_eq!(encoding, Encoding::PLAIN);
                assert!(!is_sorted);
                true
            }
            Page::DataPageV2 {
                buf,
                num_values,
                encoding,
                num_nulls,
                num_rows,
                def_levels_byte_len,
                rep_levels_byte_len,
                is_compressed,
                statistics,
            } => {
                // All 10 values are null; the buffer holds only def levels.
                assert_eq!(buf.len(), 3);
                assert_eq!(num_values, 10);
                assert_eq!(encoding, Encoding::RLE_DICTIONARY);
                assert_eq!(num_nulls, 10);
                assert_eq!(num_rows, 10);
                assert_eq!(def_levels_byte_len, 2);
                assert_eq!(rep_levels_byte_len, 0);
                assert!(is_compressed);
                assert!(statistics.is_none());
                true
            }
            _ => false,
        };
        assert!(is_expected_page);
        page_count += 1;
    }
    assert_eq!(page_count, 2);
}
1628
#[test]
fn test_file_reader_empty_datapage_v2() {
    // A file containing a single v2 data page with one null value.
    let test_file = get_test_file("datapage_v2_empty_datapage.snappy.parquet");
    let reader_result = SerializedFileReader::new(test_file);
    assert!(reader_result.is_ok());
    let reader = reader_result.unwrap();

    // Verify file-level metadata parsed from the footer.
    let metadata = reader.metadata();
    assert_eq!(metadata.num_row_groups(), 1);

    let file_metadata = metadata.file_metadata();
    assert!(file_metadata.created_by().is_some());
    assert_eq!(
        file_metadata.created_by().unwrap(),
        "parquet-mr version 1.13.1 (build db4183109d5b734ec5930d870cdae161e408ddba)"
    );
    assert!(file_metadata.key_value_metadata().is_some());
    assert_eq!(
        file_metadata.key_value_metadata().to_owned().unwrap().len(),
        2
    );

    assert_eq!(file_metadata.num_rows(), 1);
    assert_eq!(file_metadata.version(), 1);
    // This writer records an explicit TYPE_DEFINED_ORDER for the column.
    let expected_order = ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED);
    assert_eq!(
        file_metadata.column_orders(),
        Some(vec![expected_order].as_ref())
    );

    let row_group_metadata = metadata.row_group(0);

    for i in 0..row_group_metadata.num_columns() {
        assert_eq!(file_metadata.column_order(i), expected_order);
    }

    // The row-group reader must agree with the metadata it was built from.
    let row_group_reader_result = reader.get_row_group(0);
    assert!(row_group_reader_result.is_ok());
    let row_group_reader: Box<dyn RowGroupReader> = row_group_reader_result.unwrap();
    assert_eq!(
        row_group_reader.num_columns(),
        row_group_metadata.num_columns()
    );
    assert_eq!(
        row_group_reader.metadata().total_byte_size(),
        row_group_metadata.total_byte_size()
    );

    // Column 0: exactly one v2 data page and no dictionary page.
    let page_reader_0_result = row_group_reader.get_column_page_reader(0);
    assert!(page_reader_0_result.is_ok());
    let mut page_reader_0: Box<dyn PageReader> = page_reader_0_result.unwrap();
    let mut page_count = 0;
    while let Some(page) = page_reader_0.get_next_page().unwrap() {
        let is_expected_page = match page {
            Page::DataPageV2 {
                buf,
                num_values,
                encoding,
                num_nulls,
                num_rows,
                def_levels_byte_len,
                rep_levels_byte_len,
                is_compressed,
                statistics,
            } => {
                // The single value is null; the buffer holds only def levels.
                assert_eq!(buf.len(), 2);
                assert_eq!(num_values, 1);
                assert_eq!(encoding, Encoding::PLAIN);
                assert_eq!(num_nulls, 1);
                assert_eq!(num_rows, 1);
                assert_eq!(def_levels_byte_len, 2);
                assert_eq!(rep_levels_byte_len, 0);
                assert!(is_compressed);
                assert!(statistics.is_none());
                true
            }
            _ => false,
        };
        assert!(is_expected_page);
        page_count += 1;
    }
    assert_eq!(page_count, 1);
}
1718
/// Builds a `SerializedPageReader` for the given row group and column of
/// `file_reader`, forwarding the offset index (when present) so page
/// locations are available to the reader.
fn get_serialized_page_reader<R: ChunkReader>(
    file_reader: &SerializedFileReader<R>,
    row_group: usize,
    column: usize,
) -> Result<SerializedPageReader<R>> {
    // Build a row-group reader that shares the file's chunk reader and
    // reader properties.
    let row_group = {
        let row_group_metadata = file_reader.metadata.row_group(row_group);
        let props = Arc::clone(&file_reader.props);
        let f = Arc::clone(&file_reader.chunk_reader);
        SerializedRowGroupReader::new(
            f,
            row_group_metadata,
            file_reader
                .metadata
                .offset_index()
                .map(|x| x[row_group].as_slice()),
            props,
        )?
    };

    let col = row_group.metadata.column(column);

    // Page locations come from the offset index when the file has one.
    let page_locations = row_group
        .offset_index
        .map(|x| x[column].page_locations.clone());

    let props = Arc::clone(&row_group.props);
    SerializedPageReader::new_with_properties(
        Arc::clone(&row_group.chunk_reader),
        col,
        usize::try_from(row_group.metadata.num_rows())?,
        page_locations,
        props,
    )
}
1754
#[test]
fn test_peek_next_page_offset_matches_actual() -> Result<()> {
    let test_file = get_test_file("alltypes_plain.parquet");
    let reader = SerializedFileReader::new(test_file)?;

    // Collect every peeked offset; each must be unique across the file.
    let mut offset_set = HashSet::new();
    let num_row_groups = reader.metadata.num_row_groups();
    for row_group in 0..num_row_groups {
        let num_columns = reader.metadata.row_group(row_group).num_columns();
        for column in 0..num_columns {
            let mut page_reader = get_serialized_page_reader(&reader, row_group, column)?;

            while let Ok(Some(page_offset)) = page_reader.peek_next_page_offset() {
                // The peeked offset must match the reader's internal state,
                // whichever of its two representations is in use.
                match &page_reader.state {
                    SerializedPageReaderState::Pages {
                        page_locations,
                        dictionary_page,
                        ..
                    } => {
                        if let Some(page) = dictionary_page {
                            assert_eq!(page.offset as u64, page_offset);
                        } else if let Some(page) = page_locations.front() {
                            assert_eq!(page.offset as u64, page_offset);
                        } else {
                            // peek returned Some, so a page must be queued.
                            unreachable!()
                        }
                    }
                    SerializedPageReaderState::Values {
                        offset,
                        next_page_header,
                        ..
                    } => {
                        assert!(next_page_header.is_some());
                        assert_eq!(*offset, page_offset);
                    }
                }
                // Consuming the page must succeed, and its offset must not
                // have been reported before.
                let page = page_reader.get_next_page()?;
                assert!(page.is_some());
                let newly_inserted = offset_set.insert(page_offset);
                assert!(newly_inserted);
            }
        }
    }

    Ok(())
}
1801
#[test]
fn test_page_iterator() {
    // A single-row-group file yields exactly one page reader, then ends.
    let file_reader =
        Arc::new(SerializedFileReader::new(get_test_file("alltypes_plain.parquet")).unwrap());

    let mut page_iterator = FilePageIterator::new(0, file_reader.clone()).unwrap();
    let first = page_iterator.next();
    assert!(first.is_some());
    assert!(first.unwrap().is_ok());
    assert!(page_iterator.next().is_none());

    // Explicitly selecting row group 0 behaves identically.
    let mut page_iterator =
        FilePageIterator::with_row_groups(0, Box::new(0..1), file_reader).unwrap();
    let first = page_iterator.next();
    assert!(first.is_some());
    assert!(first.unwrap().is_ok());
    assert!(page_iterator.next().is_none());
}
1831
#[test]
fn test_file_reader_key_value_metadata() {
    // binary.parquet carries three key/value pairs written by parquet-proto.
    let file_reader = Arc::new(SerializedFileReader::new(get_test_file("binary.parquet")).unwrap());

    let kv = file_reader
        .metadata
        .file_metadata()
        .key_value_metadata()
        .unwrap();

    assert_eq!(kv.len(), 3);

    assert_eq!(kv[0].key, "parquet.proto.descriptor");

    assert_eq!(kv[1].key, "writer.model.name");
    assert_eq!(kv[1].value, Some("protobuf".to_owned()));

    assert_eq!(kv[2].key, "parquet.proto.class");
    assert_eq!(kv[2].value, Some("foo.baz.Foobaz$Event".to_owned()));
}
1853
#[test]
fn test_file_reader_optional_metadata() {
    // This file was written with bloom filters, page encoding stats and a
    // page index, so the optional column metadata should all be populated.
    let file = get_test_file("data_index_bloom_encoding_stats.parquet");
    let file_reader = Arc::new(SerializedFileReader::new(file).unwrap());

    let col0 = file_reader.metadata.row_group(0).column(0);

    assert_eq!(col0.bloom_filter_offset().unwrap(), 192);

    let page_encoding_stats = &col0.page_encoding_stats().unwrap()[0];
    assert_eq!(page_encoding_stats.page_type, basic::PageType::DATA_PAGE);
    assert_eq!(page_encoding_stats.encoding, Encoding::PLAIN);
    assert_eq!(page_encoding_stats.count, 1);

    assert_eq!(col0.column_index_offset().unwrap(), 156);
    assert_eq!(col0.column_index_length().unwrap(), 25);

    assert_eq!(col0.offset_index_offset().unwrap(), 181);
    assert_eq!(col0.offset_index_length().unwrap(), 11);
}
1881
#[test]
fn test_file_reader_page_stats_mask() {
    // When requested, page encoding stats are surfaced as a compact mask.
    let file = get_test_file("alltypes_tiny_pages.parquet");
    let options = ReadOptionsBuilder::new()
        .with_encoding_stats_as_mask(true)
        .build();
    let file_reader = Arc::new(SerializedFileReader::new_with_options(file, options).unwrap());

    let row_group_metadata = file_reader.metadata.row_group(0);

    let mask = row_group_metadata
        .column(0)
        .page_encoding_stats_mask()
        .unwrap();
    assert!(mask.is_only(Encoding::PLAIN));

    let mask = row_group_metadata
        .column(2)
        .page_encoding_stats_mask()
        .unwrap();
    assert!(mask.is_only(Encoding::PLAIN_DICTIONARY));
}
1904
#[test]
fn test_file_reader_page_stats_skipped() {
    let file = get_test_file("alltypes_tiny_pages.parquet");

    // SkipAll: no column exposes encoding stats in either representation.
    let options = ReadOptionsBuilder::new()
        .with_encoding_stats_policy(ParquetStatisticsPolicy::SkipAll)
        .build();
    let file_reader = Arc::new(
        SerializedFileReader::new_with_options(file.try_clone().unwrap(), options).unwrap(),
    );
    for column in file_reader.metadata.row_group(0).columns() {
        assert!(column.page_encoding_stats().is_none());
        assert!(column.page_encoding_stats_mask().is_none());
    }

    // skip_except(&[0]): only column 0 keeps its stats, exposed as a mask.
    let options = ReadOptionsBuilder::new()
        .with_encoding_stats_as_mask(true)
        .with_encoding_stats_policy(ParquetStatisticsPolicy::skip_except(&[0]))
        .build();
    let file_reader = Arc::new(
        SerializedFileReader::new_with_options(file.try_clone().unwrap(), options).unwrap(),
    );
    for (idx, column) in file_reader.metadata.row_group(0).columns().iter().enumerate() {
        assert!(column.page_encoding_stats().is_none());
        assert_eq!(column.page_encoding_stats_mask().is_some(), idx == 0);
    }
}
1938
#[test]
fn test_file_reader_with_no_filter() -> Result<()> {
    // Without read options, the file's single row group is retained.
    let origin_reader = SerializedFileReader::new(get_test_file("alltypes_plain.parquet"))?;
    assert_eq!(origin_reader.metadata().num_row_groups(), 1);
    Ok(())
}
1948
#[test]
fn test_file_reader_filter_row_groups_with_predicate() -> Result<()> {
    // A predicate that rejects everything filters out the only row group.
    let test_file = get_test_file("alltypes_plain.parquet");
    let read_options = ReadOptionsBuilder::new()
        .with_predicate(Box::new(|_, _| false))
        .build();
    let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
    assert_eq!(reader.metadata().num_row_groups(), 0);
    Ok(())
}
1960
#[test]
fn test_file_reader_filter_row_groups_with_range() -> Result<()> {
    // Compute the midpoint offset of the file's single row group.
    let origin_reader = SerializedFileReader::new(get_test_file("alltypes_plain.parquet"))?;
    let metadata = origin_reader.metadata();
    assert_eq!(metadata.num_row_groups(), 1);
    let mid = get_midpoint_offset(metadata.row_group(0));

    // A byte range that includes the midpoint keeps the row group...
    let read_options = ReadOptionsBuilder::new().with_range(0, mid + 1).build();
    let reader = SerializedFileReader::new_with_options(
        get_test_file("alltypes_plain.parquet"),
        read_options,
    )?;
    assert_eq!(reader.metadata().num_row_groups(), 1);

    // ...while one stopping just before the midpoint drops it.
    let read_options = ReadOptionsBuilder::new().with_range(0, mid).build();
    let reader = SerializedFileReader::new_with_options(
        get_test_file("alltypes_plain.parquet"),
        read_options,
    )?;
    assert_eq!(reader.metadata().num_row_groups(), 0);
    Ok(())
}
1983
#[test]
fn test_file_reader_filter_row_groups_and_range() -> Result<()> {
    let test_file = get_test_file("alltypes_tiny_pages.parquet");
    let origin_reader = SerializedFileReader::new(test_file)?;
    let metadata = origin_reader.metadata();
    let mid = get_midpoint_offset(metadata.row_group(0));

    // true predicate + range containing the midpoint: the row group is
    // kept and its page indexes are loaded.
    let test_file = get_test_file("alltypes_tiny_pages.parquet");
    let read_options = ReadOptionsBuilder::new()
        .with_page_index()
        .with_predicate(Box::new(|_, _| true))
        .with_range(mid, mid + 1)
        .build();
    let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
    let metadata = reader.metadata();
    assert_eq!(metadata.num_row_groups(), 1);
    assert_eq!(metadata.column_index().unwrap().len(), 1);
    assert_eq!(metadata.offset_index().unwrap().len(), 1);

    // true predicate + range excluding the midpoint: the row group is
    // dropped, so no page indexes are read either.
    let test_file = get_test_file("alltypes_tiny_pages.parquet");
    let read_options = ReadOptionsBuilder::new()
        .with_page_index()
        .with_predicate(Box::new(|_, _| true))
        .with_range(0, mid)
        .build();
    let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
    let metadata = reader.metadata();
    assert_eq!(metadata.num_row_groups(), 0);
    assert!(metadata.column_index().is_none());
    assert!(metadata.offset_index().is_none());

    // false predicate + matching range: the predicate alone drops the group.
    let test_file = get_test_file("alltypes_tiny_pages.parquet");
    let read_options = ReadOptionsBuilder::new()
        .with_page_index()
        .with_predicate(Box::new(|_, _| false))
        .with_range(mid, mid + 1)
        .build();
    let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
    let metadata = reader.metadata();
    assert_eq!(metadata.num_row_groups(), 0);
    assert!(metadata.column_index().is_none());
    assert!(metadata.offset_index().is_none());

    // false predicate + non-matching range: still nothing retained.
    let test_file = get_test_file("alltypes_tiny_pages.parquet");
    let read_options = ReadOptionsBuilder::new()
        .with_page_index()
        .with_predicate(Box::new(|_, _| false))
        .with_range(0, mid)
        .build();
    let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
    let metadata = reader.metadata();
    assert_eq!(metadata.num_row_groups(), 0);
    assert!(metadata.column_index().is_none());
    assert!(metadata.offset_index().is_none());
    Ok(())
}
2044
#[test]
fn test_file_reader_invalid_metadata() {
    // A corrupt footer must surface a descriptive error, not a panic.
    let data = [
        255, 172, 1, 0, 50, 82, 65, 73, 1, 0, 0, 0, 169, 168, 168, 162, 87, 255, 16, 0, 0, 0,
        80, 65, 82, 49,
    ];
    let ret = SerializedFileReader::new(Bytes::copy_from_slice(&data));
    let err = ret.err().unwrap();
    assert_eq!(
        err.to_string(),
        "Parquet error: Received empty union from remote ColumnOrder"
    );
}
2057
#[test]
fn test_page_index_reader() {
    let test_file = get_test_file("data_index_bloom_encoding_stats.parquet");
    let builder = ReadOptionsBuilder::new();
    // Enable reading of the page index (column index + offset index).
    let options = builder.with_page_index().build();
    let reader_result = SerializedFileReader::new_with_options(test_file, options);
    let reader = reader_result.unwrap();

    let metadata = reader.metadata();
    assert_eq!(metadata.num_row_groups(), 1);

    let column_index = metadata.column_index().unwrap();

    // One row group; its single column is a byte-array column.
    assert_eq!(column_index.len(), 1);
    let index = if let ColumnIndexMetaData::BYTE_ARRAY(index) = &column_index[0][0] {
        index
    } else {
        unreachable!()
    };

    assert_eq!(index.boundary_order, BoundaryOrder::ASCENDING);

    assert_eq!(index.num_pages(), 1);

    // Page-level min/max statistics for the only page.
    let min = index.min_value(0).unwrap();
    let max = index.max_value(0).unwrap();
    assert_eq!(b"Hello", min.as_bytes());
    assert_eq!(b"today", max.as_bytes());

    // Offset index: a single page at the expected file location.
    let offset_indexes = metadata.offset_index().unwrap();
    assert_eq!(offset_indexes.len(), 1);
    let offset_index = &offset_indexes[0];
    let page_offset = &offset_index[0].page_locations()[0];

    assert_eq!(4, page_offset.offset);
    assert_eq!(152, page_offset.compressed_page_size);
    assert_eq!(0, page_offset.first_row_index);
}
2117
#[test]
#[allow(deprecated)]
fn test_page_index_reader_out_of_order() {
    let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
    let options = ReadOptionsBuilder::new().with_page_index().build();
    let reader = SerializedFileReader::new_with_options(test_file, options).unwrap();
    let metadata = reader.metadata();

    // Reading the indexes with the column list reversed must yield the same
    // results (in reverse), i.e. request order does not affect the output.
    let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
    let columns = metadata.row_group(0).columns();
    let reversed: Vec<_> = columns.iter().cloned().rev().collect();

    let a = read_columns_indexes(&test_file, columns).unwrap().unwrap();
    let mut b = read_columns_indexes(&test_file, &reversed)
        .unwrap()
        .unwrap();
    b.reverse();
    assert_eq!(a, b);

    let a = read_offset_indexes(&test_file, columns).unwrap().unwrap();
    let mut b = read_offset_indexes(&test_file, &reversed).unwrap().unwrap();
    b.reverse();
    assert_eq!(a, b);
}
2142
#[test]
fn test_page_index_reader_all_type() {
    let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
    let builder = ReadOptionsBuilder::new();
    // Enable reading of the page index (column index + offset index).
    let options = builder.with_page_index().build();
    let reader_result = SerializedFileReader::new_with_options(test_file, options);
    let reader = reader_result.unwrap();

    let metadata = reader.metadata();
    assert_eq!(metadata.num_row_groups(), 1);

    let column_index = metadata.column_index().unwrap();
    let row_group_offset_indexes = &metadata.offset_index().unwrap()[0];

    assert_eq!(column_index.len(), 1);
    let row_group_metadata = metadata.row_group(0);

    // Column 0: INT32, unordered, 325 pages.
    assert!(!column_index[0][0].is_sorted());
    let boundary_order = column_index[0][0].get_boundary_order();
    assert!(boundary_order.is_some());
    // BUGFIX: the original `matches!(...)` expression discarded its boolean
    // result, so the boundary order was never actually verified.
    assert_eq!(boundary_order, Some(BoundaryOrder::UNORDERED));
    if let ColumnIndexMetaData::INT32(index) = &column_index[0][0] {
        check_native_page_index(
            index,
            325,
            get_row_group_min_max_bytes(row_group_metadata, 0),
            BoundaryOrder::UNORDERED,
        );
        assert_eq!(row_group_offset_indexes[0].page_locations.len(), 325);
    } else {
        unreachable!()
    };
    // Column 1: BOOLEAN, sorted, 82 pages (no native min/max range check).
    assert!(column_index[0][1].is_sorted());
    if let ColumnIndexMetaData::BOOLEAN(index) = &column_index[0][1] {
        assert_eq!(index.num_pages(), 82);
        assert_eq!(row_group_offset_indexes[1].page_locations.len(), 82);
    } else {
        unreachable!()
    };
    // Column 2: INT32, ascending, 325 pages.
    assert!(column_index[0][2].is_sorted());
    if let ColumnIndexMetaData::INT32(index) = &column_index[0][2] {
        check_native_page_index(
            index,
            325,
            get_row_group_min_max_bytes(row_group_metadata, 2),
            BoundaryOrder::ASCENDING,
        );
        assert_eq!(row_group_offset_indexes[2].page_locations.len(), 325);
    } else {
        unreachable!()
    };
    // Column 3: INT32, ascending, 325 pages.
    assert!(column_index[0][3].is_sorted());
    if let ColumnIndexMetaData::INT32(index) = &column_index[0][3] {
        check_native_page_index(
            index,
            325,
            get_row_group_min_max_bytes(row_group_metadata, 3),
            BoundaryOrder::ASCENDING,
        );
        assert_eq!(row_group_offset_indexes[3].page_locations.len(), 325);
    } else {
        unreachable!()
    };
    // Column 4: INT32, ascending, 325 pages.
    assert!(column_index[0][4].is_sorted());
    if let ColumnIndexMetaData::INT32(index) = &column_index[0][4] {
        check_native_page_index(
            index,
            325,
            get_row_group_min_max_bytes(row_group_metadata, 4),
            BoundaryOrder::ASCENDING,
        );
        assert_eq!(row_group_offset_indexes[4].page_locations.len(), 325);
    } else {
        unreachable!()
    };
    // Column 5: INT64, unordered, 528 pages.
    assert!(!column_index[0][5].is_sorted());
    if let ColumnIndexMetaData::INT64(index) = &column_index[0][5] {
        check_native_page_index(
            index,
            528,
            get_row_group_min_max_bytes(row_group_metadata, 5),
            BoundaryOrder::UNORDERED,
        );
        assert_eq!(row_group_offset_indexes[5].page_locations.len(), 528);
    } else {
        unreachable!()
    };
    // Column 6: FLOAT, ascending, 325 pages.
    assert!(column_index[0][6].is_sorted());
    if let ColumnIndexMetaData::FLOAT(index) = &column_index[0][6] {
        check_native_page_index(
            index,
            325,
            get_row_group_min_max_bytes(row_group_metadata, 6),
            BoundaryOrder::ASCENDING,
        );
        assert_eq!(row_group_offset_indexes[6].page_locations.len(), 325);
    } else {
        unreachable!()
    };
    // Column 7: DOUBLE, unordered, 528 pages.
    assert!(!column_index[0][7].is_sorted());
    if let ColumnIndexMetaData::DOUBLE(index) = &column_index[0][7] {
        check_native_page_index(
            index,
            528,
            get_row_group_min_max_bytes(row_group_metadata, 7),
            BoundaryOrder::UNORDERED,
        );
        assert_eq!(row_group_offset_indexes[7].page_locations.len(), 528);
    } else {
        unreachable!()
    };
    // Column 8: BYTE_ARRAY, unordered, 974 pages.
    assert!(!column_index[0][8].is_sorted());
    if let ColumnIndexMetaData::BYTE_ARRAY(index) = &column_index[0][8] {
        check_byte_array_page_index(
            index,
            974,
            get_row_group_min_max_bytes(row_group_metadata, 8),
            BoundaryOrder::UNORDERED,
        );
        assert_eq!(row_group_offset_indexes[8].page_locations.len(), 974);
    } else {
        unreachable!()
    };
    // Column 9: BYTE_ARRAY, ascending, 352 pages.
    assert!(column_index[0][9].is_sorted());
    if let ColumnIndexMetaData::BYTE_ARRAY(index) = &column_index[0][9] {
        check_byte_array_page_index(
            index,
            352,
            get_row_group_min_max_bytes(row_group_metadata, 9),
            BoundaryOrder::ASCENDING,
        );
        assert_eq!(row_group_offset_indexes[9].page_locations.len(), 352);
    } else {
        unreachable!()
    };
    // Column 10: no column index was written (NONE), but the offset index
    // still records all 974 pages.
    assert!(!column_index[0][10].is_sorted());
    if let ColumnIndexMetaData::NONE = &column_index[0][10] {
        assert_eq!(row_group_offset_indexes[10].page_locations.len(), 974);
    } else {
        unreachable!()
    };
    // Column 11: INT32, ascending, 325 pages.
    assert!(column_index[0][11].is_sorted());
    if let ColumnIndexMetaData::INT32(index) = &column_index[0][11] {
        check_native_page_index(
            index,
            325,
            get_row_group_min_max_bytes(row_group_metadata, 11),
            BoundaryOrder::ASCENDING,
        );
        assert_eq!(row_group_offset_indexes[11].page_locations.len(), 325);
    } else {
        unreachable!()
    };
    // Column 12: INT32, unordered, 325 pages.
    assert!(!column_index[0][12].is_sorted());
    if let ColumnIndexMetaData::INT32(index) = &column_index[0][12] {
        check_native_page_index(
            index,
            325,
            get_row_group_min_max_bytes(row_group_metadata, 12),
            BoundaryOrder::UNORDERED,
        );
        assert_eq!(row_group_offset_indexes[12].page_locations.len(), 325);
    } else {
        unreachable!()
    };
}
2326
2327 fn check_native_page_index<T: ParquetValueType>(
2328 row_group_index: &PrimitiveColumnIndex<T>,
2329 page_size: usize,
2330 min_max: (&[u8], &[u8]),
2331 boundary_order: BoundaryOrder,
2332 ) {
2333 assert_eq!(row_group_index.num_pages() as usize, page_size);
2334 assert_eq!(row_group_index.boundary_order, boundary_order);
2335 assert!(row_group_index.min_values().iter().all(|x| {
2336 x >= &T::try_from_le_slice(min_max.0).unwrap()
2337 && x <= &T::try_from_le_slice(min_max.1).unwrap()
2338 }));
2339 }
2340
2341 fn check_byte_array_page_index(
2342 row_group_index: &ByteArrayColumnIndex,
2343 page_size: usize,
2344 min_max: (&[u8], &[u8]),
2345 boundary_order: BoundaryOrder,
2346 ) {
2347 assert_eq!(row_group_index.num_pages() as usize, page_size);
2348 assert_eq!(row_group_index.boundary_order, boundary_order);
2349 for i in 0..row_group_index.num_pages() as usize {
2350 let x = row_group_index.min_value(i).unwrap();
2351 assert!(x >= min_max.0 && x <= min_max.1);
2352 }
2353 }
2354
2355 fn get_row_group_min_max_bytes(r: &RowGroupMetaData, col_num: usize) -> (&[u8], &[u8]) {
2356 let statistics = r.column(col_num).statistics().unwrap();
2357 (
2358 statistics.min_bytes_opt().unwrap_or_default(),
2359 statistics.max_bytes_opt().unwrap_or_default(),
2360 )
2361 }
2362
2363 #[test]
2364 fn test_skip_next_page_with_dictionary_page() {
2365 let test_file = get_test_file("alltypes_tiny_pages.parquet");
2366 let builder = ReadOptionsBuilder::new();
2367 let options = builder.with_page_index().build();
2369 let reader_result = SerializedFileReader::new_with_options(test_file, options);
2370 let reader = reader_result.unwrap();
2371
2372 let row_group_reader = reader.get_row_group(0).unwrap();
2373
2374 let mut column_page_reader = row_group_reader.get_column_page_reader(9).unwrap();
2376
2377 let mut vec = vec![];
2378
2379 let meta = column_page_reader.peek_next_page().unwrap().unwrap();
2381 assert!(meta.is_dict);
2382
2383 column_page_reader.skip_next_page().unwrap();
2385
2386 let page = column_page_reader.get_next_page().unwrap().unwrap();
2388 assert!(matches!(page.page_type(), basic::PageType::DATA_PAGE));
2389
2390 for _i in 0..351 {
2392 let meta = column_page_reader.peek_next_page().unwrap().unwrap();
2394 assert!(!meta.is_dict); vec.push(meta);
2396
2397 let page = column_page_reader.get_next_page().unwrap().unwrap();
2398 assert!(matches!(page.page_type(), basic::PageType::DATA_PAGE));
2399 }
2400
2401 assert!(column_page_reader.peek_next_page().unwrap().is_none());
2403 assert!(column_page_reader.get_next_page().unwrap().is_none());
2404
2405 assert_eq!(vec.len(), 351);
2407 }
2408
2409 #[test]
2410 fn test_skip_page_with_offset_index() {
2411 let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
2412 let builder = ReadOptionsBuilder::new();
2413 let options = builder.with_page_index().build();
2415 let reader_result = SerializedFileReader::new_with_options(test_file, options);
2416 let reader = reader_result.unwrap();
2417
2418 let row_group_reader = reader.get_row_group(0).unwrap();
2419
2420 let mut column_page_reader = row_group_reader.get_column_page_reader(4).unwrap();
2422
2423 let mut vec = vec![];
2424
2425 for i in 0..325 {
2426 if i % 2 == 0 {
2427 vec.push(column_page_reader.get_next_page().unwrap().unwrap());
2428 } else {
2429 column_page_reader.skip_next_page().unwrap();
2430 }
2431 }
2432 assert!(column_page_reader.peek_next_page().unwrap().is_none());
2434 assert!(column_page_reader.get_next_page().unwrap().is_none());
2435
2436 assert_eq!(vec.len(), 163);
2437 }
2438
2439 #[test]
2440 fn test_skip_page_without_offset_index() {
2441 let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
2442
2443 let reader_result = SerializedFileReader::new(test_file);
2445 let reader = reader_result.unwrap();
2446
2447 let row_group_reader = reader.get_row_group(0).unwrap();
2448
2449 let mut column_page_reader = row_group_reader.get_column_page_reader(4).unwrap();
2451
2452 let mut vec = vec![];
2453
2454 for i in 0..325 {
2455 if i % 2 == 0 {
2456 vec.push(column_page_reader.get_next_page().unwrap().unwrap());
2457 } else {
2458 column_page_reader.peek_next_page().unwrap().unwrap();
2459 column_page_reader.skip_next_page().unwrap();
2460 }
2461 }
2462 assert!(column_page_reader.peek_next_page().unwrap().is_none());
2464 assert!(column_page_reader.get_next_page().unwrap().is_none());
2465
2466 assert_eq!(vec.len(), 163);
2467 }
2468
2469 #[test]
2470 fn test_peek_page_with_dictionary_page() {
2471 let test_file = get_test_file("alltypes_tiny_pages.parquet");
2472 let builder = ReadOptionsBuilder::new();
2473 let options = builder.with_page_index().build();
2475 let reader_result = SerializedFileReader::new_with_options(test_file, options);
2476 let reader = reader_result.unwrap();
2477 let row_group_reader = reader.get_row_group(0).unwrap();
2478
2479 let mut column_page_reader = row_group_reader.get_column_page_reader(9).unwrap();
2481
2482 let mut vec = vec![];
2483
2484 let meta = column_page_reader.peek_next_page().unwrap().unwrap();
2485 assert!(meta.is_dict);
2486 let page = column_page_reader.get_next_page().unwrap().unwrap();
2487 assert!(matches!(page.page_type(), basic::PageType::DICTIONARY_PAGE));
2488
2489 for i in 0..352 {
2490 let meta = column_page_reader.peek_next_page().unwrap().unwrap();
2491 if i != 351 {
2494 assert!((meta.num_rows == Some(21)) || (meta.num_rows == Some(20)));
2495 } else {
2496 assert_eq!(meta.num_rows, Some(10));
2499 }
2500 assert!(!meta.is_dict);
2501 vec.push(meta);
2502 let page = column_page_reader.get_next_page().unwrap().unwrap();
2503 assert!(matches!(page.page_type(), basic::PageType::DATA_PAGE));
2504 }
2505
2506 assert!(column_page_reader.peek_next_page().unwrap().is_none());
2508 assert!(column_page_reader.get_next_page().unwrap().is_none());
2509
2510 assert_eq!(vec.len(), 352);
2511 }
2512
2513 #[test]
2514 fn test_peek_page_with_dictionary_page_without_offset_index() {
2515 let test_file = get_test_file("alltypes_tiny_pages.parquet");
2516
2517 let reader_result = SerializedFileReader::new(test_file);
2518 let reader = reader_result.unwrap();
2519 let row_group_reader = reader.get_row_group(0).unwrap();
2520
2521 let mut column_page_reader = row_group_reader.get_column_page_reader(9).unwrap();
2523
2524 let mut vec = vec![];
2525
2526 let meta = column_page_reader.peek_next_page().unwrap().unwrap();
2527 assert!(meta.is_dict);
2528 let page = column_page_reader.get_next_page().unwrap().unwrap();
2529 assert!(matches!(page.page_type(), basic::PageType::DICTIONARY_PAGE));
2530
2531 for i in 0..352 {
2532 let meta = column_page_reader.peek_next_page().unwrap().unwrap();
2533 if i != 351 {
2536 assert!((meta.num_levels == Some(21)) || (meta.num_levels == Some(20)));
2537 } else {
2538 assert_eq!(meta.num_levels, Some(10));
2541 }
2542 assert!(!meta.is_dict);
2543 vec.push(meta);
2544 let page = column_page_reader.get_next_page().unwrap().unwrap();
2545 assert!(matches!(page.page_type(), basic::PageType::DATA_PAGE));
2546 }
2547
2548 assert!(column_page_reader.peek_next_page().unwrap().is_none());
2550 assert!(column_page_reader.get_next_page().unwrap().is_none());
2551
2552 assert_eq!(vec.len(), 352);
2553 }
2554
    /// Round-trips a FIXED_LEN_BYTE_ARRAY (decimal) column through an in-memory
    /// file and checks that the written column index carries fixed-length
    /// min/max values and a correct null count.
    #[test]
    fn test_fixed_length_index() {
        // Single optional 11-byte fixed-length decimal column.
        let message_type = "
        message test_schema {
          OPTIONAL FIXED_LEN_BYTE_ARRAY (11) value (DECIMAL(25,2));
        }
        ";

        let schema = parse_message_type(message_type).unwrap();
        let mut out = Vec::with_capacity(1024);
        let mut writer =
            SerializedFileWriter::new(&mut out, Arc::new(schema), Default::default()).unwrap();

        // Three non-null values with def levels [1, 1, 0, 1]: the third slot is
        // null, so the page min is [0; 11] and the max is [5; 11].
        let mut r = writer.next_row_group().unwrap();
        let mut c = r.next_column().unwrap().unwrap();
        c.typed::<FixedLenByteArrayType>()
            .write_batch(
                &[vec![0; 11].into(), vec![5; 11].into(), vec![3; 11].into()],
                Some(&[1, 1, 0, 1]),
                None,
            )
            .unwrap();
        c.close().unwrap();
        r.close().unwrap();
        writer.close().unwrap();

        // Re-read the in-memory file with the page index enabled.
        let b = Bytes::from(out);
        let options = ReadOptionsBuilder::new().with_page_index().build();
        let reader = SerializedFileReader::new_with_options(b, options).unwrap();
        let index = reader.metadata().column_index().unwrap();

        // One row group with one column.
        assert_eq!(index.len(), 1);
        let c = &index[0];
        assert_eq!(c.len(), 1);

        match &c[0] {
            ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(v) => {
                assert_eq!(v.num_pages(), 1);
                assert_eq!(v.null_count(0).unwrap(), 1);
                assert_eq!(v.min_value(0).unwrap(), &[0; 11]);
                assert_eq!(v.max_value(0).unwrap(), &[5; 11]);
            }
            _ => unreachable!(),
        }
    }
2602
2603 #[test]
2604 fn test_multi_gz() {
2605 let file = get_test_file("concatenated_gzip_members.parquet");
2606 let reader = SerializedFileReader::new(file).unwrap();
2607 let row_group_reader = reader.get_row_group(0).unwrap();
2608 match row_group_reader.get_column_reader(0).unwrap() {
2609 ColumnReader::Int64ColumnReader(mut reader) => {
2610 let mut buffer = Vec::with_capacity(1024);
2611 let mut def_levels = Vec::with_capacity(1024);
2612 let (num_records, num_values, num_levels) = reader
2613 .read_records(1024, Some(&mut def_levels), None, &mut buffer)
2614 .unwrap();
2615
2616 assert_eq!(num_records, 513);
2617 assert_eq!(num_values, 513);
2618 assert_eq!(num_levels, 513);
2619
2620 let expected: Vec<i64> = (1..514).collect();
2621 assert_eq!(&buffer, &expected);
2622 }
2623 _ => unreachable!(),
2624 }
2625 }
2626
2627 #[test]
2628 fn test_byte_stream_split_extended() {
2629 let path = format!(
2630 "{}/byte_stream_split_extended.gzip.parquet",
2631 arrow::util::test_util::parquet_test_data(),
2632 );
2633 let file = File::open(path).unwrap();
2634 let reader = Box::new(SerializedFileReader::new(file).expect("Failed to create reader"));
2635
2636 let mut iter = reader
2638 .get_row_iter(None)
2639 .expect("Failed to create row iterator");
2640
2641 let mut start = 0;
2642 let end = reader.metadata().file_metadata().num_rows();
2643
2644 let check_row = |row: Result<Row, ParquetError>| {
2645 assert!(row.is_ok());
2646 let r = row.unwrap();
2647 assert_eq!(r.get_float16(0).unwrap(), r.get_float16(1).unwrap());
2648 assert_eq!(r.get_float(2).unwrap(), r.get_float(3).unwrap());
2649 assert_eq!(r.get_double(4).unwrap(), r.get_double(5).unwrap());
2650 assert_eq!(r.get_int(6).unwrap(), r.get_int(7).unwrap());
2651 assert_eq!(r.get_long(8).unwrap(), r.get_long(9).unwrap());
2652 assert_eq!(r.get_bytes(10).unwrap(), r.get_bytes(11).unwrap());
2653 assert_eq!(r.get_decimal(12).unwrap(), r.get_decimal(13).unwrap());
2654 };
2655
2656 while start < end {
2657 match iter.next() {
2658 Some(row) => check_row(row),
2659 None => break,
2660 };
2661 start += 1;
2662 }
2663 }
2664
2665 #[test]
2666 fn test_filtered_rowgroup_metadata() {
2667 let message_type = "
2668 message test_schema {
2669 REQUIRED INT32 a;
2670 }
2671 ";
2672 let schema = Arc::new(parse_message_type(message_type).unwrap());
2673 let props = Arc::new(
2674 WriterProperties::builder()
2675 .set_statistics_enabled(EnabledStatistics::Page)
2676 .build(),
2677 );
2678 let mut file: File = tempfile::tempfile().unwrap();
2679 let mut file_writer = SerializedFileWriter::new(&mut file, schema, props).unwrap();
2680 let data = [1, 2, 3, 4, 5];
2681
2682 for idx in 0..5 {
2684 let data_i: Vec<i32> = data.iter().map(|x| x * (idx + 1)).collect();
2685 let mut row_group_writer = file_writer.next_row_group().unwrap();
2686 if let Some(mut writer) = row_group_writer.next_column().unwrap() {
2687 writer
2688 .typed::<Int32Type>()
2689 .write_batch(data_i.as_slice(), None, None)
2690 .unwrap();
2691 writer.close().unwrap();
2692 }
2693 row_group_writer.close().unwrap();
2694 file_writer.flushed_row_groups();
2695 }
2696 let file_metadata = file_writer.close().unwrap();
2697
2698 assert_eq!(file_metadata.file_metadata().num_rows(), 25);
2699 assert_eq!(file_metadata.num_row_groups(), 5);
2700
2701 let read_options = ReadOptionsBuilder::new()
2703 .with_page_index()
2704 .with_predicate(Box::new(|rgmeta, _| rgmeta.ordinal().unwrap_or(0) == 2))
2705 .build();
2706 let reader =
2707 SerializedFileReader::new_with_options(file.try_clone().unwrap(), read_options)
2708 .unwrap();
2709 let metadata = reader.metadata();
2710
2711 assert_eq!(metadata.num_row_groups(), 1);
2713 assert_eq!(metadata.row_group(0).ordinal(), Some(2));
2714
2715 assert!(metadata.column_index().is_some());
2717 assert!(metadata.offset_index().is_some());
2718 assert_eq!(metadata.column_index().unwrap().len(), 1);
2719 assert_eq!(metadata.offset_index().unwrap().len(), 1);
2720 let col_idx = metadata.column_index().unwrap();
2721 let off_idx = metadata.offset_index().unwrap();
2722 let col_stats = metadata.row_group(0).column(0).statistics().unwrap();
2723 let pg_idx = &col_idx[0][0];
2724 let off_idx_i = &off_idx[0][0];
2725
2726 match pg_idx {
2728 ColumnIndexMetaData::INT32(int_idx) => {
2729 let min = col_stats.min_bytes_opt().unwrap().get_i32_le();
2730 let max = col_stats.max_bytes_opt().unwrap().get_i32_le();
2731 assert_eq!(int_idx.min_value(0), Some(min).as_ref());
2732 assert_eq!(int_idx.max_value(0), Some(max).as_ref());
2733 }
2734 _ => panic!("wrong stats type"),
2735 }
2736
2737 assert_eq!(
2739 off_idx_i.page_locations[0].offset,
2740 metadata.row_group(0).column(0).data_page_offset()
2741 );
2742
2743 let read_options = ReadOptionsBuilder::new()
2745 .with_page_index()
2746 .with_predicate(Box::new(|rgmeta, _| rgmeta.ordinal().unwrap_or(0) % 2 == 1))
2747 .build();
2748 let reader =
2749 SerializedFileReader::new_with_options(file.try_clone().unwrap(), read_options)
2750 .unwrap();
2751 let metadata = reader.metadata();
2752
2753 assert_eq!(metadata.num_row_groups(), 2);
2755 assert_eq!(metadata.row_group(0).ordinal(), Some(1));
2756 assert_eq!(metadata.row_group(1).ordinal(), Some(3));
2757
2758 assert!(metadata.column_index().is_some());
2760 assert!(metadata.offset_index().is_some());
2761 assert_eq!(metadata.column_index().unwrap().len(), 2);
2762 assert_eq!(metadata.offset_index().unwrap().len(), 2);
2763 let col_idx = metadata.column_index().unwrap();
2764 let off_idx = metadata.offset_index().unwrap();
2765
2766 for (i, col_idx_i) in col_idx.iter().enumerate().take(metadata.num_row_groups()) {
2767 let col_stats = metadata.row_group(i).column(0).statistics().unwrap();
2768 let pg_idx = &col_idx_i[0];
2769 let off_idx_i = &off_idx[i][0];
2770
2771 match pg_idx {
2773 ColumnIndexMetaData::INT32(int_idx) => {
2774 let min = col_stats.min_bytes_opt().unwrap().get_i32_le();
2775 let max = col_stats.max_bytes_opt().unwrap().get_i32_le();
2776 assert_eq!(int_idx.min_value(0), Some(min).as_ref());
2777 assert_eq!(int_idx.max_value(0), Some(max).as_ref());
2778 }
2779 _ => panic!("wrong stats type"),
2780 }
2781
2782 assert_eq!(
2784 off_idx_i.page_locations[0].offset,
2785 metadata.row_group(i).column(0).data_page_offset()
2786 );
2787 }
2788 }
2789
2790 #[test]
2791 fn test_reuse_schema() {
2792 let file = get_test_file("alltypes_plain.parquet");
2793 let file_reader = SerializedFileReader::new(file.try_clone().unwrap()).unwrap();
2794 let schema = file_reader.metadata().file_metadata().schema_descr_ptr();
2795 let expected = file_reader.metadata;
2796
2797 let options = ReadOptionsBuilder::new()
2798 .with_parquet_schema(schema)
2799 .build();
2800 let file_reader = SerializedFileReader::new_with_options(file, options).unwrap();
2801
2802 assert_eq!(expected.as_ref(), file_reader.metadata.as_ref());
2803 assert!(Arc::ptr_eq(
2805 &expected.file_metadata().schema_descr_ptr(),
2806 &file_reader.metadata.file_metadata().schema_descr_ptr()
2807 ));
2808 }
2809
2810 #[test]
2811 fn test_read_unknown_logical_type() {
2812 let file = get_test_file("unknown-logical-type.parquet");
2813 let reader = SerializedFileReader::new(file).expect("Error opening file");
2814
2815 let schema = reader.metadata().file_metadata().schema_descr();
2816 assert_eq!(
2817 schema.column(0).logical_type_ref(),
2818 Some(&basic::LogicalType::String)
2819 );
2820 assert_eq!(
2821 schema.column(1).logical_type_ref(),
2822 Some(&basic::LogicalType::_Unknown { field_id: 2555 })
2823 );
2824 assert_eq!(schema.column(1).physical_type(), Type::BYTE_ARRAY);
2825
2826 let mut iter = reader
2827 .get_row_iter(None)
2828 .expect("Failed to create row iterator");
2829
2830 let mut num_rows = 0;
2831 while iter.next().is_some() {
2832 num_rows += 1;
2833 }
2834 assert_eq!(num_rows, reader.metadata().file_metadata().num_rows());
2835 }
2836}