parquet/file/metadata/thrift/
encryption.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Encryption support for Thrift serialization
19
20use crate::{
21    encryption::decrypt::{FileDecryptionProperties, FileDecryptor},
22    errors::{ParquetError, Result},
23    file::{
24        column_crypto_metadata::ColumnCryptoMetaData,
25        metadata::{
26            HeapSize, ParquetMetaData, ParquetMetaDataOptions, RowGroupMetaData,
27            thrift::{parquet_metadata_from_bytes, read_column_metadata, validate_column_metadata},
28        },
29    },
30    parquet_thrift::{
31        ElementType, FieldType, ReadThrift, ThriftCompactInputProtocol,
32        ThriftCompactOutputProtocol, ThriftSliceInputProtocol, WriteThrift, WriteThriftField,
33    },
34    thrift_struct, thrift_union,
35};
36use std::io::Write;
37use std::sync::Arc;
38
thrift_struct!(
/// Parameters of the `AES_GCM_V1` encryption algorithm, carried by the `AES_GCM_V1`
/// variant of `EncryptionAlgorithm`.
pub(crate) struct AesGcmV1 {
  /// AAD prefix stored in the file. `get_file_decryptor` only falls back to this value
  /// when the file decryption properties do not provide a prefix of their own.
  1: optional binary aad_prefix

  /// Unique file identifier part of AAD suffix. `get_file_decryptor` fails when this
  /// field is absent.
  2: optional binary aad_file_unique

  /// In files encrypted with AAD prefix without storing it,
  /// readers must supply the prefix
  3: optional bool supply_aad_prefix
}
);
52
53impl HeapSize for AesGcmV1 {
54    fn heap_size(&self) -> usize {
55        self.aad_prefix.heap_size()
56            + self.aad_file_unique.heap_size()
57            + self.supply_aad_prefix.heap_size()
58    }
59}
60
thrift_struct!(
/// Parameters of the `AES_GCM_CTR_V1` encryption algorithm, carried by the
/// `AES_GCM_CTR_V1` variant of `EncryptionAlgorithm`.
///
/// The structure can be decoded, but `get_file_decryptor` currently rejects this
/// algorithm as not yet supported.
pub(crate) struct AesGcmCtrV1 {
  /// AAD prefix
  1: optional binary aad_prefix

  /// Unique file identifier part of AAD suffix
  2: optional binary aad_file_unique

  /// In files encrypted with AAD prefix without storing it,
  /// readers must supply the prefix
  3: optional bool supply_aad_prefix
}
);
74
75impl HeapSize for AesGcmCtrV1 {
76    fn heap_size(&self) -> usize {
77        self.aad_prefix.heap_size()
78            + self.aad_file_unique.heap_size()
79            + self.supply_aad_prefix.heap_size()
80    }
81}
82
// Thrift union naming the encryption algorithm of a file together with that algorithm's
// parameters. Field id 1 selects `AES_GCM_V1` (payload `AesGcmV1`), field id 2 selects
// `AES_GCM_CTR_V1` (payload `AesGcmCtrV1`).
thrift_union!(
union EncryptionAlgorithm {
  1: (AesGcmV1) AES_GCM_V1
  2: (AesGcmCtrV1) AES_GCM_CTR_V1
}
);
89
90impl HeapSize for EncryptionAlgorithm {
91    fn heap_size(&self) -> usize {
92        match self {
93            Self::AES_GCM_V1(gcm) => gcm.heap_size(),
94            Self::AES_GCM_CTR_V1(gcm_ctr) => gcm_ctr.heap_size(),
95        }
96    }
97}
98
thrift_struct!(
/// Crypto metadata for files with encrypted footer.
///
/// `parquet_metadata_with_encryption` reads this structure from the start of the footer
/// buffer before decrypting the remainder of that buffer.
pub(crate) struct FileCryptoMetaData<'a> {
  /// Encryption algorithm. This field is only used for files
  /// with encrypted footer. Files with plaintext footer store algorithm id
  /// inside footer (FileMetaData structure).
  1: required EncryptionAlgorithm encryption_algorithm

  /// Retrieval metadata of key used for encryption of footer,
  /// and (possibly) columns.
  // NOTE(review): the `'a` lifetime suggests these bytes are borrowed from the input
  // buffer rather than copied; confirm against the `thrift_struct!` expansion.
  2: optional binary<'a> key_metadata
}
);
112
113fn row_group_from_encrypted_thrift(
114    mut rg: RowGroupMetaData,
115    decryptor: Option<&FileDecryptor>,
116    options: Option<&ParquetMetaDataOptions>,
117) -> Result<RowGroupMetaData> {
118    let schema_descr = rg.schema_descr;
119
120    if schema_descr.num_columns() != rg.columns.len() {
121        return Err(general_err!(
122            "Column count mismatch. Schema has {} columns while Row Group has {}",
123            schema_descr.num_columns(),
124            rg.columns.len()
125        ));
126    }
127    let total_byte_size = rg.total_byte_size;
128    let num_rows = rg.num_rows;
129    let mut columns = vec![];
130
131    for (i, (mut c, d)) in rg
132        .columns
133        .drain(0..)
134        .zip(schema_descr.columns())
135        .enumerate()
136    {
137        // Read encrypted metadata if it's present and we have a decryptor.
138        if let (true, Some(decryptor)) = (c.encrypted_column_metadata.is_some(), decryptor) {
139            let column_decryptor = match c.crypto_metadata() {
140                None => {
141                    return Err(general_err!(
142                        "No crypto_metadata is set for column '{}', which has encrypted metadata",
143                        d.path().string()
144                    ));
145                }
146                Some(ColumnCryptoMetaData::ENCRYPTION_WITH_COLUMN_KEY(crypto_metadata)) => {
147                    let column_name = crypto_metadata.path_in_schema.join(".");
148                    decryptor.get_column_metadata_decryptor(
149                        column_name.as_str(),
150                        crypto_metadata.key_metadata.as_deref(),
151                    )?
152                }
153                Some(ColumnCryptoMetaData::ENCRYPTION_WITH_FOOTER_KEY) => {
154                    decryptor.get_footer_decryptor()?
155                }
156            };
157
158            let column_aad = crate::encryption::modules::create_module_aad(
159                decryptor.file_aad(),
160                crate::encryption::modules::ModuleType::ColumnMetaData,
161                rg.ordinal.unwrap() as usize,
162                i,
163                None,
164            )?;
165
166            // Take the encrypted column metadata as it is no longer needed.
167            let encrypted_column_metadata = c.encrypted_column_metadata.take();
168            let buf = encrypted_column_metadata.unwrap();
169            let decrypted_cc_buf = column_decryptor
170                .decrypt(&buf, column_aad.as_ref())
171                .map_err(|_| {
172                    general_err!(
173                        "Unable to decrypt column '{}', perhaps the column key is wrong?",
174                        d.path().string()
175                    )
176                })?;
177
178            // parse decrypted buffer and then replace fields in 'c'
179            let mut prot = ThriftSliceInputProtocol::new(&decrypted_cc_buf);
180            let mask = read_column_metadata(&mut prot, &mut c, i, options)?;
181            validate_column_metadata(mask)?;
182
183            columns.push(c);
184        } else {
185            columns.push(c);
186        }
187    }
188
189    let sorting_columns = rg.sorting_columns;
190    let file_offset = rg.file_offset;
191    let ordinal = rg.ordinal;
192
193    Ok(RowGroupMetaData {
194        columns,
195        num_rows,
196        sorting_columns,
197        total_byte_size,
198        schema_descr,
199        file_offset,
200        ordinal,
201    })
202}
203
204/// Decodes [`ParquetMetaData`] from the provided bytes, handling metadata that may be encrypted.
205///
206/// Typically this is used to decode the metadata from the end of a parquet
207/// file. The format of `buf` is the Thrift compact binary protocol, as specified
208/// by the [Parquet Spec]. Buffer can be encrypted with AES GCM or AES CTR
209/// ciphers as specfied in the [Parquet Encryption Spec].
210///
211/// [Parquet Spec]: https://github.com/apache/parquet-format#metadata
212/// [Parquet Encryption Spec]: https://parquet.apache.org/docs/file-format/data-pages/encryption/
213pub(crate) fn parquet_metadata_with_encryption(
214    file_decryption_properties: Option<&Arc<FileDecryptionProperties>>,
215    encrypted_footer: bool,
216    buf: &[u8],
217    options: Option<&ParquetMetaDataOptions>,
218) -> Result<ParquetMetaData> {
219    use crate::file::metadata::ParquetMetaDataBuilder;
220
221    let mut buf = buf;
222    let mut file_decryptor = None;
223    let decrypted_fmd_buf;
224
225    if encrypted_footer {
226        let mut prot = ThriftSliceInputProtocol::new(buf);
227        if let Some(file_decryption_properties) = file_decryption_properties {
228            let t_file_crypto_metadata: FileCryptoMetaData =
229                FileCryptoMetaData::read_thrift(&mut prot)
230                    .map_err(|e| general_err!("Could not parse crypto metadata: {}", e))?;
231            let supply_aad_prefix = match &t_file_crypto_metadata.encryption_algorithm {
232                EncryptionAlgorithm::AES_GCM_V1(algo) => algo.supply_aad_prefix,
233                _ => Some(false),
234            }
235            .unwrap_or(false);
236            if supply_aad_prefix && file_decryption_properties.aad_prefix().is_none() {
237                return Err(general_err!(
238                    "Parquet file was encrypted with an AAD prefix that is not stored in the file, \
239                        but no AAD prefix was provided in the file decryption properties"
240                ));
241            }
242            let decryptor = get_file_decryptor(
243                t_file_crypto_metadata.encryption_algorithm,
244                t_file_crypto_metadata.key_metadata,
245                file_decryption_properties,
246            )?;
247            let footer_decryptor = decryptor.get_footer_decryptor();
248            let aad_footer = crate::encryption::modules::create_footer_aad(decryptor.file_aad())?;
249
250            decrypted_fmd_buf = footer_decryptor?
251                .decrypt(prot.as_slice().as_ref(), aad_footer.as_ref())
252                .map_err(|_| {
253                    general_err!(
254                        "Provided footer key and AAD were unable to decrypt parquet footer"
255                    )
256                })?;
257
258            buf = &decrypted_fmd_buf;
259            file_decryptor = Some(decryptor);
260        } else {
261            return Err(general_err!(
262                "Parquet file has an encrypted footer but decryption properties were not provided"
263            ));
264        }
265    }
266
267    let parquet_meta = parquet_metadata_from_bytes(buf, options)
268        .map_err(|e| general_err!("Could not parse metadata: {}", e))?;
269
270    let ParquetMetaData {
271        mut file_metadata,
272        row_groups,
273        column_index: _,
274        offset_index: _,
275        file_decryptor: _,
276    } = parquet_meta;
277
278    // Take the encryption algorithm and footer signing key metadata as they are no longer
279    // needed after this.
280    if let (Some(algo), Some(file_decryption_properties)) = (
281        file_metadata.encryption_algorithm.take(),
282        file_decryption_properties,
283    ) {
284        let footer_signing_key_metadata = file_metadata.footer_signing_key_metadata.take();
285
286        // File has a plaintext footer but encryption algorithm is set
287        let file_decryptor_value = get_file_decryptor(
288            *algo,
289            footer_signing_key_metadata.as_deref(),
290            file_decryption_properties,
291        )?;
292        if file_decryption_properties.check_plaintext_footer_integrity() && !encrypted_footer {
293            file_decryptor_value.verify_plaintext_footer_signature(buf)?;
294        }
295        file_decryptor = Some(file_decryptor_value);
296    }
297
298    // decrypt column chunk info
299    let row_groups = row_groups
300        .into_iter()
301        .map(|rg| row_group_from_encrypted_thrift(rg, file_decryptor.as_ref(), options))
302        .collect::<Result<Vec<_>>>()?;
303
304    let metadata = ParquetMetaDataBuilder::new(file_metadata)
305        .set_row_groups(row_groups)
306        .set_file_decryptor(file_decryptor)
307        .build();
308
309    Ok(metadata)
310}
311
312fn get_file_decryptor(
313    encryption_algorithm: EncryptionAlgorithm,
314    footer_key_metadata: Option<&[u8]>,
315    file_decryption_properties: &Arc<FileDecryptionProperties>,
316) -> Result<FileDecryptor> {
317    match encryption_algorithm {
318        EncryptionAlgorithm::AES_GCM_V1(algo) => {
319            let aad_file_unique = algo
320                .aad_file_unique
321                .ok_or_else(|| general_err!("AAD unique file identifier is not set"))?;
322            let aad_prefix = if let Some(aad_prefix) = file_decryption_properties.aad_prefix() {
323                aad_prefix.clone()
324            } else {
325                algo.aad_prefix.map(|v| v.to_vec()).unwrap_or_default()
326            };
327            let aad_file_unique = aad_file_unique.to_vec();
328
329            FileDecryptor::new(
330                file_decryption_properties,
331                footer_key_metadata,
332                aad_file_unique,
333                aad_prefix,
334            )
335        }
336        EncryptionAlgorithm::AES_GCM_CTR_V1(_) => Err(nyi_err!(
337            "The AES_GCM_CTR_V1 encryption algorithm is not yet supported"
338        )),
339    }
340}