1use std::{io::Read, ops::Range, sync::Arc};
19
20use bytes::Bytes;
21
22use crate::basic::ColumnOrder;
23#[cfg(feature = "encryption")]
24use crate::encryption::{
25 decrypt::{FileDecryptionProperties, FileDecryptor},
26 modules::create_footer_aad,
27};
28
29use crate::errors::{ParquetError, Result};
30use crate::file::metadata::{FileMetaData, ParquetMetaData, RowGroupMetaData};
31use crate::file::page_index::index::Index;
32use crate::file::page_index::index_reader::{acc_range, decode_column_index, decode_offset_index};
33use crate::file::reader::ChunkReader;
34use crate::file::{FOOTER_SIZE, PARQUET_MAGIC, PARQUET_MAGIC_ENCR_FOOTER};
35use crate::format::{ColumnOrder as TColumnOrder, FileMetaData as TFileMetaData};
36#[cfg(feature = "encryption")]
37use crate::format::{EncryptionAlgorithm, FileCryptoMetaData as TFileCryptoMetaData};
38use crate::schema::types;
39use crate::schema::types::SchemaDescriptor;
40use crate::thrift::{TCompactSliceInputProtocol, TSerializable};
41
42#[cfg(all(feature = "async", feature = "arrow"))]
43use crate::arrow::async_reader::{MetadataFetch, MetadataSuffixFetch};
44
/// Reads and parses [`ParquetMetaData`] from the footer of a Parquet file,
/// optionally decoding the page indexes (column index and offset index) too.
///
/// Configure via the `with_*` builder methods, run one of the `try_parse*` /
/// `try_load*` entry points, then take the result with `finish`.
#[derive(Default)]
pub struct ParquetMetaDataReader {
    // Parsed metadata; populated by the parse/load methods and taken by `finish`
    metadata: Option<ParquetMetaData>,
    // If true, also decode the column index when parsing
    column_index: bool,
    // If true, also decode the offset index when parsing
    offset_index: bool,
    // Caller-supplied guess of the footer size used to reduce the number of
    // fetches when loading asynchronously
    prefetch_hint: Option<usize>,
    // Size of the serialized metadata plus the 8-byte footer, recorded during
    // parsing so page-index reads can detect overlap with the metadata region
    metadata_size: Option<usize>,
    // Optional decryption properties for files with encrypted footers/columns
    #[cfg(feature = "encryption")]
    file_decryption_properties: Option<FileDecryptionProperties>,
}
82
/// Decoded contents of a Parquet file's 8-byte footer (metadata length + magic).
pub struct FooterTail {
    // Length in bytes of the serialized file metadata (excludes the footer itself)
    metadata_length: usize,
    // True when the footer carried the encrypted-footer magic rather than the
    // standard plaintext-footer magic
    encrypted_footer: bool,
}
90
91impl FooterTail {
92 pub fn metadata_length(&self) -> usize {
94 self.metadata_length
95 }
96
97 pub fn is_encrypted_footer(&self) -> bool {
99 self.encrypted_footer
100 }
101}
102
103impl ParquetMetaDataReader {
104 pub fn new() -> Self {
106 Default::default()
107 }
108
109 pub fn new_with_metadata(metadata: ParquetMetaData) -> Self {
112 Self {
113 metadata: Some(metadata),
114 ..Default::default()
115 }
116 }
117
118 pub fn with_page_indexes(self, val: bool) -> Self {
124 self.with_column_indexes(val).with_offset_indexes(val)
125 }
126
127 pub fn with_column_indexes(mut self, val: bool) -> Self {
131 self.column_index = val;
132 self
133 }
134
135 pub fn with_offset_indexes(mut self, val: bool) -> Self {
139 self.offset_index = val;
140 self
141 }
142
143 pub fn with_prefetch_hint(mut self, prefetch: Option<usize>) -> Self {
155 self.prefetch_hint = prefetch;
156 self
157 }
158
159 #[cfg(feature = "encryption")]
163 pub fn with_decryption_properties(
164 mut self,
165 properties: Option<&FileDecryptionProperties>,
166 ) -> Self {
167 self.file_decryption_properties = properties.cloned();
168 self
169 }
170
171 pub fn has_metadata(&self) -> bool {
173 self.metadata.is_some()
174 }
175
176 pub fn finish(&mut self) -> Result<ParquetMetaData> {
178 self.metadata
179 .take()
180 .ok_or_else(|| general_err!("could not parse parquet metadata"))
181 }
182
    /// Convenience: parse the footer (and any requested page indexes) from
    /// `reader` and return the resulting [`ParquetMetaData`] in one call.
    pub fn parse_and_finish<R: ChunkReader>(mut self, reader: &R) -> Result<ParquetMetaData> {
        self.try_parse(reader)?;
        self.finish()
    }
205
206 pub fn try_parse<R: ChunkReader>(&mut self, reader: &R) -> Result<()> {
212 self.try_parse_sized(reader, reader.len() as usize)
213 }
214
    /// Parse metadata from `reader`, which may hold only the trailing bytes of
    /// a file whose full size is `file_size`.
    ///
    /// On insufficient data, returns `ParquetError::NeedMoreData(n)` meaning
    /// the caller should retry with the last `n` bytes of the file — unless the
    /// reader already holds the entire file (or `n` exceeds `file_size`), in
    /// which case the file itself is too small and an EOF error is returned.
    pub fn try_parse_sized<R: ChunkReader>(&mut self, reader: &R, file_size: usize) -> Result<()> {
        self.metadata = match self.parse_metadata(reader) {
            Ok(metadata) => Some(metadata),
            Err(ParquetError::NeedMoreData(needed)) => {
                // Distinguish "truncated file" from "fetch a larger suffix":
                // if the reader covers the whole file, more data will never arrive.
                if file_size == reader.len() as usize || needed > file_size {
                    return Err(eof_err!(
                        "Parquet file too small. Size is {} but need {}",
                        file_size,
                        needed
                    ));
                } else {
                    // Ask the caller for a larger suffix of the file.
                    return Err(ParquetError::NeedMoreData(needed));
                }
            }
            Err(e) => return Err(e),
        };

        // Done unless page indexes were requested.
        if !self.column_index && !self.offset_index {
            return Ok(());
        }

        self.read_page_indexes_sized(reader, file_size)
    }
314
315 pub fn read_page_indexes<R: ChunkReader>(&mut self, reader: &R) -> Result<()> {
318 self.read_page_indexes_sized(reader, reader.len() as usize)
319 }
320
    /// Read the requested page indexes from `reader`, which may hold only the
    /// trailing bytes of a file whose full size is `file_size`.
    ///
    /// Requires metadata to have been parsed already. Returns
    /// `ParquetError::NeedMoreData` when the reader's suffix does not reach back
    /// far enough to include the page index region.
    pub fn read_page_indexes_sized<R: ChunkReader>(
        &mut self,
        reader: &R,
        file_size: usize,
    ) -> Result<()> {
        if self.metadata.is_none() {
            return Err(general_err!(
                "Tried to read page indexes without ParquetMetaData metadata"
            ));
        }

        // No columns have index data (or no index was requested): nothing to do.
        let Some(range) = self.range_for_page_index() else {
            return Ok(());
        };

        // `reader` is assumed to hold the *last* `reader.len()` bytes of the file,
        // i.e. absolute file offsets `file_range`.
        let file_range = file_size.saturating_sub(reader.len() as usize)..file_size;
        if !(file_range.contains(&range.start) && file_range.contains(&range.end)) {
            // Requested page index range is not fully inside what we hold.
            if range.end > file_size {
                // The metadata claims indexes beyond EOF: corrupt/truncated file.
                return Err(eof_err!(
                    "Parquet file too small. Range {:?} is beyond file bounds {file_size}",
                    range
                ));
            } else {
                // Ask the caller for a suffix long enough to include the indexes.
                return Err(ParquetError::NeedMoreData(file_size - range.start));
            }
        }

        // Sanity check: page indexes must precede the footer metadata region.
        if let Some(metadata_size) = self.metadata_size {
            let metadata_range = file_size.saturating_sub(metadata_size)..file_size;
            if range.end > metadata_range.start {
                return Err(eof_err!(
                    "Parquet file too small. Page index range {:?} overlaps with file metadata {:?}",
                    range,
                    metadata_range
                ));
            }
        }

        let bytes_needed = range.end - range.start;
        // Translate the absolute file offset into an offset within `reader`.
        let bytes = reader.get_bytes((range.start - file_range.start) as u64, bytes_needed)?;
        let offset = range.start;

        self.parse_column_index(&bytes, offset)?;
        self.parse_offset_index(&bytes, offset)?;

        Ok(())
    }
389
    /// Convenience: asynchronously load the metadata (and any requested page
    /// indexes) via `fetch` and return the resulting [`ParquetMetaData`].
    #[cfg(all(feature = "async", feature = "arrow"))]
    pub async fn load_and_finish<F: MetadataFetch>(
        mut self,
        fetch: F,
        file_size: usize,
    ) -> Result<ParquetMetaData> {
        self.try_load(fetch, file_size).await?;
        self.finish()
    }
405
    /// Convenience: asynchronously load the metadata using suffix fetches (no
    /// known file size required) and return the resulting [`ParquetMetaData`].
    #[cfg(all(feature = "async", feature = "arrow"))]
    pub async fn load_via_suffix_and_finish<F: MetadataSuffixFetch>(
        mut self,
        fetch: F,
    ) -> Result<ParquetMetaData> {
        self.try_load_via_suffix(fetch).await?;
        self.finish()
    }
420 #[cfg(all(feature = "async", feature = "arrow"))]
426 pub async fn try_load<F: MetadataFetch>(
427 &mut self,
428 mut fetch: F,
429 file_size: usize,
430 ) -> Result<()> {
431 let (metadata, remainder) = self.load_metadata(&mut fetch, file_size).await?;
432
433 self.metadata = Some(metadata);
434
435 if !self.column_index && !self.offset_index {
437 return Ok(());
438 }
439
440 self.load_page_index_with_remainder(fetch, remainder).await
441 }
442
443 #[cfg(all(feature = "async", feature = "arrow"))]
449 pub async fn try_load_via_suffix<F: MetadataSuffixFetch>(
450 &mut self,
451 mut fetch: F,
452 ) -> Result<()> {
453 let (metadata, remainder) = self.load_metadata_via_suffix(&mut fetch).await?;
454
455 self.metadata = Some(metadata);
456
457 if !self.column_index && !self.offset_index {
459 return Ok(());
460 }
461
462 self.load_page_index_with_remainder(fetch, remainder).await
463 }
464
    /// Asynchronously load the requested page indexes for already-loaded
    /// metadata; everything is fetched (no leftover bytes are available).
    #[cfg(all(feature = "async", feature = "arrow"))]
    pub async fn load_page_index<F: MetadataFetch>(&mut self, fetch: F) -> Result<()> {
        self.load_page_index_with_remainder(fetch, None).await
    }
471
    /// Load the page indexes, serving the read from `remainder` — bytes left
    /// over from the footer fetch, tagged with their absolute start offset —
    /// when it already covers the needed range, and fetching otherwise.
    #[cfg(all(feature = "async", feature = "arrow"))]
    async fn load_page_index_with_remainder<F: MetadataFetch>(
        &mut self,
        mut fetch: F,
        remainder: Option<(usize, Bytes)>,
    ) -> Result<()> {
        if self.metadata.is_none() {
            return Err(general_err!("Footer metadata is not present"));
        }

        // Nothing to do if no column has index data or no index was requested.
        let range = self.range_for_page_index();
        let range = match range {
            Some(range) => range,
            None => return Ok(()),
        };

        let bytes = match &remainder {
            // Remainder starts at or before the index range: slice it out.
            Some((remainder_start, remainder)) if *remainder_start <= range.start => {
                let offset = range.start - *remainder_start;
                let end = offset + range.end - range.start;
                // Invariant: the footer-load path only hands back a remainder that
                // extends to the start of the metadata, which follows the indexes.
                assert!(end <= remainder.len());
                remainder.slice(offset..end)
            }
            _ => fetch.fetch(range.start..range.end).await?,
        };

        // Sanity check
        assert_eq!(bytes.len(), range.end - range.start);
        let offset = range.start;

        self.parse_column_index(&bytes, offset)?;
        self.parse_offset_index(&bytes, offset)?;

        Ok(())
    }
509
510 fn parse_column_index(&mut self, bytes: &Bytes, start_offset: usize) -> Result<()> {
511 let metadata = self.metadata.as_mut().unwrap();
512 if self.column_index {
513 let index = metadata
514 .row_groups()
515 .iter()
516 .map(|x| {
517 x.columns()
518 .iter()
519 .map(|c| match c.column_index_range() {
520 Some(r) => decode_column_index(
521 &bytes[r.start - start_offset..r.end - start_offset],
522 c.column_type(),
523 ),
524 None => Ok(Index::NONE),
525 })
526 .collect::<Result<Vec<_>>>()
527 })
528 .collect::<Result<Vec<_>>>()?;
529 metadata.set_column_index(Some(index));
530 }
531 Ok(())
532 }
533
534 fn parse_offset_index(&mut self, bytes: &Bytes, start_offset: usize) -> Result<()> {
535 let metadata = self.metadata.as_mut().unwrap();
536 if self.offset_index {
537 let index = metadata
538 .row_groups()
539 .iter()
540 .map(|x| {
541 x.columns()
542 .iter()
543 .map(|c| match c.offset_index_range() {
544 Some(r) => decode_offset_index(
545 &bytes[r.start - start_offset..r.end - start_offset],
546 ),
547 None => Err(general_err!("missing offset index")),
548 })
549 .collect::<Result<Vec<_>>>()
550 })
551 .collect::<Result<Vec<_>>>()?;
552
553 metadata.set_offset_index(Some(index));
554 }
555 Ok(())
556 }
557
558 fn range_for_page_index(&self) -> Option<Range<usize>> {
559 self.metadata.as_ref()?;
561
562 let mut range = None;
564 let metadata = self.metadata.as_ref().unwrap();
565 for c in metadata.row_groups().iter().flat_map(|r| r.columns()) {
566 if self.column_index {
567 range = acc_range(range, c.column_index_range());
568 }
569 if self.offset_index {
570 range = acc_range(range, c.offset_index_range());
571 }
572 }
573 range
574 }
575
    /// Parse the footer and thrift metadata from `chunk_reader`.
    ///
    /// Returns `ParquetError::NeedMoreData(n)` when the reader holds fewer than
    /// `n` bytes, where `n` is what is needed to decode the footer + metadata.
    fn parse_metadata<R: ChunkReader>(&mut self, chunk_reader: &R) -> Result<ParquetMetaData> {
        // Check file is large enough to hold the 8-byte footer
        let file_size = chunk_reader.len();
        if file_size < (FOOTER_SIZE as u64) {
            return Err(ParquetError::NeedMoreData(FOOTER_SIZE));
        }

        // Read the last 8 bytes: 4-byte little-endian metadata length + magic.
        let mut footer = [0_u8; 8];
        chunk_reader
            .get_read(file_size - 8)?
            .read_exact(&mut footer)?;

        let footer = Self::decode_footer_tail(&footer)?;
        let metadata_len = footer.metadata_length();
        let footer_metadata_len = FOOTER_SIZE + metadata_len;
        // Remember the footer+metadata size so later page-index reads can
        // verify the indexes do not overlap the metadata region.
        self.metadata_size = Some(footer_metadata_len);

        if footer_metadata_len > file_size as usize {
            // Reader does not reach back far enough to include the metadata.
            return Err(ParquetError::NeedMoreData(footer_metadata_len));
        }

        let start = file_size - footer_metadata_len as u64;
        self.decode_footer_metadata(
            chunk_reader.get_bytes(start, metadata_len)?.as_ref(),
            &footer,
        )
    }
605
606 #[cfg(all(feature = "async", feature = "arrow"))]
610 fn get_prefetch_size(&self) -> usize {
611 if let Some(prefetch) = self.prefetch_hint {
612 if prefetch > FOOTER_SIZE {
613 return prefetch;
614 }
615 }
616 FOOTER_SIZE
617 }
618
    /// Asynchronously fetch and decode the footer metadata.
    ///
    /// Returns the decoded metadata plus, when the prefetch over-read, the
    /// unused prefix bytes tagged with their absolute start offset so a later
    /// page-index read can reuse them instead of fetching again.
    #[cfg(all(feature = "async", feature = "arrow"))]
    async fn load_metadata<F: MetadataFetch>(
        &self,
        fetch: &mut F,
        file_size: usize,
    ) -> Result<(ParquetMetaData, Option<(usize, Bytes)>)> {
        let prefetch = self.get_prefetch_size();

        if file_size < FOOTER_SIZE {
            return Err(eof_err!("file size of {} is less than footer", file_size));
        }

        // Fetch the last `prefetch` bytes (clamped to the file start).
        let footer_start = file_size.saturating_sub(prefetch);

        let suffix = fetch.fetch(footer_start..file_size).await?;
        let suffix_len = suffix.len();
        let fetch_len = file_size - footer_start;
        if suffix_len < fetch_len {
            return Err(eof_err!(
                "metadata requires {} bytes, but could only read {}",
                fetch_len,
                suffix_len
            ));
        }

        // Decode the trailing 8-byte footer to learn the metadata length.
        let mut footer = [0; FOOTER_SIZE];
        footer.copy_from_slice(&suffix[suffix_len - FOOTER_SIZE..suffix_len]);

        let footer = Self::decode_footer_tail(&footer)?;
        let length = footer.metadata_length();

        if file_size < length + FOOTER_SIZE {
            return Err(eof_err!(
                "file size of {} is less than footer + metadata {}",
                file_size,
                length + FOOTER_SIZE
            ));
        }

        if length > suffix_len - FOOTER_SIZE {
            // Prefetch was too small: fetch exactly the metadata bytes.
            // No leftover bytes are available in this case.
            let metadata_start = file_size - length - FOOTER_SIZE;
            let meta = fetch.fetch(metadata_start..file_size - FOOTER_SIZE).await?;
            Ok((self.decode_footer_metadata(&meta, &footer)?, None))
        } else {
            // Metadata is already inside the prefetched suffix; decode it in
            // place and hand back the unused prefix for page-index reuse.
            let metadata_start = file_size - length - FOOTER_SIZE - footer_start;
            let slice = &suffix[metadata_start..suffix_len - FOOTER_SIZE];
            Ok((
                self.decode_footer_metadata(slice, &footer)?,
                Some((footer_start, suffix.slice(..metadata_start))),
            ))
        }
    }
675
    /// Like `load_metadata`, but using suffix fetches ("give me the last N
    /// bytes") so the total file size need not be known up front.
    #[cfg(all(feature = "async", feature = "arrow"))]
    async fn load_metadata_via_suffix<F: MetadataSuffixFetch>(
        &self,
        fetch: &mut F,
    ) -> Result<(ParquetMetaData, Option<(usize, Bytes)>)> {
        let prefetch = self.get_prefetch_size();

        let suffix = fetch.fetch_suffix(prefetch).await?;
        let suffix_len = suffix.len();

        if suffix_len < FOOTER_SIZE {
            return Err(eof_err!(
                "footer metadata requires {} bytes, but could only read {}",
                FOOTER_SIZE,
                suffix_len
            ));
        }

        // Decode the trailing 8-byte footer to learn the metadata length.
        let mut footer = [0; FOOTER_SIZE];
        footer.copy_from_slice(&suffix[suffix_len - FOOTER_SIZE..suffix_len]);

        let footer = Self::decode_footer_tail(&footer)?;
        let length = footer.metadata_length();

        // Total bytes needed from the end of the file: metadata + footer.
        let metadata_offset = length + FOOTER_SIZE;
        if length > suffix_len - FOOTER_SIZE {
            // Prefetch was too small: fetch the full metadata + footer suffix.
            let meta = fetch.fetch_suffix(metadata_offset).await?;

            if meta.len() < metadata_offset {
                return Err(eof_err!(
                    "metadata requires {} bytes, but could only read {}",
                    metadata_offset,
                    meta.len()
                ));
            }

            Ok((
                // Strip the footer from the returned bytes before decoding.
                self.decode_footer_metadata(&meta.slice(0..length), &footer)?,
                None,
            ))
        } else {
            // Metadata is already inside the prefetched suffix; decode it in
            // place. The leftover prefix is tagged with offset 0 *relative to
            // the suffix*, since absolute file offsets are unknown here.
            let metadata_start = suffix_len - metadata_offset;
            let slice = &suffix[metadata_start..suffix_len - FOOTER_SIZE];
            Ok((
                self.decode_footer_metadata(slice, &footer)?,
                Some((0, suffix.slice(..metadata_start))),
            ))
        }
    }
727
728 pub fn decode_footer_tail(slice: &[u8; FOOTER_SIZE]) -> Result<FooterTail> {
740 let magic = &slice[4..];
741 let encrypted_footer = if magic == PARQUET_MAGIC_ENCR_FOOTER {
742 true
743 } else if magic == PARQUET_MAGIC {
744 false
745 } else {
746 return Err(general_err!("Invalid Parquet file. Corrupt footer"));
747 };
748 let metadata_len = u32::from_le_bytes(slice[..4].try_into().unwrap());
750 Ok(FooterTail {
751 metadata_length: metadata_len as usize,
753 encrypted_footer,
754 })
755 }
756
757 #[deprecated(note = "use decode_footer_tail instead")]
759 pub fn decode_footer(slice: &[u8; FOOTER_SIZE]) -> Result<usize> {
760 Self::decode_footer_tail(slice).map(|f| f.metadata_length)
761 }
762
    /// Decode [`ParquetMetaData`] from the raw metadata bytes, dispatching to
    /// the encryption-aware decoder when the `encryption` feature is enabled.
    ///
    /// With the feature disabled, an encrypted footer is a hard error.
    pub(crate) fn decode_footer_metadata(
        &self,
        buf: &[u8],
        footer_tail: &FooterTail,
    ) -> Result<ParquetMetaData> {
        #[cfg(feature = "encryption")]
        let result = Self::decode_metadata_with_encryption(
            buf,
            footer_tail.is_encrypted_footer(),
            self.file_decryption_properties.as_ref(),
        );
        #[cfg(not(feature = "encryption"))]
        let result = {
            if footer_tail.is_encrypted_footer() {
                Err(general_err!(
                    "Parquet file has an encrypted footer but the encryption feature is disabled"
                ))
            } else {
                Self::decode_metadata(buf)
            }
        };
        result
    }
797
    /// Decode [`ParquetMetaData`] from bytes that may contain an encrypted
    /// footer, decrypting it first when `encrypted_footer` is set.
    ///
    /// Errors if the footer is encrypted but no decryption properties were
    /// supplied, or if decryption fails.
    #[cfg(feature = "encryption")]
    fn decode_metadata_with_encryption(
        buf: &[u8],
        encrypted_footer: bool,
        file_decryption_properties: Option<&FileDecryptionProperties>,
    ) -> Result<ParquetMetaData> {
        let mut prot = TCompactSliceInputProtocol::new(buf);
        let mut file_decryptor = None;
        // Declared here so the decrypted buffer outlives `prot` when we re-point
        // the protocol at the plaintext below.
        let decrypted_fmd_buf;

        if encrypted_footer {
            if let Some(file_decryption_properties) = file_decryption_properties {
                // Encrypted footers start with a plaintext FileCryptoMetaData struct.
                let t_file_crypto_metadata: TFileCryptoMetaData =
                    TFileCryptoMetaData::read_from_in_protocol(&mut prot)
                        .map_err(|e| general_err!("Could not parse crypto metadata: {}", e))?;
                // Whether the writer expects the reader to supply the AAD prefix
                // (i.e. it is not stored in the file).
                let supply_aad_prefix = match &t_file_crypto_metadata.encryption_algorithm {
                    EncryptionAlgorithm::AESGCMV1(algo) => algo.supply_aad_prefix,
                    _ => Some(false),
                }
                .unwrap_or(false);
                if supply_aad_prefix && file_decryption_properties.aad_prefix().is_none() {
                    return Err(general_err!(
                        "Parquet file was encrypted with an AAD prefix that is not stored in the file, \
                        but no AAD prefix was provided in the file decryption properties"
                    ));
                }
                let decryptor = get_file_decryptor(
                    t_file_crypto_metadata.encryption_algorithm,
                    t_file_crypto_metadata.key_metadata.as_deref(),
                    file_decryption_properties,
                )?;
                let footer_decryptor = decryptor.get_footer_decryptor();
                let aad_footer = create_footer_aad(decryptor.file_aad())?;

                // Decrypt the remainder of the buffer (the FileMetaData payload).
                decrypted_fmd_buf = footer_decryptor?
                    .decrypt(prot.as_slice().as_ref(), aad_footer.as_ref())
                    .map_err(|_| {
                        general_err!(
                            "Provided footer key and AAD were unable to decrypt parquet footer"
                        )
                    })?;
                // Continue thrift decoding from the decrypted plaintext.
                prot = TCompactSliceInputProtocol::new(decrypted_fmd_buf.as_ref());

                file_decryptor = Some(decryptor);
            } else {
                return Err(general_err!("Parquet file has an encrypted footer but decryption properties were not provided"));
            }
        }

        let t_file_metadata: TFileMetaData = TFileMetaData::read_from_in_protocol(&mut prot)
            .map_err(|e| general_err!("Could not parse metadata: {}", e))?;
        let schema = types::from_thrift(&t_file_metadata.schema)?;
        let schema_descr = Arc::new(SchemaDescriptor::new(schema));

        // Plaintext footer that still declares an encryption algorithm: build a
        // decryptor for the (possibly encrypted) column data.
        if let (Some(algo), Some(file_decryption_properties)) = (
            t_file_metadata.encryption_algorithm,
            file_decryption_properties,
        ) {
            file_decryptor = Some(get_file_decryptor(
                algo,
                t_file_metadata.footer_signing_key_metadata.as_deref(),
                file_decryption_properties,
            )?);
        }

        // Row group metadata may itself contain encrypted column chunks.
        let mut row_groups = Vec::new();
        for rg in t_file_metadata.row_groups {
            let r = RowGroupMetaData::from_encrypted_thrift(
                schema_descr.clone(),
                rg,
                file_decryptor.as_ref(),
            )?;
            row_groups.push(r);
        }
        let column_orders =
            Self::parse_column_orders(t_file_metadata.column_orders, &schema_descr)?;

        let file_metadata = FileMetaData::new(
            t_file_metadata.version,
            t_file_metadata.num_rows,
            t_file_metadata.created_by,
            t_file_metadata.key_value_metadata,
            schema_descr,
            column_orders,
        );
        let mut metadata = ParquetMetaData::new(file_metadata, row_groups);

        // Keep the decryptor so encrypted page data can be read later.
        metadata.with_file_decryptor(file_decryptor);

        Ok(metadata)
    }
899
900 pub fn decode_metadata(buf: &[u8]) -> Result<ParquetMetaData> {
908 let mut prot = TCompactSliceInputProtocol::new(buf);
909
910 let t_file_metadata: TFileMetaData = TFileMetaData::read_from_in_protocol(&mut prot)
911 .map_err(|e| general_err!("Could not parse metadata: {}", e))?;
912 let schema = types::from_thrift(&t_file_metadata.schema)?;
913 let schema_descr = Arc::new(SchemaDescriptor::new(schema));
914
915 let mut row_groups = Vec::new();
916 for rg in t_file_metadata.row_groups {
917 row_groups.push(RowGroupMetaData::from_thrift(schema_descr.clone(), rg)?);
918 }
919 let column_orders =
920 Self::parse_column_orders(t_file_metadata.column_orders, &schema_descr)?;
921
922 let file_metadata = FileMetaData::new(
923 t_file_metadata.version,
924 t_file_metadata.num_rows,
925 t_file_metadata.created_by,
926 t_file_metadata.key_value_metadata,
927 schema_descr,
928 column_orders,
929 );
930
931 Ok(ParquetMetaData::new(file_metadata, row_groups))
932 }
933
934 fn parse_column_orders(
937 t_column_orders: Option<Vec<TColumnOrder>>,
938 schema_descr: &SchemaDescriptor,
939 ) -> Result<Option<Vec<ColumnOrder>>> {
940 match t_column_orders {
941 Some(orders) => {
942 if orders.len() != schema_descr.num_columns() {
944 return Err(general_err!("Column order length mismatch"));
945 };
946 let mut res = Vec::new();
947 for (i, column) in schema_descr.columns().iter().enumerate() {
948 match orders[i] {
949 TColumnOrder::TYPEORDER(_) => {
950 let sort_order = ColumnOrder::get_sort_order(
951 column.logical_type(),
952 column.converted_type(),
953 column.physical_type(),
954 );
955 res.push(ColumnOrder::TYPE_DEFINED_ORDER(sort_order));
956 }
957 }
958 }
959 Ok(Some(res))
960 }
961 None => Ok(None),
962 }
963 }
964}
965
/// Build a [`FileDecryptor`] for the file's declared encryption algorithm.
///
/// Only AES-GCM-V1 is supported; AES-GCM-CTR-V1 returns a "not yet implemented"
/// error. An AAD prefix supplied by the caller takes precedence over one stored
/// in the file.
#[cfg(feature = "encryption")]
fn get_file_decryptor(
    encryption_algorithm: EncryptionAlgorithm,
    footer_key_metadata: Option<&[u8]>,
    file_decryption_properties: &FileDecryptionProperties,
) -> Result<FileDecryptor> {
    match encryption_algorithm {
        EncryptionAlgorithm::AESGCMV1(algo) => {
            let aad_file_unique = algo
                .aad_file_unique
                .ok_or_else(|| general_err!("AAD unique file identifier is not set"))?;
            // Caller-provided AAD prefix wins; otherwise use the one stored in
            // the file (empty when absent).
            let aad_prefix = if let Some(aad_prefix) = file_decryption_properties.aad_prefix() {
                aad_prefix.clone()
            } else {
                algo.aad_prefix.unwrap_or_default()
            };

            FileDecryptor::new(
                file_decryption_properties,
                footer_key_metadata,
                aad_file_unique,
                aad_prefix,
            )
        }
        EncryptionAlgorithm::AESGCMCTRV1(_) => Err(nyi_err!(
            "The AES_GCM_CTR_V1 encryption algorithm is not yet supported"
        )),
    }
}
995
996#[cfg(test)]
997mod tests {
998 use super::*;
999 use bytes::Bytes;
1000
1001 use crate::basic::SortOrder;
1002 use crate::basic::Type;
1003 use crate::file::reader::Length;
1004 use crate::format::TypeDefinedOrder;
1005 use crate::schema::types::Type as SchemaType;
1006 use crate::util::test_common::file_util::get_test_file;
1007
1008 #[test]
1009 fn test_parse_metadata_size_smaller_than_footer() {
1010 let test_file = tempfile::tempfile().unwrap();
1011 let err = ParquetMetaDataReader::new()
1012 .parse_metadata(&test_file)
1013 .unwrap_err();
1014 assert!(matches!(err, ParquetError::NeedMoreData(8)));
1015 }
1016
1017 #[test]
1018 fn test_parse_metadata_corrupt_footer() {
1019 let data = Bytes::from(vec![1, 2, 3, 4, 5, 6, 7, 8]);
1020 let reader_result = ParquetMetaDataReader::new().parse_metadata(&data);
1021 assert_eq!(
1022 reader_result.unwrap_err().to_string(),
1023 "Parquet error: Invalid Parquet file. Corrupt footer"
1024 );
1025 }
1026
1027 #[test]
1028 fn test_parse_metadata_invalid_start() {
1029 let test_file = Bytes::from(vec![255, 0, 0, 0, b'P', b'A', b'R', b'1']);
1030 let err = ParquetMetaDataReader::new()
1031 .parse_metadata(&test_file)
1032 .unwrap_err();
1033 assert!(matches!(err, ParquetError::NeedMoreData(263)));
1034 }
1035
    #[test]
    fn test_metadata_column_orders_parse() {
        // Two-column schema; both columns declare TYPEORDER, so both should
        // resolve to TYPE_DEFINED_ORDER with the SIGNED sort order.
        let fields = vec![
            Arc::new(
                SchemaType::primitive_type_builder("col1", Type::INT32)
                    .build()
                    .unwrap(),
            ),
            Arc::new(
                SchemaType::primitive_type_builder("col2", Type::FLOAT)
                    .build()
                    .unwrap(),
            ),
        ];
        let schema = SchemaType::group_type_builder("schema")
            .with_fields(fields)
            .build()
            .unwrap();
        let schema_descr = SchemaDescriptor::new(Arc::new(schema));

        let t_column_orders = Some(vec![
            TColumnOrder::TYPEORDER(TypeDefinedOrder::new()),
            TColumnOrder::TYPEORDER(TypeDefinedOrder::new()),
        ]);

        assert_eq!(
            ParquetMetaDataReader::parse_column_orders(t_column_orders, &schema_descr).unwrap(),
            Some(vec![
                ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED),
                ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED)
            ])
        );

        // No column orders in the file -> None out.
        assert_eq!(
            ParquetMetaDataReader::parse_column_orders(None, &schema_descr).unwrap(),
            None
        );
    }
1076
1077 #[test]
1078 fn test_metadata_column_orders_len_mismatch() {
1079 let schema = SchemaType::group_type_builder("schema").build().unwrap();
1080 let schema_descr = SchemaDescriptor::new(Arc::new(schema));
1081
1082 let t_column_orders = Some(vec![TColumnOrder::TYPEORDER(TypeDefinedOrder::new())]);
1083
1084 let res = ParquetMetaDataReader::parse_column_orders(t_column_orders, &schema_descr);
1085 assert!(res.is_err());
1086 assert!(format!("{:?}", res.unwrap_err()).contains("Column order length mismatch"));
1087 }
1088
    #[test]
    fn test_try_parse() {
        let file = get_test_file("alltypes_tiny_pages.parquet");
        let len = file.len() as usize;

        let mut reader = ParquetMetaDataReader::new().with_page_indexes(true);

        // Helper producing an arbitrary byte range of the test file.
        let bytes_for_range = |range: Range<usize>| {
            file.get_bytes(range.start as u64, range.end - range.start)
                .unwrap()
        };

        // The whole file parses directly, including both page indexes.
        let bytes = bytes_for_range(0..len);
        reader.try_parse(&bytes).unwrap();
        let metadata = reader.finish().unwrap();
        assert!(metadata.column_index.is_some());
        assert!(metadata.offset_index.is_some());

        // A generous suffix that still contains the page indexes also works.
        let bytes = bytes_for_range(320000..len);
        reader.try_parse_sized(&bytes, len).unwrap();
        let metadata = reader.finish().unwrap();
        assert!(metadata.column_index.is_some());
        assert!(metadata.offset_index.is_some());

        // Exactly at the start of the page index region: still enough.
        let bytes = bytes_for_range(323583..len);
        reader.try_parse_sized(&bytes, len).unwrap();
        let metadata = reader.finish().unwrap();
        assert!(metadata.column_index.is_some());
        assert!(metadata.offset_index.is_some());

        // One byte past the start of the page index region: reader must ask
        // for more data, and retrying with the requested suffix succeeds.
        let bytes = bytes_for_range(323584..len);
        match reader.try_parse_sized(&bytes, len).unwrap_err() {
            ParquetError::NeedMoreData(needed) => {
                let bytes = bytes_for_range(len - needed..len);
                reader.try_parse_sized(&bytes, len).unwrap();
                let metadata = reader.finish().unwrap();
                assert!(metadata.column_index.is_some());
                assert!(metadata.offset_index.is_some());
            }
            _ => panic!("unexpected error"),
        };

        // Suffix too small even for the footer metadata: loop on NeedMoreData,
        // using read_page_indexes_sized once the metadata itself has parsed.
        let mut reader = ParquetMetaDataReader::new().with_page_indexes(true);
        let mut bytes = bytes_for_range(452505..len);
        loop {
            match reader.try_parse_sized(&bytes, len) {
                Ok(_) => break,
                Err(ParquetError::NeedMoreData(needed)) => {
                    bytes = bytes_for_range(len - needed..len);
                    if reader.has_metadata() {
                        reader.read_page_indexes_sized(&bytes, len).unwrap();
                        break;
                    }
                }
                _ => panic!("unexpected error"),
            }
        }
        let metadata = reader.finish().unwrap();
        assert!(metadata.column_index.is_some());
        assert!(metadata.offset_index.is_some());

        // Claimed file_size is smaller than the page index range: hard EOF.
        let bytes = bytes_for_range(323584..len);
        let reader_result = reader.try_parse_sized(&bytes, len - 323584).unwrap_err();
        assert_eq!(
            reader_result.to_string(),
            "EOF: Parquet file too small. Range 323583..452504 is beyond file bounds 130649"
        );

        // Without page indexes, a too-short suffix still resolves via retry.
        let mut reader = ParquetMetaDataReader::new();
        let bytes = bytes_for_range(452505..len);
        match reader.try_parse_sized(&bytes, len).unwrap_err() {
            ParquetError::NeedMoreData(needed) => {
                let bytes = bytes_for_range(len - needed..len);
                reader.try_parse_sized(&bytes, len).unwrap();
                reader.finish().unwrap();
            }
            _ => panic!("unexpected error"),
        };

        // try_parse (whole-file variant) on a mere suffix: file looks truncated.
        let reader_result = reader.try_parse(&bytes).unwrap_err();
        assert_eq!(
            reader_result.to_string(),
            "EOF: Parquet file too small. Size is 1728 but need 1729"
        );

        // Bytes from the head of the file end with garbage, not the magic.
        let bytes = bytes_for_range(0..1000);
        let reader_result = reader.try_parse_sized(&bytes, len).unwrap_err();
        assert_eq!(
            reader_result.to_string(),
            "Parquet error: Invalid Parquet file. Corrupt footer"
        );

        // Suffix shorter than the metadata with a too-small file_size: EOF.
        let bytes = bytes_for_range(452510..len);
        let reader_result = reader.try_parse_sized(&bytes, len - 452505).unwrap_err();
        assert_eq!(
            reader_result.to_string(),
            "EOF: Parquet file too small. Size is 1728 but need 1729"
        );
    }
1202}
1203
1204#[cfg(all(feature = "async", feature = "arrow", test))]
1205mod async_tests {
1206 use super::*;
1207 use bytes::Bytes;
1208 use futures::future::BoxFuture;
1209 use futures::FutureExt;
1210 use std::fs::File;
1211 use std::future::Future;
1212 use std::io::{Read, Seek, SeekFrom};
1213 use std::ops::Range;
1214 use std::sync::atomic::{AtomicUsize, Ordering};
1215
1216 use crate::file::reader::Length;
1217 use crate::util::test_common::file_util::get_test_file;
1218
    /// Adapter turning a range-fetching closure into a [`MetadataFetch`].
    struct MetadataFetchFn<F>(F);

    impl<F, Fut> MetadataFetch for MetadataFetchFn<F>
    where
        F: FnMut(Range<usize>) -> Fut + Send,
        Fut: Future<Output = Result<Bytes>> + Send,
    {
        fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
            async move { self.0(range).await }.boxed()
        }
    }
1230
    /// Adapter pairing a range-fetching closure (`.0`) with a suffix-fetching
    /// closure (`.1`) to implement both fetch traits for the suffix tests.
    struct MetadataSuffixFetchFn<F1, F2>(F1, F2);

    impl<F1, Fut, F2> MetadataFetch for MetadataSuffixFetchFn<F1, F2>
    where
        F1: FnMut(Range<usize>) -> Fut + Send,
        Fut: Future<Output = Result<Bytes>> + Send,
        F2: Send,
    {
        fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
            async move { self.0(range).await }.boxed()
        }
    }

    impl<F1, Fut, F2> MetadataSuffixFetch for MetadataSuffixFetchFn<F1, F2>
    where
        F1: FnMut(Range<usize>) -> Fut + Send,
        F2: FnMut(usize) -> Fut + Send,
        Fut: Future<Output = Result<Bytes>> + Send,
    {
        fn fetch_suffix(&mut self, suffix: usize) -> BoxFuture<'_, Result<Bytes>> {
            async move { self.1(suffix).await }.boxed()
        }
    }
1254
1255 fn read_range(file: &mut File, range: Range<usize>) -> Result<Bytes> {
1256 file.seek(SeekFrom::Start(range.start as _))?;
1257 let len = range.end - range.start;
1258 let mut buf = Vec::with_capacity(len);
1259 file.take(len as _).read_to_end(&mut buf)?;
1260 Ok(buf.into())
1261 }
1262
    /// Read the last `suffix` bytes of `file` (the whole file when shorter),
    /// mimicking a suffix-based fetch.
    fn read_suffix(file: &mut File, suffix: usize) -> Result<Bytes> {
        let file_len = file.len();
        // Seek `suffix` bytes before EOF, clamped so we never seek before byte 0.
        file.seek(SeekFrom::End(0 - suffix.min(file_len as usize) as i64))?;
        let mut buf = Vec::with_capacity(suffix);
        file.take(suffix as _).read_to_end(&mut buf)?;
        Ok(buf.into())
    }
1271
    #[tokio::test]
    async fn test_simple() {
        let mut file = get_test_file("nulls.snappy.parquet");
        let len = file.len() as usize;

        // Reference result obtained via the synchronous path.
        let expected = ParquetMetaDataReader::new()
            .parse_and_finish(&file)
            .unwrap();
        let expected = expected.file_metadata().schema();
        let fetch_count = AtomicUsize::new(0);

        let mut fetch = |range| {
            fetch_count.fetch_add(1, Ordering::SeqCst);
            futures::future::ready(read_range(&mut file, range))
        };

        // Default prefetch (footer only): footer + metadata = 2 fetches.
        let input = MetadataFetchFn(&mut fetch);
        let actual = ParquetMetaDataReader::new()
            .load_and_finish(input, len)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 2);

        // Hint smaller than the footer is ignored: still 2 fetches.
        fetch_count.store(0, Ordering::SeqCst);
        let input = MetadataFetchFn(&mut fetch);
        let actual = ParquetMetaDataReader::new()
            .with_prefetch_hint(Some(7))
            .load_and_finish(input, len)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 2);

        // Hint barely above the footer but below metadata size: 2 fetches.
        fetch_count.store(0, Ordering::SeqCst);
        let input = MetadataFetchFn(&mut fetch);
        let actual = ParquetMetaDataReader::new()
            .with_prefetch_hint(Some(10))
            .load_and_finish(input, len)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 2);

        // Hint covering footer + metadata: a single fetch suffices.
        fetch_count.store(0, Ordering::SeqCst);
        let input = MetadataFetchFn(&mut fetch);
        let actual = ParquetMetaDataReader::new()
            .with_prefetch_hint(Some(500))
            .load_and_finish(input, len)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);

        // Exactly-sized hint also needs only one fetch.
        fetch_count.store(0, Ordering::SeqCst);
        let input = MetadataFetchFn(&mut fetch);
        let actual = ParquetMetaDataReader::new()
            .with_prefetch_hint(Some(428))
            .load_and_finish(input, len)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);

        // Claimed file size smaller than the footer: immediate EOF error.
        let input = MetadataFetchFn(&mut fetch);
        let err = ParquetMetaDataReader::new()
            .load_and_finish(input, 4)
            .await
            .unwrap_err()
            .to_string();
        assert_eq!(err, "EOF: file size of 4 is less than footer");

        // Wrong file size makes the "footer" land on arbitrary bytes: corrupt.
        let input = MetadataFetchFn(&mut fetch);
        let err = ParquetMetaDataReader::new()
            .load_and_finish(input, 20)
            .await
            .unwrap_err()
            .to_string();
        assert_eq!(err, "Parquet error: Invalid Parquet file. Corrupt footer");
    }
1356
1357 #[tokio::test]
1358 async fn test_suffix() {
1359 let mut file = get_test_file("nulls.snappy.parquet");
1360 let mut file2 = file.try_clone().unwrap();
1361
1362 let expected = ParquetMetaDataReader::new()
1363 .parse_and_finish(&file)
1364 .unwrap();
1365 let expected = expected.file_metadata().schema();
1366 let fetch_count = AtomicUsize::new(0);
1367 let suffix_fetch_count = AtomicUsize::new(0);
1368
1369 let mut fetch = |range| {
1370 fetch_count.fetch_add(1, Ordering::SeqCst);
1371 futures::future::ready(read_range(&mut file, range))
1372 };
1373 let mut suffix_fetch = |suffix| {
1374 suffix_fetch_count.fetch_add(1, Ordering::SeqCst);
1375 futures::future::ready(read_suffix(&mut file2, suffix))
1376 };
1377
1378 let input = MetadataSuffixFetchFn(&mut fetch, &mut suffix_fetch);
1379 let actual = ParquetMetaDataReader::new()
1380 .load_via_suffix_and_finish(input)
1381 .await
1382 .unwrap();
1383 assert_eq!(actual.file_metadata().schema(), expected);
1384 assert_eq!(fetch_count.load(Ordering::SeqCst), 0);
1385 assert_eq!(suffix_fetch_count.load(Ordering::SeqCst), 2);
1386
1387 fetch_count.store(0, Ordering::SeqCst);
1389 suffix_fetch_count.store(0, Ordering::SeqCst);
1390 let input = MetadataSuffixFetchFn(&mut fetch, &mut suffix_fetch);
1391 let actual = ParquetMetaDataReader::new()
1392 .with_prefetch_hint(Some(7))
1393 .load_via_suffix_and_finish(input)
1394 .await
1395 .unwrap();
1396 assert_eq!(actual.file_metadata().schema(), expected);
1397 assert_eq!(fetch_count.load(Ordering::SeqCst), 0);
1398 assert_eq!(suffix_fetch_count.load(Ordering::SeqCst), 2);
1399
1400 fetch_count.store(0, Ordering::SeqCst);
1402 suffix_fetch_count.store(0, Ordering::SeqCst);
1403 let input = MetadataSuffixFetchFn(&mut fetch, &mut suffix_fetch);
1404 let actual = ParquetMetaDataReader::new()
1405 .with_prefetch_hint(Some(10))
1406 .load_via_suffix_and_finish(input)
1407 .await
1408 .unwrap();
1409 assert_eq!(actual.file_metadata().schema(), expected);
1410 assert_eq!(fetch_count.load(Ordering::SeqCst), 0);
1411 assert_eq!(suffix_fetch_count.load(Ordering::SeqCst), 2);
1412
1413 dbg!("test");
1414 fetch_count.store(0, Ordering::SeqCst);
1416 suffix_fetch_count.store(0, Ordering::SeqCst);
1417 let input = MetadataSuffixFetchFn(&mut fetch, &mut suffix_fetch);
1418 let actual = ParquetMetaDataReader::new()
1419 .with_prefetch_hint(Some(500))
1420 .load_via_suffix_and_finish(input)
1421 .await
1422 .unwrap();
1423 assert_eq!(actual.file_metadata().schema(), expected);
1424 assert_eq!(fetch_count.load(Ordering::SeqCst), 0);
1425 assert_eq!(suffix_fetch_count.load(Ordering::SeqCst), 1);
1426
1427 fetch_count.store(0, Ordering::SeqCst);
1429 suffix_fetch_count.store(0, Ordering::SeqCst);
1430 let input = MetadataSuffixFetchFn(&mut fetch, &mut suffix_fetch);
1431 let actual = ParquetMetaDataReader::new()
1432 .with_prefetch_hint(Some(428))
1433 .load_via_suffix_and_finish(input)
1434 .await
1435 .unwrap();
1436 assert_eq!(actual.file_metadata().schema(), expected);
1437 assert_eq!(fetch_count.load(Ordering::SeqCst), 0);
1438 assert_eq!(suffix_fetch_count.load(Ordering::SeqCst), 1);
1439 }
1440
1441 #[cfg(feature = "encryption")]
1442 #[tokio::test]
1443 async fn test_suffix_with_encryption() {
1444 let mut file = get_test_file("uniform_encryption.parquet.encrypted");
1445 let mut file2 = file.try_clone().unwrap();
1446
1447 let mut fetch = |range| futures::future::ready(read_range(&mut file, range));
1448 let mut suffix_fetch = |suffix| futures::future::ready(read_suffix(&mut file2, suffix));
1449
1450 let input = MetadataSuffixFetchFn(&mut fetch, &mut suffix_fetch);
1451
1452 let key_code: &[u8] = "0123456789012345".as_bytes();
1453 let decryption_properties = FileDecryptionProperties::builder(key_code.to_vec())
1454 .build()
1455 .unwrap();
1456
1457 let expected = ParquetMetaDataReader::new()
1459 .with_decryption_properties(Some(&decryption_properties))
1460 .load_via_suffix_and_finish(input)
1461 .await
1462 .unwrap();
1463 assert_eq!(expected.num_row_groups(), 1);
1464 }
1465
1466 #[tokio::test]
1467 async fn test_page_index() {
1468 let mut file = get_test_file("alltypes_tiny_pages.parquet");
1469 let len = file.len() as usize;
1470 let fetch_count = AtomicUsize::new(0);
1471 let mut fetch = |range| {
1472 fetch_count.fetch_add(1, Ordering::SeqCst);
1473 futures::future::ready(read_range(&mut file, range))
1474 };
1475
1476 let f = MetadataFetchFn(&mut fetch);
1477 let mut loader = ParquetMetaDataReader::new().with_page_indexes(true);
1478 loader.try_load(f, len).await.unwrap();
1479 assert_eq!(fetch_count.load(Ordering::SeqCst), 3);
1480 let metadata = loader.finish().unwrap();
1481 assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());
1482
1483 fetch_count.store(0, Ordering::SeqCst);
1485 let f = MetadataFetchFn(&mut fetch);
1486 let mut loader = ParquetMetaDataReader::new()
1487 .with_page_indexes(true)
1488 .with_prefetch_hint(Some(1729));
1489 loader.try_load(f, len).await.unwrap();
1490 assert_eq!(fetch_count.load(Ordering::SeqCst), 2);
1491 let metadata = loader.finish().unwrap();
1492 assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());
1493
1494 fetch_count.store(0, Ordering::SeqCst);
1496 let f = MetadataFetchFn(&mut fetch);
1497 let mut loader = ParquetMetaDataReader::new()
1498 .with_page_indexes(true)
1499 .with_prefetch_hint(Some(130649));
1500 loader.try_load(f, len).await.unwrap();
1501 assert_eq!(fetch_count.load(Ordering::SeqCst), 2);
1502 let metadata = loader.finish().unwrap();
1503 assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());
1504
1505 fetch_count.store(0, Ordering::SeqCst);
1507 let f = MetadataFetchFn(&mut fetch);
1508 let metadata = ParquetMetaDataReader::new()
1509 .with_page_indexes(true)
1510 .with_prefetch_hint(Some(130650))
1511 .load_and_finish(f, len)
1512 .await
1513 .unwrap();
1514 assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
1515 assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());
1516
1517 fetch_count.store(0, Ordering::SeqCst);
1519 let f = MetadataFetchFn(&mut fetch);
1520 let metadata = ParquetMetaDataReader::new()
1521 .with_page_indexes(true)
1522 .with_prefetch_hint(Some(len - 1000)) .load_and_finish(f, len)
1524 .await
1525 .unwrap();
1526 assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
1527 assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());
1528
1529 fetch_count.store(0, Ordering::SeqCst);
1531 let f = MetadataFetchFn(&mut fetch);
1532 let metadata = ParquetMetaDataReader::new()
1533 .with_page_indexes(true)
1534 .with_prefetch_hint(Some(len)) .load_and_finish(f, len)
1536 .await
1537 .unwrap();
1538 assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
1539 assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());
1540
1541 fetch_count.store(0, Ordering::SeqCst);
1543 let f = MetadataFetchFn(&mut fetch);
1544 let metadata = ParquetMetaDataReader::new()
1545 .with_page_indexes(true)
1546 .with_prefetch_hint(Some(len + 1000)) .load_and_finish(f, len)
1548 .await
1549 .unwrap();
1550 assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
1551 assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());
1552 }
1553}