1use crate::encryption::ciphers::{
21 BlockEncryptor, RingGcmBlockEncryptor, NONCE_LEN, SIZE_LEN, TAG_LEN,
22};
23use crate::errors::{ParquetError, Result};
24use crate::file::column_crypto_metadata::{ColumnCryptoMetaData, EncryptionWithColumnKey};
25use crate::schema::types::{ColumnDescPtr, SchemaDescriptor};
26use crate::thrift::TSerializable;
27use ring::rand::{SecureRandom, SystemRandom};
28use std::collections::{HashMap, HashSet};
29use std::io::Write;
30use thrift::protocol::TCompactOutputProtocol;
31
/// Raw key bytes paired with optional, caller-supplied key metadata.
#[derive(Debug, Clone, PartialEq)]
struct EncryptionKey {
    /// The key bytes.
    key: Vec<u8>,
    /// Optional metadata bytes attached to the key; `None` until
    /// [`EncryptionKey::with_metadata`] is called.
    key_metadata: Option<Vec<u8>>,
}

impl EncryptionKey {
    /// Wraps `key` with no metadata attached.
    fn new(key: Vec<u8>) -> EncryptionKey {
        EncryptionKey {
            key,
            key_metadata: None,
        }
    }

    /// Consumes the key and returns it with `metadata` attached, replacing
    /// any metadata that was set before.
    fn with_metadata(self, metadata: Vec<u8>) -> Self {
        Self {
            key_metadata: Some(metadata),
            ..self
        }
    }

    /// Borrows the key bytes.
    fn key(&self) -> &Vec<u8> {
        &self.key
    }
}
55
56#[derive(Debug, Clone, PartialEq)]
57pub struct FileEncryptionProperties {
94 encrypt_footer: bool,
95 footer_key: EncryptionKey,
96 column_keys: HashMap<String, EncryptionKey>,
97 aad_prefix: Option<Vec<u8>>,
98 store_aad_prefix: bool,
99}
100
101impl FileEncryptionProperties {
102 pub fn builder(footer_key: Vec<u8>) -> EncryptionPropertiesBuilder {
104 EncryptionPropertiesBuilder::new(footer_key)
105 }
106
107 pub fn encrypt_footer(&self) -> bool {
109 self.encrypt_footer
110 }
111
112 pub fn footer_key_metadata(&self) -> Option<&Vec<u8>> {
114 self.footer_key.key_metadata.as_ref()
115 }
116
117 pub fn footer_key(&self) -> &Vec<u8> {
119 &self.footer_key.key
120 }
121
122 pub fn column_keys(&self) -> (Vec<String>, Vec<Vec<u8>>, Vec<Vec<u8>>) {
124 let mut column_names: Vec<String> = Vec::with_capacity(self.column_keys.len());
125 let mut keys: Vec<Vec<u8>> = Vec::with_capacity(self.column_keys.len());
126 let mut meta: Vec<Vec<u8>> = Vec::with_capacity(self.column_keys.len());
127 for (key, value) in self.column_keys.iter() {
128 column_names.push(key.clone());
129 keys.push(value.key.clone());
130 if let Some(metadata) = value.key_metadata.as_ref() {
131 meta.push(metadata.clone());
132 }
133 }
134 (column_names, keys, meta)
135 }
136
137 pub fn aad_prefix(&self) -> Option<&Vec<u8>> {
139 self.aad_prefix.as_ref()
140 }
141
142 pub fn store_aad_prefix(&self) -> bool {
144 self.store_aad_prefix && self.aad_prefix.is_some()
145 }
146
147 pub(crate) fn validate_encrypted_column_names(
149 &self,
150 schema: &SchemaDescriptor,
151 ) -> std::result::Result<(), ParquetError> {
152 let column_paths = schema
153 .columns()
154 .iter()
155 .map(|c| c.path().string())
156 .collect::<HashSet<_>>();
157 let encryption_columns = self
158 .column_keys
159 .keys()
160 .cloned()
161 .collect::<HashSet<String>>();
162 if !encryption_columns.is_subset(&column_paths) {
163 let mut columns_missing_in_schema = encryption_columns
164 .difference(&column_paths)
165 .cloned()
166 .collect::<Vec<String>>();
167 columns_missing_in_schema.sort();
168 return Err(ParquetError::General(
169 format!(
170 "The following columns with encryption keys specified were not found in the schema: {}",
171 columns_missing_in_schema.join(", ")
172 )
173 .to_string(),
174 ));
175 }
176 Ok(())
177 }
178}
179
180pub struct EncryptionPropertiesBuilder {
184 encrypt_footer: bool,
185 footer_key: EncryptionKey,
186 column_keys: HashMap<String, EncryptionKey>,
187 aad_prefix: Option<Vec<u8>>,
188 store_aad_prefix: bool,
189}
190
191impl EncryptionPropertiesBuilder {
192 pub fn new(footer_key: Vec<u8>) -> EncryptionPropertiesBuilder {
194 Self {
195 footer_key: EncryptionKey::new(footer_key),
196 column_keys: HashMap::default(),
197 aad_prefix: None,
198 encrypt_footer: true,
199 store_aad_prefix: false,
200 }
201 }
202
203 pub fn with_plaintext_footer(mut self, plaintext_footer: bool) -> Self {
205 self.encrypt_footer = !plaintext_footer;
206 self
207 }
208
209 pub fn with_footer_key_metadata(mut self, metadata: Vec<u8>) -> Self {
211 self.footer_key = self.footer_key.with_metadata(metadata);
212 self
213 }
214
215 pub fn with_column_key(mut self, column_name: &str, key: Vec<u8>) -> Self {
219 self.column_keys
220 .insert(column_name.to_string(), EncryptionKey::new(key));
221 self
222 }
223
224 pub fn with_column_key_and_metadata(
230 mut self,
231 column_name: &str,
232 key: Vec<u8>,
233 metadata: Vec<u8>,
234 ) -> Self {
235 self.column_keys.insert(
236 column_name.to_string(),
237 EncryptionKey::new(key).with_metadata(metadata),
238 );
239 self
240 }
241
242 pub fn with_column_keys(mut self, column_names: Vec<&str>, keys: Vec<Vec<u8>>) -> Result<Self> {
247 if column_names.len() != keys.len() {
248 return Err(general_err!(
249 "The number of column names ({}) does not match the number of keys ({})",
250 column_names.len(),
251 keys.len()
252 ));
253 }
254 for (i, column_name) in column_names.into_iter().enumerate() {
255 self.column_keys
256 .insert(column_name.to_string(), EncryptionKey::new(keys[i].clone()));
257 }
258 Ok(self)
259 }
260
261 pub fn with_aad_prefix(mut self, aad_prefix: Vec<u8>) -> Self {
266 self.aad_prefix = Some(aad_prefix);
267 self
268 }
269
270 pub fn with_aad_prefix_storage(mut self, store_aad_prefix: bool) -> Self {
273 self.store_aad_prefix = store_aad_prefix;
274 self
275 }
276
277 pub fn build(self) -> Result<FileEncryptionProperties> {
279 Ok(FileEncryptionProperties {
280 encrypt_footer: self.encrypt_footer,
281 footer_key: self.footer_key,
282 column_keys: self.column_keys,
283 aad_prefix: self.aad_prefix,
284 store_aad_prefix: self.store_aad_prefix,
285 })
286 }
287}
288
289#[derive(Debug)]
290pub(crate) struct FileEncryptor {
292 properties: FileEncryptionProperties,
293 aad_file_unique: Vec<u8>,
294 file_aad: Vec<u8>,
295}
296
297impl FileEncryptor {
298 pub(crate) fn new(properties: FileEncryptionProperties) -> Result<Self> {
299 let rng = SystemRandom::new();
301 let mut aad_file_unique = vec![0u8; 8];
302 rng.fill(&mut aad_file_unique)?;
303
304 let file_aad = match properties.aad_prefix.as_ref() {
305 None => aad_file_unique.clone(),
306 Some(aad_prefix) => [aad_prefix.clone(), aad_file_unique.clone()].concat(),
307 };
308
309 Ok(Self {
310 properties,
311 aad_file_unique,
312 file_aad,
313 })
314 }
315
316 pub fn properties(&self) -> &FileEncryptionProperties {
318 &self.properties
319 }
320
321 pub fn file_aad(&self) -> &[u8] {
323 &self.file_aad
324 }
325
326 pub fn aad_file_unique(&self) -> &Vec<u8> {
331 &self.aad_file_unique
332 }
333
334 pub fn is_column_encrypted(&self, column_path: &str) -> bool {
336 if self.properties.column_keys.is_empty() {
337 true
339 } else {
340 self.properties.column_keys.contains_key(column_path)
341 }
342 }
343
344 pub(crate) fn get_footer_encryptor(&self) -> Result<Box<dyn BlockEncryptor>> {
346 Ok(Box::new(RingGcmBlockEncryptor::new(
347 &self.properties.footer_key.key,
348 )?))
349 }
350
351 pub(crate) fn get_column_encryptor(
354 &self,
355 column_path: &str,
356 ) -> Result<Box<dyn BlockEncryptor>> {
357 if self.properties.column_keys.is_empty() {
358 return self.get_footer_encryptor();
359 }
360 match self.properties.column_keys.get(column_path) {
361 None => Err(general_err!("Column '{}' is not encrypted", column_path)),
362 Some(column_key) => Ok(Box::new(RingGcmBlockEncryptor::new(column_key.key())?)),
363 }
364 }
365}
366
367pub(crate) fn encrypt_object<T: TSerializable, W: Write>(
369 object: &T,
370 encryptor: &mut Box<dyn BlockEncryptor>,
371 sink: &mut W,
372 module_aad: &[u8],
373) -> Result<()> {
374 let encrypted_buffer = encrypt_object_to_vec(object, encryptor, module_aad)?;
375 sink.write_all(&encrypted_buffer)?;
376 Ok(())
377}
378
379pub(crate) fn write_signed_plaintext_object<T: TSerializable, W: Write>(
380 object: &T,
381 encryptor: &mut Box<dyn BlockEncryptor>,
382 sink: &mut W,
383 module_aad: &[u8],
384) -> Result<()> {
385 let mut buffer: Vec<u8> = vec![];
386 {
387 let mut protocol = TCompactOutputProtocol::new(&mut buffer);
388 object.write_to_out_protocol(&mut protocol)?;
389 }
390 sink.write_all(&buffer)?;
391 buffer = encryptor.encrypt(buffer.as_ref(), module_aad)?;
392
393 let nonce = &buffer[SIZE_LEN..SIZE_LEN + NONCE_LEN];
395 let tag = &buffer[buffer.len() - TAG_LEN..];
396 sink.write_all(nonce)?;
397 sink.write_all(tag)?;
398
399 Ok(())
400}
401
402pub(crate) fn encrypt_object_to_vec<T: TSerializable>(
404 object: &T,
405 encryptor: &mut Box<dyn BlockEncryptor>,
406 module_aad: &[u8],
407) -> Result<Vec<u8>> {
408 let mut buffer: Vec<u8> = vec![];
409 {
410 let mut unencrypted_protocol = TCompactOutputProtocol::new(&mut buffer);
411 object.write_to_out_protocol(&mut unencrypted_protocol)?;
412 }
413
414 encryptor.encrypt(buffer.as_ref(), module_aad)
415}
416
417pub(crate) fn get_column_crypto_metadata(
419 properties: &FileEncryptionProperties,
420 column: &ColumnDescPtr,
421) -> Option<ColumnCryptoMetaData> {
422 if properties.column_keys.is_empty() {
423 Some(ColumnCryptoMetaData::EncryptionWithFooterKey)
425 } else {
426 properties
427 .column_keys
428 .get(&column.path().string())
429 .map(|encryption_key| {
430 ColumnCryptoMetaData::EncryptionWithColumnKey(EncryptionWithColumnKey {
432 path_in_schema: column.path().parts().to_vec(),
433 key_metadata: encryption_key.key_metadata.clone(),
434 })
435 })
436 }
437}