Skip to main content

parquet/file/metadata/
reader.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18#[cfg(feature = "encryption")]
19use crate::encryption::decrypt::FileDecryptionProperties;
20use crate::errors::{ParquetError, Result};
21use crate::file::FOOTER_SIZE;
22use crate::file::metadata::parser::decode_metadata;
23use crate::file::metadata::thrift::parquet_schema_from_bytes;
24use crate::file::metadata::{
25    FooterTail, ParquetMetaData, ParquetMetaDataOptions, ParquetMetaDataPushDecoder,
26};
27use crate::file::reader::ChunkReader;
28use crate::schema::types::SchemaDescriptor;
29use bytes::Bytes;
30use std::sync::Arc;
31use std::{io::Read, ops::Range};
32
33use crate::DecodeResult;
34#[cfg(all(feature = "async", feature = "arrow"))]
35use crate::arrow::async_reader::{MetadataFetch, MetadataSuffixFetch};
36
37/// Reads [`ParquetMetaData`] from a byte stream, with either synchronous or
38/// asynchronous I/O.
39///
40/// There are two flavors of APIs:
41/// * Synchronous: [`Self::try_parse()`], [`Self::try_parse_sized()`], [`Self::parse_and_finish()`], etc.
42/// * Asynchronous (requires `async` and `arrow` features): [`Self::try_load()`], etc
43///
44///  See the [`ParquetMetaDataPushDecoder`] for an API that does not require I/O.
45///
46/// # Format Notes
47///
48/// Parquet metadata is not necessarily contiguous in a Parquet file: a portion is stored
49/// in the footer (the last bytes of the file), but other portions (such as the
50/// PageIndex) can be stored elsewhere.
51/// See [`crate::file::metadata::ParquetMetaDataWriter#output-format`] for more details of
52/// Parquet metadata.
53///
54/// This reader handles reading the footer as well as the non contiguous parts
55/// of the metadata (`PageIndex` and `ColumnIndex`). It does not handle reading Bloom Filters.
56///
57/// # Example
58/// ```no_run
59/// # use parquet::file::metadata::{PageIndexPolicy, ParquetMetaDataReader};
60/// # fn open_parquet_file(path: &str) -> std::fs::File { unimplemented!(); }
61/// // read parquet metadata including page indexes from a file
62/// let file = open_parquet_file("some_path.parquet");
63/// let mut reader = ParquetMetaDataReader::new()
64///     .with_page_index_policy(PageIndexPolicy::Required);
65/// reader.try_parse(&file).unwrap();
66/// let metadata = reader.finish().unwrap();
67/// assert!(metadata.column_index().is_some());
68/// assert!(metadata.offset_index().is_some());
69/// ```
70#[derive(Default, Debug)]
71pub struct ParquetMetaDataReader {
72    metadata: Option<ParquetMetaData>,
73    column_index: PageIndexPolicy,
74    offset_index: PageIndexPolicy,
75    prefetch_hint: Option<usize>,
76    metadata_options: Option<Arc<ParquetMetaDataOptions>>,
77    // Size of the serialized thrift metadata plus the 8 byte footer. Only set if
78    // `self.parse_metadata` is called.
79    metadata_size: Option<usize>,
80    #[cfg(feature = "encryption")]
81    file_decryption_properties: Option<Arc<FileDecryptionProperties>>,
82}
83
/// Controls whether the page index structures (column index / offset index) are read.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum PageIndexPolicy {
    /// Never read the page index. This is the default.
    #[default]
    Skip,
    /// Read the page index when the file has one; a missing index is not an error.
    Optional,
    /// The page index must be present; reading fails with an error if it is missing.
    Required,
}

/// Maps a boolean flag onto a policy: `true` requires the page index, `false` skips it.
impl From<bool> for PageIndexPolicy {
    fn from(value: bool) -> Self {
        if value { Self::Required } else { Self::Skip }
    }
}
104
105impl ParquetMetaDataReader {
106    /// Create a new [`ParquetMetaDataReader`]
107    pub fn new() -> Self {
108        Default::default()
109    }
110
111    /// Create a new [`ParquetMetaDataReader`] populated with a [`ParquetMetaData`] struct
112    /// obtained via other means.
113    pub fn new_with_metadata(metadata: ParquetMetaData) -> Self {
114        Self {
115            metadata: Some(metadata),
116            ..Default::default()
117        }
118    }
119
120    /// Sets the [`PageIndexPolicy`] for the column and offset indexes
121    pub fn with_page_index_policy(self, policy: PageIndexPolicy) -> Self {
122        self.with_column_index_policy(policy)
123            .with_offset_index_policy(policy)
124    }
125
126    /// Sets the [`PageIndexPolicy`] for the column index
127    pub fn with_column_index_policy(mut self, policy: PageIndexPolicy) -> Self {
128        self.column_index = policy;
129        self
130    }
131
132    /// Sets the [`PageIndexPolicy`] for the offset index
133    pub fn with_offset_index_policy(mut self, policy: PageIndexPolicy) -> Self {
134        self.offset_index = policy;
135        self
136    }
137
138    /// Sets the [`ParquetMetaDataOptions`] to use when decoding
139    pub fn with_metadata_options(mut self, options: Option<ParquetMetaDataOptions>) -> Self {
140        self.metadata_options = options.map(Arc::new);
141        self
142    }
143
144    /// Provide a hint as to the number of bytes needed to fully parse the [`ParquetMetaData`].
145    /// Only used for the asynchronous [`Self::try_load()`] method.
146    ///
147    /// By default, the reader will first fetch the last 8 bytes of the input file to obtain the
148    /// size of the footer metadata. A second fetch will be performed to obtain the needed bytes.
149    /// After parsing the footer metadata, a third fetch will be performed to obtain the bytes
150    /// needed to decode the page index structures, if they have been requested. To avoid
151    /// unnecessary fetches, `prefetch` can be set to an estimate of the number of bytes needed
152    /// to fully decode the [`ParquetMetaData`], which can reduce the number of fetch requests and
153    /// reduce latency. Setting `prefetch` too small will not trigger an error, but will result
154    /// in extra fetches being performed.
155    pub fn with_prefetch_hint(mut self, prefetch: Option<usize>) -> Self {
156        self.prefetch_hint = prefetch;
157        self
158    }
159
160    /// Provide the FileDecryptionProperties to use when decrypting the file.
161    ///
162    /// This is only necessary when the file is encrypted.
163    #[cfg(feature = "encryption")]
164    pub fn with_decryption_properties(
165        mut self,
166        properties: Option<std::sync::Arc<FileDecryptionProperties>>,
167    ) -> Self {
168        self.file_decryption_properties = properties;
169        self
170    }
171
172    /// Indicates whether this reader has a [`ParquetMetaData`] internally.
173    pub fn has_metadata(&self) -> bool {
174        self.metadata.is_some()
175    }
176
177    /// Return the parsed [`ParquetMetaData`] struct, leaving `None` in its place.
178    pub fn finish(&mut self) -> Result<ParquetMetaData> {
179        self.metadata
180            .take()
181            .ok_or_else(|| general_err!("could not parse parquet metadata"))
182    }
183
184    /// Given a [`ChunkReader`], parse and return the [`ParquetMetaData`] in a single pass.
185    ///
186    /// If `reader` is [`Bytes`] based, then the buffer must contain sufficient bytes to complete
187    /// the request, and must include the Parquet footer. If page indexes are desired, the buffer
188    /// must contain the entire file, or [`Self::try_parse_sized()`] should be used.
189    ///
190    /// This call will consume `self`.
191    ///
192    /// # Example
193    /// ```no_run
194    /// # use parquet::file::metadata::{PageIndexPolicy, ParquetMetaDataReader};
195    /// # fn open_parquet_file(path: &str) -> std::fs::File { unimplemented!(); }
196    /// // read parquet metadata including page indexes
197    /// let file = open_parquet_file("some_path.parquet");
198    /// let metadata = ParquetMetaDataReader::new()
199    ///     .with_page_index_policy(PageIndexPolicy::Required)
200    ///     .parse_and_finish(&file).unwrap();
201    /// ```
202    pub fn parse_and_finish<R: ChunkReader>(mut self, reader: &R) -> Result<ParquetMetaData> {
203        self.try_parse(reader)?;
204        self.finish()
205    }
206
207    /// Attempts to parse the footer metadata (and optionally page indexes) given a [`ChunkReader`].
208    ///
209    /// If `reader` is [`Bytes`] based, then the buffer must contain sufficient bytes to complete
210    /// the request, and must include the Parquet footer. If page indexes are desired, the buffer
211    /// must contain the entire file, or [`Self::try_parse_sized()`] should be used.
212    pub fn try_parse<R: ChunkReader>(&mut self, reader: &R) -> Result<()> {
213        self.try_parse_sized(reader, reader.len())
214    }
215
216    /// Same as [`Self::try_parse()`], but provide the original file size in the case that `reader`
217    /// is a [`Bytes`] struct that does not contain the entire file. This information is necessary
218    /// when the page indexes are desired. `reader` must have access to the Parquet footer.
219    ///
220    /// Using this function also allows for retrying with a larger buffer.
221    ///
222    /// # Errors
223    ///
224    /// This function will return [`ParquetError::NeedMoreData`] in the event `reader` does not
225    /// provide enough data to fully parse the metadata (see example below). The returned error
226    /// will be populated with a `usize` field indicating the number of bytes required from the
227    /// tail of the file to completely parse the requested metadata.
228    ///
229    /// Other errors returned include [`ParquetError::General`] and [`ParquetError::EOF`].
230    ///
231    /// # Example
232    /// ```no_run
233    /// # use parquet::file::metadata::{PageIndexPolicy, ParquetMetaDataReader};
234    /// # use parquet::errors::ParquetError;
235    /// # use crate::parquet::file::reader::Length;
236    /// # fn get_bytes(file: &std::fs::File, range: std::ops::Range<u64>) -> bytes::Bytes { unimplemented!(); }
237    /// # fn open_parquet_file(path: &str) -> std::fs::File { unimplemented!(); }
238    /// let file = open_parquet_file("some_path.parquet");
239    /// let len = file.len();
240    /// // Speculatively read 1 kilobyte from the end of the file
241    /// let bytes = get_bytes(&file, len - 1024..len);
242    /// let mut reader = ParquetMetaDataReader::new().with_page_index_policy(PageIndexPolicy::Required);
243    /// match reader.try_parse_sized(&bytes, len) {
244    ///     Ok(_) => (),
245    ///     Err(ParquetError::NeedMoreData(needed)) => {
246    ///         // Read the needed number of bytes from the end of the file
247    ///         let bytes = get_bytes(&file, len - needed as u64..len);
248    ///         reader.try_parse_sized(&bytes, len).unwrap();
249    ///     }
250    ///     _ => panic!("unexpected error")
251    /// }
252    /// let metadata = reader.finish().unwrap();
253    /// ```
254    ///
255    /// Note that it is possible for the file metadata to be completely read, but there are
256    /// insufficient bytes available to read the page indexes. [`Self::has_metadata()`] can be used
257    /// to test for this. In the event the file metadata is present, re-parsing of the file
258    /// metadata can be skipped by using [`Self::read_page_indexes_sized()`], as shown below.
259    /// ```no_run
260    /// # use parquet::file::metadata::{PageIndexPolicy, ParquetMetaDataReader};
261    /// # use parquet::errors::ParquetError;
262    /// # use crate::parquet::file::reader::Length;
263    /// # fn get_bytes(file: &std::fs::File, range: std::ops::Range<u64>) -> bytes::Bytes { unimplemented!(); }
264    /// # fn open_parquet_file(path: &str) -> std::fs::File { unimplemented!(); }
265    /// let file = open_parquet_file("some_path.parquet");
266    /// let len = file.len();
267    /// // Speculatively read 1 kilobyte from the end of the file
268    /// let mut bytes = get_bytes(&file, len - 1024..len);
269    /// let mut reader = ParquetMetaDataReader::new().with_page_index_policy(PageIndexPolicy::Required);
270    /// // Loop until `bytes` is large enough
271    /// loop {
272    ///     match reader.try_parse_sized(&bytes, len) {
273    ///         Ok(_) => break,
274    ///         Err(ParquetError::NeedMoreData(needed)) => {
275    ///             // Read the needed number of bytes from the end of the file
276    ///             bytes = get_bytes(&file, len - needed as u64..len);
277    ///             // If file metadata was read only read page indexes, otherwise continue loop
278    ///             if reader.has_metadata() {
279    ///                 reader.read_page_indexes_sized(&bytes, len).unwrap();
280    ///                 break;
281    ///             }
282    ///         }
283    ///         _ => panic!("unexpected error")
284    ///     }
285    /// }
286    /// let metadata = reader.finish().unwrap();
287    /// ```
288    pub fn try_parse_sized<R: ChunkReader>(&mut self, reader: &R, file_size: u64) -> Result<()> {
289        self.metadata = match self.parse_metadata(reader) {
290            Ok(metadata) => Some(metadata),
291            Err(ParquetError::NeedMoreData(needed)) => {
292                // If reader is the same length as `file_size` then presumably there is no more to
293                // read, so return an EOF error.
294                if file_size == reader.len() || needed as u64 > file_size {
295                    return Err(eof_err!(
296                        "Parquet file too small. Size is {} but need {}",
297                        file_size,
298                        needed
299                    ));
300                } else {
301                    // Ask for a larger buffer
302                    return Err(ParquetError::NeedMoreData(needed));
303                }
304            }
305            Err(e) => return Err(e),
306        };
307
308        // we can return if page indexes aren't requested
309        if self.column_index == PageIndexPolicy::Skip && self.offset_index == PageIndexPolicy::Skip
310        {
311            return Ok(());
312        }
313
314        self.read_page_indexes_sized(reader, file_size)
315    }
316
317    /// Read the page index structures when a [`ParquetMetaData`] has already been obtained.
318    /// See [`Self::new_with_metadata()`] and [`Self::has_metadata()`].
319    pub fn read_page_indexes<R: ChunkReader>(&mut self, reader: &R) -> Result<()> {
320        self.read_page_indexes_sized(reader, reader.len())
321    }
322
323    /// Read the page index structures when a [`ParquetMetaData`] has already been obtained.
324    /// This variant is used when `reader` cannot access the entire Parquet file (e.g. it is
325    /// a [`Bytes`] struct containing the tail of the file).
326    /// See [`Self::new_with_metadata()`] and [`Self::has_metadata()`]. Like
327    /// [`Self::try_parse_sized()`] this function may return [`ParquetError::NeedMoreData`].
328    pub fn read_page_indexes_sized<R: ChunkReader>(
329        &mut self,
330        reader: &R,
331        file_size: u64,
332    ) -> Result<()> {
333        let Some(metadata) = self.metadata.take() else {
334            return Err(general_err!(
335                "Tried to read page indexes without ParquetMetaData metadata"
336            ));
337        };
338
339        let push_decoder = ParquetMetaDataPushDecoder::try_new_with_metadata(file_size, metadata)?
340            .with_offset_index_policy(self.offset_index)
341            .with_column_index_policy(self.column_index)
342            .with_metadata_options(self.metadata_options.clone());
343        let mut push_decoder = self.prepare_push_decoder(push_decoder);
344
345        // Get bounds needed for page indexes (if any are present in the file).
346        let range = match needs_index_data(&mut push_decoder)? {
347            NeedsIndexData::No(metadata) => {
348                self.metadata = Some(metadata);
349                return Ok(());
350            }
351            NeedsIndexData::Yes(range) => range,
352        };
353
354        // Check to see if needed range is within `file_range`. Checking `range.end` seems
355        // redundant, but it guards against `range_for_page_index()` returning garbage.
356        let file_range = file_size.saturating_sub(reader.len())..file_size;
357        if !(file_range.contains(&range.start) && file_range.contains(&range.end)) {
358            // Requested range starts beyond EOF
359            if range.end > file_size {
360                return Err(eof_err!(
361                    "Parquet file too small. Range {range:?} is beyond file bounds {file_size}",
362                ));
363            } else {
364                // Ask for a larger buffer
365                return Err(ParquetError::NeedMoreData(
366                    (file_size - range.start).try_into()?,
367                ));
368            }
369        }
370
371        // Perform extra sanity check to make sure `range` and the footer metadata don't
372        // overlap.
373        if let Some(metadata_size) = self.metadata_size {
374            let metadata_range = file_size.saturating_sub(metadata_size as u64)..file_size;
375            if range.end > metadata_range.start {
376                return Err(eof_err!(
377                    "Parquet file too small. Page index range {range:?} overlaps with file metadata {metadata_range:?}",
378                ));
379            }
380        }
381
382        // add the needed ranges to the decoder
383        let bytes_needed = usize::try_from(range.end - range.start)?;
384        let bytes = reader.get_bytes(range.start - file_range.start, bytes_needed)?;
385
386        push_decoder.push_range(range, bytes)?;
387        let metadata = parse_index_data(&mut push_decoder)?;
388        self.metadata = Some(metadata);
389
390        Ok(())
391    }
392
393    /// Given a [`MetadataFetch`], parse and return the [`ParquetMetaData`] in a single pass.
394    ///
395    /// This call will consume `self`.
396    ///
397    /// See [`Self::with_prefetch_hint`] for a discussion of how to reduce the number of fetches
398    /// performed by this function.
399    #[cfg(all(feature = "async", feature = "arrow"))]
400    pub async fn load_and_finish<F: MetadataFetch>(
401        mut self,
402        fetch: F,
403        file_size: u64,
404    ) -> Result<ParquetMetaData> {
405        self.try_load(fetch, file_size).await?;
406        self.finish()
407    }
408
409    /// Given a [`MetadataSuffixFetch`], parse and return the [`ParquetMetaData`] in a single pass.
410    ///
411    /// This call will consume `self`.
412    ///
413    /// See [`Self::with_prefetch_hint`] for a discussion of how to reduce the number of fetches
414    /// performed by this function.
415    #[cfg(all(feature = "async", feature = "arrow"))]
416    pub async fn load_via_suffix_and_finish<F: MetadataSuffixFetch>(
417        mut self,
418        fetch: F,
419    ) -> Result<ParquetMetaData> {
420        self.try_load_via_suffix(fetch).await?;
421        self.finish()
422    }
423    /// Attempts to (asynchronously) parse the footer metadata (and optionally page indexes)
424    /// given a [`MetadataFetch`].
425    ///
426    /// See [`Self::with_prefetch_hint`] for a discussion of how to reduce the number of fetches
427    /// performed by this function.
428    #[cfg(all(feature = "async", feature = "arrow"))]
429    pub async fn try_load<F: MetadataFetch>(&mut self, mut fetch: F, file_size: u64) -> Result<()> {
430        let (metadata, remainder) = self.load_metadata(&mut fetch, file_size).await?;
431
432        self.metadata = Some(metadata);
433
434        // we can return if page indexes aren't requested
435        if self.column_index == PageIndexPolicy::Skip && self.offset_index == PageIndexPolicy::Skip
436        {
437            return Ok(());
438        }
439
440        self.load_page_index_with_remainder(fetch, remainder).await
441    }
442
443    /// Attempts to (asynchronously) parse the footer metadata (and optionally page indexes)
444    /// given a [`MetadataSuffixFetch`].
445    ///
446    /// See [`Self::with_prefetch_hint`] for a discussion of how to reduce the number of fetches
447    /// performed by this function.
448    #[cfg(all(feature = "async", feature = "arrow"))]
449    pub async fn try_load_via_suffix<F: MetadataSuffixFetch>(
450        &mut self,
451        mut fetch: F,
452    ) -> Result<()> {
453        let (metadata, remainder) = self.load_metadata_via_suffix(&mut fetch).await?;
454
455        self.metadata = Some(metadata);
456
457        // we can return if page indexes aren't requested
458        if self.column_index == PageIndexPolicy::Skip && self.offset_index == PageIndexPolicy::Skip
459        {
460            return Ok(());
461        }
462
463        self.load_page_index_with_remainder(fetch, remainder).await
464    }
465
466    /// Asynchronously fetch the page index structures when a [`ParquetMetaData`] has already
467    /// been obtained. See [`Self::new_with_metadata()`].
468    #[cfg(all(feature = "async", feature = "arrow"))]
469    pub async fn load_page_index<F: MetadataFetch>(&mut self, fetch: F) -> Result<()> {
470        self.load_page_index_with_remainder(fetch, None).await
471    }
472
473    #[cfg(all(feature = "async", feature = "arrow"))]
474    async fn load_page_index_with_remainder<F: MetadataFetch>(
475        &mut self,
476        mut fetch: F,
477        remainder: Option<(usize, Bytes)>,
478    ) -> Result<()> {
479        let Some(metadata) = self.metadata.take() else {
480            return Err(general_err!("Footer metadata is not present"));
481        };
482
483        // in this case we don't actually know what the file size is, so just use u64::MAX
484        // this is ok since the offsets in the metadata are always valid
485        let file_size = u64::MAX;
486        let push_decoder = ParquetMetaDataPushDecoder::try_new_with_metadata(file_size, metadata)?
487            .with_offset_index_policy(self.offset_index)
488            .with_column_index_policy(self.column_index)
489            .with_metadata_options(self.metadata_options.clone());
490        let mut push_decoder = self.prepare_push_decoder(push_decoder);
491
492        // Get bounds needed for page indexes (if any are present in the file).
493        let range = match needs_index_data(&mut push_decoder)? {
494            NeedsIndexData::No(metadata) => {
495                self.metadata = Some(metadata);
496                return Ok(());
497            }
498            NeedsIndexData::Yes(range) => range,
499        };
500
501        let bytes = match &remainder {
502            Some((remainder_start, remainder)) if *remainder_start as u64 <= range.start => {
503                let remainder_start = *remainder_start as u64;
504                let offset = usize::try_from(range.start - remainder_start)?;
505                let end = usize::try_from(range.end - remainder_start)?;
506                if end > remainder.len() {
507                    return Err(general_err!(
508                        "Corrupted parquet file: index data range ({:?}) exceeds remainder length ({})",
509                        range,
510                        remainder.len()
511                    ));
512                }
513                remainder.slice(offset..end)
514            }
515            // Note: this will potentially fetch data already in remainder, this keeps things simple
516            _ => fetch.fetch(range.start..range.end).await?,
517        };
518
519        // Sanity check
520        if bytes.len() as u64 != range.end - range.start {
521            return Err(general_err!(
522                "Corrupted parquet file: index data length mismatch, expected {}, got {}",
523                range.end - range.start,
524                bytes.len()
525            ));
526        }
527        push_decoder.push_range(range.clone(), bytes)?;
528        let metadata = parse_index_data(&mut push_decoder)?;
529        self.metadata = Some(metadata);
530        Ok(())
531    }
532
533    // One-shot parse of footer.
534    // Side effect: this will set `self.metadata_size`
535    fn parse_metadata<R: ChunkReader>(&mut self, chunk_reader: &R) -> Result<ParquetMetaData> {
536        // check file is large enough to hold footer
537        let file_size = chunk_reader.len();
538        if file_size < (FOOTER_SIZE as u64) {
539            return Err(ParquetError::NeedMoreData(FOOTER_SIZE));
540        }
541
542        let mut footer = [0_u8; FOOTER_SIZE];
543        chunk_reader
544            .get_read(file_size - FOOTER_SIZE as u64)?
545            .read_exact(&mut footer)?;
546
547        let footer = FooterTail::try_new(&footer)?;
548        let metadata_len = footer.metadata_length();
549        let footer_metadata_len = FOOTER_SIZE + metadata_len;
550        self.metadata_size = Some(footer_metadata_len);
551
552        if footer_metadata_len as u64 > file_size {
553            return Err(ParquetError::NeedMoreData(footer_metadata_len));
554        }
555
556        let start = file_size - footer_metadata_len as u64;
557        let bytes = chunk_reader.get_bytes(start, metadata_len)?;
558        self.decode_footer_metadata(bytes, file_size, footer)
559    }
560
561    /// Size of the serialized thrift metadata plus the 8 byte footer. Only set if
562    /// `self.parse_metadata` is called.
563    pub fn metadata_size(&self) -> Option<usize> {
564        self.metadata_size
565    }
566
567    /// Return the number of bytes to read in the initial pass. If `prefetch_size` has
568    /// been provided, then return that value if it is larger than the size of the Parquet
569    /// file footer (8 bytes). Otherwise returns `8`.
570    #[cfg(all(feature = "async", feature = "arrow"))]
571    fn get_prefetch_size(&self) -> usize {
572        if let Some(prefetch) = self.prefetch_hint {
573            if prefetch > FOOTER_SIZE {
574                return prefetch;
575            }
576        }
577        FOOTER_SIZE
578    }
579
580    #[cfg(all(feature = "async", feature = "arrow"))]
581    async fn load_metadata<F: MetadataFetch>(
582        &self,
583        fetch: &mut F,
584        file_size: u64,
585    ) -> Result<(ParquetMetaData, Option<(usize, Bytes)>)> {
586        let prefetch = self.get_prefetch_size() as u64;
587
588        if file_size < FOOTER_SIZE as u64 {
589            return Err(eof_err!("file size of {} is less than footer", file_size));
590        }
591
592        // If a size hint is provided, read more than the minimum size
593        // to try and avoid a second fetch.
594        // Note: prefetch > file_size is ok since we're using saturating_sub.
595        let footer_start = file_size.saturating_sub(prefetch);
596
597        let suffix = fetch.fetch(footer_start..file_size).await?;
598        let suffix_len = suffix.len();
599        let fetch_len = (file_size - footer_start)
600            .try_into()
601            .expect("footer size should never be larger than u32");
602        if suffix_len < fetch_len {
603            return Err(eof_err!(
604                "metadata requires {} bytes, but could only read {}",
605                fetch_len,
606                suffix_len
607            ));
608        }
609
610        let mut footer = [0; FOOTER_SIZE];
611        footer.copy_from_slice(&suffix[suffix_len - FOOTER_SIZE..suffix_len]);
612
613        let footer = FooterTail::try_new(&footer)?;
614        let length = footer.metadata_length();
615
616        if file_size < (length + FOOTER_SIZE) as u64 {
617            return Err(eof_err!(
618                "file size of {} is less than footer + metadata {}",
619                file_size,
620                length + FOOTER_SIZE
621            ));
622        }
623
624        // Did not fetch the entire file metadata in the initial read, need to make a second request
625        if length > suffix_len - FOOTER_SIZE {
626            let metadata_start = file_size - (length + FOOTER_SIZE) as u64;
627            let meta = fetch
628                .fetch(metadata_start..(file_size - FOOTER_SIZE as u64))
629                .await?;
630            Ok((self.decode_footer_metadata(meta, file_size, footer)?, None))
631        } else {
632            let metadata_start = (file_size - (length + FOOTER_SIZE) as u64 - footer_start)
633                .try_into()
634                .expect("metadata length should never be larger than u32");
635            let slice = suffix.slice(metadata_start..suffix_len - FOOTER_SIZE);
636            Ok((
637                self.decode_footer_metadata(slice, file_size, footer)?,
638                Some((footer_start as usize, suffix.slice(..metadata_start))),
639            ))
640        }
641    }
642
643    #[cfg(all(feature = "async", feature = "arrow"))]
644    async fn load_metadata_via_suffix<F: MetadataSuffixFetch>(
645        &self,
646        fetch: &mut F,
647    ) -> Result<(ParquetMetaData, Option<(usize, Bytes)>)> {
648        let prefetch = self.get_prefetch_size();
649
650        let suffix = fetch.fetch_suffix(prefetch as _).await?;
651        let suffix_len = suffix.len();
652
653        if suffix_len < FOOTER_SIZE {
654            return Err(eof_err!(
655                "footer metadata requires {} bytes, but could only read {}",
656                FOOTER_SIZE,
657                suffix_len
658            ));
659        }
660
661        let mut footer = [0; FOOTER_SIZE];
662        footer.copy_from_slice(&suffix[suffix_len - FOOTER_SIZE..suffix_len]);
663
664        let footer = FooterTail::try_new(&footer)?;
665        let length = footer.metadata_length();
666        // fake file size as we are only parsing the footer metadata here
667        // (cant be parsing page indexes without the full file size)
668        let file_size = (length + FOOTER_SIZE) as u64;
669
670        // Did not fetch the entire file metadata in the initial read, need to make a second request
671        let metadata_offset = length + FOOTER_SIZE;
672        if length > suffix_len - FOOTER_SIZE {
673            let meta = fetch.fetch_suffix(metadata_offset).await?;
674
675            if meta.len() < metadata_offset {
676                return Err(eof_err!(
677                    "metadata requires {} bytes, but could only read {}",
678                    metadata_offset,
679                    meta.len()
680                ));
681            }
682
683            // need to slice off the footer or decryption fails
684            let meta = meta.slice(0..length);
685            Ok((self.decode_footer_metadata(meta, file_size, footer)?, None))
686        } else {
687            let metadata_start = suffix_len - metadata_offset;
688            let slice = suffix.slice(metadata_start..suffix_len - FOOTER_SIZE);
689            Ok((
690                self.decode_footer_metadata(slice, file_size, footer)?,
691                Some((0, suffix.slice(..metadata_start))),
692            ))
693        }
694    }
695
696    /// Decodes [`ParquetMetaData`] from the provided bytes.
697    ///
698    /// Typically, this is used to decode the metadata from the end of a parquet
699    /// file. The format of `buf` is the Thrift compact binary protocol, as specified
700    /// by the [Parquet Spec].
701    ///
702    /// It does **NOT** include the 8-byte footer.
703    ///
704    /// This method handles using either `decode_metadata` or
705    /// `decode_metadata_with_encryption` depending on whether the encryption
706    /// feature is enabled.
707    ///
708    /// [Parquet Spec]: https://github.com/apache/parquet-format#metadata
709    pub(crate) fn decode_footer_metadata(
710        &self,
711        buf: Bytes,
712        file_size: u64,
713        footer_tail: FooterTail,
714    ) -> Result<ParquetMetaData> {
715        // The push decoder expects the metadata to be at the end of the file
716        // (... data ...) + (metadata) + (footer)
717        // so we need to provide the starting offset of the metadata
718        // within the file.
719        let ending_offset = file_size.checked_sub(FOOTER_SIZE as u64).ok_or_else(|| {
720            general_err!(
721                "file size {file_size} is smaller than footer size {}",
722                FOOTER_SIZE
723            )
724        })?;
725
726        let starting_offset = ending_offset.checked_sub(buf.len() as u64).ok_or_else(|| {
727            general_err!(
728                "file size {file_size} is smaller than buffer size {} + footer size {}",
729                buf.len(),
730                FOOTER_SIZE
731            )
732        })?;
733
734        let range = starting_offset..ending_offset;
735
736        let push_decoder =
737            ParquetMetaDataPushDecoder::try_new_with_footer_tail(file_size, footer_tail)?
738                // NOTE: DO NOT enable page indexes here, they are handled separately
739                .with_page_index_policy(PageIndexPolicy::Skip)
740                .with_metadata_options(self.metadata_options.clone());
741
742        let mut push_decoder = self.prepare_push_decoder(push_decoder);
743        push_decoder.push_range(range, buf)?;
744        match push_decoder.try_decode()? {
745            DecodeResult::Data(metadata) => Ok(metadata),
746            DecodeResult::Finished => Err(general_err!(
747                "could not parse parquet metadata -- previously finished"
748            )),
749            DecodeResult::NeedsData(ranges) => Err(general_err!(
750                "could not parse parquet metadata, needs ranges {:?}",
751                ranges
752            )),
753        }
754    }
755
756    /// Prepares a push decoder and runs it to decode the metadata.
757    #[cfg(feature = "encryption")]
758    fn prepare_push_decoder(
759        &self,
760        push_decoder: ParquetMetaDataPushDecoder,
761    ) -> ParquetMetaDataPushDecoder {
762        push_decoder.with_file_decryption_properties(
763            self.file_decryption_properties
764                .as_ref()
765                .map(std::sync::Arc::clone),
766        )
767    }
768    #[cfg(not(feature = "encryption"))]
769    fn prepare_push_decoder(
770        &self,
771        push_decoder: ParquetMetaDataPushDecoder,
772    ) -> ParquetMetaDataPushDecoder {
773        push_decoder
774    }
775
776    /// Decodes [`ParquetMetaData`] from the provided bytes.
777    ///
778    /// Typically this is used to decode the metadata from the end of a parquet
779    /// file. The format of `buf` is the Thrift compact binary protocol, as specified
780    /// by the [Parquet Spec].
781    ///
782    /// [Parquet Spec]: https://github.com/apache/parquet-format#metadata
783    pub fn decode_metadata(buf: &[u8]) -> Result<ParquetMetaData> {
784        decode_metadata(buf, None)
785    }
786
787    /// Decodes [`ParquetMetaData`] from the provided bytes.
788    ///
789    /// Like [`Self::decode_metadata`] but this also accepts
790    /// metadata parsing options.
791    pub fn decode_metadata_with_options(
792        buf: &[u8],
793        options: Option<&ParquetMetaDataOptions>,
794    ) -> Result<ParquetMetaData> {
795        decode_metadata(buf, options)
796    }
797
798    /// Decodes the schema from the Parquet footer in `buf`. Returned as
799    /// a [`SchemaDescriptor`].
800    pub fn decode_schema(buf: &[u8]) -> Result<Arc<SchemaDescriptor>> {
801        Ok(Arc::new(parquet_schema_from_bytes(buf)?))
802    }
803}
804
/// The bounds needed to read page indexes, or the finished metadata when no
/// further bytes are required.
// this is an internal enum, so it is ok to allow differences in enum size
// (`No` carries a whole `ParquetMetaData`, `Yes` only a range)
#[allow(clippy::large_enum_variant)]
enum NeedsIndexData {
    /// No additional data is needed (e.g. the indexes weren't requested);
    /// carries the decoded metadata.
    No(ParquetMetaData),
    /// Additional data is needed; carries the byte range that is required
    /// (when built by `needs_index_data`, a single range covering every range
    /// the decoder requested).
    Yes(Range<u64>),
}
814
815/// Determines a single combined range of bytes needed to read the page indexes,
816/// or returns the metadata if no additional data is needed (e.g. if no page indexes are requested)
817fn needs_index_data(push_decoder: &mut ParquetMetaDataPushDecoder) -> Result<NeedsIndexData> {
818    match push_decoder.try_decode()? {
819        DecodeResult::NeedsData(ranges) => {
820            let range = ranges
821                .into_iter()
822                .reduce(|a, b| a.start.min(b.start)..a.end.max(b.end))
823                .ok_or_else(|| general_err!("Internal error: no ranges provided"))?;
824            Ok(NeedsIndexData::Yes(range))
825        }
826        DecodeResult::Data(metadata) => Ok(NeedsIndexData::No(metadata)),
827        DecodeResult::Finished => Err(general_err!("Internal error: decoder was finished")),
828    }
829}
830
831/// Given a push decoder that has had the needed ranges pushed to it,
832/// attempt to decode indexes and return the updated metadata.
833fn parse_index_data(push_decoder: &mut ParquetMetaDataPushDecoder) -> Result<ParquetMetaData> {
834    match push_decoder.try_decode()? {
835        DecodeResult::NeedsData(_) => Err(general_err!(
836            "Internal error: decoder still needs data after reading required range"
837        )),
838        DecodeResult::Data(metadata) => Ok(metadata),
839        DecodeResult::Finished => Err(general_err!("Internal error: decoder was finished")),
840    }
841}
842
// Synchronous tests for `ParquetMetaDataReader` parsing from in-memory bytes
// and files.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::file::reader::Length;
    use crate::util::test_common::file_util::get_test_file;
    use std::ops::Range;

    /// An empty file is shorter than the footer, so the reader must ask for
    /// `FOOTER_SIZE` bytes.
    #[test]
    fn test_parse_metadata_size_smaller_than_footer() {
        let test_file = tempfile::tempfile().unwrap();
        let err = ParquetMetaDataReader::new()
            .parse_metadata(&test_file)
            .unwrap_err();
        assert!(matches!(err, ParquetError::NeedMoreData(FOOTER_SIZE)));
    }

    /// Eight bytes that do not form a valid footer are rejected as corrupt.
    #[test]
    fn test_parse_metadata_corrupt_footer() {
        let data = Bytes::from(vec![1, 2, 3, 4, 5, 6, 7, 8]);
        let reader_result = ParquetMetaDataReader::new().parse_metadata(&data);
        assert_eq!(
            reader_result.unwrap_err().to_string(),
            "Parquet error: Invalid Parquet file. Corrupt footer"
        );
    }

    /// A lone footer that declares 255 bytes of metadata: the reader asks for
    /// 255 + FOOTER_SIZE = 263 bytes.
    #[test]
    fn test_parse_metadata_invalid_start() {
        let test_file = Bytes::from(vec![255, 0, 0, 0, b'P', b'A', b'R', b'1']);
        let err = ParquetMetaDataReader::new()
            .parse_metadata(&test_file)
            .unwrap_err();
        assert!(matches!(err, ParquetError::NeedMoreData(263)));
    }

    /// Exercises `try_parse` / `try_parse_sized` on suffixes of
    /// `alltypes_tiny_pages.parquet` of various lengths, including retrying
    /// after `NeedMoreData` and the error messages for undersized inputs.
    #[test]
    fn test_try_parse() {
        let file = get_test_file("alltypes_tiny_pages.parquet");
        let len = file.len();

        let mut reader =
            ParquetMetaDataReader::new().with_page_index_policy(PageIndexPolicy::Required);

        // Reads `range` of the test file into memory.
        let bytes_for_range = |range: Range<u64>| {
            file.get_bytes(range.start, (range.end - range.start).try_into().unwrap())
                .unwrap()
        };

        // read entire file
        let bytes = bytes_for_range(0..len);
        reader.try_parse(&bytes).unwrap();
        let metadata = reader.finish().unwrap();
        assert!(metadata.column_index.is_some());
        assert!(metadata.offset_index.is_some());

        // read more than enough of file
        let bytes = bytes_for_range(320000..len);
        reader.try_parse_sized(&bytes, len).unwrap();
        let metadata = reader.finish().unwrap();
        assert!(metadata.column_index.is_some());
        assert!(metadata.offset_index.is_some());

        // exactly enough
        let bytes = bytes_for_range(323583..len);
        reader.try_parse_sized(&bytes, len).unwrap();
        let metadata = reader.finish().unwrap();
        assert!(metadata.column_index.is_some());
        assert!(metadata.offset_index.is_some());

        // not enough for page index
        let bytes = bytes_for_range(323584..len);
        // should fail
        match reader.try_parse_sized(&bytes, len).unwrap_err() {
            // expected error, try again with provided bounds
            ParquetError::NeedMoreData(needed) => {
                let bytes = bytes_for_range(len - needed as u64..len);
                reader.try_parse_sized(&bytes, len).unwrap();
                let metadata = reader.finish().unwrap();
                assert!(metadata.column_index.is_some());
                assert!(metadata.offset_index.is_some());
            }
            _ => panic!("unexpected error"),
        };

        // not enough for file metadata, but keep trying until page indexes are read
        let mut reader =
            ParquetMetaDataReader::new().with_page_index_policy(PageIndexPolicy::Required);
        let mut bytes = bytes_for_range(452505..len);
        loop {
            match reader.try_parse_sized(&bytes, len) {
                Ok(_) => break,
                Err(ParquetError::NeedMoreData(needed)) => {
                    bytes = bytes_for_range(len - needed as u64..len);
                    // once the footer metadata is parsed, only the page
                    // indexes remain to be read from the larger suffix
                    if reader.has_metadata() {
                        reader.read_page_indexes_sized(&bytes, len).unwrap();
                        break;
                    }
                }
                _ => panic!("unexpected error"),
            }
        }
        let metadata = reader.finish().unwrap();
        assert!(metadata.column_index.is_some());
        assert!(metadata.offset_index.is_some());

        // not enough for page index but lie about file size
        let bytes = bytes_for_range(323584..len);
        let reader_result = reader.try_parse_sized(&bytes, len - 323584).unwrap_err();
        assert_eq!(
            reader_result.to_string(),
            "EOF: Parquet file too small. Range 323583..452504 is beyond file bounds 130649"
        );

        // not enough for file metadata
        let mut reader = ParquetMetaDataReader::new();
        let bytes = bytes_for_range(452505..len);
        // should fail
        match reader.try_parse_sized(&bytes, len).unwrap_err() {
            // expected error, try again with provided bounds
            ParquetError::NeedMoreData(needed) => {
                let bytes = bytes_for_range(len - needed as u64..len);
                reader.try_parse_sized(&bytes, len).unwrap();
                reader.finish().unwrap();
            }
            _ => panic!("unexpected error"),
        };

        // not enough for file metadata but use try_parse()
        // (`bytes` is still the `452505..len` suffix; without an explicit size
        // its length is used as the file size, which is too small)
        let reader_result = reader.try_parse(&bytes).unwrap_err();
        assert_eq!(
            reader_result.to_string(),
            "EOF: Parquet file too small. Size is 1728 but need 1729"
        );

        // read head of file rather than tail
        let bytes = bytes_for_range(0..1000);
        let reader_result = reader.try_parse_sized(&bytes, len).unwrap_err();
        assert_eq!(
            reader_result.to_string(),
            "Parquet error: Invalid Parquet file. Corrupt footer"
        );

        // lie about file size
        let bytes = bytes_for_range(452510..len);
        let reader_result = reader.try_parse_sized(&bytes, len - 452505).unwrap_err();
        assert_eq!(
            reader_result.to_string(),
            "EOF: Parquet file too small. Size is 1728 but need 1729"
        );
    }
}
994
// Async metadata loading tests; these need both the `async` and `arrow`
// features.
#[cfg(all(feature = "async", feature = "arrow", test))]
mod async_tests {
    use super::*;

    use arrow::{array::Int32Array, datatypes::DataType};
    use arrow_array::RecordBatch;
    use arrow_schema::{Field, Schema};
    use bytes::Bytes;
    use futures::FutureExt;
    use futures::future::BoxFuture;
    use std::fs::File;
    use std::future::Future;
    use std::io::{Read, Seek, SeekFrom};
    use std::ops::Range;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use tempfile::NamedTempFile;

    use crate::arrow::ArrowWriter;
    use crate::file::properties::WriterProperties;
    use crate::file::reader::Length;
    use crate::util::test_common::file_util::get_test_file;

    /// Adapts a closure `FnMut(Range<u64>) -> Future` into a [`MetadataFetch`].
    struct MetadataFetchFn<F>(F);

    impl<F, Fut> MetadataFetch for MetadataFetchFn<F>
    where
        F: FnMut(Range<u64>) -> Fut + Send,
        Fut: Future<Output = Result<Bytes>> + Send,
    {
        fn fetch(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
            async move { self.0(range).await }.boxed()
        }
    }

    /// Adapts a range-fetch closure (field `.0`) and a suffix-fetch closure
    /// (field `.1`) into a [`MetadataSuffixFetch`].
    struct MetadataSuffixFetchFn<F1, F2>(F1, F2);

    impl<F1, Fut, F2> MetadataFetch for MetadataSuffixFetchFn<F1, F2>
    where
        F1: FnMut(Range<u64>) -> Fut + Send,
        Fut: Future<Output = Result<Bytes>> + Send,
        F2: Send,
    {
        fn fetch(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
            async move { self.0(range).await }.boxed()
        }
    }

    impl<F1, Fut, F2> MetadataSuffixFetch for MetadataSuffixFetchFn<F1, F2>
    where
        F1: FnMut(Range<u64>) -> Fut + Send,
        F2: FnMut(usize) -> Fut + Send,
        Fut: Future<Output = Result<Bytes>> + Send,
    {
        fn fetch_suffix(&mut self, suffix: usize) -> BoxFuture<'_, Result<Bytes>> {
            async move { self.1(suffix).await }.boxed()
        }
    }

    /// Reads the bytes of `range` from `file`.
    fn read_range(file: &mut File, range: Range<u64>) -> Result<Bytes> {
        file.seek(SeekFrom::Start(range.start as _))?;
        let len = range.end - range.start;
        let mut buf = Vec::with_capacity(len.try_into().unwrap());
        file.take(len as _).read_to_end(&mut buf)?;
        Ok(buf.into())
    }

    /// Reads the last `suffix` bytes of `file`, or the whole file if it is
    /// shorter than `suffix`.
    fn read_suffix(file: &mut File, suffix: usize) -> Result<Bytes> {
        let file_len = file.len();
        // Don't seek before beginning of file
        file.seek(SeekFrom::End(0 - suffix.min(file_len as _) as i64))?;
        let mut buf = Vec::with_capacity(suffix);
        file.take(suffix as _).read_to_end(&mut buf)?;
        Ok(buf.into())
    }

    #[tokio::test]
    async fn test_simple() {
        let mut file = get_test_file("nulls.snappy.parquet");
        let len = file.len();

        let expected = ParquetMetaDataReader::new()
            .parse_and_finish(&file)
            .unwrap();
        let expected = expected.file_metadata().schema();
        let fetch_count = AtomicUsize::new(0);

        let mut fetch = |range| {
            fetch_count.fetch_add(1, Ordering::SeqCst);
            futures::future::ready(read_range(&mut file, range))
        };

        let input = MetadataFetchFn(&mut fetch);
        let actual = ParquetMetaDataReader::new()
            .load_and_finish(input, len)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 2);

        // Metadata hint too small - below footer size
        fetch_count.store(0, Ordering::SeqCst);
        let input = MetadataFetchFn(&mut fetch);
        let actual = ParquetMetaDataReader::new()
            .with_prefetch_hint(Some(7))
            .load_and_finish(input, len)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 2);

        // Metadata hint too small
        fetch_count.store(0, Ordering::SeqCst);
        let input = MetadataFetchFn(&mut fetch);
        let actual = ParquetMetaDataReader::new()
            .with_prefetch_hint(Some(10))
            .load_and_finish(input, len)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 2);

        // Metadata hint too large
        fetch_count.store(0, Ordering::SeqCst);
        let input = MetadataFetchFn(&mut fetch);
        let actual = ParquetMetaDataReader::new()
            .with_prefetch_hint(Some(500))
            .load_and_finish(input, len)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);

        // Metadata hint exactly correct
        fetch_count.store(0, Ordering::SeqCst);
        let input = MetadataFetchFn(&mut fetch);
        let actual = ParquetMetaDataReader::new()
            .with_prefetch_hint(Some(428))
            .load_and_finish(input, len)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);

        let input = MetadataFetchFn(&mut fetch);
        let err = ParquetMetaDataReader::new()
            .load_and_finish(input, 4)
            .await
            .unwrap_err()
            .to_string();
        assert_eq!(err, "EOF: file size of 4 is less than footer");

        let input = MetadataFetchFn(&mut fetch);
        let err = ParquetMetaDataReader::new()
            .load_and_finish(input, 20)
            .await
            .unwrap_err()
            .to_string();
        assert_eq!(err, "Parquet error: Invalid Parquet file. Corrupt footer");
    }

    #[tokio::test]
    async fn test_suffix() {
        let mut file = get_test_file("nulls.snappy.parquet");
        let mut file2 = file.try_clone().unwrap();

        let expected = ParquetMetaDataReader::new()
            .parse_and_finish(&file)
            .unwrap();
        let expected = expected.file_metadata().schema();
        let fetch_count = AtomicUsize::new(0);
        let suffix_fetch_count = AtomicUsize::new(0);

        let mut fetch = |range| {
            fetch_count.fetch_add(1, Ordering::SeqCst);
            futures::future::ready(read_range(&mut file, range))
        };
        let mut suffix_fetch = |suffix| {
            suffix_fetch_count.fetch_add(1, Ordering::SeqCst);
            futures::future::ready(read_suffix(&mut file2, suffix))
        };

        let input = MetadataSuffixFetchFn(&mut fetch, &mut suffix_fetch);
        let actual = ParquetMetaDataReader::new()
            .load_via_suffix_and_finish(input)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 0);
        assert_eq!(suffix_fetch_count.load(Ordering::SeqCst), 2);

        // Metadata hint too small - below footer size
        fetch_count.store(0, Ordering::SeqCst);
        suffix_fetch_count.store(0, Ordering::SeqCst);
        let input = MetadataSuffixFetchFn(&mut fetch, &mut suffix_fetch);
        let actual = ParquetMetaDataReader::new()
            .with_prefetch_hint(Some(7))
            .load_via_suffix_and_finish(input)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 0);
        assert_eq!(suffix_fetch_count.load(Ordering::SeqCst), 2);

        // Metadata hint too small
        fetch_count.store(0, Ordering::SeqCst);
        suffix_fetch_count.store(0, Ordering::SeqCst);
        let input = MetadataSuffixFetchFn(&mut fetch, &mut suffix_fetch);
        let actual = ParquetMetaDataReader::new()
            .with_prefetch_hint(Some(10))
            .load_via_suffix_and_finish(input)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 0);
        assert_eq!(suffix_fetch_count.load(Ordering::SeqCst), 2);

        // Metadata hint too large
        fetch_count.store(0, Ordering::SeqCst);
        suffix_fetch_count.store(0, Ordering::SeqCst);
        let input = MetadataSuffixFetchFn(&mut fetch, &mut suffix_fetch);
        let actual = ParquetMetaDataReader::new()
            .with_prefetch_hint(Some(500))
            .load_via_suffix_and_finish(input)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 0);
        assert_eq!(suffix_fetch_count.load(Ordering::SeqCst), 1);

        // Metadata hint exactly correct
        fetch_count.store(0, Ordering::SeqCst);
        suffix_fetch_count.store(0, Ordering::SeqCst);
        let input = MetadataSuffixFetchFn(&mut fetch, &mut suffix_fetch);
        let actual = ParquetMetaDataReader::new()
            .with_prefetch_hint(Some(428))
            .load_via_suffix_and_finish(input)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 0);
        assert_eq!(suffix_fetch_count.load(Ordering::SeqCst), 1);
    }

    #[cfg(feature = "encryption")]
    #[tokio::test]
    async fn test_suffix_with_encryption() {
        let mut file = get_test_file("uniform_encryption.parquet.encrypted");
        let mut file2 = file.try_clone().unwrap();

        let mut fetch = |range| futures::future::ready(read_range(&mut file, range));
        let mut suffix_fetch = |suffix| futures::future::ready(read_suffix(&mut file2, suffix));

        let input = MetadataSuffixFetchFn(&mut fetch, &mut suffix_fetch);

        let key_code: &[u8] = "0123456789012345".as_bytes();
        let decryption_properties = FileDecryptionProperties::builder(key_code.to_vec())
            .build()
            .unwrap();

        // just make sure the metadata is properly decrypted and read
        let expected = ParquetMetaDataReader::new()
            .with_decryption_properties(Some(decryption_properties))
            .load_via_suffix_and_finish(input)
            .await
            .unwrap();
        assert_eq!(expected.num_row_groups(), 1);
    }

    #[tokio::test]
    async fn test_page_index() {
        let mut file = get_test_file("alltypes_tiny_pages.parquet");
        let len = file.len();
        let fetch_count = AtomicUsize::new(0);
        let mut fetch = |range| {
            fetch_count.fetch_add(1, Ordering::SeqCst);
            futures::future::ready(read_range(&mut file, range))
        };

        let f = MetadataFetchFn(&mut fetch);
        let mut loader =
            ParquetMetaDataReader::new().with_page_index_policy(PageIndexPolicy::Required);
        loader.try_load(f, len).await.unwrap();
        assert_eq!(fetch_count.load(Ordering::SeqCst), 3);
        let metadata = loader.finish().unwrap();
        assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());

        // Prefetch just footer exactly
        fetch_count.store(0, Ordering::SeqCst);
        let f = MetadataFetchFn(&mut fetch);
        let mut loader = ParquetMetaDataReader::new()
            .with_page_index_policy(PageIndexPolicy::Required)
            .with_prefetch_hint(Some(1729));
        loader.try_load(f, len).await.unwrap();
        assert_eq!(fetch_count.load(Ordering::SeqCst), 2);
        let metadata = loader.finish().unwrap();
        assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());

        // Prefetch more than footer but not enough
        fetch_count.store(0, Ordering::SeqCst);
        let f = MetadataFetchFn(&mut fetch);
        let mut loader = ParquetMetaDataReader::new()
            .with_page_index_policy(PageIndexPolicy::Required)
            .with_prefetch_hint(Some(130649));
        loader.try_load(f, len).await.unwrap();
        assert_eq!(fetch_count.load(Ordering::SeqCst), 2);
        let metadata = loader.finish().unwrap();
        assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());

        // Prefetch exactly enough
        fetch_count.store(0, Ordering::SeqCst);
        let f = MetadataFetchFn(&mut fetch);
        let metadata = ParquetMetaDataReader::new()
            .with_page_index_policy(PageIndexPolicy::Required)
            .with_prefetch_hint(Some(130650))
            .load_and_finish(f, len)
            .await
            .unwrap();
        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
        assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());

        // Prefetch more than enough but less than the entire file
        fetch_count.store(0, Ordering::SeqCst);
        let f = MetadataFetchFn(&mut fetch);
        let metadata = ParquetMetaDataReader::new()
            .with_page_index_policy(PageIndexPolicy::Required)
            .with_prefetch_hint(Some((len - 1000) as usize)) // all but the first 1000 bytes
            .load_and_finish(f, len)
            .await
            .unwrap();
        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
        assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());

        // Prefetch the entire file
        fetch_count.store(0, Ordering::SeqCst);
        let f = MetadataFetchFn(&mut fetch);
        let metadata = ParquetMetaDataReader::new()
            .with_page_index_policy(PageIndexPolicy::Required)
            .with_prefetch_hint(Some(len as usize)) // prefetch entire file
            .load_and_finish(f, len)
            .await
            .unwrap();
        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
        assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());

        // Prefetch more than the entire file
        fetch_count.store(0, Ordering::SeqCst);
        let f = MetadataFetchFn(&mut fetch);
        let metadata = ParquetMetaDataReader::new()
            .with_page_index_policy(PageIndexPolicy::Required)
            .with_prefetch_hint(Some((len + 1000) as usize)) // hint exceeds the file size
            .load_and_finish(f, len)
            .await
            .unwrap();
        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
        assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());
    }

    /// Writes a three-row, single-column (`a: Int32`) Parquet file to a temp
    /// file, with the offset index disabled when `offset_index_disabled` is set.
    fn write_parquet_file(offset_index_disabled: bool) -> Result<NamedTempFile> {
        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
        )?;

        let file = NamedTempFile::new().unwrap();

        // Writer properties that optionally disable the offset index
        let props = WriterProperties::builder()
            .set_offset_index_disabled(offset_index_disabled)
            .build();

        let mut writer = ArrowWriter::try_new(file.reopen()?, schema, Some(props))?;
        writer.write(&batch)?;
        writer.close()?;

        Ok(file)
    }

    /// Parses `file` with the given page index `policy` and returns the
    /// finished metadata.
    fn read_and_check(file: &File, policy: PageIndexPolicy) -> Result<ParquetMetaData> {
        let mut reader = ParquetMetaDataReader::new().with_page_index_policy(policy);
        reader.try_parse(file)?;
        reader.finish()
    }

    #[test]
    fn test_page_index_policy() {
        // With page index
        let f = write_parquet_file(false).unwrap();
        read_and_check(f.as_file(), PageIndexPolicy::Required).unwrap();
        read_and_check(f.as_file(), PageIndexPolicy::Optional).unwrap();
        read_and_check(f.as_file(), PageIndexPolicy::Skip).unwrap();

        // Without page index
        let f = write_parquet_file(true).unwrap();
        let res = read_and_check(f.as_file(), PageIndexPolicy::Required);
        assert!(matches!(
            res,
            Err(ParquetError::General(e)) if e == "missing offset index"
        ));
        read_and_check(f.as_file(), PageIndexPolicy::Optional).unwrap();
        read_and_check(f.as_file(), PageIndexPolicy::Skip).unwrap();
    }
}