parquet/file/metadata/reader.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18#[cfg(feature = "encryption")]
19use crate::encryption::decrypt::FileDecryptionProperties;
20use crate::errors::{ParquetError, Result};
21use crate::file::FOOTER_SIZE;
22use crate::file::metadata::parser::decode_metadata;
23use crate::file::metadata::{FooterTail, ParquetMetaData, ParquetMetaDataPushDecoder};
24use crate::file::reader::ChunkReader;
25use bytes::Bytes;
26use std::{io::Read, ops::Range};
27
28use crate::DecodeResult;
29#[cfg(all(feature = "async", feature = "arrow"))]
30use crate::arrow::async_reader::{MetadataFetch, MetadataSuffixFetch};
31
/// Reads [`ParquetMetaData`] from a byte stream, with either synchronous or
/// asynchronous I/O.
///
/// There are two flavors of APIs:
/// * Synchronous: [`Self::try_parse()`], [`Self::try_parse_sized()`], [`Self::parse_and_finish()`], etc.
/// * Asynchronous (requires `async` and `arrow` features): [`Self::try_load()`], etc
///
/// See the [`ParquetMetaDataPushDecoder`] for an API that does not require I/O.
///
/// # Format Notes
///
/// Parquet metadata is not necessarily contiguous in a Parquet file: a portion is stored
/// in the footer (the last bytes of the file), but other portions (such as the
/// PageIndex) can be stored elsewhere.
/// See [`crate::file::metadata::ParquetMetaDataWriter#output-format`] for more details of
/// Parquet metadata.
///
/// This reader handles reading the footer as well as the non contiguous parts
/// of the metadata (`PageIndex` and `ColumnIndex`). It does not handle reading Bloom Filters.
///
/// # Example
/// ```no_run
/// # use parquet::file::metadata::{ParquetMetaDataReader, PageIndexPolicy};
/// # fn open_parquet_file(path: &str) -> std::fs::File { unimplemented!(); }
/// // read parquet metadata including page indexes from a file
/// let file = open_parquet_file("some_path.parquet");
/// let mut reader = ParquetMetaDataReader::new()
///     .with_page_index_policy(PageIndexPolicy::Required);
/// reader.try_parse(&file).unwrap();
/// let metadata = reader.finish().unwrap();
/// assert!(metadata.column_index().is_some());
/// assert!(metadata.offset_index().is_some());
/// ```
#[derive(Default, Debug)]
pub struct ParquetMetaDataReader {
    // Parsed metadata. `None` until a parse/load succeeds; taken (left `None`)
    // by `finish()`.
    metadata: Option<ParquetMetaData>,
    // Policy controlling whether the ColumnIndex structure is read.
    column_index: PageIndexPolicy,
    // Policy controlling whether the OffsetIndex structure is read.
    offset_index: PageIndexPolicy,
    // Optional estimate of bytes needed for the initial async fetch
    // (see `with_prefetch_hint`).
    prefetch_hint: Option<usize>,
    // Size of the serialized thrift metadata plus the 8 byte footer. Only set if
    // `self.parse_metadata` is called.
    metadata_size: Option<usize>,
    // Optional properties used to decrypt encrypted files.
    #[cfg(feature = "encryption")]
    file_decryption_properties: Option<std::sync::Arc<FileDecryptionProperties>>,
}
77
/// Describes the policy for reading page indexes
///
/// Used with [`ParquetMetaDataReader::with_page_index_policy`] and the related
/// per-index setters.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum PageIndexPolicy {
    /// Do not read the page index.
    #[default]
    Skip,
    /// Read the page index if it exists, otherwise do not error.
    Optional,
    /// Require the page index to exist, and error if it does not.
    Required,
}
89
90impl From<bool> for PageIndexPolicy {
91    fn from(value: bool) -> Self {
92        match value {
93            true => Self::Required,
94            false => Self::Skip,
95        }
96    }
97}
98
99impl ParquetMetaDataReader {
100    /// Create a new [`ParquetMetaDataReader`]
101    pub fn new() -> Self {
102        Default::default()
103    }
104
105    /// Create a new [`ParquetMetaDataReader`] populated with a [`ParquetMetaData`] struct
106    /// obtained via other means.
107    pub fn new_with_metadata(metadata: ParquetMetaData) -> Self {
108        Self {
109            metadata: Some(metadata),
110            ..Default::default()
111        }
112    }
113
114    /// Enable or disable reading the page index structures described in
115    /// "[Parquet page index]: Layout to Support Page Skipping".
116    ///
117    /// [Parquet page index]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
118    #[deprecated(since = "56.1.0", note = "Use `with_page_index_policy` instead")]
119    pub fn with_page_indexes(self, val: bool) -> Self {
120        let policy = PageIndexPolicy::from(val);
121        self.with_column_index_policy(policy)
122            .with_offset_index_policy(policy)
123    }
124
125    /// Enable or disable reading the Parquet [ColumnIndex] structure.
126    ///
127    /// [ColumnIndex]:  https://github.com/apache/parquet-format/blob/master/PageIndex.md
128    #[deprecated(since = "56.1.0", note = "Use `with_column_index_policy` instead")]
129    pub fn with_column_indexes(self, val: bool) -> Self {
130        let policy = PageIndexPolicy::from(val);
131        self.with_column_index_policy(policy)
132    }
133
134    /// Enable or disable reading the Parquet [OffsetIndex] structure.
135    ///
136    /// [OffsetIndex]:  https://github.com/apache/parquet-format/blob/master/PageIndex.md
137    #[deprecated(since = "56.1.0", note = "Use `with_offset_index_policy` instead")]
138    pub fn with_offset_indexes(self, val: bool) -> Self {
139        let policy = PageIndexPolicy::from(val);
140        self.with_offset_index_policy(policy)
141    }
142
143    /// Sets the [`PageIndexPolicy`] for the column and offset indexes
144    pub fn with_page_index_policy(self, policy: PageIndexPolicy) -> Self {
145        self.with_column_index_policy(policy)
146            .with_offset_index_policy(policy)
147    }
148
149    /// Sets the [`PageIndexPolicy`] for the column index
150    pub fn with_column_index_policy(mut self, policy: PageIndexPolicy) -> Self {
151        self.column_index = policy;
152        self
153    }
154
155    /// Sets the [`PageIndexPolicy`] for the offset index
156    pub fn with_offset_index_policy(mut self, policy: PageIndexPolicy) -> Self {
157        self.offset_index = policy;
158        self
159    }
160
161    /// Provide a hint as to the number of bytes needed to fully parse the [`ParquetMetaData`].
162    /// Only used for the asynchronous [`Self::try_load()`] method.
163    ///
164    /// By default, the reader will first fetch the last 8 bytes of the input file to obtain the
165    /// size of the footer metadata. A second fetch will be performed to obtain the needed bytes.
166    /// After parsing the footer metadata, a third fetch will be performed to obtain the bytes
167    /// needed to decode the page index structures, if they have been requested. To avoid
168    /// unnecessary fetches, `prefetch` can be set to an estimate of the number of bytes needed
169    /// to fully decode the [`ParquetMetaData`], which can reduce the number of fetch requests and
170    /// reduce latency. Setting `prefetch` too small will not trigger an error, but will result
171    /// in extra fetches being performed.
172    pub fn with_prefetch_hint(mut self, prefetch: Option<usize>) -> Self {
173        self.prefetch_hint = prefetch;
174        self
175    }
176
177    /// Provide the FileDecryptionProperties to use when decrypting the file.
178    ///
179    /// This is only necessary when the file is encrypted.
180    #[cfg(feature = "encryption")]
181    pub fn with_decryption_properties(
182        mut self,
183        properties: Option<std::sync::Arc<FileDecryptionProperties>>,
184    ) -> Self {
185        self.file_decryption_properties = properties;
186        self
187    }
188
    /// Indicates whether this reader has a [`ParquetMetaData`] internally.
    ///
    /// Returns `true` once metadata has been parsed/loaded (or supplied via
    /// [`Self::new_with_metadata`]) and has not yet been consumed by [`Self::finish`].
    pub fn has_metadata(&self) -> bool {
        self.metadata.is_some()
    }
193
194    /// Return the parsed [`ParquetMetaData`] struct, leaving `None` in its place.
195    pub fn finish(&mut self) -> Result<ParquetMetaData> {
196        self.metadata
197            .take()
198            .ok_or_else(|| general_err!("could not parse parquet metadata"))
199    }
200
201    /// Given a [`ChunkReader`], parse and return the [`ParquetMetaData`] in a single pass.
202    ///
203    /// If `reader` is [`Bytes`] based, then the buffer must contain sufficient bytes to complete
204    /// the request, and must include the Parquet footer. If page indexes are desired, the buffer
205    /// must contain the entire file, or [`Self::try_parse_sized()`] should be used.
206    ///
207    /// This call will consume `self`.
208    ///
209    /// # Example
210    /// ```no_run
211    /// # use parquet::file::metadata::ParquetMetaDataReader;
212    /// # fn open_parquet_file(path: &str) -> std::fs::File { unimplemented!(); }
213    /// // read parquet metadata including page indexes
214    /// let file = open_parquet_file("some_path.parquet");
215    /// let metadata = ParquetMetaDataReader::new()
216    ///     .with_page_indexes(true)
217    ///     .parse_and_finish(&file).unwrap();
218    /// ```
219    pub fn parse_and_finish<R: ChunkReader>(mut self, reader: &R) -> Result<ParquetMetaData> {
220        self.try_parse(reader)?;
221        self.finish()
222    }
223
224    /// Attempts to parse the footer metadata (and optionally page indexes) given a [`ChunkReader`].
225    ///
226    /// If `reader` is [`Bytes`] based, then the buffer must contain sufficient bytes to complete
227    /// the request, and must include the Parquet footer. If page indexes are desired, the buffer
228    /// must contain the entire file, or [`Self::try_parse_sized()`] should be used.
229    pub fn try_parse<R: ChunkReader>(&mut self, reader: &R) -> Result<()> {
230        self.try_parse_sized(reader, reader.len())
231    }
232
    /// Same as [`Self::try_parse()`], but provide the original file size in the case that `reader`
    /// is a [`Bytes`] struct that does not contain the entire file. This information is necessary
    /// when the page indexes are desired. `reader` must have access to the Parquet footer.
    ///
    /// Using this function also allows for retrying with a larger buffer.
    ///
    /// # Errors
    ///
    /// This function will return [`ParquetError::NeedMoreData`] in the event `reader` does not
    /// provide enough data to fully parse the metadata (see example below). The returned error
    /// will be populated with a `usize` field indicating the number of bytes required from the
    /// tail of the file to completely parse the requested metadata.
    ///
    /// Other errors returned include [`ParquetError::General`] and [`ParquetError::EOF`].
    ///
    /// # Example
    /// ```no_run
    /// # use parquet::file::metadata::{ParquetMetaDataReader, PageIndexPolicy};
    /// # use parquet::errors::ParquetError;
    /// # use crate::parquet::file::reader::Length;
    /// # fn get_bytes(file: &std::fs::File, range: std::ops::Range<u64>) -> bytes::Bytes { unimplemented!(); }
    /// # fn open_parquet_file(path: &str) -> std::fs::File { unimplemented!(); }
    /// let file = open_parquet_file("some_path.parquet");
    /// let len = file.len();
    /// // Speculatively read 1 kilobyte from the end of the file
    /// let bytes = get_bytes(&file, len - 1024..len);
    /// let mut reader = ParquetMetaDataReader::new()
    ///     .with_page_index_policy(PageIndexPolicy::Required);
    /// match reader.try_parse_sized(&bytes, len) {
    ///     Ok(_) => (),
    ///     Err(ParquetError::NeedMoreData(needed)) => {
    ///         // Read the needed number of bytes from the end of the file
    ///         let bytes = get_bytes(&file, len - needed as u64..len);
    ///         reader.try_parse_sized(&bytes, len).unwrap();
    ///     }
    ///     _ => panic!("unexpected error")
    /// }
    /// let metadata = reader.finish().unwrap();
    /// ```
    ///
    /// Note that it is possible for the file metadata to be completely read, but there are
    /// insufficient bytes available to read the page indexes. [`Self::has_metadata()`] can be used
    /// to test for this. In the event the file metadata is present, re-parsing of the file
    /// metadata can be skipped by using [`Self::read_page_indexes_sized()`], as shown below.
    /// ```no_run
    /// # use parquet::file::metadata::{ParquetMetaDataReader, PageIndexPolicy};
    /// # use parquet::errors::ParquetError;
    /// # use crate::parquet::file::reader::Length;
    /// # fn get_bytes(file: &std::fs::File, range: std::ops::Range<u64>) -> bytes::Bytes { unimplemented!(); }
    /// # fn open_parquet_file(path: &str) -> std::fs::File { unimplemented!(); }
    /// let file = open_parquet_file("some_path.parquet");
    /// let len = file.len();
    /// // Speculatively read 1 kilobyte from the end of the file
    /// let mut bytes = get_bytes(&file, len - 1024..len);
    /// let mut reader = ParquetMetaDataReader::new()
    ///     .with_page_index_policy(PageIndexPolicy::Required);
    /// // Loop until `bytes` is large enough
    /// loop {
    ///     match reader.try_parse_sized(&bytes, len) {
    ///         Ok(_) => break,
    ///         Err(ParquetError::NeedMoreData(needed)) => {
    ///             // Read the needed number of bytes from the end of the file
    ///             bytes = get_bytes(&file, len - needed as u64..len);
    ///             // If file metadata was read only read page indexes, otherwise continue loop
    ///             if reader.has_metadata() {
    ///                 reader.read_page_indexes_sized(&bytes, len).unwrap();
    ///                 break;
    ///             }
    ///         }
    ///         _ => panic!("unexpected error")
    ///     }
    /// }
    /// let metadata = reader.finish().unwrap();
    /// ```
    pub fn try_parse_sized<R: ChunkReader>(&mut self, reader: &R, file_size: u64) -> Result<()> {
        self.metadata = match self.parse_metadata(reader) {
            Ok(metadata) => Some(metadata),
            Err(ParquetError::NeedMoreData(needed)) => {
                // If reader is the same length as `file_size` then presumably there is no more to
                // read, so return an EOF error.
                if file_size == reader.len() || needed as u64 > file_size {
                    return Err(eof_err!(
                        "Parquet file too small. Size is {} but need {}",
                        file_size,
                        needed
                    ));
                } else {
                    // Ask for a larger buffer
                    return Err(ParquetError::NeedMoreData(needed));
                }
            }
            Err(e) => return Err(e),
        };

        // we can return if page indexes aren't requested
        if self.column_index == PageIndexPolicy::Skip && self.offset_index == PageIndexPolicy::Skip
        {
            return Ok(());
        }

        self.read_page_indexes_sized(reader, file_size)
    }
333
334    /// Read the page index structures when a [`ParquetMetaData`] has already been obtained.
335    /// See [`Self::new_with_metadata()`] and [`Self::has_metadata()`].
336    pub fn read_page_indexes<R: ChunkReader>(&mut self, reader: &R) -> Result<()> {
337        self.read_page_indexes_sized(reader, reader.len())
338    }
339
    /// Read the page index structures when a [`ParquetMetaData`] has already been obtained.
    /// This variant is used when `reader` cannot access the entire Parquet file (e.g. it is
    /// a [`Bytes`] struct containing the tail of the file).
    /// See [`Self::new_with_metadata()`] and [`Self::has_metadata()`]. Like
    /// [`Self::try_parse_sized()`] this function may return [`ParquetError::NeedMoreData`].
    pub fn read_page_indexes_sized<R: ChunkReader>(
        &mut self,
        reader: &R,
        file_size: u64,
    ) -> Result<()> {
        // The metadata is moved into the push decoder and restored into `self`
        // on every successful exit path below.
        let Some(metadata) = self.metadata.take() else {
            return Err(general_err!(
                "Tried to read page indexes without ParquetMetaData metadata"
            ));
        };

        let push_decoder = ParquetMetaDataPushDecoder::try_new_with_metadata(file_size, metadata)?
            .with_offset_index_policy(self.offset_index)
            .with_column_index_policy(self.column_index);
        let mut push_decoder = self.prepare_push_decoder(push_decoder);

        // Get bounds needed for page indexes (if any are present in the file).
        let range = match needs_index_data(&mut push_decoder)? {
            // No page index data required: restore the metadata and finish.
            NeedsIndexData::No(metadata) => {
                self.metadata = Some(metadata);
                return Ok(());
            }
            NeedsIndexData::Yes(range) => range,
        };

        // Check to see if needed range is within `file_range`. Checking `range.end` seems
        // redundant, but it guards against `range_for_page_index()` returning garbage.
        let file_range = file_size.saturating_sub(reader.len())..file_size;
        if !(file_range.contains(&range.start) && file_range.contains(&range.end)) {
            // Requested range starts beyond EOF
            if range.end > file_size {
                return Err(eof_err!(
                    "Parquet file too small. Range {range:?} is beyond file bounds {file_size}",
                ));
            } else {
                // Ask for a larger buffer
                return Err(ParquetError::NeedMoreData(
                    (file_size - range.start).try_into()?,
                ));
            }
        }

        // Perform extra sanity check to make sure `range` and the footer metadata don't
        // overlap.
        if let Some(metadata_size) = self.metadata_size {
            let metadata_range = file_size.saturating_sub(metadata_size as u64)..file_size;
            if range.end > metadata_range.start {
                return Err(eof_err!(
                    "Parquet file too small. Page index range {range:?} overlaps with file metadata {metadata_range:?}",
                ));
            }
        }

        // add the needed ranges to the decoder
        let bytes_needed = usize::try_from(range.end - range.start)?;
        // `reader` may hold only the tail of the file, so translate the
        // absolute file offset into an offset within the buffer.
        let bytes = reader.get_bytes(range.start - file_range.start, bytes_needed)?;

        push_decoder.push_range(range, bytes)?;
        let metadata = parse_index_data(&mut push_decoder)?;
        self.metadata = Some(metadata);

        Ok(())
    }
408
409    /// Given a [`MetadataFetch`], parse and return the [`ParquetMetaData`] in a single pass.
410    ///
411    /// This call will consume `self`.
412    ///
413    /// See [`Self::with_prefetch_hint`] for a discussion of how to reduce the number of fetches
414    /// performed by this function.
415    #[cfg(all(feature = "async", feature = "arrow"))]
416    pub async fn load_and_finish<F: MetadataFetch>(
417        mut self,
418        fetch: F,
419        file_size: u64,
420    ) -> Result<ParquetMetaData> {
421        self.try_load(fetch, file_size).await?;
422        self.finish()
423    }
424
425    /// Given a [`MetadataSuffixFetch`], parse and return the [`ParquetMetaData`] in a single pass.
426    ///
427    /// This call will consume `self`.
428    ///
429    /// See [`Self::with_prefetch_hint`] for a discussion of how to reduce the number of fetches
430    /// performed by this function.
431    #[cfg(all(feature = "async", feature = "arrow"))]
432    pub async fn load_via_suffix_and_finish<F: MetadataSuffixFetch>(
433        mut self,
434        fetch: F,
435    ) -> Result<ParquetMetaData> {
436        self.try_load_via_suffix(fetch).await?;
437        self.finish()
438    }
439    /// Attempts to (asynchronously) parse the footer metadata (and optionally page indexes)
440    /// given a [`MetadataFetch`].
441    ///
442    /// See [`Self::with_prefetch_hint`] for a discussion of how to reduce the number of fetches
443    /// performed by this function.
444    #[cfg(all(feature = "async", feature = "arrow"))]
445    pub async fn try_load<F: MetadataFetch>(&mut self, mut fetch: F, file_size: u64) -> Result<()> {
446        let (metadata, remainder) = self.load_metadata(&mut fetch, file_size).await?;
447
448        self.metadata = Some(metadata);
449
450        // we can return if page indexes aren't requested
451        if self.column_index == PageIndexPolicy::Skip && self.offset_index == PageIndexPolicy::Skip
452        {
453            return Ok(());
454        }
455
456        self.load_page_index_with_remainder(fetch, remainder).await
457    }
458
459    /// Attempts to (asynchronously) parse the footer metadata (and optionally page indexes)
460    /// given a [`MetadataSuffixFetch`].
461    ///
462    /// See [`Self::with_prefetch_hint`] for a discussion of how to reduce the number of fetches
463    /// performed by this function.
464    #[cfg(all(feature = "async", feature = "arrow"))]
465    pub async fn try_load_via_suffix<F: MetadataSuffixFetch>(
466        &mut self,
467        mut fetch: F,
468    ) -> Result<()> {
469        let (metadata, remainder) = self.load_metadata_via_suffix(&mut fetch).await?;
470
471        self.metadata = Some(metadata);
472
473        // we can return if page indexes aren't requested
474        if self.column_index == PageIndexPolicy::Skip && self.offset_index == PageIndexPolicy::Skip
475        {
476            return Ok(());
477        }
478
479        self.load_page_index_with_remainder(fetch, remainder).await
480    }
481
    /// Asynchronously fetch the page index structures when a [`ParquetMetaData`] has already
    /// been obtained. See [`Self::new_with_metadata()`].
    #[cfg(all(feature = "async", feature = "arrow"))]
    pub async fn load_page_index<F: MetadataFetch>(&mut self, fetch: F) -> Result<()> {
        // No prefetched remainder is available here, so any needed byte ranges
        // will be fetched directly.
        self.load_page_index_with_remainder(fetch, None).await
    }
488
    /// Shared implementation for loading the page indexes.
    ///
    /// `remainder`, when present, is `(file_offset, bytes)` left over from a
    /// previous metadata fetch; it is reused to avoid a redundant fetch when it
    /// already covers the needed range.
    #[cfg(all(feature = "async", feature = "arrow"))]
    async fn load_page_index_with_remainder<F: MetadataFetch>(
        &mut self,
        mut fetch: F,
        remainder: Option<(usize, Bytes)>,
    ) -> Result<()> {
        // The metadata is moved into the push decoder and restored on success.
        let Some(metadata) = self.metadata.take() else {
            return Err(general_err!("Footer metadata is not present"));
        };

        // in this case we don't actually know what the file size is, so just use u64::MAX
        // this is ok since the offsets in the metadata are always valid
        let file_size = u64::MAX;
        let push_decoder = ParquetMetaDataPushDecoder::try_new_with_metadata(file_size, metadata)?
            .with_offset_index_policy(self.offset_index)
            .with_column_index_policy(self.column_index);
        let mut push_decoder = self.prepare_push_decoder(push_decoder);

        // Get bounds needed for page indexes (if any are present in the file).
        let range = match needs_index_data(&mut push_decoder)? {
            NeedsIndexData::No(metadata) => {
                self.metadata = Some(metadata);
                return Ok(());
            }
            NeedsIndexData::Yes(range) => range,
        };

        let bytes = match &remainder {
            // The remainder covers the start of the needed range: slice it out
            // by translating the absolute offsets into remainder-relative ones.
            Some((remainder_start, remainder)) if *remainder_start as u64 <= range.start => {
                let remainder_start = *remainder_start as u64;
                let offset = usize::try_from(range.start - remainder_start)?;
                let end = usize::try_from(range.end - remainder_start)?;
                assert!(end <= remainder.len());
                remainder.slice(offset..end)
            }
            // Note: this will potentially fetch data already in remainder, this keeps things simple
            _ => fetch.fetch(range.start..range.end).await?,
        };

        // Sanity check
        assert_eq!(bytes.len() as u64, range.end - range.start);
        push_decoder.push_range(range.clone(), bytes)?;
        let metadata = parse_index_data(&mut push_decoder)?;
        self.metadata = Some(metadata);
        Ok(())
    }
535
    // One-shot parse of footer.
    // Side effect: this will set `self.metadata_size`
    fn parse_metadata<R: ChunkReader>(&mut self, chunk_reader: &R) -> Result<ParquetMetaData> {
        // check file is large enough to hold footer
        let file_size = chunk_reader.len();
        if file_size < (FOOTER_SIZE as u64) {
            return Err(ParquetError::NeedMoreData(FOOTER_SIZE));
        }

        // Read the fixed-size footer (metadata length + magic) from the file tail.
        let mut footer = [0_u8; FOOTER_SIZE];
        chunk_reader
            .get_read(file_size - FOOTER_SIZE as u64)?
            .read_exact(&mut footer)?;

        let footer = FooterTail::try_new(&footer)?;
        let metadata_len = footer.metadata_length();
        let footer_metadata_len = FOOTER_SIZE + metadata_len;
        // Set `metadata_size` before the size check below so it is available to
        // callers (e.g. `read_page_indexes_sized`) even when `NeedMoreData` is
        // returned and the parse is retried with a larger buffer.
        self.metadata_size = Some(footer_metadata_len);

        if footer_metadata_len as u64 > file_size {
            // The buffer does not hold the full metadata; ask for more bytes.
            return Err(ParquetError::NeedMoreData(footer_metadata_len));
        }

        let start = file_size - footer_metadata_len as u64;
        let bytes = chunk_reader.get_bytes(start, metadata_len)?;
        self.decode_footer_metadata(bytes, file_size, footer)
    }
563
    /// Size of the serialized thrift metadata plus the 8 byte footer. Only set if
    /// `self.parse_metadata` is called.
    ///
    /// Returns `None` when the metadata was loaded asynchronously or supplied
    /// via [`Self::new_with_metadata`].
    pub fn metadata_size(&self) -> Option<usize> {
        self.metadata_size
    }
569
570    /// Return the number of bytes to read in the initial pass. If `prefetch_size` has
571    /// been provided, then return that value if it is larger than the size of the Parquet
572    /// file footer (8 bytes). Otherwise returns `8`.
573    #[cfg(all(feature = "async", feature = "arrow"))]
574    fn get_prefetch_size(&self) -> usize {
575        if let Some(prefetch) = self.prefetch_hint {
576            if prefetch > FOOTER_SIZE {
577                return prefetch;
578            }
579        }
580        FOOTER_SIZE
581    }
582
    /// Fetches and decodes the footer metadata using `fetch`.
    ///
    /// Returns the decoded [`ParquetMetaData`] and, when the initial prefetch
    /// read more bytes than the metadata itself, the unused prefix as
    /// `(file_offset, bytes)` so it can be reused when reading page indexes.
    #[cfg(all(feature = "async", feature = "arrow"))]
    async fn load_metadata<F: MetadataFetch>(
        &self,
        fetch: &mut F,
        file_size: u64,
    ) -> Result<(ParquetMetaData, Option<(usize, Bytes)>)> {
        let prefetch = self.get_prefetch_size() as u64;

        if file_size < FOOTER_SIZE as u64 {
            return Err(eof_err!("file size of {} is less than footer", file_size));
        }

        // If a size hint is provided, read more than the minimum size
        // to try and avoid a second fetch.
        // Note: prefetch > file_size is ok since we're using saturating_sub.
        let footer_start = file_size.saturating_sub(prefetch);

        let suffix = fetch.fetch(footer_start..file_size).await?;
        let suffix_len = suffix.len();
        let fetch_len = (file_size - footer_start)
            .try_into()
            .expect("footer size should never be larger than u32");
        // The fetch implementation returned fewer bytes than requested.
        if suffix_len < fetch_len {
            return Err(eof_err!(
                "metadata requires {} bytes, but could only read {}",
                fetch_len,
                suffix_len
            ));
        }

        // The last FOOTER_SIZE bytes are the footer tail (metadata length + magic).
        let mut footer = [0; FOOTER_SIZE];
        footer.copy_from_slice(&suffix[suffix_len - FOOTER_SIZE..suffix_len]);

        let footer = FooterTail::try_new(&footer)?;
        let length = footer.metadata_length();

        if file_size < (length + FOOTER_SIZE) as u64 {
            return Err(eof_err!(
                "file size of {} is less than footer + metadata {}",
                file_size,
                length + FOOTER_SIZE
            ));
        }

        // Did not fetch the entire file metadata in the initial read, need to make a second request
        if length > suffix_len - FOOTER_SIZE {
            let metadata_start = file_size - (length + FOOTER_SIZE) as u64;
            let meta = fetch
                .fetch(metadata_start..(file_size - FOOTER_SIZE as u64))
                .await?;
            Ok((self.decode_footer_metadata(meta, file_size, footer)?, None))
        } else {
            // The prefetched suffix already contains the metadata: slice it out
            // and return the bytes preceding it as the reusable remainder.
            let metadata_start = (file_size - (length + FOOTER_SIZE) as u64 - footer_start)
                .try_into()
                .expect("metadata length should never be larger than u32");
            let slice = suffix.slice(metadata_start..suffix_len - FOOTER_SIZE);
            Ok((
                self.decode_footer_metadata(slice, file_size, footer)?,
                Some((footer_start as usize, suffix.slice(..metadata_start))),
            ))
        }
    }
645
    /// Fetches and decodes the footer metadata using suffix (read-from-end)
    /// requests, for stores that do not expose the total file size.
    ///
    /// Returns the decoded [`ParquetMetaData`] and, when the initial prefetch
    /// read more bytes than the metadata itself, the unused prefix as
    /// `(offset, bytes)` so it can be reused when reading page indexes.
    #[cfg(all(feature = "async", feature = "arrow"))]
    async fn load_metadata_via_suffix<F: MetadataSuffixFetch>(
        &self,
        fetch: &mut F,
    ) -> Result<(ParquetMetaData, Option<(usize, Bytes)>)> {
        let prefetch = self.get_prefetch_size();

        let suffix = fetch.fetch_suffix(prefetch as _).await?;
        let suffix_len = suffix.len();

        if suffix_len < FOOTER_SIZE {
            return Err(eof_err!(
                "footer metadata requires {} bytes, but could only read {}",
                FOOTER_SIZE,
                suffix_len
            ));
        }

        // The last FOOTER_SIZE bytes are the footer tail (metadata length + magic).
        let mut footer = [0; FOOTER_SIZE];
        footer.copy_from_slice(&suffix[suffix_len - FOOTER_SIZE..suffix_len]);

        let footer = FooterTail::try_new(&footer)?;
        let length = footer.metadata_length();
        // fake file size as we are only parsing the footer metadata here
        // (cant be parsing page indexes without the full file size)
        let file_size = (length + FOOTER_SIZE) as u64;

        // Did not fetch the entire file metadata in the initial read, need to make a second request
        let metadata_offset = length + FOOTER_SIZE;
        if length > suffix_len - FOOTER_SIZE {
            let meta = fetch.fetch_suffix(metadata_offset).await?;

            if meta.len() < metadata_offset {
                return Err(eof_err!(
                    "metadata requires {} bytes, but could only read {}",
                    metadata_offset,
                    meta.len()
                ));
            }

            // need to slice off the footer or decryption fails
            let meta = meta.slice(0..length);
            Ok((self.decode_footer_metadata(meta, file_size, footer)?, None))
        } else {
            // The prefetched suffix already contains the metadata: slice it out
            // and return the bytes preceding it as the reusable remainder.
            let metadata_start = suffix_len - metadata_offset;
            let slice = suffix.slice(metadata_start..suffix_len - FOOTER_SIZE);
            Ok((
                self.decode_footer_metadata(slice, file_size, footer)?,
                Some((0, suffix.slice(..metadata_start))),
            ))
        }
    }
698
    /// Decodes a [`FooterTail`] from the provided 8-byte slice.
    ///
    /// This simply forwards to [`FooterTail::try_new`].
    #[deprecated(since = "57.0.0", note = "Use FooterTail::try_from instead")]
    pub fn decode_footer_tail(slice: &[u8; FOOTER_SIZE]) -> Result<FooterTail> {
        FooterTail::try_new(slice)
    }
704
705    /// Decodes the Parquet footer, returning the metadata length in bytes
706    #[deprecated(since = "54.3.0", note = "Use decode_footer_tail instead")]
707    pub fn decode_footer(slice: &[u8; FOOTER_SIZE]) -> Result<usize> {
708        FooterTail::try_new(slice).map(|f| f.metadata_length())
709    }
710
    /// Decodes [`ParquetMetaData`] from the provided bytes.
    ///
    /// Typically, this is used to decode the metadata from the end of a parquet
    /// file. The format of `buf` is the Thrift compact binary protocol, as specified
    /// by the [Parquet Spec].
    ///
    /// It does **NOT** include the 8-byte footer.
    ///
    /// Decoding is delegated to a [`ParquetMetaDataPushDecoder`], which is
    /// configured via `prepare_push_decoder`; when the `encryption` feature is
    /// enabled that applies this reader's file decryption properties. Page
    /// indexes are explicitly skipped here and handled separately.
    ///
    /// [Parquet Spec]: https://github.com/apache/parquet-format#metadata
    pub(crate) fn decode_footer_metadata(
        &self,
        buf: Bytes,
        file_size: u64,
        footer_tail: FooterTail,
    ) -> Result<ParquetMetaData> {
        // The push decoder expects the metadata to be at the end of the file
        // (... data ...) + (metadata) + (footer)
        // so we need to provide the starting offset of the metadata
        // within the file.
        let ending_offset = file_size.checked_sub(FOOTER_SIZE as u64).ok_or_else(|| {
            general_err!(
                "file size {file_size} is smaller than footer size {}",
                FOOTER_SIZE
            )
        })?;

        let starting_offset = ending_offset.checked_sub(buf.len() as u64).ok_or_else(|| {
            general_err!(
                "file size {file_size} is smaller than buffer size {} + footer size {}",
                buf.len(),
                FOOTER_SIZE
            )
        })?;

        // `buf` occupies the byte range immediately preceding the footer
        let range = starting_offset..ending_offset;

        let push_decoder =
            ParquetMetaDataPushDecoder::try_new_with_footer_tail(file_size, footer_tail)?
                // NOTE: DO NOT enable page indexes here, they are handled separately
                .with_page_index_policy(PageIndexPolicy::Skip);

        let mut push_decoder = self.prepare_push_decoder(push_decoder);
        push_decoder.push_range(range, buf)?;
        match push_decoder.try_decode()? {
            // pushing the entire metadata range should yield the metadata directly
            DecodeResult::Data(metadata) => Ok(metadata),
            DecodeResult::Finished => Err(general_err!(
                "could not parse parquet metadata -- previously finished"
            )),
            DecodeResult::NeedsData(ranges) => Err(general_err!(
                "could not parse parquet metadata, needs ranges {:?}",
                ranges
            )),
        }
    }
769
770    /// Prepares a push decoder and runs it to decode the metadata.
771    #[cfg(feature = "encryption")]
772    fn prepare_push_decoder(
773        &self,
774        push_decoder: ParquetMetaDataPushDecoder,
775    ) -> ParquetMetaDataPushDecoder {
776        push_decoder.with_file_decryption_properties(
777            self.file_decryption_properties
778                .as_ref()
779                .map(std::sync::Arc::clone),
780        )
781    }
782    #[cfg(not(feature = "encryption"))]
783    fn prepare_push_decoder(
784        &self,
785        push_decoder: ParquetMetaDataPushDecoder,
786    ) -> ParquetMetaDataPushDecoder {
787        push_decoder
788    }
789
790    /// Decodes [`ParquetMetaData`] from the provided bytes.
791    ///
792    /// Typically this is used to decode the metadata from the end of a parquet
793    /// file. The format of `buf` is the Thrift compact binary protocol, as specified
794    /// by the [Parquet Spec].
795    ///
796    /// [Parquet Spec]: https://github.com/apache/parquet-format#metadata
797    pub fn decode_metadata(buf: &[u8]) -> Result<ParquetMetaData> {
798        decode_metadata(buf)
799    }
800}
801
/// The bounds needed to read page indexes
// this is an internal enum, so it is ok to allow differences in enum size
#[allow(clippy::large_enum_variant)]
enum NeedsIndexData {
    /// No additional data is needed (e.g. the indexes weren't requested);
    /// carries the decoded metadata
    No(ParquetMetaData),
    /// Additional data is needed: the single combined byte range that is
    /// required to decode the page indexes
    Yes(Range<u64>),
}
811
812/// Determines a single combined range of bytes needed to read the page indexes,
813/// or returns the metadata if no additional data is needed (e.g. if no page indexes are requested)
814fn needs_index_data(push_decoder: &mut ParquetMetaDataPushDecoder) -> Result<NeedsIndexData> {
815    match push_decoder.try_decode()? {
816        DecodeResult::NeedsData(ranges) => {
817            let range = ranges
818                .into_iter()
819                .reduce(|a, b| a.start.min(b.start)..a.end.max(b.end))
820                .ok_or_else(|| general_err!("Internal error: no ranges provided"))?;
821            Ok(NeedsIndexData::Yes(range))
822        }
823        DecodeResult::Data(metadata) => Ok(NeedsIndexData::No(metadata)),
824        DecodeResult::Finished => Err(general_err!("Internal error: decoder was finished")),
825    }
826}
827
828/// Given a push decoder that has had the needed ranges pushed to it,
829/// attempt to decode indexes and return the updated metadata.
830fn parse_index_data(push_decoder: &mut ParquetMetaDataPushDecoder) -> Result<ParquetMetaData> {
831    match push_decoder.try_decode()? {
832        DecodeResult::NeedsData(_) => Err(general_err!(
833            "Internal error: decoder still needs data after reading required range"
834        )),
835        DecodeResult::Data(metadata) => Ok(metadata),
836        DecodeResult::Finished => Err(general_err!("Internal error: decoder was finished")),
837    }
838}
839
#[cfg(test)]
mod tests {
    use super::*;
    use crate::file::reader::Length;
    use crate::util::test_common::file_util::get_test_file;
    use std::ops::Range;

    #[test]
    fn test_parse_metadata_size_smaller_than_footer() {
        // An empty file cannot even hold the 8-byte footer, so the reader
        // asks for at least FOOTER_SIZE bytes
        let test_file = tempfile::tempfile().unwrap();
        let err = ParquetMetaDataReader::new()
            .parse_metadata(&test_file)
            .unwrap_err();
        assert!(matches!(err, ParquetError::NeedMoreData(FOOTER_SIZE)));
    }

    #[test]
    fn test_parse_metadata_corrupt_footer() {
        // 8 bytes, but without the magic: reported as a corrupt footer
        let data = Bytes::from(vec![1, 2, 3, 4, 5, 6, 7, 8]);
        let reader_result = ParquetMetaDataReader::new().parse_metadata(&data);
        assert_eq!(
            reader_result.unwrap_err().to_string(),
            "Parquet error: Invalid Parquet file. Corrupt footer"
        );
    }

    #[test]
    fn test_parse_metadata_invalid_start() {
        // Valid magic, but the encoded metadata length (255) exceeds the
        // bytes supplied, so the reader requests 255 + FOOTER_SIZE = 263 bytes
        let test_file = Bytes::from(vec![255, 0, 0, 0, b'P', b'A', b'R', b'1']);
        let err = ParquetMetaDataReader::new()
            .parse_metadata(&test_file)
            .unwrap_err();
        assert!(matches!(err, ParquetError::NeedMoreData(263)));
    }

    // Exercises try_parse / try_parse_sized with progressively smaller
    // suffixes of a real test file, including the NeedMoreData retry protocol
    #[test]
    #[allow(deprecated)]
    fn test_try_parse() {
        let file = get_test_file("alltypes_tiny_pages.parquet");
        let len = file.len();

        let mut reader = ParquetMetaDataReader::new().with_page_indexes(true);

        let bytes_for_range = |range: Range<u64>| {
            file.get_bytes(range.start, (range.end - range.start).try_into().unwrap())
                .unwrap()
        };

        // read entire file
        let bytes = bytes_for_range(0..len);
        reader.try_parse(&bytes).unwrap();
        let metadata = reader.finish().unwrap();
        assert!(metadata.column_index.is_some());
        assert!(metadata.offset_index.is_some());

        // read more than enough of file
        let bytes = bytes_for_range(320000..len);
        reader.try_parse_sized(&bytes, len).unwrap();
        let metadata = reader.finish().unwrap();
        assert!(metadata.column_index.is_some());
        assert!(metadata.offset_index.is_some());

        // exactly enough
        let bytes = bytes_for_range(323583..len);
        reader.try_parse_sized(&bytes, len).unwrap();
        let metadata = reader.finish().unwrap();
        assert!(metadata.column_index.is_some());
        assert!(metadata.offset_index.is_some());

        // not enough for page index
        let bytes = bytes_for_range(323584..len);
        // should fail
        match reader.try_parse_sized(&bytes, len).unwrap_err() {
            // expected error, try again with provided bounds
            ParquetError::NeedMoreData(needed) => {
                let bytes = bytes_for_range(len - needed as u64..len);
                reader.try_parse_sized(&bytes, len).unwrap();
                let metadata = reader.finish().unwrap();
                assert!(metadata.column_index.is_some());
                assert!(metadata.offset_index.is_some());
            }
            _ => panic!("unexpected error"),
        };

        // not enough for file metadata, but keep trying until page indexes are read
        let mut reader = ParquetMetaDataReader::new().with_page_indexes(true);
        let mut bytes = bytes_for_range(452505..len);
        loop {
            match reader.try_parse_sized(&bytes, len) {
                Ok(_) => break,
                Err(ParquetError::NeedMoreData(needed)) => {
                    bytes = bytes_for_range(len - needed as u64..len);
                    if reader.has_metadata() {
                        reader.read_page_indexes_sized(&bytes, len).unwrap();
                        break;
                    }
                }
                _ => panic!("unexpected error"),
            }
        }
        let metadata = reader.finish().unwrap();
        assert!(metadata.column_index.is_some());
        assert!(metadata.offset_index.is_some());

        // not enough for page index but lie about file size
        let bytes = bytes_for_range(323584..len);
        let reader_result = reader.try_parse_sized(&bytes, len - 323584).unwrap_err();
        assert_eq!(
            reader_result.to_string(),
            "EOF: Parquet file too small. Range 323583..452504 is beyond file bounds 130649"
        );

        // not enough for file metadata
        let mut reader = ParquetMetaDataReader::new();
        let bytes = bytes_for_range(452505..len);
        // should fail
        match reader.try_parse_sized(&bytes, len).unwrap_err() {
            // expected error, try again with provided bounds
            ParquetError::NeedMoreData(needed) => {
                let bytes = bytes_for_range(len - needed as u64..len);
                reader.try_parse_sized(&bytes, len).unwrap();
                reader.finish().unwrap();
            }
            _ => panic!("unexpected error"),
        };

        // not enough for file metadata but use try_parse()
        let reader_result = reader.try_parse(&bytes).unwrap_err();
        assert_eq!(
            reader_result.to_string(),
            "EOF: Parquet file too small. Size is 1728 but need 1729"
        );

        // read head of file rather than tail
        let bytes = bytes_for_range(0..1000);
        let reader_result = reader.try_parse_sized(&bytes, len).unwrap_err();
        assert_eq!(
            reader_result.to_string(),
            "Parquet error: Invalid Parquet file. Corrupt footer"
        );

        // lie about file size
        let bytes = bytes_for_range(452510..len);
        let reader_result = reader.try_parse_sized(&bytes, len - 452505).unwrap_err();
        assert_eq!(
            reader_result.to_string(),
            "EOF: Parquet file too small. Size is 1728 but need 1729"
        );
    }
}
990
#[cfg(all(feature = "async", feature = "arrow", test))]
mod async_tests {
    use super::*;

    use arrow::{array::Int32Array, datatypes::DataType};
    use arrow_array::RecordBatch;
    use arrow_schema::{Field, Schema};
    use bytes::Bytes;
    use futures::FutureExt;
    use futures::future::BoxFuture;
    use std::fs::File;
    use std::future::Future;
    use std::io::{Read, Seek, SeekFrom};
    use std::ops::Range;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use tempfile::NamedTempFile;

    use crate::arrow::ArrowWriter;
    use crate::file::properties::WriterProperties;
    use crate::file::reader::Length;
    use crate::util::test_common::file_util::get_test_file;

    /// Adapter implementing [`MetadataFetch`] from a range-fetch closure
    struct MetadataFetchFn<F>(F);

    impl<F, Fut> MetadataFetch for MetadataFetchFn<F>
    where
        F: FnMut(Range<u64>) -> Fut + Send,
        Fut: Future<Output = Result<Bytes>> + Send,
    {
        fn fetch(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
            async move { self.0(range).await }.boxed()
        }
    }

    /// Adapter implementing [`MetadataSuffixFetch`] from a pair of closures:
    /// one for range fetches and one for suffix fetches
    struct MetadataSuffixFetchFn<F1, F2>(F1, F2);

    impl<F1, Fut, F2> MetadataFetch for MetadataSuffixFetchFn<F1, F2>
    where
        F1: FnMut(Range<u64>) -> Fut + Send,
        Fut: Future<Output = Result<Bytes>> + Send,
        F2: Send,
    {
        fn fetch(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
            async move { self.0(range).await }.boxed()
        }
    }

    impl<F1, Fut, F2> MetadataSuffixFetch for MetadataSuffixFetchFn<F1, F2>
    where
        F1: FnMut(Range<u64>) -> Fut + Send,
        F2: FnMut(usize) -> Fut + Send,
        Fut: Future<Output = Result<Bytes>> + Send,
    {
        fn fetch_suffix(&mut self, suffix: usize) -> BoxFuture<'_, Result<Bytes>> {
            async move { self.1(suffix).await }.boxed()
        }
    }

    /// Reads the byte `range` from `file` and returns it as [`Bytes`]
    fn read_range(file: &mut File, range: Range<u64>) -> Result<Bytes> {
        file.seek(SeekFrom::Start(range.start as _))?;
        let len = range.end - range.start;
        let mut buf = Vec::with_capacity(len.try_into().unwrap());
        file.take(len as _).read_to_end(&mut buf)?;
        Ok(buf.into())
    }

    /// Reads the last `suffix` bytes of `file` and returns them as [`Bytes`]
    fn read_suffix(file: &mut File, suffix: usize) -> Result<Bytes> {
        let file_len = file.len();
        // Don't seek before beginning of file
        file.seek(SeekFrom::End(0 - suffix.min(file_len as _) as i64))?;
        let mut buf = Vec::with_capacity(suffix);
        file.take(suffix as _).read_to_end(&mut buf)?;
        Ok(buf.into())
    }

    #[tokio::test]
    async fn test_simple() {
        let mut file = get_test_file("nulls.snappy.parquet");
        let len = file.len();

        let expected = ParquetMetaDataReader::new()
            .parse_and_finish(&file)
            .unwrap();
        let expected = expected.file_metadata().schema();
        let fetch_count = AtomicUsize::new(0);

        let mut fetch = |range| {
            fetch_count.fetch_add(1, Ordering::SeqCst);
            futures::future::ready(read_range(&mut file, range))
        };

        let input = MetadataFetchFn(&mut fetch);
        let actual = ParquetMetaDataReader::new()
            .load_and_finish(input, len)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 2);

        // Metadata hint too small - below footer size
        fetch_count.store(0, Ordering::SeqCst);
        let input = MetadataFetchFn(&mut fetch);
        let actual = ParquetMetaDataReader::new()
            .with_prefetch_hint(Some(7))
            .load_and_finish(input, len)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 2);

        // Metadata hint too small
        fetch_count.store(0, Ordering::SeqCst);
        let input = MetadataFetchFn(&mut fetch);
        let actual = ParquetMetaDataReader::new()
            .with_prefetch_hint(Some(10))
            .load_and_finish(input, len)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 2);

        // Metadata hint too large
        fetch_count.store(0, Ordering::SeqCst);
        let input = MetadataFetchFn(&mut fetch);
        let actual = ParquetMetaDataReader::new()
            .with_prefetch_hint(Some(500))
            .load_and_finish(input, len)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);

        // Metadata hint exactly correct
        fetch_count.store(0, Ordering::SeqCst);
        let input = MetadataFetchFn(&mut fetch);
        let actual = ParquetMetaDataReader::new()
            .with_prefetch_hint(Some(428))
            .load_and_finish(input, len)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);

        let input = MetadataFetchFn(&mut fetch);
        let err = ParquetMetaDataReader::new()
            .load_and_finish(input, 4)
            .await
            .unwrap_err()
            .to_string();
        assert_eq!(err, "EOF: file size of 4 is less than footer");

        let input = MetadataFetchFn(&mut fetch);
        let err = ParquetMetaDataReader::new()
            .load_and_finish(input, 20)
            .await
            .unwrap_err()
            .to_string();
        assert_eq!(err, "Parquet error: Invalid Parquet file. Corrupt footer");
    }

    #[tokio::test]
    async fn test_suffix() {
        let mut file = get_test_file("nulls.snappy.parquet");
        let mut file2 = file.try_clone().unwrap();

        let expected = ParquetMetaDataReader::new()
            .parse_and_finish(&file)
            .unwrap();
        let expected = expected.file_metadata().schema();
        let fetch_count = AtomicUsize::new(0);
        let suffix_fetch_count = AtomicUsize::new(0);

        let mut fetch = |range| {
            fetch_count.fetch_add(1, Ordering::SeqCst);
            futures::future::ready(read_range(&mut file, range))
        };
        let mut suffix_fetch = |suffix| {
            suffix_fetch_count.fetch_add(1, Ordering::SeqCst);
            futures::future::ready(read_suffix(&mut file2, suffix))
        };

        let input = MetadataSuffixFetchFn(&mut fetch, &mut suffix_fetch);
        let actual = ParquetMetaDataReader::new()
            .load_via_suffix_and_finish(input)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 0);
        assert_eq!(suffix_fetch_count.load(Ordering::SeqCst), 2);

        // Metadata hint too small - below footer size
        fetch_count.store(0, Ordering::SeqCst);
        suffix_fetch_count.store(0, Ordering::SeqCst);
        let input = MetadataSuffixFetchFn(&mut fetch, &mut suffix_fetch);
        let actual = ParquetMetaDataReader::new()
            .with_prefetch_hint(Some(7))
            .load_via_suffix_and_finish(input)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 0);
        assert_eq!(suffix_fetch_count.load(Ordering::SeqCst), 2);

        // Metadata hint too small
        fetch_count.store(0, Ordering::SeqCst);
        suffix_fetch_count.store(0, Ordering::SeqCst);
        let input = MetadataSuffixFetchFn(&mut fetch, &mut suffix_fetch);
        let actual = ParquetMetaDataReader::new()
            .with_prefetch_hint(Some(10))
            .load_via_suffix_and_finish(input)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 0);
        assert_eq!(suffix_fetch_count.load(Ordering::SeqCst), 2);

        // Metadata hint too large
        fetch_count.store(0, Ordering::SeqCst);
        suffix_fetch_count.store(0, Ordering::SeqCst);
        let input = MetadataSuffixFetchFn(&mut fetch, &mut suffix_fetch);
        let actual = ParquetMetaDataReader::new()
            .with_prefetch_hint(Some(500))
            .load_via_suffix_and_finish(input)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 0);
        assert_eq!(suffix_fetch_count.load(Ordering::SeqCst), 1);

        // Metadata hint exactly correct
        fetch_count.store(0, Ordering::SeqCst);
        suffix_fetch_count.store(0, Ordering::SeqCst);
        let input = MetadataSuffixFetchFn(&mut fetch, &mut suffix_fetch);
        let actual = ParquetMetaDataReader::new()
            .with_prefetch_hint(Some(428))
            .load_via_suffix_and_finish(input)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 0);
        assert_eq!(suffix_fetch_count.load(Ordering::SeqCst), 1);
    }

    #[cfg(feature = "encryption")]
    #[tokio::test]
    async fn test_suffix_with_encryption() {
        let mut file = get_test_file("uniform_encryption.parquet.encrypted");
        let mut file2 = file.try_clone().unwrap();

        let mut fetch = |range| futures::future::ready(read_range(&mut file, range));
        let mut suffix_fetch = |suffix| futures::future::ready(read_suffix(&mut file2, suffix));

        let input = MetadataSuffixFetchFn(&mut fetch, &mut suffix_fetch);

        let key_code: &[u8] = "0123456789012345".as_bytes();
        let decryption_properties = FileDecryptionProperties::builder(key_code.to_vec())
            .build()
            .unwrap();

        // just make sure the metadata is properly decrypted and read
        let expected = ParquetMetaDataReader::new()
            .with_decryption_properties(Some(decryption_properties))
            .load_via_suffix_and_finish(input)
            .await
            .unwrap();
        assert_eq!(expected.num_row_groups(), 1);
    }

    #[tokio::test]
    #[allow(deprecated)]
    async fn test_page_index() {
        let mut file = get_test_file("alltypes_tiny_pages.parquet");
        let len = file.len();
        let fetch_count = AtomicUsize::new(0);
        let mut fetch = |range| {
            fetch_count.fetch_add(1, Ordering::SeqCst);
            futures::future::ready(read_range(&mut file, range))
        };

        let f = MetadataFetchFn(&mut fetch);
        let mut loader = ParquetMetaDataReader::new().with_page_indexes(true);
        loader.try_load(f, len).await.unwrap();
        assert_eq!(fetch_count.load(Ordering::SeqCst), 3);
        let metadata = loader.finish().unwrap();
        assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());

        // Prefetch just footer exactly
        fetch_count.store(0, Ordering::SeqCst);
        let f = MetadataFetchFn(&mut fetch);
        let mut loader = ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .with_prefetch_hint(Some(1729));
        loader.try_load(f, len).await.unwrap();
        assert_eq!(fetch_count.load(Ordering::SeqCst), 2);
        let metadata = loader.finish().unwrap();
        assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());

        // Prefetch more than footer but not enough
        fetch_count.store(0, Ordering::SeqCst);
        let f = MetadataFetchFn(&mut fetch);
        let mut loader = ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .with_prefetch_hint(Some(130649));
        loader.try_load(f, len).await.unwrap();
        assert_eq!(fetch_count.load(Ordering::SeqCst), 2);
        let metadata = loader.finish().unwrap();
        assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());

        // Prefetch exactly enough
        fetch_count.store(0, Ordering::SeqCst);
        let f = MetadataFetchFn(&mut fetch);
        let metadata = ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .with_prefetch_hint(Some(130650))
            .load_and_finish(f, len)
            .await
            .unwrap();
        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
        assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());

        // Prefetch more than enough but less than the entire file
        fetch_count.store(0, Ordering::SeqCst);
        let f = MetadataFetchFn(&mut fetch);
        let metadata = ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .with_prefetch_hint(Some((len - 1000) as usize)) // prefetch most of the file
            .load_and_finish(f, len)
            .await
            .unwrap();
        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
        assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());

        // Prefetch the entire file
        fetch_count.store(0, Ordering::SeqCst);
        let f = MetadataFetchFn(&mut fetch);
        let metadata = ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .with_prefetch_hint(Some(len as usize)) // prefetch entire file
            .load_and_finish(f, len)
            .await
            .unwrap();
        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
        assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());

        // Prefetch more than the entire file
        fetch_count.store(0, Ordering::SeqCst);
        let f = MetadataFetchFn(&mut fetch);
        let metadata = ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .with_prefetch_hint(Some((len + 1000) as usize)) // hint larger than the file
            .load_and_finish(f, len)
            .await
            .unwrap();
        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
        assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());
    }

    /// Writes a small single-column parquet file to a temp file, optionally
    /// disabling the offset index
    fn write_parquet_file(offset_index_disabled: bool) -> Result<NamedTempFile> {
        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
        )?;

        let file = NamedTempFile::new().unwrap();

        // Write properties with page index disabled
        let props = WriterProperties::builder()
            .set_offset_index_disabled(offset_index_disabled)
            .build();

        let mut writer = ArrowWriter::try_new(file.reopen()?, schema, Some(props))?;
        writer.write(&batch)?;
        writer.close()?;

        Ok(file)
    }

    /// Parses `file` with the given page index `policy` and returns the result
    fn read_and_check(file: &File, policy: PageIndexPolicy) -> Result<ParquetMetaData> {
        let mut reader = ParquetMetaDataReader::new().with_page_index_policy(policy);
        reader.try_parse(file)?;
        reader.finish()
    }

    #[test]
    fn test_page_index_policy() {
        // With page index
        let f = write_parquet_file(false).unwrap();
        read_and_check(f.as_file(), PageIndexPolicy::Required).unwrap();
        read_and_check(f.as_file(), PageIndexPolicy::Optional).unwrap();
        read_and_check(f.as_file(), PageIndexPolicy::Skip).unwrap();

        // Without page index
        let f = write_parquet_file(true).unwrap();
        let res = read_and_check(f.as_file(), PageIndexPolicy::Required);
        assert!(matches!(
            res,
            Err(ParquetError::General(e)) if e == "missing offset index"
        ));
        read_and_check(f.as_file(), PageIndexPolicy::Optional).unwrap();
        read_and_check(f.as_file(), PageIndexPolicy::Skip).unwrap();
    }
}