Skip to main content

parquet/file/metadata/thrift/
encryption.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Encryption support for Thrift serialization
19
20use crate::{
21    encryption::decrypt::{FileDecryptionProperties, FileDecryptor},
22    errors::{ParquetError, Result},
23    file::{
24        column_crypto_metadata::ColumnCryptoMetaData,
25        metadata::{
26            HeapSize, ParquetMetaData, ParquetMetaDataOptions, RowGroupMetaData,
27            thrift::{parquet_metadata_from_bytes, read_column_metadata, validate_column_metadata},
28        },
29    },
30    parquet_thrift::{
31        ElementType, FieldType, ReadThrift, ThriftCompactInputProtocol,
32        ThriftCompactOutputProtocol, ThriftSliceInputProtocol, WriteThrift, WriteThriftField,
33    },
34    thrift_struct, thrift_union,
35};
36use std::io::Write;
37use std::sync::Arc;
38
thrift_struct!(
/// Thrift payload of the `AES_GCM_V1` variant of `EncryptionAlgorithm`.
///
/// Every field is optional in the Thrift definition, so each one may be
/// absent after decoding.
pub(crate) struct AesGcmV1 {
  /// AAD prefix
  1: optional binary aad_prefix

  /// Unique file identifier part of AAD suffix
  2: optional binary aad_file_unique

  /// In files encrypted with AAD prefix without storing it,
  /// readers must supply the prefix
  3: optional bool supply_aad_prefix
}
);
52
53impl HeapSize for AesGcmV1 {
54    fn heap_size(&self) -> usize {
55        self.aad_prefix.heap_size()
56            + self.aad_file_unique.heap_size()
57            + self.supply_aad_prefix.heap_size()
58    }
59}
60
thrift_struct!(
/// Thrift payload of the `AES_GCM_CTR_V1` variant of `EncryptionAlgorithm`.
///
/// Declares the same three optional fields as `AesGcmV1`.
pub(crate) struct AesGcmCtrV1 {
  /// AAD prefix
  1: optional binary aad_prefix

  /// Unique file identifier part of AAD suffix
  2: optional binary aad_file_unique

  /// In files encrypted with AAD prefix without storing it,
  /// readers must supply the prefix
  3: optional bool supply_aad_prefix
}
);
74
75impl HeapSize for AesGcmCtrV1 {
76    fn heap_size(&self) -> usize {
77        self.aad_prefix.heap_size()
78            + self.aad_file_unique.heap_size()
79            + self.supply_aad_prefix.heap_size()
80    }
81}
82
// Thrift union selecting the encryption algorithm together with its AAD parameters.
// Field id 1 carries an `AesGcmV1` payload and field id 2 an `AesGcmCtrV1` payload.
thrift_union!(
union EncryptionAlgorithm {
  1: (AesGcmV1) AES_GCM_V1
  2: (AesGcmCtrV1) AES_GCM_CTR_V1
}
);
89
90impl HeapSize for EncryptionAlgorithm {
91    fn heap_size(&self) -> usize {
92        match self {
93            Self::AES_GCM_V1(gcm) => gcm.heap_size(),
94            Self::AES_GCM_CTR_V1(gcm_ctr) => gcm_ctr.heap_size(),
95        }
96    }
97}
98
thrift_struct!(
/// Crypto metadata for files with encrypted footer
///
/// `parquet_metadata_with_encryption` reads this structure from the front of
/// the footer buffer before decrypting what remains of that buffer.
pub(crate) struct FileCryptoMetaData<'a> {
  /// Encryption algorithm. This field is only used for files
  /// with encrypted footer. Files with plaintext footer store algorithm id
  /// inside footer (FileMetaData structure).
  1: required EncryptionAlgorithm encryption_algorithm

  /// Retrieval metadata of key used for encryption of footer,
  /// and (possibly) columns.
  // NOTE(review): the `'a` lifetime suggests these bytes are borrowed from the
  // buffer being decoded rather than copied; confirm against `thrift_struct!`.
  2: optional binary<'a> key_metadata
}
);
112
113fn row_group_from_encrypted_thrift(
114    mut rg: RowGroupMetaData,
115    decryptor: Option<&FileDecryptor>,
116    options: Option<&ParquetMetaDataOptions>,
117) -> Result<RowGroupMetaData> {
118    let schema_descr = rg.schema_descr;
119
120    if schema_descr.num_columns() != rg.columns.len() {
121        return Err(general_err!(
122            "Column count mismatch. Schema has {} columns while Row Group has {}",
123            schema_descr.num_columns(),
124            rg.columns.len()
125        ));
126    }
127    let total_byte_size = rg.total_byte_size;
128    let num_rows = rg.num_rows;
129    let mut columns = vec![];
130
131    for (i, (mut c, d)) in rg
132        .columns
133        .drain(0..)
134        .zip(schema_descr.columns())
135        .enumerate()
136    {
137        // Read encrypted metadata if it's present and we have a decryptor.
138        if let (true, Some(decryptor)) = (c.encrypted_column_metadata.is_some(), decryptor) {
139            let column_decryptor = match c.crypto_metadata() {
140                None => {
141                    return Err(general_err!(
142                        "No crypto_metadata is set for column '{}', which has encrypted metadata",
143                        d.path().string()
144                    ));
145                }
146                Some(ColumnCryptoMetaData::ENCRYPTION_WITH_COLUMN_KEY(crypto_metadata)) => {
147                    let column_name = crypto_metadata.path_in_schema.join(".");
148                    // Try to get the decryptor - if it fails, we don't have the key
149                    match decryptor.get_column_metadata_decryptor(
150                        column_name.as_str(),
151                        crypto_metadata.key_metadata.as_deref(),
152                    ) {
153                        Ok(dec) => dec,
154                        Err(_) => {
155                            // We don't have the key for this column, so we can't decrypt its metadata.
156                            columns.push(c);
157                            continue;
158                        }
159                    }
160                }
161                Some(ColumnCryptoMetaData::ENCRYPTION_WITH_FOOTER_KEY) => {
162                    decryptor.get_footer_decryptor()?
163                }
164            };
165
166            let column_aad = crate::encryption::modules::create_module_aad(
167                decryptor.file_aad(),
168                crate::encryption::modules::ModuleType::ColumnMetaData,
169                rg.ordinal.unwrap() as usize,
170                i,
171                None,
172            )?;
173
174            // Take the encrypted column metadata as it is no longer needed.
175            let encrypted_column_metadata = c.encrypted_column_metadata.take();
176            let buf = encrypted_column_metadata.unwrap();
177            let decrypted_cc_buf = column_decryptor
178                .decrypt(&buf, column_aad.as_ref())
179                .map_err(|_| {
180                    general_err!(
181                        "Unable to decrypt column '{}', perhaps the column key is wrong?",
182                        d.path().string()
183                    )
184                })?;
185
186            // parse decrypted buffer and then replace fields in 'c'
187            let mut prot = ThriftSliceInputProtocol::new(&decrypted_cc_buf);
188            let mask = read_column_metadata(&mut prot, &mut c, i, options)?;
189            validate_column_metadata(mask)?;
190
191            columns.push(c);
192        } else {
193            columns.push(c);
194        }
195    }
196
197    let sorting_columns = rg.sorting_columns;
198    let file_offset = rg.file_offset;
199    let ordinal = rg.ordinal;
200
201    Ok(RowGroupMetaData {
202        columns,
203        num_rows,
204        sorting_columns,
205        total_byte_size,
206        schema_descr,
207        file_offset,
208        ordinal,
209    })
210}
211
212/// Decodes [`ParquetMetaData`] from the provided bytes, handling metadata that may be encrypted.
213///
214/// Typically this is used to decode the metadata from the end of a parquet
215/// file. The format of `buf` is the Thrift compact binary protocol, as specified
216/// by the [Parquet Spec]. Buffer can be encrypted with AES GCM or AES CTR
217/// ciphers as specfied in the [Parquet Encryption Spec].
218///
219/// [Parquet Spec]: https://github.com/apache/parquet-format#metadata
220/// [Parquet Encryption Spec]: https://parquet.apache.org/docs/file-format/data-pages/encryption/
221pub(crate) fn parquet_metadata_with_encryption(
222    file_decryption_properties: Option<&Arc<FileDecryptionProperties>>,
223    encrypted_footer: bool,
224    buf: &[u8],
225    options: Option<&ParquetMetaDataOptions>,
226) -> Result<ParquetMetaData> {
227    use crate::file::metadata::ParquetMetaDataBuilder;
228
229    let mut buf = buf;
230    let mut file_decryptor = None;
231    let decrypted_fmd_buf;
232
233    if encrypted_footer {
234        let mut prot = ThriftSliceInputProtocol::new(buf);
235        if let Some(file_decryption_properties) = file_decryption_properties {
236            let t_file_crypto_metadata: FileCryptoMetaData =
237                FileCryptoMetaData::read_thrift(&mut prot)
238                    .map_err(|e| general_err!("Could not parse crypto metadata: {}", e))?;
239            let supply_aad_prefix = match &t_file_crypto_metadata.encryption_algorithm {
240                EncryptionAlgorithm::AES_GCM_V1(algo) => algo.supply_aad_prefix,
241                _ => Some(false),
242            }
243            .unwrap_or(false);
244            if supply_aad_prefix && file_decryption_properties.aad_prefix().is_none() {
245                return Err(general_err!(
246                    "Parquet file was encrypted with an AAD prefix that is not stored in the file, \
247                        but no AAD prefix was provided in the file decryption properties"
248                ));
249            }
250            let decryptor = get_file_decryptor(
251                t_file_crypto_metadata.encryption_algorithm,
252                t_file_crypto_metadata.key_metadata,
253                file_decryption_properties,
254            )?;
255            let footer_decryptor = decryptor.get_footer_decryptor();
256            let aad_footer = crate::encryption::modules::create_footer_aad(decryptor.file_aad())?;
257
258            decrypted_fmd_buf = footer_decryptor?
259                .decrypt(prot.as_slice().as_ref(), aad_footer.as_ref())
260                .map_err(|_| {
261                    general_err!(
262                        "Provided footer key and AAD were unable to decrypt parquet footer"
263                    )
264                })?;
265
266            buf = &decrypted_fmd_buf;
267            file_decryptor = Some(decryptor);
268        } else {
269            return Err(general_err!(
270                "Parquet file has an encrypted footer but decryption properties were not provided"
271            ));
272        }
273    }
274
275    let parquet_meta = parquet_metadata_from_bytes(buf, options)
276        .map_err(|e| general_err!("Could not parse metadata: {}", e))?;
277
278    let ParquetMetaData {
279        mut file_metadata,
280        row_groups,
281        column_index: _,
282        offset_index: _,
283        file_decryptor: _,
284    } = parquet_meta;
285
286    // Take the encryption algorithm and footer signing key metadata as they are no longer
287    // needed after this.
288    if let (Some(algo), Some(file_decryption_properties)) = (
289        file_metadata.encryption_algorithm.take(),
290        file_decryption_properties,
291    ) {
292        let footer_signing_key_metadata = file_metadata.footer_signing_key_metadata.take();
293
294        // File has a plaintext footer but encryption algorithm is set
295        let file_decryptor_value = get_file_decryptor(
296            *algo,
297            footer_signing_key_metadata.as_deref(),
298            file_decryption_properties,
299        )?;
300        if file_decryption_properties.check_plaintext_footer_integrity() && !encrypted_footer {
301            file_decryptor_value.verify_plaintext_footer_signature(buf)?;
302        }
303        file_decryptor = Some(file_decryptor_value);
304    }
305
306    // decrypt column chunk info
307    let row_groups = row_groups
308        .into_iter()
309        .map(|rg| row_group_from_encrypted_thrift(rg, file_decryptor.as_ref(), options))
310        .collect::<Result<Vec<_>>>()?;
311
312    let metadata = ParquetMetaDataBuilder::new(file_metadata)
313        .set_row_groups(row_groups)
314        .set_file_decryptor(file_decryptor)
315        .build();
316
317    Ok(metadata)
318}
319
320fn get_file_decryptor(
321    encryption_algorithm: EncryptionAlgorithm,
322    footer_key_metadata: Option<&[u8]>,
323    file_decryption_properties: &Arc<FileDecryptionProperties>,
324) -> Result<FileDecryptor> {
325    match encryption_algorithm {
326        EncryptionAlgorithm::AES_GCM_V1(algo) => {
327            let aad_file_unique = algo
328                .aad_file_unique
329                .ok_or_else(|| general_err!("AAD unique file identifier is not set"))?;
330            let aad_prefix = if let Some(aad_prefix) = file_decryption_properties.aad_prefix() {
331                aad_prefix.clone()
332            } else {
333                algo.aad_prefix.map(|v| v.to_vec()).unwrap_or_default()
334            };
335            let aad_file_unique = aad_file_unique.to_vec();
336
337            FileDecryptor::new(
338                file_decryption_properties,
339                footer_key_metadata,
340                aad_file_unique,
341                aad_prefix,
342            )
343        }
344        EncryptionAlgorithm::AES_GCM_CTR_V1(_) => Err(nyi_err!(
345            "The AES_GCM_CTR_V1 encryption algorithm is not yet supported"
346        )),
347    }
348}