Skip to main content

parquet/encryption/
decrypt.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Configuration and utilities for decryption of files using Parquet Modular Encryption
19
20use crate::encryption::ciphers::{BlockDecryptor, RingGcmBlockDecryptor, TAG_LEN};
21use crate::encryption::modules::{ModuleType, create_footer_aad, create_module_aad};
22use crate::errors::{ParquetError, Result};
23use crate::file::column_crypto_metadata::ColumnCryptoMetaData;
24use crate::file::metadata::HeapSize;
25use std::borrow::Cow;
26use std::collections::HashMap;
27use std::fmt::Formatter;
28use std::io::Read;
29use std::sync::Arc;
30
31/// Trait for retrieving an encryption key using the key's metadata
32///
33/// # Example
34///
35/// This shows how you might use a `KeyRetriever` to decrypt a Parquet file
36/// if you have a set of known encryption keys with identifiers, but at read time
37/// you may not know which columns were encrypted and which keys were used.
38///
39/// In practice, the key metadata might instead store an encrypted key that must
40/// be decrypted with a Key Management Server.
41///
42/// ```
43/// # use std::collections::HashMap;
44/// # use std::sync::{Arc, Mutex};
45/// # use parquet::encryption::decrypt::{FileDecryptionProperties, KeyRetriever};
46/// # use parquet::encryption::encrypt::FileEncryptionProperties;
47/// # use parquet::errors::ParquetError;
48/// // Define known encryption keys
49/// let mut keys = HashMap::new();
50/// keys.insert("kf".to_owned(), b"0123456789012345".to_vec());
51/// keys.insert("kc1".to_owned(), b"1234567890123450".to_vec());
52/// keys.insert("kc2".to_owned(), b"1234567890123451".to_vec());
53///
54/// // Create encryption properties for writing a file,
55/// // and specify the key identifiers as the key metadata.
56/// let encryption_properties = FileEncryptionProperties::builder(keys.get("kf").unwrap().clone())
57///     .with_footer_key_metadata("kf".into())
58///     .with_column_key_and_metadata("x", keys.get("kc1").unwrap().clone(), "kc1".as_bytes().into())
59///     .with_column_key_and_metadata("y", keys.get("kc2").unwrap().clone(), "kc2".as_bytes().into())
60///     .build()?;
61///
62/// // Write an encrypted file with the properties
63/// // ...
64///
65/// // Define a KeyRetriever that can get encryption keys using their identifiers
66/// struct CustomKeyRetriever {
67///     keys: Mutex<HashMap<String, Vec<u8>>>,
68/// }
69///
70/// impl KeyRetriever for CustomKeyRetriever {
71///     fn retrieve_key(&self, key_metadata: &[u8]) -> parquet::errors::Result<Vec<u8>> {
72///         // Metadata is bytes, so convert it to a string identifier
73///         let key_metadata = std::str::from_utf8(key_metadata).map_err(|e| {
74///             ParquetError::General(format!("Could not convert key metadata to string: {e}"))
75///         })?;
76///         // Lookup the key
77///         let keys = self.keys.lock().unwrap();
78///         match keys.get(key_metadata) {
79///             Some(key) => Ok(key.clone()),
80///             None => Err(ParquetError::General(format!(
81///                 "Could not retrieve key for metadata {key_metadata:?}"
82///             ))),
83///         }
84///     }
85/// }
86///
87/// let key_retriever = Arc::new(CustomKeyRetriever {
88///     keys: Mutex::new(keys),
89/// });
90///
91/// // Create decryption properties for reading an encrypted file.
92/// // Note that we don't need to specify which columns are encrypted,
93/// // this is determined by the file metadata, and the required keys will be retrieved
94/// // dynamically using our key retriever.
95/// let decryption_properties = FileDecryptionProperties::with_key_retriever(key_retriever)
96///     .build()?;
97///
98/// // Read an encrypted file with the decryption properties
99/// // ...
100///
101/// # Ok::<(), parquet::errors::ParquetError>(())
102/// ```
103pub trait KeyRetriever: Send + Sync {
104    /// Retrieve a decryption key given the key metadata
105    fn retrieve_key(&self, key_metadata: &[u8]) -> Result<Vec<u8>>;
106}
107
108pub(crate) fn read_and_decrypt<T: Read>(
109    decryptor: &Arc<dyn BlockDecryptor>,
110    input: &mut T,
111    aad: &[u8],
112) -> Result<Vec<u8>> {
113    let mut len_bytes = [0; 4];
114    input.read_exact(&mut len_bytes)?;
115    let ciphertext_len = u32::from_le_bytes(len_bytes) as usize;
116    let mut ciphertext = vec![0; 4 + ciphertext_len];
117    input.read_exact(&mut ciphertext[4..])?;
118
119    decryptor.decrypt(&ciphertext, aad.as_ref())
120}
121
122// CryptoContext is a data structure that holds the context required to
123// decrypt parquet modules (data pages, dictionary pages, etc.).
124#[derive(Debug, Clone)]
125pub(crate) struct CryptoContext {
126    pub(crate) row_group_idx: usize,
127    pub(crate) column_ordinal: usize,
128    pub(crate) page_ordinal: Option<usize>,
129    pub(crate) dictionary_page: bool,
130    // We have separate data and metadata decryptors because
131    // in GCM CTR mode, the metadata and data pages use
132    // different algorithms.
133    data_decryptor: Arc<dyn BlockDecryptor>,
134    metadata_decryptor: Arc<dyn BlockDecryptor>,
135    file_aad: Vec<u8>,
136}
137
138impl CryptoContext {
139    pub(crate) fn for_column(
140        file_decryptor: &FileDecryptor,
141        column_crypto_metadata: &ColumnCryptoMetaData,
142        row_group_idx: usize,
143        column_ordinal: usize,
144    ) -> Result<Self> {
145        let (data_decryptor, metadata_decryptor) = match column_crypto_metadata {
146            ColumnCryptoMetaData::ENCRYPTION_WITH_FOOTER_KEY => {
147                // TODO: In GCM-CTR mode will this need to be a non-GCM decryptor?
148                let data_decryptor = file_decryptor.get_footer_decryptor()?;
149                let metadata_decryptor = file_decryptor.get_footer_decryptor()?;
150                (data_decryptor, metadata_decryptor)
151            }
152            ColumnCryptoMetaData::ENCRYPTION_WITH_COLUMN_KEY(column_key_encryption) => {
153                let key_metadata = &column_key_encryption.key_metadata;
154                let full_column_name;
155                let column_name = if column_key_encryption.path_in_schema.len() == 1 {
156                    &column_key_encryption.path_in_schema[0]
157                } else {
158                    full_column_name = column_key_encryption.path_in_schema.join(".");
159                    &full_column_name
160                };
161                let data_decryptor = file_decryptor
162                    .get_column_data_decryptor(column_name, key_metadata.as_deref())?;
163                let metadata_decryptor = file_decryptor
164                    .get_column_metadata_decryptor(column_name, key_metadata.as_deref())?;
165                (data_decryptor, metadata_decryptor)
166            }
167        };
168
169        Ok(CryptoContext {
170            row_group_idx,
171            column_ordinal,
172            page_ordinal: None,
173            dictionary_page: false,
174            data_decryptor,
175            metadata_decryptor,
176            file_aad: file_decryptor.file_aad().clone(),
177        })
178    }
179
180    pub(crate) fn with_page_ordinal(&self, page_ordinal: usize) -> Self {
181        Self {
182            row_group_idx: self.row_group_idx,
183            column_ordinal: self.column_ordinal,
184            page_ordinal: Some(page_ordinal),
185            dictionary_page: false,
186            data_decryptor: self.data_decryptor.clone(),
187            metadata_decryptor: self.metadata_decryptor.clone(),
188            file_aad: self.file_aad.clone(),
189        }
190    }
191
192    pub(crate) fn create_page_header_aad(&self) -> Result<Vec<u8>> {
193        let module_type = if self.dictionary_page {
194            ModuleType::DictionaryPageHeader
195        } else {
196            ModuleType::DataPageHeader
197        };
198
199        create_module_aad(
200            self.file_aad(),
201            module_type,
202            self.row_group_idx,
203            self.column_ordinal,
204            self.page_ordinal,
205        )
206    }
207
208    pub(crate) fn create_page_aad(&self) -> Result<Vec<u8>> {
209        let module_type = if self.dictionary_page {
210            ModuleType::DictionaryPage
211        } else {
212            ModuleType::DataPage
213        };
214
215        create_module_aad(
216            self.file_aad(),
217            module_type,
218            self.row_group_idx,
219            self.column_ordinal,
220            self.page_ordinal,
221        )
222    }
223
224    pub(crate) fn create_column_index_aad(&self) -> Result<Vec<u8>> {
225        create_module_aad(
226            self.file_aad(),
227            ModuleType::ColumnIndex,
228            self.row_group_idx,
229            self.column_ordinal,
230            self.page_ordinal,
231        )
232    }
233
234    pub(crate) fn create_offset_index_aad(&self) -> Result<Vec<u8>> {
235        create_module_aad(
236            self.file_aad(),
237            ModuleType::OffsetIndex,
238            self.row_group_idx,
239            self.column_ordinal,
240            self.page_ordinal,
241        )
242    }
243
244    pub(crate) fn for_dictionary_page(&self) -> Self {
245        Self {
246            row_group_idx: self.row_group_idx,
247            column_ordinal: self.column_ordinal,
248            page_ordinal: self.page_ordinal,
249            dictionary_page: true,
250            data_decryptor: self.data_decryptor.clone(),
251            metadata_decryptor: self.metadata_decryptor.clone(),
252            file_aad: self.file_aad.clone(),
253        }
254    }
255
256    pub(crate) fn data_decryptor(&self) -> &Arc<dyn BlockDecryptor> {
257        &self.data_decryptor
258    }
259
260    pub(crate) fn metadata_decryptor(&self) -> &Arc<dyn BlockDecryptor> {
261        &self.metadata_decryptor
262    }
263
264    pub(crate) fn file_aad(&self) -> &Vec<u8> {
265        &self.file_aad
266    }
267}
268
/// Decryption keys that were supplied directly by the caller.
#[derive(Clone, PartialEq)]
struct ExplicitDecryptionKeys {
    /// Key used for the footer.
    footer_key: Vec<u8>,
    /// Column-specific keys, indexed by column name.
    column_keys: HashMap<String, Vec<u8>>,
}
274
275impl HeapSize for ExplicitDecryptionKeys {
276    fn heap_size(&self) -> usize {
277        self.footer_key.heap_size() + self.column_keys.heap_size()
278    }
279}
280
281#[derive(Clone)]
282enum DecryptionKeys {
283    Explicit(ExplicitDecryptionKeys),
284    ViaRetriever(Arc<dyn KeyRetriever>),
285}
286
287impl PartialEq for DecryptionKeys {
288    fn eq(&self, other: &Self) -> bool {
289        match (self, other) {
290            (DecryptionKeys::Explicit(keys), DecryptionKeys::Explicit(other_keys)) => {
291                keys.footer_key == other_keys.footer_key
292                    && keys.column_keys == other_keys.column_keys
293            }
294            (DecryptionKeys::ViaRetriever(_), DecryptionKeys::ViaRetriever(_)) => true,
295            _ => false,
296        }
297    }
298}
299
300impl HeapSize for DecryptionKeys {
301    fn heap_size(&self) -> usize {
302        match self {
303            Self::Explicit(keys) => keys.heap_size(),
304            Self::ViaRetriever(_) => {
305                // The retriever is a user-defined type we don't control,
306                // so we can't determine the heap size.
307                0
308            }
309        }
310    }
311}
312
313/// `FileDecryptionProperties` hold keys and AAD data required to decrypt a Parquet file.
314///
315/// When reading Arrow data, the `FileDecryptionProperties` should be included in the
316/// [`ArrowReaderOptions`](crate::arrow::arrow_reader::ArrowReaderOptions) using
317/// [`with_file_decryption_properties`](crate::arrow::arrow_reader::ArrowReaderOptions::with_file_decryption_properties).
318///
319/// # Examples
320///
321/// Create `FileDecryptionProperties` for a file encrypted with uniform encryption,
322/// where all metadata and data are encrypted with the footer key:
323/// ```
324/// # use parquet::encryption::decrypt::FileDecryptionProperties;
/// let file_decryption_properties = FileDecryptionProperties::builder(b"0123456789012345".into())
326///     .build()?;
327/// # Ok::<(), parquet::errors::ParquetError>(())
328/// ```
329///
330/// Create properties for a file where columns are encrypted with different keys:
331/// ```
332/// # use parquet::encryption::decrypt::FileDecryptionProperties;
/// let file_decryption_properties = FileDecryptionProperties::builder(b"0123456789012345".into())
334///     .with_column_key("x", b"1234567890123450".into())
335///     .with_column_key("y", b"1234567890123451".into())
336///     .build()?;
337/// # Ok::<(), parquet::errors::ParquetError>(())
338/// ```
339///
340/// Specify additional authenticated data, used to protect against data replacement.
341/// This must match the AAD prefix provided when the file was written, otherwise
342/// data decryption will fail.
343/// ```
344/// # use parquet::encryption::decrypt::FileDecryptionProperties;
/// let file_decryption_properties = FileDecryptionProperties::builder(b"0123456789012345".into())
346///     .with_aad_prefix("example_file".into())
347///     .build()?;
348/// # Ok::<(), parquet::errors::ParquetError>(())
349/// ```
350#[derive(Clone, PartialEq)]
351pub struct FileDecryptionProperties {
352    keys: DecryptionKeys,
353    aad_prefix: Option<Vec<u8>>,
354    footer_signature_verification: bool,
355}
356
357impl HeapSize for FileDecryptionProperties {
358    fn heap_size(&self) -> usize {
359        self.keys.heap_size() + self.aad_prefix.heap_size()
360    }
361}
362impl FileDecryptionProperties {
363    /// Returns a new [`FileDecryptionProperties`] builder that will use the provided key to
364    /// decrypt footer metadata.
365    pub fn builder(footer_key: Vec<u8>) -> DecryptionPropertiesBuilder {
366        DecryptionPropertiesBuilder::new(footer_key)
367    }
368
369    /// Returns a new [`FileDecryptionProperties`] builder that uses a [`KeyRetriever`]
370    /// to get decryption keys based on key metadata.
371    pub fn with_key_retriever(
372        key_retriever: Arc<dyn KeyRetriever>,
373    ) -> DecryptionPropertiesBuilderWithRetriever {
374        DecryptionPropertiesBuilderWithRetriever::new(key_retriever)
375    }
376
377    /// AAD prefix string uniquely identifies the file and prevents file swapping
378    pub fn aad_prefix(&self) -> Option<&Vec<u8>> {
379        self.aad_prefix.as_ref()
380    }
381
382    /// Returns true if footer signature verification is enabled for files with plaintext footers.
383    pub fn check_plaintext_footer_integrity(&self) -> bool {
384        self.footer_signature_verification
385    }
386
387    /// Get the encryption key for decrypting a file's footer,
388    /// and also column data if uniform encryption is used.
389    pub fn footer_key(&self, key_metadata: Option<&[u8]>) -> Result<Cow<'_, Vec<u8>>> {
390        match &self.keys {
391            DecryptionKeys::Explicit(keys) => Ok(Cow::Borrowed(&keys.footer_key)),
392            DecryptionKeys::ViaRetriever(retriever) => {
393                let key = retriever.retrieve_key(key_metadata.unwrap_or_default())?;
394                Ok(Cow::Owned(key))
395            }
396        }
397    }
398
399    /// Get the column-specific encryption key for decrypting column data and metadata within a file
400    pub fn column_key(
401        &self,
402        column_name: &str,
403        key_metadata: Option<&[u8]>,
404    ) -> Result<Cow<'_, Vec<u8>>> {
405        match &self.keys {
406            DecryptionKeys::Explicit(keys) => match keys.column_keys.get(column_name) {
407                None => Err(general_err!(
408                    "No column decryption key set for encrypted column '{}'",
409                    column_name
410                )),
411                Some(key) => Ok(Cow::Borrowed(key)),
412            },
413            DecryptionKeys::ViaRetriever(retriever) => {
414                let key = retriever.retrieve_key(key_metadata.unwrap_or_default())?;
415                Ok(Cow::Owned(key))
416            }
417        }
418    }
419
420    /// Get the column names and associated decryption keys that have been configured.
421    /// If a key retriever is used rather than explicit decryption keys, the result
422    /// will be empty.
423    /// Provided for testing consumer code.
424    pub fn column_keys(&self) -> (Vec<String>, Vec<Vec<u8>>) {
425        let mut column_names: Vec<String> = Vec::new();
426        let mut column_keys: Vec<Vec<u8>> = Vec::new();
427        if let DecryptionKeys::Explicit(keys) = &self.keys {
428            for (key, value) in keys.column_keys.iter() {
429                column_names.push(key.clone());
430                column_keys.push(value.clone());
431            }
432        }
433        (column_names, column_keys)
434    }
435}
436
437impl std::fmt::Debug for FileDecryptionProperties {
438    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
439        write!(f, "FileDecryptionProperties {{ }}")
440    }
441}
442
443/// Builder for [`FileDecryptionProperties`]
444///
445/// See [`FileDecryptionProperties`] for example usage.
pub struct DecryptionPropertiesBuilder {
    /// Key used to decrypt footer metadata.
    footer_key: Vec<u8>,
    /// Column decryption keys registered so far, indexed by column name.
    column_keys: HashMap<String, Vec<u8>>,
    /// Expected AAD prefix, if one has been set.
    aad_prefix: Option<Vec<u8>>,
    /// Whether plaintext footer signatures will be verified (on by default).
    footer_signature_verification: bool,
}
452
453impl DecryptionPropertiesBuilder {
454    /// Create a new [`DecryptionPropertiesBuilder`] builder that will use the provided key to
455    /// decrypt footer metadata.
456    pub fn new(footer_key: Vec<u8>) -> DecryptionPropertiesBuilder {
457        Self {
458            footer_key,
459            column_keys: HashMap::default(),
460            aad_prefix: None,
461            footer_signature_verification: true,
462        }
463    }
464
465    /// Finalize the builder and return created [`FileDecryptionProperties`]
466    pub fn build(self) -> Result<Arc<FileDecryptionProperties>> {
467        let keys = DecryptionKeys::Explicit(ExplicitDecryptionKeys {
468            footer_key: self.footer_key,
469            column_keys: self.column_keys,
470        });
471        Ok(Arc::new(FileDecryptionProperties {
472            keys,
473            aad_prefix: self.aad_prefix,
474            footer_signature_verification: self.footer_signature_verification,
475        }))
476    }
477
478    /// Specify the expected AAD prefix to be used for decryption.
479    /// This must be set if the file was written with an AAD prefix and the
480    /// prefix is not stored in the file metadata.
481    pub fn with_aad_prefix(mut self, value: Vec<u8>) -> Self {
482        self.aad_prefix = Some(value);
483        self
484    }
485
486    /// Specify the decryption key to use for a column
487    pub fn with_column_key(mut self, column_name: &str, decryption_key: Vec<u8>) -> Self {
488        self.column_keys
489            .insert(column_name.to_string(), decryption_key);
490        self
491    }
492
493    /// Specify multiple column decryption keys
494    pub fn with_column_keys(mut self, column_names: Vec<&str>, keys: Vec<Vec<u8>>) -> Result<Self> {
495        if column_names.len() != keys.len() {
496            return Err(general_err!(
497                "The number of column names ({}) does not match the number of keys ({})",
498                column_names.len(),
499                keys.len()
500            ));
501        }
502        for (column_name, key) in column_names.into_iter().zip(keys.into_iter()) {
503            self.column_keys.insert(column_name.to_string(), key);
504        }
505        Ok(self)
506    }
507
508    /// Disable verification of footer tags for files that use plaintext footers.
509    /// Signature verification is enabled by default.
510    pub fn disable_footer_signature_verification(mut self) -> Self {
511        self.footer_signature_verification = false;
512        self
513    }
514}
515
516/// Builder for [`FileDecryptionProperties`] that uses a [`KeyRetriever`]
517///
518/// See the [`KeyRetriever`] documentation for example usage.
519pub struct DecryptionPropertiesBuilderWithRetriever {
520    key_retriever: Arc<dyn KeyRetriever>,
521    aad_prefix: Option<Vec<u8>>,
522    footer_signature_verification: bool,
523}
524
525impl DecryptionPropertiesBuilderWithRetriever {
526    /// Create a new [`DecryptionPropertiesBuilderWithRetriever`] by providing a [`KeyRetriever`] that
527    /// can be used to get decryption keys based on key metadata.
528    pub fn new(key_retriever: Arc<dyn KeyRetriever>) -> DecryptionPropertiesBuilderWithRetriever {
529        Self {
530            key_retriever,
531            aad_prefix: None,
532            footer_signature_verification: true,
533        }
534    }
535
536    /// Finalize the builder and return created [`FileDecryptionProperties`]
537    pub fn build(self) -> Result<Arc<FileDecryptionProperties>> {
538        let keys = DecryptionKeys::ViaRetriever(self.key_retriever);
539        Ok(Arc::new(FileDecryptionProperties {
540            keys,
541            aad_prefix: self.aad_prefix,
542            footer_signature_verification: self.footer_signature_verification,
543        }))
544    }
545
546    /// Specify the expected AAD prefix to be used for decryption.
547    /// This must be set if the file was written with an AAD prefix and the
548    /// prefix is not stored in the file metadata.
549    pub fn with_aad_prefix(mut self, value: Vec<u8>) -> Self {
550        self.aad_prefix = Some(value);
551        self
552    }
553
554    /// Disable verification of footer tags for files that use plaintext footers.
555    /// Signature verification is enabled by default.
556    pub fn disable_footer_signature_verification(mut self) -> Self {
557        self.footer_signature_verification = false;
558        self
559    }
560}
561
562#[derive(Clone, Debug)]
563pub(crate) struct FileDecryptor {
564    decryption_properties: Arc<FileDecryptionProperties>,
565    footer_decryptor: Arc<dyn BlockDecryptor>,
566    file_aad: Vec<u8>,
567}
568
569impl PartialEq for FileDecryptor {
570    fn eq(&self, other: &Self) -> bool {
571        self.decryption_properties == other.decryption_properties && self.file_aad == other.file_aad
572    }
573}
574
575/// Estimate the size in bytes required for the file decryptor.
576/// This is important to track the memory usage of cached Parquet meta data,
577/// and is used via [`crate::file::metadata::ParquetMetaData::memory_size`].
578/// Note that when a [`KeyRetriever`] is used, its heap size won't be included
579/// and the result will be an underestimate.
580/// If the [`FileDecryptionProperties`] are shared between multiple files then the
581/// heap size may also be an overestimate.
582impl HeapSize for FileDecryptor {
583    fn heap_size(&self) -> usize {
584        self.decryption_properties.heap_size()
585            + (Arc::clone(&self.footer_decryptor) as Arc<dyn HeapSize>).heap_size()
586            + self.file_aad.heap_size()
587    }
588}
589
590impl FileDecryptor {
591    pub(crate) fn new(
592        decryption_properties: &Arc<FileDecryptionProperties>,
593        footer_key_metadata: Option<&[u8]>,
594        aad_file_unique: Vec<u8>,
595        aad_prefix: Vec<u8>,
596    ) -> Result<Self> {
597        let file_aad = [aad_prefix.as_slice(), aad_file_unique.as_slice()].concat();
598        let footer_key = decryption_properties.footer_key(footer_key_metadata)?;
599        let footer_decryptor = RingGcmBlockDecryptor::new(&footer_key).map_err(|e| {
600            general_err!(
601                "Invalid footer key. {}",
602                e.to_string().replace("Parquet error: ", "")
603            )
604        })?;
605
606        Ok(Self {
607            footer_decryptor: Arc::new(footer_decryptor),
608            decryption_properties: Arc::clone(decryption_properties),
609            file_aad,
610        })
611    }
612
613    pub(crate) fn get_footer_decryptor(&self) -> Result<Arc<dyn BlockDecryptor>> {
614        Ok(self.footer_decryptor.clone())
615    }
616
617    /// Verify the signature of the footer
618    pub(crate) fn verify_plaintext_footer_signature(&self, plaintext_footer: &[u8]) -> Result<()> {
619        // Plaintext footer format is: [plaintext metadata, nonce, authentication tag]
620        let tag = &plaintext_footer[plaintext_footer.len() - TAG_LEN..];
621        let aad = create_footer_aad(self.file_aad())?;
622        let footer_decryptor = self.get_footer_decryptor()?;
623
624        let computed_tag = footer_decryptor.compute_plaintext_tag(&aad, plaintext_footer)?;
625
626        if computed_tag != tag {
627            return Err(general_err!(
628                "Footer signature verification failed. Computed: {:?}, Expected: {:?}",
629                computed_tag,
630                tag
631            ));
632        }
633        Ok(())
634    }
635
636    pub(crate) fn get_column_data_decryptor(
637        &self,
638        column_name: &str,
639        key_metadata: Option<&[u8]>,
640    ) -> Result<Arc<dyn BlockDecryptor>> {
641        let column_key = self
642            .decryption_properties
643            .column_key(column_name, key_metadata)?;
644        Ok(Arc::new(RingGcmBlockDecryptor::new(&column_key)?))
645    }
646
647    pub(crate) fn get_column_metadata_decryptor(
648        &self,
649        column_name: &str,
650        key_metadata: Option<&[u8]>,
651    ) -> Result<Arc<dyn BlockDecryptor>> {
652        // Once GCM CTR mode is implemented, data and metadata decryptors may be different
653        self.get_column_data_decryptor(column_name, key_metadata)
654    }
655
656    pub(crate) fn file_aad(&self) -> &Vec<u8> {
657        &self.file_aad
658    }
659}