parquet/file/metadata/
reader.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::{io::Read, ops::Range, sync::Arc};
19
20use crate::basic::ColumnOrder;
21#[cfg(feature = "encryption")]
22use crate::encryption::{
23    decrypt::{FileDecryptionProperties, FileDecryptor},
24    modules::create_footer_aad,
25};
26use bytes::Bytes;
27
28use crate::errors::{ParquetError, Result};
29use crate::file::metadata::{ColumnChunkMetaData, FileMetaData, ParquetMetaData, RowGroupMetaData};
30use crate::file::page_index::index::Index;
31use crate::file::page_index::index_reader::{acc_range, decode_column_index, decode_offset_index};
32use crate::file::reader::ChunkReader;
33use crate::file::{FOOTER_SIZE, PARQUET_MAGIC, PARQUET_MAGIC_ENCR_FOOTER};
34use crate::format::{ColumnOrder as TColumnOrder, FileMetaData as TFileMetaData};
35#[cfg(feature = "encryption")]
36use crate::format::{EncryptionAlgorithm, FileCryptoMetaData as TFileCryptoMetaData};
37use crate::schema::types;
38use crate::schema::types::SchemaDescriptor;
39use crate::thrift::{TCompactSliceInputProtocol, TSerializable};
40
41#[cfg(all(feature = "async", feature = "arrow"))]
42use crate::arrow::async_reader::{MetadataFetch, MetadataSuffixFetch};
43#[cfg(feature = "encryption")]
44use crate::encryption::decrypt::CryptoContext;
45use crate::file::page_index::offset_index::OffsetIndexMetaData;
46
47/// Reads the [`ParquetMetaData`] from a byte stream.
48///
49/// See [`crate::file::metadata::ParquetMetaDataWriter#output-format`] for a description of
50/// the Parquet metadata.
51///
52/// Parquet metadata is not necessarily contiguous in the files: part is stored
53/// in the footer (the last bytes of the file), but other portions (such as the
54/// PageIndex) can be stored elsewhere.
55///
56/// This reader handles reading the footer as well as the non contiguous parts
57/// of the metadata such as the page indexes; excluding Bloom Filters.
58///
59/// # Example
60/// ```no_run
61/// # use parquet::file::metadata::ParquetMetaDataReader;
62/// # fn open_parquet_file(path: &str) -> std::fs::File { unimplemented!(); }
63/// // read parquet metadata including page indexes from a file
64/// let file = open_parquet_file("some_path.parquet");
65/// let mut reader = ParquetMetaDataReader::new()
66///     .with_page_indexes(true);
67/// reader.try_parse(&file).unwrap();
68/// let metadata = reader.finish().unwrap();
69/// assert!(metadata.column_index().is_some());
70/// assert!(metadata.offset_index().is_some());
71/// ```
#[derive(Default)]
pub struct ParquetMetaDataReader {
    /// Parsed metadata, populated by the `try_parse*` / `try_load*` methods and
    /// handed out (taken) by [`Self::finish`]
    metadata: Option<ParquetMetaData>,
    /// Whether to read the ColumnIndex page-index structure
    column_index: bool,
    /// Whether to read the OffsetIndex page-index structure
    offset_index: bool,
    /// Optional estimate of total metadata size used by the async loaders to
    /// reduce the number of fetch requests; see [`Self::with_prefetch_hint`]
    prefetch_hint: Option<usize>,
    // Size of the serialized thrift metadata plus the 8 byte footer. Only set if
    // `self.parse_metadata` is called.
    metadata_size: Option<usize>,
    /// Optional properties used to decrypt encrypted files
    #[cfg(feature = "encryption")]
    file_decryption_properties: Option<FileDecryptionProperties>,
}
84
/// Describes how the footer metadata is stored
///
/// This is parsed from the last 8 bytes of the Parquet file
// Common derives per Rust API guidelines (C-COMMON-TRAITS): this is a small,
// public value type, so it should be printable, cloneable and comparable.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FooterTail {
    /// Length in bytes of the serialized footer metadata (not including the
    /// 8-byte footer tail itself)
    metadata_length: usize,
    /// Whether the footer metadata is encrypted
    encrypted_footer: bool,
}
92
impl FooterTail {
    /// The length of the footer metadata in bytes
    ///
    /// This is the length of the serialized metadata only; it does not include
    /// the 8-byte footer tail itself.
    pub fn metadata_length(&self) -> usize {
        self.metadata_length
    }

    /// Whether the footer metadata is encrypted
    pub fn is_encrypted_footer(&self) -> bool {
        self.encrypted_footer
    }
}
104
105impl ParquetMetaDataReader {
106    /// Create a new [`ParquetMetaDataReader`]
107    pub fn new() -> Self {
108        Default::default()
109    }
110
111    /// Create a new [`ParquetMetaDataReader`] populated with a [`ParquetMetaData`] struct
112    /// obtained via other means.
113    pub fn new_with_metadata(metadata: ParquetMetaData) -> Self {
114        Self {
115            metadata: Some(metadata),
116            ..Default::default()
117        }
118    }
119
120    /// Enable or disable reading the page index structures described in
121    /// "[Parquet page index]: Layout to Support Page Skipping". Equivalent to:
122    /// `self.with_column_indexes(val).with_offset_indexes(val)`
123    ///
124    /// [Parquet page index]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
125    pub fn with_page_indexes(self, val: bool) -> Self {
126        self.with_column_indexes(val).with_offset_indexes(val)
127    }
128
129    /// Enable or disable reading the Parquet [ColumnIndex] structure.
130    ///
131    /// [ColumnIndex]:  https://github.com/apache/parquet-format/blob/master/PageIndex.md
132    pub fn with_column_indexes(mut self, val: bool) -> Self {
133        self.column_index = val;
134        self
135    }
136
137    /// Enable or disable reading the Parquet [OffsetIndex] structure.
138    ///
139    /// [OffsetIndex]:  https://github.com/apache/parquet-format/blob/master/PageIndex.md
140    pub fn with_offset_indexes(mut self, val: bool) -> Self {
141        self.offset_index = val;
142        self
143    }
144
145    /// Provide a hint as to the number of bytes needed to fully parse the [`ParquetMetaData`].
146    /// Only used for the asynchronous [`Self::try_load()`] method.
147    ///
148    /// By default, the reader will first fetch the last 8 bytes of the input file to obtain the
149    /// size of the footer metadata. A second fetch will be performed to obtain the needed bytes.
150    /// After parsing the footer metadata, a third fetch will be performed to obtain the bytes
151    /// needed to decode the page index structures, if they have been requested. To avoid
152    /// unnecessary fetches, `prefetch` can be set to an estimate of the number of bytes needed
153    /// to fully decode the [`ParquetMetaData`], which can reduce the number of fetch requests and
154    /// reduce latency. Setting `prefetch` too small will not trigger an error, but will result
155    /// in extra fetches being performed.
156    pub fn with_prefetch_hint(mut self, prefetch: Option<usize>) -> Self {
157        self.prefetch_hint = prefetch;
158        self
159    }
160
161    /// Provide the FileDecryptionProperties to use when decrypting the file.
162    ///
163    /// This is only necessary when the file is encrypted.
164    #[cfg(feature = "encryption")]
165    pub fn with_decryption_properties(
166        mut self,
167        properties: Option<&FileDecryptionProperties>,
168    ) -> Self {
169        self.file_decryption_properties = properties.cloned();
170        self
171    }
172
173    /// Indicates whether this reader has a [`ParquetMetaData`] internally.
174    pub fn has_metadata(&self) -> bool {
175        self.metadata.is_some()
176    }
177
178    /// Return the parsed [`ParquetMetaData`] struct, leaving `None` in its place.
179    pub fn finish(&mut self) -> Result<ParquetMetaData> {
180        self.metadata
181            .take()
182            .ok_or_else(|| general_err!("could not parse parquet metadata"))
183    }
184
185    /// Given a [`ChunkReader`], parse and return the [`ParquetMetaData`] in a single pass.
186    ///
187    /// If `reader` is [`Bytes`] based, then the buffer must contain sufficient bytes to complete
188    /// the request, and must include the Parquet footer. If page indexes are desired, the buffer
189    /// must contain the entire file, or [`Self::try_parse_sized()`] should be used.
190    ///
191    /// This call will consume `self`.
192    ///
193    /// # Example
194    /// ```no_run
195    /// # use parquet::file::metadata::ParquetMetaDataReader;
196    /// # fn open_parquet_file(path: &str) -> std::fs::File { unimplemented!(); }
197    /// // read parquet metadata including page indexes
198    /// let file = open_parquet_file("some_path.parquet");
199    /// let metadata = ParquetMetaDataReader::new()
200    ///     .with_page_indexes(true)
201    ///     .parse_and_finish(&file).unwrap();
202    /// ```
203    pub fn parse_and_finish<R: ChunkReader>(mut self, reader: &R) -> Result<ParquetMetaData> {
204        self.try_parse(reader)?;
205        self.finish()
206    }
207
208    /// Attempts to parse the footer metadata (and optionally page indexes) given a [`ChunkReader`].
209    ///
210    /// If `reader` is [`Bytes`] based, then the buffer must contain sufficient bytes to complete
211    /// the request, and must include the Parquet footer. If page indexes are desired, the buffer
212    /// must contain the entire file, or [`Self::try_parse_sized()`] should be used.
213    pub fn try_parse<R: ChunkReader>(&mut self, reader: &R) -> Result<()> {
214        self.try_parse_sized(reader, reader.len())
215    }
216
217    /// Same as [`Self::try_parse()`], but provide the original file size in the case that `reader`
218    /// is a [`Bytes`] struct that does not contain the entire file. This information is necessary
219    /// when the page indexes are desired. `reader` must have access to the Parquet footer.
220    ///
221    /// Using this function also allows for retrying with a larger buffer.
222    ///
223    /// # Errors
224    ///
225    /// This function will return [`ParquetError::NeedMoreData`] in the event `reader` does not
226    /// provide enough data to fully parse the metadata (see example below). The returned error
227    /// will be populated with a `usize` field indicating the number of bytes required from the
228    /// tail of the file to completely parse the requested metadata.
229    ///
230    /// Other errors returned include [`ParquetError::General`] and [`ParquetError::EOF`].
231    ///
232    /// # Example
233    /// ```no_run
234    /// # use parquet::file::metadata::ParquetMetaDataReader;
235    /// # use parquet::errors::ParquetError;
236    /// # use crate::parquet::file::reader::Length;
237    /// # fn get_bytes(file: &std::fs::File, range: std::ops::Range<u64>) -> bytes::Bytes { unimplemented!(); }
238    /// # fn open_parquet_file(path: &str) -> std::fs::File { unimplemented!(); }
239    /// let file = open_parquet_file("some_path.parquet");
240    /// let len = file.len();
241    /// // Speculatively read 1 kilobyte from the end of the file
242    /// let bytes = get_bytes(&file, len - 1024..len);
243    /// let mut reader = ParquetMetaDataReader::new().with_page_indexes(true);
244    /// match reader.try_parse_sized(&bytes, len) {
245    ///     Ok(_) => (),
246    ///     Err(ParquetError::NeedMoreData(needed)) => {
247    ///         // Read the needed number of bytes from the end of the file
248    ///         let bytes = get_bytes(&file, len - needed as u64..len);
249    ///         reader.try_parse_sized(&bytes, len).unwrap();
250    ///     }
251    ///     _ => panic!("unexpected error")
252    /// }
253    /// let metadata = reader.finish().unwrap();
254    /// ```
255    ///
256    /// Note that it is possible for the file metadata to be completely read, but there are
257    /// insufficient bytes available to read the page indexes. [`Self::has_metadata()`] can be used
258    /// to test for this. In the event the file metadata is present, re-parsing of the file
259    /// metadata can be skipped by using [`Self::read_page_indexes_sized()`], as shown below.
260    /// ```no_run
261    /// # use parquet::file::metadata::ParquetMetaDataReader;
262    /// # use parquet::errors::ParquetError;
263    /// # use crate::parquet::file::reader::Length;
264    /// # fn get_bytes(file: &std::fs::File, range: std::ops::Range<u64>) -> bytes::Bytes { unimplemented!(); }
265    /// # fn open_parquet_file(path: &str) -> std::fs::File { unimplemented!(); }
266    /// let file = open_parquet_file("some_path.parquet");
267    /// let len = file.len();
268    /// // Speculatively read 1 kilobyte from the end of the file
269    /// let mut bytes = get_bytes(&file, len - 1024..len);
270    /// let mut reader = ParquetMetaDataReader::new().with_page_indexes(true);
271    /// // Loop until `bytes` is large enough
272    /// loop {
273    ///     match reader.try_parse_sized(&bytes, len) {
274    ///         Ok(_) => break,
275    ///         Err(ParquetError::NeedMoreData(needed)) => {
276    ///             // Read the needed number of bytes from the end of the file
277    ///             bytes = get_bytes(&file, len - needed as u64..len);
278    ///             // If file metadata was read only read page indexes, otherwise continue loop
279    ///             if reader.has_metadata() {
280    ///                 reader.read_page_indexes_sized(&bytes, len);
281    ///                 break;
282    ///             }
283    ///         }
284    ///         _ => panic!("unexpected error")
285    ///     }
286    /// }
287    /// let metadata = reader.finish().unwrap();
288    /// ```
    pub fn try_parse_sized<R: ChunkReader>(&mut self, reader: &R, file_size: u64) -> Result<()> {
        self.metadata = match self.parse_metadata(reader) {
            Ok(metadata) => Some(metadata),
            Err(ParquetError::NeedMoreData(needed)) => {
                // If reader is the same length as `file_size` then presumably there is no more to
                // read, so return an EOF error. Likewise, if the whole file cannot hold `needed`
                // bytes, no larger buffer can help either.
                if file_size == reader.len() || needed as u64 > file_size {
                    return Err(eof_err!(
                        "Parquet file too small. Size is {} but need {}",
                        file_size,
                        needed
                    ));
                } else {
                    // Ask for a larger buffer
                    return Err(ParquetError::NeedMoreData(needed));
                }
            }
            // Any other parse error is passed through unchanged
            Err(e) => return Err(e),
        };

        // we can return if page indexes aren't requested
        if !self.column_index && !self.offset_index {
            return Ok(());
        }

        self.read_page_indexes_sized(reader, file_size)
    }
316
317    /// Read the page index structures when a [`ParquetMetaData`] has already been obtained.
318    /// See [`Self::new_with_metadata()`] and [`Self::has_metadata()`].
319    pub fn read_page_indexes<R: ChunkReader>(&mut self, reader: &R) -> Result<()> {
320        self.read_page_indexes_sized(reader, reader.len())
321    }
322
323    /// Read the page index structures when a [`ParquetMetaData`] has already been obtained.
324    /// This variant is used when `reader` cannot access the entire Parquet file (e.g. it is
325    /// a [`Bytes`] struct containing the tail of the file).
326    /// See [`Self::new_with_metadata()`] and [`Self::has_metadata()`]. Like
327    /// [`Self::try_parse_sized()`] this function may return [`ParquetError::NeedMoreData`].
328    pub fn read_page_indexes_sized<R: ChunkReader>(
329        &mut self,
330        reader: &R,
331        file_size: u64,
332    ) -> Result<()> {
333        if self.metadata.is_none() {
334            return Err(general_err!(
335                "Tried to read page indexes without ParquetMetaData metadata"
336            ));
337        }
338
339        // FIXME: there are differing implementations in the case where page indexes are missing
340        // from the file. `MetadataLoader` will leave them as `None`, while the parser in
341        // `index_reader::read_columns_indexes` returns a vector of empty vectors.
342        // It is best for this function to replicate the latter behavior for now, but in a future
343        // breaking release, the two paths to retrieve metadata should be made consistent. Note that this is only
344        // an issue if the user requested page indexes, so there is no need to provide empty
345        // vectors in `try_parse_sized()`.
346        // https://github.com/apache/arrow-rs/issues/6447
347
348        // Get bounds needed for page indexes (if any are present in the file).
349        let Some(range) = self.range_for_page_index() else {
350            return Ok(());
351        };
352
353        // Check to see if needed range is within `file_range`. Checking `range.end` seems
354        // redundant, but it guards against `range_for_page_index()` returning garbage.
355        let file_range = file_size.saturating_sub(reader.len())..file_size;
356        if !(file_range.contains(&range.start) && file_range.contains(&range.end)) {
357            // Requested range starts beyond EOF
358            if range.end > file_size {
359                return Err(eof_err!(
360                    "Parquet file too small. Range {:?} is beyond file bounds {file_size}",
361                    range
362                ));
363            } else {
364                // Ask for a larger buffer
365                return Err(ParquetError::NeedMoreData(
366                    (file_size - range.start).try_into()?,
367                ));
368            }
369        }
370
371        // Perform extra sanity check to make sure `range` and the footer metadata don't
372        // overlap.
373        if let Some(metadata_size) = self.metadata_size {
374            let metadata_range = file_size.saturating_sub(metadata_size as u64)..file_size;
375            if range.end > metadata_range.start {
376                return Err(eof_err!(
377                    "Parquet file too small. Page index range {:?} overlaps with file metadata {:?}",
378                    range,
379                    metadata_range
380                ));
381            }
382        }
383
384        let bytes_needed = usize::try_from(range.end - range.start)?;
385        let bytes = reader.get_bytes(range.start - file_range.start, bytes_needed)?;
386        let offset = range.start;
387
388        self.parse_column_index(&bytes, offset)?;
389        self.parse_offset_index(&bytes, offset)?;
390
391        Ok(())
392    }
393
394    /// Given a [`MetadataFetch`], parse and return the [`ParquetMetaData`] in a single pass.
395    ///
396    /// This call will consume `self`.
397    ///
398    /// See [`Self::with_prefetch_hint`] for a discussion of how to reduce the number of fetches
399    /// performed by this function.
400    #[cfg(all(feature = "async", feature = "arrow"))]
401    pub async fn load_and_finish<F: MetadataFetch>(
402        mut self,
403        fetch: F,
404        file_size: u64,
405    ) -> Result<ParquetMetaData> {
406        self.try_load(fetch, file_size).await?;
407        self.finish()
408    }
409
410    /// Given a [`MetadataSuffixFetch`], parse and return the [`ParquetMetaData`] in a single pass.
411    ///
412    /// This call will consume `self`.
413    ///
414    /// See [`Self::with_prefetch_hint`] for a discussion of how to reduce the number of fetches
415    /// performed by this function.
416    #[cfg(all(feature = "async", feature = "arrow"))]
417    pub async fn load_via_suffix_and_finish<F: MetadataSuffixFetch>(
418        mut self,
419        fetch: F,
420    ) -> Result<ParquetMetaData> {
421        self.try_load_via_suffix(fetch).await?;
422        self.finish()
423    }
424    /// Attempts to (asynchronously) parse the footer metadata (and optionally page indexes)
425    /// given a [`MetadataFetch`].
426    ///
427    /// See [`Self::with_prefetch_hint`] for a discussion of how to reduce the number of fetches
428    /// performed by this function.
429    #[cfg(all(feature = "async", feature = "arrow"))]
430    pub async fn try_load<F: MetadataFetch>(&mut self, mut fetch: F, file_size: u64) -> Result<()> {
431        let (metadata, remainder) = self.load_metadata(&mut fetch, file_size).await?;
432
433        self.metadata = Some(metadata);
434
435        // we can return if page indexes aren't requested
436        if !self.column_index && !self.offset_index {
437            return Ok(());
438        }
439
440        self.load_page_index_with_remainder(fetch, remainder).await
441    }
442
443    /// Attempts to (asynchronously) parse the footer metadata (and optionally page indexes)
444    /// given a [`MetadataSuffixFetch`].
445    ///
446    /// See [`Self::with_prefetch_hint`] for a discussion of how to reduce the number of fetches
447    /// performed by this function.
448    #[cfg(all(feature = "async", feature = "arrow"))]
449    pub async fn try_load_via_suffix<F: MetadataSuffixFetch>(
450        &mut self,
451        mut fetch: F,
452    ) -> Result<()> {
453        let (metadata, remainder) = self.load_metadata_via_suffix(&mut fetch).await?;
454
455        self.metadata = Some(metadata);
456
457        // we can return if page indexes aren't requested
458        if !self.column_index && !self.offset_index {
459            return Ok(());
460        }
461
462        self.load_page_index_with_remainder(fetch, remainder).await
463    }
464
465    /// Asynchronously fetch the page index structures when a [`ParquetMetaData`] has already
466    /// been obtained. See [`Self::new_with_metadata()`].
467    #[cfg(all(feature = "async", feature = "arrow"))]
468    pub async fn load_page_index<F: MetadataFetch>(&mut self, fetch: F) -> Result<()> {
469        self.load_page_index_with_remainder(fetch, None).await
470    }
471
    /// Load and parse the requested page index structures.
    ///
    /// `remainder`, when present, is `(file_offset, bytes)`: bytes left over
    /// from the earlier metadata fetch, starting at the given absolute file
    /// offset. If it already covers the page index range it is sliced instead
    /// of issuing another fetch.
    #[cfg(all(feature = "async", feature = "arrow"))]
    async fn load_page_index_with_remainder<F: MetadataFetch>(
        &mut self,
        mut fetch: F,
        remainder: Option<(usize, Bytes)>,
    ) -> Result<()> {
        if self.metadata.is_none() {
            return Err(general_err!("Footer metadata is not present"));
        }

        // Get bounds needed for page indexes (if any are present in the file).
        let range = self.range_for_page_index();
        let range = match range {
            Some(range) => range,
            None => return Ok(()),
        };

        let bytes = match &remainder {
            // Reuse the prefetched bytes when they start at or before the needed range
            Some((remainder_start, remainder)) if *remainder_start as u64 <= range.start => {
                let remainder_start = *remainder_start as u64;
                // Rebase absolute file offsets onto the remainder buffer
                let offset = usize::try_from(range.start - remainder_start)?;
                let end = usize::try_from(range.end - remainder_start)?;
                assert!(end <= remainder.len());
                remainder.slice(offset..end)
            }
            // Note: this will potentially fetch data already in remainder, this keeps things simple
            _ => fetch.fetch(range.start..range.end).await?,
        };

        // Sanity check
        assert_eq!(bytes.len() as u64, range.end - range.start);

        self.parse_column_index(&bytes, range.start)?;
        self.parse_offset_index(&bytes, range.start)?;

        Ok(())
    }
509
    /// Parse the ColumnIndex structures (if requested) for every column of
    /// every row group.
    ///
    /// `bytes` must cover the file span beginning at absolute file offset
    /// `start_offset`; each column's index range is rebased onto `bytes`
    /// before decoding. Columns without a column index get [`Index::NONE`].
    ///
    /// Panics if `self.metadata` is `None` (callers check this first).
    fn parse_column_index(&mut self, bytes: &Bytes, start_offset: u64) -> Result<()> {
        let metadata = self.metadata.as_mut().unwrap();
        if self.column_index {
            let index = metadata
                .row_groups()
                .iter()
                .enumerate()
                .map(|(rg_idx, x)| {
                    x.columns()
                        .iter()
                        .enumerate()
                        .map(|(col_idx, c)| match c.column_index_range() {
                            Some(r) => {
                                // Rebase absolute file offsets onto `bytes`
                                let r_start = usize::try_from(r.start - start_offset)?;
                                let r_end = usize::try_from(r.end - start_offset)?;
                                Self::parse_single_column_index(
                                    &bytes[r_start..r_end],
                                    metadata,
                                    c,
                                    rg_idx,
                                    col_idx,
                                )
                            }
                            None => Ok(Index::NONE),
                        })
                        .collect::<Result<Vec<_>>>()
                })
                .collect::<Result<Vec<_>>>()?;
            metadata.set_column_index(Some(index));
        }
        Ok(())
    }
542
    /// Decode a single serialized ColumnIndex, decrypting the bytes first when
    /// the column chunk carries crypto metadata.
    ///
    /// Returns an error if the column is encrypted but no file decryptor has
    /// been configured on `metadata`.
    #[cfg(feature = "encryption")]
    fn parse_single_column_index(
        bytes: &[u8],
        metadata: &ParquetMetaData,
        column: &ColumnChunkMetaData,
        row_group_index: usize,
        col_index: usize,
    ) -> Result<Index> {
        match &column.column_crypto_metadata {
            Some(crypto_metadata) => {
                let file_decryptor = metadata.file_decryptor.as_ref().ok_or_else(|| {
                    general_err!("Cannot decrypt column index, no file decryptor set")
                })?;
                // Build a crypto context scoped to this row group / column pair
                let crypto_context = CryptoContext::for_column(
                    file_decryptor,
                    crypto_metadata,
                    row_group_index,
                    col_index,
                )?;
                let column_decryptor = crypto_context.metadata_decryptor();
                let aad = crypto_context.create_column_index_aad()?;
                let plaintext = column_decryptor.decrypt(bytes, &aad)?;
                decode_column_index(&plaintext, column.column_type())
            }
            None => decode_column_index(bytes, column.column_type()),
        }
    }
570
    /// Decode a single serialized ColumnIndex.
    ///
    /// Without the `encryption` feature the bytes are always plaintext, so the
    /// extra context arguments are unused.
    #[cfg(not(feature = "encryption"))]
    fn parse_single_column_index(
        bytes: &[u8],
        _metadata: &ParquetMetaData,
        column: &ColumnChunkMetaData,
        _row_group_index: usize,
        _col_index: usize,
    ) -> Result<Index> {
        decode_column_index(bytes, column.column_type())
    }
581
    /// Parse the OffsetIndex structures (if requested) for every column of
    /// every row group.
    ///
    /// `bytes` must cover the file span beginning at absolute file offset
    /// `start_offset`; each column's offset index range is rebased onto
    /// `bytes` before decoding. Unlike the column index, a missing offset
    /// index is an error.
    ///
    /// Panics if `self.metadata` is `None` (callers check this first).
    fn parse_offset_index(&mut self, bytes: &Bytes, start_offset: u64) -> Result<()> {
        let metadata = self.metadata.as_mut().unwrap();
        if self.offset_index {
            let index = metadata
                .row_groups()
                .iter()
                .enumerate()
                .map(|(rg_idx, x)| {
                    x.columns()
                        .iter()
                        .enumerate()
                        .map(|(col_idx, c)| match c.offset_index_range() {
                            Some(r) => {
                                // Rebase absolute file offsets onto `bytes`
                                let r_start = usize::try_from(r.start - start_offset)?;
                                let r_end = usize::try_from(r.end - start_offset)?;
                                Self::parse_single_offset_index(
                                    &bytes[r_start..r_end],
                                    metadata,
                                    c,
                                    rg_idx,
                                    col_idx,
                                )
                            }
                            None => Err(general_err!("missing offset index")),
                        })
                        .collect::<Result<Vec<_>>>()
                })
                .collect::<Result<Vec<_>>>()?;

            metadata.set_offset_index(Some(index));
        }
        Ok(())
    }
615
    /// Decode a single serialized OffsetIndex, decrypting the bytes first when
    /// the column chunk carries crypto metadata.
    ///
    /// Returns an error if the column is encrypted but no file decryptor has
    /// been configured on `metadata`.
    #[cfg(feature = "encryption")]
    fn parse_single_offset_index(
        bytes: &[u8],
        metadata: &ParquetMetaData,
        column: &ColumnChunkMetaData,
        row_group_index: usize,
        col_index: usize,
    ) -> Result<OffsetIndexMetaData> {
        match &column.column_crypto_metadata {
            Some(crypto_metadata) => {
                let file_decryptor = metadata.file_decryptor.as_ref().ok_or_else(|| {
                    general_err!("Cannot decrypt offset index, no file decryptor set")
                })?;
                // Build a crypto context scoped to this row group / column pair
                let crypto_context = CryptoContext::for_column(
                    file_decryptor,
                    crypto_metadata,
                    row_group_index,
                    col_index,
                )?;
                let column_decryptor = crypto_context.metadata_decryptor();
                let aad = crypto_context.create_offset_index_aad()?;
                let plaintext = column_decryptor.decrypt(bytes, &aad)?;
                decode_offset_index(&plaintext)
            }
            None => decode_offset_index(bytes),
        }
    }
643
    /// Decode a single serialized OffsetIndex.
    ///
    /// Without the `encryption` feature the bytes are always plaintext, so the
    /// extra context arguments are unused.
    #[cfg(not(feature = "encryption"))]
    fn parse_single_offset_index(
        bytes: &[u8],
        _metadata: &ParquetMetaData,
        _column: &ColumnChunkMetaData,
        _row_group_index: usize,
        _col_index: usize,
    ) -> Result<OffsetIndexMetaData> {
        decode_offset_index(bytes)
    }
654
655    fn range_for_page_index(&self) -> Option<Range<u64>> {
656        // sanity check
657        self.metadata.as_ref()?;
658
659        // Get bounds needed for page indexes (if any are present in the file).
660        let mut range = None;
661        let metadata = self.metadata.as_ref().unwrap();
662        for c in metadata.row_groups().iter().flat_map(|r| r.columns()) {
663            if self.column_index {
664                range = acc_range(range, c.column_index_range());
665            }
666            if self.offset_index {
667                range = acc_range(range, c.offset_index_range());
668            }
669        }
670        range
671    }
672
    // One-shot parse of footer.
    // Side effect: this will set `self.metadata_size`
    fn parse_metadata<R: ChunkReader>(&mut self, chunk_reader: &R) -> Result<ParquetMetaData> {
        // check file is large enough to hold footer
        let file_size = chunk_reader.len();
        if file_size < (FOOTER_SIZE as u64) {
            return Err(ParquetError::NeedMoreData(FOOTER_SIZE));
        }

        // Read the fixed-size footer tail from the last 8 bytes of the file
        let mut footer = [0_u8; 8];
        chunk_reader
            .get_read(file_size - 8)?
            .read_exact(&mut footer)?;

        let footer = Self::decode_footer_tail(&footer)?;
        let metadata_len = footer.metadata_length();
        let footer_metadata_len = FOOTER_SIZE + metadata_len;
        // Record the total footer size so later page-index reads can detect
        // overlap with the metadata region
        self.metadata_size = Some(footer_metadata_len);

        // Reader does not hold enough bytes; ask the caller for the full
        // footer + metadata span
        if footer_metadata_len as u64 > file_size {
            return Err(ParquetError::NeedMoreData(footer_metadata_len));
        }

        let start = file_size - footer_metadata_len as u64;
        self.decode_footer_metadata(
            chunk_reader.get_bytes(start, metadata_len)?.as_ref(),
            &footer,
        )
    }
702
703    /// Return the number of bytes to read in the initial pass. If `prefetch_size` has
704    /// been provided, then return that value if it is larger than the size of the Parquet
705    /// file footer (8 bytes). Otherwise returns `8`.
706    #[cfg(all(feature = "async", feature = "arrow"))]
707    fn get_prefetch_size(&self) -> usize {
708        if let Some(prefetch) = self.prefetch_hint {
709            if prefetch > FOOTER_SIZE {
710                return prefetch;
711            }
712        }
713        FOOTER_SIZE
714    }
715
    /// Fetches and decodes the footer metadata using range requests.
    ///
    /// Performs one fetch of the last `get_prefetch_size()` bytes; if the thrift
    /// metadata did not fit in that prefetch, performs a second fetch for exactly
    /// the metadata range. On success returns the decoded metadata and, when the
    /// prefetch over-read, the leftover prefix bytes together with their absolute
    /// file offset (presumably reused by callers for page indexes — see call sites).
    #[cfg(all(feature = "async", feature = "arrow"))]
    async fn load_metadata<F: MetadataFetch>(
        &self,
        fetch: &mut F,
        file_size: u64,
    ) -> Result<(ParquetMetaData, Option<(usize, Bytes)>)> {
        let prefetch = self.get_prefetch_size() as u64;

        if file_size < FOOTER_SIZE as u64 {
            return Err(eof_err!("file size of {} is less than footer", file_size));
        }

        // If a size hint is provided, read more than the minimum size
        // to try and avoid a second fetch.
        // Note: prefetch > file_size is ok since we're using saturating_sub.
        let footer_start = file_size.saturating_sub(prefetch);

        let suffix = fetch.fetch(footer_start..file_size).await?;
        let suffix_len = suffix.len();
        // The fetch implementation may return fewer bytes than requested;
        // treat a short read as EOF rather than panicking on a slice below.
        let fetch_len = (file_size - footer_start)
            .try_into()
            .expect("footer size should never be larger than u32");
        if suffix_len < fetch_len {
            return Err(eof_err!(
                "metadata requires {} bytes, but could only read {}",
                fetch_len,
                suffix_len
            ));
        }

        // The last FOOTER_SIZE bytes hold the metadata length + magic.
        let mut footer = [0; FOOTER_SIZE];
        footer.copy_from_slice(&suffix[suffix_len - FOOTER_SIZE..suffix_len]);

        let footer = Self::decode_footer_tail(&footer)?;
        let length = footer.metadata_length();

        // Sanity-check the decoded length against the known file size.
        if file_size < (length + FOOTER_SIZE) as u64 {
            return Err(eof_err!(
                "file size of {} is less than footer + metadata {}",
                file_size,
                length + FOOTER_SIZE
            ));
        }

        // Did not fetch the entire file metadata in the initial read, need to make a second request
        if length > suffix_len - FOOTER_SIZE {
            let metadata_start = file_size - (length + FOOTER_SIZE) as u64;
            let meta = fetch
                .fetch(metadata_start..(file_size - FOOTER_SIZE as u64))
                .await?;
            Ok((self.decode_footer_metadata(&meta, &footer)?, None))
        } else {
            // Metadata is fully contained in the prefetched suffix: slice it out
            // and return the remaining prefix bytes for possible reuse.
            let metadata_start = (file_size - (length + FOOTER_SIZE) as u64 - footer_start)
                .try_into()
                .expect("metadata length should never be larger than u32");
            let slice = &suffix[metadata_start..suffix_len - FOOTER_SIZE];
            Ok((
                self.decode_footer_metadata(slice, &footer)?,
                Some((footer_start as usize, suffix.slice(..metadata_start))),
            ))
        }
    }
778
    /// Fetches and decodes the footer metadata using suffix requests
    /// (i.e. "last N bytes" reads), for stores where the total file size
    /// is not known up front.
    ///
    /// Mirrors [`Self::load_metadata`]: one prefetch, then an optional second
    /// suffix fetch if the metadata did not fit. The returned `Option` carries
    /// any leftover prefix bytes from the prefetch, with offset `0` relative
    /// to the start of the fetched suffix.
    #[cfg(all(feature = "async", feature = "arrow"))]
    async fn load_metadata_via_suffix<F: MetadataSuffixFetch>(
        &self,
        fetch: &mut F,
    ) -> Result<(ParquetMetaData, Option<(usize, Bytes)>)> {
        let prefetch = self.get_prefetch_size();

        let suffix = fetch.fetch_suffix(prefetch as _).await?;
        let suffix_len = suffix.len();

        // Need at least the fixed 8-byte tail to decode anything.
        if suffix_len < FOOTER_SIZE {
            return Err(eof_err!(
                "footer metadata requires {} bytes, but could only read {}",
                FOOTER_SIZE,
                suffix_len
            ));
        }

        let mut footer = [0; FOOTER_SIZE];
        footer.copy_from_slice(&suffix[suffix_len - FOOTER_SIZE..suffix_len]);

        let footer = Self::decode_footer_tail(&footer)?;
        let length = footer.metadata_length();

        // Did not fetch the entire file metadata in the initial read, need to make a second request
        let metadata_offset = length + FOOTER_SIZE;
        if length > suffix_len - FOOTER_SIZE {
            let meta = fetch.fetch_suffix(metadata_offset).await?;

            // The second fetch may still come back short (e.g. file smaller
            // than requested suffix); surface that as EOF.
            if meta.len() < metadata_offset {
                return Err(eof_err!(
                    "metadata requires {} bytes, but could only read {}",
                    metadata_offset,
                    meta.len()
                ));
            }

            Ok((
                // need to slice off the footer or decryption fails
                self.decode_footer_metadata(&meta.slice(0..length), &footer)?,
                None,
            ))
        } else {
            // Metadata fully contained in the prefetched suffix.
            let metadata_start = suffix_len - metadata_offset;
            let slice = &suffix[metadata_start..suffix_len - FOOTER_SIZE];
            Ok((
                self.decode_footer_metadata(slice, &footer)?,
                Some((0, suffix.slice(..metadata_start))),
            ))
        }
    }
830
831    /// Decodes the end of the Parquet footer
832    ///
833    /// There are 8 bytes at the end of the Parquet footer with the following layout:
834    /// * 4 bytes for the metadata length
835    /// * 4 bytes for the magic bytes 'PAR1' or 'PARE' (encrypted footer)
836    ///
837    /// ```text
838    /// +-----+------------------+
839    /// | len | 'PAR1' or 'PARE' |
840    /// +-----+------------------+
841    /// ```
842    pub fn decode_footer_tail(slice: &[u8; FOOTER_SIZE]) -> Result<FooterTail> {
843        let magic = &slice[4..];
844        let encrypted_footer = if magic == PARQUET_MAGIC_ENCR_FOOTER {
845            true
846        } else if magic == PARQUET_MAGIC {
847            false
848        } else {
849            return Err(general_err!("Invalid Parquet file. Corrupt footer"));
850        };
851        // get the metadata length from the footer
852        let metadata_len = u32::from_le_bytes(slice[..4].try_into().unwrap());
853        Ok(FooterTail {
854            // u32 won't be larger than usize in most cases
855            metadata_length: metadata_len as usize,
856            encrypted_footer,
857        })
858    }
859
860    /// Decodes the Parquet footer, returning the metadata length in bytes
861    #[deprecated(note = "use decode_footer_tail instead")]
862    pub fn decode_footer(slice: &[u8; FOOTER_SIZE]) -> Result<usize> {
863        Self::decode_footer_tail(slice).map(|f| f.metadata_length)
864    }
865
    /// Decodes [`ParquetMetaData`] from the provided bytes.
    ///
    /// Typically, this is used to decode the metadata from the end of a parquet
    /// file. The format of `buf` is the Thrift compact binary protocol, as specified
    /// by the [Parquet Spec].
    ///
    /// This method handles using either `decode_metadata` or
    /// `decode_metadata_with_encryption` depending on whether the encryption
    /// feature is enabled.
    ///
    /// [Parquet Spec]: https://github.com/apache/parquet-format#metadata
    pub(crate) fn decode_footer_metadata(
        &self,
        buf: &[u8],
        footer_tail: &FooterTail,
    ) -> Result<ParquetMetaData> {
        // With encryption compiled in, the decryption-aware decoder handles
        // both encrypted and plaintext footers.
        #[cfg(feature = "encryption")]
        let result = Self::decode_metadata_with_encryption(
            buf,
            footer_tail.is_encrypted_footer(),
            self.file_decryption_properties.as_ref(),
        );
        // Without the encryption feature an encrypted footer cannot be read:
        // fail early with a clear error instead of a thrift parse failure.
        #[cfg(not(feature = "encryption"))]
        let result = {
            if footer_tail.is_encrypted_footer() {
                Err(general_err!(
                    "Parquet file has an encrypted footer but the encryption feature is disabled"
                ))
            } else {
                Self::decode_metadata(buf)
            }
        };
        result
    }
900
    /// Decodes [`ParquetMetaData`] from the provided bytes, handling metadata that may be encrypted.
    ///
    /// Typically this is used to decode the metadata from the end of a parquet
    /// file. The format of `buf` is the Thrift compact binary protocol, as specified
    /// by the [Parquet Spec]. Buffer can be encrypted with AES GCM or AES CTR
    /// ciphers as specified in the [Parquet Encryption Spec].
    ///
    /// [Parquet Spec]: https://github.com/apache/parquet-format#metadata
    /// [Parquet Encryption Spec]: https://parquet.apache.org/docs/file-format/data-pages/encryption/
    #[cfg(feature = "encryption")]
    fn decode_metadata_with_encryption(
        buf: &[u8],
        encrypted_footer: bool,
        file_decryption_properties: Option<&FileDecryptionProperties>,
    ) -> Result<ParquetMetaData> {
        let mut prot = TCompactSliceInputProtocol::new(buf);
        let mut file_decryptor = None;
        // Holds the decrypted footer bytes; declared here so it outlives `prot`
        // when the protocol is re-pointed at the decrypted buffer below.
        let decrypted_fmd_buf;

        if encrypted_footer {
            if let Some(file_decryption_properties) = file_decryption_properties {
                // An encrypted footer starts with a plaintext FileCryptoMetaData
                // describing the algorithm and key metadata.
                let t_file_crypto_metadata: TFileCryptoMetaData =
                    TFileCryptoMetaData::read_from_in_protocol(&mut prot)
                        .map_err(|e| general_err!("Could not parse crypto metadata: {}", e))?;
                // If the file says the AAD prefix is not stored in the file,
                // the caller must have supplied it.
                let supply_aad_prefix = match &t_file_crypto_metadata.encryption_algorithm {
                    EncryptionAlgorithm::AESGCMV1(algo) => algo.supply_aad_prefix,
                    _ => Some(false),
                }
                .unwrap_or(false);
                if supply_aad_prefix && file_decryption_properties.aad_prefix().is_none() {
                    return Err(general_err!(
                        "Parquet file was encrypted with an AAD prefix that is not stored in the file, \
                        but no AAD prefix was provided in the file decryption properties"
                    ));
                }
                let decryptor = get_file_decryptor(
                    t_file_crypto_metadata.encryption_algorithm,
                    t_file_crypto_metadata.key_metadata.as_deref(),
                    file_decryption_properties,
                )?;
                let footer_decryptor = decryptor.get_footer_decryptor();
                let aad_footer = create_footer_aad(decryptor.file_aad())?;

                // Decrypt the remainder of the buffer (everything after the
                // crypto metadata) and re-point the thrift reader at it.
                decrypted_fmd_buf = footer_decryptor?
                    .decrypt(prot.as_slice().as_ref(), aad_footer.as_ref())
                    .map_err(|_| {
                        general_err!(
                            "Provided footer key and AAD were unable to decrypt parquet footer"
                        )
                    })?;
                prot = TCompactSliceInputProtocol::new(decrypted_fmd_buf.as_ref());

                file_decryptor = Some(decryptor);
            } else {
                return Err(general_err!("Parquet file has an encrypted footer but decryption properties were not provided"));
            }
        }

        let t_file_metadata: TFileMetaData = TFileMetaData::read_from_in_protocol(&mut prot)
            .map_err(|e| general_err!("Could not parse metadata: {}", e))?;
        let schema = types::from_thrift(&t_file_metadata.schema)?;
        let schema_descr = Arc::new(SchemaDescriptor::new(schema));

        if let (Some(algo), Some(file_decryption_properties)) = (
            t_file_metadata.encryption_algorithm,
            file_decryption_properties,
        ) {
            // File has a plaintext footer but encryption algorithm is set
            let file_decryptor_value = get_file_decryptor(
                algo,
                t_file_metadata.footer_signing_key_metadata.as_deref(),
                file_decryption_properties,
            )?;
            // Optionally verify the plaintext footer's signature against `buf`.
            if file_decryption_properties.check_plaintext_footer_integrity() && !encrypted_footer {
                file_decryptor_value.verify_plaintext_footer_signature(buf)?;
            }
            file_decryptor = Some(file_decryptor_value);
        }

        // Row groups may contain encrypted column chunks; decrypt as needed.
        let mut row_groups = Vec::new();
        for rg in t_file_metadata.row_groups {
            let r = RowGroupMetaData::from_encrypted_thrift(
                schema_descr.clone(),
                rg,
                file_decryptor.as_ref(),
            )?;
            row_groups.push(r);
        }
        let column_orders =
            Self::parse_column_orders(t_file_metadata.column_orders, &schema_descr)?;

        let file_metadata = FileMetaData::new(
            t_file_metadata.version,
            t_file_metadata.num_rows,
            t_file_metadata.created_by,
            t_file_metadata.key_value_metadata,
            schema_descr,
            column_orders,
        );
        let mut metadata = ParquetMetaData::new(file_metadata, row_groups);

        // Retain the decryptor so page data can be decrypted later.
        metadata.with_file_decryptor(file_decryptor);

        Ok(metadata)
    }
1006
1007    /// Decodes [`ParquetMetaData`] from the provided bytes.
1008    ///
1009    /// Typically this is used to decode the metadata from the end of a parquet
1010    /// file. The format of `buf` is the Thrift compact binary protocol, as specified
1011    /// by the [Parquet Spec].
1012    ///
1013    /// [Parquet Spec]: https://github.com/apache/parquet-format#metadata
1014    pub fn decode_metadata(buf: &[u8]) -> Result<ParquetMetaData> {
1015        let mut prot = TCompactSliceInputProtocol::new(buf);
1016
1017        let t_file_metadata: TFileMetaData = TFileMetaData::read_from_in_protocol(&mut prot)
1018            .map_err(|e| general_err!("Could not parse metadata: {}", e))?;
1019        let schema = types::from_thrift(&t_file_metadata.schema)?;
1020        let schema_descr = Arc::new(SchemaDescriptor::new(schema));
1021
1022        let mut row_groups = Vec::new();
1023        for rg in t_file_metadata.row_groups {
1024            row_groups.push(RowGroupMetaData::from_thrift(schema_descr.clone(), rg)?);
1025        }
1026        let column_orders =
1027            Self::parse_column_orders(t_file_metadata.column_orders, &schema_descr)?;
1028
1029        let file_metadata = FileMetaData::new(
1030            t_file_metadata.version,
1031            t_file_metadata.num_rows,
1032            t_file_metadata.created_by,
1033            t_file_metadata.key_value_metadata,
1034            schema_descr,
1035            column_orders,
1036        );
1037
1038        Ok(ParquetMetaData::new(file_metadata, row_groups))
1039    }
1040
1041    /// Parses column orders from Thrift definition.
1042    /// If no column orders are defined, returns `None`.
1043    fn parse_column_orders(
1044        t_column_orders: Option<Vec<TColumnOrder>>,
1045        schema_descr: &SchemaDescriptor,
1046    ) -> Result<Option<Vec<ColumnOrder>>> {
1047        match t_column_orders {
1048            Some(orders) => {
1049                // Should always be the case
1050                if orders.len() != schema_descr.num_columns() {
1051                    return Err(general_err!("Column order length mismatch"));
1052                };
1053                let mut res = Vec::new();
1054                for (i, column) in schema_descr.columns().iter().enumerate() {
1055                    match orders[i] {
1056                        TColumnOrder::TYPEORDER(_) => {
1057                            let sort_order = ColumnOrder::get_sort_order(
1058                                column.logical_type(),
1059                                column.converted_type(),
1060                                column.physical_type(),
1061                            );
1062                            res.push(ColumnOrder::TYPE_DEFINED_ORDER(sort_order));
1063                        }
1064                    }
1065                }
1066                Ok(Some(res))
1067            }
1068            None => Ok(None),
1069        }
1070    }
1071}
1072
/// Builds a [`FileDecryptor`] for the given encryption algorithm and key metadata.
///
/// Only `AES_GCM_V1` is supported; `AES_GCM_CTR_V1` returns a not-yet-implemented
/// error. An AAD prefix supplied via the decryption properties takes precedence
/// over one stored in the file.
#[cfg(feature = "encryption")]
fn get_file_decryptor(
    encryption_algorithm: EncryptionAlgorithm,
    footer_key_metadata: Option<&[u8]>,
    file_decryption_properties: &FileDecryptionProperties,
) -> Result<FileDecryptor> {
    match encryption_algorithm {
        EncryptionAlgorithm::AESGCMV1(algo) => {
            // The per-file unique AAD component is mandatory for AES-GCM.
            let aad_file_unique = algo
                .aad_file_unique
                .ok_or_else(|| general_err!("AAD unique file identifier is not set"))?;
            // Caller-provided prefix wins; otherwise fall back to the prefix
            // stored in the file (empty if absent).
            let aad_prefix = if let Some(aad_prefix) = file_decryption_properties.aad_prefix() {
                aad_prefix.clone()
            } else {
                algo.aad_prefix.unwrap_or_default()
            };

            FileDecryptor::new(
                file_decryption_properties,
                footer_key_metadata,
                aad_file_unique,
                aad_prefix,
            )
        }
        EncryptionAlgorithm::AESGCMCTRV1(_) => Err(nyi_err!(
            "The AES_GCM_CTR_V1 encryption algorithm is not yet supported"
        )),
    }
}
1102
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;

    use crate::basic::SortOrder;
    use crate::basic::Type;
    use crate::file::reader::Length;
    use crate::format::TypeDefinedOrder;
    use crate::schema::types::Type as SchemaType;
    use crate::util::test_common::file_util::get_test_file;

    // An empty file is smaller than the 8-byte footer tail.
    #[test]
    fn test_parse_metadata_size_smaller_than_footer() {
        let test_file = tempfile::tempfile().unwrap();
        let err = ParquetMetaDataReader::new()
            .parse_metadata(&test_file)
            .unwrap_err();
        assert!(matches!(err, ParquetError::NeedMoreData(8)));
    }

    // 8 bytes that do not end in a valid magic must be rejected.
    #[test]
    fn test_parse_metadata_corrupt_footer() {
        let data = Bytes::from(vec![1, 2, 3, 4, 5, 6, 7, 8]);
        let reader_result = ParquetMetaDataReader::new().parse_metadata(&data);
        assert_eq!(
            reader_result.unwrap_err().to_string(),
            "Parquet error: Invalid Parquet file. Corrupt footer"
        );
    }

    // Footer claims 255 bytes of metadata (255 + 8 = 263) that the file lacks.
    #[test]
    fn test_parse_metadata_invalid_start() {
        let test_file = Bytes::from(vec![255, 0, 0, 0, b'P', b'A', b'R', b'1']);
        let err = ParquetMetaDataReader::new()
            .parse_metadata(&test_file)
            .unwrap_err();
        assert!(matches!(err, ParquetError::NeedMoreData(263)));
    }

    // Column orders parse to one TYPE_DEFINED_ORDER per leaf column.
    #[test]
    fn test_metadata_column_orders_parse() {
        // Define simple schema, we do not need to provide logical types.
        let fields = vec![
            Arc::new(
                SchemaType::primitive_type_builder("col1", Type::INT32)
                    .build()
                    .unwrap(),
            ),
            Arc::new(
                SchemaType::primitive_type_builder("col2", Type::FLOAT)
                    .build()
                    .unwrap(),
            ),
        ];
        let schema = SchemaType::group_type_builder("schema")
            .with_fields(fields)
            .build()
            .unwrap();
        let schema_descr = SchemaDescriptor::new(Arc::new(schema));

        let t_column_orders = Some(vec![
            TColumnOrder::TYPEORDER(TypeDefinedOrder::new()),
            TColumnOrder::TYPEORDER(TypeDefinedOrder::new()),
        ]);

        assert_eq!(
            ParquetMetaDataReader::parse_column_orders(t_column_orders, &schema_descr).unwrap(),
            Some(vec![
                ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED),
                ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED)
            ])
        );

        // Test when no column orders are defined.
        assert_eq!(
            ParquetMetaDataReader::parse_column_orders(None, &schema_descr).unwrap(),
            None
        );
    }

    // One order against a zero-column schema must error, not panic.
    #[test]
    fn test_metadata_column_orders_len_mismatch() {
        let schema = SchemaType::group_type_builder("schema").build().unwrap();
        let schema_descr = SchemaDescriptor::new(Arc::new(schema));

        let t_column_orders = Some(vec![TColumnOrder::TYPEORDER(TypeDefinedOrder::new())]);

        let res = ParquetMetaDataReader::parse_column_orders(t_column_orders, &schema_descr);
        assert!(res.is_err());
        assert!(format!("{:?}", res.unwrap_err()).contains("Column order length mismatch"));
    }

    // Exercises try_parse/try_parse_sized over progressively smaller suffixes
    // of a real file, including the NeedMoreData retry protocol.
    #[test]
    fn test_try_parse() {
        let file = get_test_file("alltypes_tiny_pages.parquet");
        let len = file.len();

        let mut reader = ParquetMetaDataReader::new().with_page_indexes(true);

        let bytes_for_range = |range: Range<u64>| {
            file.get_bytes(range.start, (range.end - range.start).try_into().unwrap())
                .unwrap()
        };

        // read entire file
        let bytes = bytes_for_range(0..len);
        reader.try_parse(&bytes).unwrap();
        let metadata = reader.finish().unwrap();
        assert!(metadata.column_index.is_some());
        assert!(metadata.offset_index.is_some());

        // read more than enough of file
        let bytes = bytes_for_range(320000..len);
        reader.try_parse_sized(&bytes, len).unwrap();
        let metadata = reader.finish().unwrap();
        assert!(metadata.column_index.is_some());
        assert!(metadata.offset_index.is_some());

        // exactly enough
        let bytes = bytes_for_range(323583..len);
        reader.try_parse_sized(&bytes, len).unwrap();
        let metadata = reader.finish().unwrap();
        assert!(metadata.column_index.is_some());
        assert!(metadata.offset_index.is_some());

        // not enough for page index
        let bytes = bytes_for_range(323584..len);
        // should fail
        match reader.try_parse_sized(&bytes, len).unwrap_err() {
            // expected error, try again with provided bounds
            ParquetError::NeedMoreData(needed) => {
                let bytes = bytes_for_range(len - needed as u64..len);
                reader.try_parse_sized(&bytes, len).unwrap();
                let metadata = reader.finish().unwrap();
                assert!(metadata.column_index.is_some());
                assert!(metadata.offset_index.is_some());
            }
            _ => panic!("unexpected error"),
        };

        // not enough for file metadata, but keep trying until page indexes are read
        let mut reader = ParquetMetaDataReader::new().with_page_indexes(true);
        let mut bytes = bytes_for_range(452505..len);
        loop {
            match reader.try_parse_sized(&bytes, len) {
                Ok(_) => break,
                Err(ParquetError::NeedMoreData(needed)) => {
                    bytes = bytes_for_range(len - needed as u64..len);
                    if reader.has_metadata() {
                        reader.read_page_indexes_sized(&bytes, len).unwrap();
                        break;
                    }
                }
                _ => panic!("unexpected error"),
            }
        }
        let metadata = reader.finish().unwrap();
        assert!(metadata.column_index.is_some());
        assert!(metadata.offset_index.is_some());

        // not enough for page index but lie about file size
        let bytes = bytes_for_range(323584..len);
        let reader_result = reader.try_parse_sized(&bytes, len - 323584).unwrap_err();
        assert_eq!(
            reader_result.to_string(),
            "EOF: Parquet file too small. Range 323583..452504 is beyond file bounds 130649"
        );

        // not enough for file metadata
        let mut reader = ParquetMetaDataReader::new();
        let bytes = bytes_for_range(452505..len);
        // should fail
        match reader.try_parse_sized(&bytes, len).unwrap_err() {
            // expected error, try again with provided bounds
            ParquetError::NeedMoreData(needed) => {
                let bytes = bytes_for_range(len - needed as u64..len);
                reader.try_parse_sized(&bytes, len).unwrap();
                reader.finish().unwrap();
            }
            _ => panic!("unexpected error"),
        };

        // not enough for file metadata but use try_parse()
        let reader_result = reader.try_parse(&bytes).unwrap_err();
        assert_eq!(
            reader_result.to_string(),
            "EOF: Parquet file too small. Size is 1728 but need 1729"
        );

        // read head of file rather than tail
        let bytes = bytes_for_range(0..1000);
        let reader_result = reader.try_parse_sized(&bytes, len).unwrap_err();
        assert_eq!(
            reader_result.to_string(),
            "Parquet error: Invalid Parquet file. Corrupt footer"
        );

        // lie about file size
        let bytes = bytes_for_range(452510..len);
        let reader_result = reader.try_parse_sized(&bytes, len - 452505).unwrap_err();
        assert_eq!(
            reader_result.to_string(),
            "EOF: Parquet file too small. Size is 1728 but need 1729"
        );
    }
}
1310
1311#[cfg(all(feature = "async", feature = "arrow", test))]
1312mod async_tests {
1313    use super::*;
1314    use bytes::Bytes;
1315    use futures::future::BoxFuture;
1316    use futures::FutureExt;
1317    use std::fs::File;
1318    use std::future::Future;
1319    use std::io::{Read, Seek, SeekFrom};
1320    use std::ops::Range;
1321    use std::sync::atomic::{AtomicUsize, Ordering};
1322
1323    use crate::file::reader::Length;
1324    use crate::util::test_common::file_util::get_test_file;
1325
    // Adapter that turns a range-fetching closure into a `MetadataFetch` impl.
    struct MetadataFetchFn<F>(F);

    impl<F, Fut> MetadataFetch for MetadataFetchFn<F>
    where
        F: FnMut(Range<u64>) -> Fut + Send,
        Fut: Future<Output = Result<Bytes>> + Send,
    {
        // Delegate directly to the wrapped closure.
        fn fetch(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
            async move { self.0(range).await }.boxed()
        }
    }
1337
    // Adapter holding a range-fetch closure (`F1`) and a suffix-fetch
    // closure (`F2`) so tests can count each kind of request separately.
    struct MetadataSuffixFetchFn<F1, F2>(F1, F2);

    impl<F1, Fut, F2> MetadataFetch for MetadataSuffixFetchFn<F1, F2>
    where
        F1: FnMut(Range<u64>) -> Fut + Send,
        Fut: Future<Output = Result<Bytes>> + Send,
        F2: Send,
    {
        // Range fetches use the first closure.
        fn fetch(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
            async move { self.0(range).await }.boxed()
        }
    }

    impl<F1, Fut, F2> MetadataSuffixFetch for MetadataSuffixFetchFn<F1, F2>
    where
        F1: FnMut(Range<u64>) -> Fut + Send,
        F2: FnMut(usize) -> Fut + Send,
        Fut: Future<Output = Result<Bytes>> + Send,
    {
        // Suffix ("last N bytes") fetches use the second closure.
        fn fetch_suffix(&mut self, suffix: usize) -> BoxFuture<'_, Result<Bytes>> {
            async move { self.1(suffix).await }.boxed()
        }
    }
1361
1362    fn read_range(file: &mut File, range: Range<u64>) -> Result<Bytes> {
1363        file.seek(SeekFrom::Start(range.start as _))?;
1364        let len = range.end - range.start;
1365        let mut buf = Vec::with_capacity(len.try_into().unwrap());
1366        file.take(len as _).read_to_end(&mut buf)?;
1367        Ok(buf.into())
1368    }
1369
    // Reads the last `suffix` bytes of `file`, clamping the seek so that a
    // suffix larger than the file returns the whole file instead of erroring.
    fn read_suffix(file: &mut File, suffix: usize) -> Result<Bytes> {
        let file_len = file.len();
        // Don't seek before beginning of file
        file.seek(SeekFrom::End(0 - suffix.min(file_len as _) as i64))?;
        let mut buf = Vec::with_capacity(suffix);
        file.take(suffix as _).read_to_end(&mut buf)?;
        Ok(buf.into())
    }
1378
    // Verifies async metadata loading against the synchronous parse, and
    // checks the fetch count for various prefetch hints (too small → 2
    // fetches, large enough or exact → 1 fetch).
    #[tokio::test]
    async fn test_simple() {
        let mut file = get_test_file("nulls.snappy.parquet");
        let len = file.len();

        let expected = ParquetMetaDataReader::new()
            .parse_and_finish(&file)
            .unwrap();
        let expected = expected.file_metadata().schema();
        let fetch_count = AtomicUsize::new(0);

        // Count every range fetch so the tests below can assert how many
        // round trips each prefetch configuration needs.
        let mut fetch = |range| {
            fetch_count.fetch_add(1, Ordering::SeqCst);
            futures::future::ready(read_range(&mut file, range))
        };

        let input = MetadataFetchFn(&mut fetch);
        let actual = ParquetMetaDataReader::new()
            .load_and_finish(input, len)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 2);

        // Metadata hint too small - below footer size
        fetch_count.store(0, Ordering::SeqCst);
        let input = MetadataFetchFn(&mut fetch);
        let actual = ParquetMetaDataReader::new()
            .with_prefetch_hint(Some(7))
            .load_and_finish(input, len)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 2);

        // Metadata hint too small
        fetch_count.store(0, Ordering::SeqCst);
        let input = MetadataFetchFn(&mut fetch);
        let actual = ParquetMetaDataReader::new()
            .with_prefetch_hint(Some(10))
            .load_and_finish(input, len)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 2);

        // Metadata hint too large
        fetch_count.store(0, Ordering::SeqCst);
        let input = MetadataFetchFn(&mut fetch);
        let actual = ParquetMetaDataReader::new()
            .with_prefetch_hint(Some(500))
            .load_and_finish(input, len)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);

        // Metadata hint exactly correct
        fetch_count.store(0, Ordering::SeqCst);
        let input = MetadataFetchFn(&mut fetch);
        let actual = ParquetMetaDataReader::new()
            .with_prefetch_hint(Some(428))
            .load_and_finish(input, len)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);

        // File size smaller than the 8-byte footer must fail fast.
        let input = MetadataFetchFn(&mut fetch);
        let err = ParquetMetaDataReader::new()
            .load_and_finish(input, 4)
            .await
            .unwrap_err()
            .to_string();
        assert_eq!(err, "EOF: file size of 4 is less than footer");

        // A wrong file size reads bytes that are not a valid footer.
        let input = MetadataFetchFn(&mut fetch);
        let err = ParquetMetaDataReader::new()
            .load_and_finish(input, 20)
            .await
            .unwrap_err()
            .to_string();
        assert_eq!(err, "Parquet error: Invalid Parquet file. Corrupt footer");
    }
1463
1464    #[tokio::test]
1465    async fn test_suffix() {
1466        let mut file = get_test_file("nulls.snappy.parquet");
1467        let mut file2 = file.try_clone().unwrap();
1468
1469        let expected = ParquetMetaDataReader::new()
1470            .parse_and_finish(&file)
1471            .unwrap();
1472        let expected = expected.file_metadata().schema();
1473        let fetch_count = AtomicUsize::new(0);
1474        let suffix_fetch_count = AtomicUsize::new(0);
1475
1476        let mut fetch = |range| {
1477            fetch_count.fetch_add(1, Ordering::SeqCst);
1478            futures::future::ready(read_range(&mut file, range))
1479        };
1480        let mut suffix_fetch = |suffix| {
1481            suffix_fetch_count.fetch_add(1, Ordering::SeqCst);
1482            futures::future::ready(read_suffix(&mut file2, suffix))
1483        };
1484
1485        let input = MetadataSuffixFetchFn(&mut fetch, &mut suffix_fetch);
1486        let actual = ParquetMetaDataReader::new()
1487            .load_via_suffix_and_finish(input)
1488            .await
1489            .unwrap();
1490        assert_eq!(actual.file_metadata().schema(), expected);
1491        assert_eq!(fetch_count.load(Ordering::SeqCst), 0);
1492        assert_eq!(suffix_fetch_count.load(Ordering::SeqCst), 2);
1493
1494        // Metadata hint too small - below footer size
1495        fetch_count.store(0, Ordering::SeqCst);
1496        suffix_fetch_count.store(0, Ordering::SeqCst);
1497        let input = MetadataSuffixFetchFn(&mut fetch, &mut suffix_fetch);
1498        let actual = ParquetMetaDataReader::new()
1499            .with_prefetch_hint(Some(7))
1500            .load_via_suffix_and_finish(input)
1501            .await
1502            .unwrap();
1503        assert_eq!(actual.file_metadata().schema(), expected);
1504        assert_eq!(fetch_count.load(Ordering::SeqCst), 0);
1505        assert_eq!(suffix_fetch_count.load(Ordering::SeqCst), 2);
1506
1507        // Metadata hint too small
1508        fetch_count.store(0, Ordering::SeqCst);
1509        suffix_fetch_count.store(0, Ordering::SeqCst);
1510        let input = MetadataSuffixFetchFn(&mut fetch, &mut suffix_fetch);
1511        let actual = ParquetMetaDataReader::new()
1512            .with_prefetch_hint(Some(10))
1513            .load_via_suffix_and_finish(input)
1514            .await
1515            .unwrap();
1516        assert_eq!(actual.file_metadata().schema(), expected);
1517        assert_eq!(fetch_count.load(Ordering::SeqCst), 0);
1518        assert_eq!(suffix_fetch_count.load(Ordering::SeqCst), 2);
1519
1520        dbg!("test");
1521        // Metadata hint too large
1522        fetch_count.store(0, Ordering::SeqCst);
1523        suffix_fetch_count.store(0, Ordering::SeqCst);
1524        let input = MetadataSuffixFetchFn(&mut fetch, &mut suffix_fetch);
1525        let actual = ParquetMetaDataReader::new()
1526            .with_prefetch_hint(Some(500))
1527            .load_via_suffix_and_finish(input)
1528            .await
1529            .unwrap();
1530        assert_eq!(actual.file_metadata().schema(), expected);
1531        assert_eq!(fetch_count.load(Ordering::SeqCst), 0);
1532        assert_eq!(suffix_fetch_count.load(Ordering::SeqCst), 1);
1533
1534        // Metadata hint exactly correct
1535        fetch_count.store(0, Ordering::SeqCst);
1536        suffix_fetch_count.store(0, Ordering::SeqCst);
1537        let input = MetadataSuffixFetchFn(&mut fetch, &mut suffix_fetch);
1538        let actual = ParquetMetaDataReader::new()
1539            .with_prefetch_hint(Some(428))
1540            .load_via_suffix_and_finish(input)
1541            .await
1542            .unwrap();
1543        assert_eq!(actual.file_metadata().schema(), expected);
1544        assert_eq!(fetch_count.load(Ordering::SeqCst), 0);
1545        assert_eq!(suffix_fetch_count.load(Ordering::SeqCst), 1);
1546    }
1547
1548    #[cfg(feature = "encryption")]
1549    #[tokio::test]
1550    async fn test_suffix_with_encryption() {
1551        let mut file = get_test_file("uniform_encryption.parquet.encrypted");
1552        let mut file2 = file.try_clone().unwrap();
1553
1554        let mut fetch = |range| futures::future::ready(read_range(&mut file, range));
1555        let mut suffix_fetch = |suffix| futures::future::ready(read_suffix(&mut file2, suffix));
1556
1557        let input = MetadataSuffixFetchFn(&mut fetch, &mut suffix_fetch);
1558
1559        let key_code: &[u8] = "0123456789012345".as_bytes();
1560        let decryption_properties = FileDecryptionProperties::builder(key_code.to_vec())
1561            .build()
1562            .unwrap();
1563
1564        // just make sure the metadata is properly decrypted and read
1565        let expected = ParquetMetaDataReader::new()
1566            .with_decryption_properties(Some(&decryption_properties))
1567            .load_via_suffix_and_finish(input)
1568            .await
1569            .unwrap();
1570        assert_eq!(expected.num_row_groups(), 1);
1571    }
1572
    #[tokio::test]
    async fn test_page_index() {
        let mut file = get_test_file("alltypes_tiny_pages.parquet");
        let len = file.len();
        // Counts range fetches so each scenario can assert how many round
        // trips the loader needed.
        let fetch_count = AtomicUsize::new(0);
        let mut fetch = |range| {
            fetch_count.fetch_add(1, Ordering::SeqCst);
            futures::future::ready(read_range(&mut file, range))
        };

        // No prefetch hint: footer, metadata, and page indexes each require
        // their own fetch, so three fetches are expected.
        let f = MetadataFetchFn(&mut fetch);
        let mut loader = ParquetMetaDataReader::new().with_page_indexes(true);
        loader.try_load(f, len).await.unwrap();
        assert_eq!(fetch_count.load(Ordering::SeqCst), 3);
        let metadata = loader.finish().unwrap();
        assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());

        // Prefetch just footer exactly (1729 bytes — presumably the metadata
        // plus footer length of this test file; TODO confirm). The page
        // indexes still need a second fetch.
        fetch_count.store(0, Ordering::SeqCst);
        let f = MetadataFetchFn(&mut fetch);
        let mut loader = ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .with_prefetch_hint(Some(1729));
        loader.try_load(f, len).await.unwrap();
        assert_eq!(fetch_count.load(Ordering::SeqCst), 2);
        let metadata = loader.finish().unwrap();
        assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());

        // Prefetch more than footer but not enough (one byte short of the
        // 130650 needed below to also cover the page indexes), so a second
        // fetch is still required.
        fetch_count.store(0, Ordering::SeqCst);
        let f = MetadataFetchFn(&mut fetch);
        let mut loader = ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .with_prefetch_hint(Some(130649));
        loader.try_load(f, len).await.unwrap();
        assert_eq!(fetch_count.load(Ordering::SeqCst), 2);
        let metadata = loader.finish().unwrap();
        assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());

        // Prefetch exactly enough to cover metadata and page indexes: a
        // single fetch suffices.
        fetch_count.store(0, Ordering::SeqCst);
        let f = MetadataFetchFn(&mut fetch);
        let metadata = ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .with_prefetch_hint(Some(130650))
            .load_and_finish(f, len)
            .await
            .unwrap();
        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
        assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());

        // Prefetch more than enough but less than the entire file
        fetch_count.store(0, Ordering::SeqCst);
        let f = MetadataFetchFn(&mut fetch);
        let metadata = ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .with_prefetch_hint(Some((len - 1000) as usize)) // more than needed, but not the whole file
            .load_and_finish(f, len)
            .await
            .unwrap();
        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
        assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());

        // Prefetch the entire file
        fetch_count.store(0, Ordering::SeqCst);
        let f = MetadataFetchFn(&mut fetch);
        let metadata = ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .with_prefetch_hint(Some(len as usize)) // prefetch entire file
            .load_and_finish(f, len)
            .await
            .unwrap();
        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
        assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());

        // Prefetch more than the entire file; hint exceeds the file length
        // but loading should still complete in a single fetch.
        fetch_count.store(0, Ordering::SeqCst);
        let f = MetadataFetchFn(&mut fetch);
        let metadata = ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .with_prefetch_hint(Some((len + 1000) as usize)) // hint larger than the file itself
            .load_and_finish(f, len)
            .await
            .unwrap();
        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
        assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());
    }
1660}