1#[cfg(feature = "encryption")]
19use crate::encryption::decrypt::FileDecryptionProperties;
20use crate::errors::{ParquetError, Result};
21use crate::file::FOOTER_SIZE;
22use crate::file::metadata::parser::decode_metadata;
23use crate::file::metadata::thrift::parquet_schema_from_bytes;
24use crate::file::metadata::{
25 FooterTail, ParquetMetaData, ParquetMetaDataOptions, ParquetMetaDataPushDecoder,
26};
27use crate::file::reader::ChunkReader;
28use crate::schema::types::SchemaDescriptor;
29use bytes::Bytes;
30use std::sync::Arc;
31use std::{io::Read, ops::Range};
32
33use crate::DecodeResult;
34#[cfg(all(feature = "async", feature = "arrow"))]
35use crate::arrow::async_reader::{MetadataFetch, MetadataSuffixFetch};
36
/// Reads and parses [`ParquetMetaData`] from the footer of a Parquet file,
/// optionally including the page indexes, via a builder-style API.
#[derive(Default, Debug)]
pub struct ParquetMetaDataReader {
    // Parsed metadata, if any; taken (moved out) by `finish()`.
    metadata: Option<ParquetMetaData>,
    // Policy for reading the column index.
    column_index: PageIndexPolicy,
    // Policy for reading the offset index.
    offset_index: PageIndexPolicy,
    // Number of trailing bytes to prefetch when loading asynchronously.
    prefetch_hint: Option<usize>,
    // Options applied while decoding the metadata.
    metadata_options: Option<Arc<ParquetMetaDataOptions>>,
    // Size in bytes of footer + metadata, recorded during the last parse.
    metadata_size: Option<usize>,
    // Keys/settings used to decrypt encrypted metadata.
    #[cfg(feature = "encryption")]
    file_decryption_properties: Option<Arc<FileDecryptionProperties>>,
}
83
/// Controls whether a page index structure is read from the file.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum PageIndexPolicy {
    /// Do not read the index (the default).
    #[default]
    Skip,
    /// Attempt to read the index.
    Optional,
    /// Require the index to be read.
    Required,
}
95
96impl From<bool> for PageIndexPolicy {
97 fn from(value: bool) -> Self {
98 match value {
99 true => Self::Required,
100 false => Self::Skip,
101 }
102 }
103}
104
105impl ParquetMetaDataReader {
106 pub fn new() -> Self {
108 Default::default()
109 }
110
111 pub fn new_with_metadata(metadata: ParquetMetaData) -> Self {
114 Self {
115 metadata: Some(metadata),
116 ..Default::default()
117 }
118 }
119
120 #[deprecated(since = "56.1.0", note = "Use `with_page_index_policy` instead")]
125 pub fn with_page_indexes(self, val: bool) -> Self {
126 self.with_page_index_policy(PageIndexPolicy::from(val))
127 }
128
129 #[deprecated(since = "56.1.0", note = "Use `with_column_index_policy` instead")]
133 pub fn with_column_indexes(self, val: bool) -> Self {
134 let policy = PageIndexPolicy::from(val);
135 self.with_column_index_policy(policy)
136 }
137
138 #[deprecated(since = "56.1.0", note = "Use `with_offset_index_policy` instead")]
142 pub fn with_offset_indexes(self, val: bool) -> Self {
143 let policy = PageIndexPolicy::from(val);
144 self.with_offset_index_policy(policy)
145 }
146
147 pub fn with_page_index_policy(self, policy: PageIndexPolicy) -> Self {
149 self.with_column_index_policy(policy)
150 .with_offset_index_policy(policy)
151 }
152
153 pub fn with_column_index_policy(mut self, policy: PageIndexPolicy) -> Self {
155 self.column_index = policy;
156 self
157 }
158
159 pub fn with_offset_index_policy(mut self, policy: PageIndexPolicy) -> Self {
161 self.offset_index = policy;
162 self
163 }
164
165 pub fn with_metadata_options(mut self, options: Option<ParquetMetaDataOptions>) -> Self {
167 self.metadata_options = options.map(Arc::new);
168 self
169 }
170
171 pub fn with_prefetch_hint(mut self, prefetch: Option<usize>) -> Self {
183 self.prefetch_hint = prefetch;
184 self
185 }
186
187 #[cfg(feature = "encryption")]
191 pub fn with_decryption_properties(
192 mut self,
193 properties: Option<std::sync::Arc<FileDecryptionProperties>>,
194 ) -> Self {
195 self.file_decryption_properties = properties;
196 self
197 }
198
    /// Returns `true` if parsed metadata is available for [`Self::finish`].
    pub fn has_metadata(&self) -> bool {
        self.metadata.is_some()
    }
203
    /// Consumes and returns the parsed [`ParquetMetaData`], leaving the reader empty.
    ///
    /// # Errors
    /// Returns an error if no metadata has been successfully parsed yet.
    pub fn finish(&mut self) -> Result<ParquetMetaData> {
        self.metadata
            .take()
            .ok_or_else(|| general_err!("could not parse parquet metadata"))
    }
210
211 pub fn parse_and_finish<R: ChunkReader>(mut self, reader: &R) -> Result<ParquetMetaData> {
230 self.try_parse(reader)?;
231 self.finish()
232 }
233
234 pub fn try_parse<R: ChunkReader>(&mut self, reader: &R) -> Result<()> {
240 self.try_parse_sized(reader, reader.len())
241 }
242
    /// Parses metadata from a `reader` that may hold only the tail of a file of
    /// `file_size` total bytes.
    ///
    /// # Errors
    /// Returns [`ParquetError::NeedMoreData`] when `reader` does not hold enough
    /// trailing bytes but fetching a larger suffix could succeed; returns an EOF
    /// error when the file itself is too small.
    pub fn try_parse_sized<R: ChunkReader>(&mut self, reader: &R, file_size: u64) -> Result<()> {
        self.metadata = match self.parse_metadata(reader) {
            Ok(metadata) => Some(metadata),
            Err(ParquetError::NeedMoreData(needed)) => {
                // If the reader already spans the whole file, or the file is
                // smaller than what is needed, retrying with more data cannot help.
                if file_size == reader.len() || needed as u64 > file_size {
                    return Err(eof_err!(
                        "Parquet file too small. Size is {} but need {}",
                        file_size,
                        needed
                    ));
                } else {
                    // Ask the caller to retry with at least `needed` trailing bytes.
                    return Err(ParquetError::NeedMoreData(needed));
                }
            }
            Err(e) => return Err(e),
        };

        // Footer metadata parsed; stop here unless page indexes were requested.
        if self.column_index == PageIndexPolicy::Skip && self.offset_index == PageIndexPolicy::Skip
        {
            return Ok(());
        }

        self.read_page_indexes_sized(reader, file_size)
    }
343
344 pub fn read_page_indexes<R: ChunkReader>(&mut self, reader: &R) -> Result<()> {
347 self.read_page_indexes_sized(reader, reader.len())
348 }
349
    /// Reads the page indexes for previously-parsed metadata from a `reader`
    /// that may hold only the tail of a file of `file_size` total bytes.
    ///
    /// # Errors
    /// Returns [`ParquetError::NeedMoreData`] when the index data lies before
    /// the bytes held by `reader`; returns an EOF error when the requested
    /// range falls outside the file or overlaps the footer metadata.
    pub fn read_page_indexes_sized<R: ChunkReader>(
        &mut self,
        reader: &R,
        file_size: u64,
    ) -> Result<()> {
        let Some(metadata) = self.metadata.take() else {
            return Err(general_err!(
                "Tried to read page indexes without ParquetMetaData metadata"
            ));
        };

        // Hand the metadata to a push decoder, which reports which byte ranges
        // it still needs in order to decode the requested indexes.
        let push_decoder = ParquetMetaDataPushDecoder::try_new_with_metadata(file_size, metadata)?
            .with_offset_index_policy(self.offset_index)
            .with_column_index_policy(self.column_index)
            .with_metadata_options(self.metadata_options.clone());
        let mut push_decoder = self.prepare_push_decoder(push_decoder);

        let range = match needs_index_data(&mut push_decoder)? {
            // Nothing more to read (e.g. no indexes present / required).
            NeedsIndexData::No(metadata) => {
                self.metadata = Some(metadata);
                return Ok(());
            }
            NeedsIndexData::Yes(range) => range,
        };

        // `reader` holds the last `reader.len()` bytes of the file; compute the
        // absolute file range those bytes correspond to.
        let file_range = file_size.saturating_sub(reader.len())..file_size;
        if !(file_range.contains(&range.start) && file_range.contains(&range.end)) {
            if range.end > file_size {
                return Err(eof_err!(
                    "Parquet file too small. Range {range:?} is beyond file bounds {file_size}",
                ));
            } else {
                // The data exists but starts before the held suffix; report how
                // many trailing bytes would be enough.
                return Err(ParquetError::NeedMoreData(
                    (file_size - range.start).try_into()?,
                ));
            }
        }

        // Sanity check: the index range must not run into the footer metadata
        // region recorded by the last parse.
        if let Some(metadata_size) = self.metadata_size {
            let metadata_range = file_size.saturating_sub(metadata_size as u64)..file_size;
            if range.end > metadata_range.start {
                return Err(eof_err!(
                    "Parquet file too small. Page index range {range:?} overlaps with file metadata {metadata_range:?}",
                ));
            }
        }

        let bytes_needed = usize::try_from(range.end - range.start)?;
        // Translate the absolute file offset into an offset within `reader`.
        let bytes = reader.get_bytes(range.start - file_range.start, bytes_needed)?;

        push_decoder.push_range(range, bytes)?;
        let metadata = parse_index_data(&mut push_decoder)?;
        self.metadata = Some(metadata);

        Ok(())
    }
419
420 #[cfg(all(feature = "async", feature = "arrow"))]
427 pub async fn load_and_finish<F: MetadataFetch>(
428 mut self,
429 fetch: F,
430 file_size: u64,
431 ) -> Result<ParquetMetaData> {
432 self.try_load(fetch, file_size).await?;
433 self.finish()
434 }
435
436 #[cfg(all(feature = "async", feature = "arrow"))]
443 pub async fn load_via_suffix_and_finish<F: MetadataSuffixFetch>(
444 mut self,
445 fetch: F,
446 ) -> Result<ParquetMetaData> {
447 self.try_load_via_suffix(fetch).await?;
448 self.finish()
449 }
    #[cfg(all(feature = "async", feature = "arrow"))]
    /// Asynchronously loads the footer metadata via `fetch`, then the page
    /// indexes if requested by the configured policies.
    pub async fn try_load<F: MetadataFetch>(&mut self, mut fetch: F, file_size: u64) -> Result<()> {
        // `remainder` holds any extra prefetched bytes (offset, data) that may
        // already contain the page index data, avoiding a second fetch.
        let (metadata, remainder) = self.load_metadata(&mut fetch, file_size).await?;

        self.metadata = Some(metadata);

        // Done unless page indexes were requested.
        if self.column_index == PageIndexPolicy::Skip && self.offset_index == PageIndexPolicy::Skip
        {
            return Ok(());
        }

        self.load_page_index_with_remainder(fetch, remainder).await
    }
469
    #[cfg(all(feature = "async", feature = "arrow"))]
    /// Like [`Self::try_load`] but uses suffix fetches, so the total file size
    /// need not be known up front.
    pub async fn try_load_via_suffix<F: MetadataSuffixFetch>(
        &mut self,
        mut fetch: F,
    ) -> Result<()> {
        // `remainder` holds any extra prefetched bytes that may already contain
        // the page index data, avoiding a second fetch.
        let (metadata, remainder) = self.load_metadata_via_suffix(&mut fetch).await?;

        self.metadata = Some(metadata);

        // Done unless page indexes were requested.
        if self.column_index == PageIndexPolicy::Skip && self.offset_index == PageIndexPolicy::Skip
        {
            return Ok(());
        }

        self.load_page_index_with_remainder(fetch, remainder).await
    }
492
    #[cfg(all(feature = "async", feature = "arrow"))]
    /// Asynchronously fetches and parses the page indexes for previously-loaded metadata.
    pub async fn load_page_index<F: MetadataFetch>(&mut self, fetch: F) -> Result<()> {
        self.load_page_index_with_remainder(fetch, None).await
    }
499
    #[cfg(all(feature = "async", feature = "arrow"))]
    // Fetches and decodes the page indexes, reusing `remainder` (a previously
    // fetched suffix given as `(absolute file offset, bytes)`) when it already
    // covers the required byte range.
    async fn load_page_index_with_remainder<F: MetadataFetch>(
        &mut self,
        mut fetch: F,
        remainder: Option<(usize, Bytes)>,
    ) -> Result<()> {
        let Some(metadata) = self.metadata.take() else {
            return Err(general_err!("Footer metadata is not present"));
        };

        // NOTE(review): the real file size is not known here; u64::MAX appears
        // to act as a placeholder since only absolute ranges are pushed below —
        // confirm the push decoder only uses it as an upper bound.
        let file_size = u64::MAX;
        let push_decoder = ParquetMetaDataPushDecoder::try_new_with_metadata(file_size, metadata)?
            .with_offset_index_policy(self.offset_index)
            .with_column_index_policy(self.column_index)
            .with_metadata_options(self.metadata_options.clone());
        let mut push_decoder = self.prepare_push_decoder(push_decoder);

        // Ask the decoder which byte range (if any) it still needs.
        let range = match needs_index_data(&mut push_decoder)? {
            NeedsIndexData::No(metadata) => {
                self.metadata = Some(metadata);
                return Ok(());
            }
            NeedsIndexData::Yes(range) => range,
        };

        // Serve the range from the prefetched remainder when it covers it;
        // otherwise issue a fresh fetch.
        let bytes = match &remainder {
            Some((remainder_start, remainder)) if *remainder_start as u64 <= range.start => {
                let remainder_start = *remainder_start as u64;
                let offset = usize::try_from(range.start - remainder_start)?;
                let end = usize::try_from(range.end - remainder_start)?;
                if end > remainder.len() {
                    return Err(general_err!(
                        "Corrupted parquet file: index data range ({:?}) exceeds remainder length ({})",
                        range,
                        remainder.len()
                    ));
                }
                remainder.slice(offset..end)
            }
            _ => fetch.fetch(range.start..range.end).await?,
        };

        // Sanity check in case the fetcher returned a short read.
        if bytes.len() as u64 != range.end - range.start {
            return Err(general_err!(
                "Corrupted parquet file: index data length mismatch, expected {}, got {}",
                range.end - range.start,
                bytes.len()
            ));
        }
        push_decoder.push_range(range.clone(), bytes)?;
        let metadata = parse_index_data(&mut push_decoder)?;
        self.metadata = Some(metadata);
        Ok(())
    }
559
    /// Reads the footer and decodes the metadata from `chunk_reader`.
    ///
    /// Returns [`ParquetError::NeedMoreData`] with the number of trailing bytes
    /// required when `chunk_reader` does not hold enough of the file's tail.
    fn parse_metadata<R: ChunkReader>(&mut self, chunk_reader: &R) -> Result<ParquetMetaData> {
        let file_size = chunk_reader.len();
        if file_size < (FOOTER_SIZE as u64) {
            return Err(ParquetError::NeedMoreData(FOOTER_SIZE));
        }

        // Read the fixed-size footer from the very end of the data.
        let mut footer = [0_u8; FOOTER_SIZE];
        chunk_reader
            .get_read(file_size - FOOTER_SIZE as u64)?
            .read_exact(&mut footer)?;

        let footer = FooterTail::try_new(&footer)?;
        let metadata_len = footer.metadata_length();
        let footer_metadata_len = FOOTER_SIZE + metadata_len;
        // Remember the footer+metadata size for later page-index overlap checks.
        self.metadata_size = Some(footer_metadata_len);

        if footer_metadata_len as u64 > file_size {
            return Err(ParquetError::NeedMoreData(footer_metadata_len));
        }

        let start = file_size - footer_metadata_len as u64;
        let bytes = chunk_reader.get_bytes(start, metadata_len)?;
        self.decode_footer_metadata(bytes, file_size, footer)
    }
587
    /// Returns the size in bytes of footer + metadata recorded by the most
    /// recent parse, if any.
    pub fn metadata_size(&self) -> Option<usize> {
        self.metadata_size
    }
593
594 #[cfg(all(feature = "async", feature = "arrow"))]
598 fn get_prefetch_size(&self) -> usize {
599 if let Some(prefetch) = self.prefetch_hint {
600 if prefetch > FOOTER_SIZE {
601 return prefetch;
602 }
603 }
604 FOOTER_SIZE
605 }
606
    #[cfg(all(feature = "async", feature = "arrow"))]
    // Fetches a suffix of the file, decodes the footer metadata from it, and
    // returns the metadata plus any leftover prefetched bytes as
    // `(absolute file offset, bytes)` for possible page-index reuse.
    async fn load_metadata<F: MetadataFetch>(
        &self,
        fetch: &mut F,
        file_size: u64,
    ) -> Result<(ParquetMetaData, Option<(usize, Bytes)>)> {
        let prefetch = self.get_prefetch_size() as u64;

        if file_size < FOOTER_SIZE as u64 {
            return Err(eof_err!("file size of {} is less than footer", file_size));
        }

        // Fetch the last `prefetch` bytes (clamped to the start of the file).
        let footer_start = file_size.saturating_sub(prefetch);

        let suffix = fetch.fetch(footer_start..file_size).await?;
        let suffix_len = suffix.len();
        let fetch_len = (file_size - footer_start)
            .try_into()
            .expect("footer size should never be larger than u32");
        if suffix_len < fetch_len {
            return Err(eof_err!(
                "metadata requires {} bytes, but could only read {}",
                fetch_len,
                suffix_len
            ));
        }

        // The footer occupies the last FOOTER_SIZE bytes of the suffix.
        let mut footer = [0; FOOTER_SIZE];
        footer.copy_from_slice(&suffix[suffix_len - FOOTER_SIZE..suffix_len]);

        let footer = FooterTail::try_new(&footer)?;
        let length = footer.metadata_length();

        if file_size < (length + FOOTER_SIZE) as u64 {
            return Err(eof_err!(
                "file size of {} is less than footer + metadata {}",
                file_size,
                length + FOOTER_SIZE
            ));
        }

        if length > suffix_len - FOOTER_SIZE {
            // The prefetched suffix is too short: fetch exactly the metadata.
            let metadata_start = file_size - (length + FOOTER_SIZE) as u64;
            let meta = fetch
                .fetch(metadata_start..(file_size - FOOTER_SIZE as u64))
                .await?;
            Ok((self.decode_footer_metadata(meta, file_size, footer)?, None))
        } else {
            // The metadata is fully inside the suffix; slice it out and return
            // the bytes preceding it as the reusable remainder.
            let metadata_start = (file_size - (length + FOOTER_SIZE) as u64 - footer_start)
                .try_into()
                .expect("metadata length should never be larger than u32");
            let slice = suffix.slice(metadata_start..suffix_len - FOOTER_SIZE);
            Ok((
                self.decode_footer_metadata(slice, file_size, footer)?,
                Some((footer_start as usize, suffix.slice(..metadata_start))),
            ))
        }
    }
669
    #[cfg(all(feature = "async", feature = "arrow"))]
    // Like `load_metadata`, but uses suffix fetches so the real file size is
    // never known. Returns the metadata plus any leftover prefetched bytes as
    // `(offset within suffix, bytes)` for possible page-index reuse.
    async fn load_metadata_via_suffix<F: MetadataSuffixFetch>(
        &self,
        fetch: &mut F,
    ) -> Result<(ParquetMetaData, Option<(usize, Bytes)>)> {
        let prefetch = self.get_prefetch_size();

        let suffix = fetch.fetch_suffix(prefetch as _).await?;
        let suffix_len = suffix.len();

        if suffix_len < FOOTER_SIZE {
            return Err(eof_err!(
                "footer metadata requires {} bytes, but could only read {}",
                FOOTER_SIZE,
                suffix_len
            ));
        }

        // The footer occupies the last FOOTER_SIZE bytes of the suffix.
        let mut footer = [0; FOOTER_SIZE];
        footer.copy_from_slice(&suffix[suffix_len - FOOTER_SIZE..suffix_len]);

        let footer = FooterTail::try_new(&footer)?;
        let length = footer.metadata_length();
        // NOTE(review): the true file size is unavailable with suffix fetches;
        // metadata + footer length stands in for it — confirm downstream use is
        // limited to relative offset computations.
        let file_size = (length + FOOTER_SIZE) as u64;

        let metadata_offset = length + FOOTER_SIZE;
        if length > suffix_len - FOOTER_SIZE {
            // The prefetched suffix is too short: re-fetch a suffix large
            // enough to contain metadata + footer.
            let meta = fetch.fetch_suffix(metadata_offset).await?;

            if meta.len() < metadata_offset {
                return Err(eof_err!(
                    "metadata requires {} bytes, but could only read {}",
                    metadata_offset,
                    meta.len()
                ));
            }

            let meta = meta.slice(0..length);
            Ok((self.decode_footer_metadata(meta, file_size, footer)?, None))
        } else {
            // The metadata is fully inside the suffix; slice it out and return
            // the preceding bytes as the reusable remainder.
            let metadata_start = suffix_len - metadata_offset;
            let slice = suffix.slice(metadata_start..suffix_len - FOOTER_SIZE);
            Ok((
                self.decode_footer_metadata(slice, file_size, footer)?,
                Some((0, suffix.slice(..metadata_start))),
            ))
        }
    }
722
    /// Decodes the fixed-size footer bytes into a [`FooterTail`].
    #[deprecated(since = "57.0.0", note = "Use FooterTail::try_from instead")]
    pub fn decode_footer_tail(slice: &[u8; FOOTER_SIZE]) -> Result<FooterTail> {
        FooterTail::try_new(slice)
    }
728
    /// Decodes the fixed-size footer bytes and returns the metadata length.
    #[deprecated(since = "54.3.0", note = "Use decode_footer_tail instead")]
    pub fn decode_footer(slice: &[u8; FOOTER_SIZE]) -> Result<usize> {
        FooterTail::try_new(slice).map(|f| f.metadata_length())
    }
734
    /// Decodes [`ParquetMetaData`] from `buf`, the metadata bytes that sit
    /// immediately before the footer of a file of `file_size` total bytes.
    ///
    /// The absolute position of `buf` within the file is reconstructed from
    /// `file_size` so the push decoder can be fed the correct range.
    pub(crate) fn decode_footer_metadata(
        &self,
        buf: Bytes,
        file_size: u64,
        footer_tail: FooterTail,
    ) -> Result<ParquetMetaData> {
        // The metadata ends where the footer begins.
        let ending_offset = file_size.checked_sub(FOOTER_SIZE as u64).ok_or_else(|| {
            general_err!(
                "file size {file_size} is smaller than footer size {}",
                FOOTER_SIZE
            )
        })?;

        // ... and starts `buf.len()` bytes before that.
        let starting_offset = ending_offset.checked_sub(buf.len() as u64).ok_or_else(|| {
            general_err!(
                "file size {file_size} is smaller than buffer size {} + footer size {}",
                buf.len(),
                FOOTER_SIZE
            )
        })?;

        let range = starting_offset..ending_offset;

        // Page indexes are handled separately; only decode the footer metadata here.
        let push_decoder =
            ParquetMetaDataPushDecoder::try_new_with_footer_tail(file_size, footer_tail)?
                .with_page_index_policy(PageIndexPolicy::Skip)
                .with_metadata_options(self.metadata_options.clone());

        let mut push_decoder = self.prepare_push_decoder(push_decoder);
        push_decoder.push_range(range, buf)?;
        match push_decoder.try_decode()? {
            DecodeResult::Data(metadata) => Ok(metadata),
            DecodeResult::Finished => Err(general_err!(
                "could not parse parquet metadata -- previously finished"
            )),
            DecodeResult::NeedsData(ranges) => Err(general_err!(
                "could not parse parquet metadata, needs ranges {:?}",
                ranges
            )),
        }
    }
794
795 #[cfg(feature = "encryption")]
797 fn prepare_push_decoder(
798 &self,
799 push_decoder: ParquetMetaDataPushDecoder,
800 ) -> ParquetMetaDataPushDecoder {
801 push_decoder.with_file_decryption_properties(
802 self.file_decryption_properties
803 .as_ref()
804 .map(std::sync::Arc::clone),
805 )
806 }
    #[cfg(not(feature = "encryption"))]
    // Without the "encryption" feature there is nothing to configure.
    fn prepare_push_decoder(
        &self,
        push_decoder: ParquetMetaDataPushDecoder,
    ) -> ParquetMetaDataPushDecoder {
        push_decoder
    }
814
815 pub fn decode_metadata(buf: &[u8]) -> Result<ParquetMetaData> {
823 decode_metadata(buf, None)
824 }
825
    /// Decodes [`ParquetMetaData`] from an encoded metadata buffer, honoring
    /// the supplied decoding `options` if provided.
    pub fn decode_metadata_with_options(
        buf: &[u8],
        options: Option<&ParquetMetaDataOptions>,
    ) -> Result<ParquetMetaData> {
        decode_metadata(buf, options)
    }
836
837 pub fn decode_schema(buf: &[u8]) -> Result<Arc<SchemaDescriptor>> {
840 Ok(Arc::new(parquet_schema_from_bytes(buf)?))
841 }
842}
843
/// Outcome of asking the push decoder whether it needs more bytes to decode
/// the page indexes.
#[allow(clippy::large_enum_variant)]
enum NeedsIndexData {
    /// Metadata is complete; no further reads are needed.
    No(ParquetMetaData),
    /// A single contiguous byte range that must be read and pushed.
    Yes(Range<u64>),
}
853
/// Asks `push_decoder` which byte ranges it still needs, collapsing multiple
/// requested ranges into one contiguous span so a single read suffices.
fn needs_index_data(push_decoder: &mut ParquetMetaDataPushDecoder) -> Result<NeedsIndexData> {
    match push_decoder.try_decode()? {
        DecodeResult::NeedsData(ranges) => {
            // Merge into min(start)..max(end); this may over-read gaps between
            // the requested ranges in exchange for a single contiguous fetch.
            let range = ranges
                .into_iter()
                .reduce(|a, b| a.start.min(b.start)..a.end.max(b.end))
                .ok_or_else(|| general_err!("Internal error: no ranges provided"))?;
            Ok(NeedsIndexData::Yes(range))
        }
        DecodeResult::Data(metadata) => Ok(NeedsIndexData::No(metadata)),
        DecodeResult::Finished => Err(general_err!("Internal error: decoder was finished")),
    }
}
869
870fn parse_index_data(push_decoder: &mut ParquetMetaDataPushDecoder) -> Result<ParquetMetaData> {
873 match push_decoder.try_decode()? {
874 DecodeResult::NeedsData(_) => Err(general_err!(
875 "Internal error: decoder still needs data after reading required range"
876 )),
877 DecodeResult::Data(metadata) => Ok(metadata),
878 DecodeResult::Finished => Err(general_err!("Internal error: decoder was finished")),
879 }
880}
881
882#[cfg(test)]
883mod tests {
884 use super::*;
885 use crate::file::reader::Length;
886 use crate::util::test_common::file_util::get_test_file;
887 use std::ops::Range;
888
    #[test]
    // An empty file must ask for at least the footer bytes.
    fn test_parse_metadata_size_smaller_than_footer() {
        let test_file = tempfile::tempfile().unwrap();
        let err = ParquetMetaDataReader::new()
            .parse_metadata(&test_file)
            .unwrap_err();
        // `FOOTER_SIZE` is a const, so this pattern matches its value.
        assert!(matches!(err, ParquetError::NeedMoreData(FOOTER_SIZE)));
    }
897
    #[test]
    // Footer-sized data without the magic bytes is rejected as corrupt.
    fn test_parse_metadata_corrupt_footer() {
        let data = Bytes::from(vec![1, 2, 3, 4, 5, 6, 7, 8]);
        let reader_result = ParquetMetaDataReader::new().parse_metadata(&data);
        assert_eq!(
            reader_result.unwrap_err().to_string(),
            "Parquet error: Invalid Parquet file. Corrupt footer"
        );
    }
907
    #[test]
    // A valid footer whose declared metadata length (255) exceeds the data
    // should request 255 + FOOTER_SIZE = 263 bytes.
    fn test_parse_metadata_invalid_start() {
        let test_file = Bytes::from(vec![255, 0, 0, 0, b'P', b'A', b'R', b'1']);
        let err = ParquetMetaDataReader::new()
            .parse_metadata(&test_file)
            .unwrap_err();
        assert!(matches!(err, ParquetError::NeedMoreData(263)));
    }
916
    #[test]
    #[allow(deprecated)]
    // Exercises `try_parse_sized` with progressively smaller file suffixes and
    // checks the NeedMoreData retry protocol and error messages.
    fn test_try_parse() {
        let file = get_test_file("alltypes_tiny_pages.parquet");
        let len = file.len();

        let mut reader = ParquetMetaDataReader::new().with_page_indexes(true);

        let bytes_for_range = |range: Range<u64>| {
            file.get_bytes(range.start, (range.end - range.start).try_into().unwrap())
                .unwrap()
        };

        // Whole file: metadata and both indexes parse in one shot.
        let bytes = bytes_for_range(0..len);
        reader.try_parse(&bytes).unwrap();
        let metadata = reader.finish().unwrap();
        assert!(metadata.column_index.is_some());
        assert!(metadata.offset_index.is_some());

        // Suffix comfortably containing metadata + indexes.
        let bytes = bytes_for_range(320000..len);
        reader.try_parse_sized(&bytes, len).unwrap();
        let metadata = reader.finish().unwrap();
        assert!(metadata.column_index.is_some());
        assert!(metadata.offset_index.is_some());

        // Smallest suffix that still contains everything.
        let bytes = bytes_for_range(323583..len);
        reader.try_parse_sized(&bytes, len).unwrap();
        let metadata = reader.finish().unwrap();
        assert!(metadata.column_index.is_some());
        assert!(metadata.offset_index.is_some());

        // One byte short: expect NeedMoreData, then succeed on retry.
        let bytes = bytes_for_range(323584..len);
        match reader.try_parse_sized(&bytes, len).unwrap_err() {
            ParquetError::NeedMoreData(needed) => {
                let bytes = bytes_for_range(len - needed as u64..len);
                reader.try_parse_sized(&bytes, len).unwrap();
                let metadata = reader.finish().unwrap();
                assert!(metadata.column_index.is_some());
                assert!(metadata.offset_index.is_some());
            }
            _ => panic!("unexpected error"),
        };

        // Suffix with metadata but not the indexes: loop until indexes load.
        let mut reader = ParquetMetaDataReader::new().with_page_indexes(true);
        let mut bytes = bytes_for_range(452505..len);
        loop {
            match reader.try_parse_sized(&bytes, len) {
                Ok(_) => break,
                Err(ParquetError::NeedMoreData(needed)) => {
                    bytes = bytes_for_range(len - needed as u64..len);
                    if reader.has_metadata() {
                        reader.read_page_indexes_sized(&bytes, len).unwrap();
                        break;
                    }
                }
                _ => panic!("unexpected error"),
            }
        }
        let metadata = reader.finish().unwrap();
        assert!(metadata.column_index.is_some());
        assert!(metadata.offset_index.is_some());

        // Claimed file size smaller than the index range: hard EOF error.
        let bytes = bytes_for_range(323584..len);
        let reader_result = reader.try_parse_sized(&bytes, len - 323584).unwrap_err();
        assert_eq!(
            reader_result.to_string(),
            "EOF: Parquet file too small. Range 323583..452504 is beyond file bounds 130649"
        );

        // Without page indexes only the metadata retry is needed.
        let mut reader = ParquetMetaDataReader::new();
        let bytes = bytes_for_range(452505..len);
        match reader.try_parse_sized(&bytes, len).unwrap_err() {
            ParquetError::NeedMoreData(needed) => {
                let bytes = bytes_for_range(len - needed as u64..len);
                reader.try_parse_sized(&bytes, len).unwrap();
                reader.finish().unwrap();
            }
            _ => panic!("unexpected error"),
        };

        // try_parse treats the suffix as the whole file, so it cannot retry.
        let reader_result = reader.try_parse(&bytes).unwrap_err();
        assert_eq!(
            reader_result.to_string(),
            "EOF: Parquet file too small. Size is 1728 but need 1729"
        );

        // A prefix of the file has no valid footer.
        let bytes = bytes_for_range(0..1000);
        let reader_result = reader.try_parse_sized(&bytes, len).unwrap_err();
        assert_eq!(
            reader_result.to_string(),
            "Parquet error: Invalid Parquet file. Corrupt footer"
        );

        // Suffix larger than the claimed file size: still reports file too small.
        let bytes = bytes_for_range(452510..len);
        let reader_result = reader.try_parse_sized(&bytes, len - 452505).unwrap_err();
        assert_eq!(
            reader_result.to_string(),
            "EOF: Parquet file too small. Size is 1728 but need 1729"
        );
    }
1031}
1032
1033#[cfg(all(feature = "async", feature = "arrow", test))]
1034mod async_tests {
1035 use super::*;
1036
1037 use arrow::{array::Int32Array, datatypes::DataType};
1038 use arrow_array::RecordBatch;
1039 use arrow_schema::{Field, Schema};
1040 use bytes::Bytes;
1041 use futures::FutureExt;
1042 use futures::future::BoxFuture;
1043 use std::fs::File;
1044 use std::future::Future;
1045 use std::io::{Read, Seek, SeekFrom};
1046 use std::ops::Range;
1047 use std::sync::Arc;
1048 use std::sync::atomic::{AtomicUsize, Ordering};
1049 use tempfile::NamedTempFile;
1050
1051 use crate::arrow::ArrowWriter;
1052 use crate::file::properties::WriterProperties;
1053 use crate::file::reader::Length;
1054 use crate::util::test_common::file_util::get_test_file;
1055
    // Adapter turning a range-fetching closure into a `MetadataFetch` impl.
    struct MetadataFetchFn<F>(F);
1057
    // Delegates `fetch` to the wrapped closure, boxing the returned future.
    impl<F, Fut> MetadataFetch for MetadataFetchFn<F>
    where
        F: FnMut(Range<u64>) -> Fut + Send,
        Fut: Future<Output = Result<Bytes>> + Send,
    {
        fn fetch(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
            async move { self.0(range).await }.boxed()
        }
    }
1067
    // Adapter pairing a range-fetching closure (F1) with a suffix-fetching
    // closure (F2) to implement both fetch traits.
    struct MetadataSuffixFetchFn<F1, F2>(F1, F2);
1069
    // Range fetches delegate to the first closure; `F2: Send` is required only
    // because the struct stores it.
    impl<F1, Fut, F2> MetadataFetch for MetadataSuffixFetchFn<F1, F2>
    where
        F1: FnMut(Range<u64>) -> Fut + Send,
        Fut: Future<Output = Result<Bytes>> + Send,
        F2: Send,
    {
        fn fetch(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
            async move { self.0(range).await }.boxed()
        }
    }
1080
    // Suffix fetches delegate to the second closure.
    impl<F1, Fut, F2> MetadataSuffixFetch for MetadataSuffixFetchFn<F1, F2>
    where
        F1: FnMut(Range<u64>) -> Fut + Send,
        F2: FnMut(usize) -> Fut + Send,
        Fut: Future<Output = Result<Bytes>> + Send,
    {
        fn fetch_suffix(&mut self, suffix: usize) -> BoxFuture<'_, Result<Bytes>> {
            async move { self.1(suffix).await }.boxed()
        }
    }
1091
1092 fn read_range(file: &mut File, range: Range<u64>) -> Result<Bytes> {
1093 file.seek(SeekFrom::Start(range.start as _))?;
1094 let len = range.end - range.start;
1095 let mut buf = Vec::with_capacity(len.try_into().unwrap());
1096 file.take(len as _).read_to_end(&mut buf)?;
1097 Ok(buf.into())
1098 }
1099
    // Reads the last `suffix` bytes of `file`, clamped to the file length so a
    // request larger than the file starts at offset 0 instead of erroring.
    fn read_suffix(file: &mut File, suffix: usize) -> Result<Bytes> {
        let file_len = file.len();
        file.seek(SeekFrom::End(0 - suffix.min(file_len as _) as i64))?;
        let mut buf = Vec::with_capacity(suffix);
        file.take(suffix as _).read_to_end(&mut buf)?;
        Ok(buf.into())
    }
1108
    #[tokio::test]
    // Verifies async metadata loading and that the number of fetches depends on
    // the prefetch hint (small/absent hint => 2 fetches, ample hint => 1).
    async fn test_simple() {
        let mut file = get_test_file("nulls.snappy.parquet");
        let len = file.len();

        // Baseline parsed synchronously for comparison.
        let expected = ParquetMetaDataReader::new()
            .parse_and_finish(&file)
            .unwrap();
        let expected = expected.file_metadata().schema();
        let fetch_count = AtomicUsize::new(0);

        let mut fetch = |range| {
            fetch_count.fetch_add(1, Ordering::SeqCst);
            futures::future::ready(read_range(&mut file, range))
        };

        // No prefetch hint: footer fetch + metadata fetch.
        let input = MetadataFetchFn(&mut fetch);
        let actual = ParquetMetaDataReader::new()
            .load_and_finish(input, len)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 2);

        // Hint smaller than the footer: still 2 fetches.
        fetch_count.store(0, Ordering::SeqCst);
        let input = MetadataFetchFn(&mut fetch);
        let actual = ParquetMetaDataReader::new()
            .with_prefetch_hint(Some(7))
            .load_and_finish(input, len)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 2);

        // Hint barely larger than the footer but too small for the metadata.
        fetch_count.store(0, Ordering::SeqCst);
        let input = MetadataFetchFn(&mut fetch);
        let actual = ParquetMetaDataReader::new()
            .with_prefetch_hint(Some(10))
            .load_and_finish(input, len)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 2);

        // Generous hint: a single fetch suffices.
        fetch_count.store(0, Ordering::SeqCst);
        let input = MetadataFetchFn(&mut fetch);
        let actual = ParquetMetaDataReader::new()
            .with_prefetch_hint(Some(500))
            .load_and_finish(input, len)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);

        // Exact-size hint: still a single fetch.
        fetch_count.store(0, Ordering::SeqCst);
        let input = MetadataFetchFn(&mut fetch);
        let actual = ParquetMetaDataReader::new()
            .with_prefetch_hint(Some(428))
            .load_and_finish(input, len)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);

        // File smaller than the footer.
        let input = MetadataFetchFn(&mut fetch);
        let err = ParquetMetaDataReader::new()
            .load_and_finish(input, 4)
            .await
            .unwrap_err()
            .to_string();
        assert_eq!(err, "EOF: file size of 4 is less than footer");

        // Size large enough for a footer, but the data there is not one.
        let input = MetadataFetchFn(&mut fetch);
        let err = ParquetMetaDataReader::new()
            .load_and_finish(input, 20)
            .await
            .unwrap_err()
            .to_string();
        assert_eq!(err, "Parquet error: Invalid Parquet file. Corrupt footer");
    }
1193
1194 #[tokio::test]
1195 async fn test_suffix() {
1196 let mut file = get_test_file("nulls.snappy.parquet");
1197 let mut file2 = file.try_clone().unwrap();
1198
1199 let expected = ParquetMetaDataReader::new()
1200 .parse_and_finish(&file)
1201 .unwrap();
1202 let expected = expected.file_metadata().schema();
1203 let fetch_count = AtomicUsize::new(0);
1204 let suffix_fetch_count = AtomicUsize::new(0);
1205
1206 let mut fetch = |range| {
1207 fetch_count.fetch_add(1, Ordering::SeqCst);
1208 futures::future::ready(read_range(&mut file, range))
1209 };
1210 let mut suffix_fetch = |suffix| {
1211 suffix_fetch_count.fetch_add(1, Ordering::SeqCst);
1212 futures::future::ready(read_suffix(&mut file2, suffix))
1213 };
1214
1215 let input = MetadataSuffixFetchFn(&mut fetch, &mut suffix_fetch);
1216 let actual = ParquetMetaDataReader::new()
1217 .load_via_suffix_and_finish(input)
1218 .await
1219 .unwrap();
1220 assert_eq!(actual.file_metadata().schema(), expected);
1221 assert_eq!(fetch_count.load(Ordering::SeqCst), 0);
1222 assert_eq!(suffix_fetch_count.load(Ordering::SeqCst), 2);
1223
1224 fetch_count.store(0, Ordering::SeqCst);
1226 suffix_fetch_count.store(0, Ordering::SeqCst);
1227 let input = MetadataSuffixFetchFn(&mut fetch, &mut suffix_fetch);
1228 let actual = ParquetMetaDataReader::new()
1229 .with_prefetch_hint(Some(7))
1230 .load_via_suffix_and_finish(input)
1231 .await
1232 .unwrap();
1233 assert_eq!(actual.file_metadata().schema(), expected);
1234 assert_eq!(fetch_count.load(Ordering::SeqCst), 0);
1235 assert_eq!(suffix_fetch_count.load(Ordering::SeqCst), 2);
1236
1237 fetch_count.store(0, Ordering::SeqCst);
1239 suffix_fetch_count.store(0, Ordering::SeqCst);
1240 let input = MetadataSuffixFetchFn(&mut fetch, &mut suffix_fetch);
1241 let actual = ParquetMetaDataReader::new()
1242 .with_prefetch_hint(Some(10))
1243 .load_via_suffix_and_finish(input)
1244 .await
1245 .unwrap();
1246 assert_eq!(actual.file_metadata().schema(), expected);
1247 assert_eq!(fetch_count.load(Ordering::SeqCst), 0);
1248 assert_eq!(suffix_fetch_count.load(Ordering::SeqCst), 2);
1249
1250 dbg!("test");
1251 fetch_count.store(0, Ordering::SeqCst);
1253 suffix_fetch_count.store(0, Ordering::SeqCst);
1254 let input = MetadataSuffixFetchFn(&mut fetch, &mut suffix_fetch);
1255 let actual = ParquetMetaDataReader::new()
1256 .with_prefetch_hint(Some(500))
1257 .load_via_suffix_and_finish(input)
1258 .await
1259 .unwrap();
1260 assert_eq!(actual.file_metadata().schema(), expected);
1261 assert_eq!(fetch_count.load(Ordering::SeqCst), 0);
1262 assert_eq!(suffix_fetch_count.load(Ordering::SeqCst), 1);
1263
1264 fetch_count.store(0, Ordering::SeqCst);
1266 suffix_fetch_count.store(0, Ordering::SeqCst);
1267 let input = MetadataSuffixFetchFn(&mut fetch, &mut suffix_fetch);
1268 let actual = ParquetMetaDataReader::new()
1269 .with_prefetch_hint(Some(428))
1270 .load_via_suffix_and_finish(input)
1271 .await
1272 .unwrap();
1273 assert_eq!(actual.file_metadata().schema(), expected);
1274 assert_eq!(fetch_count.load(Ordering::SeqCst), 0);
1275 assert_eq!(suffix_fetch_count.load(Ordering::SeqCst), 1);
1276 }
1277
    #[cfg(feature = "encryption")]
    #[tokio::test]
    // Suffix-based loading of an encrypted file using uniform footer encryption.
    async fn test_suffix_with_encryption() {
        let mut file = get_test_file("uniform_encryption.parquet.encrypted");
        let mut file2 = file.try_clone().unwrap();

        let mut fetch = |range| futures::future::ready(read_range(&mut file, range));
        let mut suffix_fetch = |suffix| futures::future::ready(read_suffix(&mut file2, suffix));

        let input = MetadataSuffixFetchFn(&mut fetch, &mut suffix_fetch);

        // Key matching the test file's footer key.
        let key_code: &[u8] = "0123456789012345".as_bytes();
        let decryption_properties = FileDecryptionProperties::builder(key_code.to_vec())
            .build()
            .unwrap();

        let expected = ParquetMetaDataReader::new()
            .with_decryption_properties(Some(decryption_properties))
            .load_via_suffix_and_finish(input)
            .await
            .unwrap();
        assert_eq!(expected.num_row_groups(), 1);
    }
1302
1303 #[tokio::test]
1304 #[allow(deprecated)]
1305 async fn test_page_index() {
1306 let mut file = get_test_file("alltypes_tiny_pages.parquet");
1307 let len = file.len();
1308 let fetch_count = AtomicUsize::new(0);
1309 let mut fetch = |range| {
1310 fetch_count.fetch_add(1, Ordering::SeqCst);
1311 futures::future::ready(read_range(&mut file, range))
1312 };
1313
1314 let f = MetadataFetchFn(&mut fetch);
1315 let mut loader = ParquetMetaDataReader::new().with_page_indexes(true);
1316 loader.try_load(f, len).await.unwrap();
1317 assert_eq!(fetch_count.load(Ordering::SeqCst), 3);
1318 let metadata = loader.finish().unwrap();
1319 assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());
1320
1321 fetch_count.store(0, Ordering::SeqCst);
1323 let f = MetadataFetchFn(&mut fetch);
1324 let mut loader = ParquetMetaDataReader::new()
1325 .with_page_indexes(true)
1326 .with_prefetch_hint(Some(1729));
1327 loader.try_load(f, len).await.unwrap();
1328 assert_eq!(fetch_count.load(Ordering::SeqCst), 2);
1329 let metadata = loader.finish().unwrap();
1330 assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());
1331
1332 fetch_count.store(0, Ordering::SeqCst);
1334 let f = MetadataFetchFn(&mut fetch);
1335 let mut loader = ParquetMetaDataReader::new()
1336 .with_page_indexes(true)
1337 .with_prefetch_hint(Some(130649));
1338 loader.try_load(f, len).await.unwrap();
1339 assert_eq!(fetch_count.load(Ordering::SeqCst), 2);
1340 let metadata = loader.finish().unwrap();
1341 assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());
1342
1343 fetch_count.store(0, Ordering::SeqCst);
1345 let f = MetadataFetchFn(&mut fetch);
1346 let metadata = ParquetMetaDataReader::new()
1347 .with_page_indexes(true)
1348 .with_prefetch_hint(Some(130650))
1349 .load_and_finish(f, len)
1350 .await
1351 .unwrap();
1352 assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
1353 assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());
1354
1355 fetch_count.store(0, Ordering::SeqCst);
1357 let f = MetadataFetchFn(&mut fetch);
1358 let metadata = ParquetMetaDataReader::new()
1359 .with_page_indexes(true)
1360 .with_prefetch_hint(Some((len - 1000) as usize)) .load_and_finish(f, len)
1362 .await
1363 .unwrap();
1364 assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
1365 assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());
1366
1367 fetch_count.store(0, Ordering::SeqCst);
1369 let f = MetadataFetchFn(&mut fetch);
1370 let metadata = ParquetMetaDataReader::new()
1371 .with_page_indexes(true)
1372 .with_prefetch_hint(Some(len as usize)) .load_and_finish(f, len)
1374 .await
1375 .unwrap();
1376 assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
1377 assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());
1378
1379 fetch_count.store(0, Ordering::SeqCst);
1381 let f = MetadataFetchFn(&mut fetch);
1382 let metadata = ParquetMetaDataReader::new()
1383 .with_page_indexes(true)
1384 .with_prefetch_hint(Some((len + 1000) as usize)) .load_and_finish(f, len)
1386 .await
1387 .unwrap();
1388 assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
1389 assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());
1390 }
1391
1392 fn write_parquet_file(offset_index_disabled: bool) -> Result<NamedTempFile> {
1393 let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
1394 let batch = RecordBatch::try_new(
1395 schema.clone(),
1396 vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
1397 )?;
1398
1399 let file = NamedTempFile::new().unwrap();
1400
1401 let props = WriterProperties::builder()
1403 .set_offset_index_disabled(offset_index_disabled)
1404 .build();
1405
1406 let mut writer = ArrowWriter::try_new(file.reopen()?, schema, Some(props))?;
1407 writer.write(&batch)?;
1408 writer.close()?;
1409
1410 Ok(file)
1411 }
1412
1413 fn read_and_check(file: &File, policy: PageIndexPolicy) -> Result<ParquetMetaData> {
1414 let mut reader = ParquetMetaDataReader::new().with_page_index_policy(policy);
1415 reader.try_parse(file)?;
1416 reader.finish()
1417 }
1418
1419 #[test]
1420 fn test_page_index_policy() {
1421 let f = write_parquet_file(false).unwrap();
1423 read_and_check(f.as_file(), PageIndexPolicy::Required).unwrap();
1424 read_and_check(f.as_file(), PageIndexPolicy::Optional).unwrap();
1425 read_and_check(f.as_file(), PageIndexPolicy::Skip).unwrap();
1426
1427 let f = write_parquet_file(true).unwrap();
1429 let res = read_and_check(f.as_file(), PageIndexPolicy::Required);
1430 assert!(matches!(
1431 res,
1432 Err(ParquetError::General(e)) if e == "missing offset index"
1433 ));
1434 read_and_check(f.as_file(), PageIndexPolicy::Optional).unwrap();
1435 read_and_check(f.as_file(), PageIndexPolicy::Skip).unwrap();
1436 }
1437}