parquet/file/metadata/
reader.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::{io::Read, ops::Range, sync::Arc};
19
20use bytes::Bytes;
21
22use crate::basic::ColumnOrder;
23#[cfg(feature = "encryption")]
24use crate::encryption::{
25    decrypt::{FileDecryptionProperties, FileDecryptor},
26    modules::create_footer_aad,
27};
28
29use crate::errors::{ParquetError, Result};
30use crate::file::metadata::{ColumnChunkMetaData, FileMetaData, ParquetMetaData, RowGroupMetaData};
31use crate::file::page_index::index::Index;
32use crate::file::page_index::index_reader::{acc_range, decode_column_index, decode_offset_index};
33use crate::file::reader::ChunkReader;
34use crate::file::{FOOTER_SIZE, PARQUET_MAGIC, PARQUET_MAGIC_ENCR_FOOTER};
35use crate::format::{ColumnOrder as TColumnOrder, FileMetaData as TFileMetaData};
36#[cfg(feature = "encryption")]
37use crate::format::{EncryptionAlgorithm, FileCryptoMetaData as TFileCryptoMetaData};
38use crate::schema::types;
39use crate::schema::types::SchemaDescriptor;
40use crate::thrift::{TCompactSliceInputProtocol, TSerializable};
41
42#[cfg(all(feature = "async", feature = "arrow"))]
43use crate::arrow::async_reader::{MetadataFetch, MetadataSuffixFetch};
44#[cfg(feature = "encryption")]
45use crate::encryption::decrypt::CryptoContext;
46use crate::file::page_index::offset_index::OffsetIndexMetaData;
47
/// Reads the [`ParquetMetaData`] from a byte stream.
///
/// See [`crate::file::metadata::ParquetMetaDataWriter#output-format`] for a description of
/// the Parquet metadata.
///
/// Parquet metadata is not necessarily contiguous in the files: part is stored
/// in the footer (the last bytes of the file), but other portions (such as the
/// PageIndex) can be stored elsewhere.
///
/// This reader handles reading the footer as well as the non contiguous parts
/// of the metadata such as the page indexes; excluding Bloom Filters.
///
/// # Example
/// ```no_run
/// # use parquet::file::metadata::ParquetMetaDataReader;
/// # fn open_parquet_file(path: &str) -> std::fs::File { unimplemented!(); }
/// // read parquet metadata including page indexes from a file
/// let file = open_parquet_file("some_path.parquet");
/// let mut reader = ParquetMetaDataReader::new()
///     .with_page_indexes(true);
/// reader.try_parse(&file).unwrap();
/// let metadata = reader.finish().unwrap();
/// assert!(metadata.column_index().is_some());
/// assert!(metadata.offset_index().is_some());
/// ```
#[derive(Default)]
pub struct ParquetMetaDataReader {
    /// Parsed metadata; populated by the `try_parse*`/`try_load*` methods and
    /// taken (leaving `None`) by [`Self::finish`].
    metadata: Option<ParquetMetaData>,
    /// Whether to read the ColumnIndex page index structure.
    column_index: bool,
    /// Whether to read the OffsetIndex page index structure.
    offset_index: bool,
    /// Optional estimate of bytes to fetch on the first read in the async
    /// loaders; see [`Self::with_prefetch_hint`].
    prefetch_hint: Option<usize>,
    // Size of the serialized thrift metadata plus the 8 byte footer. Only set if
    // `self.parse_metadata` is called.
    metadata_size: Option<usize>,
    /// Keys used to decrypt encrypted files; only present with the
    /// `encryption` feature. See [`Self::with_decryption_properties`].
    #[cfg(feature = "encryption")]
    file_decryption_properties: Option<FileDecryptionProperties>,
}
85
/// Result of decoding the last 8 bytes of a Parquet file.
///
/// Records how long the serialized footer metadata is and whether the
/// file signals an encrypted footer.
pub struct FooterTail {
    metadata_length: usize,
    encrypted_footer: bool,
}

impl FooterTail {
    /// Returns the length, in bytes, of the serialized footer metadata.
    pub fn metadata_length(&self) -> usize {
        self.metadata_length
    }

    /// Returns `true` when the footer metadata is encrypted.
    pub fn is_encrypted_footer(&self) -> bool {
        self.encrypted_footer
    }
}
105
106impl ParquetMetaDataReader {
107    /// Create a new [`ParquetMetaDataReader`]
108    pub fn new() -> Self {
109        Default::default()
110    }
111
112    /// Create a new [`ParquetMetaDataReader`] populated with a [`ParquetMetaData`] struct
113    /// obtained via other means.
114    pub fn new_with_metadata(metadata: ParquetMetaData) -> Self {
115        Self {
116            metadata: Some(metadata),
117            ..Default::default()
118        }
119    }
120
121    /// Enable or disable reading the page index structures described in
122    /// "[Parquet page index]: Layout to Support Page Skipping". Equivalent to:
123    /// `self.with_column_indexes(val).with_offset_indexes(val)`
124    ///
125    /// [Parquet page index]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
126    pub fn with_page_indexes(self, val: bool) -> Self {
127        self.with_column_indexes(val).with_offset_indexes(val)
128    }
129
130    /// Enable or disable reading the Parquet [ColumnIndex] structure.
131    ///
132    /// [ColumnIndex]:  https://github.com/apache/parquet-format/blob/master/PageIndex.md
133    pub fn with_column_indexes(mut self, val: bool) -> Self {
134        self.column_index = val;
135        self
136    }
137
138    /// Enable or disable reading the Parquet [OffsetIndex] structure.
139    ///
140    /// [OffsetIndex]:  https://github.com/apache/parquet-format/blob/master/PageIndex.md
141    pub fn with_offset_indexes(mut self, val: bool) -> Self {
142        self.offset_index = val;
143        self
144    }
145
146    /// Provide a hint as to the number of bytes needed to fully parse the [`ParquetMetaData`].
147    /// Only used for the asynchronous [`Self::try_load()`] method.
148    ///
149    /// By default, the reader will first fetch the last 8 bytes of the input file to obtain the
150    /// size of the footer metadata. A second fetch will be performed to obtain the needed bytes.
151    /// After parsing the footer metadata, a third fetch will be performed to obtain the bytes
152    /// needed to decode the page index structures, if they have been requested. To avoid
153    /// unnecessary fetches, `prefetch` can be set to an estimate of the number of bytes needed
154    /// to fully decode the [`ParquetMetaData`], which can reduce the number of fetch requests and
155    /// reduce latency. Setting `prefetch` too small will not trigger an error, but will result
156    /// in extra fetches being performed.
157    pub fn with_prefetch_hint(mut self, prefetch: Option<usize>) -> Self {
158        self.prefetch_hint = prefetch;
159        self
160    }
161
162    /// Provide the FileDecryptionProperties to use when decrypting the file.
163    ///
164    /// This is only necessary when the file is encrypted.
165    #[cfg(feature = "encryption")]
166    pub fn with_decryption_properties(
167        mut self,
168        properties: Option<&FileDecryptionProperties>,
169    ) -> Self {
170        self.file_decryption_properties = properties.cloned();
171        self
172    }
173
174    /// Indicates whether this reader has a [`ParquetMetaData`] internally.
175    pub fn has_metadata(&self) -> bool {
176        self.metadata.is_some()
177    }
178
179    /// Return the parsed [`ParquetMetaData`] struct, leaving `None` in its place.
180    pub fn finish(&mut self) -> Result<ParquetMetaData> {
181        self.metadata
182            .take()
183            .ok_or_else(|| general_err!("could not parse parquet metadata"))
184    }
185
186    /// Given a [`ChunkReader`], parse and return the [`ParquetMetaData`] in a single pass.
187    ///
188    /// If `reader` is [`Bytes`] based, then the buffer must contain sufficient bytes to complete
189    /// the request, and must include the Parquet footer. If page indexes are desired, the buffer
190    /// must contain the entire file, or [`Self::try_parse_sized()`] should be used.
191    ///
192    /// This call will consume `self`.
193    ///
194    /// # Example
195    /// ```no_run
196    /// # use parquet::file::metadata::ParquetMetaDataReader;
197    /// # fn open_parquet_file(path: &str) -> std::fs::File { unimplemented!(); }
198    /// // read parquet metadata including page indexes
199    /// let file = open_parquet_file("some_path.parquet");
200    /// let metadata = ParquetMetaDataReader::new()
201    ///     .with_page_indexes(true)
202    ///     .parse_and_finish(&file).unwrap();
203    /// ```
204    pub fn parse_and_finish<R: ChunkReader>(mut self, reader: &R) -> Result<ParquetMetaData> {
205        self.try_parse(reader)?;
206        self.finish()
207    }
208
209    /// Attempts to parse the footer metadata (and optionally page indexes) given a [`ChunkReader`].
210    ///
211    /// If `reader` is [`Bytes`] based, then the buffer must contain sufficient bytes to complete
212    /// the request, and must include the Parquet footer. If page indexes are desired, the buffer
213    /// must contain the entire file, or [`Self::try_parse_sized()`] should be used.
214    pub fn try_parse<R: ChunkReader>(&mut self, reader: &R) -> Result<()> {
215        self.try_parse_sized(reader, reader.len())
216    }
217
218    /// Same as [`Self::try_parse()`], but provide the original file size in the case that `reader`
219    /// is a [`Bytes`] struct that does not contain the entire file. This information is necessary
220    /// when the page indexes are desired. `reader` must have access to the Parquet footer.
221    ///
222    /// Using this function also allows for retrying with a larger buffer.
223    ///
224    /// # Errors
225    ///
226    /// This function will return [`ParquetError::NeedMoreData`] in the event `reader` does not
227    /// provide enough data to fully parse the metadata (see example below). The returned error
228    /// will be populated with a `usize` field indicating the number of bytes required from the
229    /// tail of the file to completely parse the requested metadata.
230    ///
231    /// Other errors returned include [`ParquetError::General`] and [`ParquetError::EOF`].
232    ///
233    /// # Example
234    /// ```no_run
235    /// # use parquet::file::metadata::ParquetMetaDataReader;
236    /// # use parquet::errors::ParquetError;
237    /// # use crate::parquet::file::reader::Length;
238    /// # fn get_bytes(file: &std::fs::File, range: std::ops::Range<u64>) -> bytes::Bytes { unimplemented!(); }
239    /// # fn open_parquet_file(path: &str) -> std::fs::File { unimplemented!(); }
240    /// let file = open_parquet_file("some_path.parquet");
241    /// let len = file.len();
242    /// // Speculatively read 1 kilobyte from the end of the file
243    /// let bytes = get_bytes(&file, len - 1024..len);
244    /// let mut reader = ParquetMetaDataReader::new().with_page_indexes(true);
245    /// match reader.try_parse_sized(&bytes, len) {
246    ///     Ok(_) => (),
247    ///     Err(ParquetError::NeedMoreData(needed)) => {
248    ///         // Read the needed number of bytes from the end of the file
249    ///         let bytes = get_bytes(&file, len - needed as u64..len);
250    ///         reader.try_parse_sized(&bytes, len).unwrap();
251    ///     }
252    ///     _ => panic!("unexpected error")
253    /// }
254    /// let metadata = reader.finish().unwrap();
255    /// ```
256    ///
257    /// Note that it is possible for the file metadata to be completely read, but there are
258    /// insufficient bytes available to read the page indexes. [`Self::has_metadata()`] can be used
259    /// to test for this. In the event the file metadata is present, re-parsing of the file
260    /// metadata can be skipped by using [`Self::read_page_indexes_sized()`], as shown below.
261    /// ```no_run
262    /// # use parquet::file::metadata::ParquetMetaDataReader;
263    /// # use parquet::errors::ParquetError;
264    /// # use crate::parquet::file::reader::Length;
265    /// # fn get_bytes(file: &std::fs::File, range: std::ops::Range<u64>) -> bytes::Bytes { unimplemented!(); }
266    /// # fn open_parquet_file(path: &str) -> std::fs::File { unimplemented!(); }
267    /// let file = open_parquet_file("some_path.parquet");
268    /// let len = file.len();
269    /// // Speculatively read 1 kilobyte from the end of the file
270    /// let mut bytes = get_bytes(&file, len - 1024..len);
271    /// let mut reader = ParquetMetaDataReader::new().with_page_indexes(true);
272    /// // Loop until `bytes` is large enough
273    /// loop {
274    ///     match reader.try_parse_sized(&bytes, len) {
275    ///         Ok(_) => break,
276    ///         Err(ParquetError::NeedMoreData(needed)) => {
277    ///             // Read the needed number of bytes from the end of the file
278    ///             bytes = get_bytes(&file, len - needed as u64..len);
279    ///             // If file metadata was read only read page indexes, otherwise continue loop
280    ///             if reader.has_metadata() {
281    ///                 reader.read_page_indexes_sized(&bytes, len);
282    ///                 break;
283    ///             }
284    ///         }
285    ///         _ => panic!("unexpected error")
286    ///     }
287    /// }
288    /// let metadata = reader.finish().unwrap();
289    /// ```
290    pub fn try_parse_sized<R: ChunkReader>(&mut self, reader: &R, file_size: u64) -> Result<()> {
291        self.metadata = match self.parse_metadata(reader) {
292            Ok(metadata) => Some(metadata),
293            Err(ParquetError::NeedMoreData(needed)) => {
294                // If reader is the same length as `file_size` then presumably there is no more to
295                // read, so return an EOF error.
296                if file_size == reader.len() || needed as u64 > file_size {
297                    return Err(eof_err!(
298                        "Parquet file too small. Size is {} but need {}",
299                        file_size,
300                        needed
301                    ));
302                } else {
303                    // Ask for a larger buffer
304                    return Err(ParquetError::NeedMoreData(needed));
305                }
306            }
307            Err(e) => return Err(e),
308        };
309
310        // we can return if page indexes aren't requested
311        if !self.column_index && !self.offset_index {
312            return Ok(());
313        }
314
315        self.read_page_indexes_sized(reader, file_size)
316    }
317
318    /// Read the page index structures when a [`ParquetMetaData`] has already been obtained.
319    /// See [`Self::new_with_metadata()`] and [`Self::has_metadata()`].
320    pub fn read_page_indexes<R: ChunkReader>(&mut self, reader: &R) -> Result<()> {
321        self.read_page_indexes_sized(reader, reader.len())
322    }
323
    /// Read the page index structures when a [`ParquetMetaData`] has already been obtained.
    /// This variant is used when `reader` cannot access the entire Parquet file (e.g. it is
    /// a [`Bytes`] struct containing the tail of the file).
    /// See [`Self::new_with_metadata()`] and [`Self::has_metadata()`]. Like
    /// [`Self::try_parse_sized()`] this function may return [`ParquetError::NeedMoreData`].
    pub fn read_page_indexes_sized<R: ChunkReader>(
        &mut self,
        reader: &R,
        file_size: u64,
    ) -> Result<()> {
        if self.metadata.is_none() {
            return Err(general_err!(
                "Tried to read page indexes without ParquetMetaData metadata"
            ));
        }

        // FIXME: there are differing implementations in the case where page indexes are missing
        // from the file. `MetadataLoader` will leave them as `None`, while the parser in
        // `index_reader::read_columns_indexes` returns a vector of empty vectors.
        // It is best for this function to replicate the latter behavior for now, but in a future
        // breaking release, the two paths to retrieve metadata should be made consistent. Note that this is only
        // an issue if the user requested page indexes, so there is no need to provide empty
        // vectors in `try_parse_sized()`.
        // https://github.com/apache/arrow-rs/issues/6447

        // Get bounds needed for page indexes (if any are present in the file).
        let Some(range) = self.range_for_page_index() else {
            return Ok(());
        };

        // Check to see if needed range is within `file_range`. Checking `range.end` seems
        // redundant, but it guards against `range_for_page_index()` returning garbage.
        // `file_range` is the span of absolute file offsets that `reader` covers,
        // assuming `reader` holds the tail of the file.
        let file_range = file_size.saturating_sub(reader.len())..file_size;
        if !(file_range.contains(&range.start) && file_range.contains(&range.end)) {
            // Requested range starts beyond EOF
            if range.end > file_size {
                return Err(eof_err!(
                    "Parquet file too small. Range {:?} is beyond file bounds {file_size}",
                    range
                ));
            } else {
                // Ask for a larger buffer
                return Err(ParquetError::NeedMoreData(
                    (file_size - range.start).try_into()?,
                ));
            }
        }

        // Perform extra sanity check to make sure `range` and the footer metadata don't
        // overlap.
        if let Some(metadata_size) = self.metadata_size {
            let metadata_range = file_size.saturating_sub(metadata_size as u64)..file_size;
            if range.end > metadata_range.start {
                return Err(eof_err!(
                    "Parquet file too small. Page index range {:?} overlaps with file metadata {:?}",
                    range,
                    metadata_range
                ));
            }
        }

        // Translate the absolute file offset into an offset relative to the start of
        // `reader` before fetching the page index bytes.
        let bytes_needed = usize::try_from(range.end - range.start)?;
        let bytes = reader.get_bytes(range.start - file_range.start, bytes_needed)?;
        let offset = range.start;

        self.parse_column_index(&bytes, offset)?;
        self.parse_offset_index(&bytes, offset)?;

        Ok(())
    }
394
395    /// Given a [`MetadataFetch`], parse and return the [`ParquetMetaData`] in a single pass.
396    ///
397    /// This call will consume `self`.
398    ///
399    /// See [`Self::with_prefetch_hint`] for a discussion of how to reduce the number of fetches
400    /// performed by this function.
401    #[cfg(all(feature = "async", feature = "arrow"))]
402    pub async fn load_and_finish<F: MetadataFetch>(
403        mut self,
404        fetch: F,
405        file_size: u64,
406    ) -> Result<ParquetMetaData> {
407        self.try_load(fetch, file_size).await?;
408        self.finish()
409    }
410
411    /// Given a [`MetadataSuffixFetch`], parse and return the [`ParquetMetaData`] in a single pass.
412    ///
413    /// This call will consume `self`.
414    ///
415    /// See [`Self::with_prefetch_hint`] for a discussion of how to reduce the number of fetches
416    /// performed by this function.
417    #[cfg(all(feature = "async", feature = "arrow"))]
418    pub async fn load_via_suffix_and_finish<F: MetadataSuffixFetch>(
419        mut self,
420        fetch: F,
421    ) -> Result<ParquetMetaData> {
422        self.try_load_via_suffix(fetch).await?;
423        self.finish()
424    }
425    /// Attempts to (asynchronously) parse the footer metadata (and optionally page indexes)
426    /// given a [`MetadataFetch`].
427    ///
428    /// See [`Self::with_prefetch_hint`] for a discussion of how to reduce the number of fetches
429    /// performed by this function.
430    #[cfg(all(feature = "async", feature = "arrow"))]
431    pub async fn try_load<F: MetadataFetch>(&mut self, mut fetch: F, file_size: u64) -> Result<()> {
432        let (metadata, remainder) = self.load_metadata(&mut fetch, file_size).await?;
433
434        self.metadata = Some(metadata);
435
436        // we can return if page indexes aren't requested
437        if !self.column_index && !self.offset_index {
438            return Ok(());
439        }
440
441        self.load_page_index_with_remainder(fetch, remainder).await
442    }
443
444    /// Attempts to (asynchronously) parse the footer metadata (and optionally page indexes)
445    /// given a [`MetadataSuffixFetch`].
446    ///
447    /// See [`Self::with_prefetch_hint`] for a discussion of how to reduce the number of fetches
448    /// performed by this function.
449    #[cfg(all(feature = "async", feature = "arrow"))]
450    pub async fn try_load_via_suffix<F: MetadataSuffixFetch>(
451        &mut self,
452        mut fetch: F,
453    ) -> Result<()> {
454        let (metadata, remainder) = self.load_metadata_via_suffix(&mut fetch).await?;
455
456        self.metadata = Some(metadata);
457
458        // we can return if page indexes aren't requested
459        if !self.column_index && !self.offset_index {
460            return Ok(());
461        }
462
463        self.load_page_index_with_remainder(fetch, remainder).await
464    }
465
466    /// Asynchronously fetch the page index structures when a [`ParquetMetaData`] has already
467    /// been obtained. See [`Self::new_with_metadata()`].
468    #[cfg(all(feature = "async", feature = "arrow"))]
469    pub async fn load_page_index<F: MetadataFetch>(&mut self, fetch: F) -> Result<()> {
470        self.load_page_index_with_remainder(fetch, None).await
471    }
472
473    #[cfg(all(feature = "async", feature = "arrow"))]
474    async fn load_page_index_with_remainder<F: MetadataFetch>(
475        &mut self,
476        mut fetch: F,
477        remainder: Option<(usize, Bytes)>,
478    ) -> Result<()> {
479        if self.metadata.is_none() {
480            return Err(general_err!("Footer metadata is not present"));
481        }
482
483        // Get bounds needed for page indexes (if any are present in the file).
484        let range = self.range_for_page_index();
485        let range = match range {
486            Some(range) => range,
487            None => return Ok(()),
488        };
489
490        let bytes = match &remainder {
491            Some((remainder_start, remainder)) if *remainder_start as u64 <= range.start => {
492                let remainder_start = *remainder_start as u64;
493                let offset = usize::try_from(range.start - remainder_start)?;
494                let end = usize::try_from(range.end - remainder_start)?;
495                assert!(end <= remainder.len());
496                remainder.slice(offset..end)
497            }
498            // Note: this will potentially fetch data already in remainder, this keeps things simple
499            _ => fetch.fetch(range.start..range.end).await?,
500        };
501
502        // Sanity check
503        assert_eq!(bytes.len() as u64, range.end - range.start);
504
505        self.parse_column_index(&bytes, range.start)?;
506        self.parse_offset_index(&bytes, range.start)?;
507
508        Ok(())
509    }
510
    /// Decode the ColumnIndex for every column chunk and attach it to the
    /// metadata, if `self.column_index` was requested.
    ///
    /// `bytes` must cover the page index region of the file; `start_offset` is
    /// the absolute file offset of `bytes[0]`, used to rebase the per-column
    /// index ranges recorded in the metadata.
    fn parse_column_index(&mut self, bytes: &Bytes, start_offset: u64) -> Result<()> {
        // Callers check `self.metadata` is present before calling, so this cannot panic.
        let metadata = self.metadata.as_mut().unwrap();
        if self.column_index {
            let index = metadata
                .row_groups()
                .iter()
                .enumerate()
                .map(|(rg_idx, x)| {
                    x.columns()
                        .iter()
                        .enumerate()
                        .map(|(col_idx, c)| match c.column_index_range() {
                            Some(r) => {
                                // Rebase the absolute file range onto `bytes`.
                                let r_start = usize::try_from(r.start - start_offset)?;
                                let r_end = usize::try_from(r.end - start_offset)?;
                                Self::parse_single_column_index(
                                    &bytes[r_start..r_end],
                                    metadata,
                                    c,
                                    rg_idx,
                                    col_idx,
                                )
                            }
                            // A column without a column index is represented as `Index::NONE`.
                            None => Ok(Index::NONE),
                        })
                        .collect::<Result<Vec<_>>>()
                })
                .collect::<Result<Vec<_>>>()?;
            metadata.set_column_index(Some(index));
        }
        Ok(())
    }
543
544    #[cfg(feature = "encryption")]
545    fn parse_single_column_index(
546        bytes: &[u8],
547        metadata: &ParquetMetaData,
548        column: &ColumnChunkMetaData,
549        row_group_index: usize,
550        col_index: usize,
551    ) -> Result<Index> {
552        match &column.column_crypto_metadata {
553            Some(crypto_metadata) => {
554                let file_decryptor = metadata.file_decryptor.as_ref().ok_or_else(|| {
555                    general_err!("Cannot decrypt column index, no file decryptor set")
556                })?;
557                let crypto_context = CryptoContext::for_column(
558                    file_decryptor,
559                    crypto_metadata,
560                    row_group_index,
561                    col_index,
562                )?;
563                let column_decryptor = crypto_context.metadata_decryptor();
564                let aad = crypto_context.create_column_index_aad()?;
565                let plaintext = column_decryptor.decrypt(bytes, &aad)?;
566                decode_column_index(&plaintext, column.column_type())
567            }
568            None => decode_column_index(bytes, column.column_type()),
569        }
570    }
571
572    #[cfg(not(feature = "encryption"))]
573    fn parse_single_column_index(
574        bytes: &[u8],
575        _metadata: &ParquetMetaData,
576        column: &ColumnChunkMetaData,
577        _row_group_index: usize,
578        _col_index: usize,
579    ) -> Result<Index> {
580        decode_column_index(bytes, column.column_type())
581    }
582
    /// Decode the OffsetIndex for every column chunk and attach it to the
    /// metadata, if `self.offset_index` was requested.
    ///
    /// `bytes` must cover the page index region of the file; `start_offset` is
    /// the absolute file offset of `bytes[0]`. Unlike the column index, a
    /// missing offset index is an error.
    fn parse_offset_index(&mut self, bytes: &Bytes, start_offset: u64) -> Result<()> {
        // Callers check `self.metadata` is present before calling, so this cannot panic.
        let metadata = self.metadata.as_mut().unwrap();
        if self.offset_index {
            let index = metadata
                .row_groups()
                .iter()
                .enumerate()
                .map(|(rg_idx, x)| {
                    x.columns()
                        .iter()
                        .enumerate()
                        .map(|(col_idx, c)| match c.offset_index_range() {
                            Some(r) => {
                                // Rebase the absolute file range onto `bytes`.
                                let r_start = usize::try_from(r.start - start_offset)?;
                                let r_end = usize::try_from(r.end - start_offset)?;
                                Self::parse_single_offset_index(
                                    &bytes[r_start..r_end],
                                    metadata,
                                    c,
                                    rg_idx,
                                    col_idx,
                                )
                            }
                            None => Err(general_err!("missing offset index")),
                        })
                        .collect::<Result<Vec<_>>>()
                })
                .collect::<Result<Vec<_>>>()?;

            metadata.set_offset_index(Some(index));
        }
        Ok(())
    }
616
617    #[cfg(feature = "encryption")]
618    fn parse_single_offset_index(
619        bytes: &[u8],
620        metadata: &ParquetMetaData,
621        column: &ColumnChunkMetaData,
622        row_group_index: usize,
623        col_index: usize,
624    ) -> Result<OffsetIndexMetaData> {
625        match &column.column_crypto_metadata {
626            Some(crypto_metadata) => {
627                let file_decryptor = metadata.file_decryptor.as_ref().ok_or_else(|| {
628                    general_err!("Cannot decrypt offset index, no file decryptor set")
629                })?;
630                let crypto_context = CryptoContext::for_column(
631                    file_decryptor,
632                    crypto_metadata,
633                    row_group_index,
634                    col_index,
635                )?;
636                let column_decryptor = crypto_context.metadata_decryptor();
637                let aad = crypto_context.create_offset_index_aad()?;
638                let plaintext = column_decryptor.decrypt(bytes, &aad)?;
639                decode_offset_index(&plaintext)
640            }
641            None => decode_offset_index(bytes),
642        }
643    }
644
    /// Decode a single OffsetIndex. Without the `encryption` feature the bytes
    /// are always plaintext, so this simply forwards to [`decode_offset_index`].
    #[cfg(not(feature = "encryption"))]
    fn parse_single_offset_index(
        bytes: &[u8],
        _metadata: &ParquetMetaData,
        _column: &ColumnChunkMetaData,
        _row_group_index: usize,
        _col_index: usize,
    ) -> Result<OffsetIndexMetaData> {
        decode_offset_index(bytes)
    }
655
656    fn range_for_page_index(&self) -> Option<Range<u64>> {
657        // sanity check
658        self.metadata.as_ref()?;
659
660        // Get bounds needed for page indexes (if any are present in the file).
661        let mut range = None;
662        let metadata = self.metadata.as_ref().unwrap();
663        for c in metadata.row_groups().iter().flat_map(|r| r.columns()) {
664            if self.column_index {
665                range = acc_range(range, c.column_index_range());
666            }
667            if self.offset_index {
668                range = acc_range(range, c.offset_index_range());
669            }
670        }
671        range
672    }
673
674    // One-shot parse of footer.
675    // Side effect: this will set `self.metadata_size`
676    fn parse_metadata<R: ChunkReader>(&mut self, chunk_reader: &R) -> Result<ParquetMetaData> {
677        // check file is large enough to hold footer
678        let file_size = chunk_reader.len();
679        if file_size < (FOOTER_SIZE as u64) {
680            return Err(ParquetError::NeedMoreData(FOOTER_SIZE));
681        }
682
683        let mut footer = [0_u8; 8];
684        chunk_reader
685            .get_read(file_size - 8)?
686            .read_exact(&mut footer)?;
687
688        let footer = Self::decode_footer_tail(&footer)?;
689        let metadata_len = footer.metadata_length();
690        let footer_metadata_len = FOOTER_SIZE + metadata_len;
691        self.metadata_size = Some(footer_metadata_len);
692
693        if footer_metadata_len as u64 > file_size {
694            return Err(ParquetError::NeedMoreData(footer_metadata_len));
695        }
696
697        let start = file_size - footer_metadata_len as u64;
698        self.decode_footer_metadata(
699            chunk_reader.get_bytes(start, metadata_len)?.as_ref(),
700            &footer,
701        )
702    }
703
704    /// Return the number of bytes to read in the initial pass. If `prefetch_size` has
705    /// been provided, then return that value if it is larger than the size of the Parquet
706    /// file footer (8 bytes). Otherwise returns `8`.
707    #[cfg(all(feature = "async", feature = "arrow"))]
708    fn get_prefetch_size(&self) -> usize {
709        if let Some(prefetch) = self.prefetch_hint {
710            if prefetch > FOOTER_SIZE {
711                return prefetch;
712            }
713        }
714        FOOTER_SIZE
715    }
716
    /// Fetches and decodes the file metadata using the provided [`MetadataFetch`].
    ///
    /// Performs one suffix-sized read (see [`Self::get_prefetch_size`]); if the
    /// metadata did not fit in that read, issues a second fetch for exactly the
    /// metadata range.
    ///
    /// Returns the decoded [`ParquetMetaData`] plus, when the initial read
    /// over-fetched, the leftover prefix bytes and their file offset so callers
    /// can reuse them (e.g. for page indexes). `None` is returned in the
    /// two-fetch case, where no extra bytes were read.
    #[cfg(all(feature = "async", feature = "arrow"))]
    async fn load_metadata<F: MetadataFetch>(
        &self,
        fetch: &mut F,
        file_size: u64,
    ) -> Result<(ParquetMetaData, Option<(usize, Bytes)>)> {
        let prefetch = self.get_prefetch_size() as u64;

        if file_size < FOOTER_SIZE as u64 {
            return Err(eof_err!("file size of {} is less than footer", file_size));
        }

        // If a size hint is provided, read more than the minimum size
        // to try and avoid a second fetch.
        // Note: prefetch > file_size is ok since we're using saturating_sub.
        let footer_start = file_size.saturating_sub(prefetch);

        let suffix = fetch.fetch(footer_start..file_size).await?;
        let suffix_len = suffix.len();
        // Guard against a short read from the fetcher.
        let fetch_len = (file_size - footer_start)
            .try_into()
            .expect("footer size should never be larger than u32");
        if suffix_len < fetch_len {
            return Err(eof_err!(
                "metadata requires {} bytes, but could only read {}",
                fetch_len,
                suffix_len
            ));
        }

        // The last FOOTER_SIZE bytes of the suffix hold the footer tail
        // (metadata length + magic).
        let mut footer = [0; FOOTER_SIZE];
        footer.copy_from_slice(&suffix[suffix_len - FOOTER_SIZE..suffix_len]);

        let footer = Self::decode_footer_tail(&footer)?;
        let length = footer.metadata_length();

        if file_size < (length + FOOTER_SIZE) as u64 {
            return Err(eof_err!(
                "file size of {} is less than footer + metadata {}",
                file_size,
                length + FOOTER_SIZE
            ));
        }

        // Did not fetch the entire file metadata in the initial read, need to make a second request
        if length > suffix_len - FOOTER_SIZE {
            let metadata_start = file_size - (length + FOOTER_SIZE) as u64;
            let meta = fetch
                .fetch(metadata_start..(file_size - FOOTER_SIZE as u64))
                .await?;
            // No leftover bytes to hand back in the two-fetch case.
            Ok((self.decode_footer_metadata(&meta, &footer)?, None))
        } else {
            // Metadata is fully contained in the suffix; locate it relative to
            // the start of the prefetched bytes.
            let metadata_start = (file_size - (length + FOOTER_SIZE) as u64 - footer_start)
                .try_into()
                .expect("metadata length should never be larger than u32");
            let slice = &suffix[metadata_start..suffix_len - FOOTER_SIZE];
            Ok((
                self.decode_footer_metadata(slice, &footer)?,
                // Hand back the over-fetched prefix and its absolute file offset.
                Some((footer_start as usize, suffix.slice(..metadata_start))),
            ))
        }
    }
779
    /// Fetches and decodes the file metadata using suffix (read-from-end)
    /// requests via [`MetadataSuffixFetch`], so the total file size need not be
    /// known up front.
    ///
    /// Like [`Self::load_metadata`], returns the decoded [`ParquetMetaData`]
    /// plus any over-fetched prefix bytes (with offset `0` relative to the
    /// suffix) that callers may reuse; `None` when a second fetch was needed.
    #[cfg(all(feature = "async", feature = "arrow"))]
    async fn load_metadata_via_suffix<F: MetadataSuffixFetch>(
        &self,
        fetch: &mut F,
    ) -> Result<(ParquetMetaData, Option<(usize, Bytes)>)> {
        let prefetch = self.get_prefetch_size();

        let suffix = fetch.fetch_suffix(prefetch as _).await?;
        let suffix_len = suffix.len();

        // The suffix fetch may return fewer bytes than requested (small file),
        // but it must at least contain the 8-byte footer tail.
        if suffix_len < FOOTER_SIZE {
            return Err(eof_err!(
                "footer metadata requires {} bytes, but could only read {}",
                FOOTER_SIZE,
                suffix_len
            ));
        }

        let mut footer = [0; FOOTER_SIZE];
        footer.copy_from_slice(&suffix[suffix_len - FOOTER_SIZE..suffix_len]);

        let footer = Self::decode_footer_tail(&footer)?;
        let length = footer.metadata_length();

        // Did not fetch the entire file metadata in the initial read, need to make a second request
        let metadata_offset = length + FOOTER_SIZE;
        if length > suffix_len - FOOTER_SIZE {
            // Re-fetch a suffix large enough to hold metadata + footer tail.
            let meta = fetch.fetch_suffix(metadata_offset).await?;

            if meta.len() < metadata_offset {
                return Err(eof_err!(
                    "metadata requires {} bytes, but could only read {}",
                    metadata_offset,
                    meta.len()
                ));
            }

            Ok((
                // need to slice off the footer or decryption fails
                self.decode_footer_metadata(&meta.slice(0..length), &footer)?,
                None,
            ))
        } else {
            // Metadata fully contained in the initial suffix read.
            let metadata_start = suffix_len - metadata_offset;
            let slice = &suffix[metadata_start..suffix_len - FOOTER_SIZE];
            Ok((
                self.decode_footer_metadata(slice, &footer)?,
                // Leftover prefix bytes preceding the metadata within the suffix.
                Some((0, suffix.slice(..metadata_start))),
            ))
        }
    }
831
832    /// Decodes the end of the Parquet footer
833    ///
834    /// There are 8 bytes at the end of the Parquet footer with the following layout:
835    /// * 4 bytes for the metadata length
836    /// * 4 bytes for the magic bytes 'PAR1' or 'PARE' (encrypted footer)
837    ///
838    /// ```text
839    /// +-----+------------------+
840    /// | len | 'PAR1' or 'PARE' |
841    /// +-----+------------------+
842    /// ```
843    pub fn decode_footer_tail(slice: &[u8; FOOTER_SIZE]) -> Result<FooterTail> {
844        let magic = &slice[4..];
845        let encrypted_footer = if magic == PARQUET_MAGIC_ENCR_FOOTER {
846            true
847        } else if magic == PARQUET_MAGIC {
848            false
849        } else {
850            return Err(general_err!("Invalid Parquet file. Corrupt footer"));
851        };
852        // get the metadata length from the footer
853        let metadata_len = u32::from_le_bytes(slice[..4].try_into().unwrap());
854        Ok(FooterTail {
855            // u32 won't be larger than usize in most cases
856            metadata_length: metadata_len as usize,
857            encrypted_footer,
858        })
859    }
860
861    /// Decodes the Parquet footer, returning the metadata length in bytes
862    #[deprecated(note = "use decode_footer_tail instead")]
863    pub fn decode_footer(slice: &[u8; FOOTER_SIZE]) -> Result<usize> {
864        Self::decode_footer_tail(slice).map(|f| f.metadata_length)
865    }
866
    /// Decodes [`ParquetMetaData`] from the provided bytes.
    ///
    /// Typically, this is used to decode the metadata from the end of a parquet
    /// file. The format of `buf` is the Thrift compact binary protocol, as specified
    /// by the [Parquet Spec].
    ///
    /// This method handles using either `decode_metadata` or
    /// `decode_metadata_with_encryption` depending on whether the encryption
    /// feature is enabled.
    ///
    /// [Parquet Spec]: https://github.com/apache/parquet-format#metadata
    pub(crate) fn decode_footer_metadata(
        &self,
        buf: &[u8],
        footer_tail: &FooterTail,
    ) -> Result<ParquetMetaData> {
        // With encryption enabled, delegate to the decrypting decoder, which
        // handles both encrypted and plaintext footers.
        #[cfg(feature = "encryption")]
        let result = Self::decode_metadata_with_encryption(
            buf,
            footer_tail.is_encrypted_footer(),
            self.file_decryption_properties.as_ref(),
        );
        // Without the encryption feature, an encrypted footer cannot be read;
        // fail eagerly with an informative error rather than a parse failure.
        #[cfg(not(feature = "encryption"))]
        let result = {
            if footer_tail.is_encrypted_footer() {
                Err(general_err!(
                    "Parquet file has an encrypted footer but the encryption feature is disabled"
                ))
            } else {
                Self::decode_metadata(buf)
            }
        };
        result
    }
901
    /// Decodes [`ParquetMetaData`] from the provided bytes, handling metadata that may be encrypted.
    ///
    /// Typically this is used to decode the metadata from the end of a parquet
    /// file. The format of `buf` is the Thrift compact binary protocol, as specified
    /// by the [Parquet Spec]. Buffer can be encrypted with AES GCM or AES CTR
    /// ciphers as specified in the [Parquet Encryption Spec].
    ///
    /// [Parquet Spec]: https://github.com/apache/parquet-format#metadata
    /// [Parquet Encryption Spec]: https://parquet.apache.org/docs/file-format/data-pages/encryption/
    #[cfg(feature = "encryption")]
    fn decode_metadata_with_encryption(
        buf: &[u8],
        encrypted_footer: bool,
        file_decryption_properties: Option<&FileDecryptionProperties>,
    ) -> Result<ParquetMetaData> {
        let mut prot = TCompactSliceInputProtocol::new(buf);
        let mut file_decryptor = None;
        // Holds the decrypted footer bytes; declared here so it outlives `prot`
        // when `prot` is re-pointed at the plaintext below.
        let decrypted_fmd_buf;

        if encrypted_footer {
            if let Some(file_decryption_properties) = file_decryption_properties {
                // An encrypted footer starts with a plaintext FileCryptoMetaData
                // describing the encryption algorithm and key metadata.
                let t_file_crypto_metadata: TFileCryptoMetaData =
                    TFileCryptoMetaData::read_from_in_protocol(&mut prot)
                        .map_err(|e| general_err!("Could not parse crypto metadata: {}", e))?;
                // If the writer chose not to store the AAD prefix in the file,
                // the reader must supply it via decryption properties.
                let supply_aad_prefix = match &t_file_crypto_metadata.encryption_algorithm {
                    EncryptionAlgorithm::AESGCMV1(algo) => algo.supply_aad_prefix,
                    _ => Some(false),
                }
                .unwrap_or(false);
                if supply_aad_prefix && file_decryption_properties.aad_prefix().is_none() {
                    return Err(general_err!(
                        "Parquet file was encrypted with an AAD prefix that is not stored in the file, \
                        but no AAD prefix was provided in the file decryption properties"
                    ));
                }
                let decryptor = get_file_decryptor(
                    t_file_crypto_metadata.encryption_algorithm,
                    t_file_crypto_metadata.key_metadata.as_deref(),
                    file_decryption_properties,
                )?;
                let footer_decryptor = decryptor.get_footer_decryptor();
                let aad_footer = create_footer_aad(decryptor.file_aad())?;

                // Decrypt the remaining bytes (the encrypted FileMetaData) and
                // continue parsing from the plaintext.
                decrypted_fmd_buf = footer_decryptor?
                    .decrypt(prot.as_slice().as_ref(), aad_footer.as_ref())
                    .map_err(|_| {
                        general_err!(
                            "Provided footer key and AAD were unable to decrypt parquet footer"
                        )
                    })?;
                prot = TCompactSliceInputProtocol::new(decrypted_fmd_buf.as_ref());

                file_decryptor = Some(decryptor);
            } else {
                return Err(general_err!("Parquet file has an encrypted footer but decryption properties were not provided"));
            }
        }

        let t_file_metadata: TFileMetaData = TFileMetaData::read_from_in_protocol(&mut prot)
            .map_err(|e| general_err!("Could not parse metadata: {}", e))?;
        let schema = types::from_thrift(&t_file_metadata.schema)?;
        let schema_descr = Arc::new(SchemaDescriptor::new(schema));

        if let (Some(algo), Some(file_decryption_properties)) = (
            t_file_metadata.encryption_algorithm,
            file_decryption_properties,
        ) {
            // File has a plaintext footer but encryption algorithm is set
            file_decryptor = Some(get_file_decryptor(
                algo,
                t_file_metadata.footer_signing_key_metadata.as_deref(),
                file_decryption_properties,
            )?);
        }

        // Row groups may contain encrypted column chunks even when the footer
        // itself is plaintext, so always use the decrypting conversion.
        let mut row_groups = Vec::new();
        for rg in t_file_metadata.row_groups {
            let r = RowGroupMetaData::from_encrypted_thrift(
                schema_descr.clone(),
                rg,
                file_decryptor.as_ref(),
            )?;
            row_groups.push(r);
        }
        let column_orders =
            Self::parse_column_orders(t_file_metadata.column_orders, &schema_descr)?;

        let file_metadata = FileMetaData::new(
            t_file_metadata.version,
            t_file_metadata.num_rows,
            t_file_metadata.created_by,
            t_file_metadata.key_value_metadata,
            schema_descr,
            column_orders,
        );
        let mut metadata = ParquetMetaData::new(file_metadata, row_groups);

        // Retain the decryptor so column data can be decrypted later.
        metadata.with_file_decryptor(file_decryptor);

        Ok(metadata)
    }
1003
1004    /// Decodes [`ParquetMetaData`] from the provided bytes.
1005    ///
1006    /// Typically this is used to decode the metadata from the end of a parquet
1007    /// file. The format of `buf` is the Thrift compact binary protocol, as specified
1008    /// by the [Parquet Spec].
1009    ///
1010    /// [Parquet Spec]: https://github.com/apache/parquet-format#metadata
1011    pub fn decode_metadata(buf: &[u8]) -> Result<ParquetMetaData> {
1012        let mut prot = TCompactSliceInputProtocol::new(buf);
1013
1014        let t_file_metadata: TFileMetaData = TFileMetaData::read_from_in_protocol(&mut prot)
1015            .map_err(|e| general_err!("Could not parse metadata: {}", e))?;
1016        let schema = types::from_thrift(&t_file_metadata.schema)?;
1017        let schema_descr = Arc::new(SchemaDescriptor::new(schema));
1018
1019        let mut row_groups = Vec::new();
1020        for rg in t_file_metadata.row_groups {
1021            row_groups.push(RowGroupMetaData::from_thrift(schema_descr.clone(), rg)?);
1022        }
1023        let column_orders =
1024            Self::parse_column_orders(t_file_metadata.column_orders, &schema_descr)?;
1025
1026        let file_metadata = FileMetaData::new(
1027            t_file_metadata.version,
1028            t_file_metadata.num_rows,
1029            t_file_metadata.created_by,
1030            t_file_metadata.key_value_metadata,
1031            schema_descr,
1032            column_orders,
1033        );
1034
1035        Ok(ParquetMetaData::new(file_metadata, row_groups))
1036    }
1037
1038    /// Parses column orders from Thrift definition.
1039    /// If no column orders are defined, returns `None`.
1040    fn parse_column_orders(
1041        t_column_orders: Option<Vec<TColumnOrder>>,
1042        schema_descr: &SchemaDescriptor,
1043    ) -> Result<Option<Vec<ColumnOrder>>> {
1044        match t_column_orders {
1045            Some(orders) => {
1046                // Should always be the case
1047                if orders.len() != schema_descr.num_columns() {
1048                    return Err(general_err!("Column order length mismatch"));
1049                };
1050                let mut res = Vec::new();
1051                for (i, column) in schema_descr.columns().iter().enumerate() {
1052                    match orders[i] {
1053                        TColumnOrder::TYPEORDER(_) => {
1054                            let sort_order = ColumnOrder::get_sort_order(
1055                                column.logical_type(),
1056                                column.converted_type(),
1057                                column.physical_type(),
1058                            );
1059                            res.push(ColumnOrder::TYPE_DEFINED_ORDER(sort_order));
1060                        }
1061                    }
1062                }
1063                Ok(Some(res))
1064            }
1065            None => Ok(None),
1066        }
1067    }
1068}
1069
#[cfg(feature = "encryption")]
/// Builds a [`FileDecryptor`] for the given encryption algorithm and key metadata.
///
/// Only `AES_GCM_V1` is currently supported; `AES_GCM_CTR_V1` returns a
/// not-yet-implemented error.
fn get_file_decryptor(
    encryption_algorithm: EncryptionAlgorithm,
    footer_key_metadata: Option<&[u8]>,
    file_decryption_properties: &FileDecryptionProperties,
) -> Result<FileDecryptor> {
    match encryption_algorithm {
        EncryptionAlgorithm::AESGCMV1(algo) => {
            // The AAD file-unique identifier is mandatory for AES-GCM-V1.
            let aad_file_unique = algo
                .aad_file_unique
                .ok_or_else(|| general_err!("AAD unique file identifier is not set"))?;
            // Prefer a caller-supplied AAD prefix; otherwise fall back to the
            // prefix stored in the file (possibly empty).
            let aad_prefix = match file_decryption_properties.aad_prefix() {
                Some(prefix) => prefix.clone(),
                None => algo.aad_prefix.unwrap_or_default(),
            };

            FileDecryptor::new(
                file_decryption_properties,
                footer_key_metadata,
                aad_file_unique,
                aad_prefix,
            )
        }
        EncryptionAlgorithm::AESGCMCTRV1(_) => Err(nyi_err!(
            "The AES_GCM_CTR_V1 encryption algorithm is not yet supported"
        )),
    }
}
1099
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;

    use crate::basic::SortOrder;
    use crate::basic::Type;
    use crate::file::reader::Length;
    use crate::format::TypeDefinedOrder;
    use crate::schema::types::Type as SchemaType;
    use crate::util::test_common::file_util::get_test_file;

    // An empty file is smaller than the 8-byte footer; the reader should ask
    // for exactly FOOTER_SIZE more bytes.
    #[test]
    fn test_parse_metadata_size_smaller_than_footer() {
        let test_file = tempfile::tempfile().unwrap();
        let err = ParquetMetaDataReader::new()
            .parse_metadata(&test_file)
            .unwrap_err();
        assert!(matches!(err, ParquetError::NeedMoreData(8)));
    }

    // 8 bytes that do not end in a valid magic must be rejected as a corrupt footer.
    #[test]
    fn test_parse_metadata_corrupt_footer() {
        let data = Bytes::from(vec![1, 2, 3, 4, 5, 6, 7, 8]);
        let reader_result = ParquetMetaDataReader::new().parse_metadata(&data);
        assert_eq!(
            reader_result.unwrap_err().to_string(),
            "Parquet error: Invalid Parquet file. Corrupt footer"
        );
    }

    // Valid magic but a metadata length (255) larger than the file: the reader
    // should request 255 + 8 = 263 bytes.
    #[test]
    fn test_parse_metadata_invalid_start() {
        let test_file = Bytes::from(vec![255, 0, 0, 0, b'P', b'A', b'R', b'1']);
        let err = ParquetMetaDataReader::new()
            .parse_metadata(&test_file)
            .unwrap_err();
        assert!(matches!(err, ParquetError::NeedMoreData(263)));
    }

    // parse_column_orders: one TYPEORDER per column resolves to the type-defined
    // sort order; None in yields None out.
    #[test]
    fn test_metadata_column_orders_parse() {
        // Define simple schema, we do not need to provide logical types.
        let fields = vec![
            Arc::new(
                SchemaType::primitive_type_builder("col1", Type::INT32)
                    .build()
                    .unwrap(),
            ),
            Arc::new(
                SchemaType::primitive_type_builder("col2", Type::FLOAT)
                    .build()
                    .unwrap(),
            ),
        ];
        let schema = SchemaType::group_type_builder("schema")
            .with_fields(fields)
            .build()
            .unwrap();
        let schema_descr = SchemaDescriptor::new(Arc::new(schema));

        let t_column_orders = Some(vec![
            TColumnOrder::TYPEORDER(TypeDefinedOrder::new()),
            TColumnOrder::TYPEORDER(TypeDefinedOrder::new()),
        ]);

        assert_eq!(
            ParquetMetaDataReader::parse_column_orders(t_column_orders, &schema_descr).unwrap(),
            Some(vec![
                ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED),
                ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED)
            ])
        );

        // Test when no column orders are defined.
        assert_eq!(
            ParquetMetaDataReader::parse_column_orders(None, &schema_descr).unwrap(),
            None
        );
    }

    // A column order list whose length differs from the schema's leaf count is an error.
    #[test]
    fn test_metadata_column_orders_len_mismatch() {
        let schema = SchemaType::group_type_builder("schema").build().unwrap();
        let schema_descr = SchemaDescriptor::new(Arc::new(schema));

        let t_column_orders = Some(vec![TColumnOrder::TYPEORDER(TypeDefinedOrder::new())]);

        let res = ParquetMetaDataReader::parse_column_orders(t_column_orders, &schema_descr);
        assert!(res.is_err());
        assert!(format!("{:?}", res.unwrap_err()).contains("Column order length mismatch"));
    }

    // Exercises try_parse/try_parse_sized against a real test file with page
    // indexes, covering: full reads, partial reads, exact-size reads, and the
    // NeedMoreData retry protocol. The magic byte offsets below are specific to
    // alltypes_tiny_pages.parquet.
    #[test]
    fn test_try_parse() {
        let file = get_test_file("alltypes_tiny_pages.parquet");
        let len = file.len();

        let mut reader = ParquetMetaDataReader::new().with_page_indexes(true);

        let bytes_for_range = |range: Range<u64>| {
            file.get_bytes(range.start, (range.end - range.start).try_into().unwrap())
                .unwrap()
        };

        // read entire file
        let bytes = bytes_for_range(0..len);
        reader.try_parse(&bytes).unwrap();
        let metadata = reader.finish().unwrap();
        assert!(metadata.column_index.is_some());
        assert!(metadata.offset_index.is_some());

        // read more than enough of file
        let bytes = bytes_for_range(320000..len);
        reader.try_parse_sized(&bytes, len).unwrap();
        let metadata = reader.finish().unwrap();
        assert!(metadata.column_index.is_some());
        assert!(metadata.offset_index.is_some());

        // exactly enough
        let bytes = bytes_for_range(323583..len);
        reader.try_parse_sized(&bytes, len).unwrap();
        let metadata = reader.finish().unwrap();
        assert!(metadata.column_index.is_some());
        assert!(metadata.offset_index.is_some());

        // not enough for page index
        let bytes = bytes_for_range(323584..len);
        // should fail
        match reader.try_parse_sized(&bytes, len).unwrap_err() {
            // expected error, try again with provided bounds
            ParquetError::NeedMoreData(needed) => {
                let bytes = bytes_for_range(len - needed as u64..len);
                reader.try_parse_sized(&bytes, len).unwrap();
                let metadata = reader.finish().unwrap();
                assert!(metadata.column_index.is_some());
                assert!(metadata.offset_index.is_some());
            }
            _ => panic!("unexpected error"),
        };

        // not enough for file metadata, but keep trying until page indexes are read
        let mut reader = ParquetMetaDataReader::new().with_page_indexes(true);
        let mut bytes = bytes_for_range(452505..len);
        loop {
            match reader.try_parse_sized(&bytes, len) {
                Ok(_) => break,
                Err(ParquetError::NeedMoreData(needed)) => {
                    bytes = bytes_for_range(len - needed as u64..len);
                    if reader.has_metadata() {
                        reader.read_page_indexes_sized(&bytes, len).unwrap();
                        break;
                    }
                }
                _ => panic!("unexpected error"),
            }
        }
        let metadata = reader.finish().unwrap();
        assert!(metadata.column_index.is_some());
        assert!(metadata.offset_index.is_some());

        // not enough for page index but lie about file size
        let bytes = bytes_for_range(323584..len);
        let reader_result = reader.try_parse_sized(&bytes, len - 323584).unwrap_err();
        assert_eq!(
            reader_result.to_string(),
            "EOF: Parquet file too small. Range 323583..452504 is beyond file bounds 130649"
        );

        // not enough for file metadata
        let mut reader = ParquetMetaDataReader::new();
        let bytes = bytes_for_range(452505..len);
        // should fail
        match reader.try_parse_sized(&bytes, len).unwrap_err() {
            // expected error, try again with provided bounds
            ParquetError::NeedMoreData(needed) => {
                let bytes = bytes_for_range(len - needed as u64..len);
                reader.try_parse_sized(&bytes, len).unwrap();
                reader.finish().unwrap();
            }
            _ => panic!("unexpected error"),
        };

        // not enough for file metadata but use try_parse()
        let reader_result = reader.try_parse(&bytes).unwrap_err();
        assert_eq!(
            reader_result.to_string(),
            "EOF: Parquet file too small. Size is 1728 but need 1729"
        );

        // read head of file rather than tail
        let bytes = bytes_for_range(0..1000);
        let reader_result = reader.try_parse_sized(&bytes, len).unwrap_err();
        assert_eq!(
            reader_result.to_string(),
            "Parquet error: Invalid Parquet file. Corrupt footer"
        );

        // lie about file size
        let bytes = bytes_for_range(452510..len);
        let reader_result = reader.try_parse_sized(&bytes, len - 452505).unwrap_err();
        assert_eq!(
            reader_result.to_string(),
            "EOF: Parquet file too small. Size is 1728 but need 1729"
        );
    }
}
1307
1308#[cfg(all(feature = "async", feature = "arrow", test))]
1309mod async_tests {
1310    use super::*;
1311    use bytes::Bytes;
1312    use futures::future::BoxFuture;
1313    use futures::FutureExt;
1314    use std::fs::File;
1315    use std::future::Future;
1316    use std::io::{Read, Seek, SeekFrom};
1317    use std::ops::Range;
1318    use std::sync::atomic::{AtomicUsize, Ordering};
1319
1320    use crate::file::reader::Length;
1321    use crate::util::test_common::file_util::get_test_file;
1322
1323    struct MetadataFetchFn<F>(F);
1324
1325    impl<F, Fut> MetadataFetch for MetadataFetchFn<F>
1326    where
1327        F: FnMut(Range<u64>) -> Fut + Send,
1328        Fut: Future<Output = Result<Bytes>> + Send,
1329    {
1330        fn fetch(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
1331            async move { self.0(range).await }.boxed()
1332        }
1333    }
1334
1335    struct MetadataSuffixFetchFn<F1, F2>(F1, F2);
1336
1337    impl<F1, Fut, F2> MetadataFetch for MetadataSuffixFetchFn<F1, F2>
1338    where
1339        F1: FnMut(Range<u64>) -> Fut + Send,
1340        Fut: Future<Output = Result<Bytes>> + Send,
1341        F2: Send,
1342    {
1343        fn fetch(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
1344            async move { self.0(range).await }.boxed()
1345        }
1346    }
1347
1348    impl<F1, Fut, F2> MetadataSuffixFetch for MetadataSuffixFetchFn<F1, F2>
1349    where
1350        F1: FnMut(Range<u64>) -> Fut + Send,
1351        F2: FnMut(usize) -> Fut + Send,
1352        Fut: Future<Output = Result<Bytes>> + Send,
1353    {
1354        fn fetch_suffix(&mut self, suffix: usize) -> BoxFuture<'_, Result<Bytes>> {
1355            async move { self.1(suffix).await }.boxed()
1356        }
1357    }
1358
1359    fn read_range(file: &mut File, range: Range<u64>) -> Result<Bytes> {
1360        file.seek(SeekFrom::Start(range.start as _))?;
1361        let len = range.end - range.start;
1362        let mut buf = Vec::with_capacity(len.try_into().unwrap());
1363        file.take(len as _).read_to_end(&mut buf)?;
1364        Ok(buf.into())
1365    }
1366
1367    fn read_suffix(file: &mut File, suffix: usize) -> Result<Bytes> {
1368        let file_len = file.len();
1369        // Don't seek before beginning of file
1370        file.seek(SeekFrom::End(0 - suffix.min(file_len as _) as i64))?;
1371        let mut buf = Vec::with_capacity(suffix);
1372        file.take(suffix as _).read_to_end(&mut buf)?;
1373        Ok(buf.into())
1374    }
1375
1376    #[tokio::test]
1377    async fn test_simple() {
1378        let mut file = get_test_file("nulls.snappy.parquet");
1379        let len = file.len();
1380
1381        let expected = ParquetMetaDataReader::new()
1382            .parse_and_finish(&file)
1383            .unwrap();
1384        let expected = expected.file_metadata().schema();
1385        let fetch_count = AtomicUsize::new(0);
1386
1387        let mut fetch = |range| {
1388            fetch_count.fetch_add(1, Ordering::SeqCst);
1389            futures::future::ready(read_range(&mut file, range))
1390        };
1391
1392        let input = MetadataFetchFn(&mut fetch);
1393        let actual = ParquetMetaDataReader::new()
1394            .load_and_finish(input, len)
1395            .await
1396            .unwrap();
1397        assert_eq!(actual.file_metadata().schema(), expected);
1398        assert_eq!(fetch_count.load(Ordering::SeqCst), 2);
1399
1400        // Metadata hint too small - below footer size
1401        fetch_count.store(0, Ordering::SeqCst);
1402        let input = MetadataFetchFn(&mut fetch);
1403        let actual = ParquetMetaDataReader::new()
1404            .with_prefetch_hint(Some(7))
1405            .load_and_finish(input, len)
1406            .await
1407            .unwrap();
1408        assert_eq!(actual.file_metadata().schema(), expected);
1409        assert_eq!(fetch_count.load(Ordering::SeqCst), 2);
1410
1411        // Metadata hint too small
1412        fetch_count.store(0, Ordering::SeqCst);
1413        let input = MetadataFetchFn(&mut fetch);
1414        let actual = ParquetMetaDataReader::new()
1415            .with_prefetch_hint(Some(10))
1416            .load_and_finish(input, len)
1417            .await
1418            .unwrap();
1419        assert_eq!(actual.file_metadata().schema(), expected);
1420        assert_eq!(fetch_count.load(Ordering::SeqCst), 2);
1421
1422        // Metadata hint too large
1423        fetch_count.store(0, Ordering::SeqCst);
1424        let input = MetadataFetchFn(&mut fetch);
1425        let actual = ParquetMetaDataReader::new()
1426            .with_prefetch_hint(Some(500))
1427            .load_and_finish(input, len)
1428            .await
1429            .unwrap();
1430        assert_eq!(actual.file_metadata().schema(), expected);
1431        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
1432
1433        // Metadata hint exactly correct
1434        fetch_count.store(0, Ordering::SeqCst);
1435        let input = MetadataFetchFn(&mut fetch);
1436        let actual = ParquetMetaDataReader::new()
1437            .with_prefetch_hint(Some(428))
1438            .load_and_finish(input, len)
1439            .await
1440            .unwrap();
1441        assert_eq!(actual.file_metadata().schema(), expected);
1442        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
1443
1444        let input = MetadataFetchFn(&mut fetch);
1445        let err = ParquetMetaDataReader::new()
1446            .load_and_finish(input, 4)
1447            .await
1448            .unwrap_err()
1449            .to_string();
1450        assert_eq!(err, "EOF: file size of 4 is less than footer");
1451
1452        let input = MetadataFetchFn(&mut fetch);
1453        let err = ParquetMetaDataReader::new()
1454            .load_and_finish(input, 20)
1455            .await
1456            .unwrap_err()
1457            .to_string();
1458        assert_eq!(err, "Parquet error: Invalid Parquet file. Corrupt footer");
1459    }
1460
1461    #[tokio::test]
1462    async fn test_suffix() {
1463        let mut file = get_test_file("nulls.snappy.parquet");
1464        let mut file2 = file.try_clone().unwrap();
1465
1466        let expected = ParquetMetaDataReader::new()
1467            .parse_and_finish(&file)
1468            .unwrap();
1469        let expected = expected.file_metadata().schema();
1470        let fetch_count = AtomicUsize::new(0);
1471        let suffix_fetch_count = AtomicUsize::new(0);
1472
1473        let mut fetch = |range| {
1474            fetch_count.fetch_add(1, Ordering::SeqCst);
1475            futures::future::ready(read_range(&mut file, range))
1476        };
1477        let mut suffix_fetch = |suffix| {
1478            suffix_fetch_count.fetch_add(1, Ordering::SeqCst);
1479            futures::future::ready(read_suffix(&mut file2, suffix))
1480        };
1481
1482        let input = MetadataSuffixFetchFn(&mut fetch, &mut suffix_fetch);
1483        let actual = ParquetMetaDataReader::new()
1484            .load_via_suffix_and_finish(input)
1485            .await
1486            .unwrap();
1487        assert_eq!(actual.file_metadata().schema(), expected);
1488        assert_eq!(fetch_count.load(Ordering::SeqCst), 0);
1489        assert_eq!(suffix_fetch_count.load(Ordering::SeqCst), 2);
1490
1491        // Metadata hint too small - below footer size
1492        fetch_count.store(0, Ordering::SeqCst);
1493        suffix_fetch_count.store(0, Ordering::SeqCst);
1494        let input = MetadataSuffixFetchFn(&mut fetch, &mut suffix_fetch);
1495        let actual = ParquetMetaDataReader::new()
1496            .with_prefetch_hint(Some(7))
1497            .load_via_suffix_and_finish(input)
1498            .await
1499            .unwrap();
1500        assert_eq!(actual.file_metadata().schema(), expected);
1501        assert_eq!(fetch_count.load(Ordering::SeqCst), 0);
1502        assert_eq!(suffix_fetch_count.load(Ordering::SeqCst), 2);
1503
1504        // Metadata hint too small
1505        fetch_count.store(0, Ordering::SeqCst);
1506        suffix_fetch_count.store(0, Ordering::SeqCst);
1507        let input = MetadataSuffixFetchFn(&mut fetch, &mut suffix_fetch);
1508        let actual = ParquetMetaDataReader::new()
1509            .with_prefetch_hint(Some(10))
1510            .load_via_suffix_and_finish(input)
1511            .await
1512            .unwrap();
1513        assert_eq!(actual.file_metadata().schema(), expected);
1514        assert_eq!(fetch_count.load(Ordering::SeqCst), 0);
1515        assert_eq!(suffix_fetch_count.load(Ordering::SeqCst), 2);
1516
1517        dbg!("test");
1518        // Metadata hint too large
1519        fetch_count.store(0, Ordering::SeqCst);
1520        suffix_fetch_count.store(0, Ordering::SeqCst);
1521        let input = MetadataSuffixFetchFn(&mut fetch, &mut suffix_fetch);
1522        let actual = ParquetMetaDataReader::new()
1523            .with_prefetch_hint(Some(500))
1524            .load_via_suffix_and_finish(input)
1525            .await
1526            .unwrap();
1527        assert_eq!(actual.file_metadata().schema(), expected);
1528        assert_eq!(fetch_count.load(Ordering::SeqCst), 0);
1529        assert_eq!(suffix_fetch_count.load(Ordering::SeqCst), 1);
1530
1531        // Metadata hint exactly correct
1532        fetch_count.store(0, Ordering::SeqCst);
1533        suffix_fetch_count.store(0, Ordering::SeqCst);
1534        let input = MetadataSuffixFetchFn(&mut fetch, &mut suffix_fetch);
1535        let actual = ParquetMetaDataReader::new()
1536            .with_prefetch_hint(Some(428))
1537            .load_via_suffix_and_finish(input)
1538            .await
1539            .unwrap();
1540        assert_eq!(actual.file_metadata().schema(), expected);
1541        assert_eq!(fetch_count.load(Ordering::SeqCst), 0);
1542        assert_eq!(suffix_fetch_count.load(Ordering::SeqCst), 1);
1543    }
1544
1545    #[cfg(feature = "encryption")]
1546    #[tokio::test]
1547    async fn test_suffix_with_encryption() {
1548        let mut file = get_test_file("uniform_encryption.parquet.encrypted");
1549        let mut file2 = file.try_clone().unwrap();
1550
1551        let mut fetch = |range| futures::future::ready(read_range(&mut file, range));
1552        let mut suffix_fetch = |suffix| futures::future::ready(read_suffix(&mut file2, suffix));
1553
1554        let input = MetadataSuffixFetchFn(&mut fetch, &mut suffix_fetch);
1555
1556        let key_code: &[u8] = "0123456789012345".as_bytes();
1557        let decryption_properties = FileDecryptionProperties::builder(key_code.to_vec())
1558            .build()
1559            .unwrap();
1560
1561        // just make sure the metadata is properly decrypted and read
1562        let expected = ParquetMetaDataReader::new()
1563            .with_decryption_properties(Some(&decryption_properties))
1564            .load_via_suffix_and_finish(input)
1565            .await
1566            .unwrap();
1567        assert_eq!(expected.num_row_groups(), 1);
1568    }
1569
1570    #[tokio::test]
1571    async fn test_page_index() {
1572        let mut file = get_test_file("alltypes_tiny_pages.parquet");
1573        let len = file.len();
1574        let fetch_count = AtomicUsize::new(0);
1575        let mut fetch = |range| {
1576            fetch_count.fetch_add(1, Ordering::SeqCst);
1577            futures::future::ready(read_range(&mut file, range))
1578        };
1579
1580        let f = MetadataFetchFn(&mut fetch);
1581        let mut loader = ParquetMetaDataReader::new().with_page_indexes(true);
1582        loader.try_load(f, len).await.unwrap();
1583        assert_eq!(fetch_count.load(Ordering::SeqCst), 3);
1584        let metadata = loader.finish().unwrap();
1585        assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());
1586
1587        // Prefetch just footer exactly
1588        fetch_count.store(0, Ordering::SeqCst);
1589        let f = MetadataFetchFn(&mut fetch);
1590        let mut loader = ParquetMetaDataReader::new()
1591            .with_page_indexes(true)
1592            .with_prefetch_hint(Some(1729));
1593        loader.try_load(f, len).await.unwrap();
1594        assert_eq!(fetch_count.load(Ordering::SeqCst), 2);
1595        let metadata = loader.finish().unwrap();
1596        assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());
1597
1598        // Prefetch more than footer but not enough
1599        fetch_count.store(0, Ordering::SeqCst);
1600        let f = MetadataFetchFn(&mut fetch);
1601        let mut loader = ParquetMetaDataReader::new()
1602            .with_page_indexes(true)
1603            .with_prefetch_hint(Some(130649));
1604        loader.try_load(f, len).await.unwrap();
1605        assert_eq!(fetch_count.load(Ordering::SeqCst), 2);
1606        let metadata = loader.finish().unwrap();
1607        assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());
1608
1609        // Prefetch exactly enough
1610        fetch_count.store(0, Ordering::SeqCst);
1611        let f = MetadataFetchFn(&mut fetch);
1612        let metadata = ParquetMetaDataReader::new()
1613            .with_page_indexes(true)
1614            .with_prefetch_hint(Some(130650))
1615            .load_and_finish(f, len)
1616            .await
1617            .unwrap();
1618        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
1619        assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());
1620
1621        // Prefetch more than enough but less than the entire file
1622        fetch_count.store(0, Ordering::SeqCst);
1623        let f = MetadataFetchFn(&mut fetch);
1624        let metadata = ParquetMetaDataReader::new()
1625            .with_page_indexes(true)
1626            .with_prefetch_hint(Some((len - 1000) as usize)) // prefetch entire file
1627            .load_and_finish(f, len)
1628            .await
1629            .unwrap();
1630        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
1631        assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());
1632
1633        // Prefetch the entire file
1634        fetch_count.store(0, Ordering::SeqCst);
1635        let f = MetadataFetchFn(&mut fetch);
1636        let metadata = ParquetMetaDataReader::new()
1637            .with_page_indexes(true)
1638            .with_prefetch_hint(Some(len as usize)) // prefetch entire file
1639            .load_and_finish(f, len)
1640            .await
1641            .unwrap();
1642        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
1643        assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());
1644
1645        // Prefetch more than the entire file
1646        fetch_count.store(0, Ordering::SeqCst);
1647        let f = MetadataFetchFn(&mut fetch);
1648        let metadata = ParquetMetaDataReader::new()
1649            .with_page_indexes(true)
1650            .with_prefetch_hint(Some((len + 1000) as usize)) // prefetch entire file
1651            .load_and_finish(f, len)
1652            .await
1653            .unwrap();
1654        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
1655        assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());
1656    }
1657}