1use std::{io::Read, ops::Range, sync::Arc};
19
20use bytes::Bytes;
21
22use crate::basic::ColumnOrder;
23use crate::errors::{ParquetError, Result};
24use crate::file::metadata::{FileMetaData, ParquetMetaData, RowGroupMetaData};
25use crate::file::page_index::index::Index;
26use crate::file::page_index::index_reader::{acc_range, decode_column_index, decode_offset_index};
27use crate::file::reader::ChunkReader;
28use crate::file::{FOOTER_SIZE, PARQUET_MAGIC};
29use crate::format::{ColumnOrder as TColumnOrder, FileMetaData as TFileMetaData};
30use crate::schema::types;
31use crate::schema::types::SchemaDescriptor;
32use crate::thrift::{TCompactSliceInputProtocol, TSerializable};
33
34#[cfg(all(feature = "async", feature = "arrow"))]
35use crate::arrow::async_reader::MetadataFetch;
36
/// Reads and parses [`ParquetMetaData`] from the footer of a Parquet file,
/// optionally also decoding the page indexes (column index / offset index).
#[derive(Default)]
pub struct ParquetMetaDataReader {
    // Parsed metadata; populated by the `try_parse*` / `try_load` family and
    // consumed (taken) by `finish()`.
    metadata: Option<ParquetMetaData>,
    // Whether to also decode the column index when parsing.
    column_index: bool,
    // Whether to also decode the offset index when parsing.
    offset_index: bool,
    // Caller-supplied hint of how many trailing bytes to prefetch when loading
    // asynchronously; used by `get_prefetch_size`.
    prefetch_hint: Option<usize>,
    // Total size of footer + encoded metadata once decoded; used to verify the
    // page index range does not overlap the metadata.
    metadata_size: Option<usize>,
}
72
73impl ParquetMetaDataReader {
74 pub fn new() -> Self {
76 Default::default()
77 }
78
79 pub fn new_with_metadata(metadata: ParquetMetaData) -> Self {
82 Self {
83 metadata: Some(metadata),
84 ..Default::default()
85 }
86 }
87
88 pub fn with_page_indexes(self, val: bool) -> Self {
94 self.with_column_indexes(val).with_offset_indexes(val)
95 }
96
97 pub fn with_column_indexes(mut self, val: bool) -> Self {
101 self.column_index = val;
102 self
103 }
104
105 pub fn with_offset_indexes(mut self, val: bool) -> Self {
109 self.offset_index = val;
110 self
111 }
112
113 pub fn with_prefetch_hint(mut self, prefetch: Option<usize>) -> Self {
125 self.prefetch_hint = prefetch;
126 self
127 }
128
    /// Returns `true` once footer metadata has been parsed (or supplied via
    /// `new_with_metadata`) and not yet consumed by `finish`.
    pub fn has_metadata(&self) -> bool {
        self.metadata.is_some()
    }
133
134 pub fn finish(&mut self) -> Result<ParquetMetaData> {
136 self.metadata
137 .take()
138 .ok_or_else(|| general_err!("could not parse parquet metadata"))
139 }
140
141 pub fn parse_and_finish<R: ChunkReader>(mut self, reader: &R) -> Result<ParquetMetaData> {
160 self.try_parse(reader)?;
161 self.finish()
162 }
163
164 pub fn try_parse<R: ChunkReader>(&mut self, reader: &R) -> Result<()> {
170 self.try_parse_sized(reader, reader.len() as usize)
171 }
172
    /// Attempt to parse the footer metadata from `reader`, which may hold only
    /// the trailing bytes of a file whose total length is `file_size`.
    ///
    /// On [`ParquetError::NeedMoreData`] the caller should retry with at least
    /// the reported number of trailing bytes. If `reader` already spans the
    /// whole file (or more bytes are needed than the file contains) the error
    /// is converted to EOF, since retrying cannot succeed.
    pub fn try_parse_sized<R: ChunkReader>(&mut self, reader: &R, file_size: usize) -> Result<()> {
        self.metadata = match self.parse_metadata(reader) {
            Ok(metadata) => Some(metadata),
            Err(ParquetError::NeedMoreData(needed)) => {
                // If the provided reader already covers the whole file, or the
                // request exceeds the file itself, fetching more cannot help.
                if file_size == reader.len() as usize || needed > file_size {
                    return Err(eof_err!(
                        "Parquet file too small. Size is {} but need {}",
                        file_size,
                        needed
                    ));
                } else {
                    // Ask the caller to retry with at least `needed` trailing bytes.
                    return Err(ParquetError::NeedMoreData(needed));
                }
            }
            Err(e) => return Err(e),
        };

        // Page indexes were not requested; footer metadata alone is enough.
        if !self.column_index && !self.offset_index {
            return Ok(());
        }

        self.read_page_indexes_sized(reader, file_size)
    }
272
273 pub fn read_page_indexes<R: ChunkReader>(&mut self, reader: &R) -> Result<()> {
276 self.read_page_indexes_sized(reader, reader.len() as usize)
277 }
278
    /// Read the page indexes for already-parsed metadata from `reader`, which
    /// may hold only the trailing bytes of a file of total length `file_size`.
    ///
    /// # Errors
    /// - general error when no metadata has been parsed yet
    /// - [`ParquetError::NeedMoreData`] when the page index range starts before
    ///   the first byte available in `reader` (caller can retry with more bytes)
    /// - EOF when the metadata references bytes beyond `file_size`
    pub fn read_page_indexes_sized<R: ChunkReader>(
        &mut self,
        reader: &R,
        file_size: usize,
    ) -> Result<()> {
        if self.metadata.is_none() {
            return Err(general_err!(
                "Tried to read page indexes without ParquetMetaData metadata"
            ));
        }

        // Absolute byte range covering all requested column/offset indexes;
        // nothing to do when no index is requested or present.
        let Some(range) = self.range_for_page_index() else {
            return Ok(());
        };

        // `reader` is assumed to hold the final `reader.len()` bytes of the
        // file, i.e. the absolute range `file_range`.
        let file_range = file_size.saturating_sub(reader.len() as usize)..file_size;
        if !(file_range.contains(&range.start) && file_range.contains(&range.end)) {
            if range.end > file_size {
                // Metadata points beyond the stated file size: unrecoverable.
                return Err(eof_err!(
                    "Parquet file too small. Range {:?} is beyond file bounds {file_size}",
                    range
                ));
            } else {
                // Caller can retry with at least this many trailing bytes.
                return Err(ParquetError::NeedMoreData(file_size - range.start));
            }
        }

        // Sanity check: the page indexes must live strictly before the footer
        // metadata previously decoded (whose size was recorded in parse_metadata).
        if let Some(metadata_size) = self.metadata_size {
            let metadata_range = file_size.saturating_sub(metadata_size)..file_size;
            if range.end > metadata_range.start {
                return Err(eof_err!(
                    "Parquet file too small. Page index range {:?} overlaps with file metadata {:?}",
                    range,
                    metadata_range
                ));
            }
        }

        let bytes_needed = range.end - range.start;
        // Translate the absolute file offset into an offset within `reader`.
        let bytes = reader.get_bytes((range.start - file_range.start) as u64, bytes_needed)?;
        let offset = range.start;

        self.parse_column_index(&bytes, offset)?;
        self.parse_offset_index(&bytes, offset)?;

        Ok(())
    }
347
    /// Asynchronously fetch and parse the metadata via `fetch`, consuming this
    /// reader and returning the resulting [`ParquetMetaData`].
    #[cfg(all(feature = "async", feature = "arrow"))]
    pub async fn load_and_finish<F: MetadataFetch>(
        mut self,
        fetch: F,
        file_size: usize,
    ) -> Result<ParquetMetaData> {
        self.try_load(fetch, file_size).await?;
        self.finish()
    }
363
    /// Asynchronously fetch and parse the footer metadata — and, if requested,
    /// the page indexes — using `fetch`; `file_size` is the total file length.
    #[cfg(all(feature = "async", feature = "arrow"))]
    pub async fn try_load<F: MetadataFetch>(
        &mut self,
        mut fetch: F,
        file_size: usize,
    ) -> Result<()> {
        // `remainder` carries any prefetched bytes preceding the metadata that
        // may already cover (part of) the page index region, avoiding a refetch.
        let (metadata, remainder) =
            Self::load_metadata(&mut fetch, file_size, self.get_prefetch_size()).await?;

        self.metadata = Some(metadata);

        // Page indexes were not requested; done.
        if !self.column_index && !self.offset_index {
            return Ok(());
        }

        self.load_page_index_with_remainder(fetch, remainder).await
    }
387
    /// Asynchronously fetch and parse the page indexes for already-loaded
    /// metadata.
    #[cfg(all(feature = "async", feature = "arrow"))]
    pub async fn load_page_index<F: MetadataFetch>(&mut self, fetch: F) -> Result<()> {
        // No prefetched remainder available; everything must be fetched.
        self.load_page_index_with_remainder(fetch, None).await
    }
394
    /// Fetch and decode the page indexes, reusing `remainder` — previously
    /// fetched bytes starting at absolute offset `remainder.0` — when it
    /// already covers the needed range.
    #[cfg(all(feature = "async", feature = "arrow"))]
    async fn load_page_index_with_remainder<F: MetadataFetch>(
        &mut self,
        mut fetch: F,
        remainder: Option<(usize, Bytes)>,
    ) -> Result<()> {
        if self.metadata.is_none() {
            return Err(general_err!("Footer metadata is not present"));
        }

        // Absolute byte range covering all requested indexes; nothing to do if
        // no index is requested or present.
        let range = self.range_for_page_index();
        let range = match range {
            Some(range) => range,
            None => return Ok(()),
        };

        let bytes = match &remainder {
            // The prefetched bytes start at or before the index range; since
            // they run up to the metadata they must contain the whole range
            // (guarded by the assert below), so slice instead of refetching.
            Some((remainder_start, remainder)) if *remainder_start <= range.start => {
                let offset = range.start - *remainder_start;
                let end = offset + range.end - range.start;
                assert!(end <= remainder.len());
                remainder.slice(offset..end)
            }
            _ => fetch.fetch(range.start..range.end).await?,
        };

        // Sanity check: we must have exactly the requested bytes.
        assert_eq!(bytes.len(), range.end - range.start);
        let offset = range.start;

        self.parse_column_index(&bytes, offset)?;
        self.parse_offset_index(&bytes, offset)?;

        Ok(())
    }
432
433 fn parse_column_index(&mut self, bytes: &Bytes, start_offset: usize) -> Result<()> {
434 let metadata = self.metadata.as_mut().unwrap();
435 if self.column_index {
436 let index = metadata
437 .row_groups()
438 .iter()
439 .map(|x| {
440 x.columns()
441 .iter()
442 .map(|c| match c.column_index_range() {
443 Some(r) => decode_column_index(
444 &bytes[r.start - start_offset..r.end - start_offset],
445 c.column_type(),
446 ),
447 None => Ok(Index::NONE),
448 })
449 .collect::<Result<Vec<_>>>()
450 })
451 .collect::<Result<Vec<_>>>()?;
452 metadata.set_column_index(Some(index));
453 }
454 Ok(())
455 }
456
457 fn parse_offset_index(&mut self, bytes: &Bytes, start_offset: usize) -> Result<()> {
458 let metadata = self.metadata.as_mut().unwrap();
459 if self.offset_index {
460 let index = metadata
461 .row_groups()
462 .iter()
463 .map(|x| {
464 x.columns()
465 .iter()
466 .map(|c| match c.offset_index_range() {
467 Some(r) => decode_offset_index(
468 &bytes[r.start - start_offset..r.end - start_offset],
469 ),
470 None => Err(general_err!("missing offset index")),
471 })
472 .collect::<Result<Vec<_>>>()
473 })
474 .collect::<Result<Vec<_>>>()?;
475
476 metadata.set_offset_index(Some(index));
477 }
478 Ok(())
479 }
480
481 fn range_for_page_index(&self) -> Option<Range<usize>> {
482 self.metadata.as_ref()?;
484
485 let mut range = None;
487 let metadata = self.metadata.as_ref().unwrap();
488 for c in metadata.row_groups().iter().flat_map(|r| r.columns()) {
489 if self.column_index {
490 range = acc_range(range, c.column_index_range());
491 }
492 if self.offset_index {
493 range = acc_range(range, c.offset_index_range());
494 }
495 }
496 range
497 }
498
    /// Read the 8-byte footer from the end of `chunk_reader`, then decode the
    /// thrift-encoded metadata that precedes it.
    ///
    /// Returns [`ParquetError::NeedMoreData`] with the number of trailing bytes
    /// required when `chunk_reader` is too short; callers that only hold a
    /// suffix of the file use this to fetch a larger suffix and retry.
    fn parse_metadata<R: ChunkReader>(&mut self, chunk_reader: &R) -> Result<ParquetMetaData> {
        // The reader must at least contain the footer itself.
        let file_size = chunk_reader.len();
        if file_size < (FOOTER_SIZE as u64) {
            return Err(ParquetError::NeedMoreData(FOOTER_SIZE));
        }

        let mut footer = [0_u8; 8];
        chunk_reader
            .get_read(file_size - 8)?
            .read_exact(&mut footer)?;

        let metadata_len = Self::decode_footer(&footer)?;
        let footer_metadata_len = FOOTER_SIZE + metadata_len;
        // Remember the total footer + metadata size so later page-index reads
        // can verify the indexes do not overlap the metadata.
        self.metadata_size = Some(footer_metadata_len);

        if footer_metadata_len > file_size as usize {
            return Err(ParquetError::NeedMoreData(footer_metadata_len));
        }

        let start = file_size - footer_metadata_len as u64;
        Self::decode_metadata(chunk_reader.get_bytes(start, metadata_len)?.as_ref())
    }
524
525 #[cfg(all(feature = "async", feature = "arrow"))]
529 fn get_prefetch_size(&self) -> usize {
530 if let Some(prefetch) = self.prefetch_hint {
531 if prefetch > FOOTER_SIZE {
532 return prefetch;
533 }
534 }
535 FOOTER_SIZE
536 }
537
    /// Fetch the trailing `prefetch` bytes of the file and decode the metadata,
    /// issuing a second fetch only when the prefetch did not cover the full
    /// metadata.
    ///
    /// On success also returns any prefetched bytes *preceding* the metadata as
    /// `(absolute_start_offset, bytes)` so callers can reuse them (e.g. for the
    /// page indexes) instead of fetching again.
    #[cfg(all(feature = "async", feature = "arrow"))]
    async fn load_metadata<F: MetadataFetch>(
        fetch: &mut F,
        file_size: usize,
        prefetch: usize,
    ) -> Result<(ParquetMetaData, Option<(usize, Bytes)>)> {
        if file_size < FOOTER_SIZE {
            return Err(eof_err!("file size of {} is less than footer", file_size));
        }

        // Fetch the last `prefetch` bytes, clamped to the start of the file.
        let footer_start = file_size.saturating_sub(prefetch);

        let suffix = fetch.fetch(footer_start..file_size).await?;
        let suffix_len = suffix.len();
        let fetch_len = file_size - footer_start;
        if suffix_len < fetch_len {
            return Err(eof_err!(
                "metadata requires {} bytes, but could only read {}",
                fetch_len,
                suffix_len
            ));
        }

        // The footer occupies the last FOOTER_SIZE bytes of the suffix.
        let mut footer = [0; FOOTER_SIZE];
        footer.copy_from_slice(&suffix[suffix_len - FOOTER_SIZE..suffix_len]);

        let length = Self::decode_footer(&footer)?;

        if file_size < length + FOOTER_SIZE {
            return Err(eof_err!(
                "file size of {} is less than footer + metadata {}",
                file_size,
                length + FOOTER_SIZE
            ));
        }

        if length > suffix_len - FOOTER_SIZE {
            // The prefetch did not cover the whole metadata: fetch exactly the
            // metadata range now. Nothing reusable remains in this case.
            let metadata_start = file_size - length - FOOTER_SIZE;
            let meta = fetch.fetch(metadata_start..file_size - FOOTER_SIZE).await?;
            Ok((Self::decode_metadata(&meta)?, None))
        } else {
            // The suffix already contains the metadata: decode in place and
            // hand back the bytes before it for possible reuse.
            let metadata_start = file_size - length - FOOTER_SIZE - footer_start;
            let slice = &suffix[metadata_start..suffix_len - FOOTER_SIZE];
            Ok((
                Self::decode_metadata(slice)?,
                Some((footer_start, suffix.slice(..metadata_start))),
            ))
        }
    }
591
592 pub fn decode_footer(slice: &[u8; FOOTER_SIZE]) -> Result<usize> {
604 if slice[4..] != PARQUET_MAGIC {
606 return Err(general_err!("Invalid Parquet file. Corrupt footer"));
607 }
608
609 let metadata_len = u32::from_le_bytes(slice[..4].try_into().unwrap());
611 Ok(metadata_len as usize)
613 }
614
615 pub fn decode_metadata(buf: &[u8]) -> Result<ParquetMetaData> {
623 let mut prot = TCompactSliceInputProtocol::new(buf);
624 let t_file_metadata: TFileMetaData = TFileMetaData::read_from_in_protocol(&mut prot)
625 .map_err(|e| general_err!("Could not parse metadata: {}", e))?;
626 let schema = types::from_thrift(&t_file_metadata.schema)?;
627 let schema_descr = Arc::new(SchemaDescriptor::new(schema));
628 let mut row_groups = Vec::new();
629 for rg in t_file_metadata.row_groups {
630 row_groups.push(RowGroupMetaData::from_thrift(schema_descr.clone(), rg)?);
631 }
632 let column_orders =
633 Self::parse_column_orders(t_file_metadata.column_orders, &schema_descr)?;
634
635 let file_metadata = FileMetaData::new(
636 t_file_metadata.version,
637 t_file_metadata.num_rows,
638 t_file_metadata.created_by,
639 t_file_metadata.key_value_metadata,
640 schema_descr,
641 column_orders,
642 );
643 Ok(ParquetMetaData::new(file_metadata, row_groups))
644 }
645
646 fn parse_column_orders(
649 t_column_orders: Option<Vec<TColumnOrder>>,
650 schema_descr: &SchemaDescriptor,
651 ) -> Result<Option<Vec<ColumnOrder>>> {
652 match t_column_orders {
653 Some(orders) => {
654 if orders.len() != schema_descr.num_columns() {
656 return Err(general_err!("Column order length mismatch"));
657 };
658 let mut res = Vec::new();
659 for (i, column) in schema_descr.columns().iter().enumerate() {
660 match orders[i] {
661 TColumnOrder::TYPEORDER(_) => {
662 let sort_order = ColumnOrder::get_sort_order(
663 column.logical_type(),
664 column.converted_type(),
665 column.physical_type(),
666 );
667 res.push(ColumnOrder::TYPE_DEFINED_ORDER(sort_order));
668 }
669 }
670 }
671 Ok(Some(res))
672 }
673 None => Ok(None),
674 }
675 }
676}
677
678#[cfg(test)]
679mod tests {
680 use super::*;
681 use bytes::Bytes;
682
683 use crate::basic::SortOrder;
684 use crate::basic::Type;
685 use crate::file::reader::Length;
686 use crate::format::TypeDefinedOrder;
687 use crate::schema::types::Type as SchemaType;
688 use crate::util::test_common::file_util::get_test_file;
689
    #[test]
    fn test_parse_metadata_size_smaller_than_footer() {
        // An empty file cannot even hold the 8-byte footer.
        let test_file = tempfile::tempfile().unwrap();
        let err = ParquetMetaDataReader::new()
            .parse_metadata(&test_file)
            .unwrap_err();
        assert!(matches!(err, ParquetError::NeedMoreData(8)));
    }
698
    #[test]
    fn test_parse_metadata_corrupt_footer() {
        // Eight bytes, but the trailing magic is not "PAR1".
        let data = Bytes::from(vec![1, 2, 3, 4, 5, 6, 7, 8]);
        let reader_result = ParquetMetaDataReader::new().parse_metadata(&data);
        assert_eq!(
            reader_result.unwrap_err().to_string(),
            "Parquet error: Invalid Parquet file. Corrupt footer"
        );
    }
708
    #[test]
    fn test_parse_metadata_invalid_start() {
        // Valid magic, but the declared metadata length (255) exceeds the file:
        // expect a request for 255 + 8 = 263 trailing bytes.
        let test_file = Bytes::from(vec![255, 0, 0, 0, b'P', b'A', b'R', b'1']);
        let err = ParquetMetaDataReader::new()
            .parse_metadata(&test_file)
            .unwrap_err();
        assert!(matches!(err, ParquetError::NeedMoreData(263)));
    }
717
    #[test]
    fn test_metadata_column_orders_parse() {
        // Two primitive leaf columns; both should resolve to SIGNED sort order.
        let fields = vec![
            Arc::new(
                SchemaType::primitive_type_builder("col1", Type::INT32)
                    .build()
                    .unwrap(),
            ),
            Arc::new(
                SchemaType::primitive_type_builder("col2", Type::FLOAT)
                    .build()
                    .unwrap(),
            ),
        ];
        let schema = SchemaType::group_type_builder("schema")
            .with_fields(fields)
            .build()
            .unwrap();
        let schema_descr = SchemaDescriptor::new(Arc::new(schema));

        let t_column_orders = Some(vec![
            TColumnOrder::TYPEORDER(TypeDefinedOrder::new()),
            TColumnOrder::TYPEORDER(TypeDefinedOrder::new()),
        ]);

        assert_eq!(
            ParquetMetaDataReader::parse_column_orders(t_column_orders, &schema_descr).unwrap(),
            Some(vec![
                ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED),
                ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED)
            ])
        );

        // Absent column orders map to None.
        assert_eq!(
            ParquetMetaDataReader::parse_column_orders(None, &schema_descr).unwrap(),
            None
        );
    }
758
    #[test]
    fn test_metadata_column_orders_len_mismatch() {
        // One column order supplied against a schema with zero leaf columns.
        let schema = SchemaType::group_type_builder("schema").build().unwrap();
        let schema_descr = SchemaDescriptor::new(Arc::new(schema));

        let t_column_orders = Some(vec![TColumnOrder::TYPEORDER(TypeDefinedOrder::new())]);

        let res = ParquetMetaDataReader::parse_column_orders(t_column_orders, &schema_descr);
        assert!(res.is_err());
        assert!(format!("{:?}", res.unwrap_err()).contains("Column order length mismatch"));
    }
770
    #[test]
    fn test_try_parse() {
        let file = get_test_file("alltypes_tiny_pages.parquet");
        let len = file.len() as usize;

        let mut reader = ParquetMetaDataReader::new().with_page_indexes(true);

        // Helper: fetch an absolute byte range of the file as an owned buffer.
        let bytes_for_range = |range: Range<usize>| {
            file.get_bytes(range.start as u64, range.end - range.start)
                .unwrap()
        };

        // Whole file: everything parses in one go.
        let bytes = bytes_for_range(0..len);
        reader.try_parse(&bytes).unwrap();
        let metadata = reader.finish().unwrap();
        assert!(metadata.column_index.is_some());
        assert!(metadata.offset_index.is_some());

        // A generous suffix that still contains the page indexes and metadata.
        let bytes = bytes_for_range(320000..len);
        reader.try_parse_sized(&bytes, len).unwrap();
        let metadata = reader.finish().unwrap();
        assert!(metadata.column_index.is_some());
        assert!(metadata.offset_index.is_some());

        // The smallest suffix that still succeeds (see the failure case below).
        let bytes = bytes_for_range(323583..len);
        reader.try_parse_sized(&bytes, len).unwrap();
        let metadata = reader.finish().unwrap();
        assert!(metadata.column_index.is_some());
        assert!(metadata.offset_index.is_some());

        // One byte too short: NeedMoreData reports how many trailing bytes to
        // re-fetch, and retrying with that suffix succeeds.
        let bytes = bytes_for_range(323584..len);
        match reader.try_parse_sized(&bytes, len).unwrap_err() {
            ParquetError::NeedMoreData(needed) => {
                let bytes = bytes_for_range(len - needed..len);
                reader.try_parse_sized(&bytes, len).unwrap();
                let metadata = reader.finish().unwrap();
                assert!(metadata.column_index.is_some());
                assert!(metadata.offset_index.is_some());
            }
            _ => panic!("unexpected error"),
        };

        // Retry loop: the footer metadata may parse while the page indexes
        // still need more bytes, in which case only the indexes are re-read.
        let mut reader = ParquetMetaDataReader::new().with_page_indexes(true);
        let mut bytes = bytes_for_range(452505..len);
        loop {
            match reader.try_parse_sized(&bytes, len) {
                Ok(_) => break,
                Err(ParquetError::NeedMoreData(needed)) => {
                    bytes = bytes_for_range(len - needed..len);
                    if reader.has_metadata() {
                        reader.read_page_indexes_sized(&bytes, len).unwrap();
                        break;
                    }
                }
                _ => panic!("unexpected error"),
            }
        }
        let metadata = reader.finish().unwrap();
        assert!(metadata.column_index.is_some());
        assert!(metadata.offset_index.is_some());

        // A claimed file size smaller than the index range is unrecoverable.
        let bytes = bytes_for_range(323584..len);
        let reader_result = reader.try_parse_sized(&bytes, len - 323584).unwrap_err();
        assert_eq!(
            reader_result.to_string(),
            "EOF: Parquet file too small. Range 323583..452504 is beyond file bounds 130649"
        );

        // Without page indexes, a short suffix plus one retry is enough.
        let mut reader = ParquetMetaDataReader::new();
        let bytes = bytes_for_range(452505..len);
        match reader.try_parse_sized(&bytes, len).unwrap_err() {
            ParquetError::NeedMoreData(needed) => {
                let bytes = bytes_for_range(len - needed..len);
                reader.try_parse_sized(&bytes, len).unwrap();
                reader.finish().unwrap();
            }
            _ => panic!("unexpected error"),
        };

        // Treating the suffix as a whole file converts NeedMoreData into EOF.
        let reader_result = reader.try_parse(&bytes).unwrap_err();
        assert_eq!(
            reader_result.to_string(),
            "EOF: Parquet file too small. Size is 1728 but need 1729"
        );

        // A prefix of the file has no valid footer magic at its end.
        let bytes = bytes_for_range(0..1000);
        let reader_result = reader.try_parse_sized(&bytes, len).unwrap_err();
        assert_eq!(
            reader_result.to_string(),
            "Parquet error: Invalid Parquet file. Corrupt footer"
        );

        // A suffix shorter than the claimed (already too small) file size: EOF.
        let bytes = bytes_for_range(452510..len);
        let reader_result = reader.try_parse_sized(&bytes, len - 452505).unwrap_err();
        assert_eq!(
            reader_result.to_string(),
            "EOF: Parquet file too small. Size is 1728 but need 1729"
        );
    }
884}
885
886#[cfg(all(feature = "async", feature = "arrow", test))]
887mod async_tests {
888 use super::*;
889 use bytes::Bytes;
890 use futures::future::BoxFuture;
891 use futures::FutureExt;
892 use std::fs::File;
893 use std::future::Future;
894 use std::io::{Read, Seek, SeekFrom};
895 use std::ops::Range;
896 use std::sync::atomic::{AtomicUsize, Ordering};
897
898 use crate::arrow::async_reader::MetadataFetch;
899 use crate::file::reader::Length;
900 use crate::util::test_common::file_util::get_test_file;
901
    /// Adapter turning a closure into a [`MetadataFetch`] implementation so the
    /// tests can count and serve range fetches from a local file.
    struct MetadataFetchFn<F>(F);

    impl<F, Fut> MetadataFetch for MetadataFetchFn<F>
    where
        F: FnMut(Range<usize>) -> Fut + Send,
        Fut: Future<Output = Result<Bytes>> + Send,
    {
        fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
            async move { self.0(range).await }.boxed()
        }
    }
913
    /// Read the given byte `range` of `file` into a freshly allocated [`Bytes`].
    fn read_range(file: &mut File, range: Range<usize>) -> Result<Bytes> {
        file.seek(SeekFrom::Start(range.start as _))?;
        let len = range.end - range.start;
        let mut buf = Vec::with_capacity(len);
        file.take(len as _).read_to_end(&mut buf)?;
        Ok(buf.into())
    }
921
    #[tokio::test]
    async fn test_simple() {
        let mut file = get_test_file("nulls.snappy.parquet");
        let len = file.len() as usize;

        // Ground truth: parse synchronously straight from the file.
        let expected = ParquetMetaDataReader::new()
            .parse_and_finish(&file)
            .unwrap();
        let expected = expected.file_metadata().schema();
        let fetch_count = AtomicUsize::new(0);

        let mut fetch = |range| {
            fetch_count.fetch_add(1, Ordering::SeqCst);
            futures::future::ready(read_range(&mut file, range))
        };

        // Default prefetch: footer fetch + metadata fetch = 2 fetches.
        let input = MetadataFetchFn(&mut fetch);
        let actual = ParquetMetaDataReader::new()
            .load_and_finish(input, len)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 2);

        // Hint smaller than the footer is ignored: still 2 fetches.
        fetch_count.store(0, Ordering::SeqCst);
        let input = MetadataFetchFn(&mut fetch);
        let actual = ParquetMetaDataReader::new()
            .with_prefetch_hint(Some(7))
            .load_and_finish(input, len)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 2);

        // Hint larger than the footer but not covering the metadata: 2 fetches.
        fetch_count.store(0, Ordering::SeqCst);
        let input = MetadataFetchFn(&mut fetch);
        let actual = ParquetMetaDataReader::new()
            .with_prefetch_hint(Some(10))
            .load_and_finish(input, len)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 2);

        // Hint covering the whole metadata: a single fetch suffices.
        fetch_count.store(0, Ordering::SeqCst);
        let input = MetadataFetchFn(&mut fetch);
        let actual = ParquetMetaDataReader::new()
            .with_prefetch_hint(Some(500))
            .load_and_finish(input, len)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);

        // A tighter hint (presumably the exact footer + metadata size — the
        // single fetch below confirms it covers everything): 1 fetch.
        fetch_count.store(0, Ordering::SeqCst);
        let input = MetadataFetchFn(&mut fetch);
        let actual = ParquetMetaDataReader::new()
            .with_prefetch_hint(Some(428))
            .load_and_finish(input, len)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);

        // A file size smaller than the footer is rejected outright.
        let input = MetadataFetchFn(&mut fetch);
        let err = ParquetMetaDataReader::new()
            .load_and_finish(input, 4)
            .await
            .unwrap_err()
            .to_string();
        assert_eq!(err, "EOF: file size of 4 is less than footer");

        // A bogus file size lands mid-data: no valid footer magic there.
        let input = MetadataFetchFn(&mut fetch);
        let err = ParquetMetaDataReader::new()
            .load_and_finish(input, 20)
            .await
            .unwrap_err()
            .to_string();
        assert_eq!(err, "Parquet error: Invalid Parquet file. Corrupt footer");
    }
1006
    #[tokio::test]
    async fn test_page_index() {
        let mut file = get_test_file("alltypes_tiny_pages.parquet");
        let len = file.len() as usize;
        let fetch_count = AtomicUsize::new(0);
        let mut fetch = |range| {
            fetch_count.fetch_add(1, Ordering::SeqCst);
            futures::future::ready(read_range(&mut file, range))
        };

        // Default prefetch: footer + metadata + page index = 3 fetches.
        let f = MetadataFetchFn(&mut fetch);
        let mut loader = ParquetMetaDataReader::new().with_page_indexes(true);
        loader.try_load(f, len).await.unwrap();
        assert_eq!(fetch_count.load(Ordering::SeqCst), 3);
        let metadata = loader.finish().unwrap();
        assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());

        // Hint covering the metadata but not the page indexes (per the counts
        // below): 2 fetches.
        fetch_count.store(0, Ordering::SeqCst);
        let f = MetadataFetchFn(&mut fetch);
        let mut loader = ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .with_prefetch_hint(Some(1729));
        loader.try_load(f, len).await.unwrap();
        assert_eq!(fetch_count.load(Ordering::SeqCst), 2);
        let metadata = loader.finish().unwrap();
        assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());

        // Hint one byte shy of also covering the page indexes: still 2 fetches.
        fetch_count.store(0, Ordering::SeqCst);
        let f = MetadataFetchFn(&mut fetch);
        let mut loader = ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .with_prefetch_hint(Some(130649));
        loader.try_load(f, len).await.unwrap();
        assert_eq!(fetch_count.load(Ordering::SeqCst), 2);
        let metadata = loader.finish().unwrap();
        assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());

        // Hint also covering the page indexes: a single fetch suffices.
        fetch_count.store(0, Ordering::SeqCst);
        let f = MetadataFetchFn(&mut fetch);
        let metadata = ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .with_prefetch_hint(Some(130650))
            .load_and_finish(f, len)
            .await
            .unwrap();
        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
        assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());

        // Oversized hints — below, equal to, and beyond the file length — are
        // clamped and still need only one fetch.
        fetch_count.store(0, Ordering::SeqCst);
        let f = MetadataFetchFn(&mut fetch);
        let metadata = ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .with_prefetch_hint(Some(len - 1000))
            .load_and_finish(f, len)
            .await
            .unwrap();
        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
        assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());

        fetch_count.store(0, Ordering::SeqCst);
        let f = MetadataFetchFn(&mut fetch);
        let metadata = ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .with_prefetch_hint(Some(len))
            .load_and_finish(f, len)
            .await
            .unwrap();
        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
        assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());

        fetch_count.store(0, Ordering::SeqCst);
        let f = MetadataFetchFn(&mut fetch);
        let metadata = ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .with_prefetch_hint(Some(len + 1000))
            .load_and_finish(f, len)
            .await
            .unwrap();
        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
        assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());
    }
1094}