parquet/encryption/
encrypt.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Configuration and utilities for Parquet Modular Encryption
19
20use crate::encryption::ciphers::{BlockEncryptor, RingGcmBlockEncryptor};
21use crate::errors::{ParquetError, Result};
22use crate::file::column_crypto_metadata::{ColumnCryptoMetaData, EncryptionWithColumnKey};
23use crate::schema::types::{ColumnDescPtr, SchemaDescriptor};
24use crate::thrift::TSerializable;
25use ring::rand::{SecureRandom, SystemRandom};
26use std::collections::{HashMap, HashSet};
27use std::io::Write;
28use thrift::protocol::TCompactOutputProtocol;
29
/// Raw key bytes together with optional metadata that lets file readers recover the key
#[derive(Debug, Clone, PartialEq)]
struct EncryptionKey {
    /// The raw key bytes
    key: Vec<u8>,
    /// Optional key retrieval metadata; `None` until attached with `with_metadata`
    key_metadata: Option<Vec<u8>>,
}

impl EncryptionKey {
    /// Wrap the given key bytes in an [`EncryptionKey`] that carries no metadata
    fn new(key: Vec<u8>) -> EncryptionKey {
        EncryptionKey {
            key,
            key_metadata: None,
        }
    }

    /// Consume this key and return it with `metadata` attached, replacing any
    /// metadata that was attached before
    fn with_metadata(self, metadata: Vec<u8>) -> Self {
        Self {
            key_metadata: Some(metadata),
            ..self
        }
    }

    /// Borrow the raw key bytes
    fn key(&self) -> &Vec<u8> {
        &self.key
    }
}
53
54#[derive(Debug, Clone, PartialEq)]
55/// Defines how data in a Parquet file should be encrypted
56pub struct FileEncryptionProperties {
57    encrypt_footer: bool,
58    footer_key: EncryptionKey,
59    column_keys: HashMap<String, EncryptionKey>,
60    aad_prefix: Option<Vec<u8>>,
61    store_aad_prefix: bool,
62}
63
64impl FileEncryptionProperties {
65    /// Create a new builder for encryption properties with the given footer encryption key
66    pub fn builder(footer_key: Vec<u8>) -> EncryptionPropertiesBuilder {
67        EncryptionPropertiesBuilder::new(footer_key)
68    }
69
70    /// Should the footer be encrypted
71    pub fn encrypt_footer(&self) -> bool {
72        self.encrypt_footer
73    }
74
75    /// Retrieval metadata of key used for encryption of footer and (possibly) columns
76    pub fn footer_key_metadata(&self) -> Option<&Vec<u8>> {
77        self.footer_key.key_metadata.as_ref()
78    }
79
80    /// Retrieval of key used for encryption of footer and (possibly) columns
81    pub fn footer_key(&self) -> &Vec<u8> {
82        &self.footer_key.key
83    }
84
85    /// Get the column names, keys, and metadata for columns to be encrypted
86    pub fn column_keys(&self) -> (Vec<String>, Vec<Vec<u8>>, Vec<Vec<u8>>) {
87        let mut column_names: Vec<String> = Vec::with_capacity(self.column_keys.len());
88        let mut keys: Vec<Vec<u8>> = Vec::with_capacity(self.column_keys.len());
89        let mut meta: Vec<Vec<u8>> = Vec::with_capacity(self.column_keys.len());
90        for (key, value) in self.column_keys.iter() {
91            column_names.push(key.clone());
92            keys.push(value.key.clone());
93            if let Some(metadata) = value.key_metadata.as_ref() {
94                meta.push(metadata.clone());
95            }
96        }
97        (column_names, keys, meta)
98    }
99
100    /// AAD prefix string uniquely identifies the file and prevents file swapping
101    pub fn aad_prefix(&self) -> Option<&Vec<u8>> {
102        self.aad_prefix.as_ref()
103    }
104
105    /// Should the AAD prefix be stored in the file
106    pub fn store_aad_prefix(&self) -> bool {
107        self.store_aad_prefix && self.aad_prefix.is_some()
108    }
109
110    /// Checks if columns that are to be encrypted are present in schema
111    pub(crate) fn validate_encrypted_column_names(
112        &self,
113        schema: &SchemaDescriptor,
114    ) -> std::result::Result<(), ParquetError> {
115        let column_paths = schema
116            .columns()
117            .iter()
118            .map(|c| c.path().string())
119            .collect::<HashSet<_>>();
120        let encryption_columns = self
121            .column_keys
122            .keys()
123            .cloned()
124            .collect::<HashSet<String>>();
125        if !encryption_columns.is_subset(&column_paths) {
126            let mut columns_missing_in_schema = encryption_columns
127                .difference(&column_paths)
128                .cloned()
129                .collect::<Vec<String>>();
130            columns_missing_in_schema.sort();
131            return Err(ParquetError::General(
132                format!(
133                    "The following columns with encryption keys specified were not found in the schema: {}",
134                    columns_missing_in_schema.join(", ")
135                )
136                .to_string(),
137            ));
138        }
139        Ok(())
140    }
141}
142
143/// Builder for [`FileEncryptionProperties`]
144pub struct EncryptionPropertiesBuilder {
145    encrypt_footer: bool,
146    footer_key: EncryptionKey,
147    column_keys: HashMap<String, EncryptionKey>,
148    aad_prefix: Option<Vec<u8>>,
149    store_aad_prefix: bool,
150}
151
152impl EncryptionPropertiesBuilder {
153    /// Create a new [`EncryptionPropertiesBuilder`] with the given footer encryption key
154    pub fn new(footer_key: Vec<u8>) -> EncryptionPropertiesBuilder {
155        Self {
156            footer_key: EncryptionKey::new(footer_key),
157            column_keys: HashMap::default(),
158            aad_prefix: None,
159            encrypt_footer: true,
160            store_aad_prefix: false,
161        }
162    }
163
164    /// Set if the footer should be stored in plaintext (not encrypted). Defaults to false.
165    pub fn with_plaintext_footer(mut self, plaintext_footer: bool) -> Self {
166        self.encrypt_footer = !plaintext_footer;
167        self
168    }
169
170    /// Set retrieval metadata of key used for encryption of footer and (possibly) columns
171    pub fn with_footer_key_metadata(mut self, metadata: Vec<u8>) -> Self {
172        self.footer_key = self.footer_key.with_metadata(metadata);
173        self
174    }
175
176    /// Set the key used for encryption of a column. Note that if no column keys are configured then
177    /// all columns will be encrypted with the footer key.
178    /// If any column keys are configured then only the columns with a key will be encrypted.
179    pub fn with_column_key(mut self, column_name: &str, key: Vec<u8>) -> Self {
180        self.column_keys
181            .insert(column_name.to_string(), EncryptionKey::new(key));
182        self
183    }
184
185    /// Set the key used for encryption of a column and its metadata. The Key's metadata field is to
186    /// enable file readers to recover the key. For example, the metadata can keep a serialized
187    /// ID of a data key. Note that if no column keys are configured then all columns
188    /// will be encrypted with the footer key. If any column keys are configured then only the
189    /// columns with a key will be encrypted.
190    pub fn with_column_key_and_metadata(
191        mut self,
192        column_name: &str,
193        key: Vec<u8>,
194        metadata: Vec<u8>,
195    ) -> Self {
196        self.column_keys.insert(
197            column_name.to_string(),
198            EncryptionKey::new(key).with_metadata(metadata),
199        );
200        self
201    }
202
203    /// Set the keys used for encryption of columns. Analogous to
204    /// with_column_key but for multiple columns. This will add column keys provided to the
205    /// existing column keys. If column keys were already provided for some columns, the new keys
206    /// will overwrite the old ones.
207    pub fn with_column_keys(mut self, column_names: Vec<&str>, keys: Vec<Vec<u8>>) -> Result<Self> {
208        if column_names.len() != keys.len() {
209            return Err(general_err!(
210                "The number of column names ({}) does not match the number of keys ({})",
211                column_names.len(),
212                keys.len()
213            ));
214        }
215        for (i, column_name) in column_names.into_iter().enumerate() {
216            self.column_keys
217                .insert(column_name.to_string(), EncryptionKey::new(keys[i].clone()));
218        }
219        Ok(self)
220    }
221
222    /// The AAD prefix uniquely identifies the file and allows to differentiate it e.g. from
223    /// older versions of the file or from other partition files in the same data set (table).
224    /// These bytes are optionally passed by a writer upon file creation. When not specified, no
225    /// AAD prefix is used.
226    pub fn with_aad_prefix(mut self, aad_prefix: Vec<u8>) -> Self {
227        self.aad_prefix = Some(aad_prefix);
228        self
229    }
230
231    /// Should the AAD prefix be stored in the file. If false, readers will need to provide the
232    /// AAD prefix to be able to decrypt data. Defaults to false.
233    pub fn with_aad_prefix_storage(mut self, store_aad_prefix: bool) -> Self {
234        self.store_aad_prefix = store_aad_prefix;
235        self
236    }
237
238    /// Build the encryption properties
239    pub fn build(self) -> Result<FileEncryptionProperties> {
240        Ok(FileEncryptionProperties {
241            encrypt_footer: self.encrypt_footer,
242            footer_key: self.footer_key,
243            column_keys: self.column_keys,
244            aad_prefix: self.aad_prefix,
245            store_aad_prefix: self.store_aad_prefix,
246        })
247    }
248}
249
250#[derive(Debug)]
251/// The encryption configuration for a single Parquet file
252pub(crate) struct FileEncryptor {
253    properties: FileEncryptionProperties,
254    aad_file_unique: Vec<u8>,
255    file_aad: Vec<u8>,
256}
257
258impl FileEncryptor {
259    pub(crate) fn new(properties: FileEncryptionProperties) -> Result<Self> {
260        // Generate unique AAD for file
261        let rng = SystemRandom::new();
262        let mut aad_file_unique = vec![0u8; 8];
263        rng.fill(&mut aad_file_unique)?;
264
265        let file_aad = match properties.aad_prefix.as_ref() {
266            None => aad_file_unique.clone(),
267            Some(aad_prefix) => [aad_prefix.clone(), aad_file_unique.clone()].concat(),
268        };
269
270        Ok(Self {
271            properties,
272            aad_file_unique,
273            file_aad,
274        })
275    }
276
277    /// Get the encryptor's file encryption properties
278    pub fn properties(&self) -> &FileEncryptionProperties {
279        &self.properties
280    }
281
282    /// Combined AAD prefix and suffix for the file generated
283    pub fn file_aad(&self) -> &[u8] {
284        &self.file_aad
285    }
286
287    /// Unique file identifier part of AAD suffix. The full AAD suffix is generated per module by
288    /// concatenating aad_file_unique, module type, row group ordinal (all except
289    /// footer), column ordinal (all except footer) and page ordinal (data page and
290    /// header only).
291    pub fn aad_file_unique(&self) -> &Vec<u8> {
292        &self.aad_file_unique
293    }
294
295    /// Returns whether data for the specified column should be encrypted
296    pub fn is_column_encrypted(&self, column_path: &str) -> bool {
297        if self.properties.column_keys.is_empty() {
298            // Uniform encryption
299            true
300        } else {
301            self.properties.column_keys.contains_key(column_path)
302        }
303    }
304
305    /// Get the BlockEncryptor for the footer
306    pub(crate) fn get_footer_encryptor(&self) -> Result<Box<dyn BlockEncryptor>> {
307        Ok(Box::new(RingGcmBlockEncryptor::new(
308            &self.properties.footer_key.key,
309        )?))
310    }
311
312    /// Get the encryptor for a column.
313    /// Will return an error if the column is not an encrypted column.
314    pub(crate) fn get_column_encryptor(
315        &self,
316        column_path: &str,
317    ) -> Result<Box<dyn BlockEncryptor>> {
318        if self.properties.column_keys.is_empty() {
319            return self.get_footer_encryptor();
320        }
321        match self.properties.column_keys.get(column_path) {
322            None => Err(general_err!("Column '{}' is not encrypted", column_path)),
323            Some(column_key) => Ok(Box::new(RingGcmBlockEncryptor::new(column_key.key())?)),
324        }
325    }
326}
327
328/// Write an encrypted Thrift serializable object
329pub(crate) fn encrypt_object<T: TSerializable, W: Write>(
330    object: &T,
331    encryptor: &mut Box<dyn BlockEncryptor>,
332    sink: &mut W,
333    module_aad: &[u8],
334) -> Result<()> {
335    let encrypted_buffer = encrypt_object_to_vec(object, encryptor, module_aad)?;
336    sink.write_all(&encrypted_buffer)?;
337    Ok(())
338}
339
340/// Encrypt a Thrift serializable object to a byte vector
341pub(crate) fn encrypt_object_to_vec<T: TSerializable>(
342    object: &T,
343    encryptor: &mut Box<dyn BlockEncryptor>,
344    module_aad: &[u8],
345) -> Result<Vec<u8>> {
346    let mut buffer: Vec<u8> = vec![];
347    {
348        let mut unencrypted_protocol = TCompactOutputProtocol::new(&mut buffer);
349        object.write_to_out_protocol(&mut unencrypted_protocol)?;
350    }
351
352    encryptor.encrypt(buffer.as_ref(), module_aad)
353}
354
355/// Get the crypto metadata for a column from the file encryption properties
356pub(crate) fn get_column_crypto_metadata(
357    properties: &FileEncryptionProperties,
358    column: &ColumnDescPtr,
359) -> Option<ColumnCryptoMetaData> {
360    if properties.column_keys.is_empty() {
361        // Uniform encryption
362        Some(ColumnCryptoMetaData::EncryptionWithFooterKey)
363    } else {
364        properties
365            .column_keys
366            .get(&column.path().string())
367            .map(|encryption_key| {
368                // Column is encrypted with a column specific key
369                ColumnCryptoMetaData::EncryptionWithColumnKey(EncryptionWithColumnKey {
370                    path_in_schema: column.path().parts().to_vec(),
371                    key_metadata: encryption_key.key_metadata.clone(),
372                })
373            })
374    }
375}