1#[cfg(feature = "encryption")]
19use crate::encryption::decrypt::FileDecryptionProperties;
20use crate::errors::{ParquetError, Result};
21use crate::file::FOOTER_SIZE;
22use crate::file::metadata::parser::decode_metadata;
23use crate::file::metadata::thrift::parquet_schema_from_bytes;
24use crate::file::metadata::{
25 FooterTail, ParquetMetaData, ParquetMetaDataOptions, ParquetMetaDataPushDecoder,
26};
27use crate::file::reader::ChunkReader;
28use crate::schema::types::SchemaDescriptor;
29use bytes::Bytes;
30use std::sync::Arc;
31use std::{io::Read, ops::Range};
32
33use crate::DecodeResult;
34#[cfg(all(feature = "async", feature = "arrow"))]
35use crate::arrow::async_reader::{MetadataFetch, MetadataSuffixFetch};
36
#[derive(Default, Debug)]
pub struct ParquetMetaDataReader {
    // Decoded metadata, populated by the parse/load methods and taken by `finish`.
    metadata: Option<ParquetMetaData>,
    // How (whether) to read the column index portion of the page index.
    column_index: PageIndexPolicy,
    // How (whether) to read the offset index portion of the page index.
    offset_index: PageIndexPolicy,
    // Optional guess at the footer size, used to reduce the number of fetches
    // needed when loading metadata asynchronously.
    prefetch_hint: Option<usize>,
    // Options forwarded to the underlying metadata push decoder.
    metadata_options: Option<Arc<ParquetMetaDataOptions>>,
    // Size in bytes of the footer (metadata + footer tail) discovered during
    // `parse_metadata`; `None` until a parse has been attempted.
    metadata_size: Option<usize>,
    // Keys/settings for decrypting encrypted footers and columns.
    #[cfg(feature = "encryption")]
    file_decryption_properties: Option<Arc<FileDecryptionProperties>>,
}
83
/// Controls whether a page index component (column or offset index) is read.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum PageIndexPolicy {
    /// Do not read the index (the default).
    #[default]
    Skip,
    /// Attempt to read the index; absence is tolerated by the decoder.
    Optional,
    /// Read the index; the decoder treats failure to do so as an error.
    Required,
}
95
96impl From<bool> for PageIndexPolicy {
97 fn from(value: bool) -> Self {
98 match value {
99 true => Self::Required,
100 false => Self::Skip,
101 }
102 }
103}
104
105impl ParquetMetaDataReader {
106 pub fn new() -> Self {
108 Default::default()
109 }
110
111 pub fn new_with_metadata(metadata: ParquetMetaData) -> Self {
114 Self {
115 metadata: Some(metadata),
116 ..Default::default()
117 }
118 }
119
120 #[deprecated(since = "56.1.0", note = "Use `with_page_index_policy` instead")]
125 pub fn with_page_indexes(self, val: bool) -> Self {
126 self.with_page_index_policy(PageIndexPolicy::from(val))
127 }
128
129 #[deprecated(since = "56.1.0", note = "Use `with_column_index_policy` instead")]
133 pub fn with_column_indexes(self, val: bool) -> Self {
134 let policy = PageIndexPolicy::from(val);
135 self.with_column_index_policy(policy)
136 }
137
138 #[deprecated(since = "56.1.0", note = "Use `with_offset_index_policy` instead")]
142 pub fn with_offset_indexes(self, val: bool) -> Self {
143 let policy = PageIndexPolicy::from(val);
144 self.with_offset_index_policy(policy)
145 }
146
147 pub fn with_page_index_policy(self, policy: PageIndexPolicy) -> Self {
149 self.with_column_index_policy(policy)
150 .with_offset_index_policy(policy)
151 }
152
153 pub fn with_column_index_policy(mut self, policy: PageIndexPolicy) -> Self {
155 self.column_index = policy;
156 self
157 }
158
159 pub fn with_offset_index_policy(mut self, policy: PageIndexPolicy) -> Self {
161 self.offset_index = policy;
162 self
163 }
164
165 pub fn with_metadata_options(mut self, options: Option<ParquetMetaDataOptions>) -> Self {
167 self.metadata_options = options.map(Arc::new);
168 self
169 }
170
171 pub fn with_prefetch_hint(mut self, prefetch: Option<usize>) -> Self {
183 self.prefetch_hint = prefetch;
184 self
185 }
186
187 #[cfg(feature = "encryption")]
191 pub fn with_decryption_properties(
192 mut self,
193 properties: Option<std::sync::Arc<FileDecryptionProperties>>,
194 ) -> Self {
195 self.file_decryption_properties = properties;
196 self
197 }
198
    /// Returns `true` if footer metadata has been parsed and is ready to be
    /// retrieved via [`Self::finish`].
    pub fn has_metadata(&self) -> bool {
        self.metadata.is_some()
    }
203
204 pub fn finish(&mut self) -> Result<ParquetMetaData> {
206 self.metadata
207 .take()
208 .ok_or_else(|| general_err!("could not parse parquet metadata"))
209 }
210
211 pub fn parse_and_finish<R: ChunkReader>(mut self, reader: &R) -> Result<ParquetMetaData> {
230 self.try_parse(reader)?;
231 self.finish()
232 }
233
    /// Parses the footer metadata (and page indexes, per the configured
    /// policies), treating `reader` as holding the complete file.
    pub fn try_parse<R: ChunkReader>(&mut self, reader: &R) -> Result<()> {
        self.try_parse_sized(reader, reader.len())
    }
242
    /// Parses the footer metadata from `reader`, treating its contents as the
    /// last `reader.len()` bytes of a file that is `file_size` bytes long.
    ///
    /// On success the metadata is stored internally (retrieve it with
    /// [`Self::finish`]). If the supplied bytes do not reach back far enough,
    /// returns [`ParquetError::NeedMoreData`] with the number of trailing
    /// bytes required — unless the whole file was already provided (or the
    /// requirement exceeds the file), in which case fetching more cannot help
    /// and an EOF error is returned instead.
    pub fn try_parse_sized<R: ChunkReader>(&mut self, reader: &R, file_size: u64) -> Result<()> {
        self.metadata = match self.parse_metadata(reader) {
            Ok(metadata) => Some(metadata),
            Err(ParquetError::NeedMoreData(needed)) => {
                // If the reader already spans the whole file, or the request is
                // larger than the file itself, retrying is pointless.
                if file_size == reader.len() || needed as u64 > file_size {
                    return Err(eof_err!(
                        "Parquet file too small. Size is {} but need {}",
                        file_size,
                        needed
                    ));
                } else {
                    // Caller can retry with at least `needed` trailing bytes.
                    return Err(ParquetError::NeedMoreData(needed));
                }
            }
            Err(e) => return Err(e),
        };

        // Done unless at least one page index component was requested.
        if self.column_index == PageIndexPolicy::Skip && self.offset_index == PageIndexPolicy::Skip
        {
            return Ok(());
        }

        self.read_page_indexes_sized(reader, file_size)
    }
343
    /// Reads the page indexes for previously-parsed metadata, treating
    /// `reader` as holding the complete file.
    pub fn read_page_indexes<R: ChunkReader>(&mut self, reader: &R) -> Result<()> {
        self.read_page_indexes_sized(reader, reader.len())
    }
349
    /// Reads the page indexes for previously-parsed metadata, treating
    /// `reader`'s contents as the last `reader.len()` bytes of a file that is
    /// `file_size` bytes long.
    ///
    /// Returns [`ParquetError::NeedMoreData`] if the supplied bytes do not
    /// reach back far enough to cover the index range.
    pub fn read_page_indexes_sized<R: ChunkReader>(
        &mut self,
        reader: &R,
        file_size: u64,
    ) -> Result<()> {
        let Some(metadata) = self.metadata.take() else {
            return Err(general_err!(
                "Tried to read page indexes without ParquetMetaData metadata"
            ));
        };

        // Build a push decoder seeded with the existing metadata; it will tell
        // us which byte range (if any) it still needs for the indexes.
        let push_decoder = ParquetMetaDataPushDecoder::try_new_with_metadata(file_size, metadata)?
            .with_offset_index_policy(self.offset_index)
            .with_column_index_policy(self.column_index)
            .with_metadata_options(self.metadata_options.clone());
        let mut push_decoder = self.prepare_push_decoder(push_decoder);

        let range = match needs_index_data(&mut push_decoder)? {
            // Nothing further needed (e.g. no indexes present); we're done.
            NeedsIndexData::No(metadata) => {
                self.metadata = Some(metadata);
                return Ok(());
            }
            NeedsIndexData::Yes(range) => range,
        };

        // Absolute file offsets covered by `reader`'s trailing bytes.
        let file_range = file_size.saturating_sub(reader.len())..file_size;
        if !(file_range.contains(&range.start) && file_range.contains(&range.end)) {
            // NOTE(review): `file_range` is half-open, so `range.end ==
            // file_size` takes the NeedMoreData branch below — confirm intended.
            if range.end > file_size {
                return Err(eof_err!(
                    "Parquet file too small. Range {range:?} is beyond file bounds {file_size}",
                ));
            } else {
                return Err(ParquetError::NeedMoreData(
                    (file_size - range.start).try_into()?,
                ));
            }
        }

        // If we know how large the footer is, the index range must not overlap it.
        if let Some(metadata_size) = self.metadata_size {
            let metadata_range = file_size.saturating_sub(metadata_size as u64)..file_size;
            if range.end > metadata_range.start {
                return Err(eof_err!(
                    "Parquet file too small. Page index range {range:?} overlaps with file metadata {metadata_range:?}",
                ));
            }
        }

        let bytes_needed = usize::try_from(range.end - range.start)?;
        // Translate the absolute range into an offset within `reader`.
        let bytes = reader.get_bytes(range.start - file_range.start, bytes_needed)?;

        push_decoder.push_range(range, bytes)?;
        let metadata = parse_index_data(&mut push_decoder)?;
        self.metadata = Some(metadata);

        Ok(())
    }
419
420 #[cfg(all(feature = "async", feature = "arrow"))]
427 pub async fn load_and_finish<F: MetadataFetch>(
428 mut self,
429 fetch: F,
430 file_size: u64,
431 ) -> Result<ParquetMetaData> {
432 self.try_load(fetch, file_size).await?;
433 self.finish()
434 }
435
436 #[cfg(all(feature = "async", feature = "arrow"))]
443 pub async fn load_via_suffix_and_finish<F: MetadataSuffixFetch>(
444 mut self,
445 fetch: F,
446 ) -> Result<ParquetMetaData> {
447 self.try_load_via_suffix(fetch).await?;
448 self.finish()
449 }
450 #[cfg(all(feature = "async", feature = "arrow"))]
456 pub async fn try_load<F: MetadataFetch>(&mut self, mut fetch: F, file_size: u64) -> Result<()> {
457 let (metadata, remainder) = self.load_metadata(&mut fetch, file_size).await?;
458
459 self.metadata = Some(metadata);
460
461 if self.column_index == PageIndexPolicy::Skip && self.offset_index == PageIndexPolicy::Skip
463 {
464 return Ok(());
465 }
466
467 self.load_page_index_with_remainder(fetch, remainder).await
468 }
469
470 #[cfg(all(feature = "async", feature = "arrow"))]
476 pub async fn try_load_via_suffix<F: MetadataSuffixFetch>(
477 &mut self,
478 mut fetch: F,
479 ) -> Result<()> {
480 let (metadata, remainder) = self.load_metadata_via_suffix(&mut fetch).await?;
481
482 self.metadata = Some(metadata);
483
484 if self.column_index == PageIndexPolicy::Skip && self.offset_index == PageIndexPolicy::Skip
486 {
487 return Ok(());
488 }
489
490 self.load_page_index_with_remainder(fetch, remainder).await
491 }
492
    /// Asynchronously fetches and decodes the page indexes for
    /// previously-loaded metadata.
    #[cfg(all(feature = "async", feature = "arrow"))]
    pub async fn load_page_index<F: MetadataFetch>(&mut self, fetch: F) -> Result<()> {
        self.load_page_index_with_remainder(fetch, None).await
    }
499
    /// Fetches and decodes any requested page indexes, reusing `remainder`
    /// (leftover bytes already fetched while reading the footer, tagged with
    /// the absolute file offset where they begin) to avoid a second fetch
    /// when possible.
    #[cfg(all(feature = "async", feature = "arrow"))]
    async fn load_page_index_with_remainder<F: MetadataFetch>(
        &mut self,
        mut fetch: F,
        remainder: Option<(usize, Bytes)>,
    ) -> Result<()> {
        let Some(metadata) = self.metadata.take() else {
            return Err(general_err!("Footer metadata is not present"));
        };

        // The true file size is unknown here; the push decoder appears to use
        // it only as an upper bound, so pass u64::MAX.
        let file_size = u64::MAX;
        let push_decoder = ParquetMetaDataPushDecoder::try_new_with_metadata(file_size, metadata)?
            .with_offset_index_policy(self.offset_index)
            .with_column_index_policy(self.column_index)
            .with_metadata_options(self.metadata_options.clone());
        let mut push_decoder = self.prepare_push_decoder(push_decoder);

        // Ask the decoder which byte range (if any) it still needs.
        let range = match needs_index_data(&mut push_decoder)? {
            NeedsIndexData::No(metadata) => {
                self.metadata = Some(metadata);
                return Ok(());
            }
            NeedsIndexData::Yes(range) => range,
        };

        let bytes = match &remainder {
            // Reuse the remainder only if it starts at or before the needed
            // range; the assert checks it also covers the range's end.
            Some((remainder_start, remainder)) if *remainder_start as u64 <= range.start => {
                let remainder_start = *remainder_start as u64;
                let offset = usize::try_from(range.start - remainder_start)?;
                let end = usize::try_from(range.end - remainder_start)?;
                assert!(end <= remainder.len());
                remainder.slice(offset..end)
            }
            // Otherwise, fetch the range directly.
            _ => fetch.fetch(range.start..range.end).await?,
        };

        // We must hand the decoder exactly the bytes it asked for.
        assert_eq!(bytes.len() as u64, range.end - range.start);
        push_decoder.push_range(range.clone(), bytes)?;
        let metadata = parse_index_data(&mut push_decoder)?;
        self.metadata = Some(metadata);
        Ok(())
    }
547
    /// Reads and decodes the footer metadata from `chunk_reader`, which must
    /// hold (at least) the tail of the file.
    ///
    /// Returns [`ParquetError::NeedMoreData`] with the number of trailing
    /// bytes required when the reader does not hold enough data.
    fn parse_metadata<R: ChunkReader>(&mut self, chunk_reader: &R) -> Result<ParquetMetaData> {
        let file_size = chunk_reader.len();
        // Need at least the fixed-size footer tail (metadata length + magic).
        if file_size < (FOOTER_SIZE as u64) {
            return Err(ParquetError::NeedMoreData(FOOTER_SIZE));
        }

        let mut footer = [0_u8; FOOTER_SIZE];
        chunk_reader
            .get_read(file_size - FOOTER_SIZE as u64)?
            .read_exact(&mut footer)?;

        let footer = FooterTail::try_new(&footer)?;
        let metadata_len = footer.metadata_length();
        let footer_metadata_len = FOOTER_SIZE + metadata_len;
        // Remember the footer size so later page-index reads can refuse
        // ranges that would overlap it.
        self.metadata_size = Some(footer_metadata_len);

        if footer_metadata_len as u64 > file_size {
            return Err(ParquetError::NeedMoreData(footer_metadata_len));
        }

        let start = file_size - footer_metadata_len as u64;
        let bytes = chunk_reader.get_bytes(start, metadata_len)?;
        self.decode_footer_metadata(bytes, file_size, footer)
    }
575
    /// Returns the footer size (metadata + footer tail) in bytes, if known —
    /// i.e. after `parse_metadata` has read a valid footer tail.
    pub fn metadata_size(&self) -> Option<usize> {
        self.metadata_size
    }
581
582 #[cfg(all(feature = "async", feature = "arrow"))]
586 fn get_prefetch_size(&self) -> usize {
587 if let Some(prefetch) = self.prefetch_hint {
588 if prefetch > FOOTER_SIZE {
589 return prefetch;
590 }
591 }
592 FOOTER_SIZE
593 }
594
    /// Asynchronously fetches and decodes the footer metadata using
    /// absolute-range fetches.
    ///
    /// Returns the decoded metadata plus any leftover prefetched bytes that
    /// precede the metadata (tagged with their absolute starting offset);
    /// these may later satisfy a page-index read without another fetch.
    #[cfg(all(feature = "async", feature = "arrow"))]
    async fn load_metadata<F: MetadataFetch>(
        &self,
        fetch: &mut F,
        file_size: u64,
    ) -> Result<(ParquetMetaData, Option<(usize, Bytes)>)> {
        let prefetch = self.get_prefetch_size() as u64;

        if file_size < FOOTER_SIZE as u64 {
            return Err(eof_err!("file size of {} is less than footer", file_size));
        }

        // Fetch the trailing `prefetch` bytes (clamped to the file start).
        let footer_start = file_size.saturating_sub(prefetch);

        let suffix = fetch.fetch(footer_start..file_size).await?;
        let suffix_len = suffix.len();
        let fetch_len = (file_size - footer_start)
            .try_into()
            .expect("footer size should never be larger than u32");
        if suffix_len < fetch_len {
            return Err(eof_err!(
                "metadata requires {} bytes, but could only read {}",
                fetch_len,
                suffix_len
            ));
        }

        // The footer tail is the last FOOTER_SIZE bytes of the file.
        let mut footer = [0; FOOTER_SIZE];
        footer.copy_from_slice(&suffix[suffix_len - FOOTER_SIZE..suffix_len]);

        let footer = FooterTail::try_new(&footer)?;
        let length = footer.metadata_length();

        if file_size < (length + FOOTER_SIZE) as u64 {
            return Err(eof_err!(
                "file size of {} is less than footer + metadata {}",
                file_size,
                length + FOOTER_SIZE
            ));
        }

        // Did the prefetch already cover the metadata, or must we fetch again?
        if length > suffix_len - FOOTER_SIZE {
            let metadata_start = file_size - (length + FOOTER_SIZE) as u64;
            let meta = fetch
                .fetch(metadata_start..(file_size - FOOTER_SIZE as u64))
                .await?;
            // The second fetch was metadata-only: nothing useful left over.
            Ok((self.decode_footer_metadata(meta, file_size, footer)?, None))
        } else {
            // Metadata is fully inside the prefetched suffix; slice it out.
            let metadata_start = (file_size - (length + FOOTER_SIZE) as u64 - footer_start)
                .try_into()
                .expect("metadata length should never be larger than u32");
            let slice = suffix.slice(metadata_start..suffix_len - FOOTER_SIZE);
            Ok((
                self.decode_footer_metadata(slice, file_size, footer)?,
                // Remaining prefetched bytes that precede the metadata.
                Some((footer_start as usize, suffix.slice(..metadata_start))),
            ))
        }
    }
657
    /// Asynchronously fetches and decodes the footer metadata using suffix
    /// fetches (no up-front file size required).
    ///
    /// Returns the decoded metadata plus any leftover prefetched bytes that
    /// precede it (tagged with offset 0, since absolute offsets are unknown
    /// in suffix mode).
    #[cfg(all(feature = "async", feature = "arrow"))]
    async fn load_metadata_via_suffix<F: MetadataSuffixFetch>(
        &self,
        fetch: &mut F,
    ) -> Result<(ParquetMetaData, Option<(usize, Bytes)>)> {
        let prefetch = self.get_prefetch_size();

        let suffix = fetch.fetch_suffix(prefetch as _).await?;
        let suffix_len = suffix.len();

        if suffix_len < FOOTER_SIZE {
            return Err(eof_err!(
                "footer metadata requires {} bytes, but could only read {}",
                FOOTER_SIZE,
                suffix_len
            ));
        }

        // The footer tail is the last FOOTER_SIZE bytes of the suffix.
        let mut footer = [0; FOOTER_SIZE];
        footer.copy_from_slice(&suffix[suffix_len - FOOTER_SIZE..suffix_len]);

        let footer = FooterTail::try_new(&footer)?;
        let length = footer.metadata_length();
        // The real file size is unknown in suffix mode; treat metadata +
        // footer as if it were the whole file so decode offsets line up.
        let file_size = (length + FOOTER_SIZE) as u64;

        let metadata_offset = length + FOOTER_SIZE;
        // Did the prefetch already cover the metadata, or must we fetch again?
        if length > suffix_len - FOOTER_SIZE {
            let meta = fetch.fetch_suffix(metadata_offset).await?;

            if meta.len() < metadata_offset {
                return Err(eof_err!(
                    "metadata requires {} bytes, but could only read {}",
                    metadata_offset,
                    meta.len()
                ));
            }

            // Drop the trailing footer tail; keep only the metadata bytes.
            let meta = meta.slice(0..length);
            Ok((self.decode_footer_metadata(meta, file_size, footer)?, None))
        } else {
            let metadata_start = suffix_len - metadata_offset;
            let slice = suffix.slice(metadata_start..suffix_len - FOOTER_SIZE);
            Ok((
                self.decode_footer_metadata(slice, file_size, footer)?,
                Some((0, suffix.slice(..metadata_start))),
            ))
        }
    }
710
    /// Decodes the fixed-size footer tail. Deprecated shim over [`FooterTail`].
    #[deprecated(since = "57.0.0", note = "Use FooterTail::try_from instead")]
    pub fn decode_footer_tail(slice: &[u8; FOOTER_SIZE]) -> Result<FooterTail> {
        FooterTail::try_new(slice)
    }
716
    /// Decodes the footer tail and returns only the metadata length.
    /// Deprecated shim over [`FooterTail`].
    #[deprecated(since = "54.3.0", note = "Use decode_footer_tail instead")]
    pub fn decode_footer(slice: &[u8; FOOTER_SIZE]) -> Result<usize> {
        FooterTail::try_new(slice).map(|f| f.metadata_length())
    }
722
    /// Decodes footer metadata from `buf`, which must hold exactly the
    /// metadata bytes that immediately precede the footer tail in a file of
    /// `file_size` bytes.
    ///
    /// Applies any configured metadata options and (with the `encryption`
    /// feature) any configured decryption properties.
    pub(crate) fn decode_footer_metadata(
        &self,
        buf: Bytes,
        file_size: u64,
        footer_tail: FooterTail,
    ) -> Result<ParquetMetaData> {
        // The metadata ends where the footer tail begins.
        let ending_offset = file_size.checked_sub(FOOTER_SIZE as u64).ok_or_else(|| {
            general_err!(
                "file size {file_size} is smaller than footer size {}",
                FOOTER_SIZE
            )
        })?;

        // ... and starts `buf.len()` bytes before that.
        let starting_offset = ending_offset.checked_sub(buf.len() as u64).ok_or_else(|| {
            general_err!(
                "file size {file_size} is smaller than buffer size {} + footer size {}",
                buf.len(),
                FOOTER_SIZE
            )
        })?;

        let range = starting_offset..ending_offset;

        // Page indexes are handled separately; only decode the metadata here.
        let push_decoder =
            ParquetMetaDataPushDecoder::try_new_with_footer_tail(file_size, footer_tail)?
                .with_page_index_policy(PageIndexPolicy::Skip)
                .with_metadata_options(self.metadata_options.clone());

        let mut push_decoder = self.prepare_push_decoder(push_decoder);
        push_decoder.push_range(range, buf)?;
        match push_decoder.try_decode()? {
            DecodeResult::Data(metadata) => Ok(metadata),
            DecodeResult::Finished => Err(general_err!(
                "could not parse parquet metadata -- previously finished"
            )),
            // Should not happen: the entire metadata range was pushed above.
            DecodeResult::NeedsData(ranges) => Err(general_err!(
                "could not parse parquet metadata, needs ranges {:?}",
                ranges
            )),
        }
    }
782
783 #[cfg(feature = "encryption")]
785 fn prepare_push_decoder(
786 &self,
787 push_decoder: ParquetMetaDataPushDecoder,
788 ) -> ParquetMetaDataPushDecoder {
789 push_decoder.with_file_decryption_properties(
790 self.file_decryption_properties
791 .as_ref()
792 .map(std::sync::Arc::clone),
793 )
794 }
    /// No-op counterpart of the `encryption`-enabled version: without the
    /// feature there are no decryption properties to attach.
    #[cfg(not(feature = "encryption"))]
    fn prepare_push_decoder(
        &self,
        push_decoder: ParquetMetaDataPushDecoder,
    ) -> ParquetMetaDataPushDecoder {
        push_decoder
    }
802
803 pub fn decode_metadata(buf: &[u8]) -> Result<ParquetMetaData> {
811 decode_metadata(buf, None)
812 }
813
    /// Decodes [`ParquetMetaData`] from a raw metadata buffer, applying the
    /// provided decoding options (if any).
    pub fn decode_metadata_with_options(
        buf: &[u8],
        options: Option<&ParquetMetaDataOptions>,
    ) -> Result<ParquetMetaData> {
        decode_metadata(buf, options)
    }
824
825 pub fn decode_schema(buf: &[u8]) -> Result<Arc<SchemaDescriptor>> {
828 Ok(Arc::new(parquet_schema_from_bytes(buf)?))
829 }
830}
831
/// Outcome of asking the push decoder whether it needs more bytes to decode
/// the requested page indexes.
#[allow(clippy::large_enum_variant)]
enum NeedsIndexData {
    /// Decoding completed without further input; holds the final metadata.
    No(ParquetMetaData),
    /// More bytes are required: the merged absolute byte range to fetch.
    Yes(Range<u64>),
}
841
842fn needs_index_data(push_decoder: &mut ParquetMetaDataPushDecoder) -> Result<NeedsIndexData> {
845 match push_decoder.try_decode()? {
846 DecodeResult::NeedsData(ranges) => {
847 let range = ranges
848 .into_iter()
849 .reduce(|a, b| a.start.min(b.start)..a.end.max(b.end))
850 .ok_or_else(|| general_err!("Internal error: no ranges provided"))?;
851 Ok(NeedsIndexData::Yes(range))
852 }
853 DecodeResult::Data(metadata) => Ok(NeedsIndexData::No(metadata)),
854 DecodeResult::Finished => Err(general_err!("Internal error: decoder was finished")),
855 }
856}
857
858fn parse_index_data(push_decoder: &mut ParquetMetaDataPushDecoder) -> Result<ParquetMetaData> {
861 match push_decoder.try_decode()? {
862 DecodeResult::NeedsData(_) => Err(general_err!(
863 "Internal error: decoder still needs data after reading required range"
864 )),
865 DecodeResult::Data(metadata) => Ok(metadata),
866 DecodeResult::Finished => Err(general_err!("Internal error: decoder was finished")),
867 }
868}
869
870#[cfg(test)]
871mod tests {
872 use super::*;
873 use crate::file::reader::Length;
874 use crate::util::test_common::file_util::get_test_file;
875 use std::ops::Range;
876
    #[test]
    fn test_parse_metadata_size_smaller_than_footer() {
        // An empty file cannot hold even the fixed-size footer tail.
        let test_file = tempfile::tempfile().unwrap();
        let err = ParquetMetaDataReader::new()
            .parse_metadata(&test_file)
            .unwrap_err();
        // `FOOTER_SIZE` is an imported const, so it acts as a const pattern here.
        assert!(matches!(err, ParquetError::NeedMoreData(FOOTER_SIZE)));
    }
885
    #[test]
    fn test_parse_metadata_corrupt_footer() {
        // Footer-sized input whose trailing magic bytes are invalid.
        let data = Bytes::from(vec![1, 2, 3, 4, 5, 6, 7, 8]);
        let reader_result = ParquetMetaDataReader::new().parse_metadata(&data);
        assert_eq!(
            reader_result.unwrap_err().to_string(),
            "Parquet error: Invalid Parquet file. Corrupt footer"
        );
    }
895
    #[test]
    fn test_parse_metadata_invalid_start() {
        // Valid "PAR1" magic claiming a 255-byte metadata section: the reader
        // should ask for 255 + FOOTER_SIZE = 263 bytes.
        let test_file = Bytes::from(vec![255, 0, 0, 0, b'P', b'A', b'R', b'1']);
        let err = ParquetMetaDataReader::new()
            .parse_metadata(&test_file)
            .unwrap_err();
        assert!(matches!(err, ParquetError::NeedMoreData(263)));
    }
904
    #[test]
    #[allow(deprecated)]
    fn test_try_parse() {
        let file = get_test_file("alltypes_tiny_pages.parquet");
        let len = file.len();

        let mut reader = ParquetMetaDataReader::new().with_page_indexes(true);

        // Helper: absolute byte range of the test file -> owned Bytes.
        let bytes_for_range = |range: Range<u64>| {
            file.get_bytes(range.start, (range.end - range.start).try_into().unwrap())
                .unwrap()
        };

        // Whole file: parse succeeds with both indexes populated.
        let bytes = bytes_for_range(0..len);
        reader.try_parse(&bytes).unwrap();
        let metadata = reader.finish().unwrap();
        assert!(metadata.column_index.is_some());
        assert!(metadata.offset_index.is_some());

        // A suffix covering footer and page indexes also succeeds.
        let bytes = bytes_for_range(320000..len);
        reader.try_parse_sized(&bytes, len).unwrap();
        let metadata = reader.finish().unwrap();
        assert!(metadata.column_index.is_some());
        assert!(metadata.offset_index.is_some());

        // Suffix starting at 323583 still covers the page index range.
        let bytes = bytes_for_range(323583..len);
        reader.try_parse_sized(&bytes, len).unwrap();
        let metadata = reader.finish().unwrap();
        assert!(metadata.column_index.is_some());
        assert!(metadata.offset_index.is_some());

        // One byte short: NeedMoreData tells us how much to refetch, and the
        // retry with that much trailing data succeeds.
        let bytes = bytes_for_range(323584..len);
        match reader.try_parse_sized(&bytes, len).unwrap_err() {
            ParquetError::NeedMoreData(needed) => {
                let bytes = bytes_for_range(len - needed as u64..len);
                reader.try_parse_sized(&bytes, len).unwrap();
                let metadata = reader.finish().unwrap();
                assert!(metadata.column_index.is_some());
                assert!(metadata.offset_index.is_some());
            }
            _ => panic!("unexpected error"),
        };

        // Retry loop: once the footer metadata is parsed, only the page
        // indexes remain, which can be read separately.
        let mut reader = ParquetMetaDataReader::new().with_page_indexes(true);
        let mut bytes = bytes_for_range(452505..len);
        loop {
            match reader.try_parse_sized(&bytes, len) {
                Ok(_) => break,
                Err(ParquetError::NeedMoreData(needed)) => {
                    bytes = bytes_for_range(len - needed as u64..len);
                    if reader.has_metadata() {
                        reader.read_page_indexes_sized(&bytes, len).unwrap();
                        break;
                    }
                }
                _ => panic!("unexpected error"),
            }
        }
        let metadata = reader.finish().unwrap();
        assert!(metadata.column_index.is_some());
        assert!(metadata.offset_index.is_some());

        // Claimed file size too small for the index range -> EOF error.
        let bytes = bytes_for_range(323584..len);
        let reader_result = reader.try_parse_sized(&bytes, len - 323584).unwrap_err();
        assert_eq!(
            reader_result.to_string(),
            "EOF: Parquet file too small. Range 323583..452504 is beyond file bounds 130649"
        );

        // Without page indexes requested, a short suffix still round-trips
        // through NeedMoreData.
        let mut reader = ParquetMetaDataReader::new();
        let bytes = bytes_for_range(452505..len);
        match reader.try_parse_sized(&bytes, len).unwrap_err() {
            ParquetError::NeedMoreData(needed) => {
                let bytes = bytes_for_range(len - needed as u64..len);
                reader.try_parse_sized(&bytes, len).unwrap();
                reader.finish().unwrap();
            }
            _ => panic!("unexpected error"),
        };

        // try_parse treats the buffer as the entire file -> too small.
        let reader_result = reader.try_parse(&bytes).unwrap_err();
        assert_eq!(
            reader_result.to_string(),
            "EOF: Parquet file too small. Size is 1728 but need 1729"
        );

        // A prefix of the file does not end in a valid footer.
        let bytes = bytes_for_range(0..1000);
        let reader_result = reader.try_parse_sized(&bytes, len).unwrap_err();
        assert_eq!(
            reader_result.to_string(),
            "Parquet error: Invalid Parquet file. Corrupt footer"
        );

        // Suffix shorter than required relative to the claimed file size.
        let bytes = bytes_for_range(452510..len);
        let reader_result = reader.try_parse_sized(&bytes, len - 452505).unwrap_err();
        assert_eq!(
            reader_result.to_string(),
            "EOF: Parquet file too small. Size is 1728 but need 1729"
        );
    }
1019}
1020
1021#[cfg(all(feature = "async", feature = "arrow", test))]
1022mod async_tests {
1023 use super::*;
1024
1025 use arrow::{array::Int32Array, datatypes::DataType};
1026 use arrow_array::RecordBatch;
1027 use arrow_schema::{Field, Schema};
1028 use bytes::Bytes;
1029 use futures::FutureExt;
1030 use futures::future::BoxFuture;
1031 use std::fs::File;
1032 use std::future::Future;
1033 use std::io::{Read, Seek, SeekFrom};
1034 use std::ops::Range;
1035 use std::sync::Arc;
1036 use std::sync::atomic::{AtomicUsize, Ordering};
1037 use tempfile::NamedTempFile;
1038
1039 use crate::arrow::ArrowWriter;
1040 use crate::file::properties::WriterProperties;
1041 use crate::file::reader::Length;
1042 use crate::util::test_common::file_util::get_test_file;
1043
    // Adapter turning a closure `FnMut(Range<u64>) -> Future<Bytes>` into a
    // `MetadataFetch` implementation for the tests below.
    struct MetadataFetchFn<F>(F);

    impl<F, Fut> MetadataFetch for MetadataFetchFn<F>
    where
        F: FnMut(Range<u64>) -> Fut + Send,
        Fut: Future<Output = Result<Bytes>> + Send,
    {
        fn fetch(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
            async move { self.0(range).await }.boxed()
        }
    }
1055
    // Adapter pairing a range-fetch closure with a suffix-fetch closure so
    // tests can count how many times each kind of fetch is invoked.
    struct MetadataSuffixFetchFn<F1, F2>(F1, F2);

    impl<F1, Fut, F2> MetadataFetch for MetadataSuffixFetchFn<F1, F2>
    where
        F1: FnMut(Range<u64>) -> Fut + Send,
        Fut: Future<Output = Result<Bytes>> + Send,
        F2: Send,
    {
        fn fetch(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
            async move { self.0(range).await }.boxed()
        }
    }

    impl<F1, Fut, F2> MetadataSuffixFetch for MetadataSuffixFetchFn<F1, F2>
    where
        F1: FnMut(Range<u64>) -> Fut + Send,
        F2: FnMut(usize) -> Fut + Send,
        Fut: Future<Output = Result<Bytes>> + Send,
    {
        fn fetch_suffix(&mut self, suffix: usize) -> BoxFuture<'_, Result<Bytes>> {
            async move { self.1(suffix).await }.boxed()
        }
    }
1079
1080 fn read_range(file: &mut File, range: Range<u64>) -> Result<Bytes> {
1081 file.seek(SeekFrom::Start(range.start as _))?;
1082 let len = range.end - range.start;
1083 let mut buf = Vec::with_capacity(len.try_into().unwrap());
1084 file.take(len as _).read_to_end(&mut buf)?;
1085 Ok(buf.into())
1086 }
1087
1088 fn read_suffix(file: &mut File, suffix: usize) -> Result<Bytes> {
1089 let file_len = file.len();
1090 file.seek(SeekFrom::End(0 - suffix.min(file_len as _) as i64))?;
1092 let mut buf = Vec::with_capacity(suffix);
1093 file.take(suffix as _).read_to_end(&mut buf)?;
1094 Ok(buf.into())
1095 }
1096
    #[tokio::test]
    async fn test_simple() {
        let mut file = get_test_file("nulls.snappy.parquet");
        let len = file.len();

        // Ground truth: parse synchronously and keep the schema.
        let expected = ParquetMetaDataReader::new()
            .parse_and_finish(&file)
            .unwrap();
        let expected = expected.file_metadata().schema();
        let fetch_count = AtomicUsize::new(0);

        let mut fetch = |range| {
            fetch_count.fetch_add(1, Ordering::SeqCst);
            futures::future::ready(read_range(&mut file, range))
        };

        // Default prefetch: footer fetch + metadata fetch = 2 fetches.
        let input = MetadataFetchFn(&mut fetch);
        let actual = ParquetMetaDataReader::new()
            .load_and_finish(input, len)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 2);

        // Hint smaller than the footer tail is clamped up; still 2 fetches.
        fetch_count.store(0, Ordering::SeqCst);
        let input = MetadataFetchFn(&mut fetch);
        let actual = ParquetMetaDataReader::new()
            .with_prefetch_hint(Some(7))
            .load_and_finish(input, len)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 2);

        // Hint too small to cover the metadata; still 2 fetches.
        fetch_count.store(0, Ordering::SeqCst);
        let input = MetadataFetchFn(&mut fetch);
        let actual = ParquetMetaDataReader::new()
            .with_prefetch_hint(Some(10))
            .load_and_finish(input, len)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 2);

        // Hint large enough that the prefetch covers the metadata: 1 fetch.
        fetch_count.store(0, Ordering::SeqCst);
        let input = MetadataFetchFn(&mut fetch);
        let actual = ParquetMetaDataReader::new()
            .with_prefetch_hint(Some(500))
            .load_and_finish(input, len)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);

        // A 428-byte hint also suffices for a single fetch on this file.
        fetch_count.store(0, Ordering::SeqCst);
        let input = MetadataFetchFn(&mut fetch);
        let actual = ParquetMetaDataReader::new()
            .with_prefetch_hint(Some(428))
            .load_and_finish(input, len)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);

        // Claimed file size too small to hold even the footer.
        let input = MetadataFetchFn(&mut fetch);
        let err = ParquetMetaDataReader::new()
            .load_and_finish(input, 4)
            .await
            .unwrap_err()
            .to_string();
        assert_eq!(err, "EOF: file size of 4 is less than footer");

        // Truncated "file": the bytes at the claimed tail are not a footer.
        let input = MetadataFetchFn(&mut fetch);
        let err = ParquetMetaDataReader::new()
            .load_and_finish(input, 20)
            .await
            .unwrap_err()
            .to_string();
        assert_eq!(err, "Parquet error: Invalid Parquet file. Corrupt footer");
    }
1181
1182 #[tokio::test]
1183 async fn test_suffix() {
1184 let mut file = get_test_file("nulls.snappy.parquet");
1185 let mut file2 = file.try_clone().unwrap();
1186
1187 let expected = ParquetMetaDataReader::new()
1188 .parse_and_finish(&file)
1189 .unwrap();
1190 let expected = expected.file_metadata().schema();
1191 let fetch_count = AtomicUsize::new(0);
1192 let suffix_fetch_count = AtomicUsize::new(0);
1193
1194 let mut fetch = |range| {
1195 fetch_count.fetch_add(1, Ordering::SeqCst);
1196 futures::future::ready(read_range(&mut file, range))
1197 };
1198 let mut suffix_fetch = |suffix| {
1199 suffix_fetch_count.fetch_add(1, Ordering::SeqCst);
1200 futures::future::ready(read_suffix(&mut file2, suffix))
1201 };
1202
1203 let input = MetadataSuffixFetchFn(&mut fetch, &mut suffix_fetch);
1204 let actual = ParquetMetaDataReader::new()
1205 .load_via_suffix_and_finish(input)
1206 .await
1207 .unwrap();
1208 assert_eq!(actual.file_metadata().schema(), expected);
1209 assert_eq!(fetch_count.load(Ordering::SeqCst), 0);
1210 assert_eq!(suffix_fetch_count.load(Ordering::SeqCst), 2);
1211
1212 fetch_count.store(0, Ordering::SeqCst);
1214 suffix_fetch_count.store(0, Ordering::SeqCst);
1215 let input = MetadataSuffixFetchFn(&mut fetch, &mut suffix_fetch);
1216 let actual = ParquetMetaDataReader::new()
1217 .with_prefetch_hint(Some(7))
1218 .load_via_suffix_and_finish(input)
1219 .await
1220 .unwrap();
1221 assert_eq!(actual.file_metadata().schema(), expected);
1222 assert_eq!(fetch_count.load(Ordering::SeqCst), 0);
1223 assert_eq!(suffix_fetch_count.load(Ordering::SeqCst), 2);
1224
1225 fetch_count.store(0, Ordering::SeqCst);
1227 suffix_fetch_count.store(0, Ordering::SeqCst);
1228 let input = MetadataSuffixFetchFn(&mut fetch, &mut suffix_fetch);
1229 let actual = ParquetMetaDataReader::new()
1230 .with_prefetch_hint(Some(10))
1231 .load_via_suffix_and_finish(input)
1232 .await
1233 .unwrap();
1234 assert_eq!(actual.file_metadata().schema(), expected);
1235 assert_eq!(fetch_count.load(Ordering::SeqCst), 0);
1236 assert_eq!(suffix_fetch_count.load(Ordering::SeqCst), 2);
1237
1238 dbg!("test");
1239 fetch_count.store(0, Ordering::SeqCst);
1241 suffix_fetch_count.store(0, Ordering::SeqCst);
1242 let input = MetadataSuffixFetchFn(&mut fetch, &mut suffix_fetch);
1243 let actual = ParquetMetaDataReader::new()
1244 .with_prefetch_hint(Some(500))
1245 .load_via_suffix_and_finish(input)
1246 .await
1247 .unwrap();
1248 assert_eq!(actual.file_metadata().schema(), expected);
1249 assert_eq!(fetch_count.load(Ordering::SeqCst), 0);
1250 assert_eq!(suffix_fetch_count.load(Ordering::SeqCst), 1);
1251
1252 fetch_count.store(0, Ordering::SeqCst);
1254 suffix_fetch_count.store(0, Ordering::SeqCst);
1255 let input = MetadataSuffixFetchFn(&mut fetch, &mut suffix_fetch);
1256 let actual = ParquetMetaDataReader::new()
1257 .with_prefetch_hint(Some(428))
1258 .load_via_suffix_and_finish(input)
1259 .await
1260 .unwrap();
1261 assert_eq!(actual.file_metadata().schema(), expected);
1262 assert_eq!(fetch_count.load(Ordering::SeqCst), 0);
1263 assert_eq!(suffix_fetch_count.load(Ordering::SeqCst), 1);
1264 }
1265
    #[cfg(feature = "encryption")]
    #[tokio::test]
    async fn test_suffix_with_encryption() {
        let mut file = get_test_file("uniform_encryption.parquet.encrypted");
        let mut file2 = file.try_clone().unwrap();

        let mut fetch = |range| futures::future::ready(read_range(&mut file, range));
        let mut suffix_fetch = |suffix| futures::future::ready(read_suffix(&mut file2, suffix));

        let input = MetadataSuffixFetchFn(&mut fetch, &mut suffix_fetch);

        // Footer key for the encrypted fixture file.
        let key_code: &[u8] = "0123456789012345".as_bytes();
        // NOTE(review): presumably `build()` yields a value accepted by
        // `with_decryption_properties` (`Option<Arc<FileDecryptionProperties>>`)
        // — confirm against the builder's return type.
        let decryption_properties = FileDecryptionProperties::builder(key_code.to_vec())
            .build()
            .unwrap();

        // Suffix-based loading must also work for encrypted footers.
        let expected = ParquetMetaDataReader::new()
            .with_decryption_properties(Some(decryption_properties))
            .load_via_suffix_and_finish(input)
            .await
            .unwrap();
        assert_eq!(expected.num_row_groups(), 1);
    }
1290
1291 #[tokio::test]
1292 #[allow(deprecated)]
1293 async fn test_page_index() {
1294 let mut file = get_test_file("alltypes_tiny_pages.parquet");
1295 let len = file.len();
1296 let fetch_count = AtomicUsize::new(0);
1297 let mut fetch = |range| {
1298 fetch_count.fetch_add(1, Ordering::SeqCst);
1299 futures::future::ready(read_range(&mut file, range))
1300 };
1301
1302 let f = MetadataFetchFn(&mut fetch);
1303 let mut loader = ParquetMetaDataReader::new().with_page_indexes(true);
1304 loader.try_load(f, len).await.unwrap();
1305 assert_eq!(fetch_count.load(Ordering::SeqCst), 3);
1306 let metadata = loader.finish().unwrap();
1307 assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());
1308
1309 fetch_count.store(0, Ordering::SeqCst);
1311 let f = MetadataFetchFn(&mut fetch);
1312 let mut loader = ParquetMetaDataReader::new()
1313 .with_page_indexes(true)
1314 .with_prefetch_hint(Some(1729));
1315 loader.try_load(f, len).await.unwrap();
1316 assert_eq!(fetch_count.load(Ordering::SeqCst), 2);
1317 let metadata = loader.finish().unwrap();
1318 assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());
1319
1320 fetch_count.store(0, Ordering::SeqCst);
1322 let f = MetadataFetchFn(&mut fetch);
1323 let mut loader = ParquetMetaDataReader::new()
1324 .with_page_indexes(true)
1325 .with_prefetch_hint(Some(130649));
1326 loader.try_load(f, len).await.unwrap();
1327 assert_eq!(fetch_count.load(Ordering::SeqCst), 2);
1328 let metadata = loader.finish().unwrap();
1329 assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());
1330
1331 fetch_count.store(0, Ordering::SeqCst);
1333 let f = MetadataFetchFn(&mut fetch);
1334 let metadata = ParquetMetaDataReader::new()
1335 .with_page_indexes(true)
1336 .with_prefetch_hint(Some(130650))
1337 .load_and_finish(f, len)
1338 .await
1339 .unwrap();
1340 assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
1341 assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());
1342
1343 fetch_count.store(0, Ordering::SeqCst);
1345 let f = MetadataFetchFn(&mut fetch);
1346 let metadata = ParquetMetaDataReader::new()
1347 .with_page_indexes(true)
1348 .with_prefetch_hint(Some((len - 1000) as usize)) .load_and_finish(f, len)
1350 .await
1351 .unwrap();
1352 assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
1353 assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());
1354
1355 fetch_count.store(0, Ordering::SeqCst);
1357 let f = MetadataFetchFn(&mut fetch);
1358 let metadata = ParquetMetaDataReader::new()
1359 .with_page_indexes(true)
1360 .with_prefetch_hint(Some(len as usize)) .load_and_finish(f, len)
1362 .await
1363 .unwrap();
1364 assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
1365 assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());
1366
1367 fetch_count.store(0, Ordering::SeqCst);
1369 let f = MetadataFetchFn(&mut fetch);
1370 let metadata = ParquetMetaDataReader::new()
1371 .with_page_indexes(true)
1372 .with_prefetch_hint(Some((len + 1000) as usize)) .load_and_finish(f, len)
1374 .await
1375 .unwrap();
1376 assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
1377 assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());
1378 }
1379
1380 fn write_parquet_file(offset_index_disabled: bool) -> Result<NamedTempFile> {
1381 let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
1382 let batch = RecordBatch::try_new(
1383 schema.clone(),
1384 vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
1385 )?;
1386
1387 let file = NamedTempFile::new().unwrap();
1388
1389 let props = WriterProperties::builder()
1391 .set_offset_index_disabled(offset_index_disabled)
1392 .build();
1393
1394 let mut writer = ArrowWriter::try_new(file.reopen()?, schema, Some(props))?;
1395 writer.write(&batch)?;
1396 writer.close()?;
1397
1398 Ok(file)
1399 }
1400
1401 fn read_and_check(file: &File, policy: PageIndexPolicy) -> Result<ParquetMetaData> {
1402 let mut reader = ParquetMetaDataReader::new().with_page_index_policy(policy);
1403 reader.try_parse(file)?;
1404 reader.finish()
1405 }
1406
1407 #[test]
1408 fn test_page_index_policy() {
1409 let f = write_parquet_file(false).unwrap();
1411 read_and_check(f.as_file(), PageIndexPolicy::Required).unwrap();
1412 read_and_check(f.as_file(), PageIndexPolicy::Optional).unwrap();
1413 read_and_check(f.as_file(), PageIndexPolicy::Skip).unwrap();
1414
1415 let f = write_parquet_file(true).unwrap();
1417 let res = read_and_check(f.as_file(), PageIndexPolicy::Required);
1418 assert!(matches!(
1419 res,
1420 Err(ParquetError::General(e)) if e == "missing offset index"
1421 ));
1422 read_and_check(f.as_file(), PageIndexPolicy::Optional).unwrap();
1423 read_and_check(f.as_file(), PageIndexPolicy::Skip).unwrap();
1424 }
1425}