1use crate::encryption::ciphers::{BlockDecryptor, RingGcmBlockDecryptor, TAG_LEN};
21use crate::encryption::modules::{ModuleType, create_footer_aad, create_module_aad};
22use crate::errors::{ParquetError, Result};
23use crate::file::column_crypto_metadata::ColumnCryptoMetaData;
24use crate::file::metadata::HeapSize;
25use std::borrow::Cow;
26use std::collections::HashMap;
27use std::fmt::Formatter;
28use std::io::Read;
29use std::sync::Arc;
30
31pub trait KeyRetriever: Send + Sync {
104 fn retrieve_key(&self, key_metadata: &[u8]) -> Result<Vec<u8>>;
106}
107
108pub(crate) fn read_and_decrypt<T: Read>(
109 decryptor: &Arc<dyn BlockDecryptor>,
110 input: &mut T,
111 aad: &[u8],
112) -> Result<Vec<u8>> {
113 let mut len_bytes = [0; 4];
114 input.read_exact(&mut len_bytes)?;
115 let ciphertext_len = u32::from_le_bytes(len_bytes) as usize;
116 let mut ciphertext = vec![0; 4 + ciphertext_len];
117 input.read_exact(&mut ciphertext[4..])?;
118
119 decryptor.decrypt(&ciphertext, aad.as_ref())
120}
121
122#[derive(Debug, Clone)]
125pub(crate) struct CryptoContext {
126 pub(crate) row_group_idx: usize,
127 pub(crate) column_ordinal: usize,
128 pub(crate) page_ordinal: Option<usize>,
129 pub(crate) dictionary_page: bool,
130 data_decryptor: Arc<dyn BlockDecryptor>,
134 metadata_decryptor: Arc<dyn BlockDecryptor>,
135 file_aad: Vec<u8>,
136}
137
138impl CryptoContext {
139 pub(crate) fn for_column(
140 file_decryptor: &FileDecryptor,
141 column_crypto_metadata: &ColumnCryptoMetaData,
142 row_group_idx: usize,
143 column_ordinal: usize,
144 ) -> Result<Self> {
145 let (data_decryptor, metadata_decryptor) = match column_crypto_metadata {
146 ColumnCryptoMetaData::ENCRYPTION_WITH_FOOTER_KEY => {
147 let data_decryptor = file_decryptor.get_footer_decryptor()?;
149 let metadata_decryptor = file_decryptor.get_footer_decryptor()?;
150 (data_decryptor, metadata_decryptor)
151 }
152 ColumnCryptoMetaData::ENCRYPTION_WITH_COLUMN_KEY(column_key_encryption) => {
153 let key_metadata = &column_key_encryption.key_metadata;
154 let full_column_name;
155 let column_name = if column_key_encryption.path_in_schema.len() == 1 {
156 &column_key_encryption.path_in_schema[0]
157 } else {
158 full_column_name = column_key_encryption.path_in_schema.join(".");
159 &full_column_name
160 };
161 let data_decryptor = file_decryptor
162 .get_column_data_decryptor(column_name, key_metadata.as_deref())?;
163 let metadata_decryptor = file_decryptor
164 .get_column_metadata_decryptor(column_name, key_metadata.as_deref())?;
165 (data_decryptor, metadata_decryptor)
166 }
167 };
168
169 Ok(CryptoContext {
170 row_group_idx,
171 column_ordinal,
172 page_ordinal: None,
173 dictionary_page: false,
174 data_decryptor,
175 metadata_decryptor,
176 file_aad: file_decryptor.file_aad().clone(),
177 })
178 }
179
180 pub(crate) fn with_page_ordinal(&self, page_ordinal: usize) -> Self {
181 Self {
182 row_group_idx: self.row_group_idx,
183 column_ordinal: self.column_ordinal,
184 page_ordinal: Some(page_ordinal),
185 dictionary_page: false,
186 data_decryptor: self.data_decryptor.clone(),
187 metadata_decryptor: self.metadata_decryptor.clone(),
188 file_aad: self.file_aad.clone(),
189 }
190 }
191
192 pub(crate) fn create_page_header_aad(&self) -> Result<Vec<u8>> {
193 let module_type = if self.dictionary_page {
194 ModuleType::DictionaryPageHeader
195 } else {
196 ModuleType::DataPageHeader
197 };
198
199 create_module_aad(
200 self.file_aad(),
201 module_type,
202 self.row_group_idx,
203 self.column_ordinal,
204 self.page_ordinal,
205 )
206 }
207
208 pub(crate) fn create_page_aad(&self) -> Result<Vec<u8>> {
209 let module_type = if self.dictionary_page {
210 ModuleType::DictionaryPage
211 } else {
212 ModuleType::DataPage
213 };
214
215 create_module_aad(
216 self.file_aad(),
217 module_type,
218 self.row_group_idx,
219 self.column_ordinal,
220 self.page_ordinal,
221 )
222 }
223
224 pub(crate) fn create_column_index_aad(&self) -> Result<Vec<u8>> {
225 create_module_aad(
226 self.file_aad(),
227 ModuleType::ColumnIndex,
228 self.row_group_idx,
229 self.column_ordinal,
230 self.page_ordinal,
231 )
232 }
233
234 pub(crate) fn create_offset_index_aad(&self) -> Result<Vec<u8>> {
235 create_module_aad(
236 self.file_aad(),
237 ModuleType::OffsetIndex,
238 self.row_group_idx,
239 self.column_ordinal,
240 self.page_ordinal,
241 )
242 }
243
244 pub(crate) fn for_dictionary_page(&self) -> Self {
245 Self {
246 row_group_idx: self.row_group_idx,
247 column_ordinal: self.column_ordinal,
248 page_ordinal: self.page_ordinal,
249 dictionary_page: true,
250 data_decryptor: self.data_decryptor.clone(),
251 metadata_decryptor: self.metadata_decryptor.clone(),
252 file_aad: self.file_aad.clone(),
253 }
254 }
255
256 pub(crate) fn data_decryptor(&self) -> &Arc<dyn BlockDecryptor> {
257 &self.data_decryptor
258 }
259
260 pub(crate) fn metadata_decryptor(&self) -> &Arc<dyn BlockDecryptor> {
261 &self.metadata_decryptor
262 }
263
264 pub(crate) fn file_aad(&self) -> &Vec<u8> {
265 &self.file_aad
266 }
267}
268
269#[derive(Clone, PartialEq)]
270struct ExplicitDecryptionKeys {
271 footer_key: Vec<u8>,
272 column_keys: HashMap<String, Vec<u8>>,
273}
274
275impl HeapSize for ExplicitDecryptionKeys {
276 fn heap_size(&self) -> usize {
277 self.footer_key.heap_size() + self.column_keys.heap_size()
278 }
279}
280
281#[derive(Clone)]
282enum DecryptionKeys {
283 Explicit(ExplicitDecryptionKeys),
284 ViaRetriever(Arc<dyn KeyRetriever>),
285}
286
287impl PartialEq for DecryptionKeys {
288 fn eq(&self, other: &Self) -> bool {
289 match (self, other) {
290 (DecryptionKeys::Explicit(keys), DecryptionKeys::Explicit(other_keys)) => {
291 keys.footer_key == other_keys.footer_key
292 && keys.column_keys == other_keys.column_keys
293 }
294 (DecryptionKeys::ViaRetriever(_), DecryptionKeys::ViaRetriever(_)) => true,
295 _ => false,
296 }
297 }
298}
299
300impl HeapSize for DecryptionKeys {
301 fn heap_size(&self) -> usize {
302 match self {
303 Self::Explicit(keys) => keys.heap_size(),
304 Self::ViaRetriever(_) => {
305 0
308 }
309 }
310 }
311}
312
313#[derive(Clone, PartialEq)]
351pub struct FileDecryptionProperties {
352 keys: DecryptionKeys,
353 aad_prefix: Option<Vec<u8>>,
354 footer_signature_verification: bool,
355}
356
357impl HeapSize for FileDecryptionProperties {
358 fn heap_size(&self) -> usize {
359 self.keys.heap_size() + self.aad_prefix.heap_size()
360 }
361}
362impl FileDecryptionProperties {
363 pub fn builder(footer_key: Vec<u8>) -> DecryptionPropertiesBuilder {
366 DecryptionPropertiesBuilder::new(footer_key)
367 }
368
369 pub fn with_key_retriever(
372 key_retriever: Arc<dyn KeyRetriever>,
373 ) -> DecryptionPropertiesBuilderWithRetriever {
374 DecryptionPropertiesBuilderWithRetriever::new(key_retriever)
375 }
376
377 pub fn aad_prefix(&self) -> Option<&Vec<u8>> {
379 self.aad_prefix.as_ref()
380 }
381
382 pub fn check_plaintext_footer_integrity(&self) -> bool {
384 self.footer_signature_verification
385 }
386
387 pub fn footer_key(&self, key_metadata: Option<&[u8]>) -> Result<Cow<'_, Vec<u8>>> {
390 match &self.keys {
391 DecryptionKeys::Explicit(keys) => Ok(Cow::Borrowed(&keys.footer_key)),
392 DecryptionKeys::ViaRetriever(retriever) => {
393 let key = retriever.retrieve_key(key_metadata.unwrap_or_default())?;
394 Ok(Cow::Owned(key))
395 }
396 }
397 }
398
399 pub fn column_key(
401 &self,
402 column_name: &str,
403 key_metadata: Option<&[u8]>,
404 ) -> Result<Cow<'_, Vec<u8>>> {
405 match &self.keys {
406 DecryptionKeys::Explicit(keys) => match keys.column_keys.get(column_name) {
407 None => Err(general_err!(
408 "No column decryption key set for encrypted column '{}'",
409 column_name
410 )),
411 Some(key) => Ok(Cow::Borrowed(key)),
412 },
413 DecryptionKeys::ViaRetriever(retriever) => {
414 let key = retriever.retrieve_key(key_metadata.unwrap_or_default())?;
415 Ok(Cow::Owned(key))
416 }
417 }
418 }
419
420 pub fn column_keys(&self) -> (Vec<String>, Vec<Vec<u8>>) {
425 let mut column_names: Vec<String> = Vec::new();
426 let mut column_keys: Vec<Vec<u8>> = Vec::new();
427 if let DecryptionKeys::Explicit(keys) = &self.keys {
428 for (key, value) in keys.column_keys.iter() {
429 column_names.push(key.clone());
430 column_keys.push(value.clone());
431 }
432 }
433 (column_names, column_keys)
434 }
435}
436
437impl std::fmt::Debug for FileDecryptionProperties {
438 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
439 write!(f, "FileDecryptionProperties {{ }}")
440 }
441}
442
443pub struct DecryptionPropertiesBuilder {
447 footer_key: Vec<u8>,
448 column_keys: HashMap<String, Vec<u8>>,
449 aad_prefix: Option<Vec<u8>>,
450 footer_signature_verification: bool,
451}
452
453impl DecryptionPropertiesBuilder {
454 pub fn new(footer_key: Vec<u8>) -> DecryptionPropertiesBuilder {
457 Self {
458 footer_key,
459 column_keys: HashMap::default(),
460 aad_prefix: None,
461 footer_signature_verification: true,
462 }
463 }
464
465 pub fn build(self) -> Result<Arc<FileDecryptionProperties>> {
467 let keys = DecryptionKeys::Explicit(ExplicitDecryptionKeys {
468 footer_key: self.footer_key,
469 column_keys: self.column_keys,
470 });
471 Ok(Arc::new(FileDecryptionProperties {
472 keys,
473 aad_prefix: self.aad_prefix,
474 footer_signature_verification: self.footer_signature_verification,
475 }))
476 }
477
478 pub fn with_aad_prefix(mut self, value: Vec<u8>) -> Self {
482 self.aad_prefix = Some(value);
483 self
484 }
485
486 pub fn with_column_key(mut self, column_name: &str, decryption_key: Vec<u8>) -> Self {
488 self.column_keys
489 .insert(column_name.to_string(), decryption_key);
490 self
491 }
492
493 pub fn with_column_keys(mut self, column_names: Vec<&str>, keys: Vec<Vec<u8>>) -> Result<Self> {
495 if column_names.len() != keys.len() {
496 return Err(general_err!(
497 "The number of column names ({}) does not match the number of keys ({})",
498 column_names.len(),
499 keys.len()
500 ));
501 }
502 for (column_name, key) in column_names.into_iter().zip(keys.into_iter()) {
503 self.column_keys.insert(column_name.to_string(), key);
504 }
505 Ok(self)
506 }
507
508 pub fn disable_footer_signature_verification(mut self) -> Self {
511 self.footer_signature_verification = false;
512 self
513 }
514}
515
516pub struct DecryptionPropertiesBuilderWithRetriever {
520 key_retriever: Arc<dyn KeyRetriever>,
521 aad_prefix: Option<Vec<u8>>,
522 footer_signature_verification: bool,
523}
524
525impl DecryptionPropertiesBuilderWithRetriever {
526 pub fn new(key_retriever: Arc<dyn KeyRetriever>) -> DecryptionPropertiesBuilderWithRetriever {
529 Self {
530 key_retriever,
531 aad_prefix: None,
532 footer_signature_verification: true,
533 }
534 }
535
536 pub fn build(self) -> Result<Arc<FileDecryptionProperties>> {
538 let keys = DecryptionKeys::ViaRetriever(self.key_retriever);
539 Ok(Arc::new(FileDecryptionProperties {
540 keys,
541 aad_prefix: self.aad_prefix,
542 footer_signature_verification: self.footer_signature_verification,
543 }))
544 }
545
546 pub fn with_aad_prefix(mut self, value: Vec<u8>) -> Self {
550 self.aad_prefix = Some(value);
551 self
552 }
553
554 pub fn disable_footer_signature_verification(mut self) -> Self {
557 self.footer_signature_verification = false;
558 self
559 }
560}
561
562#[derive(Clone, Debug)]
563pub(crate) struct FileDecryptor {
564 decryption_properties: Arc<FileDecryptionProperties>,
565 footer_decryptor: Arc<dyn BlockDecryptor>,
566 file_aad: Vec<u8>,
567}
568
569impl PartialEq for FileDecryptor {
570 fn eq(&self, other: &Self) -> bool {
571 self.decryption_properties == other.decryption_properties && self.file_aad == other.file_aad
572 }
573}
574
575impl HeapSize for FileDecryptor {
583 fn heap_size(&self) -> usize {
584 self.decryption_properties.heap_size()
585 + (Arc::clone(&self.footer_decryptor) as Arc<dyn HeapSize>).heap_size()
586 + self.file_aad.heap_size()
587 }
588}
589
590impl FileDecryptor {
591 pub(crate) fn new(
592 decryption_properties: &Arc<FileDecryptionProperties>,
593 footer_key_metadata: Option<&[u8]>,
594 aad_file_unique: Vec<u8>,
595 aad_prefix: Vec<u8>,
596 ) -> Result<Self> {
597 let file_aad = [aad_prefix.as_slice(), aad_file_unique.as_slice()].concat();
598 let footer_key = decryption_properties.footer_key(footer_key_metadata)?;
599 let footer_decryptor = RingGcmBlockDecryptor::new(&footer_key).map_err(|e| {
600 general_err!(
601 "Invalid footer key. {}",
602 e.to_string().replace("Parquet error: ", "")
603 )
604 })?;
605
606 Ok(Self {
607 footer_decryptor: Arc::new(footer_decryptor),
608 decryption_properties: Arc::clone(decryption_properties),
609 file_aad,
610 })
611 }
612
613 pub(crate) fn get_footer_decryptor(&self) -> Result<Arc<dyn BlockDecryptor>> {
614 Ok(self.footer_decryptor.clone())
615 }
616
617 pub(crate) fn verify_plaintext_footer_signature(&self, plaintext_footer: &[u8]) -> Result<()> {
619 let tag = &plaintext_footer[plaintext_footer.len() - TAG_LEN..];
621 let aad = create_footer_aad(self.file_aad())?;
622 let footer_decryptor = self.get_footer_decryptor()?;
623
624 let computed_tag = footer_decryptor.compute_plaintext_tag(&aad, plaintext_footer)?;
625
626 if computed_tag != tag {
627 return Err(general_err!(
628 "Footer signature verification failed. Computed: {:?}, Expected: {:?}",
629 computed_tag,
630 tag
631 ));
632 }
633 Ok(())
634 }
635
636 pub(crate) fn get_column_data_decryptor(
637 &self,
638 column_name: &str,
639 key_metadata: Option<&[u8]>,
640 ) -> Result<Arc<dyn BlockDecryptor>> {
641 let column_key = self
642 .decryption_properties
643 .column_key(column_name, key_metadata)?;
644 Ok(Arc::new(RingGcmBlockDecryptor::new(&column_key)?))
645 }
646
647 pub(crate) fn get_column_metadata_decryptor(
648 &self,
649 column_name: &str,
650 key_metadata: Option<&[u8]>,
651 ) -> Result<Arc<dyn BlockDecryptor>> {
652 self.get_column_data_decryptor(column_name, key_metadata)
654 }
655
656 pub(crate) fn file_aad(&self) -> &Vec<u8> {
657 &self.file_aad
658 }
659}