parquet/file/metadata/thrift/
encryption.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Encryption support for Thrift serialization
19
20use crate::{
21    encryption::decrypt::{FileDecryptionProperties, FileDecryptor},
22    errors::{ParquetError, Result},
23    file::{
24        column_crypto_metadata::ColumnCryptoMetaData,
25        metadata::{
26            HeapSize, ParquetMetaData, RowGroupMetaData,
27            thrift::{parquet_metadata_from_bytes, read_column_metadata, validate_column_metadata},
28        },
29    },
30    parquet_thrift::{
31        ElementType, FieldType, ReadThrift, ThriftCompactInputProtocol,
32        ThriftCompactOutputProtocol, ThriftSliceInputProtocol, WriteThrift, WriteThriftField,
33    },
34    thrift_struct, thrift_union,
35};
36use std::io::Write;
37use std::sync::Arc;
38
39thrift_struct!(
40pub(crate) struct AesGcmV1 {
41  /// AAD prefix
42  1: optional binary aad_prefix
43
44  /// Unique file identifier part of AAD suffix
45  2: optional binary aad_file_unique
46
47  /// In files encrypted with AAD prefix without storing it,
48  /// readers must supply the prefix
49  3: optional bool supply_aad_prefix
50}
51);
52
53impl HeapSize for AesGcmV1 {
54    fn heap_size(&self) -> usize {
55        self.aad_prefix.heap_size()
56            + self.aad_file_unique.heap_size()
57            + self.supply_aad_prefix.heap_size()
58    }
59}
60
61thrift_struct!(
62pub(crate) struct AesGcmCtrV1 {
63  /// AAD prefix
64  1: optional binary aad_prefix
65
66  /// Unique file identifier part of AAD suffix
67  2: optional binary aad_file_unique
68
69  /// In files encrypted with AAD prefix without storing it,
70  /// readers must supply the prefix
71  3: optional bool supply_aad_prefix
72}
73);
74
75impl HeapSize for AesGcmCtrV1 {
76    fn heap_size(&self) -> usize {
77        self.aad_prefix.heap_size()
78            + self.aad_file_unique.heap_size()
79            + self.supply_aad_prefix.heap_size()
80    }
81}
82
83thrift_union!(
84union EncryptionAlgorithm {
85  1: (AesGcmV1) AES_GCM_V1
86  2: (AesGcmCtrV1) AES_GCM_CTR_V1
87}
88);
89
90impl HeapSize for EncryptionAlgorithm {
91    fn heap_size(&self) -> usize {
92        match self {
93            Self::AES_GCM_V1(gcm) => gcm.heap_size(),
94            Self::AES_GCM_CTR_V1(gcm_ctr) => gcm_ctr.heap_size(),
95        }
96    }
97}
98
99thrift_struct!(
100/// Crypto metadata for files with encrypted footer
101pub(crate) struct FileCryptoMetaData<'a> {
102  /// Encryption algorithm. This field is only used for files
103  /// with encrypted footer. Files with plaintext footer store algorithm id
104  /// inside footer (FileMetaData structure).
105  1: required EncryptionAlgorithm encryption_algorithm
106
107  /// Retrieval metadata of key used for encryption of footer,
108  /// and (possibly) columns.
109  2: optional binary<'a> key_metadata
110}
111);
112
113fn row_group_from_encrypted_thrift(
114    mut rg: RowGroupMetaData,
115    decryptor: Option<&FileDecryptor>,
116) -> Result<RowGroupMetaData> {
117    let schema_descr = rg.schema_descr;
118
119    if schema_descr.num_columns() != rg.columns.len() {
120        return Err(general_err!(
121            "Column count mismatch. Schema has {} columns while Row Group has {}",
122            schema_descr.num_columns(),
123            rg.columns.len()
124        ));
125    }
126    let total_byte_size = rg.total_byte_size;
127    let num_rows = rg.num_rows;
128    let mut columns = vec![];
129
130    for (i, (mut c, d)) in rg
131        .columns
132        .drain(0..)
133        .zip(schema_descr.columns())
134        .enumerate()
135    {
136        // Read encrypted metadata if it's present and we have a decryptor.
137        if let (true, Some(decryptor)) = (c.encrypted_column_metadata.is_some(), decryptor) {
138            let column_decryptor = match c.crypto_metadata() {
139                None => {
140                    return Err(general_err!(
141                        "No crypto_metadata is set for column '{}', which has encrypted metadata",
142                        d.path().string()
143                    ));
144                }
145                Some(ColumnCryptoMetaData::ENCRYPTION_WITH_COLUMN_KEY(crypto_metadata)) => {
146                    let column_name = crypto_metadata.path_in_schema.join(".");
147                    decryptor.get_column_metadata_decryptor(
148                        column_name.as_str(),
149                        crypto_metadata.key_metadata.as_deref(),
150                    )?
151                }
152                Some(ColumnCryptoMetaData::ENCRYPTION_WITH_FOOTER_KEY) => {
153                    decryptor.get_footer_decryptor()?
154                }
155            };
156
157            let column_aad = crate::encryption::modules::create_module_aad(
158                decryptor.file_aad(),
159                crate::encryption::modules::ModuleType::ColumnMetaData,
160                rg.ordinal.unwrap() as usize,
161                i,
162                None,
163            )?;
164
165            // Take the encrypted column metadata as it is no longer needed.
166            let encrypted_column_metadata = c.encrypted_column_metadata.take();
167            let buf = encrypted_column_metadata.unwrap();
168            let decrypted_cc_buf = column_decryptor
169                .decrypt(&buf, column_aad.as_ref())
170                .map_err(|_| {
171                    general_err!(
172                        "Unable to decrypt column '{}', perhaps the column key is wrong?",
173                        d.path().string()
174                    )
175                })?;
176
177            // parse decrypted buffer and then replace fields in 'c'
178            let mut prot = ThriftSliceInputProtocol::new(&decrypted_cc_buf);
179            let mask = read_column_metadata(&mut prot, &mut c)?;
180            validate_column_metadata(mask)?;
181
182            columns.push(c);
183        } else {
184            columns.push(c);
185        }
186    }
187
188    let sorting_columns = rg.sorting_columns;
189    let file_offset = rg.file_offset;
190    let ordinal = rg.ordinal;
191
192    Ok(RowGroupMetaData {
193        columns,
194        num_rows,
195        sorting_columns,
196        total_byte_size,
197        schema_descr,
198        file_offset,
199        ordinal,
200    })
201}
202
203/// Decodes [`ParquetMetaData`] from the provided bytes, handling metadata that may be encrypted.
204///
205/// Typically this is used to decode the metadata from the end of a parquet
206/// file. The format of `buf` is the Thrift compact binary protocol, as specified
207/// by the [Parquet Spec]. Buffer can be encrypted with AES GCM or AES CTR
208/// ciphers as specfied in the [Parquet Encryption Spec].
209///
210/// [Parquet Spec]: https://github.com/apache/parquet-format#metadata
211/// [Parquet Encryption Spec]: https://parquet.apache.org/docs/file-format/data-pages/encryption/
212pub(crate) fn parquet_metadata_with_encryption(
213    file_decryption_properties: Option<&Arc<FileDecryptionProperties>>,
214    encrypted_footer: bool,
215    buf: &[u8],
216) -> Result<ParquetMetaData> {
217    use crate::file::metadata::ParquetMetaDataBuilder;
218
219    let mut buf = buf;
220    let mut file_decryptor = None;
221    let decrypted_fmd_buf;
222
223    if encrypted_footer {
224        let mut prot = ThriftSliceInputProtocol::new(buf);
225        if let Some(file_decryption_properties) = file_decryption_properties {
226            let t_file_crypto_metadata: FileCryptoMetaData =
227                FileCryptoMetaData::read_thrift(&mut prot)
228                    .map_err(|e| general_err!("Could not parse crypto metadata: {}", e))?;
229            let supply_aad_prefix = match &t_file_crypto_metadata.encryption_algorithm {
230                EncryptionAlgorithm::AES_GCM_V1(algo) => algo.supply_aad_prefix,
231                _ => Some(false),
232            }
233            .unwrap_or(false);
234            if supply_aad_prefix && file_decryption_properties.aad_prefix().is_none() {
235                return Err(general_err!(
236                    "Parquet file was encrypted with an AAD prefix that is not stored in the file, \
237                        but no AAD prefix was provided in the file decryption properties"
238                ));
239            }
240            let decryptor = get_file_decryptor(
241                t_file_crypto_metadata.encryption_algorithm,
242                t_file_crypto_metadata.key_metadata,
243                file_decryption_properties,
244            )?;
245            let footer_decryptor = decryptor.get_footer_decryptor();
246            let aad_footer = crate::encryption::modules::create_footer_aad(decryptor.file_aad())?;
247
248            decrypted_fmd_buf = footer_decryptor?
249                .decrypt(prot.as_slice().as_ref(), aad_footer.as_ref())
250                .map_err(|_| {
251                    general_err!(
252                        "Provided footer key and AAD were unable to decrypt parquet footer"
253                    )
254                })?;
255
256            buf = &decrypted_fmd_buf;
257            file_decryptor = Some(decryptor);
258        } else {
259            return Err(general_err!(
260                "Parquet file has an encrypted footer but decryption properties were not provided"
261            ));
262        }
263    }
264
265    let parquet_meta = parquet_metadata_from_bytes(buf)
266        .map_err(|e| general_err!("Could not parse metadata: {}", e))?;
267
268    let ParquetMetaData {
269        mut file_metadata,
270        row_groups,
271        column_index: _,
272        offset_index: _,
273        file_decryptor: _,
274    } = parquet_meta;
275
276    // Take the encryption algorithm and footer signing key metadata as they are no longer
277    // needed after this.
278    if let (Some(algo), Some(file_decryption_properties)) = (
279        file_metadata.encryption_algorithm.take(),
280        file_decryption_properties,
281    ) {
282        let footer_signing_key_metadata = file_metadata.footer_signing_key_metadata.take();
283
284        // File has a plaintext footer but encryption algorithm is set
285        let file_decryptor_value = get_file_decryptor(
286            *algo,
287            footer_signing_key_metadata.as_deref(),
288            file_decryption_properties,
289        )?;
290        if file_decryption_properties.check_plaintext_footer_integrity() && !encrypted_footer {
291            file_decryptor_value.verify_plaintext_footer_signature(buf)?;
292        }
293        file_decryptor = Some(file_decryptor_value);
294    }
295
296    // decrypt column chunk info
297    let row_groups = row_groups
298        .into_iter()
299        .map(|rg| row_group_from_encrypted_thrift(rg, file_decryptor.as_ref()))
300        .collect::<Result<Vec<_>>>()?;
301
302    let metadata = ParquetMetaDataBuilder::new(file_metadata)
303        .set_row_groups(row_groups)
304        .set_file_decryptor(file_decryptor)
305        .build();
306
307    Ok(metadata)
308}
309
310fn get_file_decryptor(
311    encryption_algorithm: EncryptionAlgorithm,
312    footer_key_metadata: Option<&[u8]>,
313    file_decryption_properties: &Arc<FileDecryptionProperties>,
314) -> Result<FileDecryptor> {
315    match encryption_algorithm {
316        EncryptionAlgorithm::AES_GCM_V1(algo) => {
317            let aad_file_unique = algo
318                .aad_file_unique
319                .ok_or_else(|| general_err!("AAD unique file identifier is not set"))?;
320            let aad_prefix = if let Some(aad_prefix) = file_decryption_properties.aad_prefix() {
321                aad_prefix.clone()
322            } else {
323                algo.aad_prefix.map(|v| v.to_vec()).unwrap_or_default()
324            };
325            let aad_file_unique = aad_file_unique.to_vec();
326
327            FileDecryptor::new(
328                file_decryption_properties,
329                footer_key_metadata,
330                aad_file_unique,
331                aad_prefix,
332            )
333        }
334        EncryptionAlgorithm::AES_GCM_CTR_V1(_) => Err(nyi_err!(
335            "The AES_GCM_CTR_V1 encryption algorithm is not yet supported"
336        )),
337    }
338}