1#[cfg(feature = "encryption")]
19use crate::encryption::decrypt::FileDecryptionProperties;
20use crate::errors::{ParquetError, Result};
21use crate::file::FOOTER_SIZE;
22use crate::file::metadata::parser::decode_metadata;
23use crate::file::metadata::thrift::parquet_schema_from_bytes;
24use crate::file::metadata::{
25 FooterTail, ParquetMetaData, ParquetMetaDataOptions, ParquetMetaDataPushDecoder,
26};
27use crate::file::reader::ChunkReader;
28use crate::schema::types::SchemaDescriptor;
29use bytes::Bytes;
30use std::sync::Arc;
31use std::{io::Read, ops::Range};
32
33use crate::DecodeResult;
34#[cfg(all(feature = "async", feature = "arrow"))]
35use crate::arrow::async_reader::{MetadataFetch, MetadataSuffixFetch};
36
/// Reads and decodes Parquet file metadata ([`ParquetMetaData`]) from the file
/// footer, optionally followed by the page indexes, via either a synchronous
/// [`ChunkReader`] or the async `MetadataFetch`/`MetadataSuffixFetch` traits.
#[derive(Default, Debug)]
pub struct ParquetMetaDataReader {
    /// Decoded metadata; populated by the `try_*`/`load_*` methods and
    /// consumed by [`Self::finish`].
    metadata: Option<ParquetMetaData>,
    /// Policy controlling whether the column index is decoded.
    column_index: PageIndexPolicy,
    /// Policy controlling whether the offset index is decoded.
    offset_index: PageIndexPolicy,
    /// Suggested number of trailing bytes to fetch up front on async paths.
    prefetch_hint: Option<usize>,
    /// Extra decoding options forwarded to the push decoder.
    metadata_options: Option<Arc<ParquetMetaDataOptions>>,
    /// Size in bytes of footer + metadata, recorded by `parse_metadata`.
    metadata_size: Option<usize>,
    /// Decryption settings applied when the footer is encrypted.
    #[cfg(feature = "encryption")]
    file_decryption_properties: Option<Arc<FileDecryptionProperties>>,
}
83
/// Controls whether a page index structure (column or offset index) is read.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum PageIndexPolicy {
    /// Do not read the index (the default).
    #[default]
    Skip,
    /// Read the index opportunistically if it is available.
    Optional,
    /// Require the index (presumably an error when absent — enforcement lives
    /// in `ParquetMetaDataPushDecoder`, not visible here).
    Required,
}
95
96impl From<bool> for PageIndexPolicy {
97 fn from(value: bool) -> Self {
98 match value {
99 true => Self::Required,
100 false => Self::Skip,
101 }
102 }
103}
104
105impl ParquetMetaDataReader {
    /// Creates a reader with default settings: no page indexes, no prefetch
    /// hint, no decryption.
    pub fn new() -> Self {
        Default::default()
    }

    /// Creates a reader pre-seeded with already-decoded metadata, e.g. so
    /// that only the page indexes need to be loaded afterwards.
    pub fn new_with_metadata(metadata: ParquetMetaData) -> Self {
        Self {
            metadata: Some(metadata),
            ..Default::default()
        }
    }
119
    /// Deprecated boolean toggle for both page indexes; forwards to the
    /// policy-based API via [`PageIndexPolicy::from`].
    #[deprecated(since = "56.1.0", note = "Use `with_page_index_policy` instead")]
    pub fn with_page_indexes(self, val: bool) -> Self {
        let policy = PageIndexPolicy::from(val);
        self.with_column_index_policy(policy)
            .with_offset_index_policy(policy)
    }

    /// Deprecated boolean toggle for the column index only.
    #[deprecated(since = "56.1.0", note = "Use `with_column_index_policy` instead")]
    pub fn with_column_indexes(self, val: bool) -> Self {
        let policy = PageIndexPolicy::from(val);
        self.with_column_index_policy(policy)
    }

    /// Deprecated boolean toggle for the offset index only.
    #[deprecated(since = "56.1.0", note = "Use `with_offset_index_policy` instead")]
    pub fn with_offset_indexes(self, val: bool) -> Self {
        let policy = PageIndexPolicy::from(val);
        self.with_offset_index_policy(policy)
    }
148
    /// Sets the same policy for both the column index and the offset index.
    pub fn with_page_index_policy(self, policy: PageIndexPolicy) -> Self {
        self.with_column_index_policy(policy)
            .with_offset_index_policy(policy)
    }

    /// Sets the policy for reading the column index.
    pub fn with_column_index_policy(mut self, policy: PageIndexPolicy) -> Self {
        self.column_index = policy;
        self
    }

    /// Sets the policy for reading the offset index.
    pub fn with_offset_index_policy(mut self, policy: PageIndexPolicy) -> Self {
        self.offset_index = policy;
        self
    }

    /// Sets extra options forwarded to the metadata push decoder.
    pub fn with_metadata_options(mut self, options: Option<ParquetMetaDataOptions>) -> Self {
        self.metadata_options = options.map(Arc::new);
        self
    }

    /// Hints how many trailing bytes the async loaders should fetch up front
    /// (a hint smaller than the footer is ignored — see `get_prefetch_size`).
    pub fn with_prefetch_hint(mut self, prefetch: Option<usize>) -> Self {
        self.prefetch_hint = prefetch;
        self
    }
188
    /// Sets the decryption properties used to decode an encrypted footer.
    #[cfg(feature = "encryption")]
    pub fn with_decryption_properties(
        mut self,
        properties: Option<std::sync::Arc<FileDecryptionProperties>>,
    ) -> Self {
        self.file_decryption_properties = properties;
        self
    }
200
    /// Returns `true` once metadata has been successfully parsed/loaded.
    pub fn has_metadata(&self) -> bool {
        self.metadata.is_some()
    }

    /// Takes the decoded metadata out of the reader, erroring if no prior
    /// parse/load succeeded. Subsequent calls will error again.
    pub fn finish(&mut self) -> Result<ParquetMetaData> {
        self.metadata
            .take()
            .ok_or_else(|| general_err!("could not parse parquet metadata"))
    }

    /// Convenience: parses from `reader` and returns the metadata in one call.
    pub fn parse_and_finish<R: ChunkReader>(mut self, reader: &R) -> Result<ParquetMetaData> {
        self.try_parse(reader)?;
        self.finish()
    }

    /// Parses the footer assuming `reader` covers the entire file.
    pub fn try_parse<R: ChunkReader>(&mut self, reader: &R) -> Result<()> {
        self.try_parse_sized(reader, reader.len())
    }
244
    /// Parses the footer where `reader` holds only the *last* `reader.len()`
    /// bytes of a file whose true length is `file_size`.
    ///
    /// Returns [`ParquetError::NeedMoreData`] with the number of trailing
    /// bytes required when the supplied suffix is too short to decode, so the
    /// caller can refetch and retry; returns an EOF error when the whole file
    /// is smaller than what decoding needs.
    pub fn try_parse_sized<R: ChunkReader>(&mut self, reader: &R, file_size: u64) -> Result<()> {
        self.metadata = match self.parse_metadata(reader) {
            Ok(metadata) => Some(metadata),
            Err(ParquetError::NeedMoreData(needed)) => {
                // If the reader already spans the whole file, or the file is
                // smaller than what decoding requires, retrying cannot help.
                if file_size == reader.len() || needed as u64 > file_size {
                    return Err(eof_err!(
                        "Parquet file too small. Size is {} but need {}",
                        file_size,
                        needed
                    ));
                } else {
                    // Caller should retry with at least `needed` suffix bytes.
                    return Err(ParquetError::NeedMoreData(needed));
                }
            }
            Err(e) => return Err(e),
        };

        // Footer decoded; stop here unless page indexes were requested.
        if self.column_index == PageIndexPolicy::Skip && self.offset_index == PageIndexPolicy::Skip
        {
            return Ok(());
        }

        self.read_page_indexes_sized(reader, file_size)
    }
345
    /// Reads page indexes assuming `reader` covers the entire file.
    pub fn read_page_indexes<R: ChunkReader>(&mut self, reader: &R) -> Result<()> {
        self.read_page_indexes_sized(reader, reader.len())
    }

    /// Reads the page indexes using previously-decoded metadata, where
    /// `reader` holds only the last `reader.len()` bytes of a `file_size`-byte
    /// file. Emits [`ParquetError::NeedMoreData`] when the suffix does not
    /// reach back far enough to cover the index range.
    pub fn read_page_indexes_sized<R: ChunkReader>(
        &mut self,
        reader: &R,
        file_size: u64,
    ) -> Result<()> {
        // The indexes are decoded relative to existing metadata, which this
        // method temporarily takes ownership of.
        let Some(metadata) = self.metadata.take() else {
            return Err(general_err!(
                "Tried to read page indexes without ParquetMetaData metadata"
            ));
        };

        let push_decoder = ParquetMetaDataPushDecoder::try_new_with_metadata(file_size, metadata)?
            .with_offset_index_policy(self.offset_index)
            .with_column_index_policy(self.column_index)
            .with_metadata_options(self.metadata_options.clone());
        let mut push_decoder = self.prepare_push_decoder(push_decoder);

        // Ask the decoder which byte range (if any) it still needs.
        let range = match needs_index_data(&mut push_decoder)? {
            NeedsIndexData::No(metadata) => {
                self.metadata = Some(metadata);
                return Ok(());
            }
            NeedsIndexData::Yes(range) => range,
        };

        // Absolute file range that `reader`'s suffix corresponds to.
        let file_range = file_size.saturating_sub(reader.len())..file_size;
        if !(file_range.contains(&range.start) && file_range.contains(&range.end)) {
            if range.end > file_size {
                // Metadata claims index data beyond the end of the file.
                return Err(eof_err!(
                    "Parquet file too small. Range {range:?} is beyond file bounds {file_size}",
                ));
            } else {
                // Suffix starts too late; report how many trailing bytes
                // would be enough.
                return Err(ParquetError::NeedMoreData(
                    (file_size - range.start).try_into()?,
                ));
            }
        }

        // Sanity check: the index range must not run into the footer metadata.
        if let Some(metadata_size) = self.metadata_size {
            let metadata_range = file_size.saturating_sub(metadata_size as u64)..file_size;
            if range.end > metadata_range.start {
                return Err(eof_err!(
                    "Parquet file too small. Page index range {range:?} overlaps with file metadata {metadata_range:?}",
                ));
            }
        }

        let bytes_needed = usize::try_from(range.end - range.start)?;
        // Translate the absolute range into an offset within the suffix.
        let bytes = reader.get_bytes(range.start - file_range.start, bytes_needed)?;

        push_decoder.push_range(range, bytes)?;
        let metadata = parse_index_data(&mut push_decoder)?;
        self.metadata = Some(metadata);

        Ok(())
    }
421
    /// Async convenience: loads metadata (and page indexes per policy) from
    /// `fetch` given the known `file_size`, returning the result directly.
    #[cfg(all(feature = "async", feature = "arrow"))]
    pub async fn load_and_finish<F: MetadataFetch>(
        mut self,
        fetch: F,
        file_size: u64,
    ) -> Result<ParquetMetaData> {
        self.try_load(fetch, file_size).await?;
        self.finish()
    }

    /// Async convenience: like [`Self::load_and_finish`] but using
    /// suffix-based fetches, so the file size need not be known up front.
    #[cfg(all(feature = "async", feature = "arrow"))]
    pub async fn load_via_suffix_and_finish<F: MetadataSuffixFetch>(
        mut self,
        fetch: F,
    ) -> Result<ParquetMetaData> {
        self.try_load_via_suffix(fetch).await?;
        self.finish()
    }
    /// Loads the footer metadata from `fetch`, then the page indexes when the
    /// configured policies request them, reusing any prefetched bytes.
    #[cfg(all(feature = "async", feature = "arrow"))]
    pub async fn try_load<F: MetadataFetch>(&mut self, mut fetch: F, file_size: u64) -> Result<()> {
        let (metadata, remainder) = self.load_metadata(&mut fetch, file_size).await?;

        self.metadata = Some(metadata);

        // Done unless page indexes were requested.
        if self.column_index == PageIndexPolicy::Skip && self.offset_index == PageIndexPolicy::Skip
        {
            return Ok(());
        }

        self.load_page_index_with_remainder(fetch, remainder).await
    }

    /// Suffix-based counterpart of [`Self::try_load`].
    #[cfg(all(feature = "async", feature = "arrow"))]
    pub async fn try_load_via_suffix<F: MetadataSuffixFetch>(
        &mut self,
        mut fetch: F,
    ) -> Result<()> {
        let (metadata, remainder) = self.load_metadata_via_suffix(&mut fetch).await?;

        self.metadata = Some(metadata);

        // Done unless page indexes were requested.
        if self.column_index == PageIndexPolicy::Skip && self.offset_index == PageIndexPolicy::Skip
        {
            return Ok(());
        }

        self.load_page_index_with_remainder(fetch, remainder).await
    }
494
    /// Loads only the page indexes for already-present metadata.
    #[cfg(all(feature = "async", feature = "arrow"))]
    pub async fn load_page_index<F: MetadataFetch>(&mut self, fetch: F) -> Result<()> {
        self.load_page_index_with_remainder(fetch, None).await
    }

    /// Loads the page indexes, first trying to satisfy the decoder's byte
    /// range from `remainder` — leftover bytes from the metadata prefetch,
    /// tagged with their absolute start offset — before issuing a new fetch.
    #[cfg(all(feature = "async", feature = "arrow"))]
    async fn load_page_index_with_remainder<F: MetadataFetch>(
        &mut self,
        mut fetch: F,
        remainder: Option<(usize, Bytes)>,
    ) -> Result<()> {
        let Some(metadata) = self.metadata.take() else {
            return Err(general_err!("Footer metadata is not present"));
        };

        // The true file size may be unknown here (suffix path); u64::MAX acts
        // as a sentinel since absolute index offsets come from the metadata.
        let file_size = u64::MAX;
        let push_decoder = ParquetMetaDataPushDecoder::try_new_with_metadata(file_size, metadata)?
            .with_offset_index_policy(self.offset_index)
            .with_column_index_policy(self.column_index)
            .with_metadata_options(self.metadata_options.clone());
        let mut push_decoder = self.prepare_push_decoder(push_decoder);

        let range = match needs_index_data(&mut push_decoder)? {
            NeedsIndexData::No(metadata) => {
                self.metadata = Some(metadata);
                return Ok(());
            }
            NeedsIndexData::Yes(range) => range,
        };

        let bytes = match &remainder {
            // Serve from the prefetched remainder when it starts early enough.
            Some((remainder_start, remainder)) if *remainder_start as u64 <= range.start => {
                let remainder_start = *remainder_start as u64;
                let offset = usize::try_from(range.start - remainder_start)?;
                let end = usize::try_from(range.end - remainder_start)?;
                assert!(end <= remainder.len());
                remainder.slice(offset..end)
            }
            _ => fetch.fetch(range.start..range.end).await?,
        };

        // Internal invariant: we must hand the decoder exactly `range` bytes.
        assert_eq!(bytes.len() as u64, range.end - range.start);
        push_decoder.push_range(range.clone(), bytes)?;
        let metadata = parse_index_data(&mut push_decoder)?;
        self.metadata = Some(metadata);
        Ok(())
    }
549
    /// Reads the 8-byte footer tail from the end of `chunk_reader`, then the
    /// metadata block it points to, and decodes it.
    ///
    /// Returns [`ParquetError::NeedMoreData`] when `chunk_reader` does not
    /// hold enough trailing bytes; also records `metadata_size` for the
    /// overlap check in `read_page_indexes_sized`.
    fn parse_metadata<R: ChunkReader>(&mut self, chunk_reader: &R) -> Result<ParquetMetaData> {
        let file_size = chunk_reader.len();
        if file_size < (FOOTER_SIZE as u64) {
            return Err(ParquetError::NeedMoreData(FOOTER_SIZE));
        }

        let mut footer = [0_u8; FOOTER_SIZE];
        chunk_reader
            .get_read(file_size - FOOTER_SIZE as u64)?
            .read_exact(&mut footer)?;

        let footer = FooterTail::try_new(&footer)?;
        let metadata_len = footer.metadata_length();
        let footer_metadata_len = FOOTER_SIZE + metadata_len;
        // Remember footer + metadata size even on the NeedMoreData path below.
        self.metadata_size = Some(footer_metadata_len);

        if footer_metadata_len as u64 > file_size {
            return Err(ParquetError::NeedMoreData(footer_metadata_len));
        }

        let start = file_size - footer_metadata_len as u64;
        let bytes = chunk_reader.get_bytes(start, metadata_len)?;
        self.decode_footer_metadata(bytes, file_size, footer)
    }

    /// Size in bytes of footer + metadata, if a footer has been parsed.
    pub fn metadata_size(&self) -> Option<usize> {
        self.metadata_size
    }
583
584 #[cfg(all(feature = "async", feature = "arrow"))]
588 fn get_prefetch_size(&self) -> usize {
589 if let Some(prefetch) = self.prefetch_hint {
590 if prefetch > FOOTER_SIZE {
591 return prefetch;
592 }
593 }
594 FOOTER_SIZE
595 }
596
    /// Fetches and decodes the footer metadata given the known `file_size`.
    ///
    /// Returns the metadata plus, when the initial prefetch covered more than
    /// the metadata, the unused leading bytes tagged with their absolute file
    /// offset so the page-index pass can reuse them instead of refetching.
    #[cfg(all(feature = "async", feature = "arrow"))]
    async fn load_metadata<F: MetadataFetch>(
        &self,
        fetch: &mut F,
        file_size: u64,
    ) -> Result<(ParquetMetaData, Option<(usize, Bytes)>)> {
        let prefetch = self.get_prefetch_size() as u64;

        if file_size < FOOTER_SIZE as u64 {
            return Err(eof_err!("file size of {} is less than footer", file_size));
        }

        // Fetch up to `prefetch` trailing bytes in one round trip.
        let footer_start = file_size.saturating_sub(prefetch);

        let suffix = fetch.fetch(footer_start..file_size).await?;
        let suffix_len = suffix.len();
        let fetch_len = (file_size - footer_start)
            .try_into()
            .expect("footer size should never be larger than u32");
        if suffix_len < fetch_len {
            return Err(eof_err!(
                "metadata requires {} bytes, but could only read {}",
                fetch_len,
                suffix_len
            ));
        }

        // The footer tail occupies the last FOOTER_SIZE bytes of the suffix.
        let mut footer = [0; FOOTER_SIZE];
        footer.copy_from_slice(&suffix[suffix_len - FOOTER_SIZE..suffix_len]);

        let footer = FooterTail::try_new(&footer)?;
        let length = footer.metadata_length();

        if file_size < (length + FOOTER_SIZE) as u64 {
            return Err(eof_err!(
                "file size of {} is less than footer + metadata {}",
                file_size,
                length + FOOTER_SIZE
            ));
        }

        if length > suffix_len - FOOTER_SIZE {
            // The prefetched suffix is too small: issue a second fetch for
            // exactly the metadata block. No remainder bytes to reuse.
            let metadata_start = file_size - (length + FOOTER_SIZE) as u64;
            let meta = fetch
                .fetch(metadata_start..(file_size - FOOTER_SIZE as u64))
                .await?;
            Ok((self.decode_footer_metadata(meta, file_size, footer)?, None))
        } else {
            // Metadata fits in the prefetched suffix; slice it out and hand
            // back the bytes in front of it for possible page-index reuse.
            let metadata_start = (file_size - (length + FOOTER_SIZE) as u64 - footer_start)
                .try_into()
                .expect("metadata length should never be larger than u32");
            let slice = suffix.slice(metadata_start..suffix_len - FOOTER_SIZE);
            Ok((
                self.decode_footer_metadata(slice, file_size, footer)?,
                Some((footer_start as usize, suffix.slice(..metadata_start))),
            ))
        }
    }
659
    /// Like `load_metadata`, but uses suffix fetches so the total file size
    /// need not be known in advance.
    #[cfg(all(feature = "async", feature = "arrow"))]
    async fn load_metadata_via_suffix<F: MetadataSuffixFetch>(
        &self,
        fetch: &mut F,
    ) -> Result<(ParquetMetaData, Option<(usize, Bytes)>)> {
        let prefetch = self.get_prefetch_size();

        let suffix = fetch.fetch_suffix(prefetch as _).await?;
        let suffix_len = suffix.len();

        if suffix_len < FOOTER_SIZE {
            return Err(eof_err!(
                "footer metadata requires {} bytes, but could only read {}",
                FOOTER_SIZE,
                suffix_len
            ));
        }

        let mut footer = [0; FOOTER_SIZE];
        footer.copy_from_slice(&suffix[suffix_len - FOOTER_SIZE..suffix_len]);

        let footer = FooterTail::try_new(&footer)?;
        let length = footer.metadata_length();
        // True file size is unknown on this path; a synthetic size covering
        // exactly metadata + footer is enough for decode_footer_metadata's
        // offset arithmetic. NOTE(review): relies on the decoder not needing
        // the real file length — confirm against decode_footer_metadata.
        let file_size = (length + FOOTER_SIZE) as u64;

        let metadata_offset = length + FOOTER_SIZE;
        if length > suffix_len - FOOTER_SIZE {
            // Prefetch was too small: refetch a suffix that spans the whole
            // metadata block plus the footer.
            let meta = fetch.fetch_suffix(metadata_offset).await?;

            if meta.len() < metadata_offset {
                return Err(eof_err!(
                    "metadata requires {} bytes, but could only read {}",
                    metadata_offset,
                    meta.len()
                ));
            }

            // Drop the trailing footer; keep only the metadata bytes.
            let meta = meta.slice(0..length);
            Ok((self.decode_footer_metadata(meta, file_size, footer)?, None))
        } else {
            // Metadata fits in the prefetched suffix; bytes before it are
            // returned as a remainder (tagged with offset 0 of the suffix).
            let metadata_start = suffix_len - metadata_offset;
            let slice = suffix.slice(metadata_start..suffix_len - FOOTER_SIZE);
            Ok((
                self.decode_footer_metadata(slice, file_size, footer)?,
                Some((0, suffix.slice(..metadata_start))),
            ))
        }
    }
712
    /// Deprecated wrapper around [`FooterTail::try_new`].
    #[deprecated(since = "57.0.0", note = "Use FooterTail::try_from instead")]
    pub fn decode_footer_tail(slice: &[u8; FOOTER_SIZE]) -> Result<FooterTail> {
        FooterTail::try_new(slice)
    }

    /// Deprecated: decodes the footer and returns just the metadata length.
    #[deprecated(since = "54.3.0", note = "Use decode_footer_tail instead")]
    pub fn decode_footer(slice: &[u8; FOOTER_SIZE]) -> Result<usize> {
        FooterTail::try_new(slice).map(|f| f.metadata_length())
    }
724
    /// Decodes `buf` — the raw metadata bytes that sit immediately before the
    /// footer tail in a `file_size`-byte file — into [`ParquetMetaData`],
    /// applying this reader's options and (when enabled) decryption.
    pub(crate) fn decode_footer_metadata(
        &self,
        buf: Bytes,
        file_size: u64,
        footer_tail: FooterTail,
    ) -> Result<ParquetMetaData> {
        // The metadata block ends right where the footer tail begins.
        let ending_offset = file_size.checked_sub(FOOTER_SIZE as u64).ok_or_else(|| {
            general_err!(
                "file size {file_size} is smaller than footer size {}",
                FOOTER_SIZE
            )
        })?;

        let starting_offset = ending_offset.checked_sub(buf.len() as u64).ok_or_else(|| {
            general_err!(
                "file size {file_size} is smaller than buffer size {} + footer size {}",
                buf.len(),
                FOOTER_SIZE
            )
        })?;

        let range = starting_offset..ending_offset;

        // Page indexes are decoded separately; skip them here.
        let push_decoder =
            ParquetMetaDataPushDecoder::try_new_with_footer_tail(file_size, footer_tail)?
                .with_page_index_policy(PageIndexPolicy::Skip)
                .with_metadata_options(self.metadata_options.clone());

        let mut push_decoder = self.prepare_push_decoder(push_decoder);
        push_decoder.push_range(range, buf)?;
        // With the full metadata range pushed, anything but Data is an
        // internal error.
        match push_decoder.try_decode()? {
            DecodeResult::Data(metadata) => Ok(metadata),
            DecodeResult::Finished => Err(general_err!(
                "could not parse parquet metadata -- previously finished"
            )),
            DecodeResult::NeedsData(ranges) => Err(general_err!(
                "could not parse parquet metadata, needs ranges {:?}",
                ranges
            )),
        }
    }
784
    /// Attaches this reader's decryption properties to the push decoder
    /// (encryption builds only).
    #[cfg(feature = "encryption")]
    fn prepare_push_decoder(
        &self,
        push_decoder: ParquetMetaDataPushDecoder,
    ) -> ParquetMetaDataPushDecoder {
        push_decoder.with_file_decryption_properties(
            self.file_decryption_properties
                .as_ref()
                .map(std::sync::Arc::clone),
        )
    }
    /// No-op counterpart when the `encryption` feature is disabled.
    #[cfg(not(feature = "encryption"))]
    fn prepare_push_decoder(
        &self,
        push_decoder: ParquetMetaDataPushDecoder,
    ) -> ParquetMetaDataPushDecoder {
        push_decoder
    }
804
    /// Decodes [`ParquetMetaData`] from a raw metadata byte slice with
    /// default options.
    pub fn decode_metadata(buf: &[u8]) -> Result<ParquetMetaData> {
        decode_metadata(buf, None)
    }

    /// Decodes [`ParquetMetaData`] from a raw metadata byte slice with the
    /// provided options.
    pub fn decode_metadata_with_options(
        buf: &[u8],
        options: Option<&ParquetMetaDataOptions>,
    ) -> Result<ParquetMetaData> {
        decode_metadata(buf, options)
    }

    /// Decodes just the schema descriptor from raw metadata bytes.
    pub fn decode_schema(buf: &[u8]) -> Result<Arc<SchemaDescriptor>> {
        Ok(Arc::new(parquet_schema_from_bytes(buf)?))
    }
832}
833
/// Outcome of asking the push decoder whether it needs page-index bytes.
#[allow(clippy::large_enum_variant)]
enum NeedsIndexData {
    /// Decoding completed without further data; carries the final metadata.
    No(ParquetMetaData),
    /// More bytes are required; carries the merged byte range to fetch.
    Yes(Range<u64>),
}
843
844fn needs_index_data(push_decoder: &mut ParquetMetaDataPushDecoder) -> Result<NeedsIndexData> {
847 match push_decoder.try_decode()? {
848 DecodeResult::NeedsData(ranges) => {
849 let range = ranges
850 .into_iter()
851 .reduce(|a, b| a.start.min(b.start)..a.end.max(b.end))
852 .ok_or_else(|| general_err!("Internal error: no ranges provided"))?;
853 Ok(NeedsIndexData::Yes(range))
854 }
855 DecodeResult::Data(metadata) => Ok(NeedsIndexData::No(metadata)),
856 DecodeResult::Finished => Err(general_err!("Internal error: decoder was finished")),
857 }
858}
859
860fn parse_index_data(push_decoder: &mut ParquetMetaDataPushDecoder) -> Result<ParquetMetaData> {
863 match push_decoder.try_decode()? {
864 DecodeResult::NeedsData(_) => Err(general_err!(
865 "Internal error: decoder still needs data after reading required range"
866 )),
867 DecodeResult::Data(metadata) => Ok(metadata),
868 DecodeResult::Finished => Err(general_err!("Internal error: decoder was finished")),
869 }
870}
871
872#[cfg(test)]
873mod tests {
874 use super::*;
875 use crate::file::reader::Length;
876 use crate::util::test_common::file_util::get_test_file;
877 use std::ops::Range;
878
    // An empty file cannot even hold a footer.
    #[test]
    fn test_parse_metadata_size_smaller_than_footer() {
        let test_file = tempfile::tempfile().unwrap();
        let err = ParquetMetaDataReader::new()
            .parse_metadata(&test_file)
            .unwrap_err();
        assert!(matches!(err, ParquetError::NeedMoreData(FOOTER_SIZE)));
    }

    // A footer without the PAR1 magic is rejected as corrupt.
    #[test]
    fn test_parse_metadata_corrupt_footer() {
        let data = Bytes::from(vec![1, 2, 3, 4, 5, 6, 7, 8]);
        let reader_result = ParquetMetaDataReader::new().parse_metadata(&data);
        assert_eq!(
            reader_result.unwrap_err().to_string(),
            "Parquet error: Invalid Parquet file. Corrupt footer"
        );
    }

    // Valid magic but a metadata length (255) larger than the file: the
    // reader asks for 255 + 8 = 263 trailing bytes.
    #[test]
    fn test_parse_metadata_invalid_start() {
        let test_file = Bytes::from(vec![255, 0, 0, 0, b'P', b'A', b'R', b'1']);
        let err = ParquetMetaDataReader::new()
            .parse_metadata(&test_file)
            .unwrap_err();
        assert!(matches!(err, ParquetError::NeedMoreData(263)));
    }
906
    // End-to-end checks of try_parse/try_parse_sized with progressively
    // shorter file suffixes, including the NeedMoreData retry protocol.
    #[test]
    #[allow(deprecated)]
    fn test_try_parse() {
        let file = get_test_file("alltypes_tiny_pages.parquet");
        let len = file.len();

        let mut reader = ParquetMetaDataReader::new().with_page_indexes(true);

        let bytes_for_range = |range: Range<u64>| {
            file.get_bytes(range.start, (range.end - range.start).try_into().unwrap())
                .unwrap()
        };

        // Whole file: metadata and both indexes decode.
        let bytes = bytes_for_range(0..len);
        reader.try_parse(&bytes).unwrap();
        let metadata = reader.finish().unwrap();
        assert!(metadata.column_index.is_some());
        assert!(metadata.offset_index.is_some());

        // A generous suffix still succeeds.
        let bytes = bytes_for_range(320000..len);
        reader.try_parse_sized(&bytes, len).unwrap();
        let metadata = reader.finish().unwrap();
        assert!(metadata.column_index.is_some());
        assert!(metadata.offset_index.is_some());

        // Smallest suffix that still contains the page indexes.
        let bytes = bytes_for_range(323583..len);
        reader.try_parse_sized(&bytes, len).unwrap();
        let metadata = reader.finish().unwrap();
        assert!(metadata.column_index.is_some());
        assert!(metadata.offset_index.is_some());

        // One byte short: expect NeedMoreData, then retry with a suffix of
        // exactly the requested size.
        let bytes = bytes_for_range(323584..len);
        match reader.try_parse_sized(&bytes, len).unwrap_err() {
            ParquetError::NeedMoreData(needed) => {
                let bytes = bytes_for_range(len - needed as u64..len);
                reader.try_parse_sized(&bytes, len).unwrap();
                let metadata = reader.finish().unwrap();
                assert!(metadata.column_index.is_some());
                assert!(metadata.offset_index.is_some());
            }
            _ => panic!("unexpected error"),
        };

        // Suffix too small even for the metadata: loop until the retry
        // protocol converges, reading indexes separately once metadata lands.
        let mut reader = ParquetMetaDataReader::new().with_page_indexes(true);
        let mut bytes = bytes_for_range(452505..len);
        loop {
            match reader.try_parse_sized(&bytes, len) {
                Ok(_) => break,
                Err(ParquetError::NeedMoreData(needed)) => {
                    bytes = bytes_for_range(len - needed as u64..len);
                    if reader.has_metadata() {
                        reader.read_page_indexes_sized(&bytes, len).unwrap();
                        break;
                    }
                }
                _ => panic!("unexpected error"),
            }
        }
        let metadata = reader.finish().unwrap();
        assert!(metadata.column_index.is_some());
        assert!(metadata.offset_index.is_some());

        // Claimed file size smaller than what the index range needs.
        let bytes = bytes_for_range(323584..len);
        let reader_result = reader.try_parse_sized(&bytes, len - 323584).unwrap_err();
        assert_eq!(
            reader_result.to_string(),
            "EOF: Parquet file too small. Range 323583..452504 is beyond file bounds 130649"
        );

        // Without page indexes, a single NeedMoreData retry suffices.
        let mut reader = ParquetMetaDataReader::new();
        let bytes = bytes_for_range(452505..len);
        match reader.try_parse_sized(&bytes, len).unwrap_err() {
            ParquetError::NeedMoreData(needed) => {
                let bytes = bytes_for_range(len - needed as u64..len);
                reader.try_parse_sized(&bytes, len).unwrap();
                reader.finish().unwrap();
            }
            _ => panic!("unexpected error"),
        };

        // try_parse treats the buffer as the whole file, so a suffix fails.
        let reader_result = reader.try_parse(&bytes).unwrap_err();
        assert_eq!(
            reader_result.to_string(),
            "EOF: Parquet file too small. Size is 1728 but need 1729"
        );

        // A prefix of the file has no valid footer.
        let bytes = bytes_for_range(0..1000);
        let reader_result = reader.try_parse_sized(&bytes, len).unwrap_err();
        assert_eq!(
            reader_result.to_string(),
            "Parquet error: Invalid Parquet file. Corrupt footer"
        );

        // Claimed file size smaller than footer + metadata.
        let bytes = bytes_for_range(452510..len);
        let reader_result = reader.try_parse_sized(&bytes, len - 452505).unwrap_err();
        assert_eq!(
            reader_result.to_string(),
            "EOF: Parquet file too small. Size is 1728 but need 1729"
        );
    }
1021}
1022
1023#[cfg(all(feature = "async", feature = "arrow", test))]
1024mod async_tests {
1025 use super::*;
1026
1027 use arrow::{array::Int32Array, datatypes::DataType};
1028 use arrow_array::RecordBatch;
1029 use arrow_schema::{Field, Schema};
1030 use bytes::Bytes;
1031 use futures::FutureExt;
1032 use futures::future::BoxFuture;
1033 use std::fs::File;
1034 use std::future::Future;
1035 use std::io::{Read, Seek, SeekFrom};
1036 use std::ops::Range;
1037 use std::sync::Arc;
1038 use std::sync::atomic::{AtomicUsize, Ordering};
1039 use tempfile::NamedTempFile;
1040
1041 use crate::arrow::ArrowWriter;
1042 use crate::file::properties::WriterProperties;
1043 use crate::file::reader::Length;
1044 use crate::util::test_common::file_util::get_test_file;
1045
    // Adapter turning a closure over byte ranges into a MetadataFetch impl.
    struct MetadataFetchFn<F>(F);

    impl<F, Fut> MetadataFetch for MetadataFetchFn<F>
    where
        F: FnMut(Range<u64>) -> Fut + Send,
        Fut: Future<Output = Result<Bytes>> + Send,
    {
        fn fetch(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
            async move { self.0(range).await }.boxed()
        }
    }

    // Adapter pairing a range-fetch closure with a suffix-fetch closure so
    // tests can count each kind of request separately.
    struct MetadataSuffixFetchFn<F1, F2>(F1, F2);

    impl<F1, Fut, F2> MetadataFetch for MetadataSuffixFetchFn<F1, F2>
    where
        F1: FnMut(Range<u64>) -> Fut + Send,
        Fut: Future<Output = Result<Bytes>> + Send,
        F2: Send,
    {
        fn fetch(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
            async move { self.0(range).await }.boxed()
        }
    }

    impl<F1, Fut, F2> MetadataSuffixFetch for MetadataSuffixFetchFn<F1, F2>
    where
        F1: FnMut(Range<u64>) -> Fut + Send,
        F2: FnMut(usize) -> Fut + Send,
        Fut: Future<Output = Result<Bytes>> + Send,
    {
        fn fetch_suffix(&mut self, suffix: usize) -> BoxFuture<'_, Result<Bytes>> {
            async move { self.1(suffix).await }.boxed()
        }
    }
1081
1082 fn read_range(file: &mut File, range: Range<u64>) -> Result<Bytes> {
1083 file.seek(SeekFrom::Start(range.start as _))?;
1084 let len = range.end - range.start;
1085 let mut buf = Vec::with_capacity(len.try_into().unwrap());
1086 file.take(len as _).read_to_end(&mut buf)?;
1087 Ok(buf.into())
1088 }
1089
1090 fn read_suffix(file: &mut File, suffix: usize) -> Result<Bytes> {
1091 let file_len = file.len();
1092 file.seek(SeekFrom::End(0 - suffix.min(file_len as _) as i64))?;
1094 let mut buf = Vec::with_capacity(suffix);
1095 file.take(suffix as _).read_to_end(&mut buf)?;
1096 Ok(buf.into())
1097 }
1098
    // Verifies load_and_finish issues the expected number of range fetches
    // for various prefetch hints, and surfaces footer errors.
    #[tokio::test]
    async fn test_simple() {
        let mut file = get_test_file("nulls.snappy.parquet");
        let len = file.len();

        let expected = ParquetMetaDataReader::new()
            .parse_and_finish(&file)
            .unwrap();
        let expected = expected.file_metadata().schema();
        let fetch_count = AtomicUsize::new(0);

        let mut fetch = |range| {
            fetch_count.fetch_add(1, Ordering::SeqCst);
            futures::future::ready(read_range(&mut file, range))
        };

        // Default prefetch: footer fetch + metadata fetch = 2.
        let input = MetadataFetchFn(&mut fetch);
        let actual = ParquetMetaDataReader::new()
            .load_and_finish(input, len)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 2);

        fetch_count.store(0, Ordering::SeqCst);
        // Hint smaller than the footer is ignored; still 2 fetches.
        let input = MetadataFetchFn(&mut fetch);
        let actual = ParquetMetaDataReader::new()
            .with_prefetch_hint(Some(7))
            .load_and_finish(input, len)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 2);

        fetch_count.store(0, Ordering::SeqCst);
        // Hint bigger than the footer but smaller than metadata: 2 fetches.
        let input = MetadataFetchFn(&mut fetch);
        let actual = ParquetMetaDataReader::new()
            .with_prefetch_hint(Some(10))
            .load_and_finish(input, len)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 2);

        fetch_count.store(0, Ordering::SeqCst);
        // Hint covering metadata + footer: single fetch.
        let input = MetadataFetchFn(&mut fetch);
        let actual = ParquetMetaDataReader::new()
            .with_prefetch_hint(Some(500))
            .load_and_finish(input, len)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);

        fetch_count.store(0, Ordering::SeqCst);
        // Hint of exactly the needed size: still a single fetch.
        let input = MetadataFetchFn(&mut fetch);
        let actual = ParquetMetaDataReader::new()
            .with_prefetch_hint(Some(428))
            .load_and_finish(input, len)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);

        // File shorter than the footer.
        let input = MetadataFetchFn(&mut fetch);
        let err = ParquetMetaDataReader::new()
            .load_and_finish(input, 4)
            .await
            .unwrap_err()
            .to_string();
        assert_eq!(err, "EOF: file size of 4 is less than footer");

        // Truncated file size lands mid-data: footer bytes are garbage.
        let input = MetadataFetchFn(&mut fetch);
        let err = ParquetMetaDataReader::new()
            .load_and_finish(input, 20)
            .await
            .unwrap_err()
            .to_string();
        assert_eq!(err, "Parquet error: Invalid Parquet file. Corrupt footer");
    }
1183
1184 #[tokio::test]
1185 async fn test_suffix() {
1186 let mut file = get_test_file("nulls.snappy.parquet");
1187 let mut file2 = file.try_clone().unwrap();
1188
1189 let expected = ParquetMetaDataReader::new()
1190 .parse_and_finish(&file)
1191 .unwrap();
1192 let expected = expected.file_metadata().schema();
1193 let fetch_count = AtomicUsize::new(0);
1194 let suffix_fetch_count = AtomicUsize::new(0);
1195
1196 let mut fetch = |range| {
1197 fetch_count.fetch_add(1, Ordering::SeqCst);
1198 futures::future::ready(read_range(&mut file, range))
1199 };
1200 let mut suffix_fetch = |suffix| {
1201 suffix_fetch_count.fetch_add(1, Ordering::SeqCst);
1202 futures::future::ready(read_suffix(&mut file2, suffix))
1203 };
1204
1205 let input = MetadataSuffixFetchFn(&mut fetch, &mut suffix_fetch);
1206 let actual = ParquetMetaDataReader::new()
1207 .load_via_suffix_and_finish(input)
1208 .await
1209 .unwrap();
1210 assert_eq!(actual.file_metadata().schema(), expected);
1211 assert_eq!(fetch_count.load(Ordering::SeqCst), 0);
1212 assert_eq!(suffix_fetch_count.load(Ordering::SeqCst), 2);
1213
1214 fetch_count.store(0, Ordering::SeqCst);
1216 suffix_fetch_count.store(0, Ordering::SeqCst);
1217 let input = MetadataSuffixFetchFn(&mut fetch, &mut suffix_fetch);
1218 let actual = ParquetMetaDataReader::new()
1219 .with_prefetch_hint(Some(7))
1220 .load_via_suffix_and_finish(input)
1221 .await
1222 .unwrap();
1223 assert_eq!(actual.file_metadata().schema(), expected);
1224 assert_eq!(fetch_count.load(Ordering::SeqCst), 0);
1225 assert_eq!(suffix_fetch_count.load(Ordering::SeqCst), 2);
1226
1227 fetch_count.store(0, Ordering::SeqCst);
1229 suffix_fetch_count.store(0, Ordering::SeqCst);
1230 let input = MetadataSuffixFetchFn(&mut fetch, &mut suffix_fetch);
1231 let actual = ParquetMetaDataReader::new()
1232 .with_prefetch_hint(Some(10))
1233 .load_via_suffix_and_finish(input)
1234 .await
1235 .unwrap();
1236 assert_eq!(actual.file_metadata().schema(), expected);
1237 assert_eq!(fetch_count.load(Ordering::SeqCst), 0);
1238 assert_eq!(suffix_fetch_count.load(Ordering::SeqCst), 2);
1239
1240 dbg!("test");
1241 fetch_count.store(0, Ordering::SeqCst);
1243 suffix_fetch_count.store(0, Ordering::SeqCst);
1244 let input = MetadataSuffixFetchFn(&mut fetch, &mut suffix_fetch);
1245 let actual = ParquetMetaDataReader::new()
1246 .with_prefetch_hint(Some(500))
1247 .load_via_suffix_and_finish(input)
1248 .await
1249 .unwrap();
1250 assert_eq!(actual.file_metadata().schema(), expected);
1251 assert_eq!(fetch_count.load(Ordering::SeqCst), 0);
1252 assert_eq!(suffix_fetch_count.load(Ordering::SeqCst), 1);
1253
1254 fetch_count.store(0, Ordering::SeqCst);
1256 suffix_fetch_count.store(0, Ordering::SeqCst);
1257 let input = MetadataSuffixFetchFn(&mut fetch, &mut suffix_fetch);
1258 let actual = ParquetMetaDataReader::new()
1259 .with_prefetch_hint(Some(428))
1260 .load_via_suffix_and_finish(input)
1261 .await
1262 .unwrap();
1263 assert_eq!(actual.file_metadata().schema(), expected);
1264 assert_eq!(fetch_count.load(Ordering::SeqCst), 0);
1265 assert_eq!(suffix_fetch_count.load(Ordering::SeqCst), 1);
1266 }
1267
    // Suffix-based loading also works for a footer-encrypted file when
    // decryption properties are supplied.
    #[cfg(feature = "encryption")]
    #[tokio::test]
    async fn test_suffix_with_encryption() {
        let mut file = get_test_file("uniform_encryption.parquet.encrypted");
        let mut file2 = file.try_clone().unwrap();

        let mut fetch = |range| futures::future::ready(read_range(&mut file, range));
        let mut suffix_fetch = |suffix| futures::future::ready(read_suffix(&mut file2, suffix));

        let input = MetadataSuffixFetchFn(&mut fetch, &mut suffix_fetch);

        // Footer key for the test fixture.
        let key_code: &[u8] = "0123456789012345".as_bytes();
        let decryption_properties = FileDecryptionProperties::builder(key_code.to_vec())
            .build()
            .unwrap();

        let expected = ParquetMetaDataReader::new()
            .with_decryption_properties(Some(decryption_properties))
            .load_via_suffix_and_finish(input)
            .await
            .unwrap();
        assert_eq!(expected.num_row_groups(), 1);
    }
1292
1293 #[tokio::test]
1294 #[allow(deprecated)]
1295 async fn test_page_index() {
1296 let mut file = get_test_file("alltypes_tiny_pages.parquet");
1297 let len = file.len();
1298 let fetch_count = AtomicUsize::new(0);
1299 let mut fetch = |range| {
1300 fetch_count.fetch_add(1, Ordering::SeqCst);
1301 futures::future::ready(read_range(&mut file, range))
1302 };
1303
1304 let f = MetadataFetchFn(&mut fetch);
1305 let mut loader = ParquetMetaDataReader::new().with_page_indexes(true);
1306 loader.try_load(f, len).await.unwrap();
1307 assert_eq!(fetch_count.load(Ordering::SeqCst), 3);
1308 let metadata = loader.finish().unwrap();
1309 assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());
1310
1311 fetch_count.store(0, Ordering::SeqCst);
1313 let f = MetadataFetchFn(&mut fetch);
1314 let mut loader = ParquetMetaDataReader::new()
1315 .with_page_indexes(true)
1316 .with_prefetch_hint(Some(1729));
1317 loader.try_load(f, len).await.unwrap();
1318 assert_eq!(fetch_count.load(Ordering::SeqCst), 2);
1319 let metadata = loader.finish().unwrap();
1320 assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());
1321
1322 fetch_count.store(0, Ordering::SeqCst);
1324 let f = MetadataFetchFn(&mut fetch);
1325 let mut loader = ParquetMetaDataReader::new()
1326 .with_page_indexes(true)
1327 .with_prefetch_hint(Some(130649));
1328 loader.try_load(f, len).await.unwrap();
1329 assert_eq!(fetch_count.load(Ordering::SeqCst), 2);
1330 let metadata = loader.finish().unwrap();
1331 assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());
1332
1333 fetch_count.store(0, Ordering::SeqCst);
1335 let f = MetadataFetchFn(&mut fetch);
1336 let metadata = ParquetMetaDataReader::new()
1337 .with_page_indexes(true)
1338 .with_prefetch_hint(Some(130650))
1339 .load_and_finish(f, len)
1340 .await
1341 .unwrap();
1342 assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
1343 assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());
1344
1345 fetch_count.store(0, Ordering::SeqCst);
1347 let f = MetadataFetchFn(&mut fetch);
1348 let metadata = ParquetMetaDataReader::new()
1349 .with_page_indexes(true)
1350 .with_prefetch_hint(Some((len - 1000) as usize)) .load_and_finish(f, len)
1352 .await
1353 .unwrap();
1354 assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
1355 assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());
1356
1357 fetch_count.store(0, Ordering::SeqCst);
1359 let f = MetadataFetchFn(&mut fetch);
1360 let metadata = ParquetMetaDataReader::new()
1361 .with_page_indexes(true)
1362 .with_prefetch_hint(Some(len as usize)) .load_and_finish(f, len)
1364 .await
1365 .unwrap();
1366 assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
1367 assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());
1368
1369 fetch_count.store(0, Ordering::SeqCst);
1371 let f = MetadataFetchFn(&mut fetch);
1372 let metadata = ParquetMetaDataReader::new()
1373 .with_page_indexes(true)
1374 .with_prefetch_hint(Some((len + 1000) as usize)) .load_and_finish(f, len)
1376 .await
1377 .unwrap();
1378 assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
1379 assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());
1380 }
1381
1382 fn write_parquet_file(offset_index_disabled: bool) -> Result<NamedTempFile> {
1383 let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
1384 let batch = RecordBatch::try_new(
1385 schema.clone(),
1386 vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
1387 )?;
1388
1389 let file = NamedTempFile::new().unwrap();
1390
1391 let props = WriterProperties::builder()
1393 .set_offset_index_disabled(offset_index_disabled)
1394 .build();
1395
1396 let mut writer = ArrowWriter::try_new(file.reopen()?, schema, Some(props))?;
1397 writer.write(&batch)?;
1398 writer.close()?;
1399
1400 Ok(file)
1401 }
1402
1403 fn read_and_check(file: &File, policy: PageIndexPolicy) -> Result<ParquetMetaData> {
1404 let mut reader = ParquetMetaDataReader::new().with_page_index_policy(policy);
1405 reader.try_parse(file)?;
1406 reader.finish()
1407 }
1408
1409 #[test]
1410 fn test_page_index_policy() {
1411 let f = write_parquet_file(false).unwrap();
1413 read_and_check(f.as_file(), PageIndexPolicy::Required).unwrap();
1414 read_and_check(f.as_file(), PageIndexPolicy::Optional).unwrap();
1415 read_and_check(f.as_file(), PageIndexPolicy::Skip).unwrap();
1416
1417 let f = write_parquet_file(true).unwrap();
1419 let res = read_and_check(f.as_file(), PageIndexPolicy::Required);
1420 assert!(matches!(
1421 res,
1422 Err(ParquetError::General(e)) if e == "missing offset index"
1423 ));
1424 read_and_check(f.as_file(), PageIndexPolicy::Optional).unwrap();
1425 read_and_check(f.as_file(), PageIndexPolicy::Skip).unwrap();
1426 }
1427}