use std::{io::Read, ops::Range, sync::Arc};

use bytes::Bytes;

use crate::basic::ColumnOrder;
#[cfg(feature = "encryption")]
use crate::encryption::{
    decrypt::{FileDecryptionProperties, FileDecryptor},
    modules::create_footer_aad,
};

use crate::errors::{ParquetError, Result};
use crate::file::metadata::{ColumnChunkMetaData, FileMetaData, ParquetMetaData, RowGroupMetaData};
use crate::file::page_index::index::Index;
use crate::file::page_index::index_reader::{acc_range, decode_column_index, decode_offset_index};
use crate::file::reader::ChunkReader;
use crate::file::{FOOTER_SIZE, PARQUET_MAGIC, PARQUET_MAGIC_ENCR_FOOTER};
use crate::format::{ColumnOrder as TColumnOrder, FileMetaData as TFileMetaData};
#[cfg(feature = "encryption")]
use crate::format::{EncryptionAlgorithm, FileCryptoMetaData as TFileCryptoMetaData};
use crate::schema::types;
use crate::schema::types::SchemaDescriptor;
use crate::thrift::{TCompactSliceInputProtocol, TSerializable};

#[cfg(all(feature = "async", feature = "arrow"))]
use crate::arrow::async_reader::{MetadataFetch, MetadataSuffixFetch};
#[cfg(feature = "encryption")]
use crate::encryption::decrypt::CryptoContext;
use crate::file::page_index::offset_index::OffsetIndexMetaData;

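/// Reads the [`ParquetMetaData`] from the footer of a Parquet file.
///
/// This is a builder-style API: request the column index and/or offset index via
/// [`Self::with_page_indexes`] and friends before parsing.
///
/// A minimal usage sketch; the `open_parquet_file` helper is hypothetical:
///
/// ```no_run
/// # use parquet::file::metadata::ParquetMetaDataReader;
/// # fn open_parquet_file(path: &str) -> std::fs::File { unimplemented!(); }
/// // read parquet metadata, including the page indexes
/// let file = open_parquet_file("some_path.parquet");
/// let metadata = ParquetMetaDataReader::new()
///     .with_page_indexes(true)
///     .parse_and_finish(&file)
///     .unwrap();
/// ```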
#[derive(Default)]
pub struct ParquetMetaDataReader {
    metadata: Option<ParquetMetaData>,
    column_index: bool,
    offset_index: bool,
    prefetch_hint: Option<usize>,
    // Size of the serialized thrift metadata plus the 8 byte footer. Only set if
    // `self.parse_metadata` is called.
    metadata_size: Option<usize>,
    #[cfg(feature = "encryption")]
    file_decryption_properties: Option<FileDecryptionProperties>,
}

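/// Describes the Parquet footer tail: the length of the serialized file metadata and
/// whether the footer is encrypted, as decoded by
/// [`ParquetMetaDataReader::decode_footer_tail`].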
pub struct FooterTail {
    metadata_length: usize,
    encrypted_footer: bool,
}

impl FooterTail {
    /// The length of the decoded footer metadata in bytes
    pub fn metadata_length(&self) -> usize {
        self.metadata_length
    }

    /// Whether the footer metadata is encrypted
    pub fn is_encrypted_footer(&self) -> bool {
        self.encrypted_footer
    }
}

impl ParquetMetaDataReader {
    /// Create a new [`ParquetMetaDataReader`]
    pub fn new() -> Self {
        Default::default()
    }

    /// Create a new [`ParquetMetaDataReader`] populated with a [`ParquetMetaData`] struct
    /// obtained via other means.
    pub fn new_with_metadata(metadata: ParquetMetaData) -> Self {
        Self {
            metadata: Some(metadata),
            ..Default::default()
        }
    }

    /// Enable or disable reading both the column index and the offset index when parsing.
    pub fn with_page_indexes(self, val: bool) -> Self {
        self.with_column_indexes(val).with_offset_indexes(val)
    }

    /// Enable or disable reading the column index when parsing.
    pub fn with_column_indexes(mut self, val: bool) -> Self {
        self.column_index = val;
        self
    }

    /// Enable or disable reading the offset index when parsing.
    pub fn with_offset_indexes(mut self, val: bool) -> Self {
        self.offset_index = val;
        self
    }

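    /// Provide a hint as to the number of bytes needed to fully parse the [`ParquetMetaData`].
    /// Only used by the async loaders ([`Self::try_load()`] and friends): the hinted number of
    /// bytes is fetched from the end of the file in the initial request, which can avoid a
    /// second fetch when the hint covers the footer, the metadata, and (if requested) the
    /// page indexes.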
    pub fn with_prefetch_hint(mut self, prefetch: Option<usize>) -> Self {
        self.prefetch_hint = prefetch;
        self
    }

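    /// Provide the [`FileDecryptionProperties`] to use when decrypting the metadata of an
    /// encrypted Parquet file.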
    #[cfg(feature = "encryption")]
    pub fn with_decryption_properties(
        mut self,
        properties: Option<&FileDecryptionProperties>,
    ) -> Self {
        self.file_decryption_properties = properties.cloned();
        self
    }

    /// Indicates whether this reader has a [`ParquetMetaData`] internally.
    pub fn has_metadata(&self) -> bool {
        self.metadata.is_some()
    }

    /// Return the parsed [`ParquetMetaData`], consuming it from this reader.
    pub fn finish(&mut self) -> Result<ParquetMetaData> {
        self.metadata
            .take()
            .ok_or_else(|| general_err!("could not parse parquet metadata"))
    }

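    /// Given a [`ChunkReader`], parse and return the [`ParquetMetaData`] in a single pass,
    /// consuming `self`. The reader is assumed to provide access to the entire file; when
    /// only the tail of the file is available, use [`Self::try_parse_sized()`] instead.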
    pub fn parse_and_finish<R: ChunkReader>(mut self, reader: &R) -> Result<ParquetMetaData> {
        self.try_parse(reader)?;
        self.finish()
    }

    /// Attempts to parse the footer metadata (and page indexes if requested) from the given
    /// [`ChunkReader`], which is assumed to provide access to the entire file.
    pub fn try_parse<R: ChunkReader>(&mut self, reader: &R) -> Result<()> {
        self.try_parse_sized(reader, reader.len())
    }

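    /// Same as [`Self::try_parse()`], but only `file_size` bytes of the original file are
    /// available. `reader` is assumed to hold the tail of the file; if it does not contain
    /// enough data, this returns [`ParquetError::NeedMoreData`] with the number of bytes
    /// (counted from the end of the file) required to proceed.
    ///
    /// A retry-loop sketch; the `get_bytes` and `open_parquet_file` helpers are hypothetical:
    ///
    /// ```no_run
    /// # use parquet::file::metadata::ParquetMetaDataReader;
    /// # use parquet::errors::ParquetError;
    /// # use parquet::file::reader::Length;
    /// # fn get_bytes(file: &std::fs::File, range: std::ops::Range<u64>) -> bytes::Bytes { unimplemented!(); }
    /// # fn open_parquet_file(path: &str) -> std::fs::File { unimplemented!(); }
    /// let file = open_parquet_file("some_path.parquet");
    /// let len = file.len();
    /// // Speculatively read 1 kilobyte from the end of the file
    /// let bytes = get_bytes(&file, len - 1024..len);
    /// let mut reader = ParquetMetaDataReader::new().with_page_indexes(true);
    /// match reader.try_parse_sized(&bytes, len) {
    ///     Ok(_) => (),
    ///     Err(ParquetError::NeedMoreData(needed)) => {
    ///         // Read the needed number of bytes from the end of the file
    ///         let bytes = get_bytes(&file, len - needed as u64..len);
    ///         reader.try_parse_sized(&bytes, len).unwrap();
    ///     }
    ///     _ => panic!("unexpected error"),
    /// }
    /// let metadata = reader.finish().unwrap();
    /// ```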
    pub fn try_parse_sized<R: ChunkReader>(&mut self, reader: &R, file_size: u64) -> Result<()> {
        self.metadata = match self.parse_metadata(reader) {
            Ok(metadata) => Some(metadata),
            Err(ParquetError::NeedMoreData(needed)) => {
                // If the reader already covers the whole file, or the file is smaller than
                // what is needed, more data won't help: report EOF instead.
                if file_size == reader.len() || needed as u64 > file_size {
                    return Err(eof_err!(
                        "Parquet file too small. Size is {} but need {}",
                        file_size,
                        needed
                    ));
                } else {
                    // Ask the caller to provide more bytes from the end of the file
                    return Err(ParquetError::NeedMoreData(needed));
                }
            }
            Err(e) => return Err(e),
        };

        // Done if the page indexes were not requested
        if !self.column_index && !self.offset_index {
            return Ok(());
        }

        self.read_page_indexes_sized(reader, file_size)
    }

    /// Read the page indexes for a previously parsed [`ParquetMetaData`], assuming `reader`
    /// provides access to the entire file.
    pub fn read_page_indexes<R: ChunkReader>(&mut self, reader: &R) -> Result<()> {
        self.read_page_indexes_sized(reader, reader.len())
    }

    /// Same as [`Self::read_page_indexes()`], but only `file_size` bytes (the tail of the
    /// original file) are available via `reader`.
    pub fn read_page_indexes_sized<R: ChunkReader>(
        &mut self,
        reader: &R,
        file_size: u64,
    ) -> Result<()> {
        if self.metadata.is_none() {
            return Err(general_err!(
                "Tried to read page indexes without ParquetMetaData metadata"
            ));
        }

        // Nothing to do if there are no column or offset index ranges to read
        let Some(range) = self.range_for_page_index() else {
            return Ok(());
        };

        // Check that the needed range lies within the bytes the reader can supply. Checking
        // `range.end` seems redundant, but guards against `range_for_page_index()` returning
        // garbage.
        let file_range = file_size.saturating_sub(reader.len())..file_size;
        if !(file_range.contains(&range.start) && file_range.contains(&range.end)) {
            if range.end > file_size {
                // The range ends beyond the end of the file, so the file must be corrupt
                return Err(eof_err!(
                    "Parquet file too small. Range {:?} is beyond file bounds {file_size}",
                    range
                ));
            } else {
                // Ask the caller to provide more bytes from the end of the file
                return Err(ParquetError::NeedMoreData(
                    (file_size - range.start).try_into()?,
                ));
            }
        }

        // Sanity check: the page index range must not overlap the file metadata
        if let Some(metadata_size) = self.metadata_size {
            let metadata_range = file_size.saturating_sub(metadata_size as u64)..file_size;
            if range.end > metadata_range.start {
                return Err(eof_err!(
                    "Parquet file too small. Page index range {:?} overlaps with file metadata {:?}",
                    range,
                    metadata_range
                ));
            }
        }

        let bytes_needed = usize::try_from(range.end - range.start)?;
        let bytes = reader.get_bytes(range.start - file_range.start, bytes_needed)?;
        let offset = range.start;

        self.parse_column_index(&bytes, offset)?;
        self.parse_offset_index(&bytes, offset)?;

        Ok(())
    }

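    /// Given a [`MetadataFetch`], asynchronously parse and return the [`ParquetMetaData`] in
    /// a single pass, consuming `self`. See [`Self::with_prefetch_hint`] for a way to reduce
    /// the number of fetches performed.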
    #[cfg(all(feature = "async", feature = "arrow"))]
    pub async fn load_and_finish<F: MetadataFetch>(
        mut self,
        fetch: F,
        file_size: u64,
    ) -> Result<ParquetMetaData> {
        self.try_load(fetch, file_size).await?;
        self.finish()
    }

    /// Same as [`Self::load_and_finish`], but fetches the metadata via suffix requests with
    /// a [`MetadataSuffixFetch`], so the file size need not be known in advance.
    #[cfg(all(feature = "async", feature = "arrow"))]
    pub async fn load_via_suffix_and_finish<F: MetadataSuffixFetch>(
        mut self,
        fetch: F,
    ) -> Result<ParquetMetaData> {
        self.try_load_via_suffix(fetch).await?;
        self.finish()
    }

    /// Attempts to (asynchronously) parse the footer metadata (and page indexes if
    /// requested) from a [`MetadataFetch`].
    #[cfg(all(feature = "async", feature = "arrow"))]
    pub async fn try_load<F: MetadataFetch>(&mut self, mut fetch: F, file_size: u64) -> Result<()> {
        let (metadata, remainder) = self.load_metadata(&mut fetch, file_size).await?;

        self.metadata = Some(metadata);

        // Done if the page indexes were not requested
        if !self.column_index && !self.offset_index {
            return Ok(());
        }

        self.load_page_index_with_remainder(fetch, remainder).await
    }

    /// Same as [`Self::try_load`], but fetches the metadata via suffix requests with a
    /// [`MetadataSuffixFetch`].
    #[cfg(all(feature = "async", feature = "arrow"))]
    pub async fn try_load_via_suffix<F: MetadataSuffixFetch>(
        &mut self,
        mut fetch: F,
    ) -> Result<()> {
        let (metadata, remainder) = self.load_metadata_via_suffix(&mut fetch).await?;

        self.metadata = Some(metadata);

        // Done if the page indexes were not requested
        if !self.column_index && !self.offset_index {
            return Ok(());
        }

        self.load_page_index_with_remainder(fetch, remainder).await
    }

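    /// Asynchronously fetch the page indexes for a [`ParquetMetaData`] that has already been
    /// loaded into this reader.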
    #[cfg(all(feature = "async", feature = "arrow"))]
    pub async fn load_page_index<F: MetadataFetch>(&mut self, fetch: F) -> Result<()> {
        self.load_page_index_with_remainder(fetch, None).await
    }

    #[cfg(all(feature = "async", feature = "arrow"))]
    async fn load_page_index_with_remainder<F: MetadataFetch>(
        &mut self,
        mut fetch: F,
        remainder: Option<(usize, Bytes)>,
    ) -> Result<()> {
        if self.metadata.is_none() {
            return Err(general_err!("Footer metadata is not present"));
        }

        // Get the bounds needed for the page indexes (if any are present in the file)
        let range = self.range_for_page_index();
        let range = match range {
            Some(range) => range,
            None => return Ok(()),
        };

        let bytes = match &remainder {
            // Reuse the bytes fetched alongside the footer if they cover the index range
            Some((remainder_start, remainder)) if *remainder_start as u64 <= range.start => {
                let remainder_start = *remainder_start as u64;
                let offset = usize::try_from(range.start - remainder_start)?;
                let end = usize::try_from(range.end - remainder_start)?;
                assert!(end <= remainder.len());
                remainder.slice(offset..end)
            }
            _ => fetch.fetch(range.start..range.end).await?,
        };

        // Sanity check
        assert_eq!(bytes.len() as u64, range.end - range.start);

        self.parse_column_index(&bytes, range.start)?;
        self.parse_offset_index(&bytes, range.start)?;

        Ok(())
    }

    fn parse_column_index(&mut self, bytes: &Bytes, start_offset: u64) -> Result<()> {
        let metadata = self.metadata.as_mut().unwrap();
        if self.column_index {
            let index = metadata
                .row_groups()
                .iter()
                .enumerate()
                .map(|(rg_idx, x)| {
                    x.columns()
                        .iter()
                        .enumerate()
                        .map(|(col_idx, c)| match c.column_index_range() {
                            Some(r) => {
                                let r_start = usize::try_from(r.start - start_offset)?;
                                let r_end = usize::try_from(r.end - start_offset)?;
                                Self::parse_single_column_index(
                                    &bytes[r_start..r_end],
                                    metadata,
                                    c,
                                    rg_idx,
                                    col_idx,
                                )
                            }
                            None => Ok(Index::NONE),
                        })
                        .collect::<Result<Vec<_>>>()
                })
                .collect::<Result<Vec<_>>>()?;
            metadata.set_column_index(Some(index));
        }
        Ok(())
    }

    #[cfg(feature = "encryption")]
    fn parse_single_column_index(
        bytes: &[u8],
        metadata: &ParquetMetaData,
        column: &ColumnChunkMetaData,
        row_group_index: usize,
        col_index: usize,
    ) -> Result<Index> {
        match &column.column_crypto_metadata {
            Some(crypto_metadata) => {
                let file_decryptor = metadata.file_decryptor.as_ref().ok_or_else(|| {
                    general_err!("Cannot decrypt column index, no file decryptor set")
                })?;
                let crypto_context = CryptoContext::for_column(
                    file_decryptor,
                    crypto_metadata,
                    row_group_index,
                    col_index,
                )?;
                let column_decryptor = crypto_context.metadata_decryptor();
                let aad = crypto_context.create_column_index_aad()?;
                let plaintext = column_decryptor.decrypt(bytes, &aad)?;
                decode_column_index(&plaintext, column.column_type())
            }
            None => decode_column_index(bytes, column.column_type()),
        }
    }

    #[cfg(not(feature = "encryption"))]
    fn parse_single_column_index(
        bytes: &[u8],
        _metadata: &ParquetMetaData,
        column: &ColumnChunkMetaData,
        _row_group_index: usize,
        _col_index: usize,
    ) -> Result<Index> {
        decode_column_index(bytes, column.column_type())
    }

    fn parse_offset_index(&mut self, bytes: &Bytes, start_offset: u64) -> Result<()> {
        let metadata = self.metadata.as_mut().unwrap();
        if self.offset_index {
            let index = metadata
                .row_groups()
                .iter()
                .enumerate()
                .map(|(rg_idx, x)| {
                    x.columns()
                        .iter()
                        .enumerate()
                        .map(|(col_idx, c)| match c.offset_index_range() {
                            Some(r) => {
                                let r_start = usize::try_from(r.start - start_offset)?;
                                let r_end = usize::try_from(r.end - start_offset)?;
                                Self::parse_single_offset_index(
                                    &bytes[r_start..r_end],
                                    metadata,
                                    c,
                                    rg_idx,
                                    col_idx,
                                )
                            }
                            None => Err(general_err!("missing offset index")),
                        })
                        .collect::<Result<Vec<_>>>()
                })
                .collect::<Result<Vec<_>>>()?;

            metadata.set_offset_index(Some(index));
        }
        Ok(())
    }

    #[cfg(feature = "encryption")]
    fn parse_single_offset_index(
        bytes: &[u8],
        metadata: &ParquetMetaData,
        column: &ColumnChunkMetaData,
        row_group_index: usize,
        col_index: usize,
    ) -> Result<OffsetIndexMetaData> {
        match &column.column_crypto_metadata {
            Some(crypto_metadata) => {
                let file_decryptor = metadata.file_decryptor.as_ref().ok_or_else(|| {
                    general_err!("Cannot decrypt offset index, no file decryptor set")
                })?;
                let crypto_context = CryptoContext::for_column(
                    file_decryptor,
                    crypto_metadata,
                    row_group_index,
                    col_index,
                )?;
                let column_decryptor = crypto_context.metadata_decryptor();
                let aad = crypto_context.create_offset_index_aad()?;
                let plaintext = column_decryptor.decrypt(bytes, &aad)?;
                decode_offset_index(&plaintext)
            }
            None => decode_offset_index(bytes),
        }
    }

    #[cfg(not(feature = "encryption"))]
    fn parse_single_offset_index(
        bytes: &[u8],
        _metadata: &ParquetMetaData,
        _column: &ColumnChunkMetaData,
        _row_group_index: usize,
        _col_index: usize,
    ) -> Result<OffsetIndexMetaData> {
        decode_offset_index(bytes)
    }

    fn range_for_page_index(&self) -> Option<Range<u64>> {
        // Sanity check
        self.metadata.as_ref()?;

        // Union the column index and offset index ranges for all columns
        let mut range = None;
        let metadata = self.metadata.as_ref().unwrap();
        for c in metadata.row_groups().iter().flat_map(|r| r.columns()) {
            if self.column_index {
                range = acc_range(range, c.column_index_range());
            }
            if self.offset_index {
                range = acc_range(range, c.offset_index_range());
            }
        }
        range
    }

    fn parse_metadata<R: ChunkReader>(&mut self, chunk_reader: &R) -> Result<ParquetMetaData> {
        // Check that the file is large enough to hold the footer
        let file_size = chunk_reader.len();
        if file_size < (FOOTER_SIZE as u64) {
            return Err(ParquetError::NeedMoreData(FOOTER_SIZE));
        }

        let mut footer = [0_u8; FOOTER_SIZE];
        chunk_reader
            .get_read(file_size - FOOTER_SIZE as u64)?
            .read_exact(&mut footer)?;

        let footer = Self::decode_footer_tail(&footer)?;
        let metadata_len = footer.metadata_length();
        let footer_metadata_len = FOOTER_SIZE + metadata_len;
        self.metadata_size = Some(footer_metadata_len);

        if footer_metadata_len as u64 > file_size {
            return Err(ParquetError::NeedMoreData(footer_metadata_len));
        }

        let start = file_size - footer_metadata_len as u64;
        self.decode_footer_metadata(
            chunk_reader.get_bytes(start, metadata_len)?.as_ref(),
            &footer,
        )
    }

    /// Return the number of bytes to read in the initial pass. If a prefetch hint has been
    /// provided, read that many bytes (but at least enough to cover the footer), otherwise
    /// read only the footer.
    #[cfg(all(feature = "async", feature = "arrow"))]
    fn get_prefetch_size(&self) -> usize {
        if let Some(prefetch) = self.prefetch_hint {
            if prefetch > FOOTER_SIZE {
                return prefetch;
            }
        }
        FOOTER_SIZE
    }

    #[cfg(all(feature = "async", feature = "arrow"))]
    async fn load_metadata<F: MetadataFetch>(
        &self,
        fetch: &mut F,
        file_size: u64,
    ) -> Result<(ParquetMetaData, Option<(usize, Bytes)>)> {
        let prefetch = self.get_prefetch_size() as u64;

        if file_size < FOOTER_SIZE as u64 {
            return Err(eof_err!("file size of {} is less than footer", file_size));
        }

        // If a size hint is set, read more than the minimum size to try and avoid a second
        // fetch. Note that `prefetch > file_size` is fine since we use `saturating_sub`.
        let footer_start = file_size.saturating_sub(prefetch);

        let suffix = fetch.fetch(footer_start..file_size).await?;
        let suffix_len = suffix.len();
        let fetch_len = (file_size - footer_start)
            .try_into()
            .expect("footer size should never be larger than u32");
        if suffix_len < fetch_len {
            return Err(eof_err!(
                "metadata requires {} bytes, but could only read {}",
                fetch_len,
                suffix_len
            ));
        }

        let mut footer = [0; FOOTER_SIZE];
        footer.copy_from_slice(&suffix[suffix_len - FOOTER_SIZE..suffix_len]);

        let footer = Self::decode_footer_tail(&footer)?;
        let length = footer.metadata_length();

        if file_size < (length + FOOTER_SIZE) as u64 {
            return Err(eof_err!(
                "file size of {} is less than footer + metadata {}",
                file_size,
                length + FOOTER_SIZE
            ));
        }

        // The initial read did not fetch the entire footer metadata: a second fetch is needed
        if length > suffix_len - FOOTER_SIZE {
            let metadata_start = file_size - (length + FOOTER_SIZE) as u64;
            let meta = fetch
                .fetch(metadata_start..(file_size - FOOTER_SIZE as u64))
                .await?;
            Ok((self.decode_footer_metadata(&meta, &footer)?, None))
        } else {
            let metadata_start = (file_size - (length + FOOTER_SIZE) as u64 - footer_start)
                .try_into()
                .expect("metadata length should never be larger than u32");
            let slice = &suffix[metadata_start..suffix_len - FOOTER_SIZE];
            Ok((
                self.decode_footer_metadata(slice, &footer)?,
                // Return the remaining prefetched bytes so they can serve the page indexes
                Some((footer_start as usize, suffix.slice(..metadata_start))),
            ))
        }
    }

    #[cfg(all(feature = "async", feature = "arrow"))]
    async fn load_metadata_via_suffix<F: MetadataSuffixFetch>(
        &self,
        fetch: &mut F,
    ) -> Result<(ParquetMetaData, Option<(usize, Bytes)>)> {
        let prefetch = self.get_prefetch_size();

        let suffix = fetch.fetch_suffix(prefetch as _).await?;
        let suffix_len = suffix.len();

        if suffix_len < FOOTER_SIZE {
            return Err(eof_err!(
                "footer metadata requires {} bytes, but could only read {}",
                FOOTER_SIZE,
                suffix_len
            ));
        }

        let mut footer = [0; FOOTER_SIZE];
        footer.copy_from_slice(&suffix[suffix_len - FOOTER_SIZE..suffix_len]);

        let footer = Self::decode_footer_tail(&footer)?;
        let length = footer.metadata_length();

        // Length of the footer metadata plus the 8-byte footer, counted from the file end
        let metadata_offset = length + FOOTER_SIZE;
        // The initial read did not fetch the entire footer metadata: a second fetch is needed
        if length > suffix_len - FOOTER_SIZE {
            let meta = fetch.fetch_suffix(metadata_offset).await?;

            if meta.len() < metadata_offset {
                return Err(eof_err!(
                    "metadata requires {} bytes, but could only read {}",
                    metadata_offset,
                    meta.len()
                ));
            }

            Ok((
                // Strip the 8-byte footer off the fetched suffix before decoding
                self.decode_footer_metadata(&meta.slice(0..length), &footer)?,
                None,
            ))
        } else {
            let metadata_start = suffix_len - metadata_offset;
            let slice = &suffix[metadata_start..suffix_len - FOOTER_SIZE];
            Ok((
                self.decode_footer_metadata(slice, &footer)?,
                // Return the remaining prefetched bytes so they can serve the page indexes
                Some((0, suffix.slice(..metadata_start))),
            ))
        }
    }

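    /// Decodes the 8-byte tail of a Parquet footer.
    ///
    /// The last 8 bytes of a Parquet file have the following layout:
    /// * 4 bytes holding the little-endian length of the serialized file metadata
    /// * 4 bytes of magic: `PAR1` for a plaintext footer, or `PARE` for an encrypted footer
    ///
    /// ```text
    /// +-----+--------+
    /// | len | 'PAR1' |
    /// +-----+--------+
    /// ```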
    pub fn decode_footer_tail(slice: &[u8; FOOTER_SIZE]) -> Result<FooterTail> {
        let magic = &slice[4..];
        let encrypted_footer = if magic == PARQUET_MAGIC_ENCR_FOOTER {
            true
        } else if magic == PARQUET_MAGIC {
            false
        } else {
            return Err(general_err!("Invalid Parquet file. Corrupt footer"));
        };
        // Decode the metadata length from the little-endian prefix
        let metadata_len = u32::from_le_bytes(slice[..4].try_into().unwrap());
        Ok(FooterTail {
            metadata_length: metadata_len as usize,
            encrypted_footer,
        })
    }

    #[deprecated(note = "use decode_footer_tail instead")]
    pub fn decode_footer(slice: &[u8; FOOTER_SIZE]) -> Result<usize> {
        Self::decode_footer_tail(slice).map(|f| f.metadata_length)
    }

    /// Decodes [`ParquetMetaData`] from the provided bytes, handling an encrypted footer
    /// if the `encryption` feature is enabled.
    pub(crate) fn decode_footer_metadata(
        &self,
        buf: &[u8],
        footer_tail: &FooterTail,
    ) -> Result<ParquetMetaData> {
        #[cfg(feature = "encryption")]
        let result = Self::decode_metadata_with_encryption(
            buf,
            footer_tail.is_encrypted_footer(),
            self.file_decryption_properties.as_ref(),
        );
        #[cfg(not(feature = "encryption"))]
        let result = {
            if footer_tail.is_encrypted_footer() {
                Err(general_err!(
                    "Parquet file has an encrypted footer but the encryption feature is disabled"
                ))
            } else {
                Self::decode_metadata(buf)
            }
        };
        result
    }

    /// Decodes [`ParquetMetaData`] from the provided bytes, supporting an encrypted footer
    /// and/or encrypted columns.
    #[cfg(feature = "encryption")]
    fn decode_metadata_with_encryption(
        buf: &[u8],
        encrypted_footer: bool,
        file_decryption_properties: Option<&FileDecryptionProperties>,
    ) -> Result<ParquetMetaData> {
        let mut prot = TCompactSliceInputProtocol::new(buf);
        let mut file_decryptor = None;
        let decrypted_fmd_buf;

        if encrypted_footer {
            if let Some(file_decryption_properties) = file_decryption_properties {
                let t_file_crypto_metadata: TFileCryptoMetaData =
                    TFileCryptoMetaData::read_from_in_protocol(&mut prot)
                        .map_err(|e| general_err!("Could not parse crypto metadata: {}", e))?;
                let supply_aad_prefix = match &t_file_crypto_metadata.encryption_algorithm {
                    EncryptionAlgorithm::AESGCMV1(algo) => algo.supply_aad_prefix,
                    _ => Some(false),
                }
                .unwrap_or(false);
                if supply_aad_prefix && file_decryption_properties.aad_prefix().is_none() {
                    return Err(general_err!(
                        "Parquet file was encrypted with an AAD prefix that is not stored in the file, \
                        but no AAD prefix was provided in the file decryption properties"
                    ));
                }
                let decryptor = get_file_decryptor(
                    t_file_crypto_metadata.encryption_algorithm,
                    t_file_crypto_metadata.key_metadata.as_deref(),
                    file_decryption_properties,
                )?;
                let footer_decryptor = decryptor.get_footer_decryptor();
                let aad_footer = create_footer_aad(decryptor.file_aad())?;

                decrypted_fmd_buf = footer_decryptor?
                    .decrypt(prot.as_slice().as_ref(), aad_footer.as_ref())
                    .map_err(|_| {
                        general_err!(
                            "Provided footer key and AAD were unable to decrypt parquet footer"
                        )
                    })?;
                prot = TCompactSliceInputProtocol::new(decrypted_fmd_buf.as_ref());

                file_decryptor = Some(decryptor);
            } else {
                return Err(general_err!(
                    "Parquet file has an encrypted footer but decryption properties were not provided"
                ));
            }
        }

        let t_file_metadata: TFileMetaData = TFileMetaData::read_from_in_protocol(&mut prot)
            .map_err(|e| general_err!("Could not parse metadata: {}", e))?;
        let schema = types::from_thrift(&t_file_metadata.schema)?;
        let schema_descr = Arc::new(SchemaDescriptor::new(schema));

        if let (Some(algo), Some(file_decryption_properties)) = (
            t_file_metadata.encryption_algorithm,
            file_decryption_properties,
        ) {
            // File has a plaintext footer but the encryption algorithm is set
            file_decryptor = Some(get_file_decryptor(
                algo,
                t_file_metadata.footer_signing_key_metadata.as_deref(),
                file_decryption_properties,
            )?);
        }

        let mut row_groups = Vec::new();
        for rg in t_file_metadata.row_groups {
            let r = RowGroupMetaData::from_encrypted_thrift(
                schema_descr.clone(),
                rg,
                file_decryptor.as_ref(),
            )?;
            row_groups.push(r);
        }
        let column_orders =
            Self::parse_column_orders(t_file_metadata.column_orders, &schema_descr)?;

        let file_metadata = FileMetaData::new(
            t_file_metadata.version,
            t_file_metadata.num_rows,
            t_file_metadata.created_by,
            t_file_metadata.key_value_metadata,
            schema_descr,
            column_orders,
        );
        let mut metadata = ParquetMetaData::new(file_metadata, row_groups);

        metadata.with_file_decryptor(file_decryptor);

        Ok(metadata)
    }

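    /// Decodes a [`ParquetMetaData`] from the bytes of the footer metadata, which are
    /// encoded with the Thrift compact binary protocol (see the [Parquet metadata]
    /// documentation for the layout).
    ///
    /// [Parquet metadata]: https://parquet.apache.org/docs/file-format/metadata/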
    pub fn decode_metadata(buf: &[u8]) -> Result<ParquetMetaData> {
        let mut prot = TCompactSliceInputProtocol::new(buf);

        let t_file_metadata: TFileMetaData = TFileMetaData::read_from_in_protocol(&mut prot)
            .map_err(|e| general_err!("Could not parse metadata: {}", e))?;
        let schema = types::from_thrift(&t_file_metadata.schema)?;
        let schema_descr = Arc::new(SchemaDescriptor::new(schema));

        let mut row_groups = Vec::new();
        for rg in t_file_metadata.row_groups {
            row_groups.push(RowGroupMetaData::from_thrift(schema_descr.clone(), rg)?);
        }
        let column_orders =
            Self::parse_column_orders(t_file_metadata.column_orders, &schema_descr)?;

        let file_metadata = FileMetaData::new(
            t_file_metadata.version,
            t_file_metadata.num_rows,
            t_file_metadata.created_by,
            t_file_metadata.key_value_metadata,
            schema_descr,
            column_orders,
        );

        Ok(ParquetMetaData::new(file_metadata, row_groups))
    }

    /// Parses column orders from the Thrift definition.
    /// If no column orders are defined, returns `None`.
    fn parse_column_orders(
        t_column_orders: Option<Vec<TColumnOrder>>,
        schema_descr: &SchemaDescriptor,
    ) -> Result<Option<Vec<ColumnOrder>>> {
        match t_column_orders {
            Some(orders) => {
                // Should always be the case
                if orders.len() != schema_descr.num_columns() {
                    return Err(general_err!("Column order length mismatch"));
                };
                let mut res = Vec::new();
                for (i, column) in schema_descr.columns().iter().enumerate() {
                    match orders[i] {
                        TColumnOrder::TYPEORDER(_) => {
                            let sort_order = ColumnOrder::get_sort_order(
                                column.logical_type(),
                                column.converted_type(),
                                column.physical_type(),
                            );
                            res.push(ColumnOrder::TYPE_DEFINED_ORDER(sort_order));
                        }
                    }
                }
                Ok(Some(res))
            }
            None => Ok(None),
        }
    }
}

#[cfg(feature = "encryption")]
fn get_file_decryptor(
    encryption_algorithm: EncryptionAlgorithm,
    footer_key_metadata: Option<&[u8]>,
    file_decryption_properties: &FileDecryptionProperties,
) -> Result<FileDecryptor> {
    match encryption_algorithm {
        EncryptionAlgorithm::AESGCMV1(algo) => {
            let aad_file_unique = algo
                .aad_file_unique
                .ok_or_else(|| general_err!("AAD unique file identifier is not set"))?;
            let aad_prefix = if let Some(aad_prefix) = file_decryption_properties.aad_prefix() {
                aad_prefix.clone()
            } else {
                algo.aad_prefix.unwrap_or_default()
            };

            FileDecryptor::new(
                file_decryption_properties,
                footer_key_metadata,
                aad_file_unique,
                aad_prefix,
            )
        }
        EncryptionAlgorithm::AESGCMCTRV1(_) => Err(nyi_err!(
            "The AES_GCM_CTR_V1 encryption algorithm is not yet supported"
        )),
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;

    use crate::basic::SortOrder;
    use crate::basic::Type;
    use crate::file::reader::Length;
    use crate::format::TypeDefinedOrder;
    use crate::schema::types::Type as SchemaType;
    use crate::util::test_common::file_util::get_test_file;

    #[test]
    fn test_parse_metadata_size_smaller_than_footer() {
        let test_file = tempfile::tempfile().unwrap();
        let err = ParquetMetaDataReader::new()
            .parse_metadata(&test_file)
            .unwrap_err();
        assert!(matches!(err, ParquetError::NeedMoreData(8)));
    }

    #[test]
    fn test_parse_metadata_corrupt_footer() {
        let data = Bytes::from(vec![1, 2, 3, 4, 5, 6, 7, 8]);
        let reader_result = ParquetMetaDataReader::new().parse_metadata(&data);
        assert_eq!(
            reader_result.unwrap_err().to_string(),
            "Parquet error: Invalid Parquet file. Corrupt footer"
        );
    }

    #[test]
    fn test_parse_metadata_invalid_start() {
        let test_file = Bytes::from(vec![255, 0, 0, 0, b'P', b'A', b'R', b'1']);
        let err = ParquetMetaDataReader::new()
            .parse_metadata(&test_file)
            .unwrap_err();
        assert!(matches!(err, ParquetError::NeedMoreData(263)));
    }

    #[test]
    fn test_metadata_column_orders_parse() {
        // Define a simple schema with two primitive columns
        let fields = vec![
            Arc::new(
                SchemaType::primitive_type_builder("col1", Type::INT32)
                    .build()
                    .unwrap(),
            ),
            Arc::new(
                SchemaType::primitive_type_builder("col2", Type::FLOAT)
                    .build()
                    .unwrap(),
            ),
        ];
        let schema = SchemaType::group_type_builder("schema")
            .with_fields(fields)
            .build()
            .unwrap();
        let schema_descr = SchemaDescriptor::new(Arc::new(schema));

        let t_column_orders = Some(vec![
            TColumnOrder::TYPEORDER(TypeDefinedOrder::new()),
            TColumnOrder::TYPEORDER(TypeDefinedOrder::new()),
        ]);

        assert_eq!(
            ParquetMetaDataReader::parse_column_orders(t_column_orders, &schema_descr).unwrap(),
            Some(vec![
                ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED),
                ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED)
            ])
        );

        // Test the case where no column orders are defined
        assert_eq!(
            ParquetMetaDataReader::parse_column_orders(None, &schema_descr).unwrap(),
            None
        );
    }

    #[test]
    fn test_metadata_column_orders_len_mismatch() {
        let schema = SchemaType::group_type_builder("schema").build().unwrap();
        let schema_descr = SchemaDescriptor::new(Arc::new(schema));

        let t_column_orders = Some(vec![TColumnOrder::TYPEORDER(TypeDefinedOrder::new())]);

        let res = ParquetMetaDataReader::parse_column_orders(t_column_orders, &schema_descr);
        assert!(res.is_err());
        assert!(format!("{:?}", res.unwrap_err()).contains("Column order length mismatch"));
    }

    #[test]
    fn test_try_parse() {
        let file = get_test_file("alltypes_tiny_pages.parquet");
        let len = file.len();

        let mut reader = ParquetMetaDataReader::new().with_page_indexes(true);

        let bytes_for_range = |range: Range<u64>| {
            file.get_bytes(range.start, (range.end - range.start).try_into().unwrap())
                .unwrap()
        };

        // read the entire file
        let bytes = bytes_for_range(0..len);
        reader.try_parse(&bytes).unwrap();
        let metadata = reader.finish().unwrap();
        assert!(metadata.column_index.is_some());
        assert!(metadata.offset_index.is_some());

        // read more than enough of the tail of the file
        let bytes = bytes_for_range(320000..len);
        reader.try_parse_sized(&bytes, len).unwrap();
        let metadata = reader.finish().unwrap();
        assert!(metadata.column_index.is_some());
        assert!(metadata.offset_index.is_some());

        // exactly enough
        let bytes = bytes_for_range(323583..len);
        reader.try_parse_sized(&bytes, len).unwrap();
        let metadata = reader.finish().unwrap();
        assert!(metadata.column_index.is_some());
        assert!(metadata.offset_index.is_some());

        // not enough for the page indexes: provide the needed bytes and retry
        let bytes = bytes_for_range(323584..len);
        match reader.try_parse_sized(&bytes, len).unwrap_err() {
            ParquetError::NeedMoreData(needed) => {
                let bytes = bytes_for_range(len - needed as u64..len);
                reader.try_parse_sized(&bytes, len).unwrap();
                let metadata = reader.finish().unwrap();
                assert!(metadata.column_index.is_some());
                assert!(metadata.offset_index.is_some());
            }
            _ => panic!("unexpected error"),
        };

        // not enough for the file metadata: keep retrying until the page indexes are read
        let mut reader = ParquetMetaDataReader::new().with_page_indexes(true);
        let mut bytes = bytes_for_range(452505..len);
        loop {
            match reader.try_parse_sized(&bytes, len) {
                Ok(_) => break,
                Err(ParquetError::NeedMoreData(needed)) => {
                    bytes = bytes_for_range(len - needed as u64..len);
                    if reader.has_metadata() {
                        reader.read_page_indexes_sized(&bytes, len).unwrap();
                        break;
                    }
                }
                _ => panic!("unexpected error"),
            }
        }
        let metadata = reader.finish().unwrap();
        assert!(metadata.column_index.is_some());
        assert!(metadata.offset_index.is_some());

        // not enough for the page indexes, and the claimed file size is too small
        let bytes = bytes_for_range(323584..len);
        let reader_result = reader.try_parse_sized(&bytes, len - 323584).unwrap_err();
        assert_eq!(
            reader_result.to_string(),
            "EOF: Parquet file too small. Range 323583..452504 is beyond file bounds 130649"
        );

        // not enough for the file metadata, this time without page indexes
        let mut reader = ParquetMetaDataReader::new();
        let bytes = bytes_for_range(452505..len);
        match reader.try_parse_sized(&bytes, len).unwrap_err() {
            ParquetError::NeedMoreData(needed) => {
                let bytes = bytes_for_range(len - needed as u64..len);
                reader.try_parse_sized(&bytes, len).unwrap();
                reader.finish().unwrap();
            }
            _ => panic!("unexpected error"),
        };

        // `try_parse()` assumes the reader covers the entire file, so partial bytes fail
        let reader_result = reader.try_parse(&bytes).unwrap_err();
        assert_eq!(
            reader_result.to_string(),
            "EOF: Parquet file too small. Size is 1728 but need 1729"
        );

        // read the head of the file rather than the tail
        let bytes = bytes_for_range(0..1000);
        let reader_result = reader.try_parse_sized(&bytes, len).unwrap_err();
        assert_eq!(
            reader_result.to_string(),
            "Parquet error: Invalid Parquet file. Corrupt footer"
        );

        // lie about the file size
        let bytes = bytes_for_range(452510..len);
        let reader_result = reader.try_parse_sized(&bytes, len - 452505).unwrap_err();
        assert_eq!(
            reader_result.to_string(),
            "EOF: Parquet file too small. Size is 1728 but need 1729"
        );
    }
}

#[cfg(all(feature = "async", feature = "arrow", test))]
mod async_tests {
    use super::*;
    use bytes::Bytes;
    use futures::future::BoxFuture;
    use futures::FutureExt;
    use std::fs::File;
    use std::future::Future;
    use std::io::{Read, Seek, SeekFrom};
    use std::ops::Range;
    use std::sync::atomic::{AtomicUsize, Ordering};

    use crate::file::reader::Length;
    use crate::util::test_common::file_util::get_test_file;

    struct MetadataFetchFn<F>(F);

    impl<F, Fut> MetadataFetch for MetadataFetchFn<F>
    where
        F: FnMut(Range<u64>) -> Fut + Send,
        Fut: Future<Output = Result<Bytes>> + Send,
    {
        fn fetch(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
            async move { self.0(range).await }.boxed()
        }
    }

    struct MetadataSuffixFetchFn<F1, F2>(F1, F2);

    impl<F1, Fut, F2> MetadataFetch for MetadataSuffixFetchFn<F1, F2>
    where
        F1: FnMut(Range<u64>) -> Fut + Send,
        Fut: Future<Output = Result<Bytes>> + Send,
        F2: Send,
    {
        fn fetch(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
            async move { self.0(range).await }.boxed()
        }
    }

    impl<F1, Fut, F2> MetadataSuffixFetch for MetadataSuffixFetchFn<F1, F2>
    where
        F1: FnMut(Range<u64>) -> Fut + Send,
        F2: FnMut(usize) -> Fut + Send,
        Fut: Future<Output = Result<Bytes>> + Send,
    {
        fn fetch_suffix(&mut self, suffix: usize) -> BoxFuture<'_, Result<Bytes>> {
            async move { self.1(suffix).await }.boxed()
        }
    }

    fn read_range(file: &mut File, range: Range<u64>) -> Result<Bytes> {
        file.seek(SeekFrom::Start(range.start as _))?;
        let len = range.end - range.start;
        let mut buf = Vec::with_capacity(len.try_into().unwrap());
        file.take(len as _).read_to_end(&mut buf)?;
        Ok(buf.into())
    }

    fn read_suffix(file: &mut File, suffix: usize) -> Result<Bytes> {
        let file_len = file.len();
        // Don't seek before the beginning of the file
        file.seek(SeekFrom::End(0 - suffix.min(file_len as _) as i64))?;
        let mut buf = Vec::with_capacity(suffix);
        file.take(suffix as _).read_to_end(&mut buf)?;
        Ok(buf.into())
    }

    #[tokio::test]
    async fn test_simple() {
        let mut file = get_test_file("nulls.snappy.parquet");
        let len = file.len();

        let expected = ParquetMetaDataReader::new()
            .parse_and_finish(&file)
            .unwrap();
        let expected = expected.file_metadata().schema();
        let fetch_count = AtomicUsize::new(0);

        let mut fetch = |range| {
            fetch_count.fetch_add(1, Ordering::SeqCst);
            futures::future::ready(read_range(&mut file, range))
        };

        let input = MetadataFetchFn(&mut fetch);
        let actual = ParquetMetaDataReader::new()
            .load_and_finish(input, len)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 2);

        // Metadata hint too small - below footer size
        fetch_count.store(0, Ordering::SeqCst);
        let input = MetadataFetchFn(&mut fetch);
        let actual = ParquetMetaDataReader::new()
            .with_prefetch_hint(Some(7))
            .load_and_finish(input, len)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 2);

        // Metadata hint too small - above footer size but below metadata size
        fetch_count.store(0, Ordering::SeqCst);
        let input = MetadataFetchFn(&mut fetch);
        let actual = ParquetMetaDataReader::new()
            .with_prefetch_hint(Some(10))
            .load_and_finish(input, len)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 2);

        // Metadata hint too large
        fetch_count.store(0, Ordering::SeqCst);
        let input = MetadataFetchFn(&mut fetch);
        let actual = ParquetMetaDataReader::new()
            .with_prefetch_hint(Some(500))
            .load_and_finish(input, len)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);

        // Metadata hint exactly correct
        fetch_count.store(0, Ordering::SeqCst);
        let input = MetadataFetchFn(&mut fetch);
        let actual = ParquetMetaDataReader::new()
            .with_prefetch_hint(Some(428))
            .load_and_finish(input, len)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);

        let input = MetadataFetchFn(&mut fetch);
        let err = ParquetMetaDataReader::new()
            .load_and_finish(input, 4)
            .await
            .unwrap_err()
            .to_string();
        assert_eq!(err, "EOF: file size of 4 is less than footer");

        let input = MetadataFetchFn(&mut fetch);
        let err = ParquetMetaDataReader::new()
            .load_and_finish(input, 20)
            .await
            .unwrap_err()
            .to_string();
        assert_eq!(err, "Parquet error: Invalid Parquet file. Corrupt footer");
    }

    #[tokio::test]
    async fn test_suffix() {
        let mut file = get_test_file("nulls.snappy.parquet");
        let mut file2 = file.try_clone().unwrap();

        let expected = ParquetMetaDataReader::new()
            .parse_and_finish(&file)
            .unwrap();
        let expected = expected.file_metadata().schema();
        let fetch_count = AtomicUsize::new(0);
        let suffix_fetch_count = AtomicUsize::new(0);

        let mut fetch = |range| {
            fetch_count.fetch_add(1, Ordering::SeqCst);
            futures::future::ready(read_range(&mut file, range))
        };
        let mut suffix_fetch = |suffix| {
            suffix_fetch_count.fetch_add(1, Ordering::SeqCst);
            futures::future::ready(read_suffix(&mut file2, suffix))
        };

        let input = MetadataSuffixFetchFn(&mut fetch, &mut suffix_fetch);
        let actual = ParquetMetaDataReader::new()
            .load_via_suffix_and_finish(input)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 0);
        assert_eq!(suffix_fetch_count.load(Ordering::SeqCst), 2);

        // Metadata hint too small - below footer size
        fetch_count.store(0, Ordering::SeqCst);
        suffix_fetch_count.store(0, Ordering::SeqCst);
        let input = MetadataSuffixFetchFn(&mut fetch, &mut suffix_fetch);
        let actual = ParquetMetaDataReader::new()
            .with_prefetch_hint(Some(7))
            .load_via_suffix_and_finish(input)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 0);
        assert_eq!(suffix_fetch_count.load(Ordering::SeqCst), 2);

        // Metadata hint too small - above footer size but below metadata size
        fetch_count.store(0, Ordering::SeqCst);
        suffix_fetch_count.store(0, Ordering::SeqCst);
        let input = MetadataSuffixFetchFn(&mut fetch, &mut suffix_fetch);
        let actual = ParquetMetaDataReader::new()
            .with_prefetch_hint(Some(10))
            .load_via_suffix_and_finish(input)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 0);
        assert_eq!(suffix_fetch_count.load(Ordering::SeqCst), 2);

        // Metadata hint too large
        fetch_count.store(0, Ordering::SeqCst);
        suffix_fetch_count.store(0, Ordering::SeqCst);
        let input = MetadataSuffixFetchFn(&mut fetch, &mut suffix_fetch);
        let actual = ParquetMetaDataReader::new()
            .with_prefetch_hint(Some(500))
            .load_via_suffix_and_finish(input)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 0);
        assert_eq!(suffix_fetch_count.load(Ordering::SeqCst), 1);

        // Metadata hint exactly correct
        fetch_count.store(0, Ordering::SeqCst);
        suffix_fetch_count.store(0, Ordering::SeqCst);
        let input = MetadataSuffixFetchFn(&mut fetch, &mut suffix_fetch);
        let actual = ParquetMetaDataReader::new()
            .with_prefetch_hint(Some(428))
            .load_via_suffix_and_finish(input)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 0);
        assert_eq!(suffix_fetch_count.load(Ordering::SeqCst), 1);
    }

    #[cfg(feature = "encryption")]
    #[tokio::test]
    async fn test_suffix_with_encryption() {
        let mut file = get_test_file("uniform_encryption.parquet.encrypted");
        let mut file2 = file.try_clone().unwrap();

        let mut fetch = |range| futures::future::ready(read_range(&mut file, range));
        let mut suffix_fetch = |suffix| futures::future::ready(read_suffix(&mut file2, suffix));

        let input = MetadataSuffixFetchFn(&mut fetch, &mut suffix_fetch);

        let key_code: &[u8] = "0123456789012345".as_bytes();
        let decryption_properties = FileDecryptionProperties::builder(key_code.to_vec())
            .build()
            .unwrap();

        let expected = ParquetMetaDataReader::new()
            .with_decryption_properties(Some(&decryption_properties))
            .load_via_suffix_and_finish(input)
            .await
            .unwrap();
        assert_eq!(expected.num_row_groups(), 1);
    }

    #[tokio::test]
    async fn test_page_index() {
        let mut file = get_test_file("alltypes_tiny_pages.parquet");
        let len = file.len();
        let fetch_count = AtomicUsize::new(0);
        let mut fetch = |range| {
            fetch_count.fetch_add(1, Ordering::SeqCst);
            futures::future::ready(read_range(&mut file, range))
        };

        let f = MetadataFetchFn(&mut fetch);
        let mut loader = ParquetMetaDataReader::new().with_page_indexes(true);
        loader.try_load(f, len).await.unwrap();
        assert_eq!(fetch_count.load(Ordering::SeqCst), 3);
        let metadata = loader.finish().unwrap();
        assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());

        // Prefetch the footer and file metadata, but not the page indexes
        fetch_count.store(0, Ordering::SeqCst);
        let f = MetadataFetchFn(&mut fetch);
        let mut loader = ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .with_prefetch_hint(Some(1729));
        loader.try_load(f, len).await.unwrap();
        assert_eq!(fetch_count.load(Ordering::SeqCst), 2);
        let metadata = loader.finish().unwrap();
        assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());

        // Prefetch more, but still not enough for the page indexes
        fetch_count.store(0, Ordering::SeqCst);
        let f = MetadataFetchFn(&mut fetch);
        let mut loader = ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .with_prefetch_hint(Some(130649));
        loader.try_load(f, len).await.unwrap();
        assert_eq!(fetch_count.load(Ordering::SeqCst), 2);
        let metadata = loader.finish().unwrap();
        assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());

        // Prefetch enough to cover the page indexes as well
        fetch_count.store(0, Ordering::SeqCst);
        let f = MetadataFetchFn(&mut fetch);
        let metadata = ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .with_prefetch_hint(Some(130650))
            .load_and_finish(f, len)
            .await
            .unwrap();
        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
        assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());

        // Prefetch almost the entire file
        fetch_count.store(0, Ordering::SeqCst);
        let f = MetadataFetchFn(&mut fetch);
        let metadata = ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .with_prefetch_hint(Some((len - 1000) as usize))
            .load_and_finish(f, len)
            .await
            .unwrap();
        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
        assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());

        // Prefetch the entire file
        fetch_count.store(0, Ordering::SeqCst);
        let f = MetadataFetchFn(&mut fetch);
        let metadata = ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .with_prefetch_hint(Some(len as usize))
            .load_and_finish(f, len)
            .await
            .unwrap();
        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
        assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());

        // Prefetch more than the entire file
        fetch_count.store(0, Ordering::SeqCst);
        let f = MetadataFetchFn(&mut fetch);
        let metadata = ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .with_prefetch_hint(Some((len + 1000) as usize))
            .load_and_finish(f, len)
            .await
            .unwrap();
        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
        assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());
    }
}