1#[cfg(feature = "encryption")]
19use crate::encryption::decrypt::FileDecryptionProperties;
20use crate::errors::{ParquetError, Result};
21use crate::file::FOOTER_SIZE;
22use crate::file::metadata::parser::decode_metadata;
23use crate::file::metadata::thrift::parquet_schema_from_bytes;
24use crate::file::metadata::{
25 FooterTail, ParquetMetaData, ParquetMetaDataOptions, ParquetMetaDataPushDecoder,
26};
27use crate::file::reader::ChunkReader;
28use crate::schema::types::SchemaDescriptor;
29use bytes::Bytes;
30use std::sync::Arc;
31use std::{io::Read, ops::Range};
32
33use crate::DecodeResult;
34#[cfg(all(feature = "async", feature = "arrow"))]
35use crate::arrow::async_reader::{MetadataFetch, MetadataSuffixFetch};
36
/// Reads and decodes Parquet footer metadata and, optionally, the page indexes
/// (column index / offset index).
///
/// Configure the reader with the `with_*` builder methods, then call one of the
/// synchronous `try_parse*` methods (for a [`ChunkReader`]) or, with the `async` and
/// `arrow` features, one of the `try_load*` methods. Take the decoded result with
/// [`ParquetMetaDataReader::finish`].
#[derive(Default, Debug)]
pub struct ParquetMetaDataReader {
    /// Decoded metadata; `None` until a parse/load succeeds, and again after `finish`
    /// has taken it.
    metadata: Option<ParquetMetaData>,
    /// Policy for reading the column index.
    column_index: PageIndexPolicy,
    /// Policy for reading the offset index.
    offset_index: PageIndexPolicy,
    /// Number of trailing bytes the async loaders fetch up front. Values not larger
    /// than `FOOTER_SIZE` fall back to `FOOTER_SIZE` (see `get_prefetch_size`).
    prefetch_hint: Option<usize>,
    /// Options forwarded to every `ParquetMetaDataPushDecoder` this reader creates.
    metadata_options: Option<Arc<ParquetMetaDataOptions>>,
    /// Length of the footer metadata plus the fixed-size footer tail. Recorded by the
    /// synchronous `parse_metadata` once it has read a footer tail.
    metadata_size: Option<usize>,
    /// Decryption properties forwarded to the push decoder (encrypted files only).
    #[cfg(feature = "encryption")]
    file_decryption_properties: Option<Arc<FileDecryptionProperties>>,
}
83
/// Controls whether a page-index structure (column index or offset index) is read.
///
/// When both indexes are `Skip`, the reader does not attempt to read page-index data
/// at all.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum PageIndexPolicy {
    /// Do not read the index. This is the default.
    #[default]
    Skip,
    /// Read the index. NOTE(review): presumably an absent or unreadable index is not an
    /// error under this policy; the semantics live in `ParquetMetaDataPushDecoder`,
    /// so confirm there.
    Optional,
    /// Read the index. NOTE(review): presumably failure to obtain it is an error; the
    /// semantics live in `ParquetMetaDataPushDecoder`, so confirm there.
    Required,
}
95
96impl From<bool> for PageIndexPolicy {
97 fn from(value: bool) -> Self {
98 match value {
99 true => Self::Required,
100 false => Self::Skip,
101 }
102 }
103}
104
105impl ParquetMetaDataReader {
106 pub fn new() -> Self {
108 Default::default()
109 }
110
111 pub fn new_with_metadata(metadata: ParquetMetaData) -> Self {
114 Self {
115 metadata: Some(metadata),
116 ..Default::default()
117 }
118 }
119
120 pub fn with_page_index_policy(self, policy: PageIndexPolicy) -> Self {
122 self.with_column_index_policy(policy)
123 .with_offset_index_policy(policy)
124 }
125
126 pub fn with_column_index_policy(mut self, policy: PageIndexPolicy) -> Self {
128 self.column_index = policy;
129 self
130 }
131
132 pub fn with_offset_index_policy(mut self, policy: PageIndexPolicy) -> Self {
134 self.offset_index = policy;
135 self
136 }
137
138 pub fn with_metadata_options(mut self, options: Option<ParquetMetaDataOptions>) -> Self {
140 self.metadata_options = options.map(Arc::new);
141 self
142 }
143
144 pub fn with_prefetch_hint(mut self, prefetch: Option<usize>) -> Self {
156 self.prefetch_hint = prefetch;
157 self
158 }
159
/// Sets the decryption properties forwarded to the push decoder, for reading
/// encrypted files.
#[cfg(feature = "encryption")]
pub fn with_decryption_properties(
    self,
    properties: Option<Arc<FileDecryptionProperties>>,
) -> Self {
    Self {
        file_decryption_properties: properties,
        ..self
    }
}
171
172 pub fn has_metadata(&self) -> bool {
174 self.metadata.is_some()
175 }
176
177 pub fn finish(&mut self) -> Result<ParquetMetaData> {
179 self.metadata
180 .take()
181 .ok_or_else(|| general_err!("could not parse parquet metadata"))
182 }
183
184 pub fn parse_and_finish<R: ChunkReader>(mut self, reader: &R) -> Result<ParquetMetaData> {
203 self.try_parse(reader)?;
204 self.finish()
205 }
206
207 pub fn try_parse<R: ChunkReader>(&mut self, reader: &R) -> Result<()> {
213 self.try_parse_sized(reader, reader.len())
214 }
215
216 pub fn try_parse_sized<R: ChunkReader>(&mut self, reader: &R, file_size: u64) -> Result<()> {
289 self.metadata = match self.parse_metadata(reader) {
290 Ok(metadata) => Some(metadata),
291 Err(ParquetError::NeedMoreData(needed)) => {
292 if file_size == reader.len() || needed as u64 > file_size {
295 return Err(eof_err!(
296 "Parquet file too small. Size is {} but need {}",
297 file_size,
298 needed
299 ));
300 } else {
301 return Err(ParquetError::NeedMoreData(needed));
303 }
304 }
305 Err(e) => return Err(e),
306 };
307
308 if self.column_index == PageIndexPolicy::Skip && self.offset_index == PageIndexPolicy::Skip
310 {
311 return Ok(());
312 }
313
314 self.read_page_indexes_sized(reader, file_size)
315 }
316
317 pub fn read_page_indexes<R: ChunkReader>(&mut self, reader: &R) -> Result<()> {
320 self.read_page_indexes_sized(reader, reader.len())
321 }
322
323 pub fn read_page_indexes_sized<R: ChunkReader>(
329 &mut self,
330 reader: &R,
331 file_size: u64,
332 ) -> Result<()> {
333 let Some(metadata) = self.metadata.take() else {
334 return Err(general_err!(
335 "Tried to read page indexes without ParquetMetaData metadata"
336 ));
337 };
338
339 let push_decoder = ParquetMetaDataPushDecoder::try_new_with_metadata(file_size, metadata)?
340 .with_offset_index_policy(self.offset_index)
341 .with_column_index_policy(self.column_index)
342 .with_metadata_options(self.metadata_options.clone());
343 let mut push_decoder = self.prepare_push_decoder(push_decoder);
344
345 let range = match needs_index_data(&mut push_decoder)? {
347 NeedsIndexData::No(metadata) => {
348 self.metadata = Some(metadata);
349 return Ok(());
350 }
351 NeedsIndexData::Yes(range) => range,
352 };
353
354 let file_range = file_size.saturating_sub(reader.len())..file_size;
357 if !(file_range.contains(&range.start) && file_range.contains(&range.end)) {
358 if range.end > file_size {
360 return Err(eof_err!(
361 "Parquet file too small. Range {range:?} is beyond file bounds {file_size}",
362 ));
363 } else {
364 return Err(ParquetError::NeedMoreData(
366 (file_size - range.start).try_into()?,
367 ));
368 }
369 }
370
371 if let Some(metadata_size) = self.metadata_size {
374 let metadata_range = file_size.saturating_sub(metadata_size as u64)..file_size;
375 if range.end > metadata_range.start {
376 return Err(eof_err!(
377 "Parquet file too small. Page index range {range:?} overlaps with file metadata {metadata_range:?}",
378 ));
379 }
380 }
381
382 let bytes_needed = usize::try_from(range.end - range.start)?;
384 let bytes = reader.get_bytes(range.start - file_range.start, bytes_needed)?;
385
386 push_decoder.push_range(range, bytes)?;
387 let metadata = parse_index_data(&mut push_decoder)?;
388 self.metadata = Some(metadata);
389
390 Ok(())
391 }
392
/// Async convenience wrapper that runs [`Self::try_load`], then returns the result of
/// [`Self::finish`].
#[cfg(all(feature = "async", feature = "arrow"))]
pub async fn load_and_finish<F: MetadataFetch>(
    mut self,
    fetch: F,
    file_size: u64,
) -> Result<ParquetMetaData> {
    self.try_load(fetch, file_size)
        .await
        .and_then(|()| self.finish())
}
408
/// Async convenience wrapper that runs [`Self::try_load_via_suffix`], then returns the
/// result of [`Self::finish`].
#[cfg(all(feature = "async", feature = "arrow"))]
pub async fn load_via_suffix_and_finish<F: MetadataSuffixFetch>(
    mut self,
    fetch: F,
) -> Result<ParquetMetaData> {
    self.try_load_via_suffix(fetch)
        .await
        .and_then(|()| self.finish())
}
/// Asynchronously loads the metadata of a `file_size`-byte file through `fetch`.
///
/// If either page-index policy is not `Skip`, it then loads the page indexes, reusing
/// any already-fetched bytes that precede the metadata.
#[cfg(all(feature = "async", feature = "arrow"))]
pub async fn try_load<F: MetadataFetch>(&mut self, mut fetch: F, file_size: u64) -> Result<()> {
    let (metadata, prefetched) = self.load_metadata(&mut fetch, file_size).await?;
    self.metadata = Some(metadata);

    let wants_page_index = self.column_index != PageIndexPolicy::Skip
        || self.offset_index != PageIndexPolicy::Skip;
    if wants_page_index {
        self.load_page_index_with_remainder(fetch, prefetched).await
    } else {
        Ok(())
    }
}
442
/// Asynchronously loads the metadata using suffix requests, without knowing the file
/// size.
///
/// If either page-index policy is not `Skip`, it then loads the page indexes.
#[cfg(all(feature = "async", feature = "arrow"))]
pub async fn try_load_via_suffix<F: MetadataSuffixFetch>(
    &mut self,
    mut fetch: F,
) -> Result<()> {
    let (metadata, prefetched) = self.load_metadata_via_suffix(&mut fetch).await?;
    self.metadata = Some(metadata);

    let wants_page_index = self.column_index != PageIndexPolicy::Skip
        || self.offset_index != PageIndexPolicy::Skip;
    if wants_page_index {
        self.load_page_index_with_remainder(fetch, prefetched).await
    } else {
        Ok(())
    }
}
465
/// Asynchronously loads the page indexes into the metadata already held by this reader.
///
/// All page-index bytes are fetched through `fetch`.
#[cfg(all(feature = "async", feature = "arrow"))]
pub async fn load_page_index<F: MetadataFetch>(&mut self, fetch: F) -> Result<()> {
    // No previously fetched bytes are available to reuse here.
    let nothing_prefetched: Option<(usize, Bytes)> = None;
    self.load_page_index_with_remainder(fetch, nothing_prefetched)
        .await
}
472
/// Loads the page indexes into the metadata already held by this reader.
///
/// `remainder`, when present, is `(offset, bytes)`: bytes that were already fetched,
/// with `offset` compared against the absolute file offsets of the page-index range.
/// If the range starts at or after `offset`, it is served from `bytes` (erroring if
/// `bytes` is too short); otherwise it is fetched through `fetch`.
///
/// The stored metadata is moved into the push decoder up front and is not put back if
/// an error is returned afterwards.
#[cfg(all(feature = "async", feature = "arrow"))]
async fn load_page_index_with_remainder<F: MetadataFetch>(
    &mut self,
    mut fetch: F,
    remainder: Option<(usize, Bytes)>,
) -> Result<()> {
    let Some(metadata) = self.metadata.take() else {
        return Err(general_err!("Footer metadata is not present"));
    };

    // The real file size is not known on this path; `u64::MAX` is passed as a
    // placeholder. NOTE(review): presumably the decoder does not need the true size once
    // metadata is supplied; confirm in `ParquetMetaDataPushDecoder`.
    let file_size = u64::MAX;
    let push_decoder = ParquetMetaDataPushDecoder::try_new_with_metadata(file_size, metadata)?
        .with_offset_index_policy(self.offset_index)
        .with_column_index_policy(self.column_index)
        .with_metadata_options(self.metadata_options.clone());
    let mut push_decoder = self.prepare_push_decoder(push_decoder);

    // Ask the decoder which byte range (if any) holds the requested indexes.
    let range = match needs_index_data(&mut push_decoder)? {
        NeedsIndexData::No(metadata) => {
            self.metadata = Some(metadata);
            return Ok(());
        }
        NeedsIndexData::Yes(range) => range,
    };

    let bytes = match &remainder {
        // The prefetched bytes start at or before the page indexes: slice them out.
        Some((remainder_start, remainder)) if *remainder_start as u64 <= range.start => {
            let remainder_start = *remainder_start as u64;
            let offset = usize::try_from(range.start - remainder_start)?;
            let end = usize::try_from(range.end - remainder_start)?;
            // The range runs past the prefetched bytes.
            if end > remainder.len() {
                return Err(general_err!(
                    "Corrupted parquet file: index data range ({:?}) exceeds remainder length ({})",
                    range,
                    remainder.len()
                ));
            }
            remainder.slice(offset..end)
        }
        // Nothing reusable: fetch the whole range.
        _ => fetch.fetch(range.start..range.end).await?,
    };

    // Guard against fetchers that return a different number of bytes than requested.
    if bytes.len() as u64 != range.end - range.start {
        return Err(general_err!(
            "Corrupted parquet file: index data length mismatch, expected {}, got {}",
            range.end - range.start,
            bytes.len()
        ));
    }
    push_decoder.push_range(range.clone(), bytes)?;
    let metadata = parse_index_data(&mut push_decoder)?;
    self.metadata = Some(metadata);
    Ok(())
}
532
533 fn parse_metadata<R: ChunkReader>(&mut self, chunk_reader: &R) -> Result<ParquetMetaData> {
536 let file_size = chunk_reader.len();
538 if file_size < (FOOTER_SIZE as u64) {
539 return Err(ParquetError::NeedMoreData(FOOTER_SIZE));
540 }
541
542 let mut footer = [0_u8; FOOTER_SIZE];
543 chunk_reader
544 .get_read(file_size - FOOTER_SIZE as u64)?
545 .read_exact(&mut footer)?;
546
547 let footer = FooterTail::try_new(&footer)?;
548 let metadata_len = footer.metadata_length();
549 let footer_metadata_len = FOOTER_SIZE + metadata_len;
550 self.metadata_size = Some(footer_metadata_len);
551
552 if footer_metadata_len as u64 > file_size {
553 return Err(ParquetError::NeedMoreData(footer_metadata_len));
554 }
555
556 let start = file_size - footer_metadata_len as u64;
557 let bytes = chunk_reader.get_bytes(start, metadata_len)?;
558 self.decode_footer_metadata(bytes, file_size, footer)
559 }
560
561 pub fn metadata_size(&self) -> Option<usize> {
564 self.metadata_size
565 }
566
/// Returns the number of trailing bytes to fetch first.
///
/// This is the prefetch hint when it exceeds `FOOTER_SIZE`, otherwise `FOOTER_SIZE`.
#[cfg(all(feature = "async", feature = "arrow"))]
fn get_prefetch_size(&self) -> usize {
    self.prefetch_hint
        .filter(|&hint| hint > FOOTER_SIZE)
        .unwrap_or(FOOTER_SIZE)
}
579
/// Fetches and decodes the footer metadata of a `file_size`-byte file.
///
/// The first request covers the last `get_prefetch_size()` bytes. A second request is
/// made only if the metadata did not fit in that suffix.
///
/// Returns the metadata plus, when the first request already covered the metadata,
/// `Some((offset, bytes))` with the prefetched bytes that precede the metadata.
/// `offset` is their absolute file offset. The remainder is omitted (`None`) when that
/// offset cannot be represented as `usize`, e.g. for files over 4 GiB on 32-bit
/// targets, so later page-index loads fetch instead of slicing at a truncated offset.
///
/// # Errors
///
/// Returns an EOF error when the file is smaller than the footer, when the declared
/// metadata does not fit in the file, or when a fetch returns fewer bytes than
/// requested.
#[cfg(all(feature = "async", feature = "arrow"))]
async fn load_metadata<F: MetadataFetch>(
    &self,
    fetch: &mut F,
    file_size: u64,
) -> Result<(ParquetMetaData, Option<(usize, Bytes)>)> {
    let prefetch = self.get_prefetch_size() as u64;

    if file_size < FOOTER_SIZE as u64 {
        return Err(eof_err!("file size of {} is less than footer", file_size));
    }

    // A prefetch larger than the file is fine: `saturating_sub` clamps to offset 0.
    let footer_start = file_size.saturating_sub(prefetch);

    let suffix = fetch.fetch(footer_start..file_size).await?;
    let suffix_len = suffix.len();
    let fetch_len = (file_size - footer_start)
        .try_into()
        .expect("footer size should never be larger than u32");
    if suffix_len < fetch_len {
        return Err(eof_err!(
            "metadata requires {} bytes, but could only read {}",
            fetch_len,
            suffix_len
        ));
    }

    let mut footer = [0; FOOTER_SIZE];
    footer.copy_from_slice(&suffix[suffix_len - FOOTER_SIZE..suffix_len]);

    let footer = FooterTail::try_new(&footer)?;
    let length = footer.metadata_length();

    if file_size < (length + FOOTER_SIZE) as u64 {
        return Err(eof_err!(
            "file size of {} is less than footer + metadata {}",
            file_size,
            length + FOOTER_SIZE
        ));
    }

    if length > suffix_len - FOOTER_SIZE {
        // The metadata did not fit in the first request; fetch exactly the metadata.
        let metadata_start = file_size - (length + FOOTER_SIZE) as u64;
        let meta = fetch
            .fetch(metadata_start..(file_size - FOOTER_SIZE as u64))
            .await?;
        // A short read would make `decode_footer_metadata` compute the wrong range.
        if meta.len() < length {
            return Err(eof_err!(
                "metadata requires {} bytes, but could only read {}",
                length,
                meta.len()
            ));
        }
        Ok((self.decode_footer_metadata(meta, file_size, footer)?, None))
    } else {
        // The metadata is already in `suffix`; slice it out relative to the suffix start.
        let metadata_start = (file_size - (length + FOOTER_SIZE) as u64 - footer_start)
            .try_into()
            .expect("metadata length should never be larger than u32");
        let slice = suffix.slice(metadata_start..suffix_len - FOOTER_SIZE);
        // Bytes before the metadata may hold page indexes. Keep them only if their
        // absolute start offset fits in `usize` without truncation.
        let remainder = usize::try_from(footer_start)
            .ok()
            .map(|start| (start, suffix.slice(..metadata_start)));
        Ok((
            self.decode_footer_metadata(slice, file_size, footer)?,
            remainder,
        ))
    }
}
642
/// Fetches and decodes the footer metadata using suffix requests only.
///
/// The total file size is never learned.
///
/// Returns the metadata plus, when possible, `Some((0, bytes))` with the prefetched
/// bytes that precede the metadata. Page-index ranges are absolute file offsets, so
/// these bytes can only be reused when the suffix starts at file offset 0. That is
/// known to hold only when the first fetch returned fewer bytes than requested, which
/// means the whole file was returned. In every other case `None` is returned and page
/// indexes are fetched by range instead.
///
/// # Errors
///
/// Returns an EOF error when a fetch returns too few bytes for the footer or the
/// metadata.
#[cfg(all(feature = "async", feature = "arrow"))]
async fn load_metadata_via_suffix<F: MetadataSuffixFetch>(
    &self,
    fetch: &mut F,
) -> Result<(ParquetMetaData, Option<(usize, Bytes)>)> {
    let prefetch = self.get_prefetch_size();

    let suffix = fetch.fetch_suffix(prefetch as _).await?;
    let suffix_len = suffix.len();

    if suffix_len < FOOTER_SIZE {
        return Err(eof_err!(
            "footer metadata requires {} bytes, but could only read {}",
            FOOTER_SIZE,
            suffix_len
        ));
    }

    let mut footer = [0; FOOTER_SIZE];
    footer.copy_from_slice(&suffix[suffix_len - FOOTER_SIZE..suffix_len]);

    let footer = FooterTail::try_new(&footer)?;
    let length = footer.metadata_length();
    // The real file size is unknown, so decode as if the file held only the metadata
    // and the footer tail.
    let file_size = (length + FOOTER_SIZE) as u64;

    let metadata_offset = length + FOOTER_SIZE;
    if length > suffix_len - FOOTER_SIZE {
        // The metadata did not fit in the first request; fetch metadata + footer.
        let meta = fetch.fetch_suffix(metadata_offset).await?;
        let meta_len = meta.len();

        if meta_len < metadata_offset {
            return Err(eof_err!(
                "metadata requires {} bytes, but could only read {}",
                metadata_offset,
                meta_len
            ));
        }

        // Slice relative to the end of the buffer and drop the footer tail, which must
        // not be passed on (decryption fails otherwise).
        let meta = meta.slice(meta_len - metadata_offset..meta_len - FOOTER_SIZE);
        Ok((self.decode_footer_metadata(meta, file_size, footer)?, None))
    } else {
        let metadata_start = suffix_len - metadata_offset;
        let slice = suffix.slice(metadata_start..suffix_len - FOOTER_SIZE);
        // Only a short read proves that `suffix` begins at file offset 0. Otherwise the
        // bytes' absolute offset is unknown and reusing them would misplace page
        // indexes. NOTE(review): this relies on `fetch_suffix` returning the whole file
        // when asked for more bytes than the file holds.
        let remainder = (suffix_len < prefetch).then(|| (0, suffix.slice(..metadata_start)));
        Ok((
            self.decode_footer_metadata(slice, file_size, footer)?,
            remainder,
        ))
    }
}
695
696 pub(crate) fn decode_footer_metadata(
710 &self,
711 buf: Bytes,
712 file_size: u64,
713 footer_tail: FooterTail,
714 ) -> Result<ParquetMetaData> {
715 let ending_offset = file_size.checked_sub(FOOTER_SIZE as u64).ok_or_else(|| {
720 general_err!(
721 "file size {file_size} is smaller than footer size {}",
722 FOOTER_SIZE
723 )
724 })?;
725
726 let starting_offset = ending_offset.checked_sub(buf.len() as u64).ok_or_else(|| {
727 general_err!(
728 "file size {file_size} is smaller than buffer size {} + footer size {}",
729 buf.len(),
730 FOOTER_SIZE
731 )
732 })?;
733
734 let range = starting_offset..ending_offset;
735
736 let push_decoder =
737 ParquetMetaDataPushDecoder::try_new_with_footer_tail(file_size, footer_tail)?
738 .with_page_index_policy(PageIndexPolicy::Skip)
740 .with_metadata_options(self.metadata_options.clone());
741
742 let mut push_decoder = self.prepare_push_decoder(push_decoder);
743 push_decoder.push_range(range, buf)?;
744 match push_decoder.try_decode()? {
745 DecodeResult::Data(metadata) => Ok(metadata),
746 DecodeResult::Finished => Err(general_err!(
747 "could not parse parquet metadata -- previously finished"
748 )),
749 DecodeResult::NeedsData(ranges) => Err(general_err!(
750 "could not parse parquet metadata, needs ranges {:?}",
751 ranges
752 )),
753 }
754 }
755
/// Hands this reader's decryption properties (if any) to `push_decoder`.
#[cfg(feature = "encryption")]
fn prepare_push_decoder(
    &self,
    push_decoder: ParquetMetaDataPushDecoder,
) -> ParquetMetaDataPushDecoder {
    // Cloning `Option<Arc<_>>` only bumps the reference count.
    let properties = self.file_decryption_properties.clone();
    push_decoder.with_file_decryption_properties(properties)
}
768 #[cfg(not(feature = "encryption"))]
769 fn prepare_push_decoder(
770 &self,
771 push_decoder: ParquetMetaDataPushDecoder,
772 ) -> ParquetMetaDataPushDecoder {
773 push_decoder
774 }
775
776 pub fn decode_metadata(buf: &[u8]) -> Result<ParquetMetaData> {
784 decode_metadata(buf, None)
785 }
786
787 pub fn decode_metadata_with_options(
792 buf: &[u8],
793 options: Option<&ParquetMetaDataOptions>,
794 ) -> Result<ParquetMetaData> {
795 decode_metadata(buf, options)
796 }
797
798 pub fn decode_schema(buf: &[u8]) -> Result<Arc<SchemaDescriptor>> {
801 Ok(Arc::new(parquet_schema_from_bytes(buf)?))
802 }
803}
804
// `No` carries a whole `ParquetMetaData`, far larger than `Range<u64>`. The lint is
// allowed because values are matched immediately after `needs_index_data` returns.
#[allow(clippy::large_enum_variant)]
/// Whether the push decoder still needs page-index bytes.
enum NeedsIndexData {
    /// Decoding completed without further data; holds the resulting metadata.
    No(ParquetMetaData),
    /// The decoder needs the bytes in this absolute file range. This is the smallest
    /// range covering every range the decoder asked for.
    Yes(Range<u64>),
}
814
815fn needs_index_data(push_decoder: &mut ParquetMetaDataPushDecoder) -> Result<NeedsIndexData> {
818 match push_decoder.try_decode()? {
819 DecodeResult::NeedsData(ranges) => {
820 let range = ranges
821 .into_iter()
822 .reduce(|a, b| a.start.min(b.start)..a.end.max(b.end))
823 .ok_or_else(|| general_err!("Internal error: no ranges provided"))?;
824 Ok(NeedsIndexData::Yes(range))
825 }
826 DecodeResult::Data(metadata) => Ok(NeedsIndexData::No(metadata)),
827 DecodeResult::Finished => Err(general_err!("Internal error: decoder was finished")),
828 }
829}
830
831fn parse_index_data(push_decoder: &mut ParquetMetaDataPushDecoder) -> Result<ParquetMetaData> {
834 match push_decoder.try_decode()? {
835 DecodeResult::NeedsData(_) => Err(general_err!(
836 "Internal error: decoder still needs data after reading required range"
837 )),
838 DecodeResult::Data(metadata) => Ok(metadata),
839 DecodeResult::Finished => Err(general_err!("Internal error: decoder was finished")),
840 }
841}
842
#[cfg(test)]
mod tests {
    use super::*;
    use crate::file::reader::Length;
    use crate::util::test_common::file_util::get_test_file;
    use std::ops::Range;

    // An empty file cannot hold the footer, so the parser asks for exactly
    // `FOOTER_SIZE` bytes.
    #[test]
    fn test_parse_metadata_size_smaller_than_footer() {
        let test_file = tempfile::tempfile().unwrap();
        let err = ParquetMetaDataReader::new()
            .parse_metadata(&test_file)
            .unwrap_err();
        assert!(matches!(err, ParquetError::NeedMoreData(FOOTER_SIZE)));
    }

    // Eight bytes whose tail is not the `PAR1` magic are rejected as a corrupt footer.
    #[test]
    fn test_parse_metadata_corrupt_footer() {
        let data = Bytes::from(vec![1, 2, 3, 4, 5, 6, 7, 8]);
        let reader_result = ParquetMetaDataReader::new().parse_metadata(&data);
        assert_eq!(
            reader_result.unwrap_err().to_string(),
            "Parquet error: Invalid Parquet file. Corrupt footer"
        );
    }

    // A valid footer declaring 255 bytes of metadata, with no metadata present:
    // the parser asks for 255 + FOOTER_SIZE = 263 bytes.
    #[test]
    fn test_parse_metadata_invalid_start() {
        let test_file = Bytes::from(vec![255, 0, 0, 0, b'P', b'A', b'R', b'1']);
        let err = ParquetMetaDataReader::new()
            .parse_metadata(&test_file)
            .unwrap_err();
        assert!(matches!(err, ParquetError::NeedMoreData(263)));
    }

    // Exercises `try_parse` / `try_parse_sized` with readers that hold suffixes of
    // different lengths of a file containing page indexes.
    #[test]
    fn test_try_parse() {
        let file = get_test_file("alltypes_tiny_pages.parquet");
        let len = file.len();

        let mut reader =
            ParquetMetaDataReader::new().with_page_index_policy(PageIndexPolicy::Required);

        // Returns the bytes of `file` covering the absolute offsets in `range`.
        let bytes_for_range = |range: Range<u64>| {
            file.get_bytes(range.start, (range.end - range.start).try_into().unwrap())
                .unwrap()
        };

        // Whole file available: metadata and both page indexes are read.
        let bytes = bytes_for_range(0..len);
        reader.try_parse(&bytes).unwrap();
        let metadata = reader.finish().unwrap();
        assert!(metadata.column_index.is_some());
        assert!(metadata.offset_index.is_some());

        // A suffix that starts before the page indexes. Per the error asserted further
        // down, they span 323583..452504.
        let bytes = bytes_for_range(320000..len);
        reader.try_parse_sized(&bytes, len).unwrap();
        let metadata = reader.finish().unwrap();
        assert!(metadata.column_index.is_some());
        assert!(metadata.offset_index.is_some());

        // A suffix that starts exactly at the first page-index byte.
        let bytes = bytes_for_range(323583..len);
        reader.try_parse_sized(&bytes, len).unwrap();
        let metadata = reader.finish().unwrap();
        assert!(metadata.column_index.is_some());
        assert!(metadata.offset_index.is_some());

        // One byte short of the page indexes: the reader asks for more, and retrying
        // with the requested suffix succeeds.
        let bytes = bytes_for_range(323584..len);
        match reader.try_parse_sized(&bytes, len).unwrap_err() {
            // `needed` is measured back from the end of the file.
            ParquetError::NeedMoreData(needed) => {
                let bytes = bytes_for_range(len - needed as u64..len);
                reader.try_parse_sized(&bytes, len).unwrap();
                let metadata = reader.finish().unwrap();
                assert!(metadata.column_index.is_some());
                assert!(metadata.offset_index.is_some());
            }
            _ => panic!("unexpected error"),
        };

        // Start with a suffix too short even for the footer metadata, and keep growing
        // it by the amount requested. If the reader still holds metadata, only the page
        // indexes are read; otherwise the next iteration retries the full parse.
        let mut reader =
            ParquetMetaDataReader::new().with_page_index_policy(PageIndexPolicy::Required);
        let mut bytes = bytes_for_range(452505..len);
        loop {
            match reader.try_parse_sized(&bytes, len) {
                Ok(_) => break,
                Err(ParquetError::NeedMoreData(needed)) => {
                    bytes = bytes_for_range(len - needed as u64..len);
                    if reader.has_metadata() {
                        reader.read_page_indexes_sized(&bytes, len).unwrap();
                        break;
                    }
                }
                _ => panic!("unexpected error"),
            }
        }
        let metadata = reader.finish().unwrap();
        assert!(metadata.column_index.is_some());
        assert!(metadata.offset_index.is_some());

        // The declared file size is too small to contain the page-index range. This is
        // an EOF error, not a request for more data.
        let bytes = bytes_for_range(323584..len);
        let reader_result = reader.try_parse_sized(&bytes, len - 323584).unwrap_err();
        assert_eq!(
            reader_result.to_string(),
            "EOF: Parquet file too small. Range 323583..452504 is beyond file bounds 130649"
        );

        // Without page indexes only the footer metadata is needed. This 1728-byte
        // suffix is one byte short of it (see the assertion below).
        let mut reader = ParquetMetaDataReader::new();
        let bytes = bytes_for_range(452505..len);
        match reader.try_parse_sized(&bytes, len).unwrap_err() {
            ParquetError::NeedMoreData(needed) => {
                let bytes = bytes_for_range(len - needed as u64..len);
                reader.try_parse_sized(&bytes, len).unwrap();
                reader.finish().unwrap();
            }
            _ => panic!("unexpected error"),
        };

        // The same 1728 bytes treated as the whole file: the shortfall is an EOF error.
        let reader_result = reader.try_parse(&bytes).unwrap_err();
        assert_eq!(
            reader_result.to_string(),
            "EOF: Parquet file too small. Size is 1728 but need 1729"
        );

        // Bytes from the start of the file do not end in a valid footer.
        let bytes = bytes_for_range(0..1000);
        let reader_result = reader.try_parse_sized(&bytes, len).unwrap_err();
        assert_eq!(
            reader_result.to_string(),
            "Parquet error: Invalid Parquet file. Corrupt footer"
        );

        // The declared file size (1728) cannot hold the 1729 bytes required, so this is
        // an EOF error even though the reader is a strict suffix.
        let bytes = bytes_for_range(452510..len);
        let reader_result = reader.try_parse_sized(&bytes, len - 452505).unwrap_err();
        assert_eq!(
            reader_result.to_string(),
            "EOF: Parquet file too small. Size is 1728 but need 1729"
        );
    }
}
994
995#[cfg(all(feature = "async", feature = "arrow", test))]
996mod async_tests {
997 use super::*;
998
999 use arrow::{array::Int32Array, datatypes::DataType};
1000 use arrow_array::RecordBatch;
1001 use arrow_schema::{Field, Schema};
1002 use bytes::Bytes;
1003 use futures::FutureExt;
1004 use futures::future::BoxFuture;
1005 use std::fs::File;
1006 use std::future::Future;
1007 use std::io::{Read, Seek, SeekFrom};
1008 use std::ops::Range;
1009 use std::sync::Arc;
1010 use std::sync::atomic::{AtomicUsize, Ordering};
1011 use tempfile::NamedTempFile;
1012
1013 use crate::arrow::ArrowWriter;
1014 use crate::file::properties::WriterProperties;
1015 use crate::file::reader::Length;
1016 use crate::util::test_common::file_util::get_test_file;
1017
/// Test adapter: wraps a closure `Range<u64> -> Future<Output = Result<Bytes>>` so it can
/// be used as a [`MetadataFetch`].
struct MetadataFetchFn<F>(F);

impl<F, Fut> MetadataFetch for MetadataFetchFn<F>
where
    F: FnMut(Range<u64>) -> Fut + Send,
    Fut: Future<Output = Result<Bytes>> + Send,
{
    /// Calls the wrapped closure with `range` and boxes the resulting future.
    fn fetch(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
        async move { self.0(range).await }.boxed()
    }
}
1029
/// Test adapter wrapping two closures.
///
/// The first serves range requests ([`MetadataFetch`]) and the second serves suffix
/// requests ([`MetadataSuffixFetch`]). In the suffix impl both closures share the same
/// future type `Fut`.
struct MetadataSuffixFetchFn<F1, F2>(F1, F2);

impl<F1, Fut, F2> MetadataFetch for MetadataSuffixFetchFn<F1, F2>
where
    F1: FnMut(Range<u64>) -> Fut + Send,
    Fut: Future<Output = Result<Bytes>> + Send,
    F2: Send,
{
    /// Delegates range requests to the first closure.
    fn fetch(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
        async move { self.0(range).await }.boxed()
    }
}

impl<F1, Fut, F2> MetadataSuffixFetch for MetadataSuffixFetchFn<F1, F2>
where
    F1: FnMut(Range<u64>) -> Fut + Send,
    F2: FnMut(usize) -> Fut + Send,
    Fut: Future<Output = Result<Bytes>> + Send,
{
    /// Delegates suffix requests (the last `suffix` bytes) to the second closure.
    fn fetch_suffix(&mut self, suffix: usize) -> BoxFuture<'_, Result<Bytes>> {
        async move { self.1(suffix).await }.boxed()
    }
}
1053
1054 fn read_range(file: &mut File, range: Range<u64>) -> Result<Bytes> {
1055 file.seek(SeekFrom::Start(range.start as _))?;
1056 let len = range.end - range.start;
1057 let mut buf = Vec::with_capacity(len.try_into().unwrap());
1058 file.take(len as _).read_to_end(&mut buf)?;
1059 Ok(buf.into())
1060 }
1061
1062 fn read_suffix(file: &mut File, suffix: usize) -> Result<Bytes> {
1063 let file_len = file.len();
1064 file.seek(SeekFrom::End(0 - suffix.min(file_len as _) as i64))?;
1066 let mut buf = Vec::with_capacity(suffix);
1067 file.take(suffix as _).read_to_end(&mut buf)?;
1068 Ok(buf.into())
1069 }
1070
1071 #[tokio::test]
1072 async fn test_simple() {
1073 let mut file = get_test_file("nulls.snappy.parquet");
1074 let len = file.len();
1075
1076 let expected = ParquetMetaDataReader::new()
1077 .parse_and_finish(&file)
1078 .unwrap();
1079 let expected = expected.file_metadata().schema();
1080 let fetch_count = AtomicUsize::new(0);
1081
1082 let mut fetch = |range| {
1083 fetch_count.fetch_add(1, Ordering::SeqCst);
1084 futures::future::ready(read_range(&mut file, range))
1085 };
1086
1087 let input = MetadataFetchFn(&mut fetch);
1088 let actual = ParquetMetaDataReader::new()
1089 .load_and_finish(input, len)
1090 .await
1091 .unwrap();
1092 assert_eq!(actual.file_metadata().schema(), expected);
1093 assert_eq!(fetch_count.load(Ordering::SeqCst), 2);
1094
1095 fetch_count.store(0, Ordering::SeqCst);
1097 let input = MetadataFetchFn(&mut fetch);
1098 let actual = ParquetMetaDataReader::new()
1099 .with_prefetch_hint(Some(7))
1100 .load_and_finish(input, len)
1101 .await
1102 .unwrap();
1103 assert_eq!(actual.file_metadata().schema(), expected);
1104 assert_eq!(fetch_count.load(Ordering::SeqCst), 2);
1105
1106 fetch_count.store(0, Ordering::SeqCst);
1108 let input = MetadataFetchFn(&mut fetch);
1109 let actual = ParquetMetaDataReader::new()
1110 .with_prefetch_hint(Some(10))
1111 .load_and_finish(input, len)
1112 .await
1113 .unwrap();
1114 assert_eq!(actual.file_metadata().schema(), expected);
1115 assert_eq!(fetch_count.load(Ordering::SeqCst), 2);
1116
1117 fetch_count.store(0, Ordering::SeqCst);
1119 let input = MetadataFetchFn(&mut fetch);
1120 let actual = ParquetMetaDataReader::new()
1121 .with_prefetch_hint(Some(500))
1122 .load_and_finish(input, len)
1123 .await
1124 .unwrap();
1125 assert_eq!(actual.file_metadata().schema(), expected);
1126 assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
1127
1128 fetch_count.store(0, Ordering::SeqCst);
1130 let input = MetadataFetchFn(&mut fetch);
1131 let actual = ParquetMetaDataReader::new()
1132 .with_prefetch_hint(Some(428))
1133 .load_and_finish(input, len)
1134 .await
1135 .unwrap();
1136 assert_eq!(actual.file_metadata().schema(), expected);
1137 assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
1138
1139 let input = MetadataFetchFn(&mut fetch);
1140 let err = ParquetMetaDataReader::new()
1141 .load_and_finish(input, 4)
1142 .await
1143 .unwrap_err()
1144 .to_string();
1145 assert_eq!(err, "EOF: file size of 4 is less than footer");
1146
1147 let input = MetadataFetchFn(&mut fetch);
1148 let err = ParquetMetaDataReader::new()
1149 .load_and_finish(input, 20)
1150 .await
1151 .unwrap_err()
1152 .to_string();
1153 assert_eq!(err, "Parquet error: Invalid Parquet file. Corrupt footer");
1154 }
1155
1156 #[tokio::test]
1157 async fn test_suffix() {
1158 let mut file = get_test_file("nulls.snappy.parquet");
1159 let mut file2 = file.try_clone().unwrap();
1160
1161 let expected = ParquetMetaDataReader::new()
1162 .parse_and_finish(&file)
1163 .unwrap();
1164 let expected = expected.file_metadata().schema();
1165 let fetch_count = AtomicUsize::new(0);
1166 let suffix_fetch_count = AtomicUsize::new(0);
1167
1168 let mut fetch = |range| {
1169 fetch_count.fetch_add(1, Ordering::SeqCst);
1170 futures::future::ready(read_range(&mut file, range))
1171 };
1172 let mut suffix_fetch = |suffix| {
1173 suffix_fetch_count.fetch_add(1, Ordering::SeqCst);
1174 futures::future::ready(read_suffix(&mut file2, suffix))
1175 };
1176
1177 let input = MetadataSuffixFetchFn(&mut fetch, &mut suffix_fetch);
1178 let actual = ParquetMetaDataReader::new()
1179 .load_via_suffix_and_finish(input)
1180 .await
1181 .unwrap();
1182 assert_eq!(actual.file_metadata().schema(), expected);
1183 assert_eq!(fetch_count.load(Ordering::SeqCst), 0);
1184 assert_eq!(suffix_fetch_count.load(Ordering::SeqCst), 2);
1185
1186 fetch_count.store(0, Ordering::SeqCst);
1188 suffix_fetch_count.store(0, Ordering::SeqCst);
1189 let input = MetadataSuffixFetchFn(&mut fetch, &mut suffix_fetch);
1190 let actual = ParquetMetaDataReader::new()
1191 .with_prefetch_hint(Some(7))
1192 .load_via_suffix_and_finish(input)
1193 .await
1194 .unwrap();
1195 assert_eq!(actual.file_metadata().schema(), expected);
1196 assert_eq!(fetch_count.load(Ordering::SeqCst), 0);
1197 assert_eq!(suffix_fetch_count.load(Ordering::SeqCst), 2);
1198
1199 fetch_count.store(0, Ordering::SeqCst);
1201 suffix_fetch_count.store(0, Ordering::SeqCst);
1202 let input = MetadataSuffixFetchFn(&mut fetch, &mut suffix_fetch);
1203 let actual = ParquetMetaDataReader::new()
1204 .with_prefetch_hint(Some(10))
1205 .load_via_suffix_and_finish(input)
1206 .await
1207 .unwrap();
1208 assert_eq!(actual.file_metadata().schema(), expected);
1209 assert_eq!(fetch_count.load(Ordering::SeqCst), 0);
1210 assert_eq!(suffix_fetch_count.load(Ordering::SeqCst), 2);
1211
1212 dbg!("test");
1213 fetch_count.store(0, Ordering::SeqCst);
1215 suffix_fetch_count.store(0, Ordering::SeqCst);
1216 let input = MetadataSuffixFetchFn(&mut fetch, &mut suffix_fetch);
1217 let actual = ParquetMetaDataReader::new()
1218 .with_prefetch_hint(Some(500))
1219 .load_via_suffix_and_finish(input)
1220 .await
1221 .unwrap();
1222 assert_eq!(actual.file_metadata().schema(), expected);
1223 assert_eq!(fetch_count.load(Ordering::SeqCst), 0);
1224 assert_eq!(suffix_fetch_count.load(Ordering::SeqCst), 1);
1225
1226 fetch_count.store(0, Ordering::SeqCst);
1228 suffix_fetch_count.store(0, Ordering::SeqCst);
1229 let input = MetadataSuffixFetchFn(&mut fetch, &mut suffix_fetch);
1230 let actual = ParquetMetaDataReader::new()
1231 .with_prefetch_hint(Some(428))
1232 .load_via_suffix_and_finish(input)
1233 .await
1234 .unwrap();
1235 assert_eq!(actual.file_metadata().schema(), expected);
1236 assert_eq!(fetch_count.load(Ordering::SeqCst), 0);
1237 assert_eq!(suffix_fetch_count.load(Ordering::SeqCst), 1);
1238 }
1239
/// Loads the metadata of a uniformly encrypted file via suffix fetches using the
/// file's footer key.
#[cfg(feature = "encryption")]
#[tokio::test]
async fn test_suffix_with_encryption() {
    let decryption_properties = FileDecryptionProperties::builder(b"0123456789012345".to_vec())
        .build()
        .unwrap();

    let mut file = get_test_file("uniform_encryption.parquet.encrypted");
    let mut file2 = file.try_clone().unwrap();
    let mut fetch = |range| futures::future::ready(read_range(&mut file, range));
    let mut suffix_fetch = |suffix| futures::future::ready(read_suffix(&mut file2, suffix));
    let input = MetadataSuffixFetchFn(&mut fetch, &mut suffix_fetch);

    let metadata = ParquetMetaDataReader::new()
        .with_decryption_properties(Some(decryption_properties))
        .load_via_suffix_and_finish(input)
        .await
        .unwrap();
    assert_eq!(metadata.num_row_groups(), 1);
}
1264
1265 #[tokio::test]
1266 async fn test_page_index() {
1267 let mut file = get_test_file("alltypes_tiny_pages.parquet");
1268 let len = file.len();
1269 let fetch_count = AtomicUsize::new(0);
1270 let mut fetch = |range| {
1271 fetch_count.fetch_add(1, Ordering::SeqCst);
1272 futures::future::ready(read_range(&mut file, range))
1273 };
1274
1275 let f = MetadataFetchFn(&mut fetch);
1276 let mut loader =
1277 ParquetMetaDataReader::new().with_page_index_policy(PageIndexPolicy::Required);
1278 loader.try_load(f, len).await.unwrap();
1279 assert_eq!(fetch_count.load(Ordering::SeqCst), 3);
1280 let metadata = loader.finish().unwrap();
1281 assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());
1282
1283 fetch_count.store(0, Ordering::SeqCst);
1285 let f = MetadataFetchFn(&mut fetch);
1286 let mut loader = ParquetMetaDataReader::new()
1287 .with_page_index_policy(PageIndexPolicy::Required)
1288 .with_prefetch_hint(Some(1729));
1289 loader.try_load(f, len).await.unwrap();
1290 assert_eq!(fetch_count.load(Ordering::SeqCst), 2);
1291 let metadata = loader.finish().unwrap();
1292 assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());
1293
1294 fetch_count.store(0, Ordering::SeqCst);
1296 let f = MetadataFetchFn(&mut fetch);
1297 let mut loader = ParquetMetaDataReader::new()
1298 .with_page_index_policy(PageIndexPolicy::Required)
1299 .with_prefetch_hint(Some(130649));
1300 loader.try_load(f, len).await.unwrap();
1301 assert_eq!(fetch_count.load(Ordering::SeqCst), 2);
1302 let metadata = loader.finish().unwrap();
1303 assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());
1304
1305 fetch_count.store(0, Ordering::SeqCst);
1307 let f = MetadataFetchFn(&mut fetch);
1308 let metadata = ParquetMetaDataReader::new()
1309 .with_page_index_policy(PageIndexPolicy::Required)
1310 .with_prefetch_hint(Some(130650))
1311 .load_and_finish(f, len)
1312 .await
1313 .unwrap();
1314 assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
1315 assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());
1316
1317 fetch_count.store(0, Ordering::SeqCst);
1319 let f = MetadataFetchFn(&mut fetch);
1320 let metadata = ParquetMetaDataReader::new()
1321 .with_page_index_policy(PageIndexPolicy::Required)
1322 .with_prefetch_hint(Some((len - 1000) as usize)) .load_and_finish(f, len)
1324 .await
1325 .unwrap();
1326 assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
1327 assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());
1328
1329 fetch_count.store(0, Ordering::SeqCst);
1331 let f = MetadataFetchFn(&mut fetch);
1332 let metadata = ParquetMetaDataReader::new()
1333 .with_page_index_policy(PageIndexPolicy::Required)
1334 .with_prefetch_hint(Some(len as usize)) .load_and_finish(f, len)
1336 .await
1337 .unwrap();
1338 assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
1339 assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());
1340
1341 fetch_count.store(0, Ordering::SeqCst);
1343 let f = MetadataFetchFn(&mut fetch);
1344 let metadata = ParquetMetaDataReader::new()
1345 .with_page_index_policy(PageIndexPolicy::Required)
1346 .with_prefetch_hint(Some((len + 1000) as usize)) .load_and_finish(f, len)
1348 .await
1349 .unwrap();
1350 assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
1351 assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());
1352 }
1353
1354 fn write_parquet_file(offset_index_disabled: bool) -> Result<NamedTempFile> {
1355 let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
1356 let batch = RecordBatch::try_new(
1357 schema.clone(),
1358 vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
1359 )?;
1360
1361 let file = NamedTempFile::new().unwrap();
1362
1363 let props = WriterProperties::builder()
1365 .set_offset_index_disabled(offset_index_disabled)
1366 .build();
1367
1368 let mut writer = ArrowWriter::try_new(file.reopen()?, schema, Some(props))?;
1369 writer.write(&batch)?;
1370 writer.close()?;
1371
1372 Ok(file)
1373 }
1374
1375 fn read_and_check(file: &File, policy: PageIndexPolicy) -> Result<ParquetMetaData> {
1376 let mut reader = ParquetMetaDataReader::new().with_page_index_policy(policy);
1377 reader.try_parse(file)?;
1378 reader.finish()
1379 }
1380
/// Exercises every `PageIndexPolicy` against two files. The first is written with an
/// offset index. The second is written with the offset index disabled, where only
/// `Required` is expected to fail.
#[test]
fn test_page_index_policy() {
    // Offset index present: every policy must load successfully.
    let with_offset_index = write_parquet_file(false).unwrap();
    for policy in [
        PageIndexPolicy::Required,
        PageIndexPolicy::Optional,
        PageIndexPolicy::Skip,
    ] {
        read_and_check(with_offset_index.as_file(), policy).unwrap();
    }

    // Offset index disabled: `Required` must surface the missing index as an error...
    let without_offset_index = write_parquet_file(true).unwrap();
    let required_outcome = read_and_check(without_offset_index.as_file(), PageIndexPolicy::Required);
    assert!(matches!(
        required_outcome,
        Err(ParquetError::General(msg)) if msg == "missing offset index"
    ));

    // ...while the lenient policies still succeed.
    for policy in [PageIndexPolicy::Optional, PageIndexPolicy::Skip] {
        read_and_check(without_offset_index.as_file(), policy).unwrap();
    }
}
1399}