parquet/encryption/
encrypt.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Configuration and utilities for Parquet Modular Encryption
19
20use crate::encryption::ciphers::{
21    BlockEncryptor, RingGcmBlockEncryptor, NONCE_LEN, SIZE_LEN, TAG_LEN,
22};
23use crate::errors::{ParquetError, Result};
24use crate::file::column_crypto_metadata::{ColumnCryptoMetaData, EncryptionWithColumnKey};
25use crate::schema::types::{ColumnDescPtr, SchemaDescriptor};
26use crate::thrift::TSerializable;
27use ring::rand::{SecureRandom, SystemRandom};
28use std::collections::{HashMap, HashSet};
29use std::io::Write;
30use thrift::protocol::TCompactOutputProtocol;
31
/// An encryption key together with optional metadata describing how to retrieve it
#[derive(Debug, Clone, PartialEq)]
struct EncryptionKey {
    /// The key bytes
    key: Vec<u8>,
    /// Optional retrieval metadata associated with the key
    key_metadata: Option<Vec<u8>>,
}

impl EncryptionKey {
    /// Create a key with no metadata attached
    fn new(key: Vec<u8>) -> EncryptionKey {
        EncryptionKey {
            key,
            key_metadata: None,
        }
    }

    /// Return this key with `metadata` attached, replacing any metadata set previously
    fn with_metadata(self, metadata: Vec<u8>) -> Self {
        Self {
            key_metadata: Some(metadata),
            ..self
        }
    }

    /// Borrow the key bytes
    fn key(&self) -> &Vec<u8> {
        let Self { key, .. } = self;
        key
    }
}
55
#[derive(Debug, Clone, PartialEq)]
/// Defines how data in a Parquet file should be encrypted
///
/// The `FileEncryptionProperties` should be included in the [`WriterProperties`](crate::file::properties::WriterProperties)
/// used to write a file by using [`WriterPropertiesBuilder::with_file_encryption_properties`](crate::file::properties::WriterPropertiesBuilder::with_file_encryption_properties).
///
/// # Examples
///
/// Create `FileEncryptionProperties` for a file encrypted with uniform encryption,
/// where all metadata and data are encrypted with the footer key:
/// ```
/// # use parquet::encryption::encrypt::FileEncryptionProperties;
/// let file_encryption_properties = FileEncryptionProperties::builder(b"0123456789012345".into())
///     .build()?;
/// # Ok::<(), parquet::errors::ParquetError>(())
/// ```
///
/// Create properties for a file where columns are encrypted with different keys.
/// Any columns without a key specified will be unencrypted:
/// ```
/// # use parquet::encryption::encrypt::FileEncryptionProperties;
/// let file_encryption_properties = FileEncryptionProperties::builder(b"0123456789012345".into())
///     .with_column_key("x", b"1234567890123450".into())
///     .with_column_key("y", b"1234567890123451".into())
///     .build()?;
/// # Ok::<(), parquet::errors::ParquetError>(())
/// ```
///
/// Specify additional authenticated data, used to protect against data replacement.
/// This should represent the file identity:
/// ```
/// # use parquet::encryption::encrypt::FileEncryptionProperties;
/// let file_encryption_properties = FileEncryptionProperties::builder(b"0123456789012345".into())
///     .with_aad_prefix("example_file".into())
///     .build()?;
/// # Ok::<(), parquet::errors::ParquetError>(())
/// ```
pub struct FileEncryptionProperties {
    /// Whether the footer is encrypted (`false` means the footer is stored in plaintext)
    encrypt_footer: bool,
    /// Key (and optional retrieval metadata) used for the footer and, with uniform
    /// encryption, for all columns
    footer_key: EncryptionKey,
    /// Column specific keys, indexed by column path string.
    /// An empty map means uniform encryption with the footer key.
    column_keys: HashMap<String, EncryptionKey>,
    /// Optional AAD prefix identifying the file
    aad_prefix: Option<Vec<u8>>,
    /// Whether storing the AAD prefix in the file was requested
    store_aad_prefix: bool,
}
100
101impl FileEncryptionProperties {
102    /// Create a new builder for encryption properties with the given footer encryption key
103    pub fn builder(footer_key: Vec<u8>) -> EncryptionPropertiesBuilder {
104        EncryptionPropertiesBuilder::new(footer_key)
105    }
106
107    /// Should the footer be encrypted
108    pub fn encrypt_footer(&self) -> bool {
109        self.encrypt_footer
110    }
111
112    /// Retrieval metadata of key used for encryption of footer and (possibly) columns
113    pub fn footer_key_metadata(&self) -> Option<&Vec<u8>> {
114        self.footer_key.key_metadata.as_ref()
115    }
116
117    /// Retrieval of key used for encryption of footer and (possibly) columns
118    pub fn footer_key(&self) -> &Vec<u8> {
119        &self.footer_key.key
120    }
121
122    /// Get the column names, keys, and metadata for columns to be encrypted
123    pub fn column_keys(&self) -> (Vec<String>, Vec<Vec<u8>>, Vec<Vec<u8>>) {
124        let mut column_names: Vec<String> = Vec::with_capacity(self.column_keys.len());
125        let mut keys: Vec<Vec<u8>> = Vec::with_capacity(self.column_keys.len());
126        let mut meta: Vec<Vec<u8>> = Vec::with_capacity(self.column_keys.len());
127        for (key, value) in self.column_keys.iter() {
128            column_names.push(key.clone());
129            keys.push(value.key.clone());
130            if let Some(metadata) = value.key_metadata.as_ref() {
131                meta.push(metadata.clone());
132            }
133        }
134        (column_names, keys, meta)
135    }
136
137    /// AAD prefix string uniquely identifies the file and prevents file swapping
138    pub fn aad_prefix(&self) -> Option<&Vec<u8>> {
139        self.aad_prefix.as_ref()
140    }
141
142    /// Should the AAD prefix be stored in the file
143    pub fn store_aad_prefix(&self) -> bool {
144        self.store_aad_prefix && self.aad_prefix.is_some()
145    }
146
147    /// Checks if columns that are to be encrypted are present in schema
148    pub(crate) fn validate_encrypted_column_names(
149        &self,
150        schema: &SchemaDescriptor,
151    ) -> std::result::Result<(), ParquetError> {
152        let column_paths = schema
153            .columns()
154            .iter()
155            .map(|c| c.path().string())
156            .collect::<HashSet<_>>();
157        let encryption_columns = self
158            .column_keys
159            .keys()
160            .cloned()
161            .collect::<HashSet<String>>();
162        if !encryption_columns.is_subset(&column_paths) {
163            let mut columns_missing_in_schema = encryption_columns
164                .difference(&column_paths)
165                .cloned()
166                .collect::<Vec<String>>();
167            columns_missing_in_schema.sort();
168            return Err(ParquetError::General(
169                format!(
170                    "The following columns with encryption keys specified were not found in the schema: {}",
171                    columns_missing_in_schema.join(", ")
172                )
173                .to_string(),
174            ));
175        }
176        Ok(())
177    }
178}
179
/// Builder for [`FileEncryptionProperties`]
///
/// See [`FileEncryptionProperties`] for example usage.
pub struct EncryptionPropertiesBuilder {
    /// Whether the footer will be encrypted; initialised to `true` by [`Self::new`]
    encrypt_footer: bool,
    /// Footer key and its optional retrieval metadata
    footer_key: EncryptionKey,
    /// Column specific keys collected so far, indexed by column name
    column_keys: HashMap<String, EncryptionKey>,
    /// Optional AAD prefix; initialised to `None` by [`Self::new`]
    aad_prefix: Option<Vec<u8>>,
    /// Whether the AAD prefix should be stored in the file; initialised to `false` by [`Self::new`]
    store_aad_prefix: bool,
}
190
191impl EncryptionPropertiesBuilder {
192    /// Create a new [`EncryptionPropertiesBuilder`] with the given footer encryption key
193    pub fn new(footer_key: Vec<u8>) -> EncryptionPropertiesBuilder {
194        Self {
195            footer_key: EncryptionKey::new(footer_key),
196            column_keys: HashMap::default(),
197            aad_prefix: None,
198            encrypt_footer: true,
199            store_aad_prefix: false,
200        }
201    }
202
203    /// Set if the footer should be stored in plaintext (not encrypted). Defaults to false.
204    pub fn with_plaintext_footer(mut self, plaintext_footer: bool) -> Self {
205        self.encrypt_footer = !plaintext_footer;
206        self
207    }
208
209    /// Set retrieval metadata of key used for encryption of footer and (possibly) columns
210    pub fn with_footer_key_metadata(mut self, metadata: Vec<u8>) -> Self {
211        self.footer_key = self.footer_key.with_metadata(metadata);
212        self
213    }
214
215    /// Set the key used for encryption of a column. Note that if no column keys are configured then
216    /// all columns will be encrypted with the footer key.
217    /// If any column keys are configured then only the columns with a key will be encrypted.
218    pub fn with_column_key(mut self, column_name: &str, key: Vec<u8>) -> Self {
219        self.column_keys
220            .insert(column_name.to_string(), EncryptionKey::new(key));
221        self
222    }
223
224    /// Set the key used for encryption of a column and its metadata. The Key's metadata field is to
225    /// enable file readers to recover the key. For example, the metadata can keep a serialized
226    /// ID of a data key. Note that if no column keys are configured then all columns
227    /// will be encrypted with the footer key. If any column keys are configured then only the
228    /// columns with a key will be encrypted.
229    pub fn with_column_key_and_metadata(
230        mut self,
231        column_name: &str,
232        key: Vec<u8>,
233        metadata: Vec<u8>,
234    ) -> Self {
235        self.column_keys.insert(
236            column_name.to_string(),
237            EncryptionKey::new(key).with_metadata(metadata),
238        );
239        self
240    }
241
242    /// Set the keys used for encryption of columns. Analogous to
243    /// with_column_key but for multiple columns. This will add column keys provided to the
244    /// existing column keys. If column keys were already provided for some columns, the new keys
245    /// will overwrite the old ones.
246    pub fn with_column_keys(mut self, column_names: Vec<&str>, keys: Vec<Vec<u8>>) -> Result<Self> {
247        if column_names.len() != keys.len() {
248            return Err(general_err!(
249                "The number of column names ({}) does not match the number of keys ({})",
250                column_names.len(),
251                keys.len()
252            ));
253        }
254        for (i, column_name) in column_names.into_iter().enumerate() {
255            self.column_keys
256                .insert(column_name.to_string(), EncryptionKey::new(keys[i].clone()));
257        }
258        Ok(self)
259    }
260
261    /// The AAD prefix uniquely identifies the file and allows to differentiate it e.g. from
262    /// older versions of the file or from other partition files in the same data set (table).
263    /// These bytes are optionally passed by a writer upon file creation. When not specified, no
264    /// AAD prefix is used.
265    pub fn with_aad_prefix(mut self, aad_prefix: Vec<u8>) -> Self {
266        self.aad_prefix = Some(aad_prefix);
267        self
268    }
269
270    /// Should the AAD prefix be stored in the file. If false, readers will need to provide the
271    /// AAD prefix to be able to decrypt data. Defaults to false.
272    pub fn with_aad_prefix_storage(mut self, store_aad_prefix: bool) -> Self {
273        self.store_aad_prefix = store_aad_prefix;
274        self
275    }
276
277    /// Build the encryption properties
278    pub fn build(self) -> Result<FileEncryptionProperties> {
279        Ok(FileEncryptionProperties {
280            encrypt_footer: self.encrypt_footer,
281            footer_key: self.footer_key,
282            column_keys: self.column_keys,
283            aad_prefix: self.aad_prefix,
284            store_aad_prefix: self.store_aad_prefix,
285        })
286    }
287}
288
#[derive(Debug)]
/// The encryption configuration for a single Parquet file
pub(crate) struct FileEncryptor {
    /// The encryption properties this encryptor was created from
    properties: FileEncryptionProperties,
    /// Random bytes generated when the encryptor is created (8 bytes)
    aad_file_unique: Vec<u8>,
    /// The configured AAD prefix (if any) followed by `aad_file_unique`
    file_aad: Vec<u8>,
}
296
297impl FileEncryptor {
298    pub(crate) fn new(properties: FileEncryptionProperties) -> Result<Self> {
299        // Generate unique AAD for file
300        let rng = SystemRandom::new();
301        let mut aad_file_unique = vec![0u8; 8];
302        rng.fill(&mut aad_file_unique)?;
303
304        let file_aad = match properties.aad_prefix.as_ref() {
305            None => aad_file_unique.clone(),
306            Some(aad_prefix) => [aad_prefix.clone(), aad_file_unique.clone()].concat(),
307        };
308
309        Ok(Self {
310            properties,
311            aad_file_unique,
312            file_aad,
313        })
314    }
315
316    /// Get the encryptor's file encryption properties
317    pub fn properties(&self) -> &FileEncryptionProperties {
318        &self.properties
319    }
320
321    /// Combined AAD prefix and suffix for the file generated
322    pub fn file_aad(&self) -> &[u8] {
323        &self.file_aad
324    }
325
326    /// Unique file identifier part of AAD suffix. The full AAD suffix is generated per module by
327    /// concatenating aad_file_unique, module type, row group ordinal (all except
328    /// footer), column ordinal (all except footer) and page ordinal (data page and
329    /// header only).
330    pub fn aad_file_unique(&self) -> &Vec<u8> {
331        &self.aad_file_unique
332    }
333
334    /// Returns whether data for the specified column should be encrypted
335    pub fn is_column_encrypted(&self, column_path: &str) -> bool {
336        if self.properties.column_keys.is_empty() {
337            // Uniform encryption
338            true
339        } else {
340            self.properties.column_keys.contains_key(column_path)
341        }
342    }
343
344    /// Get the BlockEncryptor for the footer
345    pub(crate) fn get_footer_encryptor(&self) -> Result<Box<dyn BlockEncryptor>> {
346        Ok(Box::new(RingGcmBlockEncryptor::new(
347            &self.properties.footer_key.key,
348        )?))
349    }
350
351    /// Get the encryptor for a column.
352    /// Will return an error if the column is not an encrypted column.
353    pub(crate) fn get_column_encryptor(
354        &self,
355        column_path: &str,
356    ) -> Result<Box<dyn BlockEncryptor>> {
357        if self.properties.column_keys.is_empty() {
358            return self.get_footer_encryptor();
359        }
360        match self.properties.column_keys.get(column_path) {
361            None => Err(general_err!("Column '{}' is not encrypted", column_path)),
362            Some(column_key) => Ok(Box::new(RingGcmBlockEncryptor::new(column_key.key())?)),
363        }
364    }
365}
366
367/// Write an encrypted Thrift serializable object
368pub(crate) fn encrypt_object<T: TSerializable, W: Write>(
369    object: &T,
370    encryptor: &mut Box<dyn BlockEncryptor>,
371    sink: &mut W,
372    module_aad: &[u8],
373) -> Result<()> {
374    let encrypted_buffer = encrypt_object_to_vec(object, encryptor, module_aad)?;
375    sink.write_all(&encrypted_buffer)?;
376    Ok(())
377}
378
379pub(crate) fn write_signed_plaintext_object<T: TSerializable, W: Write>(
380    object: &T,
381    encryptor: &mut Box<dyn BlockEncryptor>,
382    sink: &mut W,
383    module_aad: &[u8],
384) -> Result<()> {
385    let mut buffer: Vec<u8> = vec![];
386    {
387        let mut protocol = TCompactOutputProtocol::new(&mut buffer);
388        object.write_to_out_protocol(&mut protocol)?;
389    }
390    sink.write_all(&buffer)?;
391    buffer = encryptor.encrypt(buffer.as_ref(), module_aad)?;
392
393    // Format of encrypted buffer is: [ciphertext size, nonce, ciphertext, authentication tag]
394    let nonce = &buffer[SIZE_LEN..SIZE_LEN + NONCE_LEN];
395    let tag = &buffer[buffer.len() - TAG_LEN..];
396    sink.write_all(nonce)?;
397    sink.write_all(tag)?;
398
399    Ok(())
400}
401
402/// Encrypt a Thrift serializable object to a byte vector
403pub(crate) fn encrypt_object_to_vec<T: TSerializable>(
404    object: &T,
405    encryptor: &mut Box<dyn BlockEncryptor>,
406    module_aad: &[u8],
407) -> Result<Vec<u8>> {
408    let mut buffer: Vec<u8> = vec![];
409    {
410        let mut unencrypted_protocol = TCompactOutputProtocol::new(&mut buffer);
411        object.write_to_out_protocol(&mut unencrypted_protocol)?;
412    }
413
414    encryptor.encrypt(buffer.as_ref(), module_aad)
415}
416
417/// Get the crypto metadata for a column from the file encryption properties
418pub(crate) fn get_column_crypto_metadata(
419    properties: &FileEncryptionProperties,
420    column: &ColumnDescPtr,
421) -> Option<ColumnCryptoMetaData> {
422    if properties.column_keys.is_empty() {
423        // Uniform encryption
424        Some(ColumnCryptoMetaData::EncryptionWithFooterKey)
425    } else {
426        properties
427            .column_keys
428            .get(&column.path().string())
429            .map(|encryption_key| {
430                // Column is encrypted with a column specific key
431                ColumnCryptoMetaData::EncryptionWithColumnKey(EncryptionWithColumnKey {
432                    path_in_schema: column.path().parts().to_vec(),
433                    key_metadata: encryption_key.key_metadata.clone(),
434                })
435            })
436    }
437}