use std::{io::Read, ops::Range, sync::Arc};

use crate::basic::ColumnOrder;
#[cfg(feature = "encryption")]
use crate::encryption::{
    decrypt::{FileDecryptionProperties, FileDecryptor},
    modules::create_footer_aad,
};
use bytes::Bytes;

use crate::errors::{ParquetError, Result};
use crate::file::metadata::{ColumnChunkMetaData, FileMetaData, ParquetMetaData, RowGroupMetaData};
use crate::file::page_index::index::Index;
use crate::file::page_index::index_reader::{acc_range, decode_column_index, decode_offset_index};
use crate::file::reader::ChunkReader;
use crate::file::{FOOTER_SIZE, PARQUET_MAGIC, PARQUET_MAGIC_ENCR_FOOTER};
use crate::format::{ColumnOrder as TColumnOrder, FileMetaData as TFileMetaData};
#[cfg(feature = "encryption")]
use crate::format::{EncryptionAlgorithm, FileCryptoMetaData as TFileCryptoMetaData};
use crate::schema::types;
use crate::schema::types::SchemaDescriptor;
use crate::thrift::{TCompactSliceInputProtocol, TSerializable};

#[cfg(all(feature = "async", feature = "arrow"))]
use crate::arrow::async_reader::{MetadataFetch, MetadataSuffixFetch};
#[cfg(feature = "encryption")]
use crate::encryption::decrypt::CryptoContext;
use crate::file::page_index::offset_index::OffsetIndexMetaData;

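/// Reads the [`ParquetMetaData`] from a Parquet file's footer, optionally also
/// reading the column and offset (page) indexes.
///
/// A minimal usage sketch (assuming the `parquet::file::metadata` re-export and a
/// local file; any [`ChunkReader`] such as [`Bytes`] works the same way):
///
/// ```no_run
/// # use parquet::file::metadata::ParquetMetaDataReader;
/// let file = std::fs::File::open("some_path.parquet").unwrap();
/// let metadata = ParquetMetaDataReader::new()
///     .with_page_indexes(true)
///     .parse_and_finish(&file)
///     .unwrap();
/// ```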
#[derive(Default)]
pub struct ParquetMetaDataReader {
    metadata: Option<ParquetMetaData>,
    column_index: bool,
    offset_index: bool,
    prefetch_hint: Option<usize>,
    // Size of the serialized thrift metadata plus the 8-byte footer tail. Only set
    // by `parse_metadata`.
    metadata_size: Option<usize>,
    #[cfg(feature = "encryption")]
    file_decryption_properties: Option<FileDecryptionProperties>,
}

/// Describes the last 8 bytes of a Parquet file: the length of the encoded footer
/// metadata and the magic, which indicates whether the footer is encrypted.
pub struct FooterTail {
    metadata_length: usize,
    encrypted_footer: bool,
}

impl FooterTail {
    /// The length of the encoded footer metadata, in bytes
    pub fn metadata_length(&self) -> usize {
        self.metadata_length
    }

    /// Whether the footer metadata is encrypted
    pub fn is_encrypted_footer(&self) -> bool {
        self.encrypted_footer
    }
}

impl ParquetMetaDataReader {
    /// Create a new [`ParquetMetaDataReader`]
    pub fn new() -> Self {
        Default::default()
    }

    /// Create a new [`ParquetMetaDataReader`] populated with a [`ParquetMetaData`]
    /// obtained by other means.
    pub fn new_with_metadata(metadata: ParquetMetaData) -> Self {
        Self {
            metadata: Some(metadata),
            ..Default::default()
        }
    }

    /// Enable or disable reading both the column and offset indexes
    pub fn with_page_indexes(self, val: bool) -> Self {
        self.with_column_indexes(val).with_offset_indexes(val)
    }

    /// Enable or disable reading the column index
    pub fn with_column_indexes(mut self, val: bool) -> Self {
        self.column_index = val;
        self
    }

    /// Enable or disable reading the offset index
    pub fn with_offset_indexes(mut self, val: bool) -> Self {
        self.offset_index = val;
        self
    }

    /// Provide a hint as to the number of bytes needed to fully parse the
    /// [`ParquetMetaData`]. Only used by the async variants, where a sufficiently
    /// large hint can avoid a second fetch of the page indexes.
    pub fn with_prefetch_hint(mut self, prefetch: Option<usize>) -> Self {
        self.prefetch_hint = prefetch;
        self
    }

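    /// Provide the [`FileDecryptionProperties`] to use when decrypting the file.
    ///
    /// A sketch with a 16-byte AES footer key (the key value is illustrative and
    /// mirrors the encrypted-file test below; requires the `encryption` feature):
    ///
    /// ```no_run
    /// # use parquet::encryption::decrypt::FileDecryptionProperties;
    /// # use parquet::file::metadata::ParquetMetaDataReader;
    /// let key = b"0123456789012345".to_vec();
    /// let props = FileDecryptionProperties::builder(key).build().unwrap();
    /// let reader = ParquetMetaDataReader::new()
    ///     .with_decryption_properties(Some(&props));
    /// ```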
    #[cfg(feature = "encryption")]
    pub fn with_decryption_properties(
        mut self,
        properties: Option<&FileDecryptionProperties>,
    ) -> Self {
        self.file_decryption_properties = properties.cloned();
        self
    }

    /// Indicates whether this reader currently holds a [`ParquetMetaData`]
    pub fn has_metadata(&self) -> bool {
        self.metadata.is_some()
    }

    /// Return the parsed [`ParquetMetaData`], leaving `None` in its place
    pub fn finish(&mut self) -> Result<ParquetMetaData> {
        self.metadata
            .take()
            .ok_or_else(|| general_err!("could not parse parquet metadata"))
    }

    /// Given a [`ChunkReader`], parse and return the [`ParquetMetaData`] in a single
    /// pass (see the usage sketch on [`ParquetMetaDataReader`]).
    pub fn parse_and_finish<R: ChunkReader>(mut self, reader: &R) -> Result<ParquetMetaData> {
        self.try_parse(reader)?;
        self.finish()
    }

    /// Attempt to parse the footer metadata (and optionally the page indexes) given
    /// a [`ChunkReader`] with read access to the entire file.
    pub fn try_parse<R: ChunkReader>(&mut self, reader: &R) -> Result<()> {
        self.try_parse_sized(reader, reader.len())
    }

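    /// Same as [`Self::try_parse()`], except `reader` need only hold a suffix of the
    /// file, with `file_size` giving the size of the complete file. If more bytes are
    /// required, returns [`ParquetError::NeedMoreData`] carrying the needed suffix
    /// length, allowing the caller to retry with a larger suffix.
    ///
    /// A retry-loop sketch (mirrors the unit tests below; `bytes_for_range` is a
    /// hypothetical helper that returns the given byte range of the file):
    ///
    /// ```no_run
    /// # use parquet::errors::ParquetError;
    /// # use parquet::file::metadata::ParquetMetaDataReader;
    /// # fn bytes_for_range(range: std::ops::Range<u64>) -> bytes::Bytes { unimplemented!() }
    /// let file_size: u64 = 1_000_000; // known, e.g., from an object store listing
    /// let mut reader = ParquetMetaDataReader::new().with_page_indexes(true);
    /// let mut suffix = bytes_for_range(file_size - 64 * 1024..file_size);
    /// loop {
    ///     match reader.try_parse_sized(&suffix, file_size) {
    ///         Ok(()) => break,
    ///         Err(ParquetError::NeedMoreData(needed)) => {
    ///             suffix = bytes_for_range(file_size - needed as u64..file_size);
    ///             // The footer may parse while the page indexes still need more data
    ///             if reader.has_metadata() {
    ///                 reader.read_page_indexes_sized(&suffix, file_size).unwrap();
    ///                 break;
    ///             }
    ///         }
    ///         Err(e) => panic!("{e}"),
    ///     }
    /// }
    /// let metadata = reader.finish().unwrap();
    /// ```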
    pub fn try_parse_sized<R: ChunkReader>(&mut self, reader: &R, file_size: u64) -> Result<()> {
        self.metadata = match self.parse_metadata(reader) {
            Ok(metadata) => Some(metadata),
            Err(ParquetError::NeedMoreData(needed)) => {
                // If the provided reader covers the entire file, or the file itself is
                // smaller than what the footer claims to need, this is a genuine EOF.
                if file_size == reader.len() || needed as u64 > file_size {
                    return Err(eof_err!(
                        "Parquet file too small. Size is {} but need {}",
                        file_size,
                        needed
                    ));
                } else {
                    // Ask the caller to provide a larger suffix of the file
                    return Err(ParquetError::NeedMoreData(needed));
                }
            }
            Err(e) => return Err(e),
        };

        // If page indexes are wanted, we need to look a little further
        if !self.column_index && !self.offset_index {
            return Ok(());
        }

        self.read_page_indexes_sized(reader, file_size)
    }

    /// Read the page indexes for a file whose footer metadata has already been parsed
    pub fn read_page_indexes<R: ChunkReader>(&mut self, reader: &R) -> Result<()> {
        self.read_page_indexes_sized(reader, reader.len())
    }

    /// Same as [`Self::read_page_indexes()`], except `reader` need only hold a suffix
    /// of the file. Like [`Self::try_parse_sized()`], may return
    /// [`ParquetError::NeedMoreData`] if `reader` does not cover the page index range.
    pub fn read_page_indexes_sized<R: ChunkReader>(
        &mut self,
        reader: &R,
        file_size: u64,
    ) -> Result<()> {
        if self.metadata.is_none() {
            return Err(general_err!(
                "Tried to read page indexes without ParquetMetaData metadata"
            ));
        }

        // Get the range of bytes needed for the page indexes (if any are present)
        let Some(range) = self.range_for_page_index() else {
            return Ok(());
        };

        // Check that the needed range lies within the slice the reader holds. Also
        // checking `range.end` guards against a garbage range.
        let file_range = file_size.saturating_sub(reader.len())..file_size;
        if !(file_range.contains(&range.start) && file_range.contains(&range.end)) {
            // Requested range is beyond the end of the file
            if range.end > file_size {
                return Err(eof_err!(
                    "Parquet file too small. Range {:?} is beyond file bounds {file_size}",
                    range
                ));
            } else {
                // Ask the caller to provide a larger suffix of the file
                return Err(ParquetError::NeedMoreData(
                    (file_size - range.start).try_into()?,
                ));
            }
        }

        // Sanity check: if the footer metadata size is known, the page indexes must
        // end before the footer metadata begins.
        if let Some(metadata_size) = self.metadata_size {
            let metadata_range = file_size.saturating_sub(metadata_size as u64)..file_size;
            if range.end > metadata_range.start {
                return Err(eof_err!(
                    "Parquet file too small. Page index range {:?} overlaps with file metadata {:?}",
                    range,
                    metadata_range
                ));
            }
        }

        let bytes_needed = usize::try_from(range.end - range.start)?;
        let bytes = reader.get_bytes(range.start - file_range.start, bytes_needed)?;
        let offset = range.start;

        self.parse_column_index(&bytes, offset)?;
        self.parse_offset_index(&bytes, offset)?;

        Ok(())
    }

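    /// Given a [`MetadataFetch`], parse and return the [`ParquetMetaData`] in a
    /// single pass.
    ///
    /// A sketch, assuming `fetch` is any [`MetadataFetch`] implementation and
    /// `file_size` is known (e.g. from an object store listing):
    ///
    /// ```no_run
    /// # use parquet::arrow::async_reader::MetadataFetch;
    /// # use parquet::file::metadata::ParquetMetaDataReader;
    /// # async fn example<F: MetadataFetch>(fetch: F, file_size: u64) {
    /// let metadata = ParquetMetaDataReader::new()
    ///     .with_page_indexes(true)
    ///     .load_and_finish(fetch, file_size)
    ///     .await
    ///     .unwrap();
    /// # }
    /// ```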
    #[cfg(all(feature = "async", feature = "arrow"))]
    pub async fn load_and_finish<F: MetadataFetch>(
        mut self,
        fetch: F,
        file_size: u64,
    ) -> Result<ParquetMetaData> {
        self.try_load(fetch, file_size).await?;
        self.finish()
    }

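    /// Same as [`Self::load_and_finish`], but fetches the footer via suffix requests,
    /// so the total file size need not be known in advance.
    ///
    /// A sketch, assuming `fetch` is any [`MetadataSuffixFetch`] implementation:
    ///
    /// ```no_run
    /// # use parquet::arrow::async_reader::MetadataSuffixFetch;
    /// # use parquet::file::metadata::ParquetMetaDataReader;
    /// # async fn example<F: MetadataSuffixFetch>(fetch: F) {
    /// let metadata = ParquetMetaDataReader::new()
    ///     .load_via_suffix_and_finish(fetch)
    ///     .await
    ///     .unwrap();
    /// # }
    /// ```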
    #[cfg(all(feature = "async", feature = "arrow"))]
    pub async fn load_via_suffix_and_finish<F: MetadataSuffixFetch>(
        mut self,
        fetch: F,
    ) -> Result<ParquetMetaData> {
        self.try_load_via_suffix(fetch).await?;
        self.finish()
    }

    /// Attempt to parse the footer metadata (and optionally the page indexes) given
    /// a [`MetadataFetch`] and the size of the complete file.
    #[cfg(all(feature = "async", feature = "arrow"))]
    pub async fn try_load<F: MetadataFetch>(&mut self, mut fetch: F, file_size: u64) -> Result<()> {
        let (metadata, remainder) = self.load_metadata(&mut fetch, file_size).await?;

        self.metadata = Some(metadata);

        // If page indexes are wanted, we need to look a little further
        if !self.column_index && !self.offset_index {
            return Ok(());
        }

        self.load_page_index_with_remainder(fetch, remainder).await
    }

    /// Same as [`Self::try_load`], but fetches the footer via suffix requests rather
    /// than absolute ranges.
    #[cfg(all(feature = "async", feature = "arrow"))]
    pub async fn try_load_via_suffix<F: MetadataSuffixFetch>(
        &mut self,
        mut fetch: F,
    ) -> Result<()> {
        let (metadata, remainder) = self.load_metadata_via_suffix(&mut fetch).await?;

        self.metadata = Some(metadata);

        // If page indexes are wanted, we need to look a little further
        if !self.column_index && !self.offset_index {
            return Ok(());
        }

        self.load_page_index_with_remainder(fetch, remainder).await
    }

    /// Asynchronously fetch the page indexes for previously loaded footer metadata
    #[cfg(all(feature = "async", feature = "arrow"))]
    pub async fn load_page_index<F: MetadataFetch>(&mut self, fetch: F) -> Result<()> {
        self.load_page_index_with_remainder(fetch, None).await
    }

    #[cfg(all(feature = "async", feature = "arrow"))]
    async fn load_page_index_with_remainder<F: MetadataFetch>(
        &mut self,
        mut fetch: F,
        remainder: Option<(usize, Bytes)>,
    ) -> Result<()> {
        if self.metadata.is_none() {
            return Err(general_err!("Footer metadata is not present"));
        }

        // Get the range of bytes needed for the page indexes (if any are present)
        let range = self.range_for_page_index();
        let range = match range {
            Some(range) => range,
            None => return Ok(()),
        };

        let bytes = match &remainder {
            // If the bytes left over from fetching the footer already cover the page
            // index range, slice into them rather than fetching again.
            Some((remainder_start, remainder)) if *remainder_start as u64 <= range.start => {
                let remainder_start = *remainder_start as u64;
                let offset = usize::try_from(range.start - remainder_start)?;
                let end = usize::try_from(range.end - remainder_start)?;
                assert!(end <= remainder.len());
                remainder.slice(offset..end)
            }
            _ => fetch.fetch(range.start..range.end).await?,
        };

        // Sanity check
        assert_eq!(bytes.len() as u64, range.end - range.start);

        self.parse_column_index(&bytes, range.start)?;
        self.parse_offset_index(&bytes, range.start)?;

        Ok(())
    }

    fn parse_column_index(&mut self, bytes: &Bytes, start_offset: u64) -> Result<()> {
        // Callers ensure the metadata has been parsed first
        let metadata = self.metadata.as_mut().unwrap();
        if self.column_index {
            let index = metadata
                .row_groups()
                .iter()
                .enumerate()
                .map(|(rg_idx, x)| {
                    x.columns()
                        .iter()
                        .enumerate()
                        .map(|(col_idx, c)| match c.column_index_range() {
                            Some(r) => {
                                let r_start = usize::try_from(r.start - start_offset)?;
                                let r_end = usize::try_from(r.end - start_offset)?;
                                Self::parse_single_column_index(
                                    &bytes[r_start..r_end],
                                    metadata,
                                    c,
                                    rg_idx,
                                    col_idx,
                                )
                            }
                            None => Ok(Index::NONE),
                        })
                        .collect::<Result<Vec<_>>>()
                })
                .collect::<Result<Vec<_>>>()?;
            metadata.set_column_index(Some(index));
        }
        Ok(())
    }

    #[cfg(feature = "encryption")]
    fn parse_single_column_index(
        bytes: &[u8],
        metadata: &ParquetMetaData,
        column: &ColumnChunkMetaData,
        row_group_index: usize,
        col_index: usize,
    ) -> Result<Index> {
        match &column.column_crypto_metadata {
            Some(crypto_metadata) => {
                let file_decryptor = metadata.file_decryptor.as_ref().ok_or_else(|| {
                    general_err!("Cannot decrypt column index, no file decryptor set")
                })?;
                let crypto_context = CryptoContext::for_column(
                    file_decryptor,
                    crypto_metadata,
                    row_group_index,
                    col_index,
                )?;
                let column_decryptor = crypto_context.metadata_decryptor();
                let aad = crypto_context.create_column_index_aad()?;
                let plaintext = column_decryptor.decrypt(bytes, &aad)?;
                decode_column_index(&plaintext, column.column_type())
            }
            None => decode_column_index(bytes, column.column_type()),
        }
    }

    #[cfg(not(feature = "encryption"))]
    fn parse_single_column_index(
        bytes: &[u8],
        _metadata: &ParquetMetaData,
        column: &ColumnChunkMetaData,
        _row_group_index: usize,
        _col_index: usize,
    ) -> Result<Index> {
        decode_column_index(bytes, column.column_type())
    }

    fn parse_offset_index(&mut self, bytes: &Bytes, start_offset: u64) -> Result<()> {
        // Callers ensure the metadata has been parsed first
        let metadata = self.metadata.as_mut().unwrap();
        if self.offset_index {
            let index = metadata
                .row_groups()
                .iter()
                .enumerate()
                .map(|(rg_idx, x)| {
                    x.columns()
                        .iter()
                        .enumerate()
                        .map(|(col_idx, c)| match c.offset_index_range() {
                            Some(r) => {
                                let r_start = usize::try_from(r.start - start_offset)?;
                                let r_end = usize::try_from(r.end - start_offset)?;
                                Self::parse_single_offset_index(
                                    &bytes[r_start..r_end],
                                    metadata,
                                    c,
                                    rg_idx,
                                    col_idx,
                                )
                            }
                            None => Err(general_err!("missing offset index")),
                        })
                        .collect::<Result<Vec<_>>>()
                })
                .collect::<Result<Vec<_>>>()?;

            metadata.set_offset_index(Some(index));
        }
        Ok(())
    }

    #[cfg(feature = "encryption")]
    fn parse_single_offset_index(
        bytes: &[u8],
        metadata: &ParquetMetaData,
        column: &ColumnChunkMetaData,
        row_group_index: usize,
        col_index: usize,
    ) -> Result<OffsetIndexMetaData> {
        match &column.column_crypto_metadata {
            Some(crypto_metadata) => {
                let file_decryptor = metadata.file_decryptor.as_ref().ok_or_else(|| {
                    general_err!("Cannot decrypt offset index, no file decryptor set")
                })?;
                let crypto_context = CryptoContext::for_column(
                    file_decryptor,
                    crypto_metadata,
                    row_group_index,
                    col_index,
                )?;
                let column_decryptor = crypto_context.metadata_decryptor();
                let aad = crypto_context.create_offset_index_aad()?;
                let plaintext = column_decryptor.decrypt(bytes, &aad)?;
                decode_offset_index(&plaintext)
            }
            None => decode_offset_index(bytes),
        }
    }

    #[cfg(not(feature = "encryption"))]
    fn parse_single_offset_index(
        bytes: &[u8],
        _metadata: &ParquetMetaData,
        _column: &ColumnChunkMetaData,
        _row_group_index: usize,
        _col_index: usize,
    ) -> Result<OffsetIndexMetaData> {
        decode_offset_index(bytes)
    }

    fn range_for_page_index(&self) -> Option<Range<u64>> {
        // Sanity check
        self.metadata.as_ref()?;

        // Grab the ranges of all page indexes and accumulate them into a single range
        let mut range = None;
        let metadata = self.metadata.as_ref().unwrap();
        for c in metadata.row_groups().iter().flat_map(|r| r.columns()) {
            if self.column_index {
                range = acc_range(range, c.column_index_range());
            }
            if self.offset_index {
                range = acc_range(range, c.offset_index_range());
            }
        }
        range
    }

    // One-shot parse of the footer.
    //
    // Side effect: sets `self.metadata_size`.
    fn parse_metadata<R: ChunkReader>(&mut self, chunk_reader: &R) -> Result<ParquetMetaData> {
        // Check that the file is large enough to hold the footer
        let file_size = chunk_reader.len();
        if file_size < (FOOTER_SIZE as u64) {
            return Err(ParquetError::NeedMoreData(FOOTER_SIZE));
        }

        let mut footer = [0_u8; 8];
        chunk_reader
            .get_read(file_size - 8)?
            .read_exact(&mut footer)?;

        let footer = Self::decode_footer_tail(&footer)?;
        let metadata_len = footer.metadata_length();
        let footer_metadata_len = FOOTER_SIZE + metadata_len;
        self.metadata_size = Some(footer_metadata_len);

        if footer_metadata_len as u64 > file_size {
            return Err(ParquetError::NeedMoreData(footer_metadata_len));
        }

        let start = file_size - footer_metadata_len as u64;
        self.decode_footer_metadata(
            chunk_reader.get_bytes(start, metadata_len)?.as_ref(),
            &footer,
        )
    }

    /// Return the number of bytes to read in the initial pass: the prefetch hint if
    /// one was provided and it is at least as large as the footer, otherwise just
    /// the footer size.
    #[cfg(all(feature = "async", feature = "arrow"))]
    fn get_prefetch_size(&self) -> usize {
        if let Some(prefetch) = self.prefetch_hint {
            if prefetch > FOOTER_SIZE {
                return prefetch;
            }
        }
        FOOTER_SIZE
    }

    #[cfg(all(feature = "async", feature = "arrow"))]
    async fn load_metadata<F: MetadataFetch>(
        &self,
        fetch: &mut F,
        file_size: u64,
    ) -> Result<(ParquetMetaData, Option<(usize, Bytes)>)> {
        let prefetch = self.get_prefetch_size() as u64;

        if file_size < FOOTER_SIZE as u64 {
            return Err(eof_err!("file size of {} is less than footer", file_size));
        }

        // If a size hint is provided, read more than the minimum size to try and
        // avoid a second fetch. Note: `prefetch > file_size` is fine since we are
        // using `saturating_sub`.
        let footer_start = file_size.saturating_sub(prefetch);

        let suffix = fetch.fetch(footer_start..file_size).await?;
        let suffix_len = suffix.len();
        let fetch_len = (file_size - footer_start)
            .try_into()
            .expect("footer size should never be larger than u32");
        if suffix_len < fetch_len {
            return Err(eof_err!(
                "metadata requires {} bytes, but could only read {}",
                fetch_len,
                suffix_len
            ));
        }

        let mut footer = [0; FOOTER_SIZE];
        footer.copy_from_slice(&suffix[suffix_len - FOOTER_SIZE..suffix_len]);

        let footer = Self::decode_footer_tail(&footer)?;
        let length = footer.metadata_length();

        if file_size < (length + FOOTER_SIZE) as u64 {
            return Err(eof_err!(
                "file size of {} is less than footer + metadata {}",
                file_size,
                length + FOOTER_SIZE
            ));
        }

        // The initial read did not fetch the entire footer metadata; a second
        // request is needed.
        if length > suffix_len - FOOTER_SIZE {
            let metadata_start = file_size - (length + FOOTER_SIZE) as u64;
            let meta = fetch
                .fetch(metadata_start..(file_size - FOOTER_SIZE as u64))
                .await?;
            Ok((self.decode_footer_metadata(&meta, &footer)?, None))
        } else {
            let metadata_start = (file_size - (length + FOOTER_SIZE) as u64 - footer_start)
                .try_into()
                .expect("metadata length should never be larger than u32");
            let slice = &suffix[metadata_start..suffix_len - FOOTER_SIZE];
            Ok((
                self.decode_footer_metadata(slice, &footer)?,
                // Return the unused prefix so the page index read can reuse it
                Some((footer_start as usize, suffix.slice(..metadata_start))),
            ))
        }
    }

    #[cfg(all(feature = "async", feature = "arrow"))]
    async fn load_metadata_via_suffix<F: MetadataSuffixFetch>(
        &self,
        fetch: &mut F,
    ) -> Result<(ParquetMetaData, Option<(usize, Bytes)>)> {
        let prefetch = self.get_prefetch_size();

        let suffix = fetch.fetch_suffix(prefetch as _).await?;
        let suffix_len = suffix.len();

        if suffix_len < FOOTER_SIZE {
            return Err(eof_err!(
                "footer metadata requires {} bytes, but could only read {}",
                FOOTER_SIZE,
                suffix_len
            ));
        }

        let mut footer = [0; FOOTER_SIZE];
        footer.copy_from_slice(&suffix[suffix_len - FOOTER_SIZE..suffix_len]);

        let footer = Self::decode_footer_tail(&footer)?;
        let length = footer.metadata_length();

        let metadata_offset = length + FOOTER_SIZE;
        // The initial read did not fetch the entire footer metadata; a second
        // suffix request is needed.
        if length > suffix_len - FOOTER_SIZE {
            let meta = fetch.fetch_suffix(metadata_offset).await?;

            if meta.len() < metadata_offset {
                return Err(eof_err!(
                    "metadata requires {} bytes, but could only read {}",
                    metadata_offset,
                    meta.len()
                ));
            }

            Ok((
                // `meta` includes the footer tail; decode only the metadata bytes
                self.decode_footer_metadata(&meta.slice(0..length), &footer)?,
                None,
            ))
        } else {
            let metadata_start = suffix_len - metadata_offset;
            let slice = &suffix[metadata_start..suffix_len - FOOTER_SIZE];
            Ok((
                self.decode_footer_metadata(slice, &footer)?,
                Some((0, suffix.slice(..metadata_start))),
            ))
        }
    }

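    /// Decode the final [`FOOTER_SIZE`] (8) bytes of a Parquet file: a 4-byte
    /// little-endian metadata length followed by the 4-byte magic, `PAR1` for a
    /// plaintext footer or `PARE` for an encrypted one.
    ///
    /// A small sketch of the layout this parses (the length value is illustrative):
    ///
    /// ```no_run
    /// # use parquet::file::metadata::ParquetMetaDataReader;
    /// // 1234 (0x04D2) bytes of metadata followed by the plaintext magic
    /// let tail = [0xD2, 0x04, 0x00, 0x00, b'P', b'A', b'R', b'1'];
    /// let footer = ParquetMetaDataReader::decode_footer_tail(&tail).unwrap();
    /// assert_eq!(footer.metadata_length(), 1234);
    /// assert!(!footer.is_encrypted_footer());
    /// ```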
    pub fn decode_footer_tail(slice: &[u8; FOOTER_SIZE]) -> Result<FooterTail> {
        let magic = &slice[4..];
        let encrypted_footer = if magic == PARQUET_MAGIC_ENCR_FOOTER {
            true
        } else if magic == PARQUET_MAGIC {
            false
        } else {
            return Err(general_err!("Invalid Parquet file. Corrupt footer"));
        };
        // Get the metadata length from the footer
        let metadata_len = u32::from_le_bytes(slice[..4].try_into().unwrap());
        Ok(FooterTail {
            // A u32 always fits in a usize on the supported platforms
            metadata_length: metadata_len as usize,
            encrypted_footer,
        })
    }

    /// Decodes the Parquet footer, returning the metadata length in bytes
    #[deprecated(note = "use decode_footer_tail instead")]
    pub fn decode_footer(slice: &[u8; FOOTER_SIZE]) -> Result<usize> {
        Self::decode_footer_tail(slice).map(|f| f.metadata_length)
    }

    /// Decode the footer metadata in `buf`, using `footer_tail` to determine whether
    /// the footer is encrypted and dispatching accordingly.
    pub(crate) fn decode_footer_metadata(
        &self,
        buf: &[u8],
        footer_tail: &FooterTail,
    ) -> Result<ParquetMetaData> {
        #[cfg(feature = "encryption")]
        let result = Self::decode_metadata_with_encryption(
            buf,
            footer_tail.is_encrypted_footer(),
            self.file_decryption_properties.as_ref(),
        );
        #[cfg(not(feature = "encryption"))]
        let result = {
            if footer_tail.is_encrypted_footer() {
                Err(general_err!(
                    "Parquet file has an encrypted footer but the encryption feature is disabled"
                ))
            } else {
                Self::decode_metadata(buf)
            }
        };
        result
    }

    /// Decode the raw Thrift metadata, decrypting the footer and/or verifying the
    /// plaintext footer signature if decryption properties are provided.
    #[cfg(feature = "encryption")]
    fn decode_metadata_with_encryption(
        buf: &[u8],
        encrypted_footer: bool,
        file_decryption_properties: Option<&FileDecryptionProperties>,
    ) -> Result<ParquetMetaData> {
        let mut prot = TCompactSliceInputProtocol::new(buf);
        let mut file_decryptor = None;
        let decrypted_fmd_buf;

        if encrypted_footer {
            if let Some(file_decryption_properties) = file_decryption_properties {
                let t_file_crypto_metadata: TFileCryptoMetaData =
                    TFileCryptoMetaData::read_from_in_protocol(&mut prot)
                        .map_err(|e| general_err!("Could not parse crypto metadata: {}", e))?;
                let supply_aad_prefix = match &t_file_crypto_metadata.encryption_algorithm {
                    EncryptionAlgorithm::AESGCMV1(algo) => algo.supply_aad_prefix,
                    _ => Some(false),
                }
                .unwrap_or(false);
                if supply_aad_prefix && file_decryption_properties.aad_prefix().is_none() {
                    return Err(general_err!(
                        "Parquet file was encrypted with an AAD prefix that is not stored in the file, \
                        but no AAD prefix was provided in the file decryption properties"
                    ));
                }
                let decryptor = get_file_decryptor(
                    t_file_crypto_metadata.encryption_algorithm,
                    t_file_crypto_metadata.key_metadata.as_deref(),
                    file_decryption_properties,
                )?;
                let footer_decryptor = decryptor.get_footer_decryptor();
                let aad_footer = create_footer_aad(decryptor.file_aad())?;

                decrypted_fmd_buf = footer_decryptor?
                    .decrypt(prot.as_slice().as_ref(), aad_footer.as_ref())
                    .map_err(|_| {
                        general_err!(
                            "Provided footer key and AAD were unable to decrypt parquet footer"
                        )
                    })?;
                prot = TCompactSliceInputProtocol::new(decrypted_fmd_buf.as_ref());

                file_decryptor = Some(decryptor);
            } else {
                return Err(general_err!(
                    "Parquet file has an encrypted footer but decryption properties were not provided"
                ));
            }
        }

        let t_file_metadata: TFileMetaData = TFileMetaData::read_from_in_protocol(&mut prot)
            .map_err(|e| general_err!("Could not parse metadata: {}", e))?;
        let schema = types::from_thrift(&t_file_metadata.schema)?;
        let schema_descr = Arc::new(SchemaDescriptor::new(schema));

        if let (Some(algo), Some(file_decryption_properties)) = (
            t_file_metadata.encryption_algorithm,
            file_decryption_properties,
        ) {
            // The file has a plaintext footer, but an encryption algorithm is set
            let file_decryptor_value = get_file_decryptor(
                algo,
                t_file_metadata.footer_signing_key_metadata.as_deref(),
                file_decryption_properties,
            )?;
            if file_decryption_properties.check_plaintext_footer_integrity() && !encrypted_footer {
                file_decryptor_value.verify_plaintext_footer_signature(buf)?;
            }
            file_decryptor = Some(file_decryptor_value);
        }

        let mut row_groups = Vec::new();
        for rg in t_file_metadata.row_groups {
            let r = RowGroupMetaData::from_encrypted_thrift(
                schema_descr.clone(),
                rg,
                file_decryptor.as_ref(),
            )?;
            row_groups.push(r);
        }
        let column_orders =
            Self::parse_column_orders(t_file_metadata.column_orders, &schema_descr)?;

        let file_metadata = FileMetaData::new(
            t_file_metadata.version,
            t_file_metadata.num_rows,
            t_file_metadata.created_by,
            t_file_metadata.key_value_metadata,
            schema_descr,
            column_orders,
        );
        let mut metadata = ParquetMetaData::new(file_metadata, row_groups);

        metadata.with_file_decryptor(file_decryptor);

        Ok(metadata)
    }

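    /// Decode [`ParquetMetaData`] from the raw (unencrypted) Thrift bytes of the
    /// footer metadata, i.e. the bytes immediately preceding the 8-byte footer tail.
    ///
    /// A sketch, assuming `read_metadata_bytes` is a hypothetical helper returning
    /// exactly the [`FooterTail::metadata_length`] bytes before the footer tail:
    ///
    /// ```no_run
    /// # use parquet::file::metadata::ParquetMetaDataReader;
    /// # fn read_metadata_bytes() -> Vec<u8> { unimplemented!() }
    /// let buf = read_metadata_bytes();
    /// let metadata = ParquetMetaDataReader::decode_metadata(&buf).unwrap();
    /// println!("row groups: {}", metadata.num_row_groups());
    /// ```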
    pub fn decode_metadata(buf: &[u8]) -> Result<ParquetMetaData> {
        let mut prot = TCompactSliceInputProtocol::new(buf);

        let t_file_metadata: TFileMetaData = TFileMetaData::read_from_in_protocol(&mut prot)
            .map_err(|e| general_err!("Could not parse metadata: {}", e))?;
        let schema = types::from_thrift(&t_file_metadata.schema)?;
        let schema_descr = Arc::new(SchemaDescriptor::new(schema));

        let mut row_groups = Vec::new();
        for rg in t_file_metadata.row_groups {
            row_groups.push(RowGroupMetaData::from_thrift(schema_descr.clone(), rg)?);
        }
        let column_orders =
            Self::parse_column_orders(t_file_metadata.column_orders, &schema_descr)?;

        let file_metadata = FileMetaData::new(
            t_file_metadata.version,
            t_file_metadata.num_rows,
            t_file_metadata.created_by,
            t_file_metadata.key_value_metadata,
            schema_descr,
            column_orders,
        );

        Ok(ParquetMetaData::new(file_metadata, row_groups))
    }

    /// Parses column orders from their Thrift definition.
    /// If no column orders are defined, returns `None`.
    fn parse_column_orders(
        t_column_orders: Option<Vec<TColumnOrder>>,
        schema_descr: &SchemaDescriptor,
    ) -> Result<Option<Vec<ColumnOrder>>> {
        match t_column_orders {
            Some(orders) => {
                // Should always be the case
                if orders.len() != schema_descr.num_columns() {
                    return Err(general_err!("Column order length mismatch"));
                };
                let mut res = Vec::new();
                for (i, column) in schema_descr.columns().iter().enumerate() {
                    match orders[i] {
                        TColumnOrder::TYPEORDER(_) => {
                            let sort_order = ColumnOrder::get_sort_order(
                                column.logical_type(),
                                column.converted_type(),
                                column.physical_type(),
                            );
                            res.push(ColumnOrder::TYPE_DEFINED_ORDER(sort_order));
                        }
                    }
                }
                Ok(Some(res))
            }
            None => Ok(None),
        }
    }
}

#[cfg(feature = "encryption")]
fn get_file_decryptor(
    encryption_algorithm: EncryptionAlgorithm,
    footer_key_metadata: Option<&[u8]>,
    file_decryption_properties: &FileDecryptionProperties,
) -> Result<FileDecryptor> {
    match encryption_algorithm {
        EncryptionAlgorithm::AESGCMV1(algo) => {
            let aad_file_unique = algo
                .aad_file_unique
                .ok_or_else(|| general_err!("AAD unique file identifier is not set"))?;
            let aad_prefix = if let Some(aad_prefix) = file_decryption_properties.aad_prefix() {
                aad_prefix.clone()
            } else {
                algo.aad_prefix.unwrap_or_default()
            };

            FileDecryptor::new(
                file_decryption_properties,
                footer_key_metadata,
                aad_file_unique,
                aad_prefix,
            )
        }
        EncryptionAlgorithm::AESGCMCTRV1(_) => Err(nyi_err!(
            "The AES_GCM_CTR_V1 encryption algorithm is not yet supported"
        )),
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;

    use crate::basic::SortOrder;
    use crate::basic::Type;
    use crate::file::reader::Length;
    use crate::format::TypeDefinedOrder;
    use crate::schema::types::Type as SchemaType;
    use crate::util::test_common::file_util::get_test_file;

    #[test]
    fn test_parse_metadata_size_smaller_than_footer() {
        let test_file = tempfile::tempfile().unwrap();
        let err = ParquetMetaDataReader::new()
            .parse_metadata(&test_file)
            .unwrap_err();
        assert!(matches!(err, ParquetError::NeedMoreData(8)));
    }

    #[test]
    fn test_parse_metadata_corrupt_footer() {
        let data = Bytes::from(vec![1, 2, 3, 4, 5, 6, 7, 8]);
        let reader_result = ParquetMetaDataReader::new().parse_metadata(&data);
        assert_eq!(
            reader_result.unwrap_err().to_string(),
            "Parquet error: Invalid Parquet file. Corrupt footer"
        );
    }

    #[test]
    fn test_parse_metadata_invalid_start() {
        let test_file = Bytes::from(vec![255, 0, 0, 0, b'P', b'A', b'R', b'1']);
        let err = ParquetMetaDataReader::new()
            .parse_metadata(&test_file)
            .unwrap_err();
        assert!(matches!(err, ParquetError::NeedMoreData(263)));
    }

    #[test]
    fn test_metadata_column_orders_parse() {
        // Define a simple schema; no logical types are needed
        let fields = vec![
            Arc::new(
                SchemaType::primitive_type_builder("col1", Type::INT32)
                    .build()
                    .unwrap(),
            ),
            Arc::new(
                SchemaType::primitive_type_builder("col2", Type::FLOAT)
                    .build()
                    .unwrap(),
            ),
        ];
        let schema = SchemaType::group_type_builder("schema")
            .with_fields(fields)
            .build()
            .unwrap();
        let schema_descr = SchemaDescriptor::new(Arc::new(schema));

        let t_column_orders = Some(vec![
            TColumnOrder::TYPEORDER(TypeDefinedOrder::new()),
            TColumnOrder::TYPEORDER(TypeDefinedOrder::new()),
        ]);

        assert_eq!(
            ParquetMetaDataReader::parse_column_orders(t_column_orders, &schema_descr).unwrap(),
            Some(vec![
                ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED),
                ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED)
            ])
        );

        // Test when no column orders are defined
        assert_eq!(
            ParquetMetaDataReader::parse_column_orders(None, &schema_descr).unwrap(),
            None
        );
    }

    #[test]
    fn test_metadata_column_orders_len_mismatch() {
        let schema = SchemaType::group_type_builder("schema").build().unwrap();
        let schema_descr = SchemaDescriptor::new(Arc::new(schema));

        let t_column_orders = Some(vec![TColumnOrder::TYPEORDER(TypeDefinedOrder::new())]);

        let res = ParquetMetaDataReader::parse_column_orders(t_column_orders, &schema_descr);
        assert!(res.is_err());
        assert!(format!("{:?}", res.unwrap_err()).contains("Column order length mismatch"));
    }

    #[test]
    fn test_try_parse() {
        let file = get_test_file("alltypes_tiny_pages.parquet");
        let len = file.len();

        let mut reader = ParquetMetaDataReader::new().with_page_indexes(true);

        let bytes_for_range = |range: Range<u64>| {
            file.get_bytes(range.start, (range.end - range.start).try_into().unwrap())
                .unwrap()
        };

        // read the entire file
        let bytes = bytes_for_range(0..len);
        reader.try_parse(&bytes).unwrap();
        let metadata = reader.finish().unwrap();
        assert!(metadata.column_index.is_some());
        assert!(metadata.offset_index.is_some());

        // read more than enough of the file
        let bytes = bytes_for_range(320000..len);
        reader.try_parse_sized(&bytes, len).unwrap();
        let metadata = reader.finish().unwrap();
        assert!(metadata.column_index.is_some());
        assert!(metadata.offset_index.is_some());

        // exactly enough of the file
        let bytes = bytes_for_range(323583..len);
        reader.try_parse_sized(&bytes, len).unwrap();
        let metadata = reader.finish().unwrap();
        assert!(metadata.column_index.is_some());
        assert!(metadata.offset_index.is_some());

        // not enough for the page indexes
        let bytes = bytes_for_range(323584..len);
        match reader.try_parse_sized(&bytes, len).unwrap_err() {
            // expected error, try again with the provided bound
            ParquetError::NeedMoreData(needed) => {
                let bytes = bytes_for_range(len - needed as u64..len);
                reader.try_parse_sized(&bytes, len).unwrap();
                let metadata = reader.finish().unwrap();
                assert!(metadata.column_index.is_some());
                assert!(metadata.offset_index.is_some());
            }
            _ => panic!("unexpected error"),
        };

        // not enough for the footer metadata either; loop until the metadata parses,
        // then read the page indexes
        let mut reader = ParquetMetaDataReader::new().with_page_indexes(true);
        let mut bytes = bytes_for_range(452505..len);
        loop {
            match reader.try_parse_sized(&bytes, len) {
                Ok(_) => break,
                Err(ParquetError::NeedMoreData(needed)) => {
                    bytes = bytes_for_range(len - needed as u64..len);
                    if reader.has_metadata() {
                        reader.read_page_indexes_sized(&bytes, len).unwrap();
                        break;
                    }
                }
                _ => panic!("unexpected error"),
            }
        }
        let metadata = reader.finish().unwrap();
        assert!(metadata.column_index.is_some());
        assert!(metadata.offset_index.is_some());

        // the given file size is too small for the page index range
        let bytes = bytes_for_range(323584..len);
        let reader_result = reader.try_parse_sized(&bytes, len - 323584).unwrap_err();
        assert_eq!(
            reader_result.to_string(),
            "EOF: Parquet file too small. Range 323583..452504 is beyond file bounds 130649"
        );

        // not enough of the file, but page indexes are not requested
        let mut reader = ParquetMetaDataReader::new();
        let bytes = bytes_for_range(452505..len);
        match reader.try_parse_sized(&bytes, len).unwrap_err() {
            // expected error, try again with the provided bound
            ParquetError::NeedMoreData(needed) => {
                let bytes = bytes_for_range(len - needed as u64..len);
                reader.try_parse_sized(&bytes, len).unwrap();
                reader.finish().unwrap();
            }
            _ => panic!("unexpected error"),
        };

        // `try_parse` assumes the reader holds the entire file
        let reader_result = reader.try_parse(&bytes).unwrap_err();
        assert_eq!(
            reader_result.to_string(),
            "EOF: Parquet file too small. Size is 1728 but need 1729"
        );

        // a prefix of the file has no footer to parse
        let bytes = bytes_for_range(0..1000);
        let reader_result = reader.try_parse_sized(&bytes, len).unwrap_err();
        assert_eq!(
            reader_result.to_string(),
            "Parquet error: Invalid Parquet file. Corrupt footer"
        );

        // the given file size is smaller than what the footer needs
        let bytes = bytes_for_range(452510..len);
        let reader_result = reader.try_parse_sized(&bytes, len - 452505).unwrap_err();
        assert_eq!(
            reader_result.to_string(),
            "EOF: Parquet file too small. Size is 1728 but need 1729"
        );
    }
}

#[cfg(all(feature = "async", feature = "arrow", test))]
mod async_tests {
    use super::*;
    use bytes::Bytes;
    use futures::future::BoxFuture;
    use futures::FutureExt;
    use std::fs::File;
    use std::future::Future;
    use std::io::{Read, Seek, SeekFrom};
    use std::ops::Range;
    use std::sync::atomic::{AtomicUsize, Ordering};

    use crate::file::reader::Length;
    use crate::util::test_common::file_util::get_test_file;

    struct MetadataFetchFn<F>(F);

    impl<F, Fut> MetadataFetch for MetadataFetchFn<F>
    where
        F: FnMut(Range<u64>) -> Fut + Send,
        Fut: Future<Output = Result<Bytes>> + Send,
    {
        fn fetch(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
            async move { self.0(range).await }.boxed()
        }
    }

    struct MetadataSuffixFetchFn<F1, F2>(F1, F2);

    impl<F1, Fut, F2> MetadataFetch for MetadataSuffixFetchFn<F1, F2>
    where
        F1: FnMut(Range<u64>) -> Fut + Send,
        Fut: Future<Output = Result<Bytes>> + Send,
        F2: Send,
    {
        fn fetch(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
            async move { self.0(range).await }.boxed()
        }
    }

    impl<F1, Fut, F2> MetadataSuffixFetch for MetadataSuffixFetchFn<F1, F2>
    where
        F1: FnMut(Range<u64>) -> Fut + Send,
        F2: FnMut(usize) -> Fut + Send,
        Fut: Future<Output = Result<Bytes>> + Send,
    {
        fn fetch_suffix(&mut self, suffix: usize) -> BoxFuture<'_, Result<Bytes>> {
            async move { self.1(suffix).await }.boxed()
        }
    }

    fn read_range(file: &mut File, range: Range<u64>) -> Result<Bytes> {
        file.seek(SeekFrom::Start(range.start as _))?;
        let len = range.end - range.start;
        let mut buf = Vec::with_capacity(len.try_into().unwrap());
        file.take(len as _).read_to_end(&mut buf)?;
        Ok(buf.into())
    }

    fn read_suffix(file: &mut File, suffix: usize) -> Result<Bytes> {
        let file_len = file.len();
        // Don't seek before the beginning of the file
        file.seek(SeekFrom::End(0 - suffix.min(file_len as _) as i64))?;
        let mut buf = Vec::with_capacity(suffix);
        file.take(suffix as _).read_to_end(&mut buf)?;
        Ok(buf.into())
    }

    #[tokio::test]
    async fn test_simple() {
        let mut file = get_test_file("nulls.snappy.parquet");
        let len = file.len();

        let expected = ParquetMetaDataReader::new()
            .parse_and_finish(&file)
            .unwrap();
        let expected = expected.file_metadata().schema();
        let fetch_count = AtomicUsize::new(0);

        let mut fetch = |range| {
            fetch_count.fetch_add(1, Ordering::SeqCst);
            futures::future::ready(read_range(&mut file, range))
        };

        let input = MetadataFetchFn(&mut fetch);
        let actual = ParquetMetaDataReader::new()
            .load_and_finish(input, len)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 2);

        // Prefetch hint too small - below the footer size
        fetch_count.store(0, Ordering::SeqCst);
        let input = MetadataFetchFn(&mut fetch);
        let actual = ParquetMetaDataReader::new()
            .with_prefetch_hint(Some(7))
            .load_and_finish(input, len)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 2);

        // Prefetch hint too small - still requires a second fetch
        fetch_count.store(0, Ordering::SeqCst);
        let input = MetadataFetchFn(&mut fetch);
        let actual = ParquetMetaDataReader::new()
            .with_prefetch_hint(Some(10))
            .load_and_finish(input, len)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 2);

        // Prefetch hint large enough to read the footer in one fetch
        fetch_count.store(0, Ordering::SeqCst);
        let input = MetadataFetchFn(&mut fetch);
        let actual = ParquetMetaDataReader::new()
            .with_prefetch_hint(Some(500))
            .load_and_finish(input, len)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);

        // Prefetch hint exactly the size needed
        fetch_count.store(0, Ordering::SeqCst);
        let input = MetadataFetchFn(&mut fetch);
        let actual = ParquetMetaDataReader::new()
            .with_prefetch_hint(Some(428))
            .load_and_finish(input, len)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);

        let input = MetadataFetchFn(&mut fetch);
        let err = ParquetMetaDataReader::new()
            .load_and_finish(input, 4)
            .await
            .unwrap_err()
            .to_string();
        assert_eq!(err, "EOF: file size of 4 is less than footer");

        let input = MetadataFetchFn(&mut fetch);
        let err = ParquetMetaDataReader::new()
            .load_and_finish(input, 20)
            .await
            .unwrap_err()
            .to_string();
        assert_eq!(err, "Parquet error: Invalid Parquet file. Corrupt footer");
    }

    #[tokio::test]
    async fn test_suffix() {
        let mut file = get_test_file("nulls.snappy.parquet");
        let mut file2 = file.try_clone().unwrap();

        let expected = ParquetMetaDataReader::new()
            .parse_and_finish(&file)
            .unwrap();
        let expected = expected.file_metadata().schema();
        let fetch_count = AtomicUsize::new(0);
        let suffix_fetch_count = AtomicUsize::new(0);

        let mut fetch = |range| {
            fetch_count.fetch_add(1, Ordering::SeqCst);
            futures::future::ready(read_range(&mut file, range))
        };
        let mut suffix_fetch = |suffix| {
            suffix_fetch_count.fetch_add(1, Ordering::SeqCst);
            futures::future::ready(read_suffix(&mut file2, suffix))
        };

        let input = MetadataSuffixFetchFn(&mut fetch, &mut suffix_fetch);
        let actual = ParquetMetaDataReader::new()
            .load_via_suffix_and_finish(input)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 0);
        assert_eq!(suffix_fetch_count.load(Ordering::SeqCst), 2);

        // Prefetch hint too small - below the footer size
        fetch_count.store(0, Ordering::SeqCst);
        suffix_fetch_count.store(0, Ordering::SeqCst);
        let input = MetadataSuffixFetchFn(&mut fetch, &mut suffix_fetch);
        let actual = ParquetMetaDataReader::new()
            .with_prefetch_hint(Some(7))
            .load_via_suffix_and_finish(input)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 0);
        assert_eq!(suffix_fetch_count.load(Ordering::SeqCst), 2);

        // Prefetch hint too small - still requires a second fetch
        fetch_count.store(0, Ordering::SeqCst);
        suffix_fetch_count.store(0, Ordering::SeqCst);
        let input = MetadataSuffixFetchFn(&mut fetch, &mut suffix_fetch);
        let actual = ParquetMetaDataReader::new()
            .with_prefetch_hint(Some(10))
            .load_via_suffix_and_finish(input)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 0);
        assert_eq!(suffix_fetch_count.load(Ordering::SeqCst), 2);

        // Prefetch hint large enough to read the footer in one fetch
        fetch_count.store(0, Ordering::SeqCst);
        suffix_fetch_count.store(0, Ordering::SeqCst);
        let input = MetadataSuffixFetchFn(&mut fetch, &mut suffix_fetch);
        let actual = ParquetMetaDataReader::new()
            .with_prefetch_hint(Some(500))
            .load_via_suffix_and_finish(input)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 0);
        assert_eq!(suffix_fetch_count.load(Ordering::SeqCst), 1);

        // Prefetch hint exactly the size needed
        fetch_count.store(0, Ordering::SeqCst);
        suffix_fetch_count.store(0, Ordering::SeqCst);
        let input = MetadataSuffixFetchFn(&mut fetch, &mut suffix_fetch);
        let actual = ParquetMetaDataReader::new()
            .with_prefetch_hint(Some(428))
            .load_via_suffix_and_finish(input)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 0);
        assert_eq!(suffix_fetch_count.load(Ordering::SeqCst), 1);
    }

    #[cfg(feature = "encryption")]
    #[tokio::test]
    async fn test_suffix_with_encryption() {
        let mut file = get_test_file("uniform_encryption.parquet.encrypted");
        let mut file2 = file.try_clone().unwrap();

        let mut fetch = |range| futures::future::ready(read_range(&mut file, range));
        let mut suffix_fetch = |suffix| futures::future::ready(read_suffix(&mut file2, suffix));

        let input = MetadataSuffixFetchFn(&mut fetch, &mut suffix_fetch);

        let key_code: &[u8] = "0123456789012345".as_bytes();
        let decryption_properties = FileDecryptionProperties::builder(key_code.to_vec())
            .build()
            .unwrap();

        // There is only one row group in this file
        let expected = ParquetMetaDataReader::new()
            .with_decryption_properties(Some(&decryption_properties))
            .load_via_suffix_and_finish(input)
            .await
            .unwrap();
        assert_eq!(expected.num_row_groups(), 1);
    }

    #[tokio::test]
    async fn test_page_index() {
        let mut file = get_test_file("alltypes_tiny_pages.parquet");
        let len = file.len();
        let fetch_count = AtomicUsize::new(0);
        let mut fetch = |range| {
            fetch_count.fetch_add(1, Ordering::SeqCst);
            futures::future::ready(read_range(&mut file, range))
        };

        let f = MetadataFetchFn(&mut fetch);
        let mut loader = ParquetMetaDataReader::new().with_page_indexes(true);
        loader.try_load(f, len).await.unwrap();
        assert_eq!(fetch_count.load(Ordering::SeqCst), 3);
        let metadata = loader.finish().unwrap();
        assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());

        // Prefetch the footer metadata exactly; the page indexes need a second fetch
        fetch_count.store(0, Ordering::SeqCst);
        let f = MetadataFetchFn(&mut fetch);
        let mut loader = ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .with_prefetch_hint(Some(1729));
        loader.try_load(f, len).await.unwrap();
        assert_eq!(fetch_count.load(Ordering::SeqCst), 2);
        let metadata = loader.finish().unwrap();
        assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());

        // Prefetch one byte short of covering the page indexes
        fetch_count.store(0, Ordering::SeqCst);
        let f = MetadataFetchFn(&mut fetch);
        let mut loader = ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .with_prefetch_hint(Some(130649));
        loader.try_load(f, len).await.unwrap();
        assert_eq!(fetch_count.load(Ordering::SeqCst), 2);
        let metadata = loader.finish().unwrap();
        assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());

        // Prefetch exactly enough to cover the page indexes as well
        fetch_count.store(0, Ordering::SeqCst);
        let f = MetadataFetchFn(&mut fetch);
        let metadata = ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .with_prefetch_hint(Some(130650))
            .load_and_finish(f, len)
            .await
            .unwrap();
        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
        assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());

        // Prefetch slightly less than the entire file
        fetch_count.store(0, Ordering::SeqCst);
        let f = MetadataFetchFn(&mut fetch);
        let metadata = ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .with_prefetch_hint(Some((len - 1000) as usize))
            .load_and_finish(f, len)
            .await
            .unwrap();
        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
        assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());

        // Prefetch the entire file
        fetch_count.store(0, Ordering::SeqCst);
        let f = MetadataFetchFn(&mut fetch);
        let metadata = ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .with_prefetch_hint(Some(len as usize))
            .load_and_finish(f, len)
            .await
            .unwrap();
        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
        assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());

        // Prefetch more than the entire file
        fetch_count.store(0, Ordering::SeqCst);
        let f = MetadataFetchFn(&mut fetch);
        let metadata = ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .with_prefetch_hint(Some((len + 1000) as usize))
            .load_and_finish(f, len)
            .await
            .unwrap();
        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
        assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());
    }
}