parquet/file/metadata/
reader.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::{io::Read, ops::Range, sync::Arc};
19
20use crate::basic::ColumnOrder;
21#[cfg(feature = "encryption")]
22use crate::encryption::{
23    decrypt::{FileDecryptionProperties, FileDecryptor},
24    modules::create_footer_aad,
25};
26use bytes::Bytes;
27
28use crate::errors::{ParquetError, Result};
29use crate::file::metadata::{ColumnChunkMetaData, FileMetaData, ParquetMetaData, RowGroupMetaData};
30use crate::file::page_index::index::Index;
31use crate::file::page_index::index_reader::{acc_range, decode_column_index, decode_offset_index};
32use crate::file::reader::ChunkReader;
33use crate::file::{FOOTER_SIZE, PARQUET_MAGIC, PARQUET_MAGIC_ENCR_FOOTER};
34use crate::format::{ColumnOrder as TColumnOrder, FileMetaData as TFileMetaData};
35#[cfg(feature = "encryption")]
36use crate::format::{EncryptionAlgorithm, FileCryptoMetaData as TFileCryptoMetaData};
37use crate::schema::types;
38use crate::schema::types::SchemaDescriptor;
39use crate::thrift::{TCompactSliceInputProtocol, TSerializable};
40
41#[cfg(all(feature = "async", feature = "arrow"))]
42use crate::arrow::async_reader::{MetadataFetch, MetadataSuffixFetch};
43#[cfg(feature = "encryption")]
44use crate::encryption::decrypt::CryptoContext;
45use crate::file::page_index::offset_index::OffsetIndexMetaData;
46
47/// Reads the [`ParquetMetaData`] from a byte stream.
48///
49/// See [`crate::file::metadata::ParquetMetaDataWriter#output-format`] for a description of
50/// the Parquet metadata.
51///
52/// Parquet metadata is not necessarily contiguous in the files: part is stored
53/// in the footer (the last bytes of the file), but other portions (such as the
54/// PageIndex) can be stored elsewhere.
55///
56/// This reader handles reading the footer as well as the non contiguous parts
57/// of the metadata such as the page indexes; excluding Bloom Filters.
58///
59/// # Example
60/// ```no_run
61/// # use parquet::file::metadata::ParquetMetaDataReader;
62/// # fn open_parquet_file(path: &str) -> std::fs::File { unimplemented!(); }
63/// // read parquet metadata including page indexes from a file
64/// let file = open_parquet_file("some_path.parquet");
65/// let mut reader = ParquetMetaDataReader::new()
66///     .with_page_indexes(true);
67/// reader.try_parse(&file).unwrap();
68/// let metadata = reader.finish().unwrap();
69/// assert!(metadata.column_index().is_some());
70/// assert!(metadata.offset_index().is_some());
71/// ```
#[derive(Default)]
pub struct ParquetMetaDataReader {
    // Parsed metadata; populated by the try_parse*/try_load* family and
    // handed to the caller (and cleared) by `finish()`
    metadata: Option<ParquetMetaData>,
    // Whether to decode the ColumnIndex page index structures
    column_index: bool,
    // Whether to decode the OffsetIndex page index structures
    offset_index: bool,
    // Caller-provided estimate of bytes to fetch up front in the async
    // loaders; see `with_prefetch_hint`
    prefetch_hint: Option<usize>,
    // Size of the serialized thrift metadata plus the 8 byte footer. Only set if
    // `self.parse_metadata` is called.
    metadata_size: Option<usize>,
    // Decryption keys/properties for encrypted files; see `with_decryption_properties`
    #[cfg(feature = "encryption")]
    file_decryption_properties: Option<FileDecryptionProperties>,
}
84
/// Describes how the footer metadata is stored
///
/// This is parsed from the last 8 bytes of the Parquet file
pub struct FooterTail {
    // Length in bytes of the serialized footer metadata (does not include
    // the 8 byte footer itself)
    metadata_length: usize,
    // True when the footer magic indicates an encrypted footer
    encrypted_footer: bool,
}
92
impl FooterTail {
    /// The length of the footer metadata in bytes
    ///
    /// This does not include the 8 byte footer itself
    pub fn metadata_length(&self) -> usize {
        self.metadata_length
    }

    /// Whether the footer metadata is encrypted
    pub fn is_encrypted_footer(&self) -> bool {
        self.encrypted_footer
    }
}
104
105impl ParquetMetaDataReader {
106    /// Create a new [`ParquetMetaDataReader`]
107    pub fn new() -> Self {
108        Default::default()
109    }
110
111    /// Create a new [`ParquetMetaDataReader`] populated with a [`ParquetMetaData`] struct
112    /// obtained via other means.
113    pub fn new_with_metadata(metadata: ParquetMetaData) -> Self {
114        Self {
115            metadata: Some(metadata),
116            ..Default::default()
117        }
118    }
119
120    /// Enable or disable reading the page index structures described in
121    /// "[Parquet page index]: Layout to Support Page Skipping". Equivalent to:
122    /// `self.with_column_indexes(val).with_offset_indexes(val)`
123    ///
124    /// [Parquet page index]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
125    pub fn with_page_indexes(self, val: bool) -> Self {
126        self.with_column_indexes(val).with_offset_indexes(val)
127    }
128
129    /// Enable or disable reading the Parquet [ColumnIndex] structure.
130    ///
131    /// [ColumnIndex]:  https://github.com/apache/parquet-format/blob/master/PageIndex.md
132    pub fn with_column_indexes(mut self, val: bool) -> Self {
133        self.column_index = val;
134        self
135    }
136
137    /// Enable or disable reading the Parquet [OffsetIndex] structure.
138    ///
139    /// [OffsetIndex]:  https://github.com/apache/parquet-format/blob/master/PageIndex.md
140    pub fn with_offset_indexes(mut self, val: bool) -> Self {
141        self.offset_index = val;
142        self
143    }
144
145    /// Provide a hint as to the number of bytes needed to fully parse the [`ParquetMetaData`].
146    /// Only used for the asynchronous [`Self::try_load()`] method.
147    ///
148    /// By default, the reader will first fetch the last 8 bytes of the input file to obtain the
149    /// size of the footer metadata. A second fetch will be performed to obtain the needed bytes.
150    /// After parsing the footer metadata, a third fetch will be performed to obtain the bytes
151    /// needed to decode the page index structures, if they have been requested. To avoid
152    /// unnecessary fetches, `prefetch` can be set to an estimate of the number of bytes needed
153    /// to fully decode the [`ParquetMetaData`], which can reduce the number of fetch requests and
154    /// reduce latency. Setting `prefetch` too small will not trigger an error, but will result
155    /// in extra fetches being performed.
156    pub fn with_prefetch_hint(mut self, prefetch: Option<usize>) -> Self {
157        self.prefetch_hint = prefetch;
158        self
159    }
160
161    /// Provide the FileDecryptionProperties to use when decrypting the file.
162    ///
163    /// This is only necessary when the file is encrypted.
164    #[cfg(feature = "encryption")]
165    pub fn with_decryption_properties(
166        mut self,
167        properties: Option<&FileDecryptionProperties>,
168    ) -> Self {
169        self.file_decryption_properties = properties.cloned();
170        self
171    }
172
173    /// Indicates whether this reader has a [`ParquetMetaData`] internally.
174    pub fn has_metadata(&self) -> bool {
175        self.metadata.is_some()
176    }
177
178    /// Return the parsed [`ParquetMetaData`] struct, leaving `None` in its place.
179    pub fn finish(&mut self) -> Result<ParquetMetaData> {
180        self.metadata
181            .take()
182            .ok_or_else(|| general_err!("could not parse parquet metadata"))
183    }
184
185    /// Given a [`ChunkReader`], parse and return the [`ParquetMetaData`] in a single pass.
186    ///
187    /// If `reader` is [`Bytes`] based, then the buffer must contain sufficient bytes to complete
188    /// the request, and must include the Parquet footer. If page indexes are desired, the buffer
189    /// must contain the entire file, or [`Self::try_parse_sized()`] should be used.
190    ///
191    /// This call will consume `self`.
192    ///
193    /// # Example
194    /// ```no_run
195    /// # use parquet::file::metadata::ParquetMetaDataReader;
196    /// # fn open_parquet_file(path: &str) -> std::fs::File { unimplemented!(); }
197    /// // read parquet metadata including page indexes
198    /// let file = open_parquet_file("some_path.parquet");
199    /// let metadata = ParquetMetaDataReader::new()
200    ///     .with_page_indexes(true)
201    ///     .parse_and_finish(&file).unwrap();
202    /// ```
203    pub fn parse_and_finish<R: ChunkReader>(mut self, reader: &R) -> Result<ParquetMetaData> {
204        self.try_parse(reader)?;
205        self.finish()
206    }
207
208    /// Attempts to parse the footer metadata (and optionally page indexes) given a [`ChunkReader`].
209    ///
210    /// If `reader` is [`Bytes`] based, then the buffer must contain sufficient bytes to complete
211    /// the request, and must include the Parquet footer. If page indexes are desired, the buffer
212    /// must contain the entire file, or [`Self::try_parse_sized()`] should be used.
213    pub fn try_parse<R: ChunkReader>(&mut self, reader: &R) -> Result<()> {
214        self.try_parse_sized(reader, reader.len())
215    }
216
217    /// Same as [`Self::try_parse()`], but provide the original file size in the case that `reader`
218    /// is a [`Bytes`] struct that does not contain the entire file. This information is necessary
219    /// when the page indexes are desired. `reader` must have access to the Parquet footer.
220    ///
221    /// Using this function also allows for retrying with a larger buffer.
222    ///
223    /// # Errors
224    ///
225    /// This function will return [`ParquetError::NeedMoreData`] in the event `reader` does not
226    /// provide enough data to fully parse the metadata (see example below). The returned error
227    /// will be populated with a `usize` field indicating the number of bytes required from the
228    /// tail of the file to completely parse the requested metadata.
229    ///
230    /// Other errors returned include [`ParquetError::General`] and [`ParquetError::EOF`].
231    ///
232    /// # Example
233    /// ```no_run
234    /// # use parquet::file::metadata::ParquetMetaDataReader;
235    /// # use parquet::errors::ParquetError;
236    /// # use crate::parquet::file::reader::Length;
237    /// # fn get_bytes(file: &std::fs::File, range: std::ops::Range<u64>) -> bytes::Bytes { unimplemented!(); }
238    /// # fn open_parquet_file(path: &str) -> std::fs::File { unimplemented!(); }
239    /// let file = open_parquet_file("some_path.parquet");
240    /// let len = file.len();
241    /// // Speculatively read 1 kilobyte from the end of the file
242    /// let bytes = get_bytes(&file, len - 1024..len);
243    /// let mut reader = ParquetMetaDataReader::new().with_page_indexes(true);
244    /// match reader.try_parse_sized(&bytes, len) {
245    ///     Ok(_) => (),
246    ///     Err(ParquetError::NeedMoreData(needed)) => {
247    ///         // Read the needed number of bytes from the end of the file
248    ///         let bytes = get_bytes(&file, len - needed as u64..len);
249    ///         reader.try_parse_sized(&bytes, len).unwrap();
250    ///     }
251    ///     _ => panic!("unexpected error")
252    /// }
253    /// let metadata = reader.finish().unwrap();
254    /// ```
255    ///
256    /// Note that it is possible for the file metadata to be completely read, but there are
257    /// insufficient bytes available to read the page indexes. [`Self::has_metadata()`] can be used
258    /// to test for this. In the event the file metadata is present, re-parsing of the file
259    /// metadata can be skipped by using [`Self::read_page_indexes_sized()`], as shown below.
260    /// ```no_run
261    /// # use parquet::file::metadata::ParquetMetaDataReader;
262    /// # use parquet::errors::ParquetError;
263    /// # use crate::parquet::file::reader::Length;
264    /// # fn get_bytes(file: &std::fs::File, range: std::ops::Range<u64>) -> bytes::Bytes { unimplemented!(); }
265    /// # fn open_parquet_file(path: &str) -> std::fs::File { unimplemented!(); }
266    /// let file = open_parquet_file("some_path.parquet");
267    /// let len = file.len();
268    /// // Speculatively read 1 kilobyte from the end of the file
269    /// let mut bytes = get_bytes(&file, len - 1024..len);
270    /// let mut reader = ParquetMetaDataReader::new().with_page_indexes(true);
271    /// // Loop until `bytes` is large enough
272    /// loop {
273    ///     match reader.try_parse_sized(&bytes, len) {
274    ///         Ok(_) => break,
275    ///         Err(ParquetError::NeedMoreData(needed)) => {
276    ///             // Read the needed number of bytes from the end of the file
277    ///             bytes = get_bytes(&file, len - needed as u64..len);
278    ///             // If file metadata was read only read page indexes, otherwise continue loop
279    ///             if reader.has_metadata() {
280    ///                 reader.read_page_indexes_sized(&bytes, len);
281    ///                 break;
282    ///             }
283    ///         }
284    ///         _ => panic!("unexpected error")
285    ///     }
286    /// }
287    /// let metadata = reader.finish().unwrap();
288    /// ```
289    pub fn try_parse_sized<R: ChunkReader>(&mut self, reader: &R, file_size: u64) -> Result<()> {
290        self.metadata = match self.parse_metadata(reader) {
291            Ok(metadata) => Some(metadata),
292            Err(ParquetError::NeedMoreData(needed)) => {
293                // If reader is the same length as `file_size` then presumably there is no more to
294                // read, so return an EOF error.
295                if file_size == reader.len() || needed as u64 > file_size {
296                    return Err(eof_err!(
297                        "Parquet file too small. Size is {} but need {}",
298                        file_size,
299                        needed
300                    ));
301                } else {
302                    // Ask for a larger buffer
303                    return Err(ParquetError::NeedMoreData(needed));
304                }
305            }
306            Err(e) => return Err(e),
307        };
308
309        // we can return if page indexes aren't requested
310        if !self.column_index && !self.offset_index {
311            return Ok(());
312        }
313
314        self.read_page_indexes_sized(reader, file_size)
315    }
316
317    /// Read the page index structures when a [`ParquetMetaData`] has already been obtained.
318    /// See [`Self::new_with_metadata()`] and [`Self::has_metadata()`].
319    pub fn read_page_indexes<R: ChunkReader>(&mut self, reader: &R) -> Result<()> {
320        self.read_page_indexes_sized(reader, reader.len())
321    }
322
323    /// Read the page index structures when a [`ParquetMetaData`] has already been obtained.
324    /// This variant is used when `reader` cannot access the entire Parquet file (e.g. it is
325    /// a [`Bytes`] struct containing the tail of the file).
326    /// See [`Self::new_with_metadata()`] and [`Self::has_metadata()`]. Like
327    /// [`Self::try_parse_sized()`] this function may return [`ParquetError::NeedMoreData`].
328    pub fn read_page_indexes_sized<R: ChunkReader>(
329        &mut self,
330        reader: &R,
331        file_size: u64,
332    ) -> Result<()> {
333        if self.metadata.is_none() {
334            return Err(general_err!(
335                "Tried to read page indexes without ParquetMetaData metadata"
336            ));
337        }
338
339        // Get bounds needed for page indexes (if any are present in the file).
340        let Some(range) = self.range_for_page_index() else {
341            return Ok(());
342        };
343
344        // Check to see if needed range is within `file_range`. Checking `range.end` seems
345        // redundant, but it guards against `range_for_page_index()` returning garbage.
346        let file_range = file_size.saturating_sub(reader.len())..file_size;
347        if !(file_range.contains(&range.start) && file_range.contains(&range.end)) {
348            // Requested range starts beyond EOF
349            if range.end > file_size {
350                return Err(eof_err!(
351                    "Parquet file too small. Range {:?} is beyond file bounds {file_size}",
352                    range
353                ));
354            } else {
355                // Ask for a larger buffer
356                return Err(ParquetError::NeedMoreData(
357                    (file_size - range.start).try_into()?,
358                ));
359            }
360        }
361
362        // Perform extra sanity check to make sure `range` and the footer metadata don't
363        // overlap.
364        if let Some(metadata_size) = self.metadata_size {
365            let metadata_range = file_size.saturating_sub(metadata_size as u64)..file_size;
366            if range.end > metadata_range.start {
367                return Err(eof_err!(
368                    "Parquet file too small. Page index range {:?} overlaps with file metadata {:?}",
369                    range,
370                    metadata_range
371                ));
372            }
373        }
374
375        let bytes_needed = usize::try_from(range.end - range.start)?;
376        let bytes = reader.get_bytes(range.start - file_range.start, bytes_needed)?;
377        let offset = range.start;
378
379        self.parse_column_index(&bytes, offset)?;
380        self.parse_offset_index(&bytes, offset)?;
381
382        Ok(())
383    }
384
385    /// Given a [`MetadataFetch`], parse and return the [`ParquetMetaData`] in a single pass.
386    ///
387    /// This call will consume `self`.
388    ///
389    /// See [`Self::with_prefetch_hint`] for a discussion of how to reduce the number of fetches
390    /// performed by this function.
391    #[cfg(all(feature = "async", feature = "arrow"))]
392    pub async fn load_and_finish<F: MetadataFetch>(
393        mut self,
394        fetch: F,
395        file_size: u64,
396    ) -> Result<ParquetMetaData> {
397        self.try_load(fetch, file_size).await?;
398        self.finish()
399    }
400
401    /// Given a [`MetadataSuffixFetch`], parse and return the [`ParquetMetaData`] in a single pass.
402    ///
403    /// This call will consume `self`.
404    ///
405    /// See [`Self::with_prefetch_hint`] for a discussion of how to reduce the number of fetches
406    /// performed by this function.
407    #[cfg(all(feature = "async", feature = "arrow"))]
408    pub async fn load_via_suffix_and_finish<F: MetadataSuffixFetch>(
409        mut self,
410        fetch: F,
411    ) -> Result<ParquetMetaData> {
412        self.try_load_via_suffix(fetch).await?;
413        self.finish()
414    }
415    /// Attempts to (asynchronously) parse the footer metadata (and optionally page indexes)
416    /// given a [`MetadataFetch`].
417    ///
418    /// See [`Self::with_prefetch_hint`] for a discussion of how to reduce the number of fetches
419    /// performed by this function.
420    #[cfg(all(feature = "async", feature = "arrow"))]
421    pub async fn try_load<F: MetadataFetch>(&mut self, mut fetch: F, file_size: u64) -> Result<()> {
422        let (metadata, remainder) = self.load_metadata(&mut fetch, file_size).await?;
423
424        self.metadata = Some(metadata);
425
426        // we can return if page indexes aren't requested
427        if !self.column_index && !self.offset_index {
428            return Ok(());
429        }
430
431        self.load_page_index_with_remainder(fetch, remainder).await
432    }
433
434    /// Attempts to (asynchronously) parse the footer metadata (and optionally page indexes)
435    /// given a [`MetadataSuffixFetch`].
436    ///
437    /// See [`Self::with_prefetch_hint`] for a discussion of how to reduce the number of fetches
438    /// performed by this function.
439    #[cfg(all(feature = "async", feature = "arrow"))]
440    pub async fn try_load_via_suffix<F: MetadataSuffixFetch>(
441        &mut self,
442        mut fetch: F,
443    ) -> Result<()> {
444        let (metadata, remainder) = self.load_metadata_via_suffix(&mut fetch).await?;
445
446        self.metadata = Some(metadata);
447
448        // we can return if page indexes aren't requested
449        if !self.column_index && !self.offset_index {
450            return Ok(());
451        }
452
453        self.load_page_index_with_remainder(fetch, remainder).await
454    }
455
    /// Asynchronously fetch the page index structures when a [`ParquetMetaData`] has already
    /// been obtained. See [`Self::new_with_metadata()`].
    #[cfg(all(feature = "async", feature = "arrow"))]
    pub async fn load_page_index<F: MetadataFetch>(&mut self, fetch: F) -> Result<()> {
        // No prefetched remainder is available here; everything must be fetched
        self.load_page_index_with_remainder(fetch, None).await
    }
462
    /// Fetch and decode the requested page index structures.
    ///
    /// `remainder`, if present, holds bytes left over from the footer prefetch as
    /// `(absolute_file_offset_of_first_byte, bytes)`; when it already covers the
    /// needed range it is sliced instead of issuing another fetch.
    #[cfg(all(feature = "async", feature = "arrow"))]
    async fn load_page_index_with_remainder<F: MetadataFetch>(
        &mut self,
        mut fetch: F,
        remainder: Option<(usize, Bytes)>,
    ) -> Result<()> {
        if self.metadata.is_none() {
            return Err(general_err!("Footer metadata is not present"));
        }

        // Get bounds needed for page indexes (if any are present in the file).
        let range = self.range_for_page_index();
        let range = match range {
            Some(range) => range,
            None => return Ok(()),
        };

        let bytes = match &remainder {
            // Prefetched remainder begins at or before the index range: slice it
            Some((remainder_start, remainder)) if *remainder_start as u64 <= range.start => {
                let remainder_start = *remainder_start as u64;
                // Convert absolute file offsets into offsets within `remainder`
                let offset = usize::try_from(range.start - remainder_start)?;
                let end = usize::try_from(range.end - remainder_start)?;
                assert!(end <= remainder.len());
                remainder.slice(offset..end)
            }
            // Note: this will potentially fetch data already in remainder, this keeps things simple
            _ => fetch.fetch(range.start..range.end).await?,
        };

        // Sanity check
        assert_eq!(bytes.len() as u64, range.end - range.start);

        self.parse_column_index(&bytes, range.start)?;
        self.parse_offset_index(&bytes, range.start)?;

        Ok(())
    }
500
    /// Decode the ColumnIndex structures (one per column chunk) from `bytes` and
    /// store them in the metadata. No-op unless `self.column_index` is set.
    ///
    /// `start_offset` is the absolute file offset at which `bytes` begins; the
    /// index locations recorded in the metadata are absolute, so they are shifted
    /// by `start_offset` to index into `bytes`.
    fn parse_column_index(&mut self, bytes: &Bytes, start_offset: u64) -> Result<()> {
        // Callers guarantee the footer metadata has already been parsed
        let metadata = self.metadata.as_mut().unwrap();
        if self.column_index {
            let index = metadata
                .row_groups()
                .iter()
                .enumerate()
                .map(|(rg_idx, x)| {
                    x.columns()
                        .iter()
                        .enumerate()
                        .map(|(col_idx, c)| match c.column_index_range() {
                            Some(r) => {
                                // Translate the absolute file range into `bytes` offsets
                                let r_start = usize::try_from(r.start - start_offset)?;
                                let r_end = usize::try_from(r.end - start_offset)?;
                                Self::parse_single_column_index(
                                    &bytes[r_start..r_end],
                                    metadata,
                                    c,
                                    rg_idx,
                                    col_idx,
                                )
                            }
                            // Columns with no stored column index get a placeholder
                            None => Ok(Index::NONE),
                        })
                        .collect::<Result<Vec<_>>>()
                })
                .collect::<Result<Vec<_>>>()?;
            metadata.set_column_index(Some(index));
        }
        Ok(())
    }
533
    /// Decode a single serialized ColumnIndex, decrypting it first when the
    /// column chunk carries crypto metadata.
    #[cfg(feature = "encryption")]
    fn parse_single_column_index(
        bytes: &[u8],
        metadata: &ParquetMetaData,
        column: &ColumnChunkMetaData,
        row_group_index: usize,
        col_index: usize,
    ) -> Result<Index> {
        match &column.column_crypto_metadata {
            Some(crypto_metadata) => {
                // Encrypted column: a file decryptor must have been set up when
                // the footer was decoded
                let file_decryptor = metadata.file_decryptor.as_ref().ok_or_else(|| {
                    general_err!("Cannot decrypt column index, no file decryptor set")
                })?;
                let crypto_context = CryptoContext::for_column(
                    file_decryptor,
                    crypto_metadata,
                    row_group_index,
                    col_index,
                )?;
                let column_decryptor = crypto_context.metadata_decryptor();
                // AAD specific to this column's column index module
                let aad = crypto_context.create_column_index_aad()?;
                let plaintext = column_decryptor.decrypt(bytes, &aad)?;
                decode_column_index(&plaintext, column.column_type())
            }
            // Unencrypted column: bytes are plaintext thrift
            None => decode_column_index(bytes, column.column_type()),
        }
    }
561
    /// Decode a single serialized ColumnIndex. Without the "encryption" feature
    /// the bytes are always plaintext thrift.
    #[cfg(not(feature = "encryption"))]
    fn parse_single_column_index(
        bytes: &[u8],
        _metadata: &ParquetMetaData,
        column: &ColumnChunkMetaData,
        _row_group_index: usize,
        _col_index: usize,
    ) -> Result<Index> {
        decode_column_index(bytes, column.column_type())
    }
572
    /// Decode the OffsetIndex structures (one per column chunk) from `bytes` and
    /// store them in the metadata. No-op unless `self.offset_index` is set.
    ///
    /// `start_offset` is the absolute file offset at which `bytes` begins.
    fn parse_offset_index(&mut self, bytes: &Bytes, start_offset: u64) -> Result<()> {
        // Callers guarantee the footer metadata has already been parsed
        let metadata = self.metadata.as_mut().unwrap();
        if self.offset_index {
            let index = metadata
                .row_groups()
                .iter()
                .enumerate()
                .map(|(rg_idx, x)| {
                    x.columns()
                        .iter()
                        .enumerate()
                        .map(|(col_idx, c)| match c.offset_index_range() {
                            Some(r) => {
                                // Translate the absolute file range into `bytes` offsets
                                let r_start = usize::try_from(r.start - start_offset)?;
                                let r_end = usize::try_from(r.end - start_offset)?;
                                Self::parse_single_offset_index(
                                    &bytes[r_start..r_end],
                                    metadata,
                                    c,
                                    rg_idx,
                                    col_idx,
                                )
                            }
                            // Unlike the column index, a missing offset index is an error
                            None => Err(general_err!("missing offset index")),
                        })
                        .collect::<Result<Vec<_>>>()
                })
                .collect::<Result<Vec<_>>>()?;

            metadata.set_offset_index(Some(index));
        }
        Ok(())
    }
606
    /// Decode a single serialized OffsetIndex, decrypting it first when the
    /// column chunk carries crypto metadata.
    #[cfg(feature = "encryption")]
    fn parse_single_offset_index(
        bytes: &[u8],
        metadata: &ParquetMetaData,
        column: &ColumnChunkMetaData,
        row_group_index: usize,
        col_index: usize,
    ) -> Result<OffsetIndexMetaData> {
        match &column.column_crypto_metadata {
            Some(crypto_metadata) => {
                // Encrypted column: a file decryptor must have been set up when
                // the footer was decoded
                let file_decryptor = metadata.file_decryptor.as_ref().ok_or_else(|| {
                    general_err!("Cannot decrypt offset index, no file decryptor set")
                })?;
                let crypto_context = CryptoContext::for_column(
                    file_decryptor,
                    crypto_metadata,
                    row_group_index,
                    col_index,
                )?;
                let column_decryptor = crypto_context.metadata_decryptor();
                // AAD specific to this column's offset index module
                let aad = crypto_context.create_offset_index_aad()?;
                let plaintext = column_decryptor.decrypt(bytes, &aad)?;
                decode_offset_index(&plaintext)
            }
            // Unencrypted column: bytes are plaintext thrift
            None => decode_offset_index(bytes),
        }
    }
634
    /// Decode a single serialized OffsetIndex. Without the "encryption" feature
    /// the bytes are always plaintext thrift.
    #[cfg(not(feature = "encryption"))]
    fn parse_single_offset_index(
        bytes: &[u8],
        _metadata: &ParquetMetaData,
        _column: &ColumnChunkMetaData,
        _row_group_index: usize,
        _col_index: usize,
    ) -> Result<OffsetIndexMetaData> {
        decode_offset_index(bytes)
    }
645
646    fn range_for_page_index(&self) -> Option<Range<u64>> {
647        // sanity check
648        self.metadata.as_ref()?;
649
650        // Get bounds needed for page indexes (if any are present in the file).
651        let mut range = None;
652        let metadata = self.metadata.as_ref().unwrap();
653        for c in metadata.row_groups().iter().flat_map(|r| r.columns()) {
654            if self.column_index {
655                range = acc_range(range, c.column_index_range());
656            }
657            if self.offset_index {
658                range = acc_range(range, c.offset_index_range());
659            }
660        }
661        range
662    }
663
664    // One-shot parse of footer.
665    // Side effect: this will set `self.metadata_size`
666    fn parse_metadata<R: ChunkReader>(&mut self, chunk_reader: &R) -> Result<ParquetMetaData> {
667        // check file is large enough to hold footer
668        let file_size = chunk_reader.len();
669        if file_size < (FOOTER_SIZE as u64) {
670            return Err(ParquetError::NeedMoreData(FOOTER_SIZE));
671        }
672
673        let mut footer = [0_u8; 8];
674        chunk_reader
675            .get_read(file_size - 8)?
676            .read_exact(&mut footer)?;
677
678        let footer = Self::decode_footer_tail(&footer)?;
679        let metadata_len = footer.metadata_length();
680        let footer_metadata_len = FOOTER_SIZE + metadata_len;
681        self.metadata_size = Some(footer_metadata_len);
682
683        if footer_metadata_len as u64 > file_size {
684            return Err(ParquetError::NeedMoreData(footer_metadata_len));
685        }
686
687        let start = file_size - footer_metadata_len as u64;
688        self.decode_footer_metadata(
689            chunk_reader.get_bytes(start, metadata_len)?.as_ref(),
690            &footer,
691        )
692    }
693
694    /// Return the number of bytes to read in the initial pass. If `prefetch_size` has
695    /// been provided, then return that value if it is larger than the size of the Parquet
696    /// file footer (8 bytes). Otherwise returns `8`.
697    #[cfg(all(feature = "async", feature = "arrow"))]
698    fn get_prefetch_size(&self) -> usize {
699        if let Some(prefetch) = self.prefetch_hint {
700            if prefetch > FOOTER_SIZE {
701                return prefetch;
702            }
703        }
704        FOOTER_SIZE
705    }
706
    /// Fetch and decode the footer metadata.
    ///
    /// Returns the decoded metadata and, when the initial prefetch read more
    /// than the metadata itself, the leftover prefix bytes paired with the
    /// absolute file offset of their first byte (reused later to avoid
    /// re-fetching page index data).
    #[cfg(all(feature = "async", feature = "arrow"))]
    async fn load_metadata<F: MetadataFetch>(
        &self,
        fetch: &mut F,
        file_size: u64,
    ) -> Result<(ParquetMetaData, Option<(usize, Bytes)>)> {
        let prefetch = self.get_prefetch_size() as u64;

        if file_size < FOOTER_SIZE as u64 {
            return Err(eof_err!("file size of {} is less than footer", file_size));
        }

        // If a size hint is provided, read more than the minimum size
        // to try and avoid a second fetch.
        // Note: prefetch > file_size is ok since we're using saturating_sub.
        let footer_start = file_size.saturating_sub(prefetch);

        let suffix = fetch.fetch(footer_start..file_size).await?;
        let suffix_len = suffix.len();
        let fetch_len = (file_size - footer_start)
            .try_into()
            .expect("footer size should never be larger than u32");
        // The fetch implementation may return fewer bytes than requested
        if suffix_len < fetch_len {
            return Err(eof_err!(
                "metadata requires {} bytes, but could only read {}",
                fetch_len,
                suffix_len
            ));
        }

        // The last FOOTER_SIZE bytes of the suffix are the footer proper
        let mut footer = [0; FOOTER_SIZE];
        footer.copy_from_slice(&suffix[suffix_len - FOOTER_SIZE..suffix_len]);

        let footer = Self::decode_footer_tail(&footer)?;
        let length = footer.metadata_length();

        if file_size < (length + FOOTER_SIZE) as u64 {
            return Err(eof_err!(
                "file size of {} is less than footer + metadata {}",
                file_size,
                length + FOOTER_SIZE
            ));
        }

        // Did not fetch the entire file metadata in the initial read, need to make a second request
        if length > suffix_len - FOOTER_SIZE {
            let metadata_start = file_size - (length + FOOTER_SIZE) as u64;
            let meta = fetch
                .fetch(metadata_start..(file_size - FOOTER_SIZE as u64))
                .await?;
            // Nothing left over to reuse in this case
            Ok((self.decode_footer_metadata(&meta, &footer)?, None))
        } else {
            // Metadata is fully contained in the prefetched suffix; compute its
            // start offset relative to `suffix`
            let metadata_start = (file_size - (length + FOOTER_SIZE) as u64 - footer_start)
                .try_into()
                .expect("metadata length should never be larger than u32");
            let slice = &suffix[metadata_start..suffix_len - FOOTER_SIZE];
            Ok((
                self.decode_footer_metadata(slice, &footer)?,
                // Bytes preceding the metadata are returned for possible reuse
                Some((footer_start as usize, suffix.slice(..metadata_start))),
            ))
        }
    }
769
    /// Fetches and decodes the file metadata using suffix ("read last N bytes")
    /// requests, so the total file size does not need to be known up front.
    ///
    /// Issues one suffix fetch of `get_prefetch_size()` bytes; if the footer reports
    /// a larger metadata length, issues a second suffix fetch for exactly
    /// `metadata length + FOOTER_SIZE` bytes.
    ///
    /// Returns the decoded [`ParquetMetaData`] plus, when the prefetch covered more
    /// than the metadata, the leftover prefix bytes (offset is always 0 here since
    /// suffix fetches are not anchored to absolute file positions).
    #[cfg(all(feature = "async", feature = "arrow"))]
    async fn load_metadata_via_suffix<F: MetadataSuffixFetch>(
        &self,
        fetch: &mut F,
    ) -> Result<(ParquetMetaData, Option<(usize, Bytes)>)> {
        let prefetch = self.get_prefetch_size();

        let suffix = fetch.fetch_suffix(prefetch as _).await?;
        let suffix_len = suffix.len();

        if suffix_len < FOOTER_SIZE {
            return Err(eof_err!(
                "footer metadata requires {} bytes, but could only read {}",
                FOOTER_SIZE,
                suffix_len
            ));
        }

        // The last FOOTER_SIZE bytes hold the metadata length + magic.
        let mut footer = [0; FOOTER_SIZE];
        footer.copy_from_slice(&suffix[suffix_len - FOOTER_SIZE..suffix_len]);

        let footer = Self::decode_footer_tail(&footer)?;
        let length = footer.metadata_length();

        // Did not fetch the entire file metadata in the initial read, need to make a second request
        let metadata_offset = length + FOOTER_SIZE;
        if length > suffix_len - FOOTER_SIZE {
            let meta = fetch.fetch_suffix(metadata_offset).await?;

            if meta.len() < metadata_offset {
                return Err(eof_err!(
                    "metadata requires {} bytes, but could only read {}",
                    metadata_offset,
                    meta.len()
                ));
            }

            Ok((
                // need to slice off the footer or decryption fails
                self.decode_footer_metadata(&meta.slice(0..length), &footer)?,
                None,
            ))
        } else {
            // Metadata fully contained in the prefetched suffix.
            let metadata_start = suffix_len - metadata_offset;
            let slice = &suffix[metadata_start..suffix_len - FOOTER_SIZE];
            Ok((
                self.decode_footer_metadata(slice, &footer)?,
                Some((0, suffix.slice(..metadata_start))),
            ))
        }
    }
821
822    /// Decodes the end of the Parquet footer
823    ///
824    /// There are 8 bytes at the end of the Parquet footer with the following layout:
825    /// * 4 bytes for the metadata length
826    /// * 4 bytes for the magic bytes 'PAR1' or 'PARE' (encrypted footer)
827    ///
828    /// ```text
829    /// +-----+------------------+
830    /// | len | 'PAR1' or 'PARE' |
831    /// +-----+------------------+
832    /// ```
833    pub fn decode_footer_tail(slice: &[u8; FOOTER_SIZE]) -> Result<FooterTail> {
834        let magic = &slice[4..];
835        let encrypted_footer = if magic == PARQUET_MAGIC_ENCR_FOOTER {
836            true
837        } else if magic == PARQUET_MAGIC {
838            false
839        } else {
840            return Err(general_err!("Invalid Parquet file. Corrupt footer"));
841        };
842        // get the metadata length from the footer
843        let metadata_len = u32::from_le_bytes(slice[..4].try_into().unwrap());
844        Ok(FooterTail {
845            // u32 won't be larger than usize in most cases
846            metadata_length: metadata_len as usize,
847            encrypted_footer,
848        })
849    }
850
851    /// Decodes the Parquet footer, returning the metadata length in bytes
852    #[deprecated(since = "54.3.0", note = "Use decode_footer_tail instead")]
853    pub fn decode_footer(slice: &[u8; FOOTER_SIZE]) -> Result<usize> {
854        Self::decode_footer_tail(slice).map(|f| f.metadata_length)
855    }
856
    /// Decodes [`ParquetMetaData`] from the provided bytes.
    ///
    /// Typically, this is used to decode the metadata from the end of a parquet
    /// file. The format of `buf` is the Thrift compact binary protocol, as specified
    /// by the [Parquet Spec].
    ///
    /// This method handles using either `decode_metadata` or
    /// `decode_metadata_with_encryption` depending on whether the encryption
    /// feature is enabled.
    ///
    /// [Parquet Spec]: https://github.com/apache/parquet-format#metadata
    pub(crate) fn decode_footer_metadata(
        &self,
        buf: &[u8],
        footer_tail: &FooterTail,
    ) -> Result<ParquetMetaData> {
        // With the encryption feature, delegate to the decryption-aware decoder,
        // which also handles plaintext footers (possibly with encrypted columns).
        #[cfg(feature = "encryption")]
        let result = Self::decode_metadata_with_encryption(
            buf,
            footer_tail.is_encrypted_footer(),
            self.file_decryption_properties.as_ref(),
        );
        // Without the encryption feature, an encrypted footer cannot be read at all.
        #[cfg(not(feature = "encryption"))]
        let result = {
            if footer_tail.is_encrypted_footer() {
                Err(general_err!(
                    "Parquet file has an encrypted footer but the encryption feature is disabled"
                ))
            } else {
                Self::decode_metadata(buf)
            }
        };
        result
    }
891
    /// Decodes [`ParquetMetaData`] from the provided bytes, handling metadata that may be encrypted.
    ///
    /// Typically this is used to decode the metadata from the end of a parquet
    /// file. The format of `buf` is the Thrift compact binary protocol, as specified
    /// by the [Parquet Spec]. Buffer can be encrypted with AES GCM or AES CTR
    /// ciphers as specified in the [Parquet Encryption Spec].
    ///
    /// [Parquet Spec]: https://github.com/apache/parquet-format#metadata
    /// [Parquet Encryption Spec]: https://parquet.apache.org/docs/file-format/data-pages/encryption/
    #[cfg(feature = "encryption")]
    fn decode_metadata_with_encryption(
        buf: &[u8],
        encrypted_footer: bool,
        file_decryption_properties: Option<&FileDecryptionProperties>,
    ) -> Result<ParquetMetaData> {
        let mut prot = TCompactSliceInputProtocol::new(buf);
        let mut file_decryptor = None;
        // Declared here (not in the branch) so the decrypted bytes outlive the
        // protocol reader that is re-pointed at them below.
        let decrypted_fmd_buf;

        if encrypted_footer {
            if let Some(file_decryption_properties) = file_decryption_properties {
                // An encrypted footer begins with a plaintext FileCryptoMetaData
                // describing the algorithm and key metadata.
                let t_file_crypto_metadata: TFileCryptoMetaData =
                    TFileCryptoMetaData::read_from_in_protocol(&mut prot)
                        .map_err(|e| general_err!("Could not parse crypto metadata: {}", e))?;
                // True when the writer did NOT store the AAD prefix in the file,
                // so the reader must supply it.
                let supply_aad_prefix = match &t_file_crypto_metadata.encryption_algorithm {
                    EncryptionAlgorithm::AESGCMV1(algo) => algo.supply_aad_prefix,
                    _ => Some(false),
                }
                .unwrap_or(false);
                if supply_aad_prefix && file_decryption_properties.aad_prefix().is_none() {
                    return Err(general_err!(
                        "Parquet file was encrypted with an AAD prefix that is not stored in the file, \
                        but no AAD prefix was provided in the file decryption properties"
                    ));
                }
                let decryptor = get_file_decryptor(
                    t_file_crypto_metadata.encryption_algorithm,
                    t_file_crypto_metadata.key_metadata.as_deref(),
                    file_decryption_properties,
                )?;
                let footer_decryptor = decryptor.get_footer_decryptor();
                let aad_footer = create_footer_aad(decryptor.file_aad())?;

                // Decrypt the remaining footer bytes, then re-point the Thrift
                // reader at the decrypted plaintext.
                decrypted_fmd_buf = footer_decryptor?
                    .decrypt(prot.as_slice().as_ref(), aad_footer.as_ref())
                    .map_err(|_| {
                        general_err!(
                            "Provided footer key and AAD were unable to decrypt parquet footer"
                        )
                    })?;
                prot = TCompactSliceInputProtocol::new(decrypted_fmd_buf.as_ref());

                file_decryptor = Some(decryptor);
            } else {
                return Err(general_err!("Parquet file has an encrypted footer but decryption properties were not provided"));
            }
        }

        let t_file_metadata: TFileMetaData = TFileMetaData::read_from_in_protocol(&mut prot)
            .map_err(|e| general_err!("Could not parse metadata: {}", e))?;
        let schema = types::from_thrift(&t_file_metadata.schema)?;
        let schema_descr = Arc::new(SchemaDescriptor::new(schema));

        if let (Some(algo), Some(file_decryption_properties)) = (
            t_file_metadata.encryption_algorithm,
            file_decryption_properties,
        ) {
            // File has a plaintext footer but encryption algorithm is set
            let file_decryptor_value = get_file_decryptor(
                algo,
                t_file_metadata.footer_signing_key_metadata.as_deref(),
                file_decryption_properties,
            )?;
            // Optionally verify the footer signature appended to plaintext footers.
            if file_decryption_properties.check_plaintext_footer_integrity() && !encrypted_footer {
                file_decryptor_value.verify_plaintext_footer_signature(buf)?;
            }
            file_decryptor = Some(file_decryptor_value);
        }

        // Row groups may contain per-column encryption metadata; decode with the
        // decryptor (if any) so encrypted column chunks can be interpreted.
        let mut row_groups = Vec::new();
        for rg in t_file_metadata.row_groups {
            let r = RowGroupMetaData::from_encrypted_thrift(
                schema_descr.clone(),
                rg,
                file_decryptor.as_ref(),
            )?;
            row_groups.push(r);
        }
        let column_orders =
            Self::parse_column_orders(t_file_metadata.column_orders, &schema_descr)?;

        let file_metadata = FileMetaData::new(
            t_file_metadata.version,
            t_file_metadata.num_rows,
            t_file_metadata.created_by,
            t_file_metadata.key_value_metadata,
            schema_descr,
            column_orders,
        );
        let mut metadata = ParquetMetaData::new(file_metadata, row_groups);

        metadata.with_file_decryptor(file_decryptor);

        Ok(metadata)
    }
997
998    /// Decodes [`ParquetMetaData`] from the provided bytes.
999    ///
1000    /// Typically this is used to decode the metadata from the end of a parquet
1001    /// file. The format of `buf` is the Thrift compact binary protocol, as specified
1002    /// by the [Parquet Spec].
1003    ///
1004    /// [Parquet Spec]: https://github.com/apache/parquet-format#metadata
1005    pub fn decode_metadata(buf: &[u8]) -> Result<ParquetMetaData> {
1006        let mut prot = TCompactSliceInputProtocol::new(buf);
1007
1008        let t_file_metadata: TFileMetaData = TFileMetaData::read_from_in_protocol(&mut prot)
1009            .map_err(|e| general_err!("Could not parse metadata: {}", e))?;
1010        let schema = types::from_thrift(&t_file_metadata.schema)?;
1011        let schema_descr = Arc::new(SchemaDescriptor::new(schema));
1012
1013        let mut row_groups = Vec::new();
1014        for rg in t_file_metadata.row_groups {
1015            row_groups.push(RowGroupMetaData::from_thrift(schema_descr.clone(), rg)?);
1016        }
1017        let column_orders =
1018            Self::parse_column_orders(t_file_metadata.column_orders, &schema_descr)?;
1019
1020        let file_metadata = FileMetaData::new(
1021            t_file_metadata.version,
1022            t_file_metadata.num_rows,
1023            t_file_metadata.created_by,
1024            t_file_metadata.key_value_metadata,
1025            schema_descr,
1026            column_orders,
1027        );
1028
1029        Ok(ParquetMetaData::new(file_metadata, row_groups))
1030    }
1031
1032    /// Parses column orders from Thrift definition.
1033    /// If no column orders are defined, returns `None`.
1034    fn parse_column_orders(
1035        t_column_orders: Option<Vec<TColumnOrder>>,
1036        schema_descr: &SchemaDescriptor,
1037    ) -> Result<Option<Vec<ColumnOrder>>> {
1038        match t_column_orders {
1039            Some(orders) => {
1040                // Should always be the case
1041                if orders.len() != schema_descr.num_columns() {
1042                    return Err(general_err!("Column order length mismatch"));
1043                };
1044                let mut res = Vec::new();
1045                for (i, column) in schema_descr.columns().iter().enumerate() {
1046                    match orders[i] {
1047                        TColumnOrder::TYPEORDER(_) => {
1048                            let sort_order = ColumnOrder::get_sort_order(
1049                                column.logical_type(),
1050                                column.converted_type(),
1051                                column.physical_type(),
1052                            );
1053                            res.push(ColumnOrder::TYPE_DEFINED_ORDER(sort_order));
1054                        }
1055                    }
1056                }
1057                Ok(Some(res))
1058            }
1059            None => Ok(None),
1060        }
1061    }
1062}
1063
#[cfg(feature = "encryption")]
/// Builds a [`FileDecryptor`] for the given algorithm and key metadata.
///
/// Only AES-GCM-V1 is currently supported; AES-GCM-CTR-V1 returns a
/// not-yet-implemented error.
fn get_file_decryptor(
    encryption_algorithm: EncryptionAlgorithm,
    footer_key_metadata: Option<&[u8]>,
    file_decryption_properties: &FileDecryptionProperties,
) -> Result<FileDecryptor> {
    match encryption_algorithm {
        EncryptionAlgorithm::AESGCMV1(algo) => {
            // The unique file identifier is mandatory: it is mixed into every
            // module AAD.
            let aad_file_unique = algo
                .aad_file_unique
                .ok_or_else(|| general_err!("AAD unique file identifier is not set"))?;
            // Prefer the caller-supplied AAD prefix; otherwise fall back to the
            // one stored in the file (or empty if neither is present).
            let aad_prefix = match file_decryption_properties.aad_prefix() {
                Some(prefix) => prefix.clone(),
                None => algo.aad_prefix.unwrap_or_default(),
            };

            FileDecryptor::new(
                file_decryption_properties,
                footer_key_metadata,
                aad_file_unique,
                aad_prefix,
            )
        }
        EncryptionAlgorithm::AESGCMCTRV1(_) => Err(nyi_err!(
            "The AES_GCM_CTR_V1 encryption algorithm is not yet supported"
        )),
    }
}
1093
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;

    use crate::basic::SortOrder;
    use crate::basic::Type;
    use crate::file::reader::Length;
    use crate::format::TypeDefinedOrder;
    use crate::schema::types::Type as SchemaType;
    use crate::util::test_common::file_util::get_test_file;

    // An empty file cannot even hold the 8-byte footer.
    #[test]
    fn test_parse_metadata_size_smaller_than_footer() {
        let test_file = tempfile::tempfile().unwrap();
        let err = ParquetMetaDataReader::new()
            .parse_metadata(&test_file)
            .unwrap_err();
        assert!(matches!(err, ParquetError::NeedMoreData(8)));
    }

    // 8 bytes that do not end in a valid magic must be rejected.
    #[test]
    fn test_parse_metadata_corrupt_footer() {
        let data = Bytes::from(vec![1, 2, 3, 4, 5, 6, 7, 8]);
        let reader_result = ParquetMetaDataReader::new().parse_metadata(&data);
        assert_eq!(
            reader_result.unwrap_err().to_string(),
            "Parquet error: Invalid Parquet file. Corrupt footer"
        );
    }

    // A valid magic with metadata length 255 in an 8-byte file: the reader
    // reports it needs 255 + 8 = 263 bytes.
    #[test]
    fn test_parse_metadata_invalid_start() {
        let test_file = Bytes::from(vec![255, 0, 0, 0, b'P', b'A', b'R', b'1']);
        let err = ParquetMetaDataReader::new()
            .parse_metadata(&test_file)
            .unwrap_err();
        assert!(matches!(err, ParquetError::NeedMoreData(263)));
    }

    // TYPE_ORDER thrift entries map to TYPE_DEFINED_ORDER with the sort order
    // derived from each column's type.
    #[test]
    fn test_metadata_column_orders_parse() {
        // Define simple schema, we do not need to provide logical types.
        let fields = vec![
            Arc::new(
                SchemaType::primitive_type_builder("col1", Type::INT32)
                    .build()
                    .unwrap(),
            ),
            Arc::new(
                SchemaType::primitive_type_builder("col2", Type::FLOAT)
                    .build()
                    .unwrap(),
            ),
        ];
        let schema = SchemaType::group_type_builder("schema")
            .with_fields(fields)
            .build()
            .unwrap();
        let schema_descr = SchemaDescriptor::new(Arc::new(schema));

        let t_column_orders = Some(vec![
            TColumnOrder::TYPEORDER(TypeDefinedOrder::new()),
            TColumnOrder::TYPEORDER(TypeDefinedOrder::new()),
        ]);

        assert_eq!(
            ParquetMetaDataReader::parse_column_orders(t_column_orders, &schema_descr).unwrap(),
            Some(vec![
                ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED),
                ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED)
            ])
        );

        // Test when no column orders are defined.
        assert_eq!(
            ParquetMetaDataReader::parse_column_orders(None, &schema_descr).unwrap(),
            None
        );
    }

    // One order entry for a zero-column schema is a length mismatch.
    #[test]
    fn test_metadata_column_orders_len_mismatch() {
        let schema = SchemaType::group_type_builder("schema").build().unwrap();
        let schema_descr = SchemaDescriptor::new(Arc::new(schema));

        let t_column_orders = Some(vec![TColumnOrder::TYPEORDER(TypeDefinedOrder::new())]);

        let res = ParquetMetaDataReader::parse_column_orders(t_column_orders, &schema_descr);
        assert!(res.is_err());
        assert!(format!("{:?}", res.unwrap_err()).contains("Column order length mismatch"));
    }

    // Exercises try_parse/try_parse_sized with suffixes of varying sizes; the
    // hard-coded offsets (323583, 452505, ...) are specific to
    // alltypes_tiny_pages.parquet and mark the page-index / metadata boundaries.
    #[test]
    fn test_try_parse() {
        let file = get_test_file("alltypes_tiny_pages.parquet");
        let len = file.len();

        let mut reader = ParquetMetaDataReader::new().with_page_indexes(true);

        let bytes_for_range = |range: Range<u64>| {
            file.get_bytes(range.start, (range.end - range.start).try_into().unwrap())
                .unwrap()
        };

        // read entire file
        let bytes = bytes_for_range(0..len);
        reader.try_parse(&bytes).unwrap();
        let metadata = reader.finish().unwrap();
        assert!(metadata.column_index.is_some());
        assert!(metadata.offset_index.is_some());

        // read more than enough of file
        let bytes = bytes_for_range(320000..len);
        reader.try_parse_sized(&bytes, len).unwrap();
        let metadata = reader.finish().unwrap();
        assert!(metadata.column_index.is_some());
        assert!(metadata.offset_index.is_some());

        // exactly enough
        let bytes = bytes_for_range(323583..len);
        reader.try_parse_sized(&bytes, len).unwrap();
        let metadata = reader.finish().unwrap();
        assert!(metadata.column_index.is_some());
        assert!(metadata.offset_index.is_some());

        // not enough for page index
        let bytes = bytes_for_range(323584..len);
        // should fail
        match reader.try_parse_sized(&bytes, len).unwrap_err() {
            // expected error, try again with provided bounds
            ParquetError::NeedMoreData(needed) => {
                let bytes = bytes_for_range(len - needed as u64..len);
                reader.try_parse_sized(&bytes, len).unwrap();
                let metadata = reader.finish().unwrap();
                assert!(metadata.column_index.is_some());
                assert!(metadata.offset_index.is_some());
            }
            _ => panic!("unexpected error"),
        };

        // not enough for file metadata, but keep trying until page indexes are read
        let mut reader = ParquetMetaDataReader::new().with_page_indexes(true);
        let mut bytes = bytes_for_range(452505..len);
        loop {
            match reader.try_parse_sized(&bytes, len) {
                Ok(_) => break,
                Err(ParquetError::NeedMoreData(needed)) => {
                    bytes = bytes_for_range(len - needed as u64..len);
                    if reader.has_metadata() {
                        reader.read_page_indexes_sized(&bytes, len).unwrap();
                        break;
                    }
                }
                _ => panic!("unexpected error"),
            }
        }
        let metadata = reader.finish().unwrap();
        assert!(metadata.column_index.is_some());
        assert!(metadata.offset_index.is_some());

        // not enough for page index but lie about file size
        let bytes = bytes_for_range(323584..len);
        let reader_result = reader.try_parse_sized(&bytes, len - 323584).unwrap_err();
        assert_eq!(
            reader_result.to_string(),
            "EOF: Parquet file too small. Range 323583..452504 is beyond file bounds 130649"
        );

        // not enough for file metadata
        let mut reader = ParquetMetaDataReader::new();
        let bytes = bytes_for_range(452505..len);
        // should fail
        match reader.try_parse_sized(&bytes, len).unwrap_err() {
            // expected error, try again with provided bounds
            ParquetError::NeedMoreData(needed) => {
                let bytes = bytes_for_range(len - needed as u64..len);
                reader.try_parse_sized(&bytes, len).unwrap();
                reader.finish().unwrap();
            }
            _ => panic!("unexpected error"),
        };

        // not enough for file metadata but use try_parse()
        let reader_result = reader.try_parse(&bytes).unwrap_err();
        assert_eq!(
            reader_result.to_string(),
            "EOF: Parquet file too small. Size is 1728 but need 1729"
        );

        // read head of file rather than tail
        let bytes = bytes_for_range(0..1000);
        let reader_result = reader.try_parse_sized(&bytes, len).unwrap_err();
        assert_eq!(
            reader_result.to_string(),
            "Parquet error: Invalid Parquet file. Corrupt footer"
        );

        // lie about file size
        let bytes = bytes_for_range(452510..len);
        let reader_result = reader.try_parse_sized(&bytes, len - 452505).unwrap_err();
        assert_eq!(
            reader_result.to_string(),
            "EOF: Parquet file too small. Size is 1728 but need 1729"
        );
    }
}
1301
1302#[cfg(all(feature = "async", feature = "arrow", test))]
1303mod async_tests {
1304    use super::*;
1305    use bytes::Bytes;
1306    use futures::future::BoxFuture;
1307    use futures::FutureExt;
1308    use std::fs::File;
1309    use std::future::Future;
1310    use std::io::{Read, Seek, SeekFrom};
1311    use std::ops::Range;
1312    use std::sync::atomic::{AtomicUsize, Ordering};
1313
1314    use crate::file::reader::Length;
1315    use crate::util::test_common::file_util::get_test_file;
1316
    // Test adapter: wraps a range-fetch closure as a `MetadataFetch` implementation.
    struct MetadataFetchFn<F>(F);
1318
    impl<F, Fut> MetadataFetch for MetadataFetchFn<F>
    where
        F: FnMut(Range<u64>) -> Fut + Send,
        Fut: Future<Output = Result<Bytes>> + Send,
    {
        fn fetch(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
            // The wrapped closure is invoked only when the returned future is polled.
            async move { self.0(range).await }.boxed()
        }
    }
1328
    // Test adapter: pairs a range-fetch closure (F1) with a suffix-fetch closure (F2).
    struct MetadataSuffixFetchFn<F1, F2>(F1, F2);
1330
    impl<F1, Fut, F2> MetadataFetch for MetadataSuffixFetchFn<F1, F2>
    where
        F1: FnMut(Range<u64>) -> Fut + Send,
        Fut: Future<Output = Result<Bytes>> + Send,
        F2: Send,
    {
        fn fetch(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
            // Range fetches delegate to the first closure.
            async move { self.0(range).await }.boxed()
        }
    }
1341
    impl<F1, Fut, F2> MetadataSuffixFetch for MetadataSuffixFetchFn<F1, F2>
    where
        F1: FnMut(Range<u64>) -> Fut + Send,
        F2: FnMut(usize) -> Fut + Send,
        Fut: Future<Output = Result<Bytes>> + Send,
    {
        fn fetch_suffix(&mut self, suffix: usize) -> BoxFuture<'_, Result<Bytes>> {
            // Suffix fetches delegate to the second closure.
            async move { self.1(suffix).await }.boxed()
        }
    }
1352
1353    fn read_range(file: &mut File, range: Range<u64>) -> Result<Bytes> {
1354        file.seek(SeekFrom::Start(range.start as _))?;
1355        let len = range.end - range.start;
1356        let mut buf = Vec::with_capacity(len.try_into().unwrap());
1357        file.take(len as _).read_to_end(&mut buf)?;
1358        Ok(buf.into())
1359    }
1360
1361    fn read_suffix(file: &mut File, suffix: usize) -> Result<Bytes> {
1362        let file_len = file.len();
1363        // Don't seek before beginning of file
1364        file.seek(SeekFrom::End(0 - suffix.min(file_len as _) as i64))?;
1365        let mut buf = Vec::with_capacity(suffix);
1366        file.take(suffix as _).read_to_end(&mut buf)?;
1367        Ok(buf.into())
1368    }
1369
    // Verifies load_metadata against parse_and_finish for various prefetch hints,
    // checking both the decoded schema and the number of fetches issued.
    #[tokio::test]
    async fn test_simple() {
        let mut file = get_test_file("nulls.snappy.parquet");
        let len = file.len();

        // Ground truth: decode synchronously from the full file.
        let expected = ParquetMetaDataReader::new()
            .parse_and_finish(&file)
            .unwrap();
        let expected = expected.file_metadata().schema();
        let fetch_count = AtomicUsize::new(0);

        // Count every fetch so we can assert how many round trips occurred.
        let mut fetch = |range| {
            fetch_count.fetch_add(1, Ordering::SeqCst);
            futures::future::ready(read_range(&mut file, range))
        };

        // No hint: footer read + metadata read = 2 fetches.
        let input = MetadataFetchFn(&mut fetch);
        let actual = ParquetMetaDataReader::new()
            .load_and_finish(input, len)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 2);

        // Metadata hint too small - below footer size
        fetch_count.store(0, Ordering::SeqCst);
        let input = MetadataFetchFn(&mut fetch);
        let actual = ParquetMetaDataReader::new()
            .with_prefetch_hint(Some(7))
            .load_and_finish(input, len)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 2);

        // Metadata hint too small
        fetch_count.store(0, Ordering::SeqCst);
        let input = MetadataFetchFn(&mut fetch);
        let actual = ParquetMetaDataReader::new()
            .with_prefetch_hint(Some(10))
            .load_and_finish(input, len)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 2);

        // Metadata hint too large
        fetch_count.store(0, Ordering::SeqCst);
        let input = MetadataFetchFn(&mut fetch);
        let actual = ParquetMetaDataReader::new()
            .with_prefetch_hint(Some(500))
            .load_and_finish(input, len)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);

        // Metadata hint exactly correct
        fetch_count.store(0, Ordering::SeqCst);
        let input = MetadataFetchFn(&mut fetch);
        let actual = ParquetMetaDataReader::new()
            .with_prefetch_hint(Some(428))
            .load_and_finish(input, len)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);

        // Claimed file size smaller than the footer.
        let input = MetadataFetchFn(&mut fetch);
        let err = ParquetMetaDataReader::new()
            .load_and_finish(input, 4)
            .await
            .unwrap_err()
            .to_string();
        assert_eq!(err, "EOF: file size of 4 is less than footer");

        // Claimed file size lands mid-file, so the magic bytes are wrong.
        let input = MetadataFetchFn(&mut fetch);
        let err = ParquetMetaDataReader::new()
            .load_and_finish(input, 20)
            .await
            .unwrap_err()
            .to_string();
        assert_eq!(err, "Parquet error: Invalid Parquet file. Corrupt footer");
    }
1454
1455    #[tokio::test]
1456    async fn test_suffix() {
1457        let mut file = get_test_file("nulls.snappy.parquet");
1458        let mut file2 = file.try_clone().unwrap();
1459
1460        let expected = ParquetMetaDataReader::new()
1461            .parse_and_finish(&file)
1462            .unwrap();
1463        let expected = expected.file_metadata().schema();
1464        let fetch_count = AtomicUsize::new(0);
1465        let suffix_fetch_count = AtomicUsize::new(0);
1466
1467        let mut fetch = |range| {
1468            fetch_count.fetch_add(1, Ordering::SeqCst);
1469            futures::future::ready(read_range(&mut file, range))
1470        };
1471        let mut suffix_fetch = |suffix| {
1472            suffix_fetch_count.fetch_add(1, Ordering::SeqCst);
1473            futures::future::ready(read_suffix(&mut file2, suffix))
1474        };
1475
1476        let input = MetadataSuffixFetchFn(&mut fetch, &mut suffix_fetch);
1477        let actual = ParquetMetaDataReader::new()
1478            .load_via_suffix_and_finish(input)
1479            .await
1480            .unwrap();
1481        assert_eq!(actual.file_metadata().schema(), expected);
1482        assert_eq!(fetch_count.load(Ordering::SeqCst), 0);
1483        assert_eq!(suffix_fetch_count.load(Ordering::SeqCst), 2);
1484
1485        // Metadata hint too small - below footer size
1486        fetch_count.store(0, Ordering::SeqCst);
1487        suffix_fetch_count.store(0, Ordering::SeqCst);
1488        let input = MetadataSuffixFetchFn(&mut fetch, &mut suffix_fetch);
1489        let actual = ParquetMetaDataReader::new()
1490            .with_prefetch_hint(Some(7))
1491            .load_via_suffix_and_finish(input)
1492            .await
1493            .unwrap();
1494        assert_eq!(actual.file_metadata().schema(), expected);
1495        assert_eq!(fetch_count.load(Ordering::SeqCst), 0);
1496        assert_eq!(suffix_fetch_count.load(Ordering::SeqCst), 2);
1497
1498        // Metadata hint too small
1499        fetch_count.store(0, Ordering::SeqCst);
1500        suffix_fetch_count.store(0, Ordering::SeqCst);
1501        let input = MetadataSuffixFetchFn(&mut fetch, &mut suffix_fetch);
1502        let actual = ParquetMetaDataReader::new()
1503            .with_prefetch_hint(Some(10))
1504            .load_via_suffix_and_finish(input)
1505            .await
1506            .unwrap();
1507        assert_eq!(actual.file_metadata().schema(), expected);
1508        assert_eq!(fetch_count.load(Ordering::SeqCst), 0);
1509        assert_eq!(suffix_fetch_count.load(Ordering::SeqCst), 2);
1510
1511        dbg!("test");
1512        // Metadata hint too large
1513        fetch_count.store(0, Ordering::SeqCst);
1514        suffix_fetch_count.store(0, Ordering::SeqCst);
1515        let input = MetadataSuffixFetchFn(&mut fetch, &mut suffix_fetch);
1516        let actual = ParquetMetaDataReader::new()
1517            .with_prefetch_hint(Some(500))
1518            .load_via_suffix_and_finish(input)
1519            .await
1520            .unwrap();
1521        assert_eq!(actual.file_metadata().schema(), expected);
1522        assert_eq!(fetch_count.load(Ordering::SeqCst), 0);
1523        assert_eq!(suffix_fetch_count.load(Ordering::SeqCst), 1);
1524
1525        // Metadata hint exactly correct
1526        fetch_count.store(0, Ordering::SeqCst);
1527        suffix_fetch_count.store(0, Ordering::SeqCst);
1528        let input = MetadataSuffixFetchFn(&mut fetch, &mut suffix_fetch);
1529        let actual = ParquetMetaDataReader::new()
1530            .with_prefetch_hint(Some(428))
1531            .load_via_suffix_and_finish(input)
1532            .await
1533            .unwrap();
1534        assert_eq!(actual.file_metadata().schema(), expected);
1535        assert_eq!(fetch_count.load(Ordering::SeqCst), 0);
1536        assert_eq!(suffix_fetch_count.load(Ordering::SeqCst), 1);
1537    }
1538
1539    #[cfg(feature = "encryption")]
1540    #[tokio::test]
1541    async fn test_suffix_with_encryption() {
1542        let mut file = get_test_file("uniform_encryption.parquet.encrypted");
1543        let mut file2 = file.try_clone().unwrap();
1544
1545        let mut fetch = |range| futures::future::ready(read_range(&mut file, range));
1546        let mut suffix_fetch = |suffix| futures::future::ready(read_suffix(&mut file2, suffix));
1547
1548        let input = MetadataSuffixFetchFn(&mut fetch, &mut suffix_fetch);
1549
1550        let key_code: &[u8] = "0123456789012345".as_bytes();
1551        let decryption_properties = FileDecryptionProperties::builder(key_code.to_vec())
1552            .build()
1553            .unwrap();
1554
1555        // just make sure the metadata is properly decrypted and read
1556        let expected = ParquetMetaDataReader::new()
1557            .with_decryption_properties(Some(&decryption_properties))
1558            .load_via_suffix_and_finish(input)
1559            .await
1560            .unwrap();
1561        assert_eq!(expected.num_row_groups(), 1);
1562    }
1563
1564    #[tokio::test]
1565    async fn test_page_index() {
1566        let mut file = get_test_file("alltypes_tiny_pages.parquet");
1567        let len = file.len();
1568        let fetch_count = AtomicUsize::new(0);
1569        let mut fetch = |range| {
1570            fetch_count.fetch_add(1, Ordering::SeqCst);
1571            futures::future::ready(read_range(&mut file, range))
1572        };
1573
1574        let f = MetadataFetchFn(&mut fetch);
1575        let mut loader = ParquetMetaDataReader::new().with_page_indexes(true);
1576        loader.try_load(f, len).await.unwrap();
1577        assert_eq!(fetch_count.load(Ordering::SeqCst), 3);
1578        let metadata = loader.finish().unwrap();
1579        assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());
1580
1581        // Prefetch just footer exactly
1582        fetch_count.store(0, Ordering::SeqCst);
1583        let f = MetadataFetchFn(&mut fetch);
1584        let mut loader = ParquetMetaDataReader::new()
1585            .with_page_indexes(true)
1586            .with_prefetch_hint(Some(1729));
1587        loader.try_load(f, len).await.unwrap();
1588        assert_eq!(fetch_count.load(Ordering::SeqCst), 2);
1589        let metadata = loader.finish().unwrap();
1590        assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());
1591
1592        // Prefetch more than footer but not enough
1593        fetch_count.store(0, Ordering::SeqCst);
1594        let f = MetadataFetchFn(&mut fetch);
1595        let mut loader = ParquetMetaDataReader::new()
1596            .with_page_indexes(true)
1597            .with_prefetch_hint(Some(130649));
1598        loader.try_load(f, len).await.unwrap();
1599        assert_eq!(fetch_count.load(Ordering::SeqCst), 2);
1600        let metadata = loader.finish().unwrap();
1601        assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());
1602
1603        // Prefetch exactly enough
1604        fetch_count.store(0, Ordering::SeqCst);
1605        let f = MetadataFetchFn(&mut fetch);
1606        let metadata = ParquetMetaDataReader::new()
1607            .with_page_indexes(true)
1608            .with_prefetch_hint(Some(130650))
1609            .load_and_finish(f, len)
1610            .await
1611            .unwrap();
1612        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
1613        assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());
1614
1615        // Prefetch more than enough but less than the entire file
1616        fetch_count.store(0, Ordering::SeqCst);
1617        let f = MetadataFetchFn(&mut fetch);
1618        let metadata = ParquetMetaDataReader::new()
1619            .with_page_indexes(true)
1620            .with_prefetch_hint(Some((len - 1000) as usize)) // prefetch entire file
1621            .load_and_finish(f, len)
1622            .await
1623            .unwrap();
1624        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
1625        assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());
1626
1627        // Prefetch the entire file
1628        fetch_count.store(0, Ordering::SeqCst);
1629        let f = MetadataFetchFn(&mut fetch);
1630        let metadata = ParquetMetaDataReader::new()
1631            .with_page_indexes(true)
1632            .with_prefetch_hint(Some(len as usize)) // prefetch entire file
1633            .load_and_finish(f, len)
1634            .await
1635            .unwrap();
1636        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
1637        assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());
1638
1639        // Prefetch more than the entire file
1640        fetch_count.store(0, Ordering::SeqCst);
1641        let f = MetadataFetchFn(&mut fetch);
1642        let metadata = ParquetMetaDataReader::new()
1643            .with_page_indexes(true)
1644            .with_prefetch_hint(Some((len + 1000) as usize)) // prefetch entire file
1645            .load_and_finish(f, len)
1646            .await
1647            .unwrap();
1648        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
1649        assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());
1650    }
1651}