use std::{io::Read, ops::Range, sync::Arc};

use crate::basic::ColumnOrder;
#[cfg(feature = "encryption")]
use crate::encryption::{
    decrypt::{FileDecryptionProperties, FileDecryptor},
    modules::create_footer_aad,
};
use bytes::Bytes;

use crate::errors::{ParquetError, Result};
use crate::file::metadata::{ColumnChunkMetaData, FileMetaData, ParquetMetaData, RowGroupMetaData};
use crate::file::page_index::index::Index;
use crate::file::page_index::index_reader::{acc_range, decode_column_index, decode_offset_index};
use crate::file::reader::ChunkReader;
use crate::file::{FOOTER_SIZE, PARQUET_MAGIC, PARQUET_MAGIC_ENCR_FOOTER};
use crate::format::{ColumnOrder as TColumnOrder, FileMetaData as TFileMetaData};
#[cfg(feature = "encryption")]
use crate::format::{EncryptionAlgorithm, FileCryptoMetaData as TFileCryptoMetaData};
use crate::schema::types;
use crate::schema::types::SchemaDescriptor;
use crate::thrift::{TCompactSliceInputProtocol, TSerializable};

#[cfg(all(feature = "async", feature = "arrow"))]
use crate::arrow::async_reader::{MetadataFetch, MetadataSuffixFetch};
#[cfg(feature = "encryption")]
use crate::encryption::decrypt::CryptoContext;
use crate::file::page_index::offset_index::OffsetIndexMetaData;

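/// Reads the [`ParquetMetaData`] from the footer of a Parquet file, optionally
/// including the page indexes (column index and offset index).
///
/// Works with any synchronous [`ChunkReader`], and, when the `async` and
/// `arrow` features are enabled, with asynchronous fetchers via
/// `MetadataFetch` and `MetadataSuffixFetch`. When only a suffix of the file
/// is available, the parse methods report [`ParquetError::NeedMoreData`] with
/// the number of trailing bytes required, so the caller can fetch more and
/// retry (see [`Self::try_parse_sized`]).
///
/// A minimal usage sketch for a file on disk (the path is hypothetical):
///
/// ```no_run
/// # use parquet::file::metadata::ParquetMetaDataReader;
/// let file = std::fs::File::open("data.parquet").unwrap();
/// let metadata = ParquetMetaDataReader::new()
///     .with_page_indexes(true) // also read the column and offset indexes
///     .parse_and_finish(&file)
///     .unwrap();
/// ```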
#[derive(Default)]
pub struct ParquetMetaDataReader {
    metadata: Option<ParquetMetaData>,
    column_index: bool,
    offset_index: bool,
    prefetch_hint: Option<usize>,
    metadata_size: Option<usize>,
    #[cfg(feature = "encryption")]
    file_decryption_properties: Option<FileDecryptionProperties>,
}

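/// Parsed representation of the last 8 bytes of a Parquet file: the length of
/// the footer metadata and whether the footer is encrypted.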
pub struct FooterTail {
    metadata_length: usize,
    encrypted_footer: bool,
}

impl FooterTail {
    /// The length of the footer metadata in bytes
    pub fn metadata_length(&self) -> usize {
        self.metadata_length
    }

    /// Whether the footer metadata is encrypted
    pub fn is_encrypted_footer(&self) -> bool {
        self.encrypted_footer
    }
}

impl ParquetMetaDataReader {
    /// Create a new [`ParquetMetaDataReader`]
    pub fn new() -> Self {
        Default::default()
    }

    /// Create a new [`ParquetMetaDataReader`] populated with a [`ParquetMetaData`]
    /// struct obtained via other means.
    pub fn new_with_metadata(metadata: ParquetMetaData) -> Self {
        Self {
            metadata: Some(metadata),
            ..Default::default()
        }
    }

    /// Enable or disable reading both the column index and the offset index
    /// when parsing the metadata.
    pub fn with_page_indexes(self, val: bool) -> Self {
        self.with_column_indexes(val).with_offset_indexes(val)
    }

    /// Enable or disable reading the column index when parsing the metadata.
    pub fn with_column_indexes(mut self, val: bool) -> Self {
        self.column_index = val;
        self
    }

    /// Enable or disable reading the offset index when parsing the metadata.
    pub fn with_offset_indexes(mut self, val: bool) -> Self {
        self.offset_index = val;
        self
    }

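    /// Provide a hint as to the number of bytes to prefetch for parsing the
    /// footer, used by the async [`Self::try_load`] and suffix variants. If the
    /// hint is too small, a second fetch is issued for the remainder; if it
    /// covers the footer metadata (and the page indexes, when those are
    /// requested), a single fetch suffices.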
    pub fn with_prefetch_hint(mut self, prefetch: Option<usize>) -> Self {
        self.prefetch_hint = prefetch;
        self
    }

    /// Provide the [`FileDecryptionProperties`] to use when decrypting the file.
    #[cfg(feature = "encryption")]
    pub fn with_decryption_properties(
        mut self,
        properties: Option<&FileDecryptionProperties>,
    ) -> Self {
        self.file_decryption_properties = properties.cloned();
        self
    }

    /// Indicates whether this reader has a [`ParquetMetaData`] internally.
    pub fn has_metadata(&self) -> bool {
        self.metadata.is_some()
    }

    /// Return the parsed [`ParquetMetaData`] struct, error if it is not present.
    pub fn finish(&mut self) -> Result<ParquetMetaData> {
        self.metadata
            .take()
            .ok_or_else(|| general_err!("could not parse parquet metadata"))
    }

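    /// Given a [`ChunkReader`], parse and return the [`ParquetMetaData`] in a
    /// single pass, consuming `self`. This is shorthand for [`Self::try_parse`]
    /// followed by [`Self::finish`].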
    pub fn parse_and_finish<R: ChunkReader>(mut self, reader: &R) -> Result<ParquetMetaData> {
        self.try_parse(reader)?;
        self.finish()
    }

    /// Attempts to (synchronously) parse the footer metadata (and page indexes
    /// if requested) from the given [`ChunkReader`], which must contain the
    /// entire file.
    pub fn try_parse<R: ChunkReader>(&mut self, reader: &R) -> Result<()> {
        self.try_parse_sized(reader, reader.len())
    }

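    /// Same as [`Self::try_parse`], but `reader` may hold only the tail of the
    /// file, with `file_size` giving the length of the complete file.
    ///
    /// If the suffix is too short, [`ParquetError::NeedMoreData`] is returned
    /// with the number of trailing bytes required. A sketch of the retry
    /// protocol, mirroring the tests below (`fetch_suffix` is a hypothetical
    /// helper returning the last `n` bytes of the file as [`Bytes`]):
    ///
    /// ```ignore
    /// let mut reader = ParquetMetaDataReader::new().with_page_indexes(true);
    /// let mut suffix = fetch_suffix(1024)?;
    /// loop {
    ///     match reader.try_parse_sized(&suffix, file_size) {
    ///         Ok(()) => break,
    ///         Err(ParquetError::NeedMoreData(needed)) => {
    ///             suffix = fetch_suffix(needed)?;
    ///             if reader.has_metadata() {
    ///                 // The footer is parsed; only the page indexes are missing.
    ///                 reader.read_page_indexes_sized(&suffix, file_size)?;
    ///                 break;
    ///             }
    ///         }
    ///         Err(e) => return Err(e),
    ///     }
    /// }
    /// let metadata = reader.finish()?;
    /// ```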
    pub fn try_parse_sized<R: ChunkReader>(&mut self, reader: &R, file_size: u64) -> Result<()> {
        self.metadata = match self.parse_metadata(reader) {
            Ok(metadata) => Some(metadata),
            Err(ParquetError::NeedMoreData(needed)) => {
                // If the reader already holds the entire file, or the file is
                // smaller than the amount needed, then retrying cannot help.
                if file_size == reader.len() || needed as u64 > file_size {
                    return Err(eof_err!(
                        "Parquet file too small. Size is {} but need {}",
                        file_size,
                        needed
                    ));
                } else {
                    // Ask the caller to provide more data.
                    return Err(ParquetError::NeedMoreData(needed));
                }
            }
            Err(e) => return Err(e),
        };

        // We can return if page indexes aren't requested.
        if !self.column_index && !self.offset_index {
            return Ok(());
        }

        self.read_page_indexes_sized(reader, file_size)
    }

    /// Read the page indexes from `reader`, which must contain the entire
    /// file. Requires that the file metadata has already been parsed.
    pub fn read_page_indexes<R: ChunkReader>(&mut self, reader: &R) -> Result<()> {
        self.read_page_indexes_sized(reader, reader.len())
    }

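    /// Same as [`Self::read_page_indexes`], but `reader` may hold only the
    /// tail of the file, with `file_size` giving the length of the complete
    /// file. Like [`Self::try_parse_sized`], this returns
    /// [`ParquetError::NeedMoreData`] when the provided suffix does not cover
    /// the page index ranges.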
    pub fn read_page_indexes_sized<R: ChunkReader>(
        &mut self,
        reader: &R,
        file_size: u64,
    ) -> Result<()> {
        if self.metadata.is_none() {
            return Err(general_err!(
                "Tried to read page indexes without ParquetMetaData metadata"
            ));
        }

        // Get the required byte range from the metadata; return early if the
        // page indexes are not present.
        let Some(range) = self.range_for_page_index() else {
            return Ok(());
        };

        // Check whether `reader` covers the entire range needed, keeping in
        // mind that it holds the final `reader.len()` bytes of the file.
        let file_range = file_size.saturating_sub(reader.len())..file_size;
        if !(file_range.contains(&range.start) && file_range.contains(&range.end)) {
            if range.end > file_size {
                // The range is entirely beyond the stated file size.
                return Err(eof_err!(
                    "Parquet file too small. Range {:?} is beyond file bounds {file_size}",
                    range
                ));
            } else {
                // Ask the caller to provide the bytes starting at `range.start`.
                return Err(ParquetError::NeedMoreData(
                    (file_size - range.start).try_into()?,
                ));
            }
        }

        // Sanity check: the page index region must not overlap the footer
        // metadata that was already parsed.
        if let Some(metadata_size) = self.metadata_size {
            let metadata_range = file_size.saturating_sub(metadata_size as u64)..file_size;
            if range.end > metadata_range.start {
                return Err(eof_err!(
                    "Parquet file too small. Page index range {:?} overlaps with file metadata {:?}",
                    range,
                    metadata_range
                ));
            }
        }

        let bytes_needed = usize::try_from(range.end - range.start)?;
        let bytes = reader.get_bytes(range.start - file_range.start, bytes_needed)?;
        let offset = range.start;

        self.parse_column_index(&bytes, offset)?;
        self.parse_offset_index(&bytes, offset)?;

        Ok(())
    }

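    /// Given a `MetadataFetch`, parse and return the [`ParquetMetaData`] in a
    /// single pass, fetching the footer and, if requested, the page indexes
    /// asynchronously.
    ///
    /// A sketch of an in-memory fetcher, modeled on the `MetadataFetchFn`
    /// helper in this module's tests (the struct and its name are
    /// illustrative, not part of the API):
    ///
    /// ```ignore
    /// use bytes::Bytes;
    /// use futures::future::BoxFuture;
    /// use futures::FutureExt;
    ///
    /// struct BytesFetch(Bytes);
    ///
    /// impl MetadataFetch for BytesFetch {
    ///     fn fetch(&mut self, range: std::ops::Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
    ///         let slice = self.0.slice(range.start as usize..range.end as usize);
    ///         async move { Ok(slice) }.boxed()
    ///     }
    /// }
    ///
    /// let data: Bytes = std::fs::read("data.parquet")?.into();
    /// let len = data.len() as u64;
    /// let metadata = ParquetMetaDataReader::new()
    ///     .load_and_finish(BytesFetch(data), len)
    ///     .await?;
    /// ```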
    #[cfg(all(feature = "async", feature = "arrow"))]
    pub async fn load_and_finish<F: MetadataFetch>(
        mut self,
        fetch: F,
        file_size: u64,
    ) -> Result<ParquetMetaData> {
        self.try_load(fetch, file_size).await?;
        self.finish()
    }

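    /// Same as [`Self::load_and_finish`], but obtains the footer via suffix
    /// requests through a [`MetadataSuffixFetch`], so the total file size does
    /// not need to be known in advance.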
    #[cfg(all(feature = "async", feature = "arrow"))]
    pub async fn load_via_suffix_and_finish<F: MetadataSuffixFetch>(
        mut self,
        fetch: F,
    ) -> Result<ParquetMetaData> {
        self.try_load_via_suffix(fetch).await?;
        self.finish()
    }

    /// Attempts to (asynchronously) parse the footer metadata (and page
    /// indexes if requested) from the given [`MetadataFetch`].
    #[cfg(all(feature = "async", feature = "arrow"))]
    pub async fn try_load<F: MetadataFetch>(&mut self, mut fetch: F, file_size: u64) -> Result<()> {
        let (metadata, remainder) = self.load_metadata(&mut fetch, file_size).await?;

        self.metadata = Some(metadata);

        // We can return if page indexes aren't requested.
        if !self.column_index && !self.offset_index {
            return Ok(());
        }

        self.load_page_index_with_remainder(fetch, remainder).await
    }

    /// Attempts to (asynchronously) parse the footer metadata (and page
    /// indexes if requested) using suffix requests on the given
    /// [`MetadataSuffixFetch`].
    #[cfg(all(feature = "async", feature = "arrow"))]
    pub async fn try_load_via_suffix<F: MetadataSuffixFetch>(
        &mut self,
        mut fetch: F,
    ) -> Result<()> {
        let (metadata, remainder) = self.load_metadata_via_suffix(&mut fetch).await?;

        self.metadata = Some(metadata);

        // We can return if page indexes aren't requested.
        if !self.column_index && !self.offset_index {
            return Ok(());
        }

        self.load_page_index_with_remainder(fetch, remainder).await
    }

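    /// Asynchronously fetch the page indexes for an already-loaded
    /// [`ParquetMetaData`] (for example one provided via
    /// [`Self::new_with_metadata`]).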
    #[cfg(all(feature = "async", feature = "arrow"))]
    pub async fn load_page_index<F: MetadataFetch>(&mut self, fetch: F) -> Result<()> {
        self.load_page_index_with_remainder(fetch, None).await
    }

    #[cfg(all(feature = "async", feature = "arrow"))]
    async fn load_page_index_with_remainder<F: MetadataFetch>(
        &mut self,
        mut fetch: F,
        remainder: Option<(usize, Bytes)>,
    ) -> Result<()> {
        if self.metadata.is_none() {
            return Err(general_err!("Footer metadata is not present"));
        }

        // Get the required byte range from the metadata.
        let range = self.range_for_page_index();
        let range = match range {
            Some(range) => range,
            None => return Ok(()),
        };

        let bytes = match &remainder {
            // Reuse the bytes left over from fetching the footer if they cover
            // the requested range, instead of fetching again.
            Some((remainder_start, remainder)) if *remainder_start as u64 <= range.start => {
                let remainder_start = *remainder_start as u64;
                let offset = usize::try_from(range.start - remainder_start)?;
                let end = usize::try_from(range.end - remainder_start)?;
                assert!(end <= remainder.len());
                remainder.slice(offset..end)
            }
            _ => fetch.fetch(range.start..range.end).await?,
        };

        // Sanity check
        assert_eq!(bytes.len() as u64, range.end - range.start);

        self.parse_column_index(&bytes, range.start)?;
        self.parse_offset_index(&bytes, range.start)?;

        Ok(())
    }

    fn parse_column_index(&mut self, bytes: &Bytes, start_offset: u64) -> Result<()> {
        let metadata = self.metadata.as_mut().unwrap();
        if self.column_index {
            let index = metadata
                .row_groups()
                .iter()
                .enumerate()
                .map(|(rg_idx, x)| {
                    x.columns()
                        .iter()
                        .enumerate()
                        .map(|(col_idx, c)| match c.column_index_range() {
                            Some(r) => {
                                let r_start = usize::try_from(r.start - start_offset)?;
                                let r_end = usize::try_from(r.end - start_offset)?;
                                Self::parse_single_column_index(
                                    &bytes[r_start..r_end],
                                    metadata,
                                    c,
                                    rg_idx,
                                    col_idx,
                                )
                            }
                            None => Ok(Index::NONE),
                        })
                        .collect::<Result<Vec<_>>>()
                })
                .collect::<Result<Vec<_>>>()?;
            metadata.set_column_index(Some(index));
        }
        Ok(())
    }

    #[cfg(feature = "encryption")]
    fn parse_single_column_index(
        bytes: &[u8],
        metadata: &ParquetMetaData,
        column: &ColumnChunkMetaData,
        row_group_index: usize,
        col_index: usize,
    ) -> Result<Index> {
        match &column.column_crypto_metadata {
            Some(crypto_metadata) => {
                let file_decryptor = metadata.file_decryptor.as_ref().ok_or_else(|| {
                    general_err!("Cannot decrypt column index, no file decryptor set")
                })?;
                let crypto_context = CryptoContext::for_column(
                    file_decryptor,
                    crypto_metadata,
                    row_group_index,
                    col_index,
                )?;
                let column_decryptor = crypto_context.metadata_decryptor();
                let aad = crypto_context.create_column_index_aad()?;
                let plaintext = column_decryptor.decrypt(bytes, &aad)?;
                decode_column_index(&plaintext, column.column_type())
            }
            None => decode_column_index(bytes, column.column_type()),
        }
    }

    #[cfg(not(feature = "encryption"))]
    fn parse_single_column_index(
        bytes: &[u8],
        _metadata: &ParquetMetaData,
        column: &ColumnChunkMetaData,
        _row_group_index: usize,
        _col_index: usize,
    ) -> Result<Index> {
        decode_column_index(bytes, column.column_type())
    }

    fn parse_offset_index(&mut self, bytes: &Bytes, start_offset: u64) -> Result<()> {
        let metadata = self.metadata.as_mut().unwrap();
        if self.offset_index {
            let index = metadata
                .row_groups()
                .iter()
                .enumerate()
                .map(|(rg_idx, x)| {
                    x.columns()
                        .iter()
                        .enumerate()
                        .map(|(col_idx, c)| match c.offset_index_range() {
                            Some(r) => {
                                let r_start = usize::try_from(r.start - start_offset)?;
                                let r_end = usize::try_from(r.end - start_offset)?;
                                Self::parse_single_offset_index(
                                    &bytes[r_start..r_end],
                                    metadata,
                                    c,
                                    rg_idx,
                                    col_idx,
                                )
                            }
                            None => Err(general_err!("missing offset index")),
                        })
                        .collect::<Result<Vec<_>>>()
                })
                .collect::<Result<Vec<_>>>()?;

            metadata.set_offset_index(Some(index));
        }
        Ok(())
    }

    #[cfg(feature = "encryption")]
    fn parse_single_offset_index(
        bytes: &[u8],
        metadata: &ParquetMetaData,
        column: &ColumnChunkMetaData,
        row_group_index: usize,
        col_index: usize,
    ) -> Result<OffsetIndexMetaData> {
        match &column.column_crypto_metadata {
            Some(crypto_metadata) => {
                let file_decryptor = metadata.file_decryptor.as_ref().ok_or_else(|| {
                    general_err!("Cannot decrypt offset index, no file decryptor set")
                })?;
                let crypto_context = CryptoContext::for_column(
                    file_decryptor,
                    crypto_metadata,
                    row_group_index,
                    col_index,
                )?;
                let column_decryptor = crypto_context.metadata_decryptor();
                let aad = crypto_context.create_offset_index_aad()?;
                let plaintext = column_decryptor.decrypt(bytes, &aad)?;
                decode_offset_index(&plaintext)
            }
            None => decode_offset_index(bytes),
        }
    }

    #[cfg(not(feature = "encryption"))]
    fn parse_single_offset_index(
        bytes: &[u8],
        _metadata: &ParquetMetaData,
        _column: &ColumnChunkMetaData,
        _row_group_index: usize,
        _col_index: usize,
    ) -> Result<OffsetIndexMetaData> {
        decode_offset_index(bytes)
    }

    fn range_for_page_index(&self) -> Option<Range<u64>> {
        // Sanity check
        self.metadata.as_ref()?;

        // Union the byte ranges of all the requested page indexes.
        let mut range = None;
        let metadata = self.metadata.as_ref().unwrap();
        for c in metadata.row_groups().iter().flat_map(|r| r.columns()) {
            if self.column_index {
                range = acc_range(range, c.column_index_range());
            }
            if self.offset_index {
                range = acc_range(range, c.offset_index_range());
            }
        }
        range
    }

    fn parse_metadata<R: ChunkReader>(&mut self, chunk_reader: &R) -> Result<ParquetMetaData> {
        let file_size = chunk_reader.len();
        if file_size < (FOOTER_SIZE as u64) {
            return Err(ParquetError::NeedMoreData(FOOTER_SIZE));
        }

        let mut footer = [0_u8; 8];
        chunk_reader
            .get_read(file_size - 8)?
            .read_exact(&mut footer)?;

        let footer = Self::decode_footer_tail(&footer)?;
        let metadata_len = footer.metadata_length();
        let footer_metadata_len = FOOTER_SIZE + metadata_len;
        self.metadata_size = Some(footer_metadata_len);

        if footer_metadata_len as u64 > file_size {
            return Err(ParquetError::NeedMoreData(footer_metadata_len));
        }

        let start = file_size - footer_metadata_len as u64;
        self.decode_footer_metadata(
            chunk_reader.get_bytes(start, metadata_len)?.as_ref(),
            &footer,
        )
    }

    #[cfg(all(feature = "async", feature = "arrow"))]
    fn get_prefetch_size(&self) -> usize {
        if let Some(prefetch) = self.prefetch_hint {
            if prefetch > FOOTER_SIZE {
                return prefetch;
            }
        }
        FOOTER_SIZE
    }

    #[cfg(all(feature = "async", feature = "arrow"))]
    async fn load_metadata<F: MetadataFetch>(
        &self,
        fetch: &mut F,
        file_size: u64,
    ) -> Result<(ParquetMetaData, Option<(usize, Bytes)>)> {
        let prefetch = self.get_prefetch_size() as u64;

        if file_size < FOOTER_SIZE as u64 {
            return Err(eof_err!("file size of {} is less than footer", file_size));
        }

        // If a size hint is provided, read more than the minimum size to try
        // and avoid a second fetch.
        let footer_start = file_size.saturating_sub(prefetch);

        let suffix = fetch.fetch(footer_start..file_size).await?;
        let suffix_len = suffix.len();
        let fetch_len = (file_size - footer_start)
            .try_into()
            .expect("footer size should never be larger than u32");
        if suffix_len < fetch_len {
            return Err(eof_err!(
                "metadata requires {} bytes, but could only read {}",
                fetch_len,
                suffix_len
            ));
        }

        let mut footer = [0; FOOTER_SIZE];
        footer.copy_from_slice(&suffix[suffix_len - FOOTER_SIZE..suffix_len]);

        let footer = Self::decode_footer_tail(&footer)?;
        let length = footer.metadata_length();

        if file_size < (length + FOOTER_SIZE) as u64 {
            return Err(eof_err!(
                "file size of {} is less than footer + metadata {}",
                file_size,
                length + FOOTER_SIZE
            ));
        }

        // Did not fetch the entire file metadata in the initial read; a second
        // request is required.
        if length > suffix_len - FOOTER_SIZE {
            let metadata_start = file_size - (length + FOOTER_SIZE) as u64;
            let meta = fetch
                .fetch(metadata_start..(file_size - FOOTER_SIZE as u64))
                .await?;
            Ok((self.decode_footer_metadata(&meta, &footer)?, None))
        } else {
            let metadata_start = (file_size - (length + FOOTER_SIZE) as u64 - footer_start)
                .try_into()
                .expect("metadata length should never be larger than u32");
            let slice = &suffix[metadata_start..suffix_len - FOOTER_SIZE];
            Ok((
                self.decode_footer_metadata(slice, &footer)?,
                Some((footer_start as usize, suffix.slice(..metadata_start))),
            ))
        }
    }

    #[cfg(all(feature = "async", feature = "arrow"))]
    async fn load_metadata_via_suffix<F: MetadataSuffixFetch>(
        &self,
        fetch: &mut F,
    ) -> Result<(ParquetMetaData, Option<(usize, Bytes)>)> {
        let prefetch = self.get_prefetch_size();

        let suffix = fetch.fetch_suffix(prefetch as _).await?;
        let suffix_len = suffix.len();

        if suffix_len < FOOTER_SIZE {
            return Err(eof_err!(
                "footer metadata requires {} bytes, but could only read {}",
                FOOTER_SIZE,
                suffix_len
            ));
        }

        let mut footer = [0; FOOTER_SIZE];
        footer.copy_from_slice(&suffix[suffix_len - FOOTER_SIZE..suffix_len]);

        let footer = Self::decode_footer_tail(&footer)?;
        let length = footer.metadata_length();

        // Did not fetch the entire file metadata in the initial read; a second
        // request is required.
        let metadata_offset = length + FOOTER_SIZE;
        if length > suffix_len - FOOTER_SIZE {
            let meta = fetch.fetch_suffix(metadata_offset).await?;

            if meta.len() < metadata_offset {
                return Err(eof_err!(
                    "metadata requires {} bytes, but could only read {}",
                    metadata_offset,
                    meta.len()
                ));
            }

            Ok((
                // Need to slice off the footer.
                self.decode_footer_metadata(&meta.slice(0..length), &footer)?,
                None,
            ))
        } else {
            let metadata_start = suffix_len - metadata_offset;
            let slice = &suffix[metadata_start..suffix_len - FOOTER_SIZE];
            Ok((
                self.decode_footer_metadata(slice, &footer)?,
                Some((0, suffix.slice(..metadata_start))),
            ))
        }
    }

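    /// Decode the last 8 bytes of a Parquet file into a [`FooterTail`]: a
    /// little-endian metadata length followed by the 4 magic bytes, which also
    /// indicate whether the footer is encrypted.
    ///
    /// A small sketch with a hand-built footer (zero-length metadata,
    /// plaintext magic):
    ///
    /// ```
    /// # use parquet::file::metadata::ParquetMetaDataReader;
    /// let tail =
    ///     ParquetMetaDataReader::decode_footer_tail(&[0, 0, 0, 0, b'P', b'A', b'R', b'1']).unwrap();
    /// assert_eq!(tail.metadata_length(), 0);
    /// assert!(!tail.is_encrypted_footer());
    /// ```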
    pub fn decode_footer_tail(slice: &[u8; FOOTER_SIZE]) -> Result<FooterTail> {
        let magic = &slice[4..];
        let encrypted_footer = if magic == PARQUET_MAGIC_ENCR_FOOTER {
            true
        } else if magic == PARQUET_MAGIC {
            false
        } else {
            return Err(general_err!("Invalid Parquet file. Corrupt footer"));
        };
        // Get the metadata length from the footer.
        let metadata_len = u32::from_le_bytes(slice[..4].try_into().unwrap());
        Ok(FooterTail {
            metadata_length: metadata_len as usize,
            encrypted_footer,
        })
    }

    /// Decodes the Parquet footer, returning the metadata length in bytes.
    #[deprecated(since = "54.3.0", note = "Use decode_footer_tail instead")]
    pub fn decode_footer(slice: &[u8; FOOTER_SIZE]) -> Result<usize> {
        Self::decode_footer_tail(slice).map(|f| f.metadata_length)
    }

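    /// Decode [`ParquetMetaData`] from the raw bytes of the footer metadata,
    /// dispatching to the encryption-aware decoder when the `encryption`
    /// feature is enabled and erroring out on encrypted footers otherwise.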
    pub(crate) fn decode_footer_metadata(
        &self,
        buf: &[u8],
        footer_tail: &FooterTail,
    ) -> Result<ParquetMetaData> {
        #[cfg(feature = "encryption")]
        let result = Self::decode_metadata_with_encryption(
            buf,
            footer_tail.is_encrypted_footer(),
            self.file_decryption_properties.as_ref(),
        );
        #[cfg(not(feature = "encryption"))]
        let result = {
            if footer_tail.is_encrypted_footer() {
                Err(general_err!(
                    "Parquet file has an encrypted footer but the encryption feature is disabled"
                ))
            } else {
                Self::decode_metadata(buf)
            }
        };
        result
    }

    /// Decodes [`ParquetMetaData`] from the provided bytes, handling metadata
    /// that may be encrypted.
    ///
    /// `buf` is either the Thrift-encoded `FileMetaData`, or, for files with
    /// an encrypted footer, a Thrift-encoded `FileCryptoMetaData` followed by
    /// the encrypted `FileMetaData`.
    #[cfg(feature = "encryption")]
    fn decode_metadata_with_encryption(
        buf: &[u8],
        encrypted_footer: bool,
        file_decryption_properties: Option<&FileDecryptionProperties>,
    ) -> Result<ParquetMetaData> {
        let mut prot = TCompactSliceInputProtocol::new(buf);
        let mut file_decryptor = None;
        let decrypted_fmd_buf;

        if encrypted_footer {
            if let Some(file_decryption_properties) = file_decryption_properties {
                let t_file_crypto_metadata: TFileCryptoMetaData =
                    TFileCryptoMetaData::read_from_in_protocol(&mut prot)
                        .map_err(|e| general_err!("Could not parse crypto metadata: {}", e))?;
                let supply_aad_prefix = match &t_file_crypto_metadata.encryption_algorithm {
                    EncryptionAlgorithm::AESGCMV1(algo) => algo.supply_aad_prefix,
                    _ => Some(false),
                }
                .unwrap_or(false);
                if supply_aad_prefix && file_decryption_properties.aad_prefix().is_none() {
                    return Err(general_err!(
                        "Parquet file was encrypted with an AAD prefix that is not stored in the file, \
                        but no AAD prefix was provided in the file decryption properties"
                    ));
                }
                let decryptor = get_file_decryptor(
                    t_file_crypto_metadata.encryption_algorithm,
                    t_file_crypto_metadata.key_metadata.as_deref(),
                    file_decryption_properties,
                )?;
                let footer_decryptor = decryptor.get_footer_decryptor();
                let aad_footer = create_footer_aad(decryptor.file_aad())?;

                decrypted_fmd_buf = footer_decryptor?
                    .decrypt(prot.as_slice().as_ref(), aad_footer.as_ref())
                    .map_err(|_| {
                        general_err!(
                            "Provided footer key and AAD were unable to decrypt parquet footer"
                        )
                    })?;
                prot = TCompactSliceInputProtocol::new(decrypted_fmd_buf.as_ref());

                file_decryptor = Some(decryptor);
            } else {
                return Err(general_err!("Parquet file has an encrypted footer but decryption properties were not provided"));
            }
        }

        let t_file_metadata: TFileMetaData = TFileMetaData::read_from_in_protocol(&mut prot)
            .map_err(|e| general_err!("Could not parse metadata: {}", e))?;
        let schema = types::from_thrift(&t_file_metadata.schema)?;
        let schema_descr = Arc::new(SchemaDescriptor::new(schema));

        if let (Some(algo), Some(file_decryption_properties)) = (
            t_file_metadata.encryption_algorithm,
            file_decryption_properties,
        ) {
            let file_decryptor_value = get_file_decryptor(
                algo,
                t_file_metadata.footer_signing_key_metadata.as_deref(),
                file_decryption_properties,
            )?;
            if file_decryption_properties.check_plaintext_footer_integrity() && !encrypted_footer {
                file_decryptor_value.verify_plaintext_footer_signature(buf)?;
            }
            file_decryptor = Some(file_decryptor_value);
        }

        let mut row_groups = Vec::new();
        for rg in t_file_metadata.row_groups {
            let r = RowGroupMetaData::from_encrypted_thrift(
                schema_descr.clone(),
                rg,
                file_decryptor.as_ref(),
            )?;
            row_groups.push(r);
        }
        let column_orders =
            Self::parse_column_orders(t_file_metadata.column_orders, &schema_descr)?;

        let file_metadata = FileMetaData::new(
            t_file_metadata.version,
            t_file_metadata.num_rows,
            t_file_metadata.created_by,
            t_file_metadata.key_value_metadata,
            schema_descr,
            column_orders,
        );
        let mut metadata = ParquetMetaData::new(file_metadata, row_groups);

        metadata.with_file_decryptor(file_decryptor);

        Ok(metadata)
    }

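    /// Decode [`ParquetMetaData`] from the Thrift compact-protocol bytes of
    /// the `FileMetaData` struct, typically the footer of a Parquet file. Page
    /// indexes are not included; use the reader's parse/load methods for those.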
    pub fn decode_metadata(buf: &[u8]) -> Result<ParquetMetaData> {
        let mut prot = TCompactSliceInputProtocol::new(buf);

        let t_file_metadata: TFileMetaData = TFileMetaData::read_from_in_protocol(&mut prot)
            .map_err(|e| general_err!("Could not parse metadata: {}", e))?;
        let schema = types::from_thrift(&t_file_metadata.schema)?;
        let schema_descr = Arc::new(SchemaDescriptor::new(schema));

        let mut row_groups = Vec::new();
        for rg in t_file_metadata.row_groups {
            row_groups.push(RowGroupMetaData::from_thrift(schema_descr.clone(), rg)?);
        }
        let column_orders =
            Self::parse_column_orders(t_file_metadata.column_orders, &schema_descr)?;

        let file_metadata = FileMetaData::new(
            t_file_metadata.version,
            t_file_metadata.num_rows,
            t_file_metadata.created_by,
            t_file_metadata.key_value_metadata,
            schema_descr,
            column_orders,
        );

        Ok(ParquetMetaData::new(file_metadata, row_groups))
    }

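    /// Parses column orders from the Thrift definition. If no column orders
    /// are defined, returns `None`.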
    fn parse_column_orders(
        t_column_orders: Option<Vec<TColumnOrder>>,
        schema_descr: &SchemaDescriptor,
    ) -> Result<Option<Vec<ColumnOrder>>> {
        match t_column_orders {
            Some(orders) => {
                if orders.len() != schema_descr.num_columns() {
                    return Err(general_err!("Column order length mismatch"));
                };
                let mut res = Vec::new();
                for (i, column) in schema_descr.columns().iter().enumerate() {
                    match orders[i] {
                        TColumnOrder::TYPEORDER(_) => {
                            let sort_order = ColumnOrder::get_sort_order(
                                column.logical_type(),
                                column.converted_type(),
                                column.physical_type(),
                            );
                            res.push(ColumnOrder::TYPE_DEFINED_ORDER(sort_order));
                        }
                    }
                }
                Ok(Some(res))
            }
            None => Ok(None),
        }
    }
}

#[cfg(feature = "encryption")]
fn get_file_decryptor(
    encryption_algorithm: EncryptionAlgorithm,
    footer_key_metadata: Option<&[u8]>,
    file_decryption_properties: &FileDecryptionProperties,
) -> Result<FileDecryptor> {
    match encryption_algorithm {
        EncryptionAlgorithm::AESGCMV1(algo) => {
            let aad_file_unique = algo
                .aad_file_unique
                .ok_or_else(|| general_err!("AAD unique file identifier is not set"))?;
            let aad_prefix = if let Some(aad_prefix) = file_decryption_properties.aad_prefix() {
                aad_prefix.clone()
            } else {
                algo.aad_prefix.unwrap_or_default()
            };

            FileDecryptor::new(
                file_decryption_properties,
                footer_key_metadata,
                aad_file_unique,
                aad_prefix,
            )
        }
        EncryptionAlgorithm::AESGCMCTRV1(_) => Err(nyi_err!(
            "The AES_GCM_CTR_V1 encryption algorithm is not yet supported"
        )),
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;

    use crate::basic::SortOrder;
    use crate::basic::Type;
    use crate::file::reader::Length;
    use crate::format::TypeDefinedOrder;
    use crate::schema::types::Type as SchemaType;
    use crate::util::test_common::file_util::get_test_file;

    #[test]
    fn test_parse_metadata_size_smaller_than_footer() {
        let test_file = tempfile::tempfile().unwrap();
        let err = ParquetMetaDataReader::new()
            .parse_metadata(&test_file)
            .unwrap_err();
        assert!(matches!(err, ParquetError::NeedMoreData(8)));
    }

    #[test]
    fn test_parse_metadata_corrupt_footer() {
        let data = Bytes::from(vec![1, 2, 3, 4, 5, 6, 7, 8]);
        let reader_result = ParquetMetaDataReader::new().parse_metadata(&data);
        assert_eq!(
            reader_result.unwrap_err().to_string(),
            "Parquet error: Invalid Parquet file. Corrupt footer"
        );
    }

    #[test]
    fn test_parse_metadata_invalid_start() {
        let test_file = Bytes::from(vec![255, 0, 0, 0, b'P', b'A', b'R', b'1']);
        let err = ParquetMetaDataReader::new()
            .parse_metadata(&test_file)
            .unwrap_err();
        assert!(matches!(err, ParquetError::NeedMoreData(263)));
    }

    #[test]
    fn test_metadata_column_orders_parse() {
        // Define a simple schema; logical types are not needed here.
        let fields = vec![
            Arc::new(
                SchemaType::primitive_type_builder("col1", Type::INT32)
                    .build()
                    .unwrap(),
            ),
            Arc::new(
                SchemaType::primitive_type_builder("col2", Type::FLOAT)
                    .build()
                    .unwrap(),
            ),
        ];
        let schema = SchemaType::group_type_builder("schema")
            .with_fields(fields)
            .build()
            .unwrap();
        let schema_descr = SchemaDescriptor::new(Arc::new(schema));

        let t_column_orders = Some(vec![
            TColumnOrder::TYPEORDER(TypeDefinedOrder::new()),
            TColumnOrder::TYPEORDER(TypeDefinedOrder::new()),
        ]);

        assert_eq!(
            ParquetMetaDataReader::parse_column_orders(t_column_orders, &schema_descr).unwrap(),
            Some(vec![
                ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED),
                ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED)
            ])
        );

        // Test when no column orders are defined.
        assert_eq!(
            ParquetMetaDataReader::parse_column_orders(None, &schema_descr).unwrap(),
            None
        );
    }

    #[test]
    fn test_metadata_column_orders_len_mismatch() {
        let schema = SchemaType::group_type_builder("schema").build().unwrap();
        let schema_descr = SchemaDescriptor::new(Arc::new(schema));

        let t_column_orders = Some(vec![TColumnOrder::TYPEORDER(TypeDefinedOrder::new())]);

        let res = ParquetMetaDataReader::parse_column_orders(t_column_orders, &schema_descr);
        assert!(res.is_err());
        assert!(format!("{:?}", res.unwrap_err()).contains("Column order length mismatch"));
    }

    #[test]
    fn test_try_parse() {
        let file = get_test_file("alltypes_tiny_pages.parquet");
        let len = file.len();

        let mut reader = ParquetMetaDataReader::new().with_page_indexes(true);

        let bytes_for_range = |range: Range<u64>| {
            file.get_bytes(range.start, (range.end - range.start).try_into().unwrap())
                .unwrap()
        };

        // Read the entire file.
        let bytes = bytes_for_range(0..len);
        reader.try_parse(&bytes).unwrap();
        let metadata = reader.finish().unwrap();
        assert!(metadata.column_index.is_some());
        assert!(metadata.offset_index.is_some());

        // Read more than enough of the file.
        let bytes = bytes_for_range(320000..len);
        reader.try_parse_sized(&bytes, len).unwrap();
        let metadata = reader.finish().unwrap();
        assert!(metadata.column_index.is_some());
        assert!(metadata.offset_index.is_some());

        // Read exactly enough of the file.
        let bytes = bytes_for_range(323583..len);
        reader.try_parse_sized(&bytes, len).unwrap();
        let metadata = reader.finish().unwrap();
        assert!(metadata.column_index.is_some());
        assert!(metadata.offset_index.is_some());

        // One byte short of the page index range.
        let bytes = bytes_for_range(323584..len);
        // Should fail with NeedMoreData rather than an EOF error.
        match reader.try_parse_sized(&bytes, len).unwrap_err() {
            ParquetError::NeedMoreData(needed) => {
                let bytes = bytes_for_range(len - needed as u64..len);
                reader.try_parse_sized(&bytes, len).unwrap();
                let metadata = reader.finish().unwrap();
                assert!(metadata.column_index.is_some());
                assert!(metadata.offset_index.is_some());
            }
            _ => panic!("unexpected error"),
        };

        // Start with a suffix that holds only part of the footer metadata and
        // keep retrying until the read succeeds.
        let mut reader = ParquetMetaDataReader::new().with_page_indexes(true);
        let mut bytes = bytes_for_range(452505..len);
        loop {
            match reader.try_parse_sized(&bytes, len) {
                Ok(_) => break,
                Err(ParquetError::NeedMoreData(needed)) => {
                    bytes = bytes_for_range(len - needed as u64..len);
                    if reader.has_metadata() {
                        reader.read_page_indexes_sized(&bytes, len).unwrap();
                        break;
                    }
                }
                _ => panic!("unexpected error"),
            }
        }
        let metadata = reader.finish().unwrap();
        assert!(metadata.column_index.is_some());
        assert!(metadata.offset_index.is_some());

        // A stated file size smaller than the page index range is an EOF
        // error, not a request for more data.
        let bytes = bytes_for_range(323584..len);
        let reader_result = reader.try_parse_sized(&bytes, len - 323584).unwrap_err();
        assert_eq!(
            reader_result.to_string(),
            "EOF: Parquet file too small. Range 323583..452504 is beyond file bounds 130649"
        );

        // Page indexes not requested: a too-short suffix first yields
        // NeedMoreData, then parsing succeeds on retry.
        let mut reader = ParquetMetaDataReader::new();
        let bytes = bytes_for_range(452505..len);
        match reader.try_parse_sized(&bytes, len).unwrap_err() {
            ParquetError::NeedMoreData(needed) => {
                let bytes = bytes_for_range(len - needed as u64..len);
                reader.try_parse_sized(&bytes, len).unwrap();
                reader.finish().unwrap();
            }
            _ => panic!("unexpected error"),
        };

        // `try_parse` assumes the reader holds the entire file, so a suffix
        // fails outright.
        let reader_result = reader.try_parse(&bytes).unwrap_err();
        assert_eq!(
            reader_result.to_string(),
            "EOF: Parquet file too small. Size is 1728 but need 1729"
        );

        // A prefix of the file has no valid footer magic.
        let bytes = bytes_for_range(0..1000);
        let reader_result = reader.try_parse_sized(&bytes, len).unwrap_err();
        assert_eq!(
            reader_result.to_string(),
            "Parquet error: Invalid Parquet file. Corrupt footer"
        );

        // A too-short suffix combined with a too-small stated file size.
        let bytes = bytes_for_range(452510..len);
        let reader_result = reader.try_parse_sized(&bytes, len - 452505).unwrap_err();
        assert_eq!(
            reader_result.to_string(),
            "EOF: Parquet file too small. Size is 1728 but need 1729"
        );
    }
}

#[cfg(all(feature = "async", feature = "arrow", test))]
mod async_tests {
    use super::*;
    use bytes::Bytes;
    use futures::future::BoxFuture;
    use futures::FutureExt;
    use std::fs::File;
    use std::future::Future;
    use std::io::{Read, Seek, SeekFrom};
    use std::ops::Range;
    use std::sync::atomic::{AtomicUsize, Ordering};

    use crate::file::reader::Length;
    use crate::util::test_common::file_util::get_test_file;

    struct MetadataFetchFn<F>(F);

    impl<F, Fut> MetadataFetch for MetadataFetchFn<F>
    where
        F: FnMut(Range<u64>) -> Fut + Send,
        Fut: Future<Output = Result<Bytes>> + Send,
    {
        fn fetch(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
            async move { self.0(range).await }.boxed()
        }
    }

    struct MetadataSuffixFetchFn<F1, F2>(F1, F2);

    impl<F1, Fut, F2> MetadataFetch for MetadataSuffixFetchFn<F1, F2>
    where
        F1: FnMut(Range<u64>) -> Fut + Send,
        Fut: Future<Output = Result<Bytes>> + Send,
        F2: Send,
    {
        fn fetch(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
            async move { self.0(range).await }.boxed()
        }
    }

    impl<F1, Fut, F2> MetadataSuffixFetch for MetadataSuffixFetchFn<F1, F2>
    where
        F1: FnMut(Range<u64>) -> Fut + Send,
        F2: FnMut(usize) -> Fut + Send,
        Fut: Future<Output = Result<Bytes>> + Send,
    {
        fn fetch_suffix(&mut self, suffix: usize) -> BoxFuture<'_, Result<Bytes>> {
            async move { self.1(suffix).await }.boxed()
        }
    }

    fn read_range(file: &mut File, range: Range<u64>) -> Result<Bytes> {
        file.seek(SeekFrom::Start(range.start as _))?;
        let len = range.end - range.start;
        let mut buf = Vec::with_capacity(len.try_into().unwrap());
        file.take(len as _).read_to_end(&mut buf)?;
        Ok(buf.into())
    }

    fn read_suffix(file: &mut File, suffix: usize) -> Result<Bytes> {
        let file_len = file.len();
        // Don't seek before the beginning of the file.
        file.seek(SeekFrom::End(0 - suffix.min(file_len as _) as i64))?;
        let mut buf = Vec::with_capacity(suffix);
        file.take(suffix as _).read_to_end(&mut buf)?;
        Ok(buf.into())
    }

    #[tokio::test]
    async fn test_simple() {
        let mut file = get_test_file("nulls.snappy.parquet");
        let len = file.len();

        let expected = ParquetMetaDataReader::new()
            .parse_and_finish(&file)
            .unwrap();
        let expected = expected.file_metadata().schema();
        let fetch_count = AtomicUsize::new(0);

        let mut fetch = |range| {
            fetch_count.fetch_add(1, Ordering::SeqCst);
            futures::future::ready(read_range(&mut file, range))
        };

        let input = MetadataFetchFn(&mut fetch);
        let actual = ParquetMetaDataReader::new()
            .load_and_finish(input, len)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 2);

        // Hint of 7 bytes is less than the 8-byte footer: still two fetches.
        fetch_count.store(0, Ordering::SeqCst);
        let input = MetadataFetchFn(&mut fetch);
        let actual = ParquetMetaDataReader::new()
            .with_prefetch_hint(Some(7))
            .load_and_finish(input, len)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 2);

        // Hint of 10 bytes covers the footer but not the metadata: two fetches.
        fetch_count.store(0, Ordering::SeqCst);
        let input = MetadataFetchFn(&mut fetch);
        let actual = ParquetMetaDataReader::new()
            .with_prefetch_hint(Some(10))
            .load_and_finish(input, len)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 2);

        // A large enough hint needs only a single fetch.
        fetch_count.store(0, Ordering::SeqCst);
        let input = MetadataFetchFn(&mut fetch);
        let actual = ParquetMetaDataReader::new()
            .with_prefetch_hint(Some(500))
            .load_and_finish(input, len)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);

        // A hint that exactly covers the footer metadata also needs one fetch.
        fetch_count.store(0, Ordering::SeqCst);
        let input = MetadataFetchFn(&mut fetch);
        let actual = ParquetMetaDataReader::new()
            .with_prefetch_hint(Some(428))
            .load_and_finish(input, len)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);

        let input = MetadataFetchFn(&mut fetch);
        let err = ParquetMetaDataReader::new()
            .load_and_finish(input, 4)
            .await
            .unwrap_err()
            .to_string();
        assert_eq!(err, "EOF: file size of 4 is less than footer");

        let input = MetadataFetchFn(&mut fetch);
        let err = ParquetMetaDataReader::new()
            .load_and_finish(input, 20)
            .await
            .unwrap_err()
            .to_string();
        assert_eq!(err, "Parquet error: Invalid Parquet file. Corrupt footer");
    }

    #[tokio::test]
    async fn test_suffix() {
        let mut file = get_test_file("nulls.snappy.parquet");
        let mut file2 = file.try_clone().unwrap();

        let expected = ParquetMetaDataReader::new()
            .parse_and_finish(&file)
            .unwrap();
        let expected = expected.file_metadata().schema();
        let fetch_count = AtomicUsize::new(0);
        let suffix_fetch_count = AtomicUsize::new(0);

        let mut fetch = |range| {
            fetch_count.fetch_add(1, Ordering::SeqCst);
            futures::future::ready(read_range(&mut file, range))
        };
        let mut suffix_fetch = |suffix| {
            suffix_fetch_count.fetch_add(1, Ordering::SeqCst);
            futures::future::ready(read_suffix(&mut file2, suffix))
        };

        let input = MetadataSuffixFetchFn(&mut fetch, &mut suffix_fetch);
        let actual = ParquetMetaDataReader::new()
            .load_via_suffix_and_finish(input)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 0);
        assert_eq!(suffix_fetch_count.load(Ordering::SeqCst), 2);

        // Hint of 7 bytes is less than the 8-byte footer: still two fetches.
        fetch_count.store(0, Ordering::SeqCst);
        suffix_fetch_count.store(0, Ordering::SeqCst);
        let input = MetadataSuffixFetchFn(&mut fetch, &mut suffix_fetch);
        let actual = ParquetMetaDataReader::new()
            .with_prefetch_hint(Some(7))
            .load_via_suffix_and_finish(input)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 0);
        assert_eq!(suffix_fetch_count.load(Ordering::SeqCst), 2);

        // Hint of 10 bytes covers the footer but not the metadata: two fetches.
        fetch_count.store(0, Ordering::SeqCst);
        suffix_fetch_count.store(0, Ordering::SeqCst);
        let input = MetadataSuffixFetchFn(&mut fetch, &mut suffix_fetch);
        let actual = ParquetMetaDataReader::new()
            .with_prefetch_hint(Some(10))
            .load_via_suffix_and_finish(input)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 0);
        assert_eq!(suffix_fetch_count.load(Ordering::SeqCst), 2);

        // A large enough hint needs only a single fetch.
        fetch_count.store(0, Ordering::SeqCst);
        suffix_fetch_count.store(0, Ordering::SeqCst);
        let input = MetadataSuffixFetchFn(&mut fetch, &mut suffix_fetch);
        let actual = ParquetMetaDataReader::new()
            .with_prefetch_hint(Some(500))
            .load_via_suffix_and_finish(input)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 0);
        assert_eq!(suffix_fetch_count.load(Ordering::SeqCst), 1);

        // A hint that exactly covers the footer metadata also needs one fetch.
        fetch_count.store(0, Ordering::SeqCst);
        suffix_fetch_count.store(0, Ordering::SeqCst);
        let input = MetadataSuffixFetchFn(&mut fetch, &mut suffix_fetch);
        let actual = ParquetMetaDataReader::new()
            .with_prefetch_hint(Some(428))
            .load_via_suffix_and_finish(input)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 0);
        assert_eq!(suffix_fetch_count.load(Ordering::SeqCst), 1);
    }

    #[cfg(feature = "encryption")]
    #[tokio::test]
    async fn test_suffix_with_encryption() {
        let mut file = get_test_file("uniform_encryption.parquet.encrypted");
        let mut file2 = file.try_clone().unwrap();

        let mut fetch = |range| futures::future::ready(read_range(&mut file, range));
        let mut suffix_fetch = |suffix| futures::future::ready(read_suffix(&mut file2, suffix));

        let input = MetadataSuffixFetchFn(&mut fetch, &mut suffix_fetch);

        let key_code: &[u8] = "0123456789012345".as_bytes();
        let decryption_properties = FileDecryptionProperties::builder(key_code.to_vec())
            .build()
            .unwrap();

        // There is only one row group in this file.
        let expected = ParquetMetaDataReader::new()
            .with_decryption_properties(Some(&decryption_properties))
            .load_via_suffix_and_finish(input)
            .await
            .unwrap();
        assert_eq!(expected.num_row_groups(), 1);
    }

    #[tokio::test]
    async fn test_page_index() {
        let mut file = get_test_file("alltypes_tiny_pages.parquet");
        let len = file.len();
        let fetch_count = AtomicUsize::new(0);
        let mut fetch = |range| {
            fetch_count.fetch_add(1, Ordering::SeqCst);
            futures::future::ready(read_range(&mut file, range))
        };

        let f = MetadataFetchFn(&mut fetch);
        let mut loader = ParquetMetaDataReader::new().with_page_indexes(true);
        loader.try_load(f, len).await.unwrap();
        assert_eq!(fetch_count.load(Ordering::SeqCst), 3);
        let metadata = loader.finish().unwrap();
        assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());

        // Prefetch the exact footer metadata size: saves one fetch.
        fetch_count.store(0, Ordering::SeqCst);
        let f = MetadataFetchFn(&mut fetch);
        let mut loader = ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .with_prefetch_hint(Some(1729));
        loader.try_load(f, len).await.unwrap();
        assert_eq!(fetch_count.load(Ordering::SeqCst), 2);
        let metadata = loader.finish().unwrap();
        assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());

        // One byte short of covering the page indexes: still two fetches.
        fetch_count.store(0, Ordering::SeqCst);
        let f = MetadataFetchFn(&mut fetch);
        let mut loader = ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .with_prefetch_hint(Some(130649));
        loader.try_load(f, len).await.unwrap();
        assert_eq!(fetch_count.load(Ordering::SeqCst), 2);
        let metadata = loader.finish().unwrap();
        assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());

        // Large enough to cover the metadata and page indexes in one fetch.
        fetch_count.store(0, Ordering::SeqCst);
        let f = MetadataFetchFn(&mut fetch);
        let metadata = ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .with_prefetch_hint(Some(130650))
            .load_and_finish(f, len)
            .await
            .unwrap();
        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
        assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());

        // Prefetch almost the entire file.
        fetch_count.store(0, Ordering::SeqCst);
        let f = MetadataFetchFn(&mut fetch);
        let metadata = ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .with_prefetch_hint(Some((len - 1000) as usize))
            .load_and_finish(f, len)
            .await
            .unwrap();
        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
        assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());

        // Prefetch the entire file.
        fetch_count.store(0, Ordering::SeqCst);
        let f = MetadataFetchFn(&mut fetch);
        let metadata = ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .with_prefetch_hint(Some(len as usize))
            .load_and_finish(f, len)
            .await
            .unwrap();
        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
        assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());

        // A hint larger than the file is clamped to the file size.
        fetch_count.store(0, Ordering::SeqCst);
        let f = MetadataFetchFn(&mut fetch);
        let metadata = ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .with_prefetch_hint(Some((len + 1000) as usize))
            .load_and_finish(f, len)
            .await
            .unwrap();
        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
        assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());
    }
}