// parquet/file/metadata/reader.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::{io::Read, ops::Range, sync::Arc};
19
20use bytes::Bytes;
21
22use crate::basic::ColumnOrder;
23use crate::errors::{ParquetError, Result};
24use crate::file::metadata::{FileMetaData, ParquetMetaData, RowGroupMetaData};
25use crate::file::page_index::index::Index;
26use crate::file::page_index::index_reader::{acc_range, decode_column_index, decode_offset_index};
27use crate::file::reader::ChunkReader;
28use crate::file::{FOOTER_SIZE, PARQUET_MAGIC};
29use crate::format::{ColumnOrder as TColumnOrder, FileMetaData as TFileMetaData};
30use crate::schema::types;
31use crate::schema::types::SchemaDescriptor;
32use crate::thrift::{TCompactSliceInputProtocol, TSerializable};
33
34#[cfg(all(feature = "async", feature = "arrow"))]
35use crate::arrow::async_reader::MetadataFetch;
36
/// Reads the [`ParquetMetaData`] from a byte stream.
///
/// See [`crate::file::metadata::ParquetMetaDataWriter#output-format`] for a description of
/// the Parquet metadata.
///
/// Parquet metadata is not necessarily contiguous in the files: part is stored
/// in the footer (the last bytes of the file), but other portions (such as the
/// PageIndex) can be stored elsewhere.
///
/// This reader handles reading the footer as well as the non contiguous parts
/// of the metadata such as the page indexes; excluding Bloom Filters.
///
/// # Example
/// ```no_run
/// # use parquet::file::metadata::ParquetMetaDataReader;
/// # fn open_parquet_file(path: &str) -> std::fs::File { unimplemented!(); }
/// // read parquet metadata including page indexes from a file
/// let file = open_parquet_file("some_path.parquet");
/// let mut reader = ParquetMetaDataReader::new()
///     .with_page_indexes(true);
/// reader.try_parse(&file).unwrap();
/// let metadata = reader.finish().unwrap();
/// assert!(metadata.column_index().is_some());
/// assert!(metadata.offset_index().is_some());
/// ```
#[derive(Default)]
pub struct ParquetMetaDataReader {
    // Parsed footer metadata; populated by the `try_parse*`/`try_load*` family
    // of methods and taken (leaving `None`) by `finish()`.
    metadata: Option<ParquetMetaData>,
    // Whether to decode the ColumnIndex page-index structure.
    column_index: bool,
    // Whether to decode the OffsetIndex page-index structure.
    offset_index: bool,
    // Optional caller-supplied estimate of bytes to prefetch in the first
    // async fetch (see `with_prefetch_hint`).
    prefetch_hint: Option<usize>,
    // Size of the serialized thrift metadata plus the 8 byte footer. Only set if
    // `self.parse_metadata` is called.
    metadata_size: Option<usize>,
}
72
73impl ParquetMetaDataReader {
74    /// Create a new [`ParquetMetaDataReader`]
75    pub fn new() -> Self {
76        Default::default()
77    }
78
79    /// Create a new [`ParquetMetaDataReader`] populated with a [`ParquetMetaData`] struct
80    /// obtained via other means.
81    pub fn new_with_metadata(metadata: ParquetMetaData) -> Self {
82        Self {
83            metadata: Some(metadata),
84            ..Default::default()
85        }
86    }
87
88    /// Enable or disable reading the page index structures described in
89    /// "[Parquet page index]: Layout to Support Page Skipping". Equivalent to:
90    /// `self.with_column_indexes(val).with_offset_indexes(val)`
91    ///
92    /// [Parquet page index]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
93    pub fn with_page_indexes(self, val: bool) -> Self {
94        self.with_column_indexes(val).with_offset_indexes(val)
95    }
96
97    /// Enable or disable reading the Parquet [ColumnIndex] structure.
98    ///
99    /// [ColumnIndex]:  https://github.com/apache/parquet-format/blob/master/PageIndex.md
100    pub fn with_column_indexes(mut self, val: bool) -> Self {
101        self.column_index = val;
102        self
103    }
104
105    /// Enable or disable reading the Parquet [OffsetIndex] structure.
106    ///
107    /// [OffsetIndex]:  https://github.com/apache/parquet-format/blob/master/PageIndex.md
108    pub fn with_offset_indexes(mut self, val: bool) -> Self {
109        self.offset_index = val;
110        self
111    }
112
113    /// Provide a hint as to the number of bytes needed to fully parse the [`ParquetMetaData`].
114    /// Only used for the asynchronous [`Self::try_load()`] method.
115    ///
116    /// By default, the reader will first fetch the last 8 bytes of the input file to obtain the
117    /// size of the footer metadata. A second fetch will be performed to obtain the needed bytes.
118    /// After parsing the footer metadata, a third fetch will be performed to obtain the bytes
119    /// needed to decode the page index structures, if they have been requested. To avoid
120    /// unnecessary fetches, `prefetch` can be set to an estimate of the number of bytes needed
121    /// to fully decode the [`ParquetMetaData`], which can reduce the number of fetch requests and
122    /// reduce latency. Setting `prefetch` too small will not trigger an error, but will result
123    /// in extra fetches being performed.
124    pub fn with_prefetch_hint(mut self, prefetch: Option<usize>) -> Self {
125        self.prefetch_hint = prefetch;
126        self
127    }
128
129    /// Indicates whether this reader has a [`ParquetMetaData`] internally.
130    pub fn has_metadata(&self) -> bool {
131        self.metadata.is_some()
132    }
133
134    /// Return the parsed [`ParquetMetaData`] struct, leaving `None` in its place.
135    pub fn finish(&mut self) -> Result<ParquetMetaData> {
136        self.metadata
137            .take()
138            .ok_or_else(|| general_err!("could not parse parquet metadata"))
139    }
140
141    /// Given a [`ChunkReader`], parse and return the [`ParquetMetaData`] in a single pass.
142    ///
143    /// If `reader` is [`Bytes`] based, then the buffer must contain sufficient bytes to complete
144    /// the request, and must include the Parquet footer. If page indexes are desired, the buffer
145    /// must contain the entire file, or [`Self::try_parse_sized()`] should be used.
146    ///
147    /// This call will consume `self`.
148    ///
149    /// # Example
150    /// ```no_run
151    /// # use parquet::file::metadata::ParquetMetaDataReader;
152    /// # fn open_parquet_file(path: &str) -> std::fs::File { unimplemented!(); }
153    /// // read parquet metadata including page indexes
154    /// let file = open_parquet_file("some_path.parquet");
155    /// let metadata = ParquetMetaDataReader::new()
156    ///     .with_page_indexes(true)
157    ///     .parse_and_finish(&file).unwrap();
158    /// ```
159    pub fn parse_and_finish<R: ChunkReader>(mut self, reader: &R) -> Result<ParquetMetaData> {
160        self.try_parse(reader)?;
161        self.finish()
162    }
163
164    /// Attempts to parse the footer metadata (and optionally page indexes) given a [`ChunkReader`].
165    ///
166    /// If `reader` is [`Bytes`] based, then the buffer must contain sufficient bytes to complete
167    /// the request, and must include the Parquet footer. If page indexes are desired, the buffer
168    /// must contain the entire file, or [`Self::try_parse_sized()`] should be used.
169    pub fn try_parse<R: ChunkReader>(&mut self, reader: &R) -> Result<()> {
170        self.try_parse_sized(reader, reader.len() as usize)
171    }
172
173    /// Same as [`Self::try_parse()`], but provide the original file size in the case that `reader`
174    /// is a [`Bytes`] struct that does not contain the entire file. This information is necessary
175    /// when the page indexes are desired. `reader` must have access to the Parquet footer.
176    ///
177    /// Using this function also allows for retrying with a larger buffer.
178    ///
179    /// # Errors
180    ///
181    /// This function will return [`ParquetError::NeedMoreData`] in the event `reader` does not
182    /// provide enough data to fully parse the metadata (see example below). The returned error
183    /// will be populated with a `usize` field indicating the number of bytes required from the
184    /// tail of the file to completely parse the requested metadata.
185    ///
186    /// Other errors returned include [`ParquetError::General`] and [`ParquetError::EOF`].
187    ///
188    /// # Example
189    /// ```no_run
190    /// # use parquet::file::metadata::ParquetMetaDataReader;
191    /// # use parquet::errors::ParquetError;
192    /// # use crate::parquet::file::reader::Length;
193    /// # fn get_bytes(file: &std::fs::File, range: std::ops::Range<usize>) -> bytes::Bytes { unimplemented!(); }
194    /// # fn open_parquet_file(path: &str) -> std::fs::File { unimplemented!(); }
195    /// let file = open_parquet_file("some_path.parquet");
196    /// let len = file.len() as usize;
197    /// // Speculatively read 1 kilobyte from the end of the file
198    /// let bytes = get_bytes(&file, len - 1024..len);
199    /// let mut reader = ParquetMetaDataReader::new().with_page_indexes(true);
200    /// match reader.try_parse_sized(&bytes, len) {
201    ///     Ok(_) => (),
202    ///     Err(ParquetError::NeedMoreData(needed)) => {
203    ///         // Read the needed number of bytes from the end of the file
204    ///         let bytes = get_bytes(&file, len - needed..len);
205    ///         reader.try_parse_sized(&bytes, len).unwrap();
206    ///     }
207    ///     _ => panic!("unexpected error")
208    /// }
209    /// let metadata = reader.finish().unwrap();
210    /// ```
211    ///
212    /// Note that it is possible for the file metadata to be completely read, but there are
213    /// insufficient bytes available to read the page indexes. [`Self::has_metadata()`] can be used
214    /// to test for this. In the event the file metadata is present, re-parsing of the file
215    /// metadata can be skipped by using [`Self::read_page_indexes_sized()`], as shown below.
216    /// ```no_run
217    /// # use parquet::file::metadata::ParquetMetaDataReader;
218    /// # use parquet::errors::ParquetError;
219    /// # use crate::parquet::file::reader::Length;
220    /// # fn get_bytes(file: &std::fs::File, range: std::ops::Range<usize>) -> bytes::Bytes { unimplemented!(); }
221    /// # fn open_parquet_file(path: &str) -> std::fs::File { unimplemented!(); }
222    /// let file = open_parquet_file("some_path.parquet");
223    /// let len = file.len() as usize;
224    /// // Speculatively read 1 kilobyte from the end of the file
225    /// let mut bytes = get_bytes(&file, len - 1024..len);
226    /// let mut reader = ParquetMetaDataReader::new().with_page_indexes(true);
227    /// // Loop until `bytes` is large enough
228    /// loop {
229    ///     match reader.try_parse_sized(&bytes, len) {
230    ///         Ok(_) => break,
231    ///         Err(ParquetError::NeedMoreData(needed)) => {
232    ///             // Read the needed number of bytes from the end of the file
233    ///             bytes = get_bytes(&file, len - needed..len);
234    ///             // If file metadata was read only read page indexes, otherwise continue loop
235    ///             if reader.has_metadata() {
236    ///                 reader.read_page_indexes_sized(&bytes, len);
237    ///                 break;
238    ///             }
239    ///         }
240    ///         _ => panic!("unexpected error")
241    ///     }
242    /// }
243    /// let metadata = reader.finish().unwrap();
244    /// ```
245    pub fn try_parse_sized<R: ChunkReader>(&mut self, reader: &R, file_size: usize) -> Result<()> {
246        self.metadata = match self.parse_metadata(reader) {
247            Ok(metadata) => Some(metadata),
248            Err(ParquetError::NeedMoreData(needed)) => {
249                // If reader is the same length as `file_size` then presumably there is no more to
250                // read, so return an EOF error.
251                if file_size == reader.len() as usize || needed > file_size {
252                    return Err(eof_err!(
253                        "Parquet file too small. Size is {} but need {}",
254                        file_size,
255                        needed
256                    ));
257                } else {
258                    // Ask for a larger buffer
259                    return Err(ParquetError::NeedMoreData(needed));
260                }
261            }
262            Err(e) => return Err(e),
263        };
264
265        // we can return if page indexes aren't requested
266        if !self.column_index && !self.offset_index {
267            return Ok(());
268        }
269
270        self.read_page_indexes_sized(reader, file_size)
271    }
272
273    /// Read the page index structures when a [`ParquetMetaData`] has already been obtained.
274    /// See [`Self::new_with_metadata()`] and [`Self::has_metadata()`].
275    pub fn read_page_indexes<R: ChunkReader>(&mut self, reader: &R) -> Result<()> {
276        self.read_page_indexes_sized(reader, reader.len() as usize)
277    }
278
    /// Read the page index structures when a [`ParquetMetaData`] has already been obtained.
    /// This variant is used when `reader` cannot access the entire Parquet file (e.g. it is
    /// a [`Bytes`] struct containing the tail of the file).
    /// See [`Self::new_with_metadata()`] and [`Self::has_metadata()`]. Like
    /// [`Self::try_parse_sized()`] this function may return [`ParquetError::NeedMoreData`].
    pub fn read_page_indexes_sized<R: ChunkReader>(
        &mut self,
        reader: &R,
        file_size: usize,
    ) -> Result<()> {
        // Page indexes are attached to an existing ParquetMetaData, so one must
        // be present before this is called.
        if self.metadata.is_none() {
            return Err(general_err!(
                "Tried to read page indexes without ParquetMetaData metadata"
            ));
        }

        // FIXME: there are differing implementations in the case where page indexes are missing
        // from the file. `MetadataLoader` will leave them as `None`, while the parser in
        // `index_reader::read_columns_indexes` returns a vector of empty vectors.
        // It is best for this function to replicate the latter behavior for now, but in a future
        // breaking release, the two paths to retrieve metadata should be made consistent. Note that this is only
        // an issue if the user requested page indexes, so there is no need to provide empty
        // vectors in `try_parse_sized()`.
        // https://github.com/apache/arrow-rs/issues/6447

        // Get bounds needed for page indexes (if any are present in the file).
        let Some(range) = self.range_for_page_index() else {
            return Ok(());
        };

        // `file_range` is the span of absolute file offsets covered by `reader`,
        // assuming `reader` holds the tail of the file.
        // Check to see if needed range is within `file_range`. Checking `range.end` seems
        // redundant, but it guards against `range_for_page_index()` returning garbage.
        let file_range = file_size.saturating_sub(reader.len() as usize)..file_size;
        if !(file_range.contains(&range.start) && file_range.contains(&range.end)) {
            // Requested range starts beyond EOF
            if range.end > file_size {
                return Err(eof_err!(
                    "Parquet file too small. Range {:?} is beyond file bounds {file_size}",
                    range
                ));
            } else {
                // Ask for a larger buffer
                return Err(ParquetError::NeedMoreData(file_size - range.start));
            }
        }

        // Perform extra sanity check to make sure `range` and the footer metadata don't
        // overlap.
        if let Some(metadata_size) = self.metadata_size {
            let metadata_range = file_size.saturating_sub(metadata_size)..file_size;
            if range.end > metadata_range.start {
                return Err(eof_err!(
                    "Parquet file too small. Page index range {:?} overlaps with file metadata {:?}",
                    range,
                    metadata_range
                ));
            }
        }

        // Translate the absolute file offset of `range` into an offset relative
        // to the start of `reader`'s buffer before reading the bytes.
        let bytes_needed = range.end - range.start;
        let bytes = reader.get_bytes((range.start - file_range.start) as u64, bytes_needed)?;
        let offset = range.start;

        self.parse_column_index(&bytes, offset)?;
        self.parse_offset_index(&bytes, offset)?;

        Ok(())
    }
347
348    /// Given a [`MetadataFetch`], parse and return the [`ParquetMetaData`] in a single pass.
349    ///
350    /// This call will consume `self`.
351    ///
352    /// See [`Self::with_prefetch_hint`] for a discussion of how to reduce the number of fetches
353    /// performed by this function.
354    #[cfg(all(feature = "async", feature = "arrow"))]
355    pub async fn load_and_finish<F: MetadataFetch>(
356        mut self,
357        fetch: F,
358        file_size: usize,
359    ) -> Result<ParquetMetaData> {
360        self.try_load(fetch, file_size).await?;
361        self.finish()
362    }
363
364    /// Attempts to (asynchronously) parse the footer metadata (and optionally page indexes)
365    /// given a [`MetadataFetch`].
366    ///
367    /// See [`Self::with_prefetch_hint`] for a discussion of how to reduce the number of fetches
368    /// performed by this function.
369    #[cfg(all(feature = "async", feature = "arrow"))]
370    pub async fn try_load<F: MetadataFetch>(
371        &mut self,
372        mut fetch: F,
373        file_size: usize,
374    ) -> Result<()> {
375        let (metadata, remainder) =
376            Self::load_metadata(&mut fetch, file_size, self.get_prefetch_size()).await?;
377
378        self.metadata = Some(metadata);
379
380        // we can return if page indexes aren't requested
381        if !self.column_index && !self.offset_index {
382            return Ok(());
383        }
384
385        self.load_page_index_with_remainder(fetch, remainder).await
386    }
387
388    /// Asynchronously fetch the page index structures when a [`ParquetMetaData`] has already
389    /// been obtained. See [`Self::new_with_metadata()`].
390    #[cfg(all(feature = "async", feature = "arrow"))]
391    pub async fn load_page_index<F: MetadataFetch>(&mut self, fetch: F) -> Result<()> {
392        self.load_page_index_with_remainder(fetch, None).await
393    }
394
395    #[cfg(all(feature = "async", feature = "arrow"))]
396    async fn load_page_index_with_remainder<F: MetadataFetch>(
397        &mut self,
398        mut fetch: F,
399        remainder: Option<(usize, Bytes)>,
400    ) -> Result<()> {
401        if self.metadata.is_none() {
402            return Err(general_err!("Footer metadata is not present"));
403        }
404
405        // Get bounds needed for page indexes (if any are present in the file).
406        let range = self.range_for_page_index();
407        let range = match range {
408            Some(range) => range,
409            None => return Ok(()),
410        };
411
412        let bytes = match &remainder {
413            Some((remainder_start, remainder)) if *remainder_start <= range.start => {
414                let offset = range.start - *remainder_start;
415                let end = offset + range.end - range.start;
416                assert!(end <= remainder.len());
417                remainder.slice(offset..end)
418            }
419            // Note: this will potentially fetch data already in remainder, this keeps things simple
420            _ => fetch.fetch(range.start..range.end).await?,
421        };
422
423        // Sanity check
424        assert_eq!(bytes.len(), range.end - range.start);
425        let offset = range.start;
426
427        self.parse_column_index(&bytes, offset)?;
428        self.parse_offset_index(&bytes, offset)?;
429
430        Ok(())
431    }
432
433    fn parse_column_index(&mut self, bytes: &Bytes, start_offset: usize) -> Result<()> {
434        let metadata = self.metadata.as_mut().unwrap();
435        if self.column_index {
436            let index = metadata
437                .row_groups()
438                .iter()
439                .map(|x| {
440                    x.columns()
441                        .iter()
442                        .map(|c| match c.column_index_range() {
443                            Some(r) => decode_column_index(
444                                &bytes[r.start - start_offset..r.end - start_offset],
445                                c.column_type(),
446                            ),
447                            None => Ok(Index::NONE),
448                        })
449                        .collect::<Result<Vec<_>>>()
450                })
451                .collect::<Result<Vec<_>>>()?;
452            metadata.set_column_index(Some(index));
453        }
454        Ok(())
455    }
456
457    fn parse_offset_index(&mut self, bytes: &Bytes, start_offset: usize) -> Result<()> {
458        let metadata = self.metadata.as_mut().unwrap();
459        if self.offset_index {
460            let index = metadata
461                .row_groups()
462                .iter()
463                .map(|x| {
464                    x.columns()
465                        .iter()
466                        .map(|c| match c.offset_index_range() {
467                            Some(r) => decode_offset_index(
468                                &bytes[r.start - start_offset..r.end - start_offset],
469                            ),
470                            None => Err(general_err!("missing offset index")),
471                        })
472                        .collect::<Result<Vec<_>>>()
473                })
474                .collect::<Result<Vec<_>>>()?;
475
476            metadata.set_offset_index(Some(index));
477        }
478        Ok(())
479    }
480
481    fn range_for_page_index(&self) -> Option<Range<usize>> {
482        // sanity check
483        self.metadata.as_ref()?;
484
485        // Get bounds needed for page indexes (if any are present in the file).
486        let mut range = None;
487        let metadata = self.metadata.as_ref().unwrap();
488        for c in metadata.row_groups().iter().flat_map(|r| r.columns()) {
489            if self.column_index {
490                range = acc_range(range, c.column_index_range());
491            }
492            if self.offset_index {
493                range = acc_range(range, c.offset_index_range());
494            }
495        }
496        range
497    }
498
499    // One-shot parse of footer.
500    // Side effect: this will set `self.metadata_size`
501    fn parse_metadata<R: ChunkReader>(&mut self, chunk_reader: &R) -> Result<ParquetMetaData> {
502        // check file is large enough to hold footer
503        let file_size = chunk_reader.len();
504        if file_size < (FOOTER_SIZE as u64) {
505            return Err(ParquetError::NeedMoreData(FOOTER_SIZE));
506        }
507
508        let mut footer = [0_u8; 8];
509        chunk_reader
510            .get_read(file_size - 8)?
511            .read_exact(&mut footer)?;
512
513        let metadata_len = Self::decode_footer(&footer)?;
514        let footer_metadata_len = FOOTER_SIZE + metadata_len;
515        self.metadata_size = Some(footer_metadata_len);
516
517        if footer_metadata_len > file_size as usize {
518            return Err(ParquetError::NeedMoreData(footer_metadata_len));
519        }
520
521        let start = file_size - footer_metadata_len as u64;
522        Self::decode_metadata(chunk_reader.get_bytes(start, metadata_len)?.as_ref())
523    }
524
525    /// Return the number of bytes to read in the initial pass. If `prefetch_size` has
526    /// been provided, then return that value if it is larger than the size of the Parquet
527    /// file footer (8 bytes). Otherwise returns `8`.
528    #[cfg(all(feature = "async", feature = "arrow"))]
529    fn get_prefetch_size(&self) -> usize {
530        if let Some(prefetch) = self.prefetch_hint {
531            if prefetch > FOOTER_SIZE {
532                return prefetch;
533            }
534        }
535        FOOTER_SIZE
536    }
537
    // Fetch and decode the footer metadata. On success returns the decoded
    // [`ParquetMetaData`] plus any prefetched bytes that precede the metadata
    // in the file, tagged with their absolute start offset
    // (`(footer_start, bytes)`), which may later cover the page indexes.
    #[cfg(all(feature = "async", feature = "arrow"))]
    async fn load_metadata<F: MetadataFetch>(
        fetch: &mut F,
        file_size: usize,
        prefetch: usize,
    ) -> Result<(ParquetMetaData, Option<(usize, Bytes)>)> {
        if file_size < FOOTER_SIZE {
            return Err(eof_err!("file size of {} is less than footer", file_size));
        }

        // If a size hint is provided, read more than the minimum size
        // to try and avoid a second fetch.
        // Note: prefetch > file_size is ok since we're using saturating_sub.
        let footer_start = file_size.saturating_sub(prefetch);

        let suffix = fetch.fetch(footer_start..file_size).await?;
        let suffix_len = suffix.len();
        let fetch_len = file_size - footer_start;
        // The fetcher returned fewer bytes than requested
        if suffix_len < fetch_len {
            return Err(eof_err!(
                "metadata requires {} bytes, but could only read {}",
                fetch_len,
                suffix_len
            ));
        }

        // The final 8 bytes of the suffix are the footer (metadata length + magic)
        let mut footer = [0; FOOTER_SIZE];
        footer.copy_from_slice(&suffix[suffix_len - FOOTER_SIZE..suffix_len]);

        let length = Self::decode_footer(&footer)?;

        if file_size < length + FOOTER_SIZE {
            return Err(eof_err!(
                "file size of {} is less than footer + metadata {}",
                file_size,
                length + FOOTER_SIZE
            ));
        }

        // Did not fetch the entire file metadata in the initial read, need to make a second request
        if length > suffix_len - FOOTER_SIZE {
            let metadata_start = file_size - length - FOOTER_SIZE;
            let meta = fetch.fetch(metadata_start..file_size - FOOTER_SIZE).await?;
            Ok((Self::decode_metadata(&meta)?, None))
        } else {
            // The metadata is fully contained in the prefetched suffix; decode it in
            // place and return the bytes preceding it for possible page-index reuse.
            // `metadata_start` here is relative to the start of `suffix`.
            let metadata_start = file_size - length - FOOTER_SIZE - footer_start;
            let slice = &suffix[metadata_start..suffix_len - FOOTER_SIZE];
            Ok((
                Self::decode_metadata(slice)?,
                Some((footer_start, suffix.slice(..metadata_start))),
            ))
        }
    }
591
592    /// Decodes the Parquet footer returning the metadata length in bytes
593    ///
594    /// A parquet footer is 8 bytes long and has the following layout:
595    /// * 4 bytes for the metadata length
596    /// * 4 bytes for the magic bytes 'PAR1'
597    ///
598    /// ```text
599    /// +-----+--------+
600    /// | len | 'PAR1' |
601    /// +-----+--------+
602    /// ```
603    pub fn decode_footer(slice: &[u8; FOOTER_SIZE]) -> Result<usize> {
604        // check this is indeed a parquet file
605        if slice[4..] != PARQUET_MAGIC {
606            return Err(general_err!("Invalid Parquet file. Corrupt footer"));
607        }
608
609        // get the metadata length from the footer
610        let metadata_len = u32::from_le_bytes(slice[..4].try_into().unwrap());
611        // u32 won't be larger than usize in most cases
612        Ok(metadata_len as usize)
613    }
614
615    /// Decodes [`ParquetMetaData`] from the provided bytes.
616    ///
617    /// Typically this is used to decode the metadata from the end of a parquet
618    /// file. The format of `buf` is the Thift compact binary protocol, as specified
619    /// by the [Parquet Spec].
620    ///
621    /// [Parquet Spec]: https://github.com/apache/parquet-format#metadata
622    pub fn decode_metadata(buf: &[u8]) -> Result<ParquetMetaData> {
623        let mut prot = TCompactSliceInputProtocol::new(buf);
624        let t_file_metadata: TFileMetaData = TFileMetaData::read_from_in_protocol(&mut prot)
625            .map_err(|e| general_err!("Could not parse metadata: {}", e))?;
626        let schema = types::from_thrift(&t_file_metadata.schema)?;
627        let schema_descr = Arc::new(SchemaDescriptor::new(schema));
628        let mut row_groups = Vec::new();
629        for rg in t_file_metadata.row_groups {
630            row_groups.push(RowGroupMetaData::from_thrift(schema_descr.clone(), rg)?);
631        }
632        let column_orders =
633            Self::parse_column_orders(t_file_metadata.column_orders, &schema_descr)?;
634
635        let file_metadata = FileMetaData::new(
636            t_file_metadata.version,
637            t_file_metadata.num_rows,
638            t_file_metadata.created_by,
639            t_file_metadata.key_value_metadata,
640            schema_descr,
641            column_orders,
642        );
643        Ok(ParquetMetaData::new(file_metadata, row_groups))
644    }
645
646    /// Parses column orders from Thrift definition.
647    /// If no column orders are defined, returns `None`.
648    fn parse_column_orders(
649        t_column_orders: Option<Vec<TColumnOrder>>,
650        schema_descr: &SchemaDescriptor,
651    ) -> Result<Option<Vec<ColumnOrder>>> {
652        match t_column_orders {
653            Some(orders) => {
654                // Should always be the case
655                if orders.len() != schema_descr.num_columns() {
656                    return Err(general_err!("Column order length mismatch"));
657                };
658                let mut res = Vec::new();
659                for (i, column) in schema_descr.columns().iter().enumerate() {
660                    match orders[i] {
661                        TColumnOrder::TYPEORDER(_) => {
662                            let sort_order = ColumnOrder::get_sort_order(
663                                column.logical_type(),
664                                column.converted_type(),
665                                column.physical_type(),
666                            );
667                            res.push(ColumnOrder::TYPE_DEFINED_ORDER(sort_order));
668                        }
669                    }
670                }
671                Ok(Some(res))
672            }
673            None => Ok(None),
674        }
675    }
676}
677
678#[cfg(test)]
679mod tests {
680    use super::*;
681    use bytes::Bytes;
682
683    use crate::basic::SortOrder;
684    use crate::basic::Type;
685    use crate::file::reader::Length;
686    use crate::format::TypeDefinedOrder;
687    use crate::schema::types::Type as SchemaType;
688    use crate::util::test_common::file_util::get_test_file;
689
690    #[test]
691    fn test_parse_metadata_size_smaller_than_footer() {
692        let test_file = tempfile::tempfile().unwrap();
693        let err = ParquetMetaDataReader::new()
694            .parse_metadata(&test_file)
695            .unwrap_err();
696        assert!(matches!(err, ParquetError::NeedMoreData(8)));
697    }
698
699    #[test]
700    fn test_parse_metadata_corrupt_footer() {
701        let data = Bytes::from(vec![1, 2, 3, 4, 5, 6, 7, 8]);
702        let reader_result = ParquetMetaDataReader::new().parse_metadata(&data);
703        assert_eq!(
704            reader_result.unwrap_err().to_string(),
705            "Parquet error: Invalid Parquet file. Corrupt footer"
706        );
707    }
708
    #[test]
    fn test_parse_metadata_invalid_start() {
        // Valid magic, but declared metadata length (255) + 8-byte footer = 263
        // bytes, more than the 8 bytes actually present.
        let test_file = Bytes::from(vec![255, 0, 0, 0, b'P', b'A', b'R', b'1']);
        let err = ParquetMetaDataReader::new()
            .parse_metadata(&test_file)
            .unwrap_err();
        assert!(matches!(err, ParquetError::NeedMoreData(263)));
    }
717
718    #[test]
719    fn test_metadata_column_orders_parse() {
720        // Define simple schema, we do not need to provide logical types.
721        let fields = vec![
722            Arc::new(
723                SchemaType::primitive_type_builder("col1", Type::INT32)
724                    .build()
725                    .unwrap(),
726            ),
727            Arc::new(
728                SchemaType::primitive_type_builder("col2", Type::FLOAT)
729                    .build()
730                    .unwrap(),
731            ),
732        ];
733        let schema = SchemaType::group_type_builder("schema")
734            .with_fields(fields)
735            .build()
736            .unwrap();
737        let schema_descr = SchemaDescriptor::new(Arc::new(schema));
738
739        let t_column_orders = Some(vec![
740            TColumnOrder::TYPEORDER(TypeDefinedOrder::new()),
741            TColumnOrder::TYPEORDER(TypeDefinedOrder::new()),
742        ]);
743
744        assert_eq!(
745            ParquetMetaDataReader::parse_column_orders(t_column_orders, &schema_descr).unwrap(),
746            Some(vec![
747                ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED),
748                ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED)
749            ])
750        );
751
752        // Test when no column orders are defined.
753        assert_eq!(
754            ParquetMetaDataReader::parse_column_orders(None, &schema_descr).unwrap(),
755            None
756        );
757    }
758
759    #[test]
760    fn test_metadata_column_orders_len_mismatch() {
761        let schema = SchemaType::group_type_builder("schema").build().unwrap();
762        let schema_descr = SchemaDescriptor::new(Arc::new(schema));
763
764        let t_column_orders = Some(vec![TColumnOrder::TYPEORDER(TypeDefinedOrder::new())]);
765
766        let res = ParquetMetaDataReader::parse_column_orders(t_column_orders, &schema_descr);
767        assert!(res.is_err());
768        assert!(format!("{:?}", res.unwrap_err()).contains("Column order length mismatch"));
769    }
770
771    #[test]
772    fn test_try_parse() {
773        let file = get_test_file("alltypes_tiny_pages.parquet");
774        let len = file.len() as usize;
775
776        let mut reader = ParquetMetaDataReader::new().with_page_indexes(true);
777
778        let bytes_for_range = |range: Range<usize>| {
779            file.get_bytes(range.start as u64, range.end - range.start)
780                .unwrap()
781        };
782
783        // read entire file
784        let bytes = bytes_for_range(0..len);
785        reader.try_parse(&bytes).unwrap();
786        let metadata = reader.finish().unwrap();
787        assert!(metadata.column_index.is_some());
788        assert!(metadata.offset_index.is_some());
789
790        // read more than enough of file
791        let bytes = bytes_for_range(320000..len);
792        reader.try_parse_sized(&bytes, len).unwrap();
793        let metadata = reader.finish().unwrap();
794        assert!(metadata.column_index.is_some());
795        assert!(metadata.offset_index.is_some());
796
797        // exactly enough
798        let bytes = bytes_for_range(323583..len);
799        reader.try_parse_sized(&bytes, len).unwrap();
800        let metadata = reader.finish().unwrap();
801        assert!(metadata.column_index.is_some());
802        assert!(metadata.offset_index.is_some());
803
804        // not enough for page index
805        let bytes = bytes_for_range(323584..len);
806        // should fail
807        match reader.try_parse_sized(&bytes, len).unwrap_err() {
808            // expected error, try again with provided bounds
809            ParquetError::NeedMoreData(needed) => {
810                let bytes = bytes_for_range(len - needed..len);
811                reader.try_parse_sized(&bytes, len).unwrap();
812                let metadata = reader.finish().unwrap();
813                assert!(metadata.column_index.is_some());
814                assert!(metadata.offset_index.is_some());
815            }
816            _ => panic!("unexpected error"),
817        };
818
819        // not enough for file metadata, but keep trying until page indexes are read
820        let mut reader = ParquetMetaDataReader::new().with_page_indexes(true);
821        let mut bytes = bytes_for_range(452505..len);
822        loop {
823            match reader.try_parse_sized(&bytes, len) {
824                Ok(_) => break,
825                Err(ParquetError::NeedMoreData(needed)) => {
826                    bytes = bytes_for_range(len - needed..len);
827                    if reader.has_metadata() {
828                        reader.read_page_indexes_sized(&bytes, len).unwrap();
829                        break;
830                    }
831                }
832                _ => panic!("unexpected error"),
833            }
834        }
835        let metadata = reader.finish().unwrap();
836        assert!(metadata.column_index.is_some());
837        assert!(metadata.offset_index.is_some());
838
839        // not enough for page index but lie about file size
840        let bytes = bytes_for_range(323584..len);
841        let reader_result = reader.try_parse_sized(&bytes, len - 323584).unwrap_err();
842        assert_eq!(
843            reader_result.to_string(),
844            "EOF: Parquet file too small. Range 323583..452504 is beyond file bounds 130649"
845        );
846
847        // not enough for file metadata
848        let mut reader = ParquetMetaDataReader::new();
849        let bytes = bytes_for_range(452505..len);
850        // should fail
851        match reader.try_parse_sized(&bytes, len).unwrap_err() {
852            // expected error, try again with provided bounds
853            ParquetError::NeedMoreData(needed) => {
854                let bytes = bytes_for_range(len - needed..len);
855                reader.try_parse_sized(&bytes, len).unwrap();
856                reader.finish().unwrap();
857            }
858            _ => panic!("unexpected error"),
859        };
860
861        // not enough for file metadata but use try_parse()
862        let reader_result = reader.try_parse(&bytes).unwrap_err();
863        assert_eq!(
864            reader_result.to_string(),
865            "EOF: Parquet file too small. Size is 1728 but need 1729"
866        );
867
868        // read head of file rather than tail
869        let bytes = bytes_for_range(0..1000);
870        let reader_result = reader.try_parse_sized(&bytes, len).unwrap_err();
871        assert_eq!(
872            reader_result.to_string(),
873            "Parquet error: Invalid Parquet file. Corrupt footer"
874        );
875
876        // lie about file size
877        let bytes = bytes_for_range(452510..len);
878        let reader_result = reader.try_parse_sized(&bytes, len - 452505).unwrap_err();
879        assert_eq!(
880            reader_result.to_string(),
881            "EOF: Parquet file too small. Size is 1728 but need 1729"
882        );
883    }
884}
885
#[cfg(all(feature = "async", feature = "arrow", test))]
mod async_tests {
    use super::*;
    use bytes::Bytes;
    use futures::future::BoxFuture;
    use futures::FutureExt;
    use std::fs::File;
    use std::future::Future;
    use std::io::{Read, Seek, SeekFrom};
    use std::ops::Range;
    use std::sync::atomic::{AtomicUsize, Ordering};

    use crate::arrow::async_reader::MetadataFetch;
    use crate::file::reader::Length;
    use crate::util::test_common::file_util::get_test_file;

    /// Adapter so a plain closure can serve as a [`MetadataFetch`], letting
    /// tests serve byte ranges from a local file while counting fetches.
    struct MetadataFetchFn<F>(F);

    impl<F, Fut> MetadataFetch for MetadataFetchFn<F>
    where
        F: FnMut(Range<usize>) -> Fut + Send,
        Fut: Future<Output = Result<Bytes>> + Send,
    {
        fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
            async move { self.0(range).await }.boxed()
        }
    }

    /// Reads `range` from `file` into freshly allocated [`Bytes`]
    /// (seek + bounded `read_to_end`).
    fn read_range(file: &mut File, range: Range<usize>) -> Result<Bytes> {
        file.seek(SeekFrom::Start(range.start as _))?;
        let len = range.end - range.start;
        let mut buf = Vec::with_capacity(len);
        file.take(len as _).read_to_end(&mut buf)?;
        Ok(buf.into())
    }

    // Verifies the async load path and, via `fetch_count`, how many range
    // requests each prefetch-hint configuration issues.
    #[tokio::test]
    async fn test_simple() {
        let mut file = get_test_file("nulls.snappy.parquet");
        let len = file.len() as usize;

        // Sync parse is the reference result the async path must match.
        let expected = ParquetMetaDataReader::new()
            .parse_and_finish(&file)
            .unwrap();
        let expected = expected.file_metadata().schema();
        let fetch_count = AtomicUsize::new(0);

        let mut fetch = |range| {
            fetch_count.fetch_add(1, Ordering::SeqCst);
            futures::future::ready(read_range(&mut file, range))
        };

        // No hint: two fetches (footer, then metadata).
        let input = MetadataFetchFn(&mut fetch);
        let actual = ParquetMetaDataReader::new()
            .load_and_finish(input, len)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 2);

        // Metadata hint too small - below footer size
        fetch_count.store(0, Ordering::SeqCst);
        let input = MetadataFetchFn(&mut fetch);
        let actual = ParquetMetaDataReader::new()
            .with_prefetch_hint(Some(7))
            .load_and_finish(input, len)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 2);

        // Metadata hint too small
        fetch_count.store(0, Ordering::SeqCst);
        let input = MetadataFetchFn(&mut fetch);
        let actual = ParquetMetaDataReader::new()
            .with_prefetch_hint(Some(10))
            .load_and_finish(input, len)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 2);

        // Metadata hint too large: a single fetch suffices
        fetch_count.store(0, Ordering::SeqCst);
        let input = MetadataFetchFn(&mut fetch);
        let actual = ParquetMetaDataReader::new()
            .with_prefetch_hint(Some(500))
            .load_and_finish(input, len)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);

        // Metadata hint exactly correct (428 bytes for this fixture)
        fetch_count.store(0, Ordering::SeqCst);
        let input = MetadataFetchFn(&mut fetch);
        let actual = ParquetMetaDataReader::new()
            .with_prefetch_hint(Some(428))
            .load_and_finish(input, len)
            .await
            .unwrap();
        assert_eq!(actual.file_metadata().schema(), expected);
        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);

        // Claimed file size smaller than the 8-byte footer
        let input = MetadataFetchFn(&mut fetch);
        let err = ParquetMetaDataReader::new()
            .load_and_finish(input, 4)
            .await
            .unwrap_err()
            .to_string();
        assert_eq!(err, "EOF: file size of 4 is less than footer");

        // Claimed size points the footer read at non-footer bytes
        let input = MetadataFetchFn(&mut fetch);
        let err = ParquetMetaDataReader::new()
            .load_and_finish(input, 20)
            .await
            .unwrap_err()
            .to_string();
        assert_eq!(err, "Parquet error: Invalid Parquet file. Corrupt footer");
    }

    // Same fetch-count accounting as `test_simple`, but with page indexes
    // enabled, which normally requires an extra range request.
    //
    // NOTE(review): the hint values 1729 and 130649/130650 are tied to the
    // `alltypes_tiny_pages.parquet` fixture layout (footer metadata size and
    // metadata-plus-index size respectively) — confirm if it is regenerated.
    #[tokio::test]
    async fn test_page_index() {
        let mut file = get_test_file("alltypes_tiny_pages.parquet");
        let len = file.len() as usize;
        let fetch_count = AtomicUsize::new(0);
        let mut fetch = |range| {
            fetch_count.fetch_add(1, Ordering::SeqCst);
            futures::future::ready(read_range(&mut file, range))
        };

        // No hint: footer, metadata, then page indexes = 3 fetches
        let f = MetadataFetchFn(&mut fetch);
        let mut loader = ParquetMetaDataReader::new().with_page_indexes(true);
        loader.try_load(f, len).await.unwrap();
        assert_eq!(fetch_count.load(Ordering::SeqCst), 3);
        let metadata = loader.finish().unwrap();
        assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());

        // Prefetch just footer exactly
        fetch_count.store(0, Ordering::SeqCst);
        let f = MetadataFetchFn(&mut fetch);
        let mut loader = ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .with_prefetch_hint(Some(1729));
        loader.try_load(f, len).await.unwrap();
        assert_eq!(fetch_count.load(Ordering::SeqCst), 2);
        let metadata = loader.finish().unwrap();
        assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());

        // Prefetch more than footer but not enough
        fetch_count.store(0, Ordering::SeqCst);
        let f = MetadataFetchFn(&mut fetch);
        let mut loader = ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .with_prefetch_hint(Some(130649));
        loader.try_load(f, len).await.unwrap();
        assert_eq!(fetch_count.load(Ordering::SeqCst), 2);
        let metadata = loader.finish().unwrap();
        assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());

        // Prefetch exactly enough
        fetch_count.store(0, Ordering::SeqCst);
        let f = MetadataFetchFn(&mut fetch);
        let metadata = ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .with_prefetch_hint(Some(130650))
            .load_and_finish(f, len)
            .await
            .unwrap();
        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
        assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());

        // Prefetch more than enough but less than the entire file
        fetch_count.store(0, Ordering::SeqCst);
        let f = MetadataFetchFn(&mut fetch);
        let metadata = ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .with_prefetch_hint(Some(len - 1000)) // more than needed, less than the whole file
            .load_and_finish(f, len)
            .await
            .unwrap();
        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
        assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());

        // Prefetch the entire file
        fetch_count.store(0, Ordering::SeqCst);
        let f = MetadataFetchFn(&mut fetch);
        let metadata = ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .with_prefetch_hint(Some(len)) // prefetch entire file
            .load_and_finish(f, len)
            .await
            .unwrap();
        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
        assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());

        // Prefetch more than the entire file
        fetch_count.store(0, Ordering::SeqCst);
        let f = MetadataFetchFn(&mut fetch);
        let metadata = ParquetMetaDataReader::new()
            .with_page_indexes(true)
            .with_prefetch_hint(Some(len + 1000)) // hint exceeds the file size
            .load_and_finish(f, len)
            .await
            .unwrap();
        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
        assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());
    }
}