1use crate::encryption::ciphers::{BlockEncryptor, RingGcmBlockEncryptor};
21use crate::errors::{ParquetError, Result};
22use crate::file::column_crypto_metadata::{ColumnCryptoMetaData, EncryptionWithColumnKey};
23use crate::schema::types::{ColumnDescPtr, SchemaDescriptor};
24use crate::thrift::TSerializable;
25use ring::rand::{SecureRandom, SystemRandom};
26use std::collections::{HashMap, HashSet};
27use std::io::Write;
28use thrift::protocol::TCompactOutputProtocol;
29
/// Raw key bytes paired with optional key metadata.
#[derive(Debug, Clone, PartialEq)]
struct EncryptionKey {
    /// The key bytes.
    key: Vec<u8>,
    /// Metadata bytes attached to the key; `None` until `with_metadata` is called.
    key_metadata: Option<Vec<u8>>,
}

impl EncryptionKey {
    /// Wraps `key` without any metadata attached.
    fn new(key: Vec<u8>) -> EncryptionKey {
        EncryptionKey {
            key_metadata: None,
            key,
        }
    }

    /// Consumes the key and returns it with `metadata` attached, replacing any
    /// metadata that was set before.
    fn with_metadata(self, metadata: Vec<u8>) -> Self {
        Self {
            key_metadata: Some(metadata),
            ..self
        }
    }

    /// Borrows the key bytes.
    fn key(&self) -> &Vec<u8> {
        let Self { key, .. } = self;
        key
    }
}
53
54#[derive(Debug, Clone, PartialEq)]
55pub struct FileEncryptionProperties {
57 encrypt_footer: bool,
58 footer_key: EncryptionKey,
59 column_keys: HashMap<String, EncryptionKey>,
60 aad_prefix: Option<Vec<u8>>,
61 store_aad_prefix: bool,
62}
63
64impl FileEncryptionProperties {
65 pub fn builder(footer_key: Vec<u8>) -> EncryptionPropertiesBuilder {
67 EncryptionPropertiesBuilder::new(footer_key)
68 }
69
70 pub fn encrypt_footer(&self) -> bool {
72 self.encrypt_footer
73 }
74
75 pub fn footer_key_metadata(&self) -> Option<&Vec<u8>> {
77 self.footer_key.key_metadata.as_ref()
78 }
79
80 pub fn footer_key(&self) -> &Vec<u8> {
82 &self.footer_key.key
83 }
84
85 pub fn column_keys(&self) -> (Vec<String>, Vec<Vec<u8>>, Vec<Vec<u8>>) {
87 let mut column_names: Vec<String> = Vec::with_capacity(self.column_keys.len());
88 let mut keys: Vec<Vec<u8>> = Vec::with_capacity(self.column_keys.len());
89 let mut meta: Vec<Vec<u8>> = Vec::with_capacity(self.column_keys.len());
90 for (key, value) in self.column_keys.iter() {
91 column_names.push(key.clone());
92 keys.push(value.key.clone());
93 if let Some(metadata) = value.key_metadata.as_ref() {
94 meta.push(metadata.clone());
95 }
96 }
97 (column_names, keys, meta)
98 }
99
100 pub fn aad_prefix(&self) -> Option<&Vec<u8>> {
102 self.aad_prefix.as_ref()
103 }
104
105 pub fn store_aad_prefix(&self) -> bool {
107 self.store_aad_prefix && self.aad_prefix.is_some()
108 }
109
110 pub(crate) fn validate_encrypted_column_names(
112 &self,
113 schema: &SchemaDescriptor,
114 ) -> std::result::Result<(), ParquetError> {
115 let column_paths = schema
116 .columns()
117 .iter()
118 .map(|c| c.path().string())
119 .collect::<HashSet<_>>();
120 let encryption_columns = self
121 .column_keys
122 .keys()
123 .cloned()
124 .collect::<HashSet<String>>();
125 if !encryption_columns.is_subset(&column_paths) {
126 let mut columns_missing_in_schema = encryption_columns
127 .difference(&column_paths)
128 .cloned()
129 .collect::<Vec<String>>();
130 columns_missing_in_schema.sort();
131 return Err(ParquetError::General(
132 format!(
133 "The following columns with encryption keys specified were not found in the schema: {}",
134 columns_missing_in_schema.join(", ")
135 )
136 .to_string(),
137 ));
138 }
139 Ok(())
140 }
141}
142
143pub struct EncryptionPropertiesBuilder {
145 encrypt_footer: bool,
146 footer_key: EncryptionKey,
147 column_keys: HashMap<String, EncryptionKey>,
148 aad_prefix: Option<Vec<u8>>,
149 store_aad_prefix: bool,
150}
151
152impl EncryptionPropertiesBuilder {
153 pub fn new(footer_key: Vec<u8>) -> EncryptionPropertiesBuilder {
155 Self {
156 footer_key: EncryptionKey::new(footer_key),
157 column_keys: HashMap::default(),
158 aad_prefix: None,
159 encrypt_footer: true,
160 store_aad_prefix: false,
161 }
162 }
163
164 pub fn with_plaintext_footer(mut self, plaintext_footer: bool) -> Self {
166 self.encrypt_footer = !plaintext_footer;
167 self
168 }
169
170 pub fn with_footer_key_metadata(mut self, metadata: Vec<u8>) -> Self {
172 self.footer_key = self.footer_key.with_metadata(metadata);
173 self
174 }
175
176 pub fn with_column_key(mut self, column_name: &str, key: Vec<u8>) -> Self {
180 self.column_keys
181 .insert(column_name.to_string(), EncryptionKey::new(key));
182 self
183 }
184
185 pub fn with_column_key_and_metadata(
191 mut self,
192 column_name: &str,
193 key: Vec<u8>,
194 metadata: Vec<u8>,
195 ) -> Self {
196 self.column_keys.insert(
197 column_name.to_string(),
198 EncryptionKey::new(key).with_metadata(metadata),
199 );
200 self
201 }
202
203 pub fn with_column_keys(mut self, column_names: Vec<&str>, keys: Vec<Vec<u8>>) -> Result<Self> {
208 if column_names.len() != keys.len() {
209 return Err(general_err!(
210 "The number of column names ({}) does not match the number of keys ({})",
211 column_names.len(),
212 keys.len()
213 ));
214 }
215 for (i, column_name) in column_names.into_iter().enumerate() {
216 self.column_keys
217 .insert(column_name.to_string(), EncryptionKey::new(keys[i].clone()));
218 }
219 Ok(self)
220 }
221
222 pub fn with_aad_prefix(mut self, aad_prefix: Vec<u8>) -> Self {
227 self.aad_prefix = Some(aad_prefix);
228 self
229 }
230
231 pub fn with_aad_prefix_storage(mut self, store_aad_prefix: bool) -> Self {
234 self.store_aad_prefix = store_aad_prefix;
235 self
236 }
237
238 pub fn build(self) -> Result<FileEncryptionProperties> {
240 Ok(FileEncryptionProperties {
241 encrypt_footer: self.encrypt_footer,
242 footer_key: self.footer_key,
243 column_keys: self.column_keys,
244 aad_prefix: self.aad_prefix,
245 store_aad_prefix: self.store_aad_prefix,
246 })
247 }
248}
249
250#[derive(Debug)]
251pub(crate) struct FileEncryptor {
253 properties: FileEncryptionProperties,
254 aad_file_unique: Vec<u8>,
255 file_aad: Vec<u8>,
256}
257
258impl FileEncryptor {
259 pub(crate) fn new(properties: FileEncryptionProperties) -> Result<Self> {
260 let rng = SystemRandom::new();
262 let mut aad_file_unique = vec![0u8; 8];
263 rng.fill(&mut aad_file_unique)?;
264
265 let file_aad = match properties.aad_prefix.as_ref() {
266 None => aad_file_unique.clone(),
267 Some(aad_prefix) => [aad_prefix.clone(), aad_file_unique.clone()].concat(),
268 };
269
270 Ok(Self {
271 properties,
272 aad_file_unique,
273 file_aad,
274 })
275 }
276
277 pub fn properties(&self) -> &FileEncryptionProperties {
279 &self.properties
280 }
281
282 pub fn file_aad(&self) -> &[u8] {
284 &self.file_aad
285 }
286
287 pub fn aad_file_unique(&self) -> &Vec<u8> {
292 &self.aad_file_unique
293 }
294
295 pub fn is_column_encrypted(&self, column_path: &str) -> bool {
297 if self.properties.column_keys.is_empty() {
298 true
300 } else {
301 self.properties.column_keys.contains_key(column_path)
302 }
303 }
304
305 pub(crate) fn get_footer_encryptor(&self) -> Result<Box<dyn BlockEncryptor>> {
307 Ok(Box::new(RingGcmBlockEncryptor::new(
308 &self.properties.footer_key.key,
309 )?))
310 }
311
312 pub(crate) fn get_column_encryptor(
315 &self,
316 column_path: &str,
317 ) -> Result<Box<dyn BlockEncryptor>> {
318 if self.properties.column_keys.is_empty() {
319 return self.get_footer_encryptor();
320 }
321 match self.properties.column_keys.get(column_path) {
322 None => Err(general_err!("Column '{}' is not encrypted", column_path)),
323 Some(column_key) => Ok(Box::new(RingGcmBlockEncryptor::new(column_key.key())?)),
324 }
325 }
326}
327
328pub(crate) fn encrypt_object<T: TSerializable, W: Write>(
330 object: &T,
331 encryptor: &mut Box<dyn BlockEncryptor>,
332 sink: &mut W,
333 module_aad: &[u8],
334) -> Result<()> {
335 let encrypted_buffer = encrypt_object_to_vec(object, encryptor, module_aad)?;
336 sink.write_all(&encrypted_buffer)?;
337 Ok(())
338}
339
340pub(crate) fn encrypt_object_to_vec<T: TSerializable>(
342 object: &T,
343 encryptor: &mut Box<dyn BlockEncryptor>,
344 module_aad: &[u8],
345) -> Result<Vec<u8>> {
346 let mut buffer: Vec<u8> = vec![];
347 {
348 let mut unencrypted_protocol = TCompactOutputProtocol::new(&mut buffer);
349 object.write_to_out_protocol(&mut unencrypted_protocol)?;
350 }
351
352 encryptor.encrypt(buffer.as_ref(), module_aad)
353}
354
355pub(crate) fn get_column_crypto_metadata(
357 properties: &FileEncryptionProperties,
358 column: &ColumnDescPtr,
359) -> Option<ColumnCryptoMetaData> {
360 if properties.column_keys.is_empty() {
361 Some(ColumnCryptoMetaData::EncryptionWithFooterKey)
363 } else {
364 properties
365 .column_keys
366 .get(&column.path().string())
367 .map(|encryption_key| {
368 ColumnCryptoMetaData::EncryptionWithColumnKey(EncryptionWithColumnKey {
370 path_in_schema: column.path().parts().to_vec(),
371 key_metadata: encryption_key.key_metadata.clone(),
372 })
373 })
374 }
375}