// parquet/file/metadata/reader.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18#[cfg(feature = "encryption")]
19use crate::encryption::decrypt::FileDecryptionProperties;
20use crate::errors::{ParquetError, Result};
21use crate::file::FOOTER_SIZE;
22use crate::file::metadata::parser::decode_metadata;
23use crate::file::metadata::thrift::parquet_schema_from_bytes;
24use crate::file::metadata::{
25    FooterTail, ParquetMetaData, ParquetMetaDataOptions, ParquetMetaDataPushDecoder,
26};
27use crate::file::reader::ChunkReader;
28use crate::schema::types::SchemaDescriptor;
29use bytes::Bytes;
30use std::sync::Arc;
31use std::{io::Read, ops::Range};
32
33use crate::DecodeResult;
34#[cfg(all(feature = "async", feature = "arrow"))]
35use crate::arrow::async_reader::{MetadataFetch, MetadataSuffixFetch};
36
37/// Reads [`ParquetMetaData`] from a byte stream, with either synchronous or
38/// asynchronous I/O.
39///
40/// There are two flavors of APIs:
41/// * Synchronous: [`Self::try_parse()`], [`Self::try_parse_sized()`], [`Self::parse_and_finish()`], etc.
42/// * Asynchronous (requires `async` and `arrow` features): [`Self::try_load()`], etc
43///
44///  See the [`ParquetMetaDataPushDecoder`] for an API that does not require I/O.
45///
46/// # Format Notes
47///
48/// Parquet metadata is not necessarily contiguous in a Parquet file: a portion is stored
49/// in the footer (the last bytes of the file), but other portions (such as the
50/// PageIndex) can be stored elsewhere.
51/// See [`crate::file::metadata::ParquetMetaDataWriter#output-format`] for more details of
52/// Parquet metadata.
53///
54/// This reader handles reading the footer as well as the non contiguous parts
55/// of the metadata (`PageIndex` and `ColumnIndex`). It does not handle reading Bloom Filters.
56///
57/// # Example
58/// ```no_run
59/// # use parquet::file::metadata::ParquetMetaDataReader;
60/// # fn open_parquet_file(path: &str) -> std::fs::File { unimplemented!(); }
61/// // read parquet metadata including page indexes from a file
62/// let file = open_parquet_file("some_path.parquet");
63/// let mut reader = ParquetMetaDataReader::new()
64///     .with_page_indexes(true);
65/// reader.try_parse(&file).unwrap();
66/// let metadata = reader.finish().unwrap();
67/// assert!(metadata.column_index().is_some());
68/// assert!(metadata.offset_index().is_some());
69/// ```
70#[derive(Default, Debug)]
71pub struct ParquetMetaDataReader {
72    metadata: Option<ParquetMetaData>,
73    column_index: PageIndexPolicy,
74    offset_index: PageIndexPolicy,
75    prefetch_hint: Option<usize>,
76    metadata_options: Option<Arc<ParquetMetaDataOptions>>,
77    // Size of the serialized thrift metadata plus the 8 byte footer. Only set if
78    // `self.parse_metadata` is called.
79    metadata_size: Option<usize>,
80    #[cfg(feature = "encryption")]
81    file_decryption_properties: Option<Arc<FileDecryptionProperties>>,
82}
83
/// Describes the policy for reading page indexes
/// (see [`ParquetMetaDataReader::with_page_index_policy`]).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum PageIndexPolicy {
    /// Do not read the page index.
    #[default]
    Skip,
    /// Read the page index if it exists, otherwise do not error.
    Optional,
    /// Require the page index to exist, and error if it does not.
    Required,
}
95
96impl From<bool> for PageIndexPolicy {
97    fn from(value: bool) -> Self {
98        match value {
99            true => Self::Required,
100            false => Self::Skip,
101        }
102    }
103}
104
105impl ParquetMetaDataReader {
106    /// Create a new [`ParquetMetaDataReader`]
107    pub fn new() -> Self {
108        Default::default()
109    }
110
111    /// Create a new [`ParquetMetaDataReader`] populated with a [`ParquetMetaData`] struct
112    /// obtained via other means.
113    pub fn new_with_metadata(metadata: ParquetMetaData) -> Self {
114        Self {
115            metadata: Some(metadata),
116            ..Default::default()
117        }
118    }
119
120    /// Enable or disable reading the page index structures described in
121    /// "[Parquet page index]: Layout to Support Page Skipping".
122    ///
123    /// [Parquet page index]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
124    #[deprecated(since = "56.1.0", note = "Use `with_page_index_policy` instead")]
125    pub fn with_page_indexes(self, val: bool) -> Self {
126        let policy = PageIndexPolicy::from(val);
127        self.with_column_index_policy(policy)
128            .with_offset_index_policy(policy)
129    }
130
131    /// Enable or disable reading the Parquet [ColumnIndex] structure.
132    ///
133    /// [ColumnIndex]:  https://github.com/apache/parquet-format/blob/master/PageIndex.md
134    #[deprecated(since = "56.1.0", note = "Use `with_column_index_policy` instead")]
135    pub fn with_column_indexes(self, val: bool) -> Self {
136        let policy = PageIndexPolicy::from(val);
137        self.with_column_index_policy(policy)
138    }
139
140    /// Enable or disable reading the Parquet [OffsetIndex] structure.
141    ///
142    /// [OffsetIndex]:  https://github.com/apache/parquet-format/blob/master/PageIndex.md
143    #[deprecated(since = "56.1.0", note = "Use `with_offset_index_policy` instead")]
144    pub fn with_offset_indexes(self, val: bool) -> Self {
145        let policy = PageIndexPolicy::from(val);
146        self.with_offset_index_policy(policy)
147    }
148
149    /// Sets the [`PageIndexPolicy`] for the column and offset indexes
150    pub fn with_page_index_policy(self, policy: PageIndexPolicy) -> Self {
151        self.with_column_index_policy(policy)
152            .with_offset_index_policy(policy)
153    }
154
155    /// Sets the [`PageIndexPolicy`] for the column index
156    pub fn with_column_index_policy(mut self, policy: PageIndexPolicy) -> Self {
157        self.column_index = policy;
158        self
159    }
160
161    /// Sets the [`PageIndexPolicy`] for the offset index
162    pub fn with_offset_index_policy(mut self, policy: PageIndexPolicy) -> Self {
163        self.offset_index = policy;
164        self
165    }
166
167    /// Sets the [`ParquetMetaDataOptions`] to use when decoding
168    pub fn with_metadata_options(mut self, options: Option<ParquetMetaDataOptions>) -> Self {
169        self.metadata_options = options.map(Arc::new);
170        self
171    }
172
173    /// Provide a hint as to the number of bytes needed to fully parse the [`ParquetMetaData`].
174    /// Only used for the asynchronous [`Self::try_load()`] method.
175    ///
176    /// By default, the reader will first fetch the last 8 bytes of the input file to obtain the
177    /// size of the footer metadata. A second fetch will be performed to obtain the needed bytes.
178    /// After parsing the footer metadata, a third fetch will be performed to obtain the bytes
179    /// needed to decode the page index structures, if they have been requested. To avoid
180    /// unnecessary fetches, `prefetch` can be set to an estimate of the number of bytes needed
181    /// to fully decode the [`ParquetMetaData`], which can reduce the number of fetch requests and
182    /// reduce latency. Setting `prefetch` too small will not trigger an error, but will result
183    /// in extra fetches being performed.
184    pub fn with_prefetch_hint(mut self, prefetch: Option<usize>) -> Self {
185        self.prefetch_hint = prefetch;
186        self
187    }
188
189    /// Provide the FileDecryptionProperties to use when decrypting the file.
190    ///
191    /// This is only necessary when the file is encrypted.
192    #[cfg(feature = "encryption")]
193    pub fn with_decryption_properties(
194        mut self,
195        properties: Option<std::sync::Arc<FileDecryptionProperties>>,
196    ) -> Self {
197        self.file_decryption_properties = properties;
198        self
199    }
200
    /// Indicates whether this reader has a [`ParquetMetaData`] internally.
    ///
    /// Returns `true` once metadata has been parsed (or supplied via
    /// [`Self::new_with_metadata`]) and not yet taken by [`Self::finish`].
    pub fn has_metadata(&self) -> bool {
        self.metadata.is_some()
    }
205
206    /// Return the parsed [`ParquetMetaData`] struct, leaving `None` in its place.
207    pub fn finish(&mut self) -> Result<ParquetMetaData> {
208        self.metadata
209            .take()
210            .ok_or_else(|| general_err!("could not parse parquet metadata"))
211    }
212
213    /// Given a [`ChunkReader`], parse and return the [`ParquetMetaData`] in a single pass.
214    ///
215    /// If `reader` is [`Bytes`] based, then the buffer must contain sufficient bytes to complete
216    /// the request, and must include the Parquet footer. If page indexes are desired, the buffer
217    /// must contain the entire file, or [`Self::try_parse_sized()`] should be used.
218    ///
219    /// This call will consume `self`.
220    ///
221    /// # Example
222    /// ```no_run
223    /// # use parquet::file::metadata::ParquetMetaDataReader;
224    /// # fn open_parquet_file(path: &str) -> std::fs::File { unimplemented!(); }
225    /// // read parquet metadata including page indexes
226    /// let file = open_parquet_file("some_path.parquet");
227    /// let metadata = ParquetMetaDataReader::new()
228    ///     .with_page_indexes(true)
229    ///     .parse_and_finish(&file).unwrap();
230    /// ```
231    pub fn parse_and_finish<R: ChunkReader>(mut self, reader: &R) -> Result<ParquetMetaData> {
232        self.try_parse(reader)?;
233        self.finish()
234    }
235
236    /// Attempts to parse the footer metadata (and optionally page indexes) given a [`ChunkReader`].
237    ///
238    /// If `reader` is [`Bytes`] based, then the buffer must contain sufficient bytes to complete
239    /// the request, and must include the Parquet footer. If page indexes are desired, the buffer
240    /// must contain the entire file, or [`Self::try_parse_sized()`] should be used.
241    pub fn try_parse<R: ChunkReader>(&mut self, reader: &R) -> Result<()> {
242        self.try_parse_sized(reader, reader.len())
243    }
244
245    /// Same as [`Self::try_parse()`], but provide the original file size in the case that `reader`
246    /// is a [`Bytes`] struct that does not contain the entire file. This information is necessary
247    /// when the page indexes are desired. `reader` must have access to the Parquet footer.
248    ///
249    /// Using this function also allows for retrying with a larger buffer.
250    ///
251    /// # Errors
252    ///
253    /// This function will return [`ParquetError::NeedMoreData`] in the event `reader` does not
254    /// provide enough data to fully parse the metadata (see example below). The returned error
255    /// will be populated with a `usize` field indicating the number of bytes required from the
256    /// tail of the file to completely parse the requested metadata.
257    ///
258    /// Other errors returned include [`ParquetError::General`] and [`ParquetError::EOF`].
259    ///
260    /// # Example
261    /// ```no_run
262    /// # use parquet::file::metadata::ParquetMetaDataReader;
263    /// # use parquet::errors::ParquetError;
264    /// # use crate::parquet::file::reader::Length;
265    /// # fn get_bytes(file: &std::fs::File, range: std::ops::Range<u64>) -> bytes::Bytes { unimplemented!(); }
266    /// # fn open_parquet_file(path: &str) -> std::fs::File { unimplemented!(); }
267    /// let file = open_parquet_file("some_path.parquet");
268    /// let len = file.len();
269    /// // Speculatively read 1 kilobyte from the end of the file
270    /// let bytes = get_bytes(&file, len - 1024..len);
271    /// let mut reader = ParquetMetaDataReader::new().with_page_indexes(true);
272    /// match reader.try_parse_sized(&bytes, len) {
273    ///     Ok(_) => (),
274    ///     Err(ParquetError::NeedMoreData(needed)) => {
275    ///         // Read the needed number of bytes from the end of the file
276    ///         let bytes = get_bytes(&file, len - needed as u64..len);
277    ///         reader.try_parse_sized(&bytes, len).unwrap();
278    ///     }
279    ///     _ => panic!("unexpected error")
280    /// }
281    /// let metadata = reader.finish().unwrap();
282    /// ```
283    ///
284    /// Note that it is possible for the file metadata to be completely read, but there are
285    /// insufficient bytes available to read the page indexes. [`Self::has_metadata()`] can be used
286    /// to test for this. In the event the file metadata is present, re-parsing of the file
287    /// metadata can be skipped by using [`Self::read_page_indexes_sized()`], as shown below.
288    /// ```no_run
289    /// # use parquet::file::metadata::ParquetMetaDataReader;
290    /// # use parquet::errors::ParquetError;
291    /// # use crate::parquet::file::reader::Length;
292    /// # fn get_bytes(file: &std::fs::File, range: std::ops::Range<u64>) -> bytes::Bytes { unimplemented!(); }
293    /// # fn open_parquet_file(path: &str) -> std::fs::File { unimplemented!(); }
294    /// let file = open_parquet_file("some_path.parquet");
295    /// let len = file.len();
296    /// // Speculatively read 1 kilobyte from the end of the file
297    /// let mut bytes = get_bytes(&file, len - 1024..len);
298    /// let mut reader = ParquetMetaDataReader::new().with_page_indexes(true);
299    /// // Loop until `bytes` is large enough
300    /// loop {
301    ///     match reader.try_parse_sized(&bytes, len) {
302    ///         Ok(_) => break,
303    ///         Err(ParquetError::NeedMoreData(needed)) => {
304    ///             // Read the needed number of bytes from the end of the file
305    ///             bytes = get_bytes(&file, len - needed as u64..len);
306    ///             // If file metadata was read only read page indexes, otherwise continue loop
307    ///             if reader.has_metadata() {
308    ///                 reader.read_page_indexes_sized(&bytes, len).unwrap();
309    ///                 break;
310    ///             }
311    ///         }
312    ///         _ => panic!("unexpected error")
313    ///     }
314    /// }
315    /// let metadata = reader.finish().unwrap();
316    /// ```
317    pub fn try_parse_sized<R: ChunkReader>(&mut self, reader: &R, file_size: u64) -> Result<()> {
318        self.metadata = match self.parse_metadata(reader) {
319            Ok(metadata) => Some(metadata),
320            Err(ParquetError::NeedMoreData(needed)) => {
321                // If reader is the same length as `file_size` then presumably there is no more to
322                // read, so return an EOF error.
323                if file_size == reader.len() || needed as u64 > file_size {
324                    return Err(eof_err!(
325                        "Parquet file too small. Size is {} but need {}",
326                        file_size,
327                        needed
328                    ));
329                } else {
330                    // Ask for a larger buffer
331                    return Err(ParquetError::NeedMoreData(needed));
332                }
333            }
334            Err(e) => return Err(e),
335        };
336
337        // we can return if page indexes aren't requested
338        if self.column_index == PageIndexPolicy::Skip && self.offset_index == PageIndexPolicy::Skip
339        {
340            return Ok(());
341        }
342
343        self.read_page_indexes_sized(reader, file_size)
344    }
345
346    /// Read the page index structures when a [`ParquetMetaData`] has already been obtained.
347    /// See [`Self::new_with_metadata()`] and [`Self::has_metadata()`].
348    pub fn read_page_indexes<R: ChunkReader>(&mut self, reader: &R) -> Result<()> {
349        self.read_page_indexes_sized(reader, reader.len())
350    }
351
    /// Read the page index structures when a [`ParquetMetaData`] has already been obtained.
    /// This variant is used when `reader` cannot access the entire Parquet file (e.g. it is
    /// a [`Bytes`] struct containing the tail of the file).
    /// See [`Self::new_with_metadata()`] and [`Self::has_metadata()`]. Like
    /// [`Self::try_parse_sized()`] this function may return [`ParquetError::NeedMoreData`].
    pub fn read_page_indexes_sized<R: ChunkReader>(
        &mut self,
        reader: &R,
        file_size: u64,
    ) -> Result<()> {
        // Take ownership of the previously-parsed metadata; it is handed to the push
        // decoder and restored into `self.metadata` once the indexes are decoded.
        let Some(metadata) = self.metadata.take() else {
            return Err(general_err!(
                "Tried to read page indexes without ParquetMetaData metadata"
            ));
        };

        let push_decoder = ParquetMetaDataPushDecoder::try_new_with_metadata(file_size, metadata)?
            .with_offset_index_policy(self.offset_index)
            .with_column_index_policy(self.column_index)
            .with_metadata_options(self.metadata_options.clone());
        let mut push_decoder = self.prepare_push_decoder(push_decoder);

        // Get bounds needed for page indexes (if any are present in the file).
        let range = match needs_index_data(&mut push_decoder)? {
            NeedsIndexData::No(metadata) => {
                // Nothing more to read: restore the metadata and return.
                self.metadata = Some(metadata);
                return Ok(());
            }
            NeedsIndexData::Yes(range) => range,
        };

        // Check to see if needed range is within `file_range`. Checking `range.end` seems
        // redundant, but it guards against `range_for_page_index()` returning garbage.
        // `file_range` is the absolute span of the file that `reader` actually holds
        // (the tail, when `reader` is a partial buffer).
        // NOTE(review): `contains(&range.end)` treats the exclusive end as a point in the
        // half-open `file_range`; this is only reachable for ranges ending before the
        // footer metadata (checked below) — confirm `range.end == file_size` cannot occur.
        let file_range = file_size.saturating_sub(reader.len())..file_size;
        if !(file_range.contains(&range.start) && file_range.contains(&range.end)) {
            // Requested range starts beyond EOF
            if range.end > file_size {
                return Err(eof_err!(
                    "Parquet file too small. Range {range:?} is beyond file bounds {file_size}",
                ));
            } else {
                // Ask for a larger buffer
                return Err(ParquetError::NeedMoreData(
                    (file_size - range.start).try_into()?,
                ));
            }
        }

        // Perform extra sanity check to make sure `range` and the footer metadata don't
        // overlap.
        if let Some(metadata_size) = self.metadata_size {
            let metadata_range = file_size.saturating_sub(metadata_size as u64)..file_size;
            if range.end > metadata_range.start {
                return Err(eof_err!(
                    "Parquet file too small. Page index range {range:?} overlaps with file metadata {metadata_range:?}",
                ));
            }
        }

        // add the needed ranges to the decoder
        // `range` is an absolute file offset; subtract the start of the buffered tail
        // to convert it to an offset into `reader`.
        let bytes_needed = usize::try_from(range.end - range.start)?;
        let bytes = reader.get_bytes(range.start - file_range.start, bytes_needed)?;

        push_decoder.push_range(range, bytes)?;
        let metadata = parse_index_data(&mut push_decoder)?;
        self.metadata = Some(metadata);

        Ok(())
    }
421
422    /// Given a [`MetadataFetch`], parse and return the [`ParquetMetaData`] in a single pass.
423    ///
424    /// This call will consume `self`.
425    ///
426    /// See [`Self::with_prefetch_hint`] for a discussion of how to reduce the number of fetches
427    /// performed by this function.
428    #[cfg(all(feature = "async", feature = "arrow"))]
429    pub async fn load_and_finish<F: MetadataFetch>(
430        mut self,
431        fetch: F,
432        file_size: u64,
433    ) -> Result<ParquetMetaData> {
434        self.try_load(fetch, file_size).await?;
435        self.finish()
436    }
437
438    /// Given a [`MetadataSuffixFetch`], parse and return the [`ParquetMetaData`] in a single pass.
439    ///
440    /// This call will consume `self`.
441    ///
442    /// See [`Self::with_prefetch_hint`] for a discussion of how to reduce the number of fetches
443    /// performed by this function.
444    #[cfg(all(feature = "async", feature = "arrow"))]
445    pub async fn load_via_suffix_and_finish<F: MetadataSuffixFetch>(
446        mut self,
447        fetch: F,
448    ) -> Result<ParquetMetaData> {
449        self.try_load_via_suffix(fetch).await?;
450        self.finish()
451    }
452    /// Attempts to (asynchronously) parse the footer metadata (and optionally page indexes)
453    /// given a [`MetadataFetch`].
454    ///
455    /// See [`Self::with_prefetch_hint`] for a discussion of how to reduce the number of fetches
456    /// performed by this function.
457    #[cfg(all(feature = "async", feature = "arrow"))]
458    pub async fn try_load<F: MetadataFetch>(&mut self, mut fetch: F, file_size: u64) -> Result<()> {
459        let (metadata, remainder) = self.load_metadata(&mut fetch, file_size).await?;
460
461        self.metadata = Some(metadata);
462
463        // we can return if page indexes aren't requested
464        if self.column_index == PageIndexPolicy::Skip && self.offset_index == PageIndexPolicy::Skip
465        {
466            return Ok(());
467        }
468
469        self.load_page_index_with_remainder(fetch, remainder).await
470    }
471
472    /// Attempts to (asynchronously) parse the footer metadata (and optionally page indexes)
473    /// given a [`MetadataSuffixFetch`].
474    ///
475    /// See [`Self::with_prefetch_hint`] for a discussion of how to reduce the number of fetches
476    /// performed by this function.
477    #[cfg(all(feature = "async", feature = "arrow"))]
478    pub async fn try_load_via_suffix<F: MetadataSuffixFetch>(
479        &mut self,
480        mut fetch: F,
481    ) -> Result<()> {
482        let (metadata, remainder) = self.load_metadata_via_suffix(&mut fetch).await?;
483
484        self.metadata = Some(metadata);
485
486        // we can return if page indexes aren't requested
487        if self.column_index == PageIndexPolicy::Skip && self.offset_index == PageIndexPolicy::Skip
488        {
489            return Ok(());
490        }
491
492        self.load_page_index_with_remainder(fetch, remainder).await
493    }
494
495    /// Asynchronously fetch the page index structures when a [`ParquetMetaData`] has already
496    /// been obtained. See [`Self::new_with_metadata()`].
497    #[cfg(all(feature = "async", feature = "arrow"))]
498    pub async fn load_page_index<F: MetadataFetch>(&mut self, fetch: F) -> Result<()> {
499        self.load_page_index_with_remainder(fetch, None).await
500    }
501
    // Fetch and decode the page index structures for the metadata in `self.metadata`.
    //
    // `remainder` is an optional leftover prefix from an earlier footer fetch, as
    // `(absolute file offset of first byte, bytes)`; when it covers the needed range
    // it is sliced instead of issuing a new fetch.
    #[cfg(all(feature = "async", feature = "arrow"))]
    async fn load_page_index_with_remainder<F: MetadataFetch>(
        &mut self,
        mut fetch: F,
        remainder: Option<(usize, Bytes)>,
    ) -> Result<()> {
        // Take ownership of the metadata; it is restored once the indexes are decoded.
        let Some(metadata) = self.metadata.take() else {
            return Err(general_err!("Footer metadata is not present"));
        };

        // in this case we don't actually know what the file size is, so just use u64::MAX
        // this is ok since the offsets in the metadata are always valid
        let file_size = u64::MAX;
        let push_decoder = ParquetMetaDataPushDecoder::try_new_with_metadata(file_size, metadata)?
            .with_offset_index_policy(self.offset_index)
            .with_column_index_policy(self.column_index)
            .with_metadata_options(self.metadata_options.clone());
        let mut push_decoder = self.prepare_push_decoder(push_decoder);

        // Get bounds needed for page indexes (if any are present in the file).
        let range = match needs_index_data(&mut push_decoder)? {
            NeedsIndexData::No(metadata) => {
                // Nothing more to read: restore the metadata and return.
                self.metadata = Some(metadata);
                return Ok(());
            }
            NeedsIndexData::Yes(range) => range,
        };

        let bytes = match &remainder {
            // Reuse the leftover bytes when they start at or before the needed range.
            Some((remainder_start, remainder)) if *remainder_start as u64 <= range.start => {
                let remainder_start = *remainder_start as u64;
                let offset = usize::try_from(range.start - remainder_start)?;
                let end = usize::try_from(range.end - remainder_start)?;
                assert!(end <= remainder.len());
                remainder.slice(offset..end)
            }
            // Note: this will potentially fetch data already in remainder, this keeps things simple
            _ => fetch.fetch(range.start..range.end).await?,
        };

        // Sanity check
        assert_eq!(bytes.len() as u64, range.end - range.start);
        push_decoder.push_range(range.clone(), bytes)?;
        let metadata = parse_index_data(&mut push_decoder)?;
        self.metadata = Some(metadata);
        Ok(())
    }
549
    // One-shot parse of footer.
    // Side effect: this will set `self.metadata_size`
    //
    // Returns `ParquetError::NeedMoreData(n)` when `chunk_reader` holds fewer than the
    // `n` tail bytes required, so callers can retry with a larger buffer.
    fn parse_metadata<R: ChunkReader>(&mut self, chunk_reader: &R) -> Result<ParquetMetaData> {
        // check file is large enough to hold footer
        let file_size = chunk_reader.len();
        if file_size < (FOOTER_SIZE as u64) {
            return Err(ParquetError::NeedMoreData(FOOTER_SIZE));
        }

        // Read the fixed 8-byte footer (metadata length + magic) from the end.
        let mut footer = [0_u8; FOOTER_SIZE];
        chunk_reader
            .get_read(file_size - FOOTER_SIZE as u64)?
            .read_exact(&mut footer)?;

        let footer = FooterTail::try_new(&footer)?;
        let metadata_len = footer.metadata_length();
        let footer_metadata_len = FOOTER_SIZE + metadata_len;
        // Record the total footer size even if we bail out below; it is used later
        // for the page-index overlap check in `read_page_indexes_sized`.
        self.metadata_size = Some(footer_metadata_len);

        if footer_metadata_len as u64 > file_size {
            // Reader does not hold the full metadata; report how many tail bytes are needed.
            return Err(ParquetError::NeedMoreData(footer_metadata_len));
        }

        // Metadata sits immediately before the 8-byte footer.
        let start = file_size - footer_metadata_len as u64;
        let bytes = chunk_reader.get_bytes(start, metadata_len)?;
        self.decode_footer_metadata(bytes, file_size, footer)
    }
577
    /// Size of the serialized thrift metadata plus the 8 byte footer. Only set if
    /// `self.parse_metadata` is called.
    ///
    /// Returns `None` if the metadata was obtained another way (e.g. supplied via
    /// [`Self::new_with_metadata`] or loaded asynchronously).
    pub fn metadata_size(&self) -> Option<usize> {
        self.metadata_size
    }
583
584    /// Return the number of bytes to read in the initial pass. If `prefetch_size` has
585    /// been provided, then return that value if it is larger than the size of the Parquet
586    /// file footer (8 bytes). Otherwise returns `8`.
587    #[cfg(all(feature = "async", feature = "arrow"))]
588    fn get_prefetch_size(&self) -> usize {
589        if let Some(prefetch) = self.prefetch_hint {
590            if prefetch > FOOTER_SIZE {
591                return prefetch;
592            }
593        }
594        FOOTER_SIZE
595    }
596
    // Fetch and decode the footer metadata given the known `file_size`.
    //
    // Returns the decoded metadata plus, when the prefetch over-read, the unused
    // prefix of the fetched suffix as `(absolute file offset, bytes)` so a later
    // page-index read can reuse it instead of re-fetching.
    #[cfg(all(feature = "async", feature = "arrow"))]
    async fn load_metadata<F: MetadataFetch>(
        &self,
        fetch: &mut F,
        file_size: u64,
    ) -> Result<(ParquetMetaData, Option<(usize, Bytes)>)> {
        let prefetch = self.get_prefetch_size() as u64;

        if file_size < FOOTER_SIZE as u64 {
            return Err(eof_err!("file size of {} is less than footer", file_size));
        }

        // If a size hint is provided, read more than the minimum size
        // to try and avoid a second fetch.
        // Note: prefetch > file_size is ok since we're using saturating_sub.
        let footer_start = file_size.saturating_sub(prefetch);

        let suffix = fetch.fetch(footer_start..file_size).await?;
        let suffix_len = suffix.len();
        let fetch_len = (file_size - footer_start)
            .try_into()
            .expect("footer size should never be larger than u32");
        if suffix_len < fetch_len {
            return Err(eof_err!(
                "metadata requires {} bytes, but could only read {}",
                fetch_len,
                suffix_len
            ));
        }

        // The last 8 bytes of the suffix are the fixed footer (metadata length + magic).
        let mut footer = [0; FOOTER_SIZE];
        footer.copy_from_slice(&suffix[suffix_len - FOOTER_SIZE..suffix_len]);

        let footer = FooterTail::try_new(&footer)?;
        let length = footer.metadata_length();

        if file_size < (length + FOOTER_SIZE) as u64 {
            return Err(eof_err!(
                "file size of {} is less than footer + metadata {}",
                file_size,
                length + FOOTER_SIZE
            ));
        }

        // Did not fetch the entire file metadata in the initial read, need to make a second request
        if length > suffix_len - FOOTER_SIZE {
            let metadata_start = file_size - (length + FOOTER_SIZE) as u64;
            let meta = fetch
                .fetch(metadata_start..(file_size - FOOTER_SIZE as u64))
                .await?;
            // No leftover prefix to reuse in this case.
            Ok((self.decode_footer_metadata(meta, file_size, footer)?, None))
        } else {
            // The metadata is fully contained in the prefetched suffix; slice it out
            // and return the bytes preceding it for potential page-index reuse.
            let metadata_start = (file_size - (length + FOOTER_SIZE) as u64 - footer_start)
                .try_into()
                .expect("metadata length should never be larger than u32");
            let slice = suffix.slice(metadata_start..suffix_len - FOOTER_SIZE);
            Ok((
                self.decode_footer_metadata(slice, file_size, footer)?,
                Some((footer_start as usize, suffix.slice(..metadata_start))),
            ))
        }
    }
659
    // Fetch and decode the footer metadata using suffix fetches only (the true file
    // size is unknown).
    //
    // Returns the decoded metadata plus, when the prefetch over-read, the unused
    // prefix of the fetched suffix as `(offset, bytes)`; the offset is 0 here since
    // absolute file positions cannot be computed without the real file size.
    #[cfg(all(feature = "async", feature = "arrow"))]
    async fn load_metadata_via_suffix<F: MetadataSuffixFetch>(
        &self,
        fetch: &mut F,
    ) -> Result<(ParquetMetaData, Option<(usize, Bytes)>)> {
        let prefetch = self.get_prefetch_size();

        let suffix = fetch.fetch_suffix(prefetch as _).await?;
        let suffix_len = suffix.len();

        if suffix_len < FOOTER_SIZE {
            return Err(eof_err!(
                "footer metadata requires {} bytes, but could only read {}",
                FOOTER_SIZE,
                suffix_len
            ));
        }

        // The last 8 bytes of the suffix are the fixed footer (metadata length + magic).
        let mut footer = [0; FOOTER_SIZE];
        footer.copy_from_slice(&suffix[suffix_len - FOOTER_SIZE..suffix_len]);

        let footer = FooterTail::try_new(&footer)?;
        let length = footer.metadata_length();
        // fake file size as we are only parsing the footer metadata here
        // (cant be parsing page indexes without the full file size)
        let file_size = (length + FOOTER_SIZE) as u64;

        // Did not fetch the entire file metadata in the initial read, need to make a second request
        let metadata_offset = length + FOOTER_SIZE;
        if length > suffix_len - FOOTER_SIZE {
            let meta = fetch.fetch_suffix(metadata_offset).await?;

            if meta.len() < metadata_offset {
                return Err(eof_err!(
                    "metadata requires {} bytes, but could only read {}",
                    metadata_offset,
                    meta.len()
                ));
            }

            // need to slice off the footer or decryption fails
            let meta = meta.slice(0..length);
            Ok((self.decode_footer_metadata(meta, file_size, footer)?, None))
        } else {
            // Metadata fully contained in the prefetched suffix; slice it out.
            let metadata_start = suffix_len - metadata_offset;
            let slice = suffix.slice(metadata_start..suffix_len - FOOTER_SIZE);
            Ok((
                self.decode_footer_metadata(slice, file_size, footer)?,
                Some((0, suffix.slice(..metadata_start))),
            ))
        }
    }
712
713    /// Decodes a [`FooterTail`] from the provided 8-byte slice.
714    #[deprecated(since = "57.0.0", note = "Use FooterTail::try_from instead")]
715    pub fn decode_footer_tail(slice: &[u8; FOOTER_SIZE]) -> Result<FooterTail> {
716        FooterTail::try_new(slice)
717    }
718
719    /// Decodes the Parquet footer, returning the metadata length in bytes
720    #[deprecated(since = "54.3.0", note = "Use decode_footer_tail instead")]
721    pub fn decode_footer(slice: &[u8; FOOTER_SIZE]) -> Result<usize> {
722        FooterTail::try_new(slice).map(|f| f.metadata_length())
723    }
724
725    /// Decodes [`ParquetMetaData`] from the provided bytes.
726    ///
727    /// Typically, this is used to decode the metadata from the end of a parquet
728    /// file. The format of `buf` is the Thrift compact binary protocol, as specified
729    /// by the [Parquet Spec].
730    ///
731    /// It does **NOT** include the 8-byte footer.
732    ///
733    /// This method handles using either `decode_metadata` or
734    /// `decode_metadata_with_encryption` depending on whether the encryption
735    /// feature is enabled.
736    ///
737    /// [Parquet Spec]: https://github.com/apache/parquet-format#metadata
738    pub(crate) fn decode_footer_metadata(
739        &self,
740        buf: Bytes,
741        file_size: u64,
742        footer_tail: FooterTail,
743    ) -> Result<ParquetMetaData> {
744        // The push decoder expects the metadata to be at the end of the file
745        // (... data ...) + (metadata) + (footer)
746        // so we need to provide the starting offset of the metadata
747        // within the file.
748        let ending_offset = file_size.checked_sub(FOOTER_SIZE as u64).ok_or_else(|| {
749            general_err!(
750                "file size {file_size} is smaller than footer size {}",
751                FOOTER_SIZE
752            )
753        })?;
754
755        let starting_offset = ending_offset.checked_sub(buf.len() as u64).ok_or_else(|| {
756            general_err!(
757                "file size {file_size} is smaller than buffer size {} + footer size {}",
758                buf.len(),
759                FOOTER_SIZE
760            )
761        })?;
762
763        let range = starting_offset..ending_offset;
764
765        let push_decoder =
766            ParquetMetaDataPushDecoder::try_new_with_footer_tail(file_size, footer_tail)?
767                // NOTE: DO NOT enable page indexes here, they are handled separately
768                .with_page_index_policy(PageIndexPolicy::Skip)
769                .with_metadata_options(self.metadata_options.clone());
770
771        let mut push_decoder = self.prepare_push_decoder(push_decoder);
772        push_decoder.push_range(range, buf)?;
773        match push_decoder.try_decode()? {
774            DecodeResult::Data(metadata) => Ok(metadata),
775            DecodeResult::Finished => Err(general_err!(
776                "could not parse parquet metadata -- previously finished"
777            )),
778            DecodeResult::NeedsData(ranges) => Err(general_err!(
779                "could not parse parquet metadata, needs ranges {:?}",
780                ranges
781            )),
782        }
783    }
784
785    /// Prepares a push decoder and runs it to decode the metadata.
786    #[cfg(feature = "encryption")]
787    fn prepare_push_decoder(
788        &self,
789        push_decoder: ParquetMetaDataPushDecoder,
790    ) -> ParquetMetaDataPushDecoder {
791        push_decoder.with_file_decryption_properties(
792            self.file_decryption_properties
793                .as_ref()
794                .map(std::sync::Arc::clone),
795        )
796    }
    /// No-op counterpart of the `encryption`-enabled version: without the
    /// `encryption` feature there is nothing to configure on the decoder.
    #[cfg(not(feature = "encryption"))]
    fn prepare_push_decoder(
        &self,
        push_decoder: ParquetMetaDataPushDecoder,
    ) -> ParquetMetaDataPushDecoder {
        push_decoder
    }
804
805    /// Decodes [`ParquetMetaData`] from the provided bytes.
806    ///
807    /// Typically this is used to decode the metadata from the end of a parquet
808    /// file. The format of `buf` is the Thrift compact binary protocol, as specified
809    /// by the [Parquet Spec].
810    ///
811    /// [Parquet Spec]: https://github.com/apache/parquet-format#metadata
812    pub fn decode_metadata(buf: &[u8]) -> Result<ParquetMetaData> {
813        decode_metadata(buf, None)
814    }
815
    /// Decodes [`ParquetMetaData`] from the provided bytes.
    ///
    /// Like [`Self::decode_metadata`] but this also accepts optional
    /// [`ParquetMetaDataOptions`] that control metadata parsing.
    pub fn decode_metadata_with_options(
        buf: &[u8],
        options: Option<&ParquetMetaDataOptions>,
    ) -> Result<ParquetMetaData> {
        decode_metadata(buf, options)
    }
826
827    /// Decodes the schema from the Parquet footer in `buf`. Returned as
828    /// a [`SchemaDescriptor`].
829    pub fn decode_schema(buf: &[u8]) -> Result<Arc<SchemaDescriptor>> {
830        Ok(Arc::new(parquet_schema_from_bytes(buf)?))
831    }
832}
833
/// The bounds needed to read page indexes.
// this is an internal enum, so it is ok to allow differences in enum size
#[allow(clippy::large_enum_variant)]
enum NeedsIndexData {
    /// No additional data is needed (e.g. the indexes weren't requested);
    /// carries the fully decoded metadata.
    No(ParquetMetaData),
    /// Additional data is needed, with the single combined byte range required.
    Yes(Range<u64>),
}
843
844/// Determines a single combined range of bytes needed to read the page indexes,
845/// or returns the metadata if no additional data is needed (e.g. if no page indexes are requested)
846fn needs_index_data(push_decoder: &mut ParquetMetaDataPushDecoder) -> Result<NeedsIndexData> {
847    match push_decoder.try_decode()? {
848        DecodeResult::NeedsData(ranges) => {
849            let range = ranges
850                .into_iter()
851                .reduce(|a, b| a.start.min(b.start)..a.end.max(b.end))
852                .ok_or_else(|| general_err!("Internal error: no ranges provided"))?;
853            Ok(NeedsIndexData::Yes(range))
854        }
855        DecodeResult::Data(metadata) => Ok(NeedsIndexData::No(metadata)),
856        DecodeResult::Finished => Err(general_err!("Internal error: decoder was finished")),
857    }
858}
859
860/// Given a push decoder that has had the needed ranges pushed to it,
861/// attempt to decode indexes and return the updated metadata.
862fn parse_index_data(push_decoder: &mut ParquetMetaDataPushDecoder) -> Result<ParquetMetaData> {
863    match push_decoder.try_decode()? {
864        DecodeResult::NeedsData(_) => Err(general_err!(
865            "Internal error: decoder still needs data after reading required range"
866        )),
867        DecodeResult::Data(metadata) => Ok(metadata),
868        DecodeResult::Finished => Err(general_err!("Internal error: decoder was finished")),
869    }
870}
871
872#[cfg(test)]
873mod tests {
874    use super::*;
875    use crate::file::reader::Length;
876    use crate::util::test_common::file_util::get_test_file;
877    use std::ops::Range;
878
    /// An empty file is smaller than the 8-byte footer, so the reader must
    /// report it needs more data rather than attempt a parse.
    #[test]
    fn test_parse_metadata_size_smaller_than_footer() {
        let test_file = tempfile::tempfile().unwrap();
        let err = ParquetMetaDataReader::new()
            .parse_metadata(&test_file)
            .unwrap_err();
        assert!(matches!(err, ParquetError::NeedMoreData(FOOTER_SIZE)));
    }
887
    /// Eight bytes that do not form a valid Parquet footer are rejected as
    /// a corrupt footer.
    #[test]
    fn test_parse_metadata_corrupt_footer() {
        let data = Bytes::from(vec![1, 2, 3, 4, 5, 6, 7, 8]);
        let reader_result = ParquetMetaDataReader::new().parse_metadata(&data);
        assert_eq!(
            reader_result.unwrap_err().to_string(),
            "Parquet error: Invalid Parquet file. Corrupt footer"
        );
    }
897
    /// A footer advertising a 255-byte metadata section in an 8-byte file:
    /// the reader asks for the 255 + 8 = 263 bytes it actually needs.
    #[test]
    fn test_parse_metadata_invalid_start() {
        let test_file = Bytes::from(vec![255, 0, 0, 0, b'P', b'A', b'R', b'1']);
        let err = ParquetMetaDataReader::new()
            .parse_metadata(&test_file)
            .unwrap_err();
        assert!(matches!(err, ParquetError::NeedMoreData(263)));
    }
906
    /// Exercises `try_parse` / `try_parse_sized` with progressively smaller
    /// (and sometimes deliberately wrong) slices of the file tail, checking
    /// that the reader either succeeds or reports exactly how many bytes it
    /// needs via [`ParquetError::NeedMoreData`].
    #[test]
    #[allow(deprecated)]
    fn test_try_parse() {
        let file = get_test_file("alltypes_tiny_pages.parquet");
        let len = file.len();

        let mut reader = ParquetMetaDataReader::new().with_page_indexes(true);

        // Helper to materialize an arbitrary byte range of the test file
        let bytes_for_range = |range: Range<u64>| {
            file.get_bytes(range.start, (range.end - range.start).try_into().unwrap())
                .unwrap()
        };

        // read entire file
        let bytes = bytes_for_range(0..len);
        reader.try_parse(&bytes).unwrap();
        let metadata = reader.finish().unwrap();
        assert!(metadata.column_index.is_some());
        assert!(metadata.offset_index.is_some());

        // read more than enough of file
        let bytes = bytes_for_range(320000..len);
        reader.try_parse_sized(&bytes, len).unwrap();
        let metadata = reader.finish().unwrap();
        assert!(metadata.column_index.is_some());
        assert!(metadata.offset_index.is_some());

        // exactly enough
        let bytes = bytes_for_range(323583..len);
        reader.try_parse_sized(&bytes, len).unwrap();
        let metadata = reader.finish().unwrap();
        assert!(metadata.column_index.is_some());
        assert!(metadata.offset_index.is_some());

        // not enough for page index
        let bytes = bytes_for_range(323584..len);
        // should fail
        match reader.try_parse_sized(&bytes, len).unwrap_err() {
            // expected error, try again with provided bounds
            ParquetError::NeedMoreData(needed) => {
                let bytes = bytes_for_range(len - needed as u64..len);
                reader.try_parse_sized(&bytes, len).unwrap();
                let metadata = reader.finish().unwrap();
                assert!(metadata.column_index.is_some());
                assert!(metadata.offset_index.is_some());
            }
            _ => panic!("unexpected error"),
        };

        // not enough for file metadata, but keep trying until page indexes are read
        let mut reader = ParquetMetaDataReader::new().with_page_indexes(true);
        let mut bytes = bytes_for_range(452505..len);
        loop {
            match reader.try_parse_sized(&bytes, len) {
                Ok(_) => break,
                Err(ParquetError::NeedMoreData(needed)) => {
                    bytes = bytes_for_range(len - needed as u64..len);
                    if reader.has_metadata() {
                        reader.read_page_indexes_sized(&bytes, len).unwrap();
                        break;
                    }
                }
                _ => panic!("unexpected error"),
            }
        }
        let metadata = reader.finish().unwrap();
        assert!(metadata.column_index.is_some());
        assert!(metadata.offset_index.is_some());

        // not enough for page index but lie about file size
        let bytes = bytes_for_range(323584..len);
        let reader_result = reader.try_parse_sized(&bytes, len - 323584).unwrap_err();
        assert_eq!(
            reader_result.to_string(),
            "EOF: Parquet file too small. Range 323583..452504 is beyond file bounds 130649"
        );

        // not enough for file metadata
        let mut reader = ParquetMetaDataReader::new();
        let bytes = bytes_for_range(452505..len);
        // should fail
        match reader.try_parse_sized(&bytes, len).unwrap_err() {
            // expected error, try again with provided bounds
            ParquetError::NeedMoreData(needed) => {
                let bytes = bytes_for_range(len - needed as u64..len);
                reader.try_parse_sized(&bytes, len).unwrap();
                reader.finish().unwrap();
            }
            _ => panic!("unexpected error"),
        };

        // not enough for file metadata but use try_parse()
        let reader_result = reader.try_parse(&bytes).unwrap_err();
        assert_eq!(
            reader_result.to_string(),
            "EOF: Parquet file too small. Size is 1728 but need 1729"
        );

        // read head of file rather than tail
        let bytes = bytes_for_range(0..1000);
        let reader_result = reader.try_parse_sized(&bytes, len).unwrap_err();
        assert_eq!(
            reader_result.to_string(),
            "Parquet error: Invalid Parquet file. Corrupt footer"
        );

        // lie about file size
        let bytes = bytes_for_range(452510..len);
        let reader_result = reader.try_parse_sized(&bytes, len - 452505).unwrap_err();
        assert_eq!(
            reader_result.to_string(),
            "EOF: Parquet file too small. Size is 1728 but need 1729"
        );
    }
1021}
1022
1023#[cfg(all(feature = "async", feature = "arrow", test))]
1024mod async_tests {
1025    use super::*;
1026
1027    use arrow::{array::Int32Array, datatypes::DataType};
1028    use arrow_array::RecordBatch;
1029    use arrow_schema::{Field, Schema};
1030    use bytes::Bytes;
1031    use futures::FutureExt;
1032    use futures::future::BoxFuture;
1033    use std::fs::File;
1034    use std::future::Future;
1035    use std::io::{Read, Seek, SeekFrom};
1036    use std::ops::Range;
1037    use std::sync::Arc;
1038    use std::sync::atomic::{AtomicUsize, Ordering};
1039    use tempfile::NamedTempFile;
1040
1041    use crate::arrow::ArrowWriter;
1042    use crate::file::properties::WriterProperties;
1043    use crate::file::reader::Length;
1044    use crate::util::test_common::file_util::get_test_file;
1045
    /// Adapts a closure `Range<u64> -> Future<Result<Bytes>>` into a
    /// [`MetadataFetch`] implementation for tests.
    struct MetadataFetchFn<F>(F);

    impl<F, Fut> MetadataFetch for MetadataFetchFn<F>
    where
        F: FnMut(Range<u64>) -> Fut + Send,
        Fut: Future<Output = Result<Bytes>> + Send,
    {
        fn fetch(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
            async move { self.0(range).await }.boxed()
        }
    }

    /// Adapts a pair of closures (range fetch, suffix fetch) into
    /// [`MetadataFetch`] + [`MetadataSuffixFetch`] implementations for tests.
    struct MetadataSuffixFetchFn<F1, F2>(F1, F2);

    impl<F1, Fut, F2> MetadataFetch for MetadataSuffixFetchFn<F1, F2>
    where
        F1: FnMut(Range<u64>) -> Fut + Send,
        Fut: Future<Output = Result<Bytes>> + Send,
        F2: Send,
    {
        fn fetch(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
            async move { self.0(range).await }.boxed()
        }
    }

    impl<F1, Fut, F2> MetadataSuffixFetch for MetadataSuffixFetchFn<F1, F2>
    where
        F1: FnMut(Range<u64>) -> Fut + Send,
        F2: FnMut(usize) -> Fut + Send,
        Fut: Future<Output = Result<Bytes>> + Send,
    {
        fn fetch_suffix(&mut self, suffix: usize) -> BoxFuture<'_, Result<Bytes>> {
            async move { self.1(suffix).await }.boxed()
        }
    }
1081
1082    fn read_range(file: &mut File, range: Range<u64>) -> Result<Bytes> {
1083        file.seek(SeekFrom::Start(range.start as _))?;
1084        let len = range.end - range.start;
1085        let mut buf = Vec::with_capacity(len.try_into().unwrap());
1086        file.take(len as _).read_to_end(&mut buf)?;
1087        Ok(buf.into())
1088    }
1089
1090    fn read_suffix(file: &mut File, suffix: usize) -> Result<Bytes> {
1091        let file_len = file.len();
1092        // Don't seek before beginning of file
1093        file.seek(SeekFrom::End(0 - suffix.min(file_len as _) as i64))?;
1094        let mut buf = Vec::with_capacity(suffix);
1095        file.take(suffix as _).read_to_end(&mut buf)?;
1096        Ok(buf.into())
1097    }
1098
    /// Loads metadata asynchronously via [`MetadataFetch`] with various
    /// prefetch hints, checking both the decoded schema and the number of
    /// fetch requests issued for each hint.
    #[tokio::test]
    async fn test_simple() {
        let mut file = get_test_file("nulls.snappy.parquet");
        let len = file.len();

        // Baseline: decode the metadata synchronously for comparison
        let expected = ParquetMetaDataReader::new()
            .parse_and_finish(&file)
            .unwrap();
        let expected = expected.file_metadata().schema();
        let fetch_count = AtomicUsize::new(0);

        let mut fetch = |range| {
            fetch_count.fetch_add(1, Ordering::SeqCst);
            futures::future::ready(read_range(&mut file, range))
        };

        let input = MetadataFetchFn(&mut fetch);
        let actual = ParquetMetaDataReader::new()
            .load_and_finish(input, len)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 2);

        // Metadata hint too small - below footer size
        fetch_count.store(0, Ordering::SeqCst);
        let input = MetadataFetchFn(&mut fetch);
        let actual = ParquetMetaDataReader::new()
            .with_prefetch_hint(Some(7))
            .load_and_finish(input, len)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 2);

        // Metadata hint too small
        fetch_count.store(0, Ordering::SeqCst);
        let input = MetadataFetchFn(&mut fetch);
        let actual = ParquetMetaDataReader::new()
            .with_prefetch_hint(Some(10))
            .load_and_finish(input, len)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 2);

        // Metadata hint too large
        fetch_count.store(0, Ordering::SeqCst);
        let input = MetadataFetchFn(&mut fetch);
        let actual = ParquetMetaDataReader::new()
            .with_prefetch_hint(Some(500))
            .load_and_finish(input, len)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);

        // Metadata hint exactly correct
        fetch_count.store(0, Ordering::SeqCst);
        let input = MetadataFetchFn(&mut fetch);
        let actual = ParquetMetaDataReader::new()
            .with_prefetch_hint(Some(428))
            .load_and_finish(input, len)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);

        // A claimed file size smaller than the footer must error out
        let input = MetadataFetchFn(&mut fetch);
        let err = ParquetMetaDataReader::new()
            .load_and_finish(input, 4)
            .await
            .unwrap_err()
            .to_string();
        assert_eq!(err, "EOF: file size of 4 is less than footer");

        // Lying about the file size reads the wrong bytes, which fail footer
        // validation
        let input = MetadataFetchFn(&mut fetch);
        let err = ParquetMetaDataReader::new()
            .load_and_finish(input, 20)
            .await
            .unwrap_err()
            .to_string();
        assert_eq!(err, "Parquet error: Invalid Parquet file. Corrupt footer");
    }
1183
1184    #[tokio::test]
1185    async fn test_suffix() {
1186        let mut file = get_test_file("nulls.snappy.parquet");
1187        let mut file2 = file.try_clone().unwrap();
1188
1189        let expected = ParquetMetaDataReader::new()
1190            .parse_and_finish(&file)
1191            .unwrap();
1192        let expected = expected.file_metadata().schema();
1193        let fetch_count = AtomicUsize::new(0);
1194        let suffix_fetch_count = AtomicUsize::new(0);
1195
1196        let mut fetch = |range| {
1197            fetch_count.fetch_add(1, Ordering::SeqCst);
1198            futures::future::ready(read_range(&mut file, range))
1199        };
1200        let mut suffix_fetch = |suffix| {
1201            suffix_fetch_count.fetch_add(1, Ordering::SeqCst);
1202            futures::future::ready(read_suffix(&mut file2, suffix))
1203        };
1204
1205        let input = MetadataSuffixFetchFn(&mut fetch, &mut suffix_fetch);
1206        let actual = ParquetMetaDataReader::new()
1207            .load_via_suffix_and_finish(input)
1208            .await
1209            .unwrap();
1210        assert_eq!(actual.file_metadata().schema(), expected);
1211        assert_eq!(fetch_count.load(Ordering::SeqCst), 0);
1212        assert_eq!(suffix_fetch_count.load(Ordering::SeqCst), 2);
1213
1214        // Metadata hint too small - below footer size
1215        fetch_count.store(0, Ordering::SeqCst);
1216        suffix_fetch_count.store(0, Ordering::SeqCst);
1217        let input = MetadataSuffixFetchFn(&mut fetch, &mut suffix_fetch);
1218        let actual = ParquetMetaDataReader::new()
1219            .with_prefetch_hint(Some(7))
1220            .load_via_suffix_and_finish(input)
1221            .await
1222            .unwrap();
1223        assert_eq!(actual.file_metadata().schema(), expected);
1224        assert_eq!(fetch_count.load(Ordering::SeqCst), 0);
1225        assert_eq!(suffix_fetch_count.load(Ordering::SeqCst), 2);
1226
1227        // Metadata hint too small
1228        fetch_count.store(0, Ordering::SeqCst);
1229        suffix_fetch_count.store(0, Ordering::SeqCst);
1230        let input = MetadataSuffixFetchFn(&mut fetch, &mut suffix_fetch);
1231        let actual = ParquetMetaDataReader::new()
1232            .with_prefetch_hint(Some(10))
1233            .load_via_suffix_and_finish(input)
1234            .await
1235            .unwrap();
1236        assert_eq!(actual.file_metadata().schema(), expected);
1237        assert_eq!(fetch_count.load(Ordering::SeqCst), 0);
1238        assert_eq!(suffix_fetch_count.load(Ordering::SeqCst), 2);
1239
1240        dbg!("test");
1241        // Metadata hint too large
1242        fetch_count.store(0, Ordering::SeqCst);
1243        suffix_fetch_count.store(0, Ordering::SeqCst);
1244        let input = MetadataSuffixFetchFn(&mut fetch, &mut suffix_fetch);
1245        let actual = ParquetMetaDataReader::new()
1246            .with_prefetch_hint(Some(500))
1247            .load_via_suffix_and_finish(input)
1248            .await
1249            .unwrap();
1250        assert_eq!(actual.file_metadata().schema(), expected);
1251        assert_eq!(fetch_count.load(Ordering::SeqCst), 0);
1252        assert_eq!(suffix_fetch_count.load(Ordering::SeqCst), 1);
1253
1254        // Metadata hint exactly correct
1255        fetch_count.store(0, Ordering::SeqCst);
1256        suffix_fetch_count.store(0, Ordering::SeqCst);
1257        let input = MetadataSuffixFetchFn(&mut fetch, &mut suffix_fetch);
1258        let actual = ParquetMetaDataReader::new()
1259            .with_prefetch_hint(Some(428))
1260            .load_via_suffix_and_finish(input)
1261            .await
1262            .unwrap();
1263        assert_eq!(actual.file_metadata().schema(), expected);
1264        assert_eq!(fetch_count.load(Ordering::SeqCst), 0);
1265        assert_eq!(suffix_fetch_count.load(Ordering::SeqCst), 1);
1266    }
1267
    /// Smoke test: loading via suffix requests also works when the footer
    /// metadata is encrypted and decryption properties are supplied.
    #[cfg(feature = "encryption")]
    #[tokio::test]
    async fn test_suffix_with_encryption() {
        let mut file = get_test_file("uniform_encryption.parquet.encrypted");
        let mut file2 = file.try_clone().unwrap();

        let mut fetch = |range| futures::future::ready(read_range(&mut file, range));
        let mut suffix_fetch = |suffix| futures::future::ready(read_suffix(&mut file2, suffix));

        let input = MetadataSuffixFetchFn(&mut fetch, &mut suffix_fetch);

        let key_code: &[u8] = "0123456789012345".as_bytes();
        let decryption_properties = FileDecryptionProperties::builder(key_code.to_vec())
            .build()
            .unwrap();

        // just make sure the metadata is properly decrypted and read
        let expected = ParquetMetaDataReader::new()
            .with_decryption_properties(Some(decryption_properties))
            .load_via_suffix_and_finish(input)
            .await
            .unwrap();
        assert_eq!(expected.num_row_groups(), 1);
    }
1292
    /// Exercises async metadata loading with page indexes enabled, checking
    /// how many fetch requests are issued for a range of prefetch hints.
    #[tokio::test]
    #[allow(deprecated)]
    async fn test_page_index() {
        let mut file = get_test_file("alltypes_tiny_pages.parquet");
        let len = file.len();
        let fetch_count = AtomicUsize::new(0);
        let mut fetch = |range| {
            fetch_count.fetch_add(1, Ordering::SeqCst);
            futures::future::ready(read_range(&mut file, range))
        };

        // No hint: footer, then metadata, then page indexes = 3 fetches
        let f = MetadataFetchFn(&mut fetch);
        let mut loader = ParquetMetaDataReader::new().with_page_indexes(true);
        loader.try_load(f, len).await.unwrap();
        assert_eq!(fetch_count.load(Ordering::SeqCst), 3);
        let metadata = loader.finish().unwrap();
        assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());

        // Prefetch just footer exactly
        fetch_count.store(0, Ordering::SeqCst);
        let f = MetadataFetchFn(&mut fetch);
        let mut loader = ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .with_prefetch_hint(Some(1729));
        loader.try_load(f, len).await.unwrap();
        assert_eq!(fetch_count.load(Ordering::SeqCst), 2);
        let metadata = loader.finish().unwrap();
        assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());

        // Prefetch more than footer but not enough
        fetch_count.store(0, Ordering::SeqCst);
        let f = MetadataFetchFn(&mut fetch);
        let mut loader = ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .with_prefetch_hint(Some(130649));
        loader.try_load(f, len).await.unwrap();
        assert_eq!(fetch_count.load(Ordering::SeqCst), 2);
        let metadata = loader.finish().unwrap();
        assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());

        // Prefetch exactly enough
        fetch_count.store(0, Ordering::SeqCst);
        let f = MetadataFetchFn(&mut fetch);
        let metadata = ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .with_prefetch_hint(Some(130650))
            .load_and_finish(f, len)
            .await
            .unwrap();
        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
        assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());

        // Prefetch more than enough but less than the entire file
        fetch_count.store(0, Ordering::SeqCst);
        let f = MetadataFetchFn(&mut fetch);
        let metadata = ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .with_prefetch_hint(Some((len - 1000) as usize)) // prefetch most of the file
            .load_and_finish(f, len)
            .await
            .unwrap();
        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
        assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());

        // Prefetch the entire file
        fetch_count.store(0, Ordering::SeqCst);
        let f = MetadataFetchFn(&mut fetch);
        let metadata = ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .with_prefetch_hint(Some(len as usize)) // prefetch entire file
            .load_and_finish(f, len)
            .await
            .unwrap();
        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
        assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());

        // Prefetch more than the entire file
        fetch_count.store(0, Ordering::SeqCst);
        let f = MetadataFetchFn(&mut fetch);
        let metadata = ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .with_prefetch_hint(Some((len + 1000) as usize)) // hint exceeds the file length
            .load_and_finish(f, len)
            .await
            .unwrap();
        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
        assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());
    }
1381
    /// Writes a small single-column parquet file to a named temp file,
    /// optionally disabling the offset index, for the page-index policy tests.
    fn write_parquet_file(offset_index_disabled: bool) -> Result<NamedTempFile> {
        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
        )?;

        let file = NamedTempFile::new().unwrap();

        // Writer properties controlling whether the offset index is written
        let props = WriterProperties::builder()
            .set_offset_index_disabled(offset_index_disabled)
            .build();

        let mut writer = ArrowWriter::try_new(file.reopen()?, schema, Some(props))?;
        writer.write(&batch)?;
        writer.close()?;

        Ok(file)
    }
1402
    /// Parses `file` with the given [`PageIndexPolicy`] and returns the
    /// resulting metadata (or the error raised by that policy).
    fn read_and_check(file: &File, policy: PageIndexPolicy) -> Result<ParquetMetaData> {
        let mut reader = ParquetMetaDataReader::new().with_page_index_policy(policy);
        reader.try_parse(file)?;
        reader.finish()
    }
1408
    /// `Required` must fail when the offset index is absent; `Optional` and
    /// `Skip` must succeed whether or not the index was written.
    #[test]
    fn test_page_index_policy() {
        // With page index
        let f = write_parquet_file(false).unwrap();
        read_and_check(f.as_file(), PageIndexPolicy::Required).unwrap();
        read_and_check(f.as_file(), PageIndexPolicy::Optional).unwrap();
        read_and_check(f.as_file(), PageIndexPolicy::Skip).unwrap();

        // Without page index
        let f = write_parquet_file(true).unwrap();
        let res = read_and_check(f.as_file(), PageIndexPolicy::Required);
        assert!(matches!(
            res,
            Err(ParquetError::General(e)) if e == "missing offset index"
        ));
        read_and_check(f.as_file(), PageIndexPolicy::Optional).unwrap();
        read_and_check(f.as_file(), PageIndexPolicy::Skip).unwrap();
    }
1427}