parquet/file/metadata/
reader.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::{io::Read, ops::Range, sync::Arc};
19
20use bytes::Bytes;
21
22use crate::basic::ColumnOrder;
23#[cfg(feature = "encryption")]
24use crate::encryption::{
25    decrypt::{FileDecryptionProperties, FileDecryptor},
26    modules::create_footer_aad,
27};
28
29use crate::errors::{ParquetError, Result};
30use crate::file::metadata::{FileMetaData, ParquetMetaData, RowGroupMetaData};
31use crate::file::page_index::index::Index;
32use crate::file::page_index::index_reader::{acc_range, decode_column_index, decode_offset_index};
33use crate::file::reader::ChunkReader;
34use crate::file::{FOOTER_SIZE, PARQUET_MAGIC, PARQUET_MAGIC_ENCR_FOOTER};
35use crate::format::{ColumnOrder as TColumnOrder, FileMetaData as TFileMetaData};
36#[cfg(feature = "encryption")]
37use crate::format::{EncryptionAlgorithm, FileCryptoMetaData as TFileCryptoMetaData};
38use crate::schema::types;
39use crate::schema::types::SchemaDescriptor;
40use crate::thrift::{TCompactSliceInputProtocol, TSerializable};
41
42#[cfg(all(feature = "async", feature = "arrow"))]
43use crate::arrow::async_reader::{MetadataFetch, MetadataSuffixFetch};
44
45/// Reads the [`ParquetMetaData`] from a byte stream.
46///
47/// See [`crate::file::metadata::ParquetMetaDataWriter#output-format`] for a description of
48/// the Parquet metadata.
49///
50/// Parquet metadata is not necessarily contiguous in the files: part is stored
51/// in the footer (the last bytes of the file), but other portions (such as the
52/// PageIndex) can be stored elsewhere.
53///
54/// This reader handles reading the footer as well as the non contiguous parts
55/// of the metadata such as the page indexes; excluding Bloom Filters.
56///
57/// # Example
58/// ```no_run
59/// # use parquet::file::metadata::ParquetMetaDataReader;
60/// # fn open_parquet_file(path: &str) -> std::fs::File { unimplemented!(); }
61/// // read parquet metadata including page indexes from a file
62/// let file = open_parquet_file("some_path.parquet");
63/// let mut reader = ParquetMetaDataReader::new()
64///     .with_page_indexes(true);
65/// reader.try_parse(&file).unwrap();
66/// let metadata = reader.finish().unwrap();
67/// assert!(metadata.column_index().is_some());
68/// assert!(metadata.offset_index().is_some());
69/// ```
#[derive(Default)]
pub struct ParquetMetaDataReader {
    // Parsed metadata; populated by the `try_parse*` / `try_load*` family and
    // handed out (by value) via `finish()`.
    metadata: Option<ParquetMetaData>,
    // Whether to also read the ColumnIndex page-index structure.
    column_index: bool,
    // Whether to also read the OffsetIndex page-index structure.
    offset_index: bool,
    // Caller-supplied estimate of bytes needed for the initial fetch; see
    // `with_prefetch_hint`. Only used by the async `try_load*` paths.
    prefetch_hint: Option<usize>,
    // Size of the serialized thrift metadata plus the 8 byte footer. Only set if
    // `self.parse_metadata` is called.
    metadata_size: Option<usize>,
    // Keys/settings used to decrypt encrypted files; see `with_decryption_properties`.
    #[cfg(feature = "encryption")]
    file_decryption_properties: Option<FileDecryptionProperties>,
}
82
/// Describes how the footer metadata is stored
///
/// This is parsed from the last 8 bytes of the Parquet file
//
// Derives added: a public type should be `Debug` for diagnostics; `Clone`,
// `PartialEq` and `Eq` are free here (fields are `usize` and `bool`) and let
// callers compare/copy decoded footers.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FooterTail {
    // Length in bytes of the serialized footer metadata (excludes the 8-byte tail)
    metadata_length: usize,
    // True when the magic bytes were 'PARE' (encrypted footer), false for 'PAR1'
    encrypted_footer: bool,
}

impl FooterTail {
    /// The length of the footer metadata in bytes
    pub fn metadata_length(&self) -> usize {
        self.metadata_length
    }

    /// Whether the footer metadata is encrypted
    pub fn is_encrypted_footer(&self) -> bool {
        self.encrypted_footer
    }
}
102
103impl ParquetMetaDataReader {
104    /// Create a new [`ParquetMetaDataReader`]
105    pub fn new() -> Self {
106        Default::default()
107    }
108
109    /// Create a new [`ParquetMetaDataReader`] populated with a [`ParquetMetaData`] struct
110    /// obtained via other means.
111    pub fn new_with_metadata(metadata: ParquetMetaData) -> Self {
112        Self {
113            metadata: Some(metadata),
114            ..Default::default()
115        }
116    }
117
118    /// Enable or disable reading the page index structures described in
119    /// "[Parquet page index]: Layout to Support Page Skipping". Equivalent to:
120    /// `self.with_column_indexes(val).with_offset_indexes(val)`
121    ///
122    /// [Parquet page index]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
123    pub fn with_page_indexes(self, val: bool) -> Self {
124        self.with_column_indexes(val).with_offset_indexes(val)
125    }
126
127    /// Enable or disable reading the Parquet [ColumnIndex] structure.
128    ///
129    /// [ColumnIndex]:  https://github.com/apache/parquet-format/blob/master/PageIndex.md
130    pub fn with_column_indexes(mut self, val: bool) -> Self {
131        self.column_index = val;
132        self
133    }
134
135    /// Enable or disable reading the Parquet [OffsetIndex] structure.
136    ///
137    /// [OffsetIndex]:  https://github.com/apache/parquet-format/blob/master/PageIndex.md
138    pub fn with_offset_indexes(mut self, val: bool) -> Self {
139        self.offset_index = val;
140        self
141    }
142
143    /// Provide a hint as to the number of bytes needed to fully parse the [`ParquetMetaData`].
144    /// Only used for the asynchronous [`Self::try_load()`] method.
145    ///
146    /// By default, the reader will first fetch the last 8 bytes of the input file to obtain the
147    /// size of the footer metadata. A second fetch will be performed to obtain the needed bytes.
148    /// After parsing the footer metadata, a third fetch will be performed to obtain the bytes
149    /// needed to decode the page index structures, if they have been requested. To avoid
150    /// unnecessary fetches, `prefetch` can be set to an estimate of the number of bytes needed
151    /// to fully decode the [`ParquetMetaData`], which can reduce the number of fetch requests and
152    /// reduce latency. Setting `prefetch` too small will not trigger an error, but will result
153    /// in extra fetches being performed.
154    pub fn with_prefetch_hint(mut self, prefetch: Option<usize>) -> Self {
155        self.prefetch_hint = prefetch;
156        self
157    }
158
159    /// Provide the FileDecryptionProperties to use when decrypting the file.
160    ///
161    /// This is only necessary when the file is encrypted.
162    #[cfg(feature = "encryption")]
163    pub fn with_decryption_properties(
164        mut self,
165        properties: Option<&FileDecryptionProperties>,
166    ) -> Self {
167        self.file_decryption_properties = properties.cloned();
168        self
169    }
170
171    /// Indicates whether this reader has a [`ParquetMetaData`] internally.
172    pub fn has_metadata(&self) -> bool {
173        self.metadata.is_some()
174    }
175
176    /// Return the parsed [`ParquetMetaData`] struct, leaving `None` in its place.
177    pub fn finish(&mut self) -> Result<ParquetMetaData> {
178        self.metadata
179            .take()
180            .ok_or_else(|| general_err!("could not parse parquet metadata"))
181    }
182
183    /// Given a [`ChunkReader`], parse and return the [`ParquetMetaData`] in a single pass.
184    ///
185    /// If `reader` is [`Bytes`] based, then the buffer must contain sufficient bytes to complete
186    /// the request, and must include the Parquet footer. If page indexes are desired, the buffer
187    /// must contain the entire file, or [`Self::try_parse_sized()`] should be used.
188    ///
189    /// This call will consume `self`.
190    ///
191    /// # Example
192    /// ```no_run
193    /// # use parquet::file::metadata::ParquetMetaDataReader;
194    /// # fn open_parquet_file(path: &str) -> std::fs::File { unimplemented!(); }
195    /// // read parquet metadata including page indexes
196    /// let file = open_parquet_file("some_path.parquet");
197    /// let metadata = ParquetMetaDataReader::new()
198    ///     .with_page_indexes(true)
199    ///     .parse_and_finish(&file).unwrap();
200    /// ```
201    pub fn parse_and_finish<R: ChunkReader>(mut self, reader: &R) -> Result<ParquetMetaData> {
202        self.try_parse(reader)?;
203        self.finish()
204    }
205
206    /// Attempts to parse the footer metadata (and optionally page indexes) given a [`ChunkReader`].
207    ///
208    /// If `reader` is [`Bytes`] based, then the buffer must contain sufficient bytes to complete
209    /// the request, and must include the Parquet footer. If page indexes are desired, the buffer
210    /// must contain the entire file, or [`Self::try_parse_sized()`] should be used.
211    pub fn try_parse<R: ChunkReader>(&mut self, reader: &R) -> Result<()> {
212        self.try_parse_sized(reader, reader.len() as usize)
213    }
214
215    /// Same as [`Self::try_parse()`], but provide the original file size in the case that `reader`
216    /// is a [`Bytes`] struct that does not contain the entire file. This information is necessary
217    /// when the page indexes are desired. `reader` must have access to the Parquet footer.
218    ///
219    /// Using this function also allows for retrying with a larger buffer.
220    ///
221    /// # Errors
222    ///
223    /// This function will return [`ParquetError::NeedMoreData`] in the event `reader` does not
224    /// provide enough data to fully parse the metadata (see example below). The returned error
225    /// will be populated with a `usize` field indicating the number of bytes required from the
226    /// tail of the file to completely parse the requested metadata.
227    ///
228    /// Other errors returned include [`ParquetError::General`] and [`ParquetError::EOF`].
229    ///
230    /// # Example
231    /// ```no_run
232    /// # use parquet::file::metadata::ParquetMetaDataReader;
233    /// # use parquet::errors::ParquetError;
234    /// # use crate::parquet::file::reader::Length;
235    /// # fn get_bytes(file: &std::fs::File, range: std::ops::Range<usize>) -> bytes::Bytes { unimplemented!(); }
236    /// # fn open_parquet_file(path: &str) -> std::fs::File { unimplemented!(); }
237    /// let file = open_parquet_file("some_path.parquet");
238    /// let len = file.len() as usize;
239    /// // Speculatively read 1 kilobyte from the end of the file
240    /// let bytes = get_bytes(&file, len - 1024..len);
241    /// let mut reader = ParquetMetaDataReader::new().with_page_indexes(true);
242    /// match reader.try_parse_sized(&bytes, len) {
243    ///     Ok(_) => (),
244    ///     Err(ParquetError::NeedMoreData(needed)) => {
245    ///         // Read the needed number of bytes from the end of the file
246    ///         let bytes = get_bytes(&file, len - needed..len);
247    ///         reader.try_parse_sized(&bytes, len).unwrap();
248    ///     }
249    ///     _ => panic!("unexpected error")
250    /// }
251    /// let metadata = reader.finish().unwrap();
252    /// ```
253    ///
254    /// Note that it is possible for the file metadata to be completely read, but there are
255    /// insufficient bytes available to read the page indexes. [`Self::has_metadata()`] can be used
256    /// to test for this. In the event the file metadata is present, re-parsing of the file
257    /// metadata can be skipped by using [`Self::read_page_indexes_sized()`], as shown below.
258    /// ```no_run
259    /// # use parquet::file::metadata::ParquetMetaDataReader;
260    /// # use parquet::errors::ParquetError;
261    /// # use crate::parquet::file::reader::Length;
262    /// # fn get_bytes(file: &std::fs::File, range: std::ops::Range<usize>) -> bytes::Bytes { unimplemented!(); }
263    /// # fn open_parquet_file(path: &str) -> std::fs::File { unimplemented!(); }
264    /// let file = open_parquet_file("some_path.parquet");
265    /// let len = file.len() as usize;
266    /// // Speculatively read 1 kilobyte from the end of the file
267    /// let mut bytes = get_bytes(&file, len - 1024..len);
268    /// let mut reader = ParquetMetaDataReader::new().with_page_indexes(true);
269    /// // Loop until `bytes` is large enough
270    /// loop {
271    ///     match reader.try_parse_sized(&bytes, len) {
272    ///         Ok(_) => break,
273    ///         Err(ParquetError::NeedMoreData(needed)) => {
274    ///             // Read the needed number of bytes from the end of the file
275    ///             bytes = get_bytes(&file, len - needed..len);
276    ///             // If file metadata was read only read page indexes, otherwise continue loop
277    ///             if reader.has_metadata() {
278    ///                 reader.read_page_indexes_sized(&bytes, len);
279    ///                 break;
280    ///             }
281    ///         }
282    ///         _ => panic!("unexpected error")
283    ///     }
284    /// }
285    /// let metadata = reader.finish().unwrap();
286    /// ```
287    pub fn try_parse_sized<R: ChunkReader>(&mut self, reader: &R, file_size: usize) -> Result<()> {
288        self.metadata = match self.parse_metadata(reader) {
289            Ok(metadata) => Some(metadata),
290            Err(ParquetError::NeedMoreData(needed)) => {
291                // If reader is the same length as `file_size` then presumably there is no more to
292                // read, so return an EOF error.
293                if file_size == reader.len() as usize || needed > file_size {
294                    return Err(eof_err!(
295                        "Parquet file too small. Size is {} but need {}",
296                        file_size,
297                        needed
298                    ));
299                } else {
300                    // Ask for a larger buffer
301                    return Err(ParquetError::NeedMoreData(needed));
302                }
303            }
304            Err(e) => return Err(e),
305        };
306
307        // we can return if page indexes aren't requested
308        if !self.column_index && !self.offset_index {
309            return Ok(());
310        }
311
312        self.read_page_indexes_sized(reader, file_size)
313    }
314
315    /// Read the page index structures when a [`ParquetMetaData`] has already been obtained.
316    /// See [`Self::new_with_metadata()`] and [`Self::has_metadata()`].
317    pub fn read_page_indexes<R: ChunkReader>(&mut self, reader: &R) -> Result<()> {
318        self.read_page_indexes_sized(reader, reader.len() as usize)
319    }
320
    /// Read the page index structures when a [`ParquetMetaData`] has already been obtained.
    /// This variant is used when `reader` cannot access the entire Parquet file (e.g. it is
    /// a [`Bytes`] struct containing the tail of the file).
    /// See [`Self::new_with_metadata()`] and [`Self::has_metadata()`]. Like
    /// [`Self::try_parse_sized()`] this function may return [`ParquetError::NeedMoreData`].
    pub fn read_page_indexes_sized<R: ChunkReader>(
        &mut self,
        reader: &R,
        file_size: usize,
    ) -> Result<()> {
        if self.metadata.is_none() {
            return Err(general_err!(
                "Tried to read page indexes without ParquetMetaData metadata"
            ));
        }

        // FIXME: there are differing implementations in the case where page indexes are missing
        // from the file. `MetadataLoader` will leave them as `None`, while the parser in
        // `index_reader::read_columns_indexes` returns a vector of empty vectors.
        // It is best for this function to replicate the latter behavior for now, but in a future
        // breaking release, the two paths to retrieve metadata should be made consistent. Note that this is only
        // an issue if the user requested page indexes, so there is no need to provide empty
        // vectors in `try_parse_sized()`.
        // https://github.com/apache/arrow-rs/issues/6447

        // Get bounds needed for page indexes (if any are present in the file).
        let Some(range) = self.range_for_page_index() else {
            return Ok(());
        };

        // `reader` is assumed to hold the *tail* of the file, i.e. file offsets
        // `file_size - reader.len() .. file_size`. Check that the needed range lies
        // within that span. Checking `range.end` seems redundant, but it guards
        // against `range_for_page_index()` returning garbage.
        let file_range = file_size.saturating_sub(reader.len() as usize)..file_size;
        if !(file_range.contains(&range.start) && file_range.contains(&range.end)) {
            // Requested range extends beyond EOF: no amount of refetching can help
            if range.end > file_size {
                return Err(eof_err!(
                    "Parquet file too small. Range {:?} is beyond file bounds {file_size}",
                    range
                ));
            } else {
                // Range starts before the buffered tail: ask for a larger buffer
                return Err(ParquetError::NeedMoreData(file_size - range.start));
            }
        }

        // Perform extra sanity check to make sure `range` and the footer metadata don't
        // overlap.
        if let Some(metadata_size) = self.metadata_size {
            let metadata_range = file_size.saturating_sub(metadata_size)..file_size;
            if range.end > metadata_range.start {
                return Err(eof_err!(
                    "Parquet file too small. Page index range {:?} overlaps with file metadata {:?}",
                    range,
                    metadata_range
                ));
            }
        }

        // Translate the absolute file offset `range.start` into an offset within
        // `reader`'s buffered tail before reading.
        let bytes_needed = range.end - range.start;
        let bytes = reader.get_bytes((range.start - file_range.start) as u64, bytes_needed)?;
        let offset = range.start;

        self.parse_column_index(&bytes, offset)?;
        self.parse_offset_index(&bytes, offset)?;

        Ok(())
    }
389
390    /// Given a [`MetadataFetch`], parse and return the [`ParquetMetaData`] in a single pass.
391    ///
392    /// This call will consume `self`.
393    ///
394    /// See [`Self::with_prefetch_hint`] for a discussion of how to reduce the number of fetches
395    /// performed by this function.
396    #[cfg(all(feature = "async", feature = "arrow"))]
397    pub async fn load_and_finish<F: MetadataFetch>(
398        mut self,
399        fetch: F,
400        file_size: usize,
401    ) -> Result<ParquetMetaData> {
402        self.try_load(fetch, file_size).await?;
403        self.finish()
404    }
405
406    /// Given a [`MetadataSuffixFetch`], parse and return the [`ParquetMetaData`] in a single pass.
407    ///
408    /// This call will consume `self`.
409    ///
410    /// See [`Self::with_prefetch_hint`] for a discussion of how to reduce the number of fetches
411    /// performed by this function.
412    #[cfg(all(feature = "async", feature = "arrow"))]
413    pub async fn load_via_suffix_and_finish<F: MetadataSuffixFetch>(
414        mut self,
415        fetch: F,
416    ) -> Result<ParquetMetaData> {
417        self.try_load_via_suffix(fetch).await?;
418        self.finish()
419    }
420    /// Attempts to (asynchronously) parse the footer metadata (and optionally page indexes)
421    /// given a [`MetadataFetch`].
422    ///
423    /// See [`Self::with_prefetch_hint`] for a discussion of how to reduce the number of fetches
424    /// performed by this function.
425    #[cfg(all(feature = "async", feature = "arrow"))]
426    pub async fn try_load<F: MetadataFetch>(
427        &mut self,
428        mut fetch: F,
429        file_size: usize,
430    ) -> Result<()> {
431        let (metadata, remainder) = self.load_metadata(&mut fetch, file_size).await?;
432
433        self.metadata = Some(metadata);
434
435        // we can return if page indexes aren't requested
436        if !self.column_index && !self.offset_index {
437            return Ok(());
438        }
439
440        self.load_page_index_with_remainder(fetch, remainder).await
441    }
442
443    /// Attempts to (asynchronously) parse the footer metadata (and optionally page indexes)
444    /// given a [`MetadataSuffixFetch`].
445    ///
446    /// See [`Self::with_prefetch_hint`] for a discussion of how to reduce the number of fetches
447    /// performed by this function.
448    #[cfg(all(feature = "async", feature = "arrow"))]
449    pub async fn try_load_via_suffix<F: MetadataSuffixFetch>(
450        &mut self,
451        mut fetch: F,
452    ) -> Result<()> {
453        let (metadata, remainder) = self.load_metadata_via_suffix(&mut fetch).await?;
454
455        self.metadata = Some(metadata);
456
457        // we can return if page indexes aren't requested
458        if !self.column_index && !self.offset_index {
459            return Ok(());
460        }
461
462        self.load_page_index_with_remainder(fetch, remainder).await
463    }
464
465    /// Asynchronously fetch the page index structures when a [`ParquetMetaData`] has already
466    /// been obtained. See [`Self::new_with_metadata()`].
467    #[cfg(all(feature = "async", feature = "arrow"))]
468    pub async fn load_page_index<F: MetadataFetch>(&mut self, fetch: F) -> Result<()> {
469        self.load_page_index_with_remainder(fetch, None).await
470    }
471
472    #[cfg(all(feature = "async", feature = "arrow"))]
473    async fn load_page_index_with_remainder<F: MetadataFetch>(
474        &mut self,
475        mut fetch: F,
476        remainder: Option<(usize, Bytes)>,
477    ) -> Result<()> {
478        if self.metadata.is_none() {
479            return Err(general_err!("Footer metadata is not present"));
480        }
481
482        // Get bounds needed for page indexes (if any are present in the file).
483        let range = self.range_for_page_index();
484        let range = match range {
485            Some(range) => range,
486            None => return Ok(()),
487        };
488
489        let bytes = match &remainder {
490            Some((remainder_start, remainder)) if *remainder_start <= range.start => {
491                let offset = range.start - *remainder_start;
492                let end = offset + range.end - range.start;
493                assert!(end <= remainder.len());
494                remainder.slice(offset..end)
495            }
496            // Note: this will potentially fetch data already in remainder, this keeps things simple
497            _ => fetch.fetch(range.start..range.end).await?,
498        };
499
500        // Sanity check
501        assert_eq!(bytes.len(), range.end - range.start);
502        let offset = range.start;
503
504        self.parse_column_index(&bytes, offset)?;
505        self.parse_offset_index(&bytes, offset)?;
506
507        Ok(())
508    }
509
510    fn parse_column_index(&mut self, bytes: &Bytes, start_offset: usize) -> Result<()> {
511        let metadata = self.metadata.as_mut().unwrap();
512        if self.column_index {
513            let index = metadata
514                .row_groups()
515                .iter()
516                .map(|x| {
517                    x.columns()
518                        .iter()
519                        .map(|c| match c.column_index_range() {
520                            Some(r) => decode_column_index(
521                                &bytes[r.start - start_offset..r.end - start_offset],
522                                c.column_type(),
523                            ),
524                            None => Ok(Index::NONE),
525                        })
526                        .collect::<Result<Vec<_>>>()
527                })
528                .collect::<Result<Vec<_>>>()?;
529            metadata.set_column_index(Some(index));
530        }
531        Ok(())
532    }
533
534    fn parse_offset_index(&mut self, bytes: &Bytes, start_offset: usize) -> Result<()> {
535        let metadata = self.metadata.as_mut().unwrap();
536        if self.offset_index {
537            let index = metadata
538                .row_groups()
539                .iter()
540                .map(|x| {
541                    x.columns()
542                        .iter()
543                        .map(|c| match c.offset_index_range() {
544                            Some(r) => decode_offset_index(
545                                &bytes[r.start - start_offset..r.end - start_offset],
546                            ),
547                            None => Err(general_err!("missing offset index")),
548                        })
549                        .collect::<Result<Vec<_>>>()
550                })
551                .collect::<Result<Vec<_>>>()?;
552
553            metadata.set_offset_index(Some(index));
554        }
555        Ok(())
556    }
557
558    fn range_for_page_index(&self) -> Option<Range<usize>> {
559        // sanity check
560        self.metadata.as_ref()?;
561
562        // Get bounds needed for page indexes (if any are present in the file).
563        let mut range = None;
564        let metadata = self.metadata.as_ref().unwrap();
565        for c in metadata.row_groups().iter().flat_map(|r| r.columns()) {
566            if self.column_index {
567                range = acc_range(range, c.column_index_range());
568            }
569            if self.offset_index {
570                range = acc_range(range, c.offset_index_range());
571            }
572        }
573        range
574    }
575
    // One-shot parse of footer.
    // Side effect: this will set `self.metadata_size`
    fn parse_metadata<R: ChunkReader>(&mut self, chunk_reader: &R) -> Result<ParquetMetaData> {
        // check file is large enough to hold footer
        let file_size = chunk_reader.len();
        if file_size < (FOOTER_SIZE as u64) {
            return Err(ParquetError::NeedMoreData(FOOTER_SIZE));
        }

        // Read the fixed 8-byte footer tail (metadata length + magic bytes)
        let mut footer = [0_u8; 8];
        chunk_reader
            .get_read(file_size - 8)?
            .read_exact(&mut footer)?;

        let footer = Self::decode_footer_tail(&footer)?;
        let metadata_len = footer.metadata_length();
        let footer_metadata_len = FOOTER_SIZE + metadata_len;
        self.metadata_size = Some(footer_metadata_len);

        // Reader does not hold enough bytes for footer + metadata; report how many
        // tail bytes are required so the caller can retry with a larger buffer.
        if footer_metadata_len > file_size as usize {
            return Err(ParquetError::NeedMoreData(footer_metadata_len));
        }

        // The serialized metadata sits immediately before the 8-byte footer tail.
        let start = file_size - footer_metadata_len as u64;
        self.decode_footer_metadata(
            chunk_reader.get_bytes(start, metadata_len)?.as_ref(),
            &footer,
        )
    }
605
606    /// Return the number of bytes to read in the initial pass. If `prefetch_size` has
607    /// been provided, then return that value if it is larger than the size of the Parquet
608    /// file footer (8 bytes). Otherwise returns `8`.
609    #[cfg(all(feature = "async", feature = "arrow"))]
610    fn get_prefetch_size(&self) -> usize {
611        if let Some(prefetch) = self.prefetch_hint {
612            if prefetch > FOOTER_SIZE {
613                return prefetch;
614            }
615        }
616        FOOTER_SIZE
617    }
618
    // Fetch and decode the footer metadata given the total file size. On success,
    // also returns any prefetched bytes preceding the metadata (tagged with their
    // absolute file offset) so they can be reused when reading the page indexes.
    #[cfg(all(feature = "async", feature = "arrow"))]
    async fn load_metadata<F: MetadataFetch>(
        &self,
        fetch: &mut F,
        file_size: usize,
    ) -> Result<(ParquetMetaData, Option<(usize, Bytes)>)> {
        let prefetch = self.get_prefetch_size();

        if file_size < FOOTER_SIZE {
            return Err(eof_err!("file size of {} is less than footer", file_size));
        }

        // If a size hint is provided, read more than the minimum size
        // to try and avoid a second fetch.
        // Note: prefetch > file_size is ok since we're using saturating_sub.
        let footer_start = file_size.saturating_sub(prefetch);

        let suffix = fetch.fetch(footer_start..file_size).await?;
        let suffix_len = suffix.len();
        let fetch_len = file_size - footer_start;
        // The fetch implementation returned fewer bytes than requested
        if suffix_len < fetch_len {
            return Err(eof_err!(
                "metadata requires {} bytes, but could only read {}",
                fetch_len,
                suffix_len
            ));
        }

        // Decode the fixed 8-byte footer tail (metadata length + magic bytes)
        let mut footer = [0; FOOTER_SIZE];
        footer.copy_from_slice(&suffix[suffix_len - FOOTER_SIZE..suffix_len]);

        let footer = Self::decode_footer_tail(&footer)?;
        let length = footer.metadata_length();

        if file_size < length + FOOTER_SIZE {
            return Err(eof_err!(
                "file size of {} is less than footer + metadata {}",
                file_size,
                length + FOOTER_SIZE
            ));
        }

        // Did not fetch the entire file metadata in the initial read, need to make a second request
        if length > suffix_len - FOOTER_SIZE {
            let metadata_start = file_size - length - FOOTER_SIZE;
            let meta = fetch.fetch(metadata_start..file_size - FOOTER_SIZE).await?;
            Ok((self.decode_footer_metadata(&meta, &footer)?, None))
        } else {
            // The metadata is already inside `suffix`; decode it in place. Bytes
            // preceding the metadata (if any) are returned along with the absolute
            // file offset at which they start, for reuse by the page index reader.
            let metadata_start = file_size - length - FOOTER_SIZE - footer_start;
            let slice = &suffix[metadata_start..suffix_len - FOOTER_SIZE];
            Ok((
                self.decode_footer_metadata(slice, &footer)?,
                Some((footer_start, suffix.slice(..metadata_start))),
            ))
        }
    }
675
    // Fetch and decode the footer metadata using suffix requests (no file size
    // needed). On success, also returns any prefetched bytes preceding the
    // metadata, tagged with an offset, for reuse when reading the page indexes.
    #[cfg(all(feature = "async", feature = "arrow"))]
    async fn load_metadata_via_suffix<F: MetadataSuffixFetch>(
        &self,
        fetch: &mut F,
    ) -> Result<(ParquetMetaData, Option<(usize, Bytes)>)> {
        let prefetch = self.get_prefetch_size();

        // Fetch the last `prefetch` bytes of the file in a single request
        let suffix = fetch.fetch_suffix(prefetch).await?;
        let suffix_len = suffix.len();

        // Need at least the fixed 8-byte footer tail to proceed
        if suffix_len < FOOTER_SIZE {
            return Err(eof_err!(
                "footer metadata requires {} bytes, but could only read {}",
                FOOTER_SIZE,
                suffix_len
            ));
        }

        // Decode the footer tail (metadata length + magic bytes)
        let mut footer = [0; FOOTER_SIZE];
        footer.copy_from_slice(&suffix[suffix_len - FOOTER_SIZE..suffix_len]);

        let footer = Self::decode_footer_tail(&footer)?;
        let length = footer.metadata_length();

        // Did not fetch the entire file metadata in the initial read, need to make a second request
        let metadata_offset = length + FOOTER_SIZE;
        if length > suffix_len - FOOTER_SIZE {
            let meta = fetch.fetch_suffix(metadata_offset).await?;

            if meta.len() < metadata_offset {
                return Err(eof_err!(
                    "metadata requires {} bytes, but could only read {}",
                    metadata_offset,
                    meta.len()
                ));
            }

            Ok((
                // need to slice off the footer or decryption fails
                self.decode_footer_metadata(&meta.slice(0..length), &footer)?,
                None,
            ))
        } else {
            // The metadata is already inside `suffix`; decode it in place.
            // NOTE(review): the remainder is tagged with offset 0, yet these bytes
            // come from the tail of the file — verify that the page index path
            // (`load_page_index_with_remainder`, which compares this tag against
            // absolute file offsets) handles this correctly.
            let metadata_start = suffix_len - metadata_offset;
            let slice = &suffix[metadata_start..suffix_len - FOOTER_SIZE];
            Ok((
                self.decode_footer_metadata(slice, &footer)?,
                Some((0, suffix.slice(..metadata_start))),
            ))
        }
    }
727
728    /// Decodes the end of the Parquet footer
729    ///
730    /// There are 8 bytes at the end of the Parquet footer with the following layout:
731    /// * 4 bytes for the metadata length
732    /// * 4 bytes for the magic bytes 'PAR1' or 'PARE' (encrypted footer)
733    ///
734    /// ```text
735    /// +-----+------------------+
736    /// | len | 'PAR1' or 'PARE' |
737    /// +-----+------------------+
738    /// ```
739    pub fn decode_footer_tail(slice: &[u8; FOOTER_SIZE]) -> Result<FooterTail> {
740        let magic = &slice[4..];
741        let encrypted_footer = if magic == PARQUET_MAGIC_ENCR_FOOTER {
742            true
743        } else if magic == PARQUET_MAGIC {
744            false
745        } else {
746            return Err(general_err!("Invalid Parquet file. Corrupt footer"));
747        };
748        // get the metadata length from the footer
749        let metadata_len = u32::from_le_bytes(slice[..4].try_into().unwrap());
750        Ok(FooterTail {
751            // u32 won't be larger than usize in most cases
752            metadata_length: metadata_len as usize,
753            encrypted_footer,
754        })
755    }
756
757    /// Decodes the Parquet footer, returning the metadata length in bytes
758    #[deprecated(note = "use decode_footer_tail instead")]
759    pub fn decode_footer(slice: &[u8; FOOTER_SIZE]) -> Result<usize> {
760        Self::decode_footer_tail(slice).map(|f| f.metadata_length)
761    }
762
    /// Decodes [`ParquetMetaData`] from the provided bytes.
    ///
    /// Typically, this is used to decode the metadata from the end of a parquet
    /// file. The format of `buf` is the Thrift compact binary protocol, as specified
    /// by the [Parquet Spec].
    ///
    /// This method handles using either `decode_metadata` or
    /// `decode_metadata_with_encryption` depending on whether the encryption
    /// feature is enabled.
    ///
    /// [Parquet Spec]: https://github.com/apache/parquet-format#metadata
    pub(crate) fn decode_footer_metadata(
        &self,
        buf: &[u8],
        footer_tail: &FooterTail,
    ) -> Result<ParquetMetaData> {
        // With encryption enabled, delegate to the decryption-aware decoder;
        // it handles both encrypted and plaintext footers.
        #[cfg(feature = "encryption")]
        let result = Self::decode_metadata_with_encryption(
            buf,
            footer_tail.is_encrypted_footer(),
            self.file_decryption_properties.as_ref(),
        );
        // Without the encryption feature, an encrypted footer cannot be read at
        // all, so return a clear error instead of a Thrift parse failure.
        #[cfg(not(feature = "encryption"))]
        let result = {
            if footer_tail.is_encrypted_footer() {
                Err(general_err!(
                    "Parquet file has an encrypted footer but the encryption feature is disabled"
                ))
            } else {
                Self::decode_metadata(buf)
            }
        };
        result
    }
797
    /// Decodes [`ParquetMetaData`] from the provided bytes, handling metadata that may be encrypted.
    ///
    /// Typically this is used to decode the metadata from the end of a parquet
    /// file. The format of `buf` is the Thrift compact binary protocol, as specified
    /// by the [Parquet Spec]. Buffer can be encrypted with AES GCM or AES CTR
    /// ciphers as specified in the [Parquet Encryption Spec].
    ///
    /// [Parquet Spec]: https://github.com/apache/parquet-format#metadata
    /// [Parquet Encryption Spec]: https://parquet.apache.org/docs/file-format/data-pages/encryption/
    #[cfg(feature = "encryption")]
    fn decode_metadata_with_encryption(
        buf: &[u8],
        encrypted_footer: bool,
        file_decryption_properties: Option<&FileDecryptionProperties>,
    ) -> Result<ParquetMetaData> {
        let mut prot = TCompactSliceInputProtocol::new(buf);
        let mut file_decryptor = None;
        // Declared before the `if` so `prot` can borrow from the decrypted
        // buffer after it is assigned inside the encrypted-footer branch.
        let decrypted_fmd_buf;

        if encrypted_footer {
            if let Some(file_decryption_properties) = file_decryption_properties {
                // An encrypted footer begins with a plaintext FileCryptoMetaData
                // describing the algorithm and footer key metadata.
                let t_file_crypto_metadata: TFileCryptoMetaData =
                    TFileCryptoMetaData::read_from_in_protocol(&mut prot)
                        .map_err(|e| general_err!("Could not parse crypto metadata: {}", e))?;
                // If the writer did not store the AAD prefix in the file, the
                // reader must supply it via the decryption properties.
                let supply_aad_prefix = match &t_file_crypto_metadata.encryption_algorithm {
                    EncryptionAlgorithm::AESGCMV1(algo) => algo.supply_aad_prefix,
                    _ => Some(false),
                }
                .unwrap_or(false);
                if supply_aad_prefix && file_decryption_properties.aad_prefix().is_none() {
                    return Err(general_err!(
                        "Parquet file was encrypted with an AAD prefix that is not stored in the file, \
                        but no AAD prefix was provided in the file decryption properties"
                    ));
                }
                let decryptor = get_file_decryptor(
                    t_file_crypto_metadata.encryption_algorithm,
                    t_file_crypto_metadata.key_metadata.as_deref(),
                    file_decryption_properties,
                )?;
                let footer_decryptor = decryptor.get_footer_decryptor();
                let aad_footer = create_footer_aad(decryptor.file_aad())?;

                // Decrypt the remaining bytes (the Thrift FileMetaData) and
                // re-point the protocol reader at the plaintext.
                decrypted_fmd_buf = footer_decryptor?
                    .decrypt(prot.as_slice().as_ref(), aad_footer.as_ref())
                    .map_err(|_| {
                        general_err!(
                            "Provided footer key and AAD were unable to decrypt parquet footer"
                        )
                    })?;
                prot = TCompactSliceInputProtocol::new(decrypted_fmd_buf.as_ref());

                file_decryptor = Some(decryptor);
            } else {
                return Err(general_err!("Parquet file has an encrypted footer but decryption properties were not provided"));
            }
        }

        let t_file_metadata: TFileMetaData = TFileMetaData::read_from_in_protocol(&mut prot)
            .map_err(|e| general_err!("Could not parse metadata: {}", e))?;
        let schema = types::from_thrift(&t_file_metadata.schema)?;
        let schema_descr = Arc::new(SchemaDescriptor::new(schema));

        if let (Some(algo), Some(file_decryption_properties)) = (
            t_file_metadata.encryption_algorithm,
            file_decryption_properties,
        ) {
            // File has a plaintext footer but encryption algorithm is set
            file_decryptor = Some(get_file_decryptor(
                algo,
                t_file_metadata.footer_signing_key_metadata.as_deref(),
                file_decryption_properties,
            )?);
        }

        // Row groups may contain encrypted column chunks; decrypt as needed.
        let mut row_groups = Vec::new();
        for rg in t_file_metadata.row_groups {
            let r = RowGroupMetaData::from_encrypted_thrift(
                schema_descr.clone(),
                rg,
                file_decryptor.as_ref(),
            )?;
            row_groups.push(r);
        }
        let column_orders =
            Self::parse_column_orders(t_file_metadata.column_orders, &schema_descr)?;

        let file_metadata = FileMetaData::new(
            t_file_metadata.version,
            t_file_metadata.num_rows,
            t_file_metadata.created_by,
            t_file_metadata.key_value_metadata,
            schema_descr,
            column_orders,
        );
        let mut metadata = ParquetMetaData::new(file_metadata, row_groups);

        // Retain the decryptor so column data can be decrypted later.
        metadata.with_file_decryptor(file_decryptor);

        Ok(metadata)
    }
899
900    /// Decodes [`ParquetMetaData`] from the provided bytes.
901    ///
902    /// Typically this is used to decode the metadata from the end of a parquet
903    /// file. The format of `buf` is the Thrift compact binary protocol, as specified
904    /// by the [Parquet Spec].
905    ///
906    /// [Parquet Spec]: https://github.com/apache/parquet-format#metadata
907    pub fn decode_metadata(buf: &[u8]) -> Result<ParquetMetaData> {
908        let mut prot = TCompactSliceInputProtocol::new(buf);
909
910        let t_file_metadata: TFileMetaData = TFileMetaData::read_from_in_protocol(&mut prot)
911            .map_err(|e| general_err!("Could not parse metadata: {}", e))?;
912        let schema = types::from_thrift(&t_file_metadata.schema)?;
913        let schema_descr = Arc::new(SchemaDescriptor::new(schema));
914
915        let mut row_groups = Vec::new();
916        for rg in t_file_metadata.row_groups {
917            row_groups.push(RowGroupMetaData::from_thrift(schema_descr.clone(), rg)?);
918        }
919        let column_orders =
920            Self::parse_column_orders(t_file_metadata.column_orders, &schema_descr)?;
921
922        let file_metadata = FileMetaData::new(
923            t_file_metadata.version,
924            t_file_metadata.num_rows,
925            t_file_metadata.created_by,
926            t_file_metadata.key_value_metadata,
927            schema_descr,
928            column_orders,
929        );
930
931        Ok(ParquetMetaData::new(file_metadata, row_groups))
932    }
933
934    /// Parses column orders from Thrift definition.
935    /// If no column orders are defined, returns `None`.
936    fn parse_column_orders(
937        t_column_orders: Option<Vec<TColumnOrder>>,
938        schema_descr: &SchemaDescriptor,
939    ) -> Result<Option<Vec<ColumnOrder>>> {
940        match t_column_orders {
941            Some(orders) => {
942                // Should always be the case
943                if orders.len() != schema_descr.num_columns() {
944                    return Err(general_err!("Column order length mismatch"));
945                };
946                let mut res = Vec::new();
947                for (i, column) in schema_descr.columns().iter().enumerate() {
948                    match orders[i] {
949                        TColumnOrder::TYPEORDER(_) => {
950                            let sort_order = ColumnOrder::get_sort_order(
951                                column.logical_type(),
952                                column.converted_type(),
953                                column.physical_type(),
954                            );
955                            res.push(ColumnOrder::TYPE_DEFINED_ORDER(sort_order));
956                        }
957                    }
958                }
959                Ok(Some(res))
960            }
961            None => Ok(None),
962        }
963    }
964}
965
/// Builds a [`FileDecryptor`] from the file's encryption algorithm metadata
/// and the caller-supplied decryption properties.
///
/// Only AES-GCM-V1 is supported; AES-GCM-CTR-V1 returns a not-yet-implemented
/// error. The AAD prefix from the decryption properties takes precedence over
/// one stored in the file.
#[cfg(feature = "encryption")]
fn get_file_decryptor(
    encryption_algorithm: EncryptionAlgorithm,
    footer_key_metadata: Option<&[u8]>,
    file_decryption_properties: &FileDecryptionProperties,
) -> Result<FileDecryptor> {
    let algo = match encryption_algorithm {
        EncryptionAlgorithm::AESGCMV1(algo) => algo,
        EncryptionAlgorithm::AESGCMCTRV1(_) => {
            return Err(nyi_err!(
                "The AES_GCM_CTR_V1 encryption algorithm is not yet supported"
            ))
        }
    };

    // Every encrypted file must carry a unique AAD suffix.
    let aad_file_unique = algo
        .aad_file_unique
        .ok_or_else(|| general_err!("AAD unique file identifier is not set"))?;

    // Prefer the caller-provided AAD prefix; fall back to the stored one.
    let aad_prefix = match file_decryption_properties.aad_prefix() {
        Some(aad_prefix) => aad_prefix.clone(),
        None => algo.aad_prefix.unwrap_or_default(),
    };

    FileDecryptor::new(
        file_decryption_properties,
        footer_key_metadata,
        aad_file_unique,
        aad_prefix,
    )
}
995
996#[cfg(test)]
997mod tests {
998    use super::*;
999    use bytes::Bytes;
1000
1001    use crate::basic::SortOrder;
1002    use crate::basic::Type;
1003    use crate::file::reader::Length;
1004    use crate::format::TypeDefinedOrder;
1005    use crate::schema::types::Type as SchemaType;
1006    use crate::util::test_common::file_util::get_test_file;
1007
1008    #[test]
1009    fn test_parse_metadata_size_smaller_than_footer() {
1010        let test_file = tempfile::tempfile().unwrap();
1011        let err = ParquetMetaDataReader::new()
1012            .parse_metadata(&test_file)
1013            .unwrap_err();
1014        assert!(matches!(err, ParquetError::NeedMoreData(8)));
1015    }
1016
1017    #[test]
1018    fn test_parse_metadata_corrupt_footer() {
1019        let data = Bytes::from(vec![1, 2, 3, 4, 5, 6, 7, 8]);
1020        let reader_result = ParquetMetaDataReader::new().parse_metadata(&data);
1021        assert_eq!(
1022            reader_result.unwrap_err().to_string(),
1023            "Parquet error: Invalid Parquet file. Corrupt footer"
1024        );
1025    }
1026
1027    #[test]
1028    fn test_parse_metadata_invalid_start() {
1029        let test_file = Bytes::from(vec![255, 0, 0, 0, b'P', b'A', b'R', b'1']);
1030        let err = ParquetMetaDataReader::new()
1031            .parse_metadata(&test_file)
1032            .unwrap_err();
1033        assert!(matches!(err, ParquetError::NeedMoreData(263)));
1034    }
1035
1036    #[test]
1037    fn test_metadata_column_orders_parse() {
1038        // Define simple schema, we do not need to provide logical types.
1039        let fields = vec![
1040            Arc::new(
1041                SchemaType::primitive_type_builder("col1", Type::INT32)
1042                    .build()
1043                    .unwrap(),
1044            ),
1045            Arc::new(
1046                SchemaType::primitive_type_builder("col2", Type::FLOAT)
1047                    .build()
1048                    .unwrap(),
1049            ),
1050        ];
1051        let schema = SchemaType::group_type_builder("schema")
1052            .with_fields(fields)
1053            .build()
1054            .unwrap();
1055        let schema_descr = SchemaDescriptor::new(Arc::new(schema));
1056
1057        let t_column_orders = Some(vec![
1058            TColumnOrder::TYPEORDER(TypeDefinedOrder::new()),
1059            TColumnOrder::TYPEORDER(TypeDefinedOrder::new()),
1060        ]);
1061
1062        assert_eq!(
1063            ParquetMetaDataReader::parse_column_orders(t_column_orders, &schema_descr).unwrap(),
1064            Some(vec![
1065                ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED),
1066                ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED)
1067            ])
1068        );
1069
1070        // Test when no column orders are defined.
1071        assert_eq!(
1072            ParquetMetaDataReader::parse_column_orders(None, &schema_descr).unwrap(),
1073            None
1074        );
1075    }
1076
1077    #[test]
1078    fn test_metadata_column_orders_len_mismatch() {
1079        let schema = SchemaType::group_type_builder("schema").build().unwrap();
1080        let schema_descr = SchemaDescriptor::new(Arc::new(schema));
1081
1082        let t_column_orders = Some(vec![TColumnOrder::TYPEORDER(TypeDefinedOrder::new())]);
1083
1084        let res = ParquetMetaDataReader::parse_column_orders(t_column_orders, &schema_descr);
1085        assert!(res.is_err());
1086        assert!(format!("{:?}", res.unwrap_err()).contains("Column order length mismatch"));
1087    }
1088
    // Exercises try_parse/try_parse_sized with progressively smaller tails of a
    // real test file, including the NeedMoreData retry protocol.
    // NOTE(review): the offsets below (323583/323584, 452505/452510) appear to
    // mark the start of the page-index and footer-metadata regions of
    // alltypes_tiny_pages.parquet — confirm against the file if it changes.
    #[test]
    fn test_try_parse() {
        let file = get_test_file("alltypes_tiny_pages.parquet");
        let len = file.len() as usize;

        let mut reader = ParquetMetaDataReader::new().with_page_indexes(true);

        // Helper: slice the on-disk file into an in-memory Bytes buffer.
        let bytes_for_range = |range: Range<usize>| {
            file.get_bytes(range.start as u64, range.end - range.start)
                .unwrap()
        };

        // read entire file
        let bytes = bytes_for_range(0..len);
        reader.try_parse(&bytes).unwrap();
        let metadata = reader.finish().unwrap();
        assert!(metadata.column_index.is_some());
        assert!(metadata.offset_index.is_some());

        // read more than enough of file
        let bytes = bytes_for_range(320000..len);
        reader.try_parse_sized(&bytes, len).unwrap();
        let metadata = reader.finish().unwrap();
        assert!(metadata.column_index.is_some());
        assert!(metadata.offset_index.is_some());

        // exactly enough
        let bytes = bytes_for_range(323583..len);
        reader.try_parse_sized(&bytes, len).unwrap();
        let metadata = reader.finish().unwrap();
        assert!(metadata.column_index.is_some());
        assert!(metadata.offset_index.is_some());

        // not enough for page index
        let bytes = bytes_for_range(323584..len);
        // should fail
        match reader.try_parse_sized(&bytes, len).unwrap_err() {
            // expected error, try again with provided bounds
            ParquetError::NeedMoreData(needed) => {
                let bytes = bytes_for_range(len - needed..len);
                reader.try_parse_sized(&bytes, len).unwrap();
                let metadata = reader.finish().unwrap();
                assert!(metadata.column_index.is_some());
                assert!(metadata.offset_index.is_some());
            }
            _ => panic!("unexpected error"),
        };

        // not enough for file metadata, but keep trying until page indexes are read
        let mut reader = ParquetMetaDataReader::new().with_page_indexes(true);
        let mut bytes = bytes_for_range(452505..len);
        loop {
            match reader.try_parse_sized(&bytes, len) {
                Ok(_) => break,
                Err(ParquetError::NeedMoreData(needed)) => {
                    bytes = bytes_for_range(len - needed..len);
                    // Once the footer metadata is in hand, only the page
                    // indexes remain to be read from the enlarged buffer.
                    if reader.has_metadata() {
                        reader.read_page_indexes_sized(&bytes, len).unwrap();
                        break;
                    }
                }
                _ => panic!("unexpected error"),
            }
        }
        let metadata = reader.finish().unwrap();
        assert!(metadata.column_index.is_some());
        assert!(metadata.offset_index.is_some());

        // not enough for page index but lie about file size
        let bytes = bytes_for_range(323584..len);
        let reader_result = reader.try_parse_sized(&bytes, len - 323584).unwrap_err();
        assert_eq!(
            reader_result.to_string(),
            "EOF: Parquet file too small. Range 323583..452504 is beyond file bounds 130649"
        );

        // not enough for file metadata
        let mut reader = ParquetMetaDataReader::new();
        let bytes = bytes_for_range(452505..len);
        // should fail
        match reader.try_parse_sized(&bytes, len).unwrap_err() {
            // expected error, try again with provided bounds
            ParquetError::NeedMoreData(needed) => {
                let bytes = bytes_for_range(len - needed..len);
                reader.try_parse_sized(&bytes, len).unwrap();
                reader.finish().unwrap();
            }
            _ => panic!("unexpected error"),
        };

        // not enough for file metadata but use try_parse()
        let reader_result = reader.try_parse(&bytes).unwrap_err();
        assert_eq!(
            reader_result.to_string(),
            "EOF: Parquet file too small. Size is 1728 but need 1729"
        );

        // read head of file rather than tail
        let bytes = bytes_for_range(0..1000);
        let reader_result = reader.try_parse_sized(&bytes, len).unwrap_err();
        assert_eq!(
            reader_result.to_string(),
            "Parquet error: Invalid Parquet file. Corrupt footer"
        );

        // lie about file size
        let bytes = bytes_for_range(452510..len);
        let reader_result = reader.try_parse_sized(&bytes, len - 452505).unwrap_err();
        assert_eq!(
            reader_result.to_string(),
            "EOF: Parquet file too small. Size is 1728 but need 1729"
        );
    }
1202}
1203
1204#[cfg(all(feature = "async", feature = "arrow", test))]
1205mod async_tests {
1206    use super::*;
1207    use bytes::Bytes;
1208    use futures::future::BoxFuture;
1209    use futures::FutureExt;
1210    use std::fs::File;
1211    use std::future::Future;
1212    use std::io::{Read, Seek, SeekFrom};
1213    use std::ops::Range;
1214    use std::sync::atomic::{AtomicUsize, Ordering};
1215
1216    use crate::file::reader::Length;
1217    use crate::util::test_common::file_util::get_test_file;
1218
    /// Adapts a ranged-read closure into a [`MetadataFetch`] implementation so
    /// tests can observe and count fetch calls.
    struct MetadataFetchFn<F>(F);

    impl<F, Fut> MetadataFetch for MetadataFetchFn<F>
    where
        F: FnMut(Range<usize>) -> Fut + Send,
        Fut: Future<Output = Result<Bytes>> + Send,
    {
        // Delegates to the wrapped closure, boxing the resulting future.
        fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
            async move { self.0(range).await }.boxed()
        }
    }
1230
    /// Adapts a pair of closures into both [`MetadataFetch`] (first closure,
    /// ranged reads) and [`MetadataSuffixFetch`] (second closure, suffix reads)
    /// so tests can count each kind of fetch independently.
    struct MetadataSuffixFetchFn<F1, F2>(F1, F2);

    impl<F1, Fut, F2> MetadataFetch for MetadataSuffixFetchFn<F1, F2>
    where
        F1: FnMut(Range<usize>) -> Fut + Send,
        Fut: Future<Output = Result<Bytes>> + Send,
        F2: Send,
    {
        // Ranged fetches delegate to the first closure.
        fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
            async move { self.0(range).await }.boxed()
        }
    }

    impl<F1, Fut, F2> MetadataSuffixFetch for MetadataSuffixFetchFn<F1, F2>
    where
        F1: FnMut(Range<usize>) -> Fut + Send,
        F2: FnMut(usize) -> Fut + Send,
        Fut: Future<Output = Result<Bytes>> + Send,
    {
        // Suffix fetches delegate to the second closure.
        fn fetch_suffix(&mut self, suffix: usize) -> BoxFuture<'_, Result<Bytes>> {
            async move { self.1(suffix).await }.boxed()
        }
    }
1254
1255    fn read_range(file: &mut File, range: Range<usize>) -> Result<Bytes> {
1256        file.seek(SeekFrom::Start(range.start as _))?;
1257        let len = range.end - range.start;
1258        let mut buf = Vec::with_capacity(len);
1259        file.take(len as _).read_to_end(&mut buf)?;
1260        Ok(buf.into())
1261    }
1262
1263    fn read_suffix(file: &mut File, suffix: usize) -> Result<Bytes> {
1264        let file_len = file.len();
1265        // Don't seek before beginning of file
1266        file.seek(SeekFrom::End(0 - suffix.min(file_len as usize) as i64))?;
1267        let mut buf = Vec::with_capacity(suffix);
1268        file.take(suffix as _).read_to_end(&mut buf)?;
1269        Ok(buf.into())
1270    }
1271
    // Verifies ranged-fetch metadata loading against a synchronously parsed
    // baseline, and checks the fetch count for various prefetch hints
    // (too small -> 2 fetches; large enough -> 1 fetch).
    #[tokio::test]
    async fn test_simple() {
        let mut file = get_test_file("nulls.snappy.parquet");
        let len = file.len() as usize;

        // Baseline: parse the file synchronously and keep its schema.
        let expected = ParquetMetaDataReader::new()
            .parse_and_finish(&file)
            .unwrap();
        let expected = expected.file_metadata().schema();
        let fetch_count = AtomicUsize::new(0);

        let mut fetch = |range| {
            fetch_count.fetch_add(1, Ordering::SeqCst);
            futures::future::ready(read_range(&mut file, range))
        };

        // Default prefetch: footer read + metadata read = 2 fetches.
        let input = MetadataFetchFn(&mut fetch);
        let actual = ParquetMetaDataReader::new()
            .load_and_finish(input, len)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 2);

        // Metadata hint too small - below footer size
        fetch_count.store(0, Ordering::SeqCst);
        let input = MetadataFetchFn(&mut fetch);
        let actual = ParquetMetaDataReader::new()
            .with_prefetch_hint(Some(7))
            .load_and_finish(input, len)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 2);

        // Metadata hint too small
        fetch_count.store(0, Ordering::SeqCst);
        let input = MetadataFetchFn(&mut fetch);
        let actual = ParquetMetaDataReader::new()
            .with_prefetch_hint(Some(10))
            .load_and_finish(input, len)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 2);

        // Metadata hint too large
        fetch_count.store(0, Ordering::SeqCst);
        let input = MetadataFetchFn(&mut fetch);
        let actual = ParquetMetaDataReader::new()
            .with_prefetch_hint(Some(500))
            .load_and_finish(input, len)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);

        // Metadata hint exactly correct
        fetch_count.store(0, Ordering::SeqCst);
        let input = MetadataFetchFn(&mut fetch);
        let actual = ParquetMetaDataReader::new()
            .with_prefetch_hint(Some(428))
            .load_and_finish(input, len)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);

        // File size smaller than the 8-byte footer must error out.
        let input = MetadataFetchFn(&mut fetch);
        let err = ParquetMetaDataReader::new()
            .load_and_finish(input, 4)
            .await
            .unwrap_err()
            .to_string();
        assert_eq!(err, "EOF: file size of 4 is less than footer");

        // A truncated file yields a corrupt-footer error.
        let input = MetadataFetchFn(&mut fetch);
        let err = ParquetMetaDataReader::new()
            .load_and_finish(input, 20)
            .await
            .unwrap_err()
            .to_string();
        assert_eq!(err, "Parquet error: Invalid Parquet file. Corrupt footer");
    }
1356
1357    #[tokio::test]
1358    async fn test_suffix() {
1359        let mut file = get_test_file("nulls.snappy.parquet");
1360        let mut file2 = file.try_clone().unwrap();
1361
1362        let expected = ParquetMetaDataReader::new()
1363            .parse_and_finish(&file)
1364            .unwrap();
1365        let expected = expected.file_metadata().schema();
1366        let fetch_count = AtomicUsize::new(0);
1367        let suffix_fetch_count = AtomicUsize::new(0);
1368
1369        let mut fetch = |range| {
1370            fetch_count.fetch_add(1, Ordering::SeqCst);
1371            futures::future::ready(read_range(&mut file, range))
1372        };
1373        let mut suffix_fetch = |suffix| {
1374            suffix_fetch_count.fetch_add(1, Ordering::SeqCst);
1375            futures::future::ready(read_suffix(&mut file2, suffix))
1376        };
1377
1378        let input = MetadataSuffixFetchFn(&mut fetch, &mut suffix_fetch);
1379        let actual = ParquetMetaDataReader::new()
1380            .load_via_suffix_and_finish(input)
1381            .await
1382            .unwrap();
1383        assert_eq!(actual.file_metadata().schema(), expected);
1384        assert_eq!(fetch_count.load(Ordering::SeqCst), 0);
1385        assert_eq!(suffix_fetch_count.load(Ordering::SeqCst), 2);
1386
1387        // Metadata hint too small - below footer size
1388        fetch_count.store(0, Ordering::SeqCst);
1389        suffix_fetch_count.store(0, Ordering::SeqCst);
1390        let input = MetadataSuffixFetchFn(&mut fetch, &mut suffix_fetch);
1391        let actual = ParquetMetaDataReader::new()
1392            .with_prefetch_hint(Some(7))
1393            .load_via_suffix_and_finish(input)
1394            .await
1395            .unwrap();
1396        assert_eq!(actual.file_metadata().schema(), expected);
1397        assert_eq!(fetch_count.load(Ordering::SeqCst), 0);
1398        assert_eq!(suffix_fetch_count.load(Ordering::SeqCst), 2);
1399
1400        // Metadata hint too small
1401        fetch_count.store(0, Ordering::SeqCst);
1402        suffix_fetch_count.store(0, Ordering::SeqCst);
1403        let input = MetadataSuffixFetchFn(&mut fetch, &mut suffix_fetch);
1404        let actual = ParquetMetaDataReader::new()
1405            .with_prefetch_hint(Some(10))
1406            .load_via_suffix_and_finish(input)
1407            .await
1408            .unwrap();
1409        assert_eq!(actual.file_metadata().schema(), expected);
1410        assert_eq!(fetch_count.load(Ordering::SeqCst), 0);
1411        assert_eq!(suffix_fetch_count.load(Ordering::SeqCst), 2);
1412
1413        dbg!("test");
1414        // Metadata hint too large
1415        fetch_count.store(0, Ordering::SeqCst);
1416        suffix_fetch_count.store(0, Ordering::SeqCst);
1417        let input = MetadataSuffixFetchFn(&mut fetch, &mut suffix_fetch);
1418        let actual = ParquetMetaDataReader::new()
1419            .with_prefetch_hint(Some(500))
1420            .load_via_suffix_and_finish(input)
1421            .await
1422            .unwrap();
1423        assert_eq!(actual.file_metadata().schema(), expected);
1424        assert_eq!(fetch_count.load(Ordering::SeqCst), 0);
1425        assert_eq!(suffix_fetch_count.load(Ordering::SeqCst), 1);
1426
1427        // Metadata hint exactly correct
1428        fetch_count.store(0, Ordering::SeqCst);
1429        suffix_fetch_count.store(0, Ordering::SeqCst);
1430        let input = MetadataSuffixFetchFn(&mut fetch, &mut suffix_fetch);
1431        let actual = ParquetMetaDataReader::new()
1432            .with_prefetch_hint(Some(428))
1433            .load_via_suffix_and_finish(input)
1434            .await
1435            .unwrap();
1436        assert_eq!(actual.file_metadata().schema(), expected);
1437        assert_eq!(fetch_count.load(Ordering::SeqCst), 0);
1438        assert_eq!(suffix_fetch_count.load(Ordering::SeqCst), 1);
1439    }
1440
1441    #[cfg(feature = "encryption")]
1442    #[tokio::test]
1443    async fn test_suffix_with_encryption() {
1444        let mut file = get_test_file("uniform_encryption.parquet.encrypted");
1445        let mut file2 = file.try_clone().unwrap();
1446
1447        let mut fetch = |range| futures::future::ready(read_range(&mut file, range));
1448        let mut suffix_fetch = |suffix| futures::future::ready(read_suffix(&mut file2, suffix));
1449
1450        let input = MetadataSuffixFetchFn(&mut fetch, &mut suffix_fetch);
1451
1452        let key_code: &[u8] = "0123456789012345".as_bytes();
1453        let decryption_properties = FileDecryptionProperties::builder(key_code.to_vec())
1454            .build()
1455            .unwrap();
1456
1457        // just make sure the metadata is properly decrypted and read
1458        let expected = ParquetMetaDataReader::new()
1459            .with_decryption_properties(Some(&decryption_properties))
1460            .load_via_suffix_and_finish(input)
1461            .await
1462            .unwrap();
1463        assert_eq!(expected.num_row_groups(), 1);
1464    }
1465
    /// Verifies page-index loading and the exact number of fetch requests
    /// issued by [`ParquetMetaDataReader`] for a range of `prefetch_hint`
    /// sizes: none, footer-only, one byte short, exact, oversized, and
    /// larger than the file.
    #[tokio::test]
    async fn test_page_index() {
        let mut file = get_test_file("alltypes_tiny_pages.parquet");
        let len = file.len() as usize;
        // Counts every ranged read issued through the closure below; reset
        // to 0 before each scenario.
        let fetch_count = AtomicUsize::new(0);
        let mut fetch = |range| {
            fetch_count.fetch_add(1, Ordering::SeqCst);
            futures::future::ready(read_range(&mut file, range))
        };

        // No prefetch hint: footer, metadata, and page indexes each cost a
        // separate fetch (3 total).
        let f = MetadataFetchFn(&mut fetch);
        let mut loader = ParquetMetaDataReader::new().with_page_indexes(true);
        loader.try_load(f, len).await.unwrap();
        assert_eq!(fetch_count.load(Ordering::SeqCst), 3);
        let metadata = loader.finish().unwrap();
        assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());

        // Prefetch just footer exactly (1729 bytes — presumably the exact
        // footer+metadata size of this test file; TODO confirm): the page
        // indexes still need one more fetch (2 total).
        fetch_count.store(0, Ordering::SeqCst);
        let f = MetadataFetchFn(&mut fetch);
        let mut loader = ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .with_prefetch_hint(Some(1729));
        loader.try_load(f, len).await.unwrap();
        assert_eq!(fetch_count.load(Ordering::SeqCst), 2);
        let metadata = loader.finish().unwrap();
        assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());

        // Prefetch more than footer but not enough — one byte short of the
        // 130650 shown below to suffice — so a second fetch is still needed.
        fetch_count.store(0, Ordering::SeqCst);
        let f = MetadataFetchFn(&mut fetch);
        let mut loader = ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .with_prefetch_hint(Some(130649));
        loader.try_load(f, len).await.unwrap();
        assert_eq!(fetch_count.load(Ordering::SeqCst), 2);
        let metadata = loader.finish().unwrap();
        assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());

        // Prefetch exactly enough: footer, metadata, and page indexes all
        // arrive in a single fetch.
        fetch_count.store(0, Ordering::SeqCst);
        let f = MetadataFetchFn(&mut fetch);
        let metadata = ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .with_prefetch_hint(Some(130650))
            .load_and_finish(f, len)
            .await
            .unwrap();
        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
        assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());

        // Prefetch more than enough but less than the entire file
        fetch_count.store(0, Ordering::SeqCst);
        let f = MetadataFetchFn(&mut fetch);
        let metadata = ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .with_prefetch_hint(Some(len - 1000)) // more than needed, but not the whole file
            .load_and_finish(f, len)
            .await
            .unwrap();
        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
        assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());

        // Prefetch the entire file
        fetch_count.store(0, Ordering::SeqCst);
        let f = MetadataFetchFn(&mut fetch);
        let metadata = ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .with_prefetch_hint(Some(len)) // prefetch entire file
            .load_and_finish(f, len)
            .await
            .unwrap();
        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
        assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());

        // Prefetch more than the entire file: the oversized hint is still
        // handled with a single fetch (the loader apparently tolerates hints
        // exceeding the file length — behavior defined in the loader).
        fetch_count.store(0, Ordering::SeqCst);
        let f = MetadataFetchFn(&mut fetch);
        let metadata = ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .with_prefetch_hint(Some(len + 1000)) // larger than the file itself
            .load_and_finish(f, len)
            .await
            .unwrap();
        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
        assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());
    }
1553}