parquet/file/metadata/
parser.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Internal metadata parsing routines
19//!
20//! These functions parse thrift-encoded metadata from a byte slice
21//! into the corresponding Rust structures
22
23use crate::errors::ParquetError;
24use crate::file::metadata::thrift::parquet_metadata_from_bytes;
25use crate::file::metadata::{ColumnChunkMetaData, PageIndexPolicy, ParquetMetaData};
26
27use crate::file::page_index::column_index::ColumnIndexMetaData;
28use crate::file::page_index::index_reader::{decode_column_index, decode_offset_index};
29use crate::file::page_index::offset_index::OffsetIndexMetaData;
30use bytes::Bytes;
31
32/// Helper struct for metadata parsing
33///
34/// This structure parses thrift-encoded bytes into the correct Rust structs,
35/// such as [`ParquetMetaData`], handling decryption if necessary.
36//
37// Note this structure is used to minimize the number of
38// places to add `#[cfg(feature = "encryption")]` checks.
39pub(crate) use inner::MetadataParser;
40
#[cfg(feature = "encryption")]
mod inner {
    use std::sync::Arc;

    use super::*;
    use crate::encryption::decrypt::FileDecryptionProperties;
    use crate::errors::Result;

    /// Metadata decoder used when the `encryption` feature is enabled.
    ///
    /// Carries the optional decryption properties and routes footer decoding
    /// through the decrypting code path whenever it is needed.
    #[derive(Debug, Default)]
    pub(crate) struct MetadataParser {
        // Decryption properties supplied by the caller; `None` unless
        // `with_file_decryption_properties` was given some.
        file_decryption_properties: Option<Arc<FileDecryptionProperties>>,
    }

    impl MetadataParser {
        /// Creates a parser with no decryption properties set.
        pub(crate) fn new() -> Self {
            Self::default()
        }

        /// Stores `file_decryption_properties` (replacing any previous value)
        /// and returns the parser for further chaining.
        pub(crate) fn with_file_decryption_properties(
            mut self,
            file_decryption_properties: Option<Arc<FileDecryptionProperties>>,
        ) -> Self {
            self.file_decryption_properties = file_decryption_properties;
            self
        }

        /// Decodes [`ParquetMetaData`] from `buf`.
        ///
        /// The plain decoder is used only when the footer is not encrypted
        /// *and* no decryption properties are set; every other combination
        /// goes through the encryption-aware decoder.
        pub(crate) fn decode_metadata(
            &self,
            buf: &[u8],
            encrypted_footer: bool,
        ) -> Result<ParquetMetaData> {
            let properties = self.file_decryption_properties.as_ref();
            if !encrypted_footer && properties.is_none() {
                return decode_metadata(buf);
            }
            crate::file::metadata::thrift::encryption::parquet_metadata_with_encryption(
                properties,
                encrypted_footer,
                buf,
            )
        }
    }

    /// Decodes the column index of a single column chunk from `bytes`.
    ///
    /// When the column carries crypto metadata, `bytes` are first decrypted
    /// with the column's metadata decryptor using the column index AAD built
    /// for (`row_group_index`, `col_index`). Otherwise `bytes` are decoded
    /// directly.
    ///
    /// Returns an error if the column has crypto metadata but `metadata` has
    /// no file decryptor, or if any decryption or decoding step fails.
    pub(super) fn parse_single_column_index(
        bytes: &[u8],
        metadata: &ParquetMetaData,
        column: &ColumnChunkMetaData,
        row_group_index: usize,
        col_index: usize,
    ) -> crate::errors::Result<ColumnIndexMetaData> {
        use crate::encryption::decrypt::CryptoContext;

        let crypto_metadata = match &column.column_crypto_metadata {
            Some(crypto_metadata) => crypto_metadata,
            // No crypto metadata on this column: decode the bytes as they are.
            None => return decode_column_index(bytes, column.column_type()),
        };
        let file_decryptor = match metadata.file_decryptor.as_ref() {
            Some(file_decryptor) => file_decryptor,
            None => {
                return Err(general_err!(
                    "Cannot decrypt column index, no file decryptor set"
                ));
            }
        };

        let context = CryptoContext::for_column(
            file_decryptor,
            crypto_metadata,
            row_group_index,
            col_index,
        )?;
        let decryptor = context.metadata_decryptor();
        let aad = context.create_column_index_aad()?;
        let decrypted = decryptor.decrypt(bytes, &aad)?;
        decode_column_index(&decrypted, column.column_type())
    }

    /// Decodes the offset index of a single column chunk from `bytes`.
    ///
    /// When the column carries crypto metadata, `bytes` are first decrypted
    /// with the column's metadata decryptor using the offset index AAD built
    /// for (`row_group_index`, `col_index`). Otherwise `bytes` are decoded
    /// directly.
    ///
    /// Returns an error if the column has crypto metadata but `metadata` has
    /// no file decryptor, or if any decryption or decoding step fails.
    pub(super) fn parse_single_offset_index(
        bytes: &[u8],
        metadata: &ParquetMetaData,
        column: &ColumnChunkMetaData,
        row_group_index: usize,
        col_index: usize,
    ) -> crate::errors::Result<OffsetIndexMetaData> {
        use crate::encryption::decrypt::CryptoContext;

        let crypto_metadata = match &column.column_crypto_metadata {
            Some(crypto_metadata) => crypto_metadata,
            // No crypto metadata on this column: decode the bytes as they are.
            None => return decode_offset_index(bytes),
        };
        let file_decryptor = match metadata.file_decryptor.as_ref() {
            Some(file_decryptor) => file_decryptor,
            None => {
                return Err(general_err!(
                    "Cannot decrypt offset index, no file decryptor set"
                ));
            }
        };

        let context = CryptoContext::for_column(
            file_decryptor,
            crypto_metadata,
            row_group_index,
            col_index,
        )?;
        let decryptor = context.metadata_decryptor();
        let aad = context.create_offset_index_aad()?;
        let decrypted = decryptor.decrypt(bytes, &aad)?;
        decode_offset_index(&decrypted)
    }
}
142
143#[cfg(not(feature = "encryption"))]
144mod inner {
145    use super::*;
146    use crate::errors::Result;
147    /// parallel implementation when encryption feature is not enabled
148    ///
149    /// This has the same API as the encryption-enabled version
150    #[derive(Debug, Default)]
151    pub(crate) struct MetadataParser;
152
153    impl MetadataParser {
154        pub(crate) fn new() -> Self {
155            MetadataParser
156        }
157
158        pub(crate) fn decode_metadata(
159            &self,
160            buf: &[u8],
161            encrypted_footer: bool,
162        ) -> Result<ParquetMetaData> {
163            if encrypted_footer {
164                Err(general_err!(
165                    "Parquet file has an encrypted footer but the encryption feature is disabled"
166                ))
167            } else {
168                decode_metadata(buf)
169            }
170        }
171    }
172
173    pub(super) fn parse_single_column_index(
174        bytes: &[u8],
175        _metadata: &ParquetMetaData,
176        column: &ColumnChunkMetaData,
177        _row_group_index: usize,
178        _col_index: usize,
179    ) -> crate::errors::Result<ColumnIndexMetaData> {
180        decode_column_index(bytes, column.column_type())
181    }
182
183    pub(super) fn parse_single_offset_index(
184        bytes: &[u8],
185        _metadata: &ParquetMetaData,
186        _column: &ColumnChunkMetaData,
187        _row_group_index: usize,
188        _col_index: usize,
189    ) -> crate::errors::Result<OffsetIndexMetaData> {
190        decode_offset_index(bytes)
191    }
192}
193
194/// Decodes [`ParquetMetaData`] from the provided bytes.
195///
196/// Typically this is used to decode the metadata from the end of a parquet
197/// file. The format of `buf` is the Thrift compact binary protocol, as specified
198/// by the [Parquet Spec].
199///
200/// [Parquet Spec]: https://github.com/apache/parquet-format#metadata
201pub(crate) fn decode_metadata(buf: &[u8]) -> crate::errors::Result<ParquetMetaData> {
202    parquet_metadata_from_bytes(buf)
203}
204
205/// Parses column index from the provided bytes and adds it to the metadata.
206///
207/// Arguments
208/// * `metadata` - The ParquetMetaData to which the parsed column index will be added.
209/// * `column_index_policy` - The policy for handling column index parsing (e.g.,
210///   Required, Optional, Skip).
211/// * `bytes` - The byte slice containing the column index data.
212/// * `start_offset` - The offset where `bytes` begin in the file.
213pub(crate) fn parse_column_index(
214    metadata: &mut ParquetMetaData,
215    column_index_policy: PageIndexPolicy,
216    bytes: &Bytes,
217    start_offset: u64,
218) -> crate::errors::Result<()> {
219    if column_index_policy == PageIndexPolicy::Skip {
220        return Ok(());
221    }
222    let index = metadata
223        .row_groups()
224        .iter()
225        .enumerate()
226        .map(|(rg_idx, x)| {
227            x.columns()
228                .iter()
229                .enumerate()
230                .map(|(col_idx, c)| match c.column_index_range() {
231                    Some(r) => {
232                        let r_start = usize::try_from(r.start - start_offset)?;
233                        let r_end = usize::try_from(r.end - start_offset)?;
234                        inner::parse_single_column_index(
235                            &bytes[r_start..r_end],
236                            metadata,
237                            c,
238                            rg_idx,
239                            col_idx,
240                        )
241                    }
242                    None => Ok(ColumnIndexMetaData::NONE),
243                })
244                .collect::<crate::errors::Result<Vec<_>>>()
245        })
246        .collect::<crate::errors::Result<Vec<_>>>()?;
247
248    metadata.set_column_index(Some(index));
249    Ok(())
250}
251
252pub(crate) fn parse_offset_index(
253    metadata: &mut ParquetMetaData,
254    offset_index_policy: PageIndexPolicy,
255    bytes: &Bytes,
256    start_offset: u64,
257) -> crate::errors::Result<()> {
258    if offset_index_policy == PageIndexPolicy::Skip {
259        return Ok(());
260    }
261    let row_groups = metadata.row_groups();
262    let mut all_indexes = Vec::with_capacity(row_groups.len());
263    for (rg_idx, x) in row_groups.iter().enumerate() {
264        let mut row_group_indexes = Vec::with_capacity(x.columns().len());
265        for (col_idx, c) in x.columns().iter().enumerate() {
266            let result = match c.offset_index_range() {
267                Some(r) => {
268                    let r_start = usize::try_from(r.start - start_offset)?;
269                    let r_end = usize::try_from(r.end - start_offset)?;
270                    inner::parse_single_offset_index(
271                        &bytes[r_start..r_end],
272                        metadata,
273                        c,
274                        rg_idx,
275                        col_idx,
276                    )
277                }
278                None => Err(general_err!("missing offset index")),
279            };
280
281            match result {
282                Ok(index) => row_group_indexes.push(index),
283                Err(e) => {
284                    if offset_index_policy == PageIndexPolicy::Required {
285                        return Err(e);
286                    } else {
287                        // Invalidate and return
288                        metadata.set_column_index(None);
289                        metadata.set_offset_index(None);
290                        return Ok(());
291                    }
292                }
293            }
294        }
295        all_indexes.push(row_group_indexes);
296    }
297    metadata.set_offset_index(Some(all_indexes));
298    Ok(())
299}