parquet/encryption/
decrypt.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Configuration and utilities for decryption of files using Parquet Modular Encryption
19
20use crate::encryption::ciphers::{BlockDecryptor, RingGcmBlockDecryptor};
21use crate::encryption::modules::{create_module_aad, ModuleType};
22use crate::errors::{ParquetError, Result};
23use crate::file::column_crypto_metadata::ColumnCryptoMetaData;
24use std::borrow::Cow;
25use std::collections::HashMap;
26use std::fmt::Formatter;
27use std::io::Read;
28use std::sync::Arc;
29
30/// Trait for retrieving an encryption key using the key's metadata
31///
32/// # Example
33///
34/// This shows how you might use a `KeyRetriever` to decrypt a Parquet file
35/// if you have a set of known encryption keys with identifiers, but at read time
36/// you may not know which columns were encrypted and which keys were used.
37///
38/// In practice, the key metadata might instead store an encrypted key that must
39/// be decrypted with a Key Management Server.
40///
41/// ```
42/// # use std::collections::HashMap;
43/// # use std::sync::{Arc, Mutex};
44/// # use parquet::encryption::decrypt::{FileDecryptionProperties, KeyRetriever};
45/// # use parquet::encryption::encrypt::FileEncryptionProperties;
46/// # use parquet::errors::ParquetError;
47/// // Define known encryption keys
48/// let mut keys = HashMap::new();
49/// keys.insert("kf".to_owned(), b"0123456789012345".to_vec());
50/// keys.insert("kc1".to_owned(), b"1234567890123450".to_vec());
51/// keys.insert("kc2".to_owned(), b"1234567890123451".to_vec());
52///
53/// // Create encryption properties for writing a file,
54/// // and specify the key identifiers as the key metadata.
55/// let encryption_properties = FileEncryptionProperties::builder(keys.get("kf").unwrap().clone())
56///     .with_footer_key_metadata("kf".into())
57///     .with_column_key_and_metadata("x", keys.get("kc1").unwrap().clone(), "kc1".as_bytes().into())
58///     .with_column_key_and_metadata("y", keys.get("kc2").unwrap().clone(), "kc2".as_bytes().into())
59///     .build()?;
60///
61/// // Write an encrypted file with the properties
62/// // ...
63///
64/// // Define a KeyRetriever that can get encryption keys using their identifiers
65/// struct CustomKeyRetriever {
66///     keys: Mutex<HashMap<String, Vec<u8>>>,
67/// }
68///
69/// impl KeyRetriever for CustomKeyRetriever {
70///     fn retrieve_key(&self, key_metadata: &[u8]) -> parquet::errors::Result<Vec<u8>> {
71///         // Metadata is bytes, so convert it to a string identifier
72///         let key_metadata = std::str::from_utf8(key_metadata).map_err(|e| {
73///             ParquetError::General(format!("Could not convert key metadata to string: {e}"))
74///         })?;
75///         // Lookup the key
76///         let keys = self.keys.lock().unwrap();
77///         match keys.get(key_metadata) {
78///             Some(key) => Ok(key.clone()),
79///             None => Err(ParquetError::General(format!(
80///                 "Could not retrieve key for metadata {key_metadata:?}"
81///             ))),
82///         }
83///     }
84/// }
85///
86/// let key_retriever = Arc::new(CustomKeyRetriever {
87///     keys: Mutex::new(keys),
88/// });
89///
90/// // Create decryption properties for reading an encrypted file.
91/// // Note that we don't need to specify which columns are encrypted,
92/// // this is determined by the file metadata and the required keys will be retrieved
93/// // dynamically using our key retriever.
94/// let decryption_properties = FileDecryptionProperties::with_key_retriever(key_retriever)
95///     .build()?;
96///
97/// // Read an encrypted file with the decryption properties
98/// // ...
99///
100/// # Ok::<(), parquet::errors::ParquetError>(())
101/// ```
pub trait KeyRetriever: Send + Sync {
    /// Retrieve a decryption key given the key metadata.
    ///
    /// `key_metadata` is the raw metadata bytes associated with the key being
    /// requested. [`FileDecryptionProperties`] passes an empty slice when no
    /// key metadata is available.
    ///
    /// Returns the key bytes, or an error if no key can be produced for the
    /// given metadata.
    fn retrieve_key(&self, key_metadata: &[u8]) -> Result<Vec<u8>>;
}
106
107pub(crate) fn read_and_decrypt<T: Read>(
108    decryptor: &Arc<dyn BlockDecryptor>,
109    input: &mut T,
110    aad: &[u8],
111) -> Result<Vec<u8>> {
112    let mut len_bytes = [0; 4];
113    input.read_exact(&mut len_bytes)?;
114    let ciphertext_len = u32::from_le_bytes(len_bytes) as usize;
115    let mut ciphertext = vec![0; 4 + ciphertext_len];
116    input.read_exact(&mut ciphertext[4..])?;
117
118    decryptor.decrypt(&ciphertext, aad.as_ref())
119}
120
/// CryptoContext is a data structure that holds the context required to
/// decrypt parquet modules (data pages, dictionary pages, etc.).
#[derive(Debug, Clone)]
pub(crate) struct CryptoContext {
    /// Row group index, passed on when building a module AAD.
    pub(crate) row_group_idx: usize,
    /// Column ordinal, passed on when building a module AAD.
    pub(crate) column_ordinal: usize,
    /// Page ordinal, if one has been assigned; `None` for a freshly created
    /// column context.
    pub(crate) page_ordinal: Option<usize>,
    /// Selects the dictionary-page module types instead of the data-page
    /// ones when building AADs.
    pub(crate) dictionary_page: bool,
    // We have separate data and metadata decryptors because
    // in GCM CTR mode, the metadata and data pages use
    // different algorithms.
    data_decryptor: Arc<dyn BlockDecryptor>,
    metadata_decryptor: Arc<dyn BlockDecryptor>,
    /// Copy of the file AAD taken from the `FileDecryptor` this context was
    /// created from.
    file_aad: Vec<u8>,
}
136
137impl CryptoContext {
138    pub(crate) fn for_column(
139        file_decryptor: &FileDecryptor,
140        column_crypto_metadata: &ColumnCryptoMetaData,
141        row_group_idx: usize,
142        column_ordinal: usize,
143    ) -> Result<Self> {
144        let (data_decryptor, metadata_decryptor) = match column_crypto_metadata {
145            ColumnCryptoMetaData::EncryptionWithFooterKey => {
146                // TODO: In GCM-CTR mode will this need to be a non-GCM decryptor?
147                let data_decryptor = file_decryptor.get_footer_decryptor()?;
148                let metadata_decryptor = file_decryptor.get_footer_decryptor()?;
149                (data_decryptor, metadata_decryptor)
150            }
151            ColumnCryptoMetaData::EncryptionWithColumnKey(column_key_encryption) => {
152                let key_metadata = &column_key_encryption.key_metadata;
153                let full_column_name;
154                let column_name = if column_key_encryption.path_in_schema.len() == 1 {
155                    &column_key_encryption.path_in_schema[0]
156                } else {
157                    full_column_name = column_key_encryption.path_in_schema.join(".");
158                    &full_column_name
159                };
160                let data_decryptor = file_decryptor
161                    .get_column_data_decryptor(column_name, key_metadata.as_deref())?;
162                let metadata_decryptor = file_decryptor
163                    .get_column_metadata_decryptor(column_name, key_metadata.as_deref())?;
164                (data_decryptor, metadata_decryptor)
165            }
166        };
167
168        Ok(CryptoContext {
169            row_group_idx,
170            column_ordinal,
171            page_ordinal: None,
172            dictionary_page: false,
173            data_decryptor,
174            metadata_decryptor,
175            file_aad: file_decryptor.file_aad().clone(),
176        })
177    }
178
179    pub(crate) fn with_page_ordinal(&self, page_ordinal: usize) -> Self {
180        Self {
181            row_group_idx: self.row_group_idx,
182            column_ordinal: self.column_ordinal,
183            page_ordinal: Some(page_ordinal),
184            dictionary_page: false,
185            data_decryptor: self.data_decryptor.clone(),
186            metadata_decryptor: self.metadata_decryptor.clone(),
187            file_aad: self.file_aad.clone(),
188        }
189    }
190
191    pub(crate) fn create_page_header_aad(&self) -> Result<Vec<u8>> {
192        let module_type = if self.dictionary_page {
193            ModuleType::DictionaryPageHeader
194        } else {
195            ModuleType::DataPageHeader
196        };
197
198        create_module_aad(
199            self.file_aad(),
200            module_type,
201            self.row_group_idx,
202            self.column_ordinal,
203            self.page_ordinal,
204        )
205    }
206
207    pub(crate) fn create_page_aad(&self) -> Result<Vec<u8>> {
208        let module_type = if self.dictionary_page {
209            ModuleType::DictionaryPage
210        } else {
211            ModuleType::DataPage
212        };
213
214        create_module_aad(
215            self.file_aad(),
216            module_type,
217            self.row_group_idx,
218            self.column_ordinal,
219            self.page_ordinal,
220        )
221    }
222
223    pub(crate) fn for_dictionary_page(&self) -> Self {
224        Self {
225            row_group_idx: self.row_group_idx,
226            column_ordinal: self.column_ordinal,
227            page_ordinal: self.page_ordinal,
228            dictionary_page: true,
229            data_decryptor: self.data_decryptor.clone(),
230            metadata_decryptor: self.metadata_decryptor.clone(),
231            file_aad: self.file_aad.clone(),
232        }
233    }
234
235    pub(crate) fn data_decryptor(&self) -> &Arc<dyn BlockDecryptor> {
236        &self.data_decryptor
237    }
238
239    pub(crate) fn file_aad(&self) -> &Vec<u8> {
240        &self.file_aad
241    }
242}
243
/// Decryption keys supplied directly by the user rather than looked up
/// through a [`KeyRetriever`].
#[derive(Clone, PartialEq)]
struct ExplicitDecryptionKeys {
    /// Key returned for footer decryption.
    footer_key: Vec<u8>,
    /// Per-column keys, looked up by column name.
    column_keys: HashMap<String, Vec<u8>>,
}
249
/// Source of the keys used for decryption.
#[derive(Clone)]
enum DecryptionKeys {
    /// Keys were provided up front.
    Explicit(ExplicitDecryptionKeys),
    /// Keys are obtained on demand from a [`KeyRetriever`] using key metadata.
    ViaRetriever(Arc<dyn KeyRetriever>),
}
255
256impl PartialEq for DecryptionKeys {
257    fn eq(&self, other: &Self) -> bool {
258        match (self, other) {
259            (DecryptionKeys::Explicit(keys), DecryptionKeys::Explicit(other_keys)) => {
260                keys.footer_key == other_keys.footer_key
261                    && keys.column_keys == other_keys.column_keys
262            }
263            (DecryptionKeys::ViaRetriever(_), DecryptionKeys::ViaRetriever(_)) => true,
264            _ => false,
265        }
266    }
267}
268
269/// `FileDecryptionProperties` hold keys and AAD data required to decrypt a Parquet file.
270///
271/// When reading Arrow data, the `FileDecryptionProperties` should be included in the
272/// [`ArrowReaderOptions`](crate::arrow::arrow_reader::ArrowReaderOptions)  using
273/// [`with_file_decryption_properties`](crate::arrow::arrow_reader::ArrowReaderOptions::with_file_decryption_properties).
274///
275/// # Examples
276///
277/// Create `FileDecryptionProperties` for a file encrypted with uniform encryption,
278/// where all metadata and data are encrypted with the footer key:
279/// ```
280/// # use parquet::encryption::decrypt::FileDecryptionProperties;
/// let file_decryption_properties = FileDecryptionProperties::builder(b"0123456789012345".into())
282///     .build()?;
283/// # Ok::<(), parquet::errors::ParquetError>(())
284/// ```
285///
286/// Create properties for a file where columns are encrypted with different keys:
287/// ```
288/// # use parquet::encryption::decrypt::FileDecryptionProperties;
/// let file_decryption_properties = FileDecryptionProperties::builder(b"0123456789012345".into())
290///     .with_column_key("x", b"1234567890123450".into())
291///     .with_column_key("y", b"1234567890123451".into())
292///     .build()?;
293/// # Ok::<(), parquet::errors::ParquetError>(())
294/// ```
295///
296/// Specify additional authenticated data, used to protect against data replacement.
297/// This must match the AAD prefix provided when the file was written, otherwise
298/// data decryption will fail.
299/// ```
300/// # use parquet::encryption::decrypt::FileDecryptionProperties;
/// let file_decryption_properties = FileDecryptionProperties::builder(b"0123456789012345".into())
302///     .with_aad_prefix("example_file".into())
303///     .build()?;
304/// # Ok::<(), parquet::errors::ParquetError>(())
305/// ```
#[derive(Clone, PartialEq)]
pub struct FileDecryptionProperties {
    // Either explicit footer/column keys or a key retriever.
    keys: DecryptionKeys,
    // Optional AAD prefix configured through the builder.
    aad_prefix: Option<Vec<u8>>,
}
311
312impl FileDecryptionProperties {
313    /// Returns a new [`FileDecryptionProperties`] builder that will use the provided key to
314    /// decrypt footer metadata.
315    pub fn builder(footer_key: Vec<u8>) -> DecryptionPropertiesBuilder {
316        DecryptionPropertiesBuilder::new(footer_key)
317    }
318
319    /// Returns a new [`FileDecryptionProperties`] builder that uses a [`KeyRetriever`]
320    /// to get decryption keys based on key metadata.
321    pub fn with_key_retriever(key_retriever: Arc<dyn KeyRetriever>) -> DecryptionPropertiesBuilder {
322        DecryptionPropertiesBuilder::new_with_key_retriever(key_retriever)
323    }
324
325    /// AAD prefix string uniquely identifies the file and prevents file swapping
326    pub fn aad_prefix(&self) -> Option<&Vec<u8>> {
327        self.aad_prefix.as_ref()
328    }
329
330    /// Get the encryption key for decrypting a file's footer,
331    /// and also column data if uniform encryption is used.
332    pub fn footer_key(&self, key_metadata: Option<&[u8]>) -> Result<Cow<Vec<u8>>> {
333        match &self.keys {
334            DecryptionKeys::Explicit(keys) => Ok(Cow::Borrowed(&keys.footer_key)),
335            DecryptionKeys::ViaRetriever(retriever) => {
336                let key = retriever.retrieve_key(key_metadata.unwrap_or_default())?;
337                Ok(Cow::Owned(key))
338            }
339        }
340    }
341
342    /// Get the column-specific encryption key for decrypting column data and metadata within a file
343    pub fn column_key(
344        &self,
345        column_name: &str,
346        key_metadata: Option<&[u8]>,
347    ) -> Result<Cow<Vec<u8>>> {
348        match &self.keys {
349            DecryptionKeys::Explicit(keys) => match keys.column_keys.get(column_name) {
350                None => Err(general_err!(
351                    "No column decryption key set for encrypted column '{}'",
352                    column_name
353                )),
354                Some(key) => Ok(Cow::Borrowed(key)),
355            },
356            DecryptionKeys::ViaRetriever(retriever) => {
357                let key = retriever.retrieve_key(key_metadata.unwrap_or_default())?;
358                Ok(Cow::Owned(key))
359            }
360        }
361    }
362
363    /// Get the column names and associated decryption keys that have been configured.
364    /// If a key retriever is used rather than explicit decryption keys, the result
365    /// will be empty.
366    /// Provided for testing consumer code.
367    pub fn column_keys(&self) -> (Vec<String>, Vec<Vec<u8>>) {
368        let mut column_names: Vec<String> = Vec::new();
369        let mut column_keys: Vec<Vec<u8>> = Vec::new();
370        if let DecryptionKeys::Explicit(keys) = &self.keys {
371            for (key, value) in keys.column_keys.iter() {
372                column_names.push(key.clone());
373                column_keys.push(value.clone());
374            }
375        }
376        (column_names, column_keys)
377    }
378}
379
380impl std::fmt::Debug for FileDecryptionProperties {
381    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
382        write!(f, "FileDecryptionProperties {{ }}")
383    }
384}
385
/// Builder for [`FileDecryptionProperties`]
///
/// See [`FileDecryptionProperties`] for example usage.
pub struct DecryptionPropertiesBuilder {
    // Set by `new`; `None` when the builder was created with a key retriever.
    footer_key: Option<Vec<u8>>,
    // Set by `new_with_key_retriever`; `None` when an explicit footer key is used.
    key_retriever: Option<Arc<dyn KeyRetriever>>,
    // Explicit per-column keys by column name; `build` rejects these when a
    // key retriever is used.
    column_keys: HashMap<String, Vec<u8>>,
    // Optional AAD prefix copied into the built properties.
    aad_prefix: Option<Vec<u8>>,
}
395
396impl DecryptionPropertiesBuilder {
397    /// Create a new [`DecryptionPropertiesBuilder`] builder that will use the provided key to
398    /// decrypt footer metadata.
399    pub fn new(footer_key: Vec<u8>) -> DecryptionPropertiesBuilder {
400        Self {
401            footer_key: Some(footer_key),
402            key_retriever: None,
403            column_keys: HashMap::default(),
404            aad_prefix: None,
405        }
406    }
407
408    /// Create a new [`DecryptionPropertiesBuilder`] by providing a [`KeyRetriever`] that
409    /// can be used to get decryption keys based on key metadata.
410    pub fn new_with_key_retriever(
411        key_retriever: Arc<dyn KeyRetriever>,
412    ) -> DecryptionPropertiesBuilder {
413        Self {
414            footer_key: None,
415            key_retriever: Some(key_retriever),
416            column_keys: HashMap::default(),
417            aad_prefix: None,
418        }
419    }
420
421    /// Finalize the builder and return created [`FileDecryptionProperties`]
422    pub fn build(self) -> Result<FileDecryptionProperties> {
423        let keys = match (self.footer_key, self.key_retriever) {
424            (Some(footer_key), None) => DecryptionKeys::Explicit(ExplicitDecryptionKeys {
425                footer_key,
426                column_keys: self.column_keys,
427            }),
428            (None, Some(key_retriever)) => {
429                if !self.column_keys.is_empty() {
430                    return Err(general_err!(
431                        "Cannot specify column keys directly when using a key retriever"
432                    ));
433                }
434                DecryptionKeys::ViaRetriever(key_retriever)
435            }
436            _ => {
437                unreachable!()
438            }
439        };
440        Ok(FileDecryptionProperties {
441            keys,
442            aad_prefix: self.aad_prefix,
443        })
444    }
445
446    /// Specify the expected AAD prefix to be used for decryption.
447    /// This must be set if the file was written with an AAD prefix and the
448    /// prefix is not stored in the file metadata.
449    pub fn with_aad_prefix(mut self, value: Vec<u8>) -> Self {
450        self.aad_prefix = Some(value);
451        self
452    }
453
454    /// Specify the decryption key to use for a column
455    pub fn with_column_key(mut self, column_name: &str, decryption_key: Vec<u8>) -> Self {
456        self.column_keys
457            .insert(column_name.to_string(), decryption_key);
458        self
459    }
460
461    /// Specify multiple column decryption keys
462    pub fn with_column_keys(mut self, column_names: Vec<&str>, keys: Vec<Vec<u8>>) -> Result<Self> {
463        if column_names.len() != keys.len() {
464            return Err(general_err!(
465                "The number of column names ({}) does not match the number of keys ({})",
466                column_names.len(),
467                keys.len()
468            ));
469        }
470        for (column_name, key) in column_names.into_iter().zip(keys.into_iter()) {
471            self.column_keys.insert(column_name.to_string(), key);
472        }
473        Ok(self)
474    }
475}
476
/// Per-file decryption state: the user's decryption properties together with
/// the footer decryptor and file AAD derived when the file is opened.
#[derive(Clone, Debug)]
pub(crate) struct FileDecryptor {
    /// Copy of the properties this decryptor was created from; used to look
    /// up column keys.
    decryption_properties: FileDecryptionProperties,
    /// Decryptor built from the footer key.
    footer_decryptor: Arc<dyn BlockDecryptor>,
    /// The AAD prefix followed by the file-unique AAD bytes.
    file_aad: Vec<u8>,
}
483
484impl PartialEq for FileDecryptor {
485    fn eq(&self, other: &Self) -> bool {
486        self.decryption_properties == other.decryption_properties && self.file_aad == other.file_aad
487    }
488}
489
490impl FileDecryptor {
491    pub(crate) fn new(
492        decryption_properties: &FileDecryptionProperties,
493        footer_key_metadata: Option<&[u8]>,
494        aad_file_unique: Vec<u8>,
495        aad_prefix: Vec<u8>,
496    ) -> Result<Self> {
497        let file_aad = [aad_prefix.as_slice(), aad_file_unique.as_slice()].concat();
498        let footer_key = decryption_properties.footer_key(footer_key_metadata)?;
499        let footer_decryptor = RingGcmBlockDecryptor::new(&footer_key).map_err(|e| {
500            general_err!(
501                "Invalid footer key. {}",
502                e.to_string().replace("Parquet error: ", "")
503            )
504        })?;
505
506        Ok(Self {
507            footer_decryptor: Arc::new(footer_decryptor),
508            decryption_properties: decryption_properties.clone(),
509            file_aad,
510        })
511    }
512
513    pub(crate) fn get_footer_decryptor(&self) -> Result<Arc<dyn BlockDecryptor>> {
514        Ok(self.footer_decryptor.clone())
515    }
516
517    pub(crate) fn get_column_data_decryptor(
518        &self,
519        column_name: &str,
520        key_metadata: Option<&[u8]>,
521    ) -> Result<Arc<dyn BlockDecryptor>> {
522        let column_key = self
523            .decryption_properties
524            .column_key(column_name, key_metadata)?;
525        Ok(Arc::new(RingGcmBlockDecryptor::new(&column_key)?))
526    }
527
528    pub(crate) fn get_column_metadata_decryptor(
529        &self,
530        column_name: &str,
531        key_metadata: Option<&[u8]>,
532    ) -> Result<Arc<dyn BlockDecryptor>> {
533        // Once GCM CTR mode is implemented, data and metadata decryptors may be different
534        self.get_column_data_decryptor(column_name, key_metadata)
535    }
536
537    pub(crate) fn file_aad(&self) -> &Vec<u8> {
538        &self.file_aad
539    }
540}