1#[cfg(feature = "encryption")]
19use crate::encryption::decrypt::FileDecryptionProperties;
20use crate::errors::{ParquetError, Result};
21use crate::file::FOOTER_SIZE;
22use crate::file::metadata::parser::decode_metadata;
23use crate::file::metadata::{FooterTail, ParquetMetaData, ParquetMetaDataPushDecoder};
24use crate::file::reader::ChunkReader;
25use bytes::Bytes;
26use std::{io::Read, ops::Range};
27
28use crate::DecodeResult;
29#[cfg(all(feature = "async", feature = "arrow"))]
30use crate::arrow::async_reader::{MetadataFetch, MetadataSuffixFetch};
31
#[derive(Default, Debug)]
pub struct ParquetMetaDataReader {
    // Parsed footer metadata; filled in by the `try_parse*` / `try_load*`
    // methods and handed out (moved) via `finish`.
    metadata: Option<ParquetMetaData>,
    // Policy controlling whether the column index is read.
    column_index: PageIndexPolicy,
    // Policy controlling whether the offset index is read.
    offset_index: PageIndexPolicy,
    // Optional hint of how many trailing bytes to fetch when loading the footer.
    prefetch_hint: Option<usize>,
    // Footer size (metadata length + FOOTER_SIZE) once known; set by
    // `parse_metadata` and exposed via `metadata_size`.
    metadata_size: Option<usize>,
    #[cfg(feature = "encryption")]
    // Decryption keys used when the file's footer/columns are encrypted.
    file_decryption_properties: Option<std::sync::Arc<FileDecryptionProperties>>,
}
77
/// Controls whether a page index (column index or offset index) is read.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum PageIndexPolicy {
    /// Do not read the index (the default).
    #[default]
    Skip,
    /// Attempt to read the index if available.
    Optional,
    /// Require the index to be read.
    Required,
}
89
90impl From<bool> for PageIndexPolicy {
91 fn from(value: bool) -> Self {
92 match value {
93 true => Self::Required,
94 false => Self::Skip,
95 }
96 }
97}
98
99impl ParquetMetaDataReader {
100 pub fn new() -> Self {
102 Default::default()
103 }
104
105 pub fn new_with_metadata(metadata: ParquetMetaData) -> Self {
108 Self {
109 metadata: Some(metadata),
110 ..Default::default()
111 }
112 }
113
114 #[deprecated(since = "56.1.0", note = "Use `with_page_index_policy` instead")]
119 pub fn with_page_indexes(self, val: bool) -> Self {
120 let policy = PageIndexPolicy::from(val);
121 self.with_column_index_policy(policy)
122 .with_offset_index_policy(policy)
123 }
124
125 #[deprecated(since = "56.1.0", note = "Use `with_column_index_policy` instead")]
129 pub fn with_column_indexes(self, val: bool) -> Self {
130 let policy = PageIndexPolicy::from(val);
131 self.with_column_index_policy(policy)
132 }
133
134 #[deprecated(since = "56.1.0", note = "Use `with_offset_index_policy` instead")]
138 pub fn with_offset_indexes(self, val: bool) -> Self {
139 let policy = PageIndexPolicy::from(val);
140 self.with_offset_index_policy(policy)
141 }
142
143 pub fn with_page_index_policy(self, policy: PageIndexPolicy) -> Self {
145 self.with_column_index_policy(policy)
146 .with_offset_index_policy(policy)
147 }
148
149 pub fn with_column_index_policy(mut self, policy: PageIndexPolicy) -> Self {
151 self.column_index = policy;
152 self
153 }
154
155 pub fn with_offset_index_policy(mut self, policy: PageIndexPolicy) -> Self {
157 self.offset_index = policy;
158 self
159 }
160
161 pub fn with_prefetch_hint(mut self, prefetch: Option<usize>) -> Self {
173 self.prefetch_hint = prefetch;
174 self
175 }
176
177 #[cfg(feature = "encryption")]
181 pub fn with_decryption_properties(
182 mut self,
183 properties: Option<std::sync::Arc<FileDecryptionProperties>>,
184 ) -> Self {
185 self.file_decryption_properties = properties;
186 self
187 }
188
    /// Returns `true` if metadata has been parsed and is available via [`Self::finish`].
    pub fn has_metadata(&self) -> bool {
        self.metadata.is_some()
    }
193
194 pub fn finish(&mut self) -> Result<ParquetMetaData> {
196 self.metadata
197 .take()
198 .ok_or_else(|| general_err!("could not parse parquet metadata"))
199 }
200
201 pub fn parse_and_finish<R: ChunkReader>(mut self, reader: &R) -> Result<ParquetMetaData> {
220 self.try_parse(reader)?;
221 self.finish()
222 }
223
224 pub fn try_parse<R: ChunkReader>(&mut self, reader: &R) -> Result<()> {
230 self.try_parse_sized(reader, reader.len())
231 }
232
    /// Parses metadata from `reader`, which may hold only a suffix of a file
    /// whose total length is `file_size`.
    ///
    /// # Errors
    /// - EOF error if the whole file is present (or `needed > file_size`) yet
    ///   still too small to hold the footer metadata.
    /// - [`ParquetError::NeedMoreData`] if the caller can retry with a larger
    ///   suffix of the file.
    pub fn try_parse_sized<R: ChunkReader>(&mut self, reader: &R, file_size: u64) -> Result<()> {
        self.metadata = match self.parse_metadata(reader) {
            Ok(metadata) => Some(metadata),
            Err(ParquetError::NeedMoreData(needed)) => {
                // If the reader already covers the whole file, asking for more
                // data cannot help: the file itself is too small.
                if file_size == reader.len() || needed as u64 > file_size {
                    return Err(eof_err!(
                        "Parquet file too small. Size is {} but need {}",
                        file_size,
                        needed
                    ));
                } else {
                    // Propagate so the caller can supply a larger suffix.
                    return Err(ParquetError::NeedMoreData(needed));
                }
            }
            Err(e) => return Err(e),
        };

        // Nothing further to do unless at least one page index was requested.
        if self.column_index == PageIndexPolicy::Skip && self.offset_index == PageIndexPolicy::Skip
        {
            return Ok(());
        }

        self.read_page_indexes_sized(reader, file_size)
    }
333
334 pub fn read_page_indexes<R: ChunkReader>(&mut self, reader: &R) -> Result<()> {
337 self.read_page_indexes_sized(reader, reader.len())
338 }
339
    /// Reads the requested page indexes for previously parsed metadata.
    ///
    /// `reader` may hold only a suffix of a file of total length `file_size`.
    ///
    /// # Errors
    /// - If no metadata has been parsed yet.
    /// - EOF error if the index range lies beyond `file_size` or overlaps the
    ///   footer metadata.
    /// - [`ParquetError::NeedMoreData`] if the reader's suffix does not reach
    ///   back far enough to cover the index range.
    pub fn read_page_indexes_sized<R: ChunkReader>(
        &mut self,
        reader: &R,
        file_size: u64,
    ) -> Result<()> {
        let Some(metadata) = self.metadata.take() else {
            return Err(general_err!(
                "Tried to read page indexes without ParquetMetaData metadata"
            ));
        };

        // Delegate decoding to the push decoder, seeded with the metadata we
        // already have so only the index byte ranges need to be supplied.
        let push_decoder = ParquetMetaDataPushDecoder::try_new_with_metadata(file_size, metadata)?
            .with_offset_index_policy(self.offset_index)
            .with_column_index_policy(self.column_index);
        let mut push_decoder = self.prepare_push_decoder(push_decoder);

        // Ask the decoder which (coalesced) byte range it still needs; it may
        // already be done if no index data is required.
        let range = match needs_index_data(&mut push_decoder)? {
            NeedsIndexData::No(metadata) => {
                self.metadata = Some(metadata);
                return Ok(());
            }
            NeedsIndexData::Yes(range) => range,
        };

        // Absolute file range covered by `reader` (it holds the file's tail).
        let file_range = file_size.saturating_sub(reader.len())..file_size;
        if !(file_range.contains(&range.start) && file_range.contains(&range.end)) {
            if range.end > file_size {
                return Err(eof_err!(
                    "Parquet file too small. Range {range:?} is beyond file bounds {file_size}",
                ));
            } else {
                // The caller can retry with a suffix long enough to include
                // everything from `range.start` to the end of the file.
                return Err(ParquetError::NeedMoreData(
                    (file_size - range.start).try_into()?,
                ));
            }
        }

        // Sanity check: the index range must not run into the footer metadata.
        if let Some(metadata_size) = self.metadata_size {
            let metadata_range = file_size.saturating_sub(metadata_size as u64)..file_size;
            if range.end > metadata_range.start {
                return Err(eof_err!(
                    "Parquet file too small. Page index range {range:?} overlaps with file metadata {metadata_range:?}",
                ));
            }
        }

        let bytes_needed = usize::try_from(range.end - range.start)?;
        // Translate the absolute file offset into an offset within `reader`.
        let bytes = reader.get_bytes(range.start - file_range.start, bytes_needed)?;

        push_decoder.push_range(range, bytes)?;
        let metadata = parse_index_data(&mut push_decoder)?;
        self.metadata = Some(metadata);

        Ok(())
    }
408
409 #[cfg(all(feature = "async", feature = "arrow"))]
416 pub async fn load_and_finish<F: MetadataFetch>(
417 mut self,
418 fetch: F,
419 file_size: u64,
420 ) -> Result<ParquetMetaData> {
421 self.try_load(fetch, file_size).await?;
422 self.finish()
423 }
424
425 #[cfg(all(feature = "async", feature = "arrow"))]
432 pub async fn load_via_suffix_and_finish<F: MetadataSuffixFetch>(
433 mut self,
434 fetch: F,
435 ) -> Result<ParquetMetaData> {
436 self.try_load_via_suffix(fetch).await?;
437 self.finish()
438 }
439 #[cfg(all(feature = "async", feature = "arrow"))]
445 pub async fn try_load<F: MetadataFetch>(&mut self, mut fetch: F, file_size: u64) -> Result<()> {
446 let (metadata, remainder) = self.load_metadata(&mut fetch, file_size).await?;
447
448 self.metadata = Some(metadata);
449
450 if self.column_index == PageIndexPolicy::Skip && self.offset_index == PageIndexPolicy::Skip
452 {
453 return Ok(());
454 }
455
456 self.load_page_index_with_remainder(fetch, remainder).await
457 }
458
459 #[cfg(all(feature = "async", feature = "arrow"))]
465 pub async fn try_load_via_suffix<F: MetadataSuffixFetch>(
466 &mut self,
467 mut fetch: F,
468 ) -> Result<()> {
469 let (metadata, remainder) = self.load_metadata_via_suffix(&mut fetch).await?;
470
471 self.metadata = Some(metadata);
472
473 if self.column_index == PageIndexPolicy::Skip && self.offset_index == PageIndexPolicy::Skip
475 {
476 return Ok(());
477 }
478
479 self.load_page_index_with_remainder(fetch, remainder).await
480 }
481
    /// Asynchronously reads the page indexes for already-parsed metadata,
    /// fetching required byte ranges via `fetch`.
    #[cfg(all(feature = "async", feature = "arrow"))]
    pub async fn load_page_index<F: MetadataFetch>(&mut self, fetch: F) -> Result<()> {
        self.load_page_index_with_remainder(fetch, None).await
    }
488
    /// Reads the page indexes, serving the required byte range from
    /// `remainder` (leftover bytes from the footer fetch, tagged with their
    /// absolute start offset) when it covers the range, otherwise fetching.
    #[cfg(all(feature = "async", feature = "arrow"))]
    async fn load_page_index_with_remainder<F: MetadataFetch>(
        &mut self,
        mut fetch: F,
        remainder: Option<(usize, Bytes)>,
    ) -> Result<()> {
        let Some(metadata) = self.metadata.take() else {
            return Err(general_err!("Footer metadata is not present"));
        };

        // The true file size is unknown here; u64::MAX is used as a stand-in —
        // presumably the decoder only uses it as an upper bound for the ranges
        // it requests (TODO confirm against the push decoder).
        let file_size = u64::MAX;
        let push_decoder = ParquetMetaDataPushDecoder::try_new_with_metadata(file_size, metadata)?
            .with_offset_index_policy(self.offset_index)
            .with_column_index_policy(self.column_index);
        let mut push_decoder = self.prepare_push_decoder(push_decoder);

        // Determine the coalesced byte range still needed; nothing to do if
        // decoding already completed.
        let range = match needs_index_data(&mut push_decoder)? {
            NeedsIndexData::No(metadata) => {
                self.metadata = Some(metadata);
                return Ok(());
            }
            NeedsIndexData::Yes(range) => range,
        };

        let bytes = match &remainder {
            // Reuse leftover bytes if they start at or before the needed range.
            Some((remainder_start, remainder)) if *remainder_start as u64 <= range.start => {
                let remainder_start = *remainder_start as u64;
                let offset = usize::try_from(range.start - remainder_start)?;
                let end = usize::try_from(range.end - remainder_start)?;
                assert!(end <= remainder.len());
                remainder.slice(offset..end)
            }
            _ => fetch.fetch(range.start..range.end).await?,
        };

        // Sanity check: the fetch must return exactly the requested bytes.
        assert_eq!(bytes.len() as u64, range.end - range.start);
        push_decoder.push_range(range.clone(), bytes)?;
        let metadata = parse_index_data(&mut push_decoder)?;
        self.metadata = Some(metadata);
        Ok(())
    }
535
    /// Reads the footer tail from the end of `chunk_reader` and decodes the
    /// footer metadata; records the footer size in `self.metadata_size`.
    ///
    /// # Errors
    /// Returns [`ParquetError::NeedMoreData`] when the reader does not contain
    /// enough trailing bytes (first for the 8-byte tail, then for the full
    /// footer metadata).
    fn parse_metadata<R: ChunkReader>(&mut self, chunk_reader: &R) -> Result<ParquetMetaData> {
        let file_size = chunk_reader.len();
        if file_size < (FOOTER_SIZE as u64) {
            return Err(ParquetError::NeedMoreData(FOOTER_SIZE));
        }

        // Read the fixed-size footer tail from the very end of the data.
        let mut footer = [0_u8; FOOTER_SIZE];
        chunk_reader
            .get_read(file_size - FOOTER_SIZE as u64)?
            .read_exact(&mut footer)?;

        let footer = FooterTail::try_new(&footer)?;
        let metadata_len = footer.metadata_length();
        let footer_metadata_len = FOOTER_SIZE + metadata_len;
        // Remember the footer size so index reads can detect overlap later.
        self.metadata_size = Some(footer_metadata_len);

        if footer_metadata_len as u64 > file_size {
            return Err(ParquetError::NeedMoreData(footer_metadata_len));
        }

        let start = file_size - footer_metadata_len as u64;
        let bytes = chunk_reader.get_bytes(start, metadata_len)?;
        self.decode_footer_metadata(bytes, file_size, footer)
    }
563
    /// Returns the footer size (metadata length + footer tail) recorded by the
    /// most recent parse, if any.
    pub fn metadata_size(&self) -> Option<usize> {
        self.metadata_size
    }
569
570 #[cfg(all(feature = "async", feature = "arrow"))]
574 fn get_prefetch_size(&self) -> usize {
575 if let Some(prefetch) = self.prefetch_hint {
576 if prefetch > FOOTER_SIZE {
577 return prefetch;
578 }
579 }
580 FOOTER_SIZE
581 }
582
    /// Fetches and decodes the footer metadata via range requests.
    ///
    /// Returns the decoded metadata plus any leftover prefetched bytes
    /// (tagged with their absolute start offset) that precede the metadata —
    /// these may later serve page-index reads without another fetch.
    #[cfg(all(feature = "async", feature = "arrow"))]
    async fn load_metadata<F: MetadataFetch>(
        &self,
        fetch: &mut F,
        file_size: u64,
    ) -> Result<(ParquetMetaData, Option<(usize, Bytes)>)> {
        let prefetch = self.get_prefetch_size() as u64;

        if file_size < FOOTER_SIZE as u64 {
            return Err(eof_err!("file size of {} is less than footer", file_size));
        }

        // Fetch the last `prefetch` bytes (clamped to the file start).
        let footer_start = file_size.saturating_sub(prefetch);

        let suffix = fetch.fetch(footer_start..file_size).await?;
        let suffix_len = suffix.len();
        let fetch_len = (file_size - footer_start)
            .try_into()
            .expect("footer size should never be larger than u32");
        if suffix_len < fetch_len {
            return Err(eof_err!(
                "metadata requires {} bytes, but could only read {}",
                fetch_len,
                suffix_len
            ));
        }

        // The footer tail occupies the last FOOTER_SIZE bytes of the suffix.
        let mut footer = [0; FOOTER_SIZE];
        footer.copy_from_slice(&suffix[suffix_len - FOOTER_SIZE..suffix_len]);

        let footer = FooterTail::try_new(&footer)?;
        let length = footer.metadata_length();

        if file_size < (length + FOOTER_SIZE) as u64 {
            return Err(eof_err!(
                "file size of {} is less than footer + metadata {}",
                file_size,
                length + FOOTER_SIZE
            ));
        }

        if length > suffix_len - FOOTER_SIZE {
            // The prefetch was too small to cover the metadata: fetch exactly
            // the metadata range. No leftover bytes to reuse in this case.
            let metadata_start = file_size - (length + FOOTER_SIZE) as u64;
            let meta = fetch
                .fetch(metadata_start..(file_size - FOOTER_SIZE as u64))
                .await?;
            Ok((self.decode_footer_metadata(meta, file_size, footer)?, None))
        } else {
            // The metadata is fully inside the prefetched suffix: slice it out
            // and hand back the bytes preceding it for potential reuse.
            let metadata_start = (file_size - (length + FOOTER_SIZE) as u64 - footer_start)
                .try_into()
                .expect("metadata length should never be larger than u32");
            let slice = suffix.slice(metadata_start..suffix_len - FOOTER_SIZE);
            Ok((
                self.decode_footer_metadata(slice, file_size, footer)?,
                Some((footer_start as usize, suffix.slice(..metadata_start))),
            ))
        }
    }
645
    /// Fetches and decodes the footer metadata using suffix ("last N bytes")
    /// requests, for stores where the total file size is unknown.
    ///
    /// Returns the decoded metadata plus any leftover prefetched bytes
    /// preceding the metadata (tagged with offset 0 relative to the suffix).
    #[cfg(all(feature = "async", feature = "arrow"))]
    async fn load_metadata_via_suffix<F: MetadataSuffixFetch>(
        &self,
        fetch: &mut F,
    ) -> Result<(ParquetMetaData, Option<(usize, Bytes)>)> {
        let prefetch = self.get_prefetch_size();

        let suffix = fetch.fetch_suffix(prefetch as _).await?;
        let suffix_len = suffix.len();

        if suffix_len < FOOTER_SIZE {
            return Err(eof_err!(
                "footer metadata requires {} bytes, but could only read {}",
                FOOTER_SIZE,
                suffix_len
            ));
        }

        // The footer tail occupies the last FOOTER_SIZE bytes of the suffix.
        let mut footer = [0; FOOTER_SIZE];
        footer.copy_from_slice(&suffix[suffix_len - FOOTER_SIZE..suffix_len]);

        let footer = FooterTail::try_new(&footer)?;
        let length = footer.metadata_length();
        // The real file size is unknown with suffix fetches; a synthetic size
        // spanning exactly metadata + footer is used for offset bookkeeping.
        let file_size = (length + FOOTER_SIZE) as u64;

        // Number of trailing bytes that hold metadata + footer tail.
        let metadata_offset = length + FOOTER_SIZE;
        if length > suffix_len - FOOTER_SIZE {
            // Prefetch too small: re-fetch a suffix large enough to cover it.
            let meta = fetch.fetch_suffix(metadata_offset).await?;

            if meta.len() < metadata_offset {
                return Err(eof_err!(
                    "metadata requires {} bytes, but could only read {}",
                    metadata_offset,
                    meta.len()
                ));
            }

            let meta = meta.slice(0..length);
            Ok((self.decode_footer_metadata(meta, file_size, footer)?, None))
        } else {
            // Metadata fully inside the prefetched suffix: slice it out and
            // return the preceding bytes for potential reuse.
            let metadata_start = suffix_len - metadata_offset;
            let slice = suffix.slice(metadata_start..suffix_len - FOOTER_SIZE);
            Ok((
                self.decode_footer_metadata(slice, file_size, footer)?,
                Some((0, suffix.slice(..metadata_start))),
            ))
        }
    }
698
    /// Decodes the fixed-size footer tail from the last [`FOOTER_SIZE`] bytes
    /// of a Parquet file.
    #[deprecated(since = "57.0.0", note = "Use FooterTail::try_from instead")]
    pub fn decode_footer_tail(slice: &[u8; FOOTER_SIZE]) -> Result<FooterTail> {
        FooterTail::try_new(slice)
    }
704
705 #[deprecated(since = "54.3.0", note = "Use decode_footer_tail instead")]
707 pub fn decode_footer(slice: &[u8; FOOTER_SIZE]) -> Result<usize> {
708 FooterTail::try_new(slice).map(|f| f.metadata_length())
709 }
710
    /// Decodes [`ParquetMetaData`] from `buf`, the raw footer metadata bytes
    /// that immediately precede the footer tail in a file of `file_size` bytes.
    ///
    /// Page indexes are never decoded here ([`PageIndexPolicy::Skip`]); they
    /// are handled separately by the page-index paths.
    pub(crate) fn decode_footer_metadata(
        &self,
        buf: Bytes,
        file_size: u64,
        footer_tail: FooterTail,
    ) -> Result<ParquetMetaData> {
        // The metadata ends where the footer tail begins.
        let ending_offset = file_size.checked_sub(FOOTER_SIZE as u64).ok_or_else(|| {
            general_err!(
                "file size {file_size} is smaller than footer size {}",
                FOOTER_SIZE
            )
        })?;

        // ...and starts `buf.len()` bytes before that.
        let starting_offset = ending_offset.checked_sub(buf.len() as u64).ok_or_else(|| {
            general_err!(
                "file size {file_size} is smaller than buffer size {} + footer size {}",
                buf.len(),
                FOOTER_SIZE
            )
        })?;

        let range = starting_offset..ending_offset;

        let push_decoder =
            ParquetMetaDataPushDecoder::try_new_with_footer_tail(file_size, footer_tail)?
                .with_page_index_policy(PageIndexPolicy::Skip);

        let mut push_decoder = self.prepare_push_decoder(push_decoder);
        push_decoder.push_range(range, buf)?;
        // With the full metadata range pushed, decoding must complete; any
        // other outcome indicates an internal inconsistency.
        match push_decoder.try_decode()? {
            DecodeResult::Data(metadata) => Ok(metadata),
            DecodeResult::Finished => Err(general_err!(
                "could not parse parquet metadata -- previously finished"
            )),
            DecodeResult::NeedsData(ranges) => Err(general_err!(
                "could not parse parquet metadata, needs ranges {:?}",
                ranges
            )),
        }
    }
769
770 #[cfg(feature = "encryption")]
772 fn prepare_push_decoder(
773 &self,
774 push_decoder: ParquetMetaDataPushDecoder,
775 ) -> ParquetMetaDataPushDecoder {
776 push_decoder.with_file_decryption_properties(
777 self.file_decryption_properties
778 .as_ref()
779 .map(std::sync::Arc::clone),
780 )
781 }
    #[cfg(not(feature = "encryption"))]
    // Without the "encryption" feature there is nothing to configure; the
    // decoder is returned unchanged.
    fn prepare_push_decoder(
        &self,
        push_decoder: ParquetMetaDataPushDecoder,
    ) -> ParquetMetaDataPushDecoder {
        push_decoder
    }
789
    /// Decodes [`ParquetMetaData`] from raw footer metadata bytes
    /// (delegates to the metadata parser's `decode_metadata`).
    pub fn decode_metadata(buf: &[u8]) -> Result<ParquetMetaData> {
        decode_metadata(buf)
    }
800}
801
/// Outcome of asking the push decoder whether more data is needed to decode
/// the page indexes.
#[allow(clippy::large_enum_variant)]
enum NeedsIndexData {
    /// Decoding completed without further data; carries the final metadata.
    No(ParquetMetaData),
    /// The coalesced byte range that must be fetched and pushed.
    Yes(Range<u64>),
}
811
812fn needs_index_data(push_decoder: &mut ParquetMetaDataPushDecoder) -> Result<NeedsIndexData> {
815 match push_decoder.try_decode()? {
816 DecodeResult::NeedsData(ranges) => {
817 let range = ranges
818 .into_iter()
819 .reduce(|a, b| a.start.min(b.start)..a.end.max(b.end))
820 .ok_or_else(|| general_err!("Internal error: no ranges provided"))?;
821 Ok(NeedsIndexData::Yes(range))
822 }
823 DecodeResult::Data(metadata) => Ok(NeedsIndexData::No(metadata)),
824 DecodeResult::Finished => Err(general_err!("Internal error: decoder was finished")),
825 }
826}
827
828fn parse_index_data(push_decoder: &mut ParquetMetaDataPushDecoder) -> Result<ParquetMetaData> {
831 match push_decoder.try_decode()? {
832 DecodeResult::NeedsData(_) => Err(general_err!(
833 "Internal error: decoder still needs data after reading required range"
834 )),
835 DecodeResult::Data(metadata) => Ok(metadata),
836 DecodeResult::Finished => Err(general_err!("Internal error: decoder was finished")),
837 }
838}
839
840#[cfg(test)]
841mod tests {
842 use super::*;
843 use crate::file::reader::Length;
844 use crate::util::test_common::file_util::get_test_file;
845 use std::ops::Range;
846
    #[test]
    fn test_parse_metadata_size_smaller_than_footer() {
        // An empty file cannot even hold the 8-byte footer tail; the reader
        // should ask for more data rather than fail outright.
        let test_file = tempfile::tempfile().unwrap();
        let err = ParquetMetaDataReader::new()
            .parse_metadata(&test_file)
            .unwrap_err();
        assert!(matches!(err, ParquetError::NeedMoreData(FOOTER_SIZE)));
    }
855
    #[test]
    fn test_parse_metadata_corrupt_footer() {
        // Eight bytes that are not a valid footer tail (bad magic) must be
        // rejected as a corrupt footer.
        let data = Bytes::from(vec![1, 2, 3, 4, 5, 6, 7, 8]);
        let reader_result = ParquetMetaDataReader::new().parse_metadata(&data);
        assert_eq!(
            reader_result.unwrap_err().to_string(),
            "Parquet error: Invalid Parquet file. Corrupt footer"
        );
    }
865
    #[test]
    fn test_parse_metadata_invalid_start() {
        // Valid magic but a declared metadata length (255) larger than the
        // available data: the reader should request 255 + 8 = 263 bytes.
        let test_file = Bytes::from(vec![255, 0, 0, 0, b'P', b'A', b'R', b'1']);
        let err = ParquetMetaDataReader::new()
            .parse_metadata(&test_file)
            .unwrap_err();
        assert!(matches!(err, ParquetError::NeedMoreData(263)));
    }
874
    #[test]
    #[allow(deprecated)]
    fn test_try_parse() {
        let file = get_test_file("alltypes_tiny_pages.parquet");
        let len = file.len();

        let mut reader = ParquetMetaDataReader::new().with_page_indexes(true);

        // Helper yielding the file's bytes for an absolute byte range.
        let bytes_for_range = |range: Range<u64>| {
            file.get_bytes(range.start, (range.end - range.start).try_into().unwrap())
                .unwrap()
        };

        // Whole file: metadata and both indexes parse in one shot.
        let bytes = bytes_for_range(0..len);
        reader.try_parse(&bytes).unwrap();
        let metadata = reader.finish().unwrap();
        assert!(metadata.column_index.is_some());
        assert!(metadata.offset_index.is_some());

        // Suffix that still covers footer + indexes.
        let bytes = bytes_for_range(320000..len);
        reader.try_parse_sized(&bytes, len).unwrap();
        let metadata = reader.finish().unwrap();
        assert!(metadata.column_index.is_some());
        assert!(metadata.offset_index.is_some());

        // Smallest suffix that still covers everything needed.
        let bytes = bytes_for_range(323583..len);
        reader.try_parse_sized(&bytes, len).unwrap();
        let metadata = reader.finish().unwrap();
        assert!(metadata.column_index.is_some());
        assert!(metadata.offset_index.is_some());

        // One byte short: expect NeedMoreData, then retry with a larger suffix.
        let bytes = bytes_for_range(323584..len);
        match reader.try_parse_sized(&bytes, len).unwrap_err() {
            ParquetError::NeedMoreData(needed) => {
                let bytes = bytes_for_range(len - needed as u64..len);
                reader.try_parse_sized(&bytes, len).unwrap();
                let metadata = reader.finish().unwrap();
                assert!(metadata.column_index.is_some());
                assert!(metadata.offset_index.is_some());
            }
            _ => panic!("unexpected error"),
        };

        // Retry loop: grow the suffix until metadata parses, then read indexes.
        let mut reader = ParquetMetaDataReader::new().with_page_indexes(true);
        let mut bytes = bytes_for_range(452505..len);
        loop {
            match reader.try_parse_sized(&bytes, len) {
                Ok(_) => break,
                Err(ParquetError::NeedMoreData(needed)) => {
                    bytes = bytes_for_range(len - needed as u64..len);
                    if reader.has_metadata() {
                        reader.read_page_indexes_sized(&bytes, len).unwrap();
                        break;
                    }
                }
                _ => panic!("unexpected error"),
            }
        }
        let metadata = reader.finish().unwrap();
        assert!(metadata.column_index.is_some());
        assert!(metadata.offset_index.is_some());

        // Claimed file size smaller than the index range: hard EOF error.
        let bytes = bytes_for_range(323584..len);
        let reader_result = reader.try_parse_sized(&bytes, len - 323584).unwrap_err();
        assert_eq!(
            reader_result.to_string(),
            "EOF: Parquet file too small. Range 323583..452504 is beyond file bounds 130649"
        );

        // Without page indexes, only the footer suffix needs to grow.
        let mut reader = ParquetMetaDataReader::new();
        let bytes = bytes_for_range(452505..len);
        match reader.try_parse_sized(&bytes, len).unwrap_err() {
            ParquetError::NeedMoreData(needed) => {
                let bytes = bytes_for_range(len - needed as u64..len);
                reader.try_parse_sized(&bytes, len).unwrap();
                reader.finish().unwrap();
            }
            _ => panic!("unexpected error"),
        };

        // try_parse treats the buffer as the whole file, so it cannot retry.
        let reader_result = reader.try_parse(&bytes).unwrap_err();
        assert_eq!(
            reader_result.to_string(),
            "EOF: Parquet file too small. Size is 1728 but need 1729"
        );

        // A prefix of the file has no valid footer at its end.
        let bytes = bytes_for_range(0..1000);
        let reader_result = reader.try_parse_sized(&bytes, len).unwrap_err();
        assert_eq!(
            reader_result.to_string(),
            "Parquet error: Invalid Parquet file. Corrupt footer"
        );

        // Suffix plus a too-small claimed file size: hard EOF error.
        let bytes = bytes_for_range(452510..len);
        let reader_result = reader.try_parse_sized(&bytes, len - 452505).unwrap_err();
        assert_eq!(
            reader_result.to_string(),
            "EOF: Parquet file too small. Size is 1728 but need 1729"
        );
    }
989}
990
991#[cfg(all(feature = "async", feature = "arrow", test))]
992mod async_tests {
993 use super::*;
994
995 use arrow::{array::Int32Array, datatypes::DataType};
996 use arrow_array::RecordBatch;
997 use arrow_schema::{Field, Schema};
998 use bytes::Bytes;
999 use futures::FutureExt;
1000 use futures::future::BoxFuture;
1001 use std::fs::File;
1002 use std::future::Future;
1003 use std::io::{Read, Seek, SeekFrom};
1004 use std::ops::Range;
1005 use std::sync::Arc;
1006 use std::sync::atomic::{AtomicUsize, Ordering};
1007 use tempfile::NamedTempFile;
1008
1009 use crate::arrow::ArrowWriter;
1010 use crate::file::properties::WriterProperties;
1011 use crate::file::reader::Length;
1012 use crate::util::test_common::file_util::get_test_file;
1013
    // Test adapter that turns a range-fetching closure into a `MetadataFetch`.
    struct MetadataFetchFn<F>(F);

    impl<F, Fut> MetadataFetch for MetadataFetchFn<F>
    where
        F: FnMut(Range<u64>) -> Fut + Send,
        Fut: Future<Output = Result<Bytes>> + Send,
    {
        fn fetch(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
            async move { self.0(range).await }.boxed()
        }
    }
1025
    // Test adapter pairing a range-fetch closure (`.0`) with a suffix-fetch
    // closure (`.1`); implements both `MetadataFetch` and `MetadataSuffixFetch`
    // so the tests can count which path is exercised.
    struct MetadataSuffixFetchFn<F1, F2>(F1, F2);

    impl<F1, Fut, F2> MetadataFetch for MetadataSuffixFetchFn<F1, F2>
    where
        F1: FnMut(Range<u64>) -> Fut + Send,
        Fut: Future<Output = Result<Bytes>> + Send,
        F2: Send,
    {
        fn fetch(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
            async move { self.0(range).await }.boxed()
        }
    }

    impl<F1, Fut, F2> MetadataSuffixFetch for MetadataSuffixFetchFn<F1, F2>
    where
        F1: FnMut(Range<u64>) -> Fut + Send,
        F2: FnMut(usize) -> Fut + Send,
        Fut: Future<Output = Result<Bytes>> + Send,
    {
        fn fetch_suffix(&mut self, suffix: usize) -> BoxFuture<'_, Result<Bytes>> {
            async move { self.1(suffix).await }.boxed()
        }
    }
1049
1050 fn read_range(file: &mut File, range: Range<u64>) -> Result<Bytes> {
1051 file.seek(SeekFrom::Start(range.start as _))?;
1052 let len = range.end - range.start;
1053 let mut buf = Vec::with_capacity(len.try_into().unwrap());
1054 file.take(len as _).read_to_end(&mut buf)?;
1055 Ok(buf.into())
1056 }
1057
1058 fn read_suffix(file: &mut File, suffix: usize) -> Result<Bytes> {
1059 let file_len = file.len();
1060 file.seek(SeekFrom::End(0 - suffix.min(file_len as _) as i64))?;
1062 let mut buf = Vec::with_capacity(suffix);
1063 file.take(suffix as _).read_to_end(&mut buf)?;
1064 Ok(buf.into())
1065 }
1066
    #[tokio::test]
    async fn test_simple() {
        let mut file = get_test_file("nulls.snappy.parquet");
        let len = file.len();

        // Baseline: parse synchronously to get the expected schema.
        let expected = ParquetMetaDataReader::new()
            .parse_and_finish(&file)
            .unwrap();
        let expected = expected.file_metadata().schema();
        let fetch_count = AtomicUsize::new(0);

        let mut fetch = |range| {
            fetch_count.fetch_add(1, Ordering::SeqCst);
            futures::future::ready(read_range(&mut file, range))
        };

        // Default prefetch: footer tail then metadata = 2 fetches.
        let input = MetadataFetchFn(&mut fetch);
        let actual = ParquetMetaDataReader::new()
            .load_and_finish(input, len)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 2);

        // Prefetch hint smaller than the footer tail: still 2 fetches.
        fetch_count.store(0, Ordering::SeqCst);
        let input = MetadataFetchFn(&mut fetch);
        let actual = ParquetMetaDataReader::new()
            .with_prefetch_hint(Some(7))
            .load_and_finish(input, len)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 2);

        // Prefetch hint too small to cover the metadata: still 2 fetches.
        fetch_count.store(0, Ordering::SeqCst);
        let input = MetadataFetchFn(&mut fetch);
        let actual = ParquetMetaDataReader::new()
            .with_prefetch_hint(Some(10))
            .load_and_finish(input, len)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 2);

        // Generous prefetch hint: metadata covered in a single fetch.
        fetch_count.store(0, Ordering::SeqCst);
        let input = MetadataFetchFn(&mut fetch);
        let actual = ParquetMetaDataReader::new()
            .with_prefetch_hint(Some(500))
            .load_and_finish(input, len)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);

        // Prefetch hint exactly large enough: still a single fetch.
        fetch_count.store(0, Ordering::SeqCst);
        let input = MetadataFetchFn(&mut fetch);
        let actual = ParquetMetaDataReader::new()
            .with_prefetch_hint(Some(428))
            .load_and_finish(input, len)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);

        // File size smaller than the footer tail: immediate EOF error.
        let input = MetadataFetchFn(&mut fetch);
        let err = ParquetMetaDataReader::new()
            .load_and_finish(input, 4)
            .await
            .unwrap_err()
            .to_string();
        assert_eq!(err, "EOF: file size of 4 is less than footer");

        // Wrong file size: the bytes at offset 12..20 are not a valid footer.
        let input = MetadataFetchFn(&mut fetch);
        let err = ParquetMetaDataReader::new()
            .load_and_finish(input, 20)
            .await
            .unwrap_err()
            .to_string();
        assert_eq!(err, "Parquet error: Invalid Parquet file. Corrupt footer");
    }
1151
1152 #[tokio::test]
1153 async fn test_suffix() {
1154 let mut file = get_test_file("nulls.snappy.parquet");
1155 let mut file2 = file.try_clone().unwrap();
1156
1157 let expected = ParquetMetaDataReader::new()
1158 .parse_and_finish(&file)
1159 .unwrap();
1160 let expected = expected.file_metadata().schema();
1161 let fetch_count = AtomicUsize::new(0);
1162 let suffix_fetch_count = AtomicUsize::new(0);
1163
1164 let mut fetch = |range| {
1165 fetch_count.fetch_add(1, Ordering::SeqCst);
1166 futures::future::ready(read_range(&mut file, range))
1167 };
1168 let mut suffix_fetch = |suffix| {
1169 suffix_fetch_count.fetch_add(1, Ordering::SeqCst);
1170 futures::future::ready(read_suffix(&mut file2, suffix))
1171 };
1172
1173 let input = MetadataSuffixFetchFn(&mut fetch, &mut suffix_fetch);
1174 let actual = ParquetMetaDataReader::new()
1175 .load_via_suffix_and_finish(input)
1176 .await
1177 .unwrap();
1178 assert_eq!(actual.file_metadata().schema(), expected);
1179 assert_eq!(fetch_count.load(Ordering::SeqCst), 0);
1180 assert_eq!(suffix_fetch_count.load(Ordering::SeqCst), 2);
1181
1182 fetch_count.store(0, Ordering::SeqCst);
1184 suffix_fetch_count.store(0, Ordering::SeqCst);
1185 let input = MetadataSuffixFetchFn(&mut fetch, &mut suffix_fetch);
1186 let actual = ParquetMetaDataReader::new()
1187 .with_prefetch_hint(Some(7))
1188 .load_via_suffix_and_finish(input)
1189 .await
1190 .unwrap();
1191 assert_eq!(actual.file_metadata().schema(), expected);
1192 assert_eq!(fetch_count.load(Ordering::SeqCst), 0);
1193 assert_eq!(suffix_fetch_count.load(Ordering::SeqCst), 2);
1194
1195 fetch_count.store(0, Ordering::SeqCst);
1197 suffix_fetch_count.store(0, Ordering::SeqCst);
1198 let input = MetadataSuffixFetchFn(&mut fetch, &mut suffix_fetch);
1199 let actual = ParquetMetaDataReader::new()
1200 .with_prefetch_hint(Some(10))
1201 .load_via_suffix_and_finish(input)
1202 .await
1203 .unwrap();
1204 assert_eq!(actual.file_metadata().schema(), expected);
1205 assert_eq!(fetch_count.load(Ordering::SeqCst), 0);
1206 assert_eq!(suffix_fetch_count.load(Ordering::SeqCst), 2);
1207
1208 dbg!("test");
1209 fetch_count.store(0, Ordering::SeqCst);
1211 suffix_fetch_count.store(0, Ordering::SeqCst);
1212 let input = MetadataSuffixFetchFn(&mut fetch, &mut suffix_fetch);
1213 let actual = ParquetMetaDataReader::new()
1214 .with_prefetch_hint(Some(500))
1215 .load_via_suffix_and_finish(input)
1216 .await
1217 .unwrap();
1218 assert_eq!(actual.file_metadata().schema(), expected);
1219 assert_eq!(fetch_count.load(Ordering::SeqCst), 0);
1220 assert_eq!(suffix_fetch_count.load(Ordering::SeqCst), 1);
1221
1222 fetch_count.store(0, Ordering::SeqCst);
1224 suffix_fetch_count.store(0, Ordering::SeqCst);
1225 let input = MetadataSuffixFetchFn(&mut fetch, &mut suffix_fetch);
1226 let actual = ParquetMetaDataReader::new()
1227 .with_prefetch_hint(Some(428))
1228 .load_via_suffix_and_finish(input)
1229 .await
1230 .unwrap();
1231 assert_eq!(actual.file_metadata().schema(), expected);
1232 assert_eq!(fetch_count.load(Ordering::SeqCst), 0);
1233 assert_eq!(suffix_fetch_count.load(Ordering::SeqCst), 1);
1234 }
1235
    #[cfg(feature = "encryption")]
    #[tokio::test]
    async fn test_suffix_with_encryption() {
        let mut file = get_test_file("uniform_encryption.parquet.encrypted");
        let mut file2 = file.try_clone().unwrap();

        let mut fetch = |range| futures::future::ready(read_range(&mut file, range));
        let mut suffix_fetch = |suffix| futures::future::ready(read_suffix(&mut file2, suffix));

        let input = MetadataSuffixFetchFn(&mut fetch, &mut suffix_fetch);

        // Footer key for the uniformly-encrypted test file.
        let key_code: &[u8] = "0123456789012345".as_bytes();
        let decryption_properties = FileDecryptionProperties::builder(key_code.to_vec())
            .build()
            .unwrap();

        // Decryption properties should allow loading the encrypted footer via
        // suffix fetches.
        let expected = ParquetMetaDataReader::new()
            .with_decryption_properties(Some(decryption_properties))
            .load_via_suffix_and_finish(input)
            .await
            .unwrap();
        assert_eq!(expected.num_row_groups(), 1);
    }
1260
1261 #[tokio::test]
1262 #[allow(deprecated)]
1263 async fn test_page_index() {
1264 let mut file = get_test_file("alltypes_tiny_pages.parquet");
1265 let len = file.len();
1266 let fetch_count = AtomicUsize::new(0);
1267 let mut fetch = |range| {
1268 fetch_count.fetch_add(1, Ordering::SeqCst);
1269 futures::future::ready(read_range(&mut file, range))
1270 };
1271
1272 let f = MetadataFetchFn(&mut fetch);
1273 let mut loader = ParquetMetaDataReader::new().with_page_indexes(true);
1274 loader.try_load(f, len).await.unwrap();
1275 assert_eq!(fetch_count.load(Ordering::SeqCst), 3);
1276 let metadata = loader.finish().unwrap();
1277 assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());
1278
1279 fetch_count.store(0, Ordering::SeqCst);
1281 let f = MetadataFetchFn(&mut fetch);
1282 let mut loader = ParquetMetaDataReader::new()
1283 .with_page_indexes(true)
1284 .with_prefetch_hint(Some(1729));
1285 loader.try_load(f, len).await.unwrap();
1286 assert_eq!(fetch_count.load(Ordering::SeqCst), 2);
1287 let metadata = loader.finish().unwrap();
1288 assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());
1289
1290 fetch_count.store(0, Ordering::SeqCst);
1292 let f = MetadataFetchFn(&mut fetch);
1293 let mut loader = ParquetMetaDataReader::new()
1294 .with_page_indexes(true)
1295 .with_prefetch_hint(Some(130649));
1296 loader.try_load(f, len).await.unwrap();
1297 assert_eq!(fetch_count.load(Ordering::SeqCst), 2);
1298 let metadata = loader.finish().unwrap();
1299 assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());
1300
1301 fetch_count.store(0, Ordering::SeqCst);
1303 let f = MetadataFetchFn(&mut fetch);
1304 let metadata = ParquetMetaDataReader::new()
1305 .with_page_indexes(true)
1306 .with_prefetch_hint(Some(130650))
1307 .load_and_finish(f, len)
1308 .await
1309 .unwrap();
1310 assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
1311 assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());
1312
1313 fetch_count.store(0, Ordering::SeqCst);
1315 let f = MetadataFetchFn(&mut fetch);
1316 let metadata = ParquetMetaDataReader::new()
1317 .with_page_indexes(true)
1318 .with_prefetch_hint(Some((len - 1000) as usize)) .load_and_finish(f, len)
1320 .await
1321 .unwrap();
1322 assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
1323 assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());
1324
1325 fetch_count.store(0, Ordering::SeqCst);
1327 let f = MetadataFetchFn(&mut fetch);
1328 let metadata = ParquetMetaDataReader::new()
1329 .with_page_indexes(true)
1330 .with_prefetch_hint(Some(len as usize)) .load_and_finish(f, len)
1332 .await
1333 .unwrap();
1334 assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
1335 assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());
1336
1337 fetch_count.store(0, Ordering::SeqCst);
1339 let f = MetadataFetchFn(&mut fetch);
1340 let metadata = ParquetMetaDataReader::new()
1341 .with_page_indexes(true)
1342 .with_prefetch_hint(Some((len + 1000) as usize)) .load_and_finish(f, len)
1344 .await
1345 .unwrap();
1346 assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
1347 assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());
1348 }
1349
1350 fn write_parquet_file(offset_index_disabled: bool) -> Result<NamedTempFile> {
1351 let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
1352 let batch = RecordBatch::try_new(
1353 schema.clone(),
1354 vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
1355 )?;
1356
1357 let file = NamedTempFile::new().unwrap();
1358
1359 let props = WriterProperties::builder()
1361 .set_offset_index_disabled(offset_index_disabled)
1362 .build();
1363
1364 let mut writer = ArrowWriter::try_new(file.reopen()?, schema, Some(props))?;
1365 writer.write(&batch)?;
1366 writer.close()?;
1367
1368 Ok(file)
1369 }
1370
1371 fn read_and_check(file: &File, policy: PageIndexPolicy) -> Result<ParquetMetaData> {
1372 let mut reader = ParquetMetaDataReader::new().with_page_index_policy(policy);
1373 reader.try_parse(file)?;
1374 reader.finish()
1375 }
1376
1377 #[test]
1378 fn test_page_index_policy() {
1379 let f = write_parquet_file(false).unwrap();
1381 read_and_check(f.as_file(), PageIndexPolicy::Required).unwrap();
1382 read_and_check(f.as_file(), PageIndexPolicy::Optional).unwrap();
1383 read_and_check(f.as_file(), PageIndexPolicy::Skip).unwrap();
1384
1385 let f = write_parquet_file(true).unwrap();
1387 let res = read_and_check(f.as_file(), PageIndexPolicy::Required);
1388 assert!(matches!(
1389 res,
1390 Err(ParquetError::General(e)) if e == "missing offset index"
1391 ));
1392 read_and_check(f.as_file(), PageIndexPolicy::Optional).unwrap();
1393 read_and_check(f.as_file(), PageIndexPolicy::Skip).unwrap();
1394 }
1395}