// parquet/file/metadata/reader.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18#[cfg(feature = "encryption")]
19use crate::encryption::decrypt::FileDecryptionProperties;
20use crate::errors::{ParquetError, Result};
21use crate::file::FOOTER_SIZE;
22use crate::file::metadata::parser::decode_metadata;
23use crate::file::metadata::thrift::parquet_schema_from_bytes;
24use crate::file::metadata::{
25    FooterTail, ParquetMetaData, ParquetMetaDataOptions, ParquetMetaDataPushDecoder,
26};
27use crate::file::reader::ChunkReader;
28use crate::schema::types::SchemaDescriptor;
29use bytes::Bytes;
30use std::sync::Arc;
31use std::{io::Read, ops::Range};
32
33use crate::DecodeResult;
34#[cfg(all(feature = "async", feature = "arrow"))]
35use crate::arrow::async_reader::{MetadataFetch, MetadataSuffixFetch};
36
/// Reads [`ParquetMetaData`] from a byte stream, with either synchronous or
/// asynchronous I/O.
///
/// There are two flavors of APIs:
/// * Synchronous: [`Self::try_parse()`], [`Self::try_parse_sized()`], [`Self::parse_and_finish()`], etc.
/// * Asynchronous (requires `async` and `arrow` features): [`Self::try_load()`], etc
///
///  See the [`ParquetMetaDataPushDecoder`] for an API that does not require I/O.
///
/// # Format Notes
///
/// Parquet metadata is not necessarily contiguous in a Parquet file: a portion is stored
/// in the footer (the last bytes of the file), but other portions (such as the
/// PageIndex) can be stored elsewhere.
/// See [`crate::file::metadata::ParquetMetaDataWriter#output-format`] for more details of
/// Parquet metadata.
///
/// This reader handles reading the footer as well as the non contiguous parts
/// of the metadata (`PageIndex` and `ColumnIndex`). It does not handle reading Bloom Filters.
///
/// # Example
/// ```no_run
/// # use parquet::file::metadata::ParquetMetaDataReader;
/// # fn open_parquet_file(path: &str) -> std::fs::File { unimplemented!(); }
/// // read parquet metadata including page indexes from a file
/// let file = open_parquet_file("some_path.parquet");
/// let mut reader = ParquetMetaDataReader::new()
///     .with_page_indexes(true);
/// reader.try_parse(&file).unwrap();
/// let metadata = reader.finish().unwrap();
/// assert!(metadata.column_index().is_some());
/// assert!(metadata.offset_index().is_some());
/// ```
#[derive(Default, Debug)]
pub struct ParquetMetaDataReader {
    // Metadata parsed/loaded so far; handed out (at most once) by `Self::finish`.
    metadata: Option<ParquetMetaData>,
    // Policy controlling whether the ColumnIndex is read.
    column_index: PageIndexPolicy,
    // Policy controlling whether the OffsetIndex is read.
    offset_index: PageIndexPolicy,
    // Caller-supplied estimate of bytes to fetch up front in the async paths;
    // see `with_prefetch_hint`.
    prefetch_hint: Option<usize>,
    // Decoding options, shared with the push decoder via `Arc`.
    metadata_options: Option<Arc<ParquetMetaDataOptions>>,
    // Size of the serialized thrift metadata plus the 8 byte footer. Only set if
    // `self.parse_metadata` is called.
    metadata_size: Option<usize>,
    // Decryption configuration; only relevant for encrypted files.
    #[cfg(feature = "encryption")]
    file_decryption_properties: Option<Arc<FileDecryptionProperties>>,
}
83
/// Describes the policy for reading page indexes
/// (the `ColumnIndex` and `OffsetIndex` structures).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum PageIndexPolicy {
    /// Do not read the page index.
    #[default]
    Skip,
    /// Read the page index if it exists, otherwise do not error.
    Optional,
    /// Require the page index to exist, and error if it does not.
    Required,
}
95
96impl From<bool> for PageIndexPolicy {
97    fn from(value: bool) -> Self {
98        match value {
99            true => Self::Required,
100            false => Self::Skip,
101        }
102    }
103}
104
105impl ParquetMetaDataReader {
106    /// Create a new [`ParquetMetaDataReader`]
107    pub fn new() -> Self {
108        Default::default()
109    }
110
111    /// Create a new [`ParquetMetaDataReader`] populated with a [`ParquetMetaData`] struct
112    /// obtained via other means.
113    pub fn new_with_metadata(metadata: ParquetMetaData) -> Self {
114        Self {
115            metadata: Some(metadata),
116            ..Default::default()
117        }
118    }
119
120    /// Enable or disable reading the page index structures described in
121    /// "[Parquet page index]: Layout to Support Page Skipping".
122    ///
123    /// [Parquet page index]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
124    #[deprecated(since = "56.1.0", note = "Use `with_page_index_policy` instead")]
125    pub fn with_page_indexes(self, val: bool) -> Self {
126        self.with_page_index_policy(PageIndexPolicy::from(val))
127    }
128
129    /// Enable or disable reading the Parquet [ColumnIndex] structure.
130    ///
131    /// [ColumnIndex]:  https://github.com/apache/parquet-format/blob/master/PageIndex.md
132    #[deprecated(since = "56.1.0", note = "Use `with_column_index_policy` instead")]
133    pub fn with_column_indexes(self, val: bool) -> Self {
134        let policy = PageIndexPolicy::from(val);
135        self.with_column_index_policy(policy)
136    }
137
138    /// Enable or disable reading the Parquet [OffsetIndex] structure.
139    ///
140    /// [OffsetIndex]:  https://github.com/apache/parquet-format/blob/master/PageIndex.md
141    #[deprecated(since = "56.1.0", note = "Use `with_offset_index_policy` instead")]
142    pub fn with_offset_indexes(self, val: bool) -> Self {
143        let policy = PageIndexPolicy::from(val);
144        self.with_offset_index_policy(policy)
145    }
146
147    /// Sets the [`PageIndexPolicy`] for the column and offset indexes
148    pub fn with_page_index_policy(self, policy: PageIndexPolicy) -> Self {
149        self.with_column_index_policy(policy)
150            .with_offset_index_policy(policy)
151    }
152
153    /// Sets the [`PageIndexPolicy`] for the column index
154    pub fn with_column_index_policy(mut self, policy: PageIndexPolicy) -> Self {
155        self.column_index = policy;
156        self
157    }
158
159    /// Sets the [`PageIndexPolicy`] for the offset index
160    pub fn with_offset_index_policy(mut self, policy: PageIndexPolicy) -> Self {
161        self.offset_index = policy;
162        self
163    }
164
165    /// Sets the [`ParquetMetaDataOptions`] to use when decoding
166    pub fn with_metadata_options(mut self, options: Option<ParquetMetaDataOptions>) -> Self {
167        self.metadata_options = options.map(Arc::new);
168        self
169    }
170
171    /// Provide a hint as to the number of bytes needed to fully parse the [`ParquetMetaData`].
172    /// Only used for the asynchronous [`Self::try_load()`] method.
173    ///
174    /// By default, the reader will first fetch the last 8 bytes of the input file to obtain the
175    /// size of the footer metadata. A second fetch will be performed to obtain the needed bytes.
176    /// After parsing the footer metadata, a third fetch will be performed to obtain the bytes
177    /// needed to decode the page index structures, if they have been requested. To avoid
178    /// unnecessary fetches, `prefetch` can be set to an estimate of the number of bytes needed
179    /// to fully decode the [`ParquetMetaData`], which can reduce the number of fetch requests and
180    /// reduce latency. Setting `prefetch` too small will not trigger an error, but will result
181    /// in extra fetches being performed.
182    pub fn with_prefetch_hint(mut self, prefetch: Option<usize>) -> Self {
183        self.prefetch_hint = prefetch;
184        self
185    }
186
187    /// Provide the FileDecryptionProperties to use when decrypting the file.
188    ///
189    /// This is only necessary when the file is encrypted.
190    #[cfg(feature = "encryption")]
191    pub fn with_decryption_properties(
192        mut self,
193        properties: Option<std::sync::Arc<FileDecryptionProperties>>,
194    ) -> Self {
195        self.file_decryption_properties = properties;
196        self
197    }
198
    /// Indicates whether this reader has a [`ParquetMetaData`] internally.
    ///
    /// Returns `true` once metadata has been parsed/loaded (or supplied via
    /// [`Self::new_with_metadata`]) and not yet taken by [`Self::finish`].
    pub fn has_metadata(&self) -> bool {
        self.metadata.is_some()
    }
203
204    /// Return the parsed [`ParquetMetaData`] struct, leaving `None` in its place.
205    pub fn finish(&mut self) -> Result<ParquetMetaData> {
206        self.metadata
207            .take()
208            .ok_or_else(|| general_err!("could not parse parquet metadata"))
209    }
210
211    /// Given a [`ChunkReader`], parse and return the [`ParquetMetaData`] in a single pass.
212    ///
213    /// If `reader` is [`Bytes`] based, then the buffer must contain sufficient bytes to complete
214    /// the request, and must include the Parquet footer. If page indexes are desired, the buffer
215    /// must contain the entire file, or [`Self::try_parse_sized()`] should be used.
216    ///
217    /// This call will consume `self`.
218    ///
219    /// # Example
220    /// ```no_run
221    /// # use parquet::file::metadata::ParquetMetaDataReader;
222    /// # fn open_parquet_file(path: &str) -> std::fs::File { unimplemented!(); }
223    /// // read parquet metadata including page indexes
224    /// let file = open_parquet_file("some_path.parquet");
225    /// let metadata = ParquetMetaDataReader::new()
226    ///     .with_page_indexes(true)
227    ///     .parse_and_finish(&file).unwrap();
228    /// ```
229    pub fn parse_and_finish<R: ChunkReader>(mut self, reader: &R) -> Result<ParquetMetaData> {
230        self.try_parse(reader)?;
231        self.finish()
232    }
233
234    /// Attempts to parse the footer metadata (and optionally page indexes) given a [`ChunkReader`].
235    ///
236    /// If `reader` is [`Bytes`] based, then the buffer must contain sufficient bytes to complete
237    /// the request, and must include the Parquet footer. If page indexes are desired, the buffer
238    /// must contain the entire file, or [`Self::try_parse_sized()`] should be used.
239    pub fn try_parse<R: ChunkReader>(&mut self, reader: &R) -> Result<()> {
240        self.try_parse_sized(reader, reader.len())
241    }
242
    /// Same as [`Self::try_parse()`], but provide the original file size in the case that `reader`
    /// is a [`Bytes`] struct that does not contain the entire file. This information is necessary
    /// when the page indexes are desired. `reader` must have access to the Parquet footer.
    ///
    /// Using this function also allows for retrying with a larger buffer.
    ///
    /// # Errors
    ///
    /// This function will return [`ParquetError::NeedMoreData`] in the event `reader` does not
    /// provide enough data to fully parse the metadata (see example below). The returned error
    /// will be populated with a `usize` field indicating the number of bytes required from the
    /// tail of the file to completely parse the requested metadata.
    ///
    /// Other errors returned include [`ParquetError::General`] and [`ParquetError::EOF`].
    ///
    /// # Example
    /// ```no_run
    /// # use parquet::file::metadata::ParquetMetaDataReader;
    /// # use parquet::errors::ParquetError;
    /// # use crate::parquet::file::reader::Length;
    /// # fn get_bytes(file: &std::fs::File, range: std::ops::Range<u64>) -> bytes::Bytes { unimplemented!(); }
    /// # fn open_parquet_file(path: &str) -> std::fs::File { unimplemented!(); }
    /// let file = open_parquet_file("some_path.parquet");
    /// let len = file.len();
    /// // Speculatively read 1 kilobyte from the end of the file
    /// let bytes = get_bytes(&file, len - 1024..len);
    /// let mut reader = ParquetMetaDataReader::new().with_page_indexes(true);
    /// match reader.try_parse_sized(&bytes, len) {
    ///     Ok(_) => (),
    ///     Err(ParquetError::NeedMoreData(needed)) => {
    ///         // Read the needed number of bytes from the end of the file
    ///         let bytes = get_bytes(&file, len - needed as u64..len);
    ///         reader.try_parse_sized(&bytes, len).unwrap();
    ///     }
    ///     _ => panic!("unexpected error")
    /// }
    /// let metadata = reader.finish().unwrap();
    /// ```
    ///
    /// Note that it is possible for the file metadata to be completely read, but there are
    /// insufficient bytes available to read the page indexes. [`Self::has_metadata()`] can be used
    /// to test for this. In the event the file metadata is present, re-parsing of the file
    /// metadata can be skipped by using [`Self::read_page_indexes_sized()`], as shown below.
    /// ```no_run
    /// # use parquet::file::metadata::ParquetMetaDataReader;
    /// # use parquet::errors::ParquetError;
    /// # use crate::parquet::file::reader::Length;
    /// # fn get_bytes(file: &std::fs::File, range: std::ops::Range<u64>) -> bytes::Bytes { unimplemented!(); }
    /// # fn open_parquet_file(path: &str) -> std::fs::File { unimplemented!(); }
    /// let file = open_parquet_file("some_path.parquet");
    /// let len = file.len();
    /// // Speculatively read 1 kilobyte from the end of the file
    /// let mut bytes = get_bytes(&file, len - 1024..len);
    /// let mut reader = ParquetMetaDataReader::new().with_page_indexes(true);
    /// // Loop until `bytes` is large enough
    /// loop {
    ///     match reader.try_parse_sized(&bytes, len) {
    ///         Ok(_) => break,
    ///         Err(ParquetError::NeedMoreData(needed)) => {
    ///             // Read the needed number of bytes from the end of the file
    ///             bytes = get_bytes(&file, len - needed as u64..len);
    ///             // If file metadata was read only read page indexes, otherwise continue loop
    ///             if reader.has_metadata() {
    ///                 reader.read_page_indexes_sized(&bytes, len).unwrap();
    ///                 break;
    ///             }
    ///         }
    ///         _ => panic!("unexpected error")
    ///     }
    /// }
    /// let metadata = reader.finish().unwrap();
    /// ```
    pub fn try_parse_sized<R: ChunkReader>(&mut self, reader: &R, file_size: u64) -> Result<()> {
        self.metadata = match self.parse_metadata(reader) {
            Ok(metadata) => Some(metadata),
            Err(ParquetError::NeedMoreData(needed)) => {
                // If reader is the same length as `file_size` then presumably there is no more to
                // read, so return an EOF error. Also error out if the whole file would still not
                // be enough to satisfy the request.
                if file_size == reader.len() || needed as u64 > file_size {
                    return Err(eof_err!(
                        "Parquet file too small. Size is {} but need {}",
                        file_size,
                        needed
                    ));
                } else {
                    // Ask the caller for a larger buffer; `needed` counts bytes from the
                    // end of the file.
                    return Err(ParquetError::NeedMoreData(needed));
                }
            }
            Err(e) => return Err(e),
        };

        // we can return if page indexes aren't requested
        if self.column_index == PageIndexPolicy::Skip && self.offset_index == PageIndexPolicy::Skip
        {
            return Ok(());
        }

        self.read_page_indexes_sized(reader, file_size)
    }
343
344    /// Read the page index structures when a [`ParquetMetaData`] has already been obtained.
345    /// See [`Self::new_with_metadata()`] and [`Self::has_metadata()`].
346    pub fn read_page_indexes<R: ChunkReader>(&mut self, reader: &R) -> Result<()> {
347        self.read_page_indexes_sized(reader, reader.len())
348    }
349
    /// Read the page index structures when a [`ParquetMetaData`] has already been obtained.
    /// This variant is used when `reader` cannot access the entire Parquet file (e.g. it is
    /// a [`Bytes`] struct containing the tail of the file).
    /// See [`Self::new_with_metadata()`] and [`Self::has_metadata()`]. Like
    /// [`Self::try_parse_sized()`] this function may return [`ParquetError::NeedMoreData`].
    pub fn read_page_indexes_sized<R: ChunkReader>(
        &mut self,
        reader: &R,
        file_size: u64,
    ) -> Result<()> {
        // Temporarily take ownership of the metadata; it is handed to the push
        // decoder and stored back before every successful return.
        let Some(metadata) = self.metadata.take() else {
            return Err(general_err!(
                "Tried to read page indexes without ParquetMetaData metadata"
            ));
        };

        let push_decoder = ParquetMetaDataPushDecoder::try_new_with_metadata(file_size, metadata)?
            .with_offset_index_policy(self.offset_index)
            .with_column_index_policy(self.column_index)
            .with_metadata_options(self.metadata_options.clone());
        let mut push_decoder = self.prepare_push_decoder(push_decoder);

        // Get bounds needed for page indexes (if any are present in the file).
        let range = match needs_index_data(&mut push_decoder)? {
            NeedsIndexData::No(metadata) => {
                self.metadata = Some(metadata);
                return Ok(());
            }
            NeedsIndexData::Yes(range) => range,
        };

        // Check to see if needed range is within `file_range` (the absolute file
        // offsets covered by `reader`). Checking `range.end` seems redundant, but
        // it guards against `range_for_page_index()` returning garbage.
        let file_range = file_size.saturating_sub(reader.len())..file_size;
        if !(file_range.contains(&range.start) && file_range.contains(&range.end)) {
            // Requested range extends beyond EOF — no buffer size can satisfy it
            if range.end > file_size {
                return Err(eof_err!(
                    "Parquet file too small. Range {range:?} is beyond file bounds {file_size}",
                ));
            } else {
                // Ask for a larger buffer, big enough to reach back to `range.start`
                return Err(ParquetError::NeedMoreData(
                    (file_size - range.start).try_into()?,
                ));
            }
        }

        // Perform extra sanity check to make sure `range` and the footer metadata don't
        // overlap.
        if let Some(metadata_size) = self.metadata_size {
            let metadata_range = file_size.saturating_sub(metadata_size as u64)..file_size;
            if range.end > metadata_range.start {
                return Err(eof_err!(
                    "Parquet file too small. Page index range {range:?} overlaps with file metadata {metadata_range:?}",
                ));
            }
        }

        // add the needed ranges to the decoder
        // (`reader` offsets are relative to `file_range.start`, hence the subtraction)
        let bytes_needed = usize::try_from(range.end - range.start)?;
        let bytes = reader.get_bytes(range.start - file_range.start, bytes_needed)?;

        push_decoder.push_range(range, bytes)?;
        let metadata = parse_index_data(&mut push_decoder)?;
        self.metadata = Some(metadata);

        Ok(())
    }
419
420    /// Given a [`MetadataFetch`], parse and return the [`ParquetMetaData`] in a single pass.
421    ///
422    /// This call will consume `self`.
423    ///
424    /// See [`Self::with_prefetch_hint`] for a discussion of how to reduce the number of fetches
425    /// performed by this function.
426    #[cfg(all(feature = "async", feature = "arrow"))]
427    pub async fn load_and_finish<F: MetadataFetch>(
428        mut self,
429        fetch: F,
430        file_size: u64,
431    ) -> Result<ParquetMetaData> {
432        self.try_load(fetch, file_size).await?;
433        self.finish()
434    }
435
436    /// Given a [`MetadataSuffixFetch`], parse and return the [`ParquetMetaData`] in a single pass.
437    ///
438    /// This call will consume `self`.
439    ///
440    /// See [`Self::with_prefetch_hint`] for a discussion of how to reduce the number of fetches
441    /// performed by this function.
442    #[cfg(all(feature = "async", feature = "arrow"))]
443    pub async fn load_via_suffix_and_finish<F: MetadataSuffixFetch>(
444        mut self,
445        fetch: F,
446    ) -> Result<ParquetMetaData> {
447        self.try_load_via_suffix(fetch).await?;
448        self.finish()
449    }
450    /// Attempts to (asynchronously) parse the footer metadata (and optionally page indexes)
451    /// given a [`MetadataFetch`].
452    ///
453    /// See [`Self::with_prefetch_hint`] for a discussion of how to reduce the number of fetches
454    /// performed by this function.
455    #[cfg(all(feature = "async", feature = "arrow"))]
456    pub async fn try_load<F: MetadataFetch>(&mut self, mut fetch: F, file_size: u64) -> Result<()> {
457        let (metadata, remainder) = self.load_metadata(&mut fetch, file_size).await?;
458
459        self.metadata = Some(metadata);
460
461        // we can return if page indexes aren't requested
462        if self.column_index == PageIndexPolicy::Skip && self.offset_index == PageIndexPolicy::Skip
463        {
464            return Ok(());
465        }
466
467        self.load_page_index_with_remainder(fetch, remainder).await
468    }
469
470    /// Attempts to (asynchronously) parse the footer metadata (and optionally page indexes)
471    /// given a [`MetadataSuffixFetch`].
472    ///
473    /// See [`Self::with_prefetch_hint`] for a discussion of how to reduce the number of fetches
474    /// performed by this function.
475    #[cfg(all(feature = "async", feature = "arrow"))]
476    pub async fn try_load_via_suffix<F: MetadataSuffixFetch>(
477        &mut self,
478        mut fetch: F,
479    ) -> Result<()> {
480        let (metadata, remainder) = self.load_metadata_via_suffix(&mut fetch).await?;
481
482        self.metadata = Some(metadata);
483
484        // we can return if page indexes aren't requested
485        if self.column_index == PageIndexPolicy::Skip && self.offset_index == PageIndexPolicy::Skip
486        {
487            return Ok(());
488        }
489
490        self.load_page_index_with_remainder(fetch, remainder).await
491    }
492
    /// Asynchronously fetch the page index structures when a [`ParquetMetaData`] has already
    /// been obtained. See [`Self::new_with_metadata()`].
    ///
    /// Errors if no metadata is present (see [`Self::has_metadata`]).
    #[cfg(all(feature = "async", feature = "arrow"))]
    pub async fn load_page_index<F: MetadataFetch>(&mut self, fetch: F) -> Result<()> {
        self.load_page_index_with_remainder(fetch, None).await
    }
499
    // Shared implementation of the async page-index loading paths.
    //
    // `remainder` optionally carries `(file_offset, bytes)` left over from the footer
    // fetch; when it already covers the needed index range, the data is sliced from it
    // instead of issuing another fetch.
    #[cfg(all(feature = "async", feature = "arrow"))]
    async fn load_page_index_with_remainder<F: MetadataFetch>(
        &mut self,
        mut fetch: F,
        remainder: Option<(usize, Bytes)>,
    ) -> Result<()> {
        let Some(metadata) = self.metadata.take() else {
            return Err(general_err!("Footer metadata is not present"));
        };

        // in this case we don't actually know what the file size is, so just use u64::MAX
        // this is ok since the offsets in the metadata are always valid
        let file_size = u64::MAX;
        let push_decoder = ParquetMetaDataPushDecoder::try_new_with_metadata(file_size, metadata)?
            .with_offset_index_policy(self.offset_index)
            .with_column_index_policy(self.column_index)
            .with_metadata_options(self.metadata_options.clone());
        let mut push_decoder = self.prepare_push_decoder(push_decoder);

        // Get bounds needed for page indexes (if any are present in the file).
        let range = match needs_index_data(&mut push_decoder)? {
            NeedsIndexData::No(metadata) => {
                // No page indexes in this file; nothing to fetch.
                self.metadata = Some(metadata);
                return Ok(());
            }
            NeedsIndexData::Yes(range) => range,
        };

        let bytes = match &remainder {
            // The remainder starts at or before the needed range: slice it out.
            Some((remainder_start, remainder)) if *remainder_start as u64 <= range.start => {
                let remainder_start = *remainder_start as u64;
                let offset = usize::try_from(range.start - remainder_start)?;
                let end = usize::try_from(range.end - remainder_start)?;
                if end > remainder.len() {
                    return Err(general_err!(
                        "Corrupted parquet file: index data range ({:?}) exceeds remainder length ({})",
                        range,
                        remainder.len()
                    ));
                }
                remainder.slice(offset..end)
            }
            // Note: this will potentially fetch data already in remainder, this keeps things simple
            _ => fetch.fetch(range.start..range.end).await?,
        };

        // Sanity check: the fetched buffer must match the requested range exactly.
        if bytes.len() as u64 != range.end - range.start {
            return Err(general_err!(
                "Corrupted parquet file: index data length mismatch, expected {}, got {}",
                range.end - range.start,
                bytes.len()
            ));
        }
        push_decoder.push_range(range.clone(), bytes)?;
        let metadata = parse_index_data(&mut push_decoder)?;
        self.metadata = Some(metadata);
        Ok(())
    }
559
    // One-shot parse of footer.
    // Side effect: this will set `self.metadata_size`. Note it is set *before* the
    // NeedMoreData return below, so callers retrying with a larger buffer can learn
    // the total tail size required.
    fn parse_metadata<R: ChunkReader>(&mut self, chunk_reader: &R) -> Result<ParquetMetaData> {
        // check file is large enough to hold footer
        let file_size = chunk_reader.len();
        if file_size < (FOOTER_SIZE as u64) {
            return Err(ParquetError::NeedMoreData(FOOTER_SIZE));
        }

        // Read the fixed-size 8-byte footer from the end of the file.
        let mut footer = [0_u8; FOOTER_SIZE];
        chunk_reader
            .get_read(file_size - FOOTER_SIZE as u64)?
            .read_exact(&mut footer)?;

        let footer = FooterTail::try_new(&footer)?;
        let metadata_len = footer.metadata_length();
        let footer_metadata_len = FOOTER_SIZE + metadata_len;
        self.metadata_size = Some(footer_metadata_len);

        // `chunk_reader` does not hold the entire metadata; ask for more bytes.
        if footer_metadata_len as u64 > file_size {
            return Err(ParquetError::NeedMoreData(footer_metadata_len));
        }

        // Read the thrift-encoded metadata, which immediately precedes the footer.
        let start = file_size - footer_metadata_len as u64;
        let bytes = chunk_reader.get_bytes(start, metadata_len)?;
        self.decode_footer_metadata(bytes, file_size, footer)
    }
587
    /// Size of the serialized thrift metadata plus the 8 byte footer. Only set if
    /// `self.parse_metadata` is called (the synchronous parse paths); `None` otherwise.
    pub fn metadata_size(&self) -> Option<usize> {
        self.metadata_size
    }
593
594    /// Return the number of bytes to read in the initial pass. If `prefetch_size` has
595    /// been provided, then return that value if it is larger than the size of the Parquet
596    /// file footer (8 bytes). Otherwise returns `8`.
597    #[cfg(all(feature = "async", feature = "arrow"))]
598    fn get_prefetch_size(&self) -> usize {
599        if let Some(prefetch) = self.prefetch_hint {
600            if prefetch > FOOTER_SIZE {
601                return prefetch;
602            }
603        }
604        FOOTER_SIZE
605    }
606
    // Asynchronously fetch and decode the footer metadata using absolute-offset
    // fetches. Returns the decoded metadata plus any unused prefix of the prefetch
    // buffer as `(file_offset, bytes)`, which callers may reuse for the page indexes.
    #[cfg(all(feature = "async", feature = "arrow"))]
    async fn load_metadata<F: MetadataFetch>(
        &self,
        fetch: &mut F,
        file_size: u64,
    ) -> Result<(ParquetMetaData, Option<(usize, Bytes)>)> {
        let prefetch = self.get_prefetch_size() as u64;

        if file_size < FOOTER_SIZE as u64 {
            return Err(eof_err!("file size of {} is less than footer", file_size));
        }

        // If a size hint is provided, read more than the minimum size
        // to try and avoid a second fetch.
        // Note: prefetch > file_size is ok since we're using saturating_sub.
        let footer_start = file_size.saturating_sub(prefetch);

        let suffix = fetch.fetch(footer_start..file_size).await?;
        let suffix_len = suffix.len();
        let fetch_len = (file_size - footer_start)
            .try_into()
            .expect("footer size should never be larger than u32");
        if suffix_len < fetch_len {
            return Err(eof_err!(
                "metadata requires {} bytes, but could only read {}",
                fetch_len,
                suffix_len
            ));
        }

        // The last 8 bytes of the suffix are the fixed-size footer.
        let mut footer = [0; FOOTER_SIZE];
        footer.copy_from_slice(&suffix[suffix_len - FOOTER_SIZE..suffix_len]);

        let footer = FooterTail::try_new(&footer)?;
        let length = footer.metadata_length();

        if file_size < (length + FOOTER_SIZE) as u64 {
            return Err(eof_err!(
                "file size of {} is less than footer + metadata {}",
                file_size,
                length + FOOTER_SIZE
            ));
        }

        // Did not fetch the entire file metadata in the initial read, need to make a second request
        if length > suffix_len - FOOTER_SIZE {
            let metadata_start = file_size - (length + FOOTER_SIZE) as u64;
            let meta = fetch
                .fetch(metadata_start..(file_size - FOOTER_SIZE as u64))
                .await?;
            // No remainder: the second fetch covered exactly the metadata bytes.
            Ok((self.decode_footer_metadata(meta, file_size, footer)?, None))
        } else {
            // The prefetch already contains the metadata; slice it out and return the
            // leading, unused portion of the suffix as the remainder.
            let metadata_start = (file_size - (length + FOOTER_SIZE) as u64 - footer_start)
                .try_into()
                .expect("metadata length should never be larger than u32");
            let slice = suffix.slice(metadata_start..suffix_len - FOOTER_SIZE);
            Ok((
                self.decode_footer_metadata(slice, file_size, footer)?,
                Some((footer_start as usize, suffix.slice(..metadata_start))),
            ))
        }
    }
669
    // Asynchronously fetch and decode the footer metadata using suffix fetches
    // (byte counts from the end of the file) — useful when the file size is unknown.
    // Returns the decoded metadata plus any unused prefix of the prefetch buffer.
    #[cfg(all(feature = "async", feature = "arrow"))]
    async fn load_metadata_via_suffix<F: MetadataSuffixFetch>(
        &self,
        fetch: &mut F,
    ) -> Result<(ParquetMetaData, Option<(usize, Bytes)>)> {
        let prefetch = self.get_prefetch_size();

        let suffix = fetch.fetch_suffix(prefetch as _).await?;
        let suffix_len = suffix.len();

        if suffix_len < FOOTER_SIZE {
            return Err(eof_err!(
                "footer metadata requires {} bytes, but could only read {}",
                FOOTER_SIZE,
                suffix_len
            ));
        }

        // The last 8 bytes of the suffix are the fixed-size footer.
        let mut footer = [0; FOOTER_SIZE];
        footer.copy_from_slice(&suffix[suffix_len - FOOTER_SIZE..suffix_len]);

        let footer = FooterTail::try_new(&footer)?;
        let length = footer.metadata_length();
        // fake file size as we are only parsing the footer metadata here
        // (cant be parsing page indexes without the full file size)
        let file_size = (length + FOOTER_SIZE) as u64;

        // Did not fetch the entire file metadata in the initial read, need to make a second request
        let metadata_offset = length + FOOTER_SIZE;
        if length > suffix_len - FOOTER_SIZE {
            let meta = fetch.fetch_suffix(metadata_offset).await?;

            if meta.len() < metadata_offset {
                return Err(eof_err!(
                    "metadata requires {} bytes, but could only read {}",
                    metadata_offset,
                    meta.len()
                ));
            }

            // need to slice off the footer or decryption fails
            let meta = meta.slice(0..length);
            Ok((self.decode_footer_metadata(meta, file_size, footer)?, None))
        } else {
            // The prefetch already contains the metadata; slice it out and return the
            // leading, unused portion of the suffix as the remainder (offset 0 since
            // suffix fetches are anchored to the end of the file).
            let metadata_start = suffix_len - metadata_offset;
            let slice = suffix.slice(metadata_start..suffix_len - FOOTER_SIZE);
            Ok((
                self.decode_footer_metadata(slice, file_size, footer)?,
                Some((0, suffix.slice(..metadata_start))),
            ))
        }
    }
722
    /// Decodes a [`FooterTail`] from the provided 8-byte slice.
    ///
    /// The slice must be the final [`FOOTER_SIZE`] bytes of a Parquet file.
    #[deprecated(since = "57.0.0", note = "Use FooterTail::try_from instead")]
    pub fn decode_footer_tail(slice: &[u8; FOOTER_SIZE]) -> Result<FooterTail> {
        FooterTail::try_new(slice)
    }
728
729    /// Decodes the Parquet footer, returning the metadata length in bytes
730    #[deprecated(since = "54.3.0", note = "Use decode_footer_tail instead")]
731    pub fn decode_footer(slice: &[u8; FOOTER_SIZE]) -> Result<usize> {
732        FooterTail::try_new(slice).map(|f| f.metadata_length())
733    }
734
    /// Decodes [`ParquetMetaData`] from the provided bytes.
    ///
    /// Typically, this is used to decode the metadata from the end of a parquet
    /// file. The format of `buf` is the Thrift compact binary protocol, as specified
    /// by the [Parquet Spec].
    ///
    /// It does **NOT** include the 8-byte footer.
    ///
    /// This method handles using either `decode_metadata` or
    /// `decode_metadata_with_encryption` depending on whether the encryption
    /// feature is enabled.
    ///
    /// [Parquet Spec]: https://github.com/apache/parquet-format#metadata
    pub(crate) fn decode_footer_metadata(
        &self,
        buf: Bytes,
        file_size: u64,
        footer_tail: FooterTail,
    ) -> Result<ParquetMetaData> {
        // The push decoder expects the metadata to be at the end of the file
        // (... data ...) + (metadata) + (footer)
        // so we need to provide the starting offset of the metadata
        // within the file.
        //
        // checked_sub guards against a caller passing a file_size smaller
        // than the footer itself.
        let ending_offset = file_size.checked_sub(FOOTER_SIZE as u64).ok_or_else(|| {
            general_err!(
                "file size {file_size} is smaller than footer size {}",
                FOOTER_SIZE
            )
        })?;

        // The metadata occupies the buf.len() bytes immediately before the footer.
        let starting_offset = ending_offset.checked_sub(buf.len() as u64).ok_or_else(|| {
            general_err!(
                "file size {file_size} is smaller than buffer size {} + footer size {}",
                buf.len(),
                FOOTER_SIZE
            )
        })?;

        let range = starting_offset..ending_offset;

        let push_decoder =
            ParquetMetaDataPushDecoder::try_new_with_footer_tail(file_size, footer_tail)?
                // NOTE: DO NOT enable page indexes here, they are handled separately
                .with_page_index_policy(PageIndexPolicy::Skip)
                .with_metadata_options(self.metadata_options.clone());

        // Attaches decryption properties when the `encryption` feature is enabled.
        let mut push_decoder = self.prepare_push_decoder(push_decoder);
        push_decoder.push_range(range, buf)?;
        // A single decode attempt must succeed: the pushed range is exactly
        // the metadata region; any other result is reported as an error.
        match push_decoder.try_decode()? {
            DecodeResult::Data(metadata) => Ok(metadata),
            DecodeResult::Finished => Err(general_err!(
                "could not parse parquet metadata -- previously finished"
            )),
            DecodeResult::NeedsData(ranges) => Err(general_err!(
                "could not parse parquet metadata, needs ranges {:?}",
                ranges
            )),
        }
    }
794
795    /// Prepares a push decoder and runs it to decode the metadata.
796    #[cfg(feature = "encryption")]
797    fn prepare_push_decoder(
798        &self,
799        push_decoder: ParquetMetaDataPushDecoder,
800    ) -> ParquetMetaDataPushDecoder {
801        push_decoder.with_file_decryption_properties(
802            self.file_decryption_properties
803                .as_ref()
804                .map(std::sync::Arc::clone),
805        )
806    }
    /// Prepares the push decoder: a no-op when the `encryption` feature is
    /// disabled, since there are no decryption properties to attach.
    #[cfg(not(feature = "encryption"))]
    fn prepare_push_decoder(
        &self,
        push_decoder: ParquetMetaDataPushDecoder,
    ) -> ParquetMetaDataPushDecoder {
        push_decoder
    }
814
815    /// Decodes [`ParquetMetaData`] from the provided bytes.
816    ///
817    /// Typically this is used to decode the metadata from the end of a parquet
818    /// file. The format of `buf` is the Thrift compact binary protocol, as specified
819    /// by the [Parquet Spec].
820    ///
821    /// [Parquet Spec]: https://github.com/apache/parquet-format#metadata
822    pub fn decode_metadata(buf: &[u8]) -> Result<ParquetMetaData> {
823        decode_metadata(buf, None)
824    }
825
    /// Decodes [`ParquetMetaData`] from the provided bytes.
    ///
    /// Like [`Self::decode_metadata`] but this also accepts
    /// metadata parsing options. Passing `None` uses default options.
    pub fn decode_metadata_with_options(
        buf: &[u8],
        options: Option<&ParquetMetaDataOptions>,
    ) -> Result<ParquetMetaData> {
        decode_metadata(buf, options)
    }
836
837    /// Decodes the schema from the Parquet footer in `buf`. Returned as
838    /// a [`SchemaDescriptor`].
839    pub fn decode_schema(buf: &[u8]) -> Result<Arc<SchemaDescriptor>> {
840        Ok(Arc::new(parquet_schema_from_bytes(buf)?))
841    }
842}
843
/// The bounds needed to read page indexes
// this is an internal enum, so it is ok to allow differences in enum size
#[allow(clippy::large_enum_variant)]
enum NeedsIndexData {
    /// no additional data is needed (e.g. the indexes weren't requested);
    /// carries the already-complete metadata
    No(ParquetMetaData),
    /// Additional data is needed; carries the single combined byte range
    /// that must be fetched to decode the page indexes
    Yes(Range<u64>),
}
853
854/// Determines a single combined range of bytes needed to read the page indexes,
855/// or returns the metadata if no additional data is needed (e.g. if no page indexes are requested)
856fn needs_index_data(push_decoder: &mut ParquetMetaDataPushDecoder) -> Result<NeedsIndexData> {
857    match push_decoder.try_decode()? {
858        DecodeResult::NeedsData(ranges) => {
859            let range = ranges
860                .into_iter()
861                .reduce(|a, b| a.start.min(b.start)..a.end.max(b.end))
862                .ok_or_else(|| general_err!("Internal error: no ranges provided"))?;
863            Ok(NeedsIndexData::Yes(range))
864        }
865        DecodeResult::Data(metadata) => Ok(NeedsIndexData::No(metadata)),
866        DecodeResult::Finished => Err(general_err!("Internal error: decoder was finished")),
867    }
868}
869
870/// Given a push decoder that has had the needed ranges pushed to it,
871/// attempt to decode indexes and return the updated metadata.
872fn parse_index_data(push_decoder: &mut ParquetMetaDataPushDecoder) -> Result<ParquetMetaData> {
873    match push_decoder.try_decode()? {
874        DecodeResult::NeedsData(_) => Err(general_err!(
875            "Internal error: decoder still needs data after reading required range"
876        )),
877        DecodeResult::Data(metadata) => Ok(metadata),
878        DecodeResult::Finished => Err(general_err!("Internal error: decoder was finished")),
879    }
880}
881
882#[cfg(test)]
883mod tests {
884    use super::*;
885    use crate::file::reader::Length;
886    use crate::util::test_common::file_util::get_test_file;
887    use std::ops::Range;
888
889    #[test]
890    fn test_parse_metadata_size_smaller_than_footer() {
891        let test_file = tempfile::tempfile().unwrap();
892        let err = ParquetMetaDataReader::new()
893            .parse_metadata(&test_file)
894            .unwrap_err();
895        assert!(matches!(err, ParquetError::NeedMoreData(FOOTER_SIZE)));
896    }
897
898    #[test]
899    fn test_parse_metadata_corrupt_footer() {
900        let data = Bytes::from(vec![1, 2, 3, 4, 5, 6, 7, 8]);
901        let reader_result = ParquetMetaDataReader::new().parse_metadata(&data);
902        assert_eq!(
903            reader_result.unwrap_err().to_string(),
904            "Parquet error: Invalid Parquet file. Corrupt footer"
905        );
906    }
907
908    #[test]
909    fn test_parse_metadata_invalid_start() {
910        let test_file = Bytes::from(vec![255, 0, 0, 0, b'P', b'A', b'R', b'1']);
911        let err = ParquetMetaDataReader::new()
912            .parse_metadata(&test_file)
913            .unwrap_err();
914        assert!(matches!(err, ParquetError::NeedMoreData(263)));
915    }
916
    // Exercises the synchronous try_parse / try_parse_sized entry points with
    // progressively smaller suffixes of the file. The byte offsets below
    // (320000, 323583/323584, 452505, 452510) are positions within
    // alltypes_tiny_pages.parquet chosen around the page index and metadata
    // boundaries -- TODO confirm against the test file if it is regenerated.
    #[test]
    #[allow(deprecated)]
    fn test_try_parse() {
        let file = get_test_file("alltypes_tiny_pages.parquet");
        let len = file.len();

        let mut reader = ParquetMetaDataReader::new().with_page_indexes(true);

        // Helper: read an absolute byte range out of the test file.
        let bytes_for_range = |range: Range<u64>| {
            file.get_bytes(range.start, (range.end - range.start).try_into().unwrap())
                .unwrap()
        };

        // read entire file
        let bytes = bytes_for_range(0..len);
        reader.try_parse(&bytes).unwrap();
        let metadata = reader.finish().unwrap();
        assert!(metadata.column_index.is_some());
        assert!(metadata.offset_index.is_some());

        // read more than enough of file
        let bytes = bytes_for_range(320000..len);
        reader.try_parse_sized(&bytes, len).unwrap();
        let metadata = reader.finish().unwrap();
        assert!(metadata.column_index.is_some());
        assert!(metadata.offset_index.is_some());

        // exactly enough
        let bytes = bytes_for_range(323583..len);
        reader.try_parse_sized(&bytes, len).unwrap();
        let metadata = reader.finish().unwrap();
        assert!(metadata.column_index.is_some());
        assert!(metadata.offset_index.is_some());

        // not enough for page index
        let bytes = bytes_for_range(323584..len);
        // should fail
        match reader.try_parse_sized(&bytes, len).unwrap_err() {
            // expected error, try again with provided bounds
            ParquetError::NeedMoreData(needed) => {
                let bytes = bytes_for_range(len - needed as u64..len);
                reader.try_parse_sized(&bytes, len).unwrap();
                let metadata = reader.finish().unwrap();
                assert!(metadata.column_index.is_some());
                assert!(metadata.offset_index.is_some());
            }
            _ => panic!("unexpected error"),
        };

        // not enough for file metadata, but keep trying until page indexes are read
        let mut reader = ParquetMetaDataReader::new().with_page_indexes(true);
        let mut bytes = bytes_for_range(452505..len);
        loop {
            match reader.try_parse_sized(&bytes, len) {
                Ok(_) => break,
                Err(ParquetError::NeedMoreData(needed)) => {
                    // Refetch with the size the reader asked for; once the
                    // footer metadata is in hand, only the indexes remain.
                    bytes = bytes_for_range(len - needed as u64..len);
                    if reader.has_metadata() {
                        reader.read_page_indexes_sized(&bytes, len).unwrap();
                        break;
                    }
                }
                _ => panic!("unexpected error"),
            }
        }
        let metadata = reader.finish().unwrap();
        assert!(metadata.column_index.is_some());
        assert!(metadata.offset_index.is_some());

        // not enough for page index but lie about file size
        let bytes = bytes_for_range(323584..len);
        let reader_result = reader.try_parse_sized(&bytes, len - 323584).unwrap_err();
        assert_eq!(
            reader_result.to_string(),
            "EOF: Parquet file too small. Range 323583..452504 is beyond file bounds 130649"
        );

        // not enough for file metadata
        let mut reader = ParquetMetaDataReader::new();
        let bytes = bytes_for_range(452505..len);
        // should fail
        match reader.try_parse_sized(&bytes, len).unwrap_err() {
            // expected error, try again with provided bounds
            ParquetError::NeedMoreData(needed) => {
                let bytes = bytes_for_range(len - needed as u64..len);
                reader.try_parse_sized(&bytes, len).unwrap();
                reader.finish().unwrap();
            }
            _ => panic!("unexpected error"),
        };

        // not enough for file metadata but use try_parse()
        let reader_result = reader.try_parse(&bytes).unwrap_err();
        assert_eq!(
            reader_result.to_string(),
            "EOF: Parquet file too small. Size is 1728 but need 1729"
        );

        // read head of file rather than tail
        let bytes = bytes_for_range(0..1000);
        let reader_result = reader.try_parse_sized(&bytes, len).unwrap_err();
        assert_eq!(
            reader_result.to_string(),
            "Parquet error: Invalid Parquet file. Corrupt footer"
        );

        // lie about file size
        let bytes = bytes_for_range(452510..len);
        let reader_result = reader.try_parse_sized(&bytes, len - 452505).unwrap_err();
        assert_eq!(
            reader_result.to_string(),
            "EOF: Parquet file too small. Size is 1728 but need 1729"
        );
    }
1031}
1032
1033#[cfg(all(feature = "async", feature = "arrow", test))]
1034mod async_tests {
1035    use super::*;
1036
1037    use arrow::{array::Int32Array, datatypes::DataType};
1038    use arrow_array::RecordBatch;
1039    use arrow_schema::{Field, Schema};
1040    use bytes::Bytes;
1041    use futures::FutureExt;
1042    use futures::future::BoxFuture;
1043    use std::fs::File;
1044    use std::future::Future;
1045    use std::io::{Read, Seek, SeekFrom};
1046    use std::ops::Range;
1047    use std::sync::Arc;
1048    use std::sync::atomic::{AtomicUsize, Ordering};
1049    use tempfile::NamedTempFile;
1050
1051    use crate::arrow::ArrowWriter;
1052    use crate::file::properties::WriterProperties;
1053    use crate::file::reader::Length;
1054    use crate::util::test_common::file_util::get_test_file;
1055
    /// Test adapter implementing [`MetadataFetch`] from a closure that serves
    /// absolute byte ranges.
    struct MetadataFetchFn<F>(F);

    // Forward `fetch` to the wrapped closure, boxing the returned future.
    impl<F, Fut> MetadataFetch for MetadataFetchFn<F>
    where
        F: FnMut(Range<u64>) -> Fut + Send,
        Fut: Future<Output = Result<Bytes>> + Send,
    {
        fn fetch(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
            async move { self.0(range).await }.boxed()
        }
    }
1067
    /// Test adapter implementing both [`MetadataFetch`] and
    /// [`MetadataSuffixFetch`] from two closures: `.0` serves absolute byte
    /// ranges, `.1` serves "last N bytes" suffix requests.
    struct MetadataSuffixFetchFn<F1, F2>(F1, F2);

    // Range-based fetch delegates to the first closure.
    impl<F1, Fut, F2> MetadataFetch for MetadataSuffixFetchFn<F1, F2>
    where
        F1: FnMut(Range<u64>) -> Fut + Send,
        Fut: Future<Output = Result<Bytes>> + Send,
        F2: Send,
    {
        fn fetch(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
            async move { self.0(range).await }.boxed()
        }
    }

    // Suffix-based fetch delegates to the second closure.
    impl<F1, Fut, F2> MetadataSuffixFetch for MetadataSuffixFetchFn<F1, F2>
    where
        F1: FnMut(Range<u64>) -> Fut + Send,
        F2: FnMut(usize) -> Fut + Send,
        Fut: Future<Output = Result<Bytes>> + Send,
    {
        fn fetch_suffix(&mut self, suffix: usize) -> BoxFuture<'_, Result<Bytes>> {
            async move { self.1(suffix).await }.boxed()
        }
    }
1091
1092    fn read_range(file: &mut File, range: Range<u64>) -> Result<Bytes> {
1093        file.seek(SeekFrom::Start(range.start as _))?;
1094        let len = range.end - range.start;
1095        let mut buf = Vec::with_capacity(len.try_into().unwrap());
1096        file.take(len as _).read_to_end(&mut buf)?;
1097        Ok(buf.into())
1098    }
1099
1100    fn read_suffix(file: &mut File, suffix: usize) -> Result<Bytes> {
1101        let file_len = file.len();
1102        // Don't seek before beginning of file
1103        file.seek(SeekFrom::End(0 - suffix.min(file_len as _) as i64))?;
1104        let mut buf = Vec::with_capacity(suffix);
1105        file.take(suffix as _).read_to_end(&mut buf)?;
1106        Ok(buf.into())
1107    }
1108
    // Exercises range-based async loading, asserting how many fetches are
    // issued for various prefetch hints. The magic hint values (428, 500)
    // relate to the metadata size of nulls.snappy.parquet -- TODO confirm if
    // the test file is regenerated.
    #[tokio::test]
    async fn test_simple() {
        let mut file = get_test_file("nulls.snappy.parquet");
        let len = file.len();

        // Baseline: schema read synchronously for comparison.
        let expected = ParquetMetaDataReader::new()
            .parse_and_finish(&file)
            .unwrap();
        let expected = expected.file_metadata().schema();
        let fetch_count = AtomicUsize::new(0);

        let mut fetch = |range| {
            fetch_count.fetch_add(1, Ordering::SeqCst);
            futures::future::ready(read_range(&mut file, range))
        };

        // No prefetch hint: expect two fetches.
        let input = MetadataFetchFn(&mut fetch);
        let actual = ParquetMetaDataReader::new()
            .load_and_finish(input, len)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 2);

        // Metadata hint too small - below footer size
        fetch_count.store(0, Ordering::SeqCst);
        let input = MetadataFetchFn(&mut fetch);
        let actual = ParquetMetaDataReader::new()
            .with_prefetch_hint(Some(7))
            .load_and_finish(input, len)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 2);

        // Metadata hint too small
        fetch_count.store(0, Ordering::SeqCst);
        let input = MetadataFetchFn(&mut fetch);
        let actual = ParquetMetaDataReader::new()
            .with_prefetch_hint(Some(10))
            .load_and_finish(input, len)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 2);

        // Metadata hint too large
        fetch_count.store(0, Ordering::SeqCst);
        let input = MetadataFetchFn(&mut fetch);
        let actual = ParquetMetaDataReader::new()
            .with_prefetch_hint(Some(500))
            .load_and_finish(input, len)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);

        // Metadata hint exactly correct
        fetch_count.store(0, Ordering::SeqCst);
        let input = MetadataFetchFn(&mut fetch);
        let actual = ParquetMetaDataReader::new()
            .with_prefetch_hint(Some(428))
            .load_and_finish(input, len)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);

        // A claimed file size smaller than the footer fails immediately.
        let input = MetadataFetchFn(&mut fetch);
        let err = ParquetMetaDataReader::new()
            .load_and_finish(input, 4)
            .await
            .unwrap_err()
            .to_string();
        assert_eq!(err, "EOF: file size of 4 is less than footer");

        // A claimed file size of 20 reads garbage where the footer should be.
        let input = MetadataFetchFn(&mut fetch);
        let err = ParquetMetaDataReader::new()
            .load_and_finish(input, 20)
            .await
            .unwrap_err()
            .to_string();
        assert_eq!(err, "Parquet error: Invalid Parquet file. Corrupt footer");
    }
1193
1194    #[tokio::test]
1195    async fn test_suffix() {
1196        let mut file = get_test_file("nulls.snappy.parquet");
1197        let mut file2 = file.try_clone().unwrap();
1198
1199        let expected = ParquetMetaDataReader::new()
1200            .parse_and_finish(&file)
1201            .unwrap();
1202        let expected = expected.file_metadata().schema();
1203        let fetch_count = AtomicUsize::new(0);
1204        let suffix_fetch_count = AtomicUsize::new(0);
1205
1206        let mut fetch = |range| {
1207            fetch_count.fetch_add(1, Ordering::SeqCst);
1208            futures::future::ready(read_range(&mut file, range))
1209        };
1210        let mut suffix_fetch = |suffix| {
1211            suffix_fetch_count.fetch_add(1, Ordering::SeqCst);
1212            futures::future::ready(read_suffix(&mut file2, suffix))
1213        };
1214
1215        let input = MetadataSuffixFetchFn(&mut fetch, &mut suffix_fetch);
1216        let actual = ParquetMetaDataReader::new()
1217            .load_via_suffix_and_finish(input)
1218            .await
1219            .unwrap();
1220        assert_eq!(actual.file_metadata().schema(), expected);
1221        assert_eq!(fetch_count.load(Ordering::SeqCst), 0);
1222        assert_eq!(suffix_fetch_count.load(Ordering::SeqCst), 2);
1223
1224        // Metadata hint too small - below footer size
1225        fetch_count.store(0, Ordering::SeqCst);
1226        suffix_fetch_count.store(0, Ordering::SeqCst);
1227        let input = MetadataSuffixFetchFn(&mut fetch, &mut suffix_fetch);
1228        let actual = ParquetMetaDataReader::new()
1229            .with_prefetch_hint(Some(7))
1230            .load_via_suffix_and_finish(input)
1231            .await
1232            .unwrap();
1233        assert_eq!(actual.file_metadata().schema(), expected);
1234        assert_eq!(fetch_count.load(Ordering::SeqCst), 0);
1235        assert_eq!(suffix_fetch_count.load(Ordering::SeqCst), 2);
1236
1237        // Metadata hint too small
1238        fetch_count.store(0, Ordering::SeqCst);
1239        suffix_fetch_count.store(0, Ordering::SeqCst);
1240        let input = MetadataSuffixFetchFn(&mut fetch, &mut suffix_fetch);
1241        let actual = ParquetMetaDataReader::new()
1242            .with_prefetch_hint(Some(10))
1243            .load_via_suffix_and_finish(input)
1244            .await
1245            .unwrap();
1246        assert_eq!(actual.file_metadata().schema(), expected);
1247        assert_eq!(fetch_count.load(Ordering::SeqCst), 0);
1248        assert_eq!(suffix_fetch_count.load(Ordering::SeqCst), 2);
1249
1250        dbg!("test");
1251        // Metadata hint too large
1252        fetch_count.store(0, Ordering::SeqCst);
1253        suffix_fetch_count.store(0, Ordering::SeqCst);
1254        let input = MetadataSuffixFetchFn(&mut fetch, &mut suffix_fetch);
1255        let actual = ParquetMetaDataReader::new()
1256            .with_prefetch_hint(Some(500))
1257            .load_via_suffix_and_finish(input)
1258            .await
1259            .unwrap();
1260        assert_eq!(actual.file_metadata().schema(), expected);
1261        assert_eq!(fetch_count.load(Ordering::SeqCst), 0);
1262        assert_eq!(suffix_fetch_count.load(Ordering::SeqCst), 1);
1263
1264        // Metadata hint exactly correct
1265        fetch_count.store(0, Ordering::SeqCst);
1266        suffix_fetch_count.store(0, Ordering::SeqCst);
1267        let input = MetadataSuffixFetchFn(&mut fetch, &mut suffix_fetch);
1268        let actual = ParquetMetaDataReader::new()
1269            .with_prefetch_hint(Some(428))
1270            .load_via_suffix_and_finish(input)
1271            .await
1272            .unwrap();
1273        assert_eq!(actual.file_metadata().schema(), expected);
1274        assert_eq!(fetch_count.load(Ordering::SeqCst), 0);
1275        assert_eq!(suffix_fetch_count.load(Ordering::SeqCst), 1);
1276    }
1277
1278    #[cfg(feature = "encryption")]
1279    #[tokio::test]
1280    async fn test_suffix_with_encryption() {
1281        let mut file = get_test_file("uniform_encryption.parquet.encrypted");
1282        let mut file2 = file.try_clone().unwrap();
1283
1284        let mut fetch = |range| futures::future::ready(read_range(&mut file, range));
1285        let mut suffix_fetch = |suffix| futures::future::ready(read_suffix(&mut file2, suffix));
1286
1287        let input = MetadataSuffixFetchFn(&mut fetch, &mut suffix_fetch);
1288
1289        let key_code: &[u8] = "0123456789012345".as_bytes();
1290        let decryption_properties = FileDecryptionProperties::builder(key_code.to_vec())
1291            .build()
1292            .unwrap();
1293
1294        // just make sure the metadata is properly decrypted and read
1295        let expected = ParquetMetaDataReader::new()
1296            .with_decryption_properties(Some(decryption_properties))
1297            .load_via_suffix_and_finish(input)
1298            .await
1299            .unwrap();
1300        assert_eq!(expected.num_row_groups(), 1);
1301    }
1302
    // Exercises async loading with page indexes enabled, asserting fetch
    // counts for various prefetch hints. The magic values 1729 / 130649 /
    // 130650 relate to the footer and page-index sizes of
    // alltypes_tiny_pages.parquet -- TODO confirm if the file is regenerated.
    #[tokio::test]
    #[allow(deprecated)]
    async fn test_page_index() {
        let mut file = get_test_file("alltypes_tiny_pages.parquet");
        let len = file.len();
        let fetch_count = AtomicUsize::new(0);
        let mut fetch = |range| {
            fetch_count.fetch_add(1, Ordering::SeqCst);
            futures::future::ready(read_range(&mut file, range))
        };

        // No hint: expect three fetches.
        let f = MetadataFetchFn(&mut fetch);
        let mut loader = ParquetMetaDataReader::new().with_page_indexes(true);
        loader.try_load(f, len).await.unwrap();
        assert_eq!(fetch_count.load(Ordering::SeqCst), 3);
        let metadata = loader.finish().unwrap();
        assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());

        // Prefetch just footer exactly
        fetch_count.store(0, Ordering::SeqCst);
        let f = MetadataFetchFn(&mut fetch);
        let mut loader = ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .with_prefetch_hint(Some(1729));
        loader.try_load(f, len).await.unwrap();
        assert_eq!(fetch_count.load(Ordering::SeqCst), 2);
        let metadata = loader.finish().unwrap();
        assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());

        // Prefetch more than footer but not enough
        fetch_count.store(0, Ordering::SeqCst);
        let f = MetadataFetchFn(&mut fetch);
        let mut loader = ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .with_prefetch_hint(Some(130649));
        loader.try_load(f, len).await.unwrap();
        assert_eq!(fetch_count.load(Ordering::SeqCst), 2);
        let metadata = loader.finish().unwrap();
        assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());

        // Prefetch exactly enough
        fetch_count.store(0, Ordering::SeqCst);
        let f = MetadataFetchFn(&mut fetch);
        let metadata = ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .with_prefetch_hint(Some(130650))
            .load_and_finish(f, len)
            .await
            .unwrap();
        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
        assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());

        // Prefetch more than enough but less than the entire file
        fetch_count.store(0, Ordering::SeqCst);
        let f = MetadataFetchFn(&mut fetch);
        let metadata = ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .with_prefetch_hint(Some((len - 1000) as usize)) // prefetch most of the file
            .load_and_finish(f, len)
            .await
            .unwrap();
        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
        assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());

        // Prefetch the entire file
        fetch_count.store(0, Ordering::SeqCst);
        let f = MetadataFetchFn(&mut fetch);
        let metadata = ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .with_prefetch_hint(Some(len as usize)) // prefetch entire file
            .load_and_finish(f, len)
            .await
            .unwrap();
        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
        assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());

        // Prefetch more than the entire file
        fetch_count.store(0, Ordering::SeqCst);
        let f = MetadataFetchFn(&mut fetch);
        let metadata = ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .with_prefetch_hint(Some((len + 1000) as usize)) // prefetch beyond EOF
            .load_and_finish(f, len)
            .await
            .unwrap();
        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
        assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());
    }
1391
1392    fn write_parquet_file(offset_index_disabled: bool) -> Result<NamedTempFile> {
1393        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
1394        let batch = RecordBatch::try_new(
1395            schema.clone(),
1396            vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
1397        )?;
1398
1399        let file = NamedTempFile::new().unwrap();
1400
1401        // Write properties with page index disabled
1402        let props = WriterProperties::builder()
1403            .set_offset_index_disabled(offset_index_disabled)
1404            .build();
1405
1406        let mut writer = ArrowWriter::try_new(file.reopen()?, schema, Some(props))?;
1407        writer.write(&batch)?;
1408        writer.close()?;
1409
1410        Ok(file)
1411    }
1412
1413    fn read_and_check(file: &File, policy: PageIndexPolicy) -> Result<ParquetMetaData> {
1414        let mut reader = ParquetMetaDataReader::new().with_page_index_policy(policy);
1415        reader.try_parse(file)?;
1416        reader.finish()
1417    }
1418
1419    #[test]
1420    fn test_page_index_policy() {
1421        // With page index
1422        let f = write_parquet_file(false).unwrap();
1423        read_and_check(f.as_file(), PageIndexPolicy::Required).unwrap();
1424        read_and_check(f.as_file(), PageIndexPolicy::Optional).unwrap();
1425        read_and_check(f.as_file(), PageIndexPolicy::Skip).unwrap();
1426
1427        // Without page index
1428        let f = write_parquet_file(true).unwrap();
1429        let res = read_and_check(f.as_file(), PageIndexPolicy::Required);
1430        assert!(matches!(
1431            res,
1432            Err(ParquetError::General(e)) if e == "missing offset index"
1433        ));
1434        read_and_check(f.as_file(), PageIndexPolicy::Optional).unwrap();
1435        read_and_check(f.as_file(), PageIndexPolicy::Skip).unwrap();
1436    }
1437}