parquet/file/metadata/
parser.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Internal metadata parsing routines
19//!
20//! These functions parse thrift-encoded metadata from a byte slice
21//! into the corresponding Rust structures
22
23use crate::errors::ParquetError;
24use crate::file::metadata::thrift::parquet_metadata_from_bytes;
25use crate::file::metadata::{
26    ColumnChunkMetaData, PageIndexPolicy, ParquetMetaData, ParquetMetaDataOptions,
27};
28
29use crate::file::page_index::column_index::ColumnIndexMetaData;
30use crate::file::page_index::index_reader::{decode_column_index, decode_offset_index};
31use crate::file::page_index::offset_index::OffsetIndexMetaData;
32use bytes::Bytes;
33
34/// Helper struct for metadata parsing
35///
36/// This structure parses thrift-encoded bytes into the correct Rust structs,
37/// such as [`ParquetMetaData`], handling decryption if necessary.
38//
39// Note this structure is used to minimize the number of
40// places to add `#[cfg(feature = "encryption")]` checks.
41pub(crate) use inner::MetadataParser;
42
#[cfg(feature = "encryption")]
mod inner {
    use std::sync::Arc;

    use super::*;
    use crate::encryption::decrypt::{CryptoContext, FileDecryptionProperties};
    use crate::errors::Result;

    /// Decoder for metadata that may or may not be encrypted
    ///
    /// Create one with [`MetadataParser::new`] and configure it with the
    /// `with_*` builder methods before calling [`MetadataParser::decode_metadata`].
    #[derive(Debug, Default)]
    pub(crate) struct MetadataParser {
        // the credentials and keys needed to decrypt metadata
        file_decryption_properties: Option<Arc<FileDecryptionProperties>>,
        // metadata parsing options
        metadata_options: Option<Arc<ParquetMetaDataOptions>>,
    }

    impl MetadataParser {
        /// Creates a parser with no decryption properties and no options set
        pub(crate) fn new() -> Self {
            Self::default()
        }

        /// Replaces the decryption properties, keeping every other setting
        pub(crate) fn with_file_decryption_properties(
            self,
            file_decryption_properties: Option<Arc<FileDecryptionProperties>>,
        ) -> Self {
            Self {
                file_decryption_properties,
                ..self
            }
        }

        /// Replaces the metadata parsing options, keeping every other setting
        pub(crate) fn with_metadata_options(
            mut self,
            options: Option<Arc<ParquetMetaDataOptions>>,
        ) -> Self {
            self.metadata_options = options;
            self
        }

        /// Decodes [`ParquetMetaData`] from `buf`
        ///
        /// The encryption-aware decoder is used when the footer is encrypted
        /// or when decryption properties have been supplied; otherwise `buf`
        /// goes through the plain decoder.
        pub(crate) fn decode_metadata(
            &self,
            buf: &[u8],
            encrypted_footer: bool,
        ) -> Result<ParquetMetaData> {
            let options = self.metadata_options.as_deref();

            // Plain footer and nothing to decrypt with: take the plain path
            if !encrypted_footer && self.file_decryption_properties.is_none() {
                return decode_metadata(buf, options);
            }

            crate::file::metadata::thrift::encryption::parquet_metadata_with_encryption(
                self.file_decryption_properties.as_ref(),
                encrypted_footer,
                buf,
                options,
            )
        }
    }

    /// Decodes the column index of a single column chunk
    ///
    /// When the column carries crypto metadata, `bytes` is decrypted with the
    /// file decryptor held by `metadata` before being decoded; otherwise
    /// `bytes` is decoded directly.
    pub(super) fn parse_single_column_index(
        bytes: &[u8],
        metadata: &ParquetMetaData,
        column: &ColumnChunkMetaData,
        row_group_index: usize,
        col_index: usize,
    ) -> Result<ColumnIndexMetaData> {
        // No crypto metadata on this column: decode without decrypting
        let crypto_metadata = match &column.column_crypto_metadata {
            Some(crypto_metadata) => crypto_metadata,
            None => return decode_column_index(bytes, column.column_type()),
        };

        let file_decryptor = match metadata.file_decryptor.as_ref() {
            Some(decryptor) => decryptor,
            None => {
                return Err(general_err!(
                    "Cannot decrypt column index, no file decryptor set"
                ));
            }
        };

        let context = CryptoContext::for_column(
            file_decryptor,
            crypto_metadata,
            row_group_index,
            col_index,
        )?;
        let decryptor = context.metadata_decryptor();
        let aad = context.create_column_index_aad()?;
        let decrypted = decryptor.decrypt(bytes, &aad)?;
        decode_column_index(&decrypted, column.column_type())
    }

    /// Decodes the offset index of a single column chunk
    ///
    /// When the column carries crypto metadata, `bytes` is decrypted with the
    /// file decryptor held by `metadata` before being decoded; otherwise
    /// `bytes` is decoded directly.
    pub(super) fn parse_single_offset_index(
        bytes: &[u8],
        metadata: &ParquetMetaData,
        column: &ColumnChunkMetaData,
        row_group_index: usize,
        col_index: usize,
    ) -> Result<OffsetIndexMetaData> {
        // No crypto metadata on this column: decode without decrypting
        let crypto_metadata = match &column.column_crypto_metadata {
            Some(crypto_metadata) => crypto_metadata,
            None => return decode_offset_index(bytes),
        };

        let file_decryptor = match metadata.file_decryptor.as_ref() {
            Some(decryptor) => decryptor,
            None => {
                return Err(general_err!(
                    "Cannot decrypt offset index, no file decryptor set"
                ));
            }
        };

        let context = CryptoContext::for_column(
            file_decryptor,
            crypto_metadata,
            row_group_index,
            col_index,
        )?;
        let decryptor = context.metadata_decryptor();
        let aad = context.create_offset_index_aad()?;
        let decrypted = decryptor.decrypt(bytes, &aad)?;
        decode_offset_index(&decrypted)
    }
}
157
158#[cfg(not(feature = "encryption"))]
159mod inner {
160    use super::*;
161    use crate::errors::Result;
162    use std::sync::Arc;
163    /// parallel implementation when encryption feature is not enabled
164    ///
165    /// This has the same API as the encryption-enabled version
166    #[derive(Debug, Default)]
167    pub(crate) struct MetadataParser {
168        // metadata parsing options
169        metadata_options: Option<Arc<ParquetMetaDataOptions>>,
170    }
171
172    impl MetadataParser {
173        pub(crate) fn new() -> Self {
174            MetadataParser::default()
175        }
176
177        pub(crate) fn with_metadata_options(
178            self,
179            options: Option<Arc<ParquetMetaDataOptions>>,
180        ) -> Self {
181            Self {
182                metadata_options: options,
183            }
184        }
185
186        pub(crate) fn decode_metadata(
187            &self,
188            buf: &[u8],
189            encrypted_footer: bool,
190        ) -> Result<ParquetMetaData> {
191            if encrypted_footer {
192                Err(general_err!(
193                    "Parquet file has an encrypted footer but the encryption feature is disabled"
194                ))
195            } else {
196                decode_metadata(buf, self.metadata_options.as_deref())
197            }
198        }
199    }
200
201    pub(super) fn parse_single_column_index(
202        bytes: &[u8],
203        _metadata: &ParquetMetaData,
204        column: &ColumnChunkMetaData,
205        _row_group_index: usize,
206        _col_index: usize,
207    ) -> crate::errors::Result<ColumnIndexMetaData> {
208        decode_column_index(bytes, column.column_type())
209    }
210
211    pub(super) fn parse_single_offset_index(
212        bytes: &[u8],
213        _metadata: &ParquetMetaData,
214        _column: &ColumnChunkMetaData,
215        _row_group_index: usize,
216        _col_index: usize,
217    ) -> crate::errors::Result<OffsetIndexMetaData> {
218        decode_offset_index(bytes)
219    }
220}
221
/// Decodes [`ParquetMetaData`] from the provided bytes.
///
/// Typically this is used to decode the metadata from the end of a parquet
/// file. The format of `buf` is the Thrift compact binary protocol, as specified
/// by the [Parquet Spec].
///
/// This is a thin wrapper: `buf` and `options` are forwarded unchanged to
/// [`parquet_metadata_from_bytes`], and its result (including any error) is
/// returned as-is.
///
/// Arguments
/// * `buf` - The thrift-encoded metadata bytes.
/// * `options` - Optional metadata parsing options, passed through to the decoder.
///
/// [Parquet Spec]: https://github.com/apache/parquet-format#metadata
pub(crate) fn decode_metadata(
    buf: &[u8],
    options: Option<&ParquetMetaDataOptions>,
) -> crate::errors::Result<ParquetMetaData> {
    parquet_metadata_from_bytes(buf, options)
}
235
236/// Parses column index from the provided bytes and adds it to the metadata.
237///
238/// Arguments
239/// * `metadata` - The ParquetMetaData to which the parsed column index will be added.
240/// * `column_index_policy` - The policy for handling column index parsing (e.g.,
241///   Required, Optional, Skip).
242/// * `bytes` - The byte slice containing the column index data.
243/// * `start_offset` - The offset where `bytes` begin in the file.
244pub(crate) fn parse_column_index(
245    metadata: &mut ParquetMetaData,
246    column_index_policy: PageIndexPolicy,
247    bytes: &Bytes,
248    start_offset: u64,
249) -> crate::errors::Result<()> {
250    if column_index_policy == PageIndexPolicy::Skip {
251        return Ok(());
252    }
253    let index = metadata
254        .row_groups()
255        .iter()
256        .enumerate()
257        .map(|(rg_idx, x)| {
258            x.columns()
259                .iter()
260                .enumerate()
261                .map(|(col_idx, c)| match c.column_index_range() {
262                    Some(r) => {
263                        let r_start = usize::try_from(r.start - start_offset)?;
264                        let r_end = usize::try_from(r.end - start_offset)?;
265                        inner::parse_single_column_index(
266                            &bytes[r_start..r_end],
267                            metadata,
268                            c,
269                            rg_idx,
270                            col_idx,
271                        )
272                    }
273                    None => Ok(ColumnIndexMetaData::NONE),
274                })
275                .collect::<crate::errors::Result<Vec<_>>>()
276        })
277        .collect::<crate::errors::Result<Vec<_>>>()?;
278
279    metadata.set_column_index(Some(index));
280    Ok(())
281}
282
283pub(crate) fn parse_offset_index(
284    metadata: &mut ParquetMetaData,
285    offset_index_policy: PageIndexPolicy,
286    bytes: &Bytes,
287    start_offset: u64,
288) -> crate::errors::Result<()> {
289    if offset_index_policy == PageIndexPolicy::Skip {
290        return Ok(());
291    }
292    let row_groups = metadata.row_groups();
293    let mut all_indexes = Vec::with_capacity(row_groups.len());
294    for (rg_idx, x) in row_groups.iter().enumerate() {
295        let mut row_group_indexes = Vec::with_capacity(x.columns().len());
296        for (col_idx, c) in x.columns().iter().enumerate() {
297            let result = match c.offset_index_range() {
298                Some(r) => {
299                    let r_start = usize::try_from(r.start - start_offset)?;
300                    let r_end = usize::try_from(r.end - start_offset)?;
301                    inner::parse_single_offset_index(
302                        &bytes[r_start..r_end],
303                        metadata,
304                        c,
305                        rg_idx,
306                        col_idx,
307                    )
308                }
309                None => Err(general_err!("missing offset index")),
310            };
311
312            match result {
313                Ok(index) => row_group_indexes.push(index),
314                Err(e) => {
315                    if offset_index_policy == PageIndexPolicy::Required {
316                        return Err(e);
317                    } else {
318                        // Invalidate and return
319                        metadata.set_column_index(None);
320                        metadata.set_offset_index(None);
321                        return Ok(());
322                    }
323                }
324            }
325        }
326        all_indexes.push(row_group_indexes);
327    }
328    metadata.set_offset_index(Some(all_indexes));
329    Ok(())
330}