parquet/column/
page_encryption.rs1use crate::column::page::CompressedPage;
19use crate::encryption::ciphers::BlockEncryptor;
20use crate::encryption::encrypt::{encrypt_object, FileEncryptor};
21use crate::encryption::modules::{create_module_aad, ModuleType};
22use crate::errors::ParquetError;
23use crate::errors::Result;
24use crate::format::PageHeader;
25use crate::format::PageType;
26use bytes::Bytes;
27use std::io::Write;
28use std::sync::Arc;
29
30#[derive(Debug)]
31pub(crate) struct PageEncryptor {
33 file_encryptor: Arc<FileEncryptor>,
34 block_encryptor: Box<dyn BlockEncryptor>,
35 row_group_index: usize,
36 column_index: usize,
37 page_index: usize,
38}
39
40impl PageEncryptor {
41 pub fn create_if_column_encrypted(
43 file_encryptor: &Option<Arc<FileEncryptor>>,
44 row_group_index: usize,
45 column_index: usize,
46 column_path: &str,
47 ) -> Result<Option<Self>> {
48 match file_encryptor {
49 Some(file_encryptor) if file_encryptor.is_column_encrypted(column_path) => {
50 let block_encryptor = file_encryptor.get_column_encryptor(column_path)?;
51 Ok(Some(Self {
52 file_encryptor: file_encryptor.clone(),
53 block_encryptor,
54 row_group_index,
55 column_index,
56 page_index: 0,
57 }))
58 }
59 _ => Ok(None),
60 }
61 }
62
63 pub fn increment_page(&mut self) {
65 self.page_index += 1;
66 }
67
68 fn encrypt_page(&mut self, page: &CompressedPage) -> Result<Vec<u8>> {
69 let module_type = if page.compressed_page().is_data_page() {
70 ModuleType::DataPage
71 } else {
72 ModuleType::DictionaryPage
73 };
74 let aad = create_module_aad(
75 self.file_encryptor.file_aad(),
76 module_type,
77 self.row_group_index,
78 self.column_index,
79 Some(self.page_index),
80 )?;
81 let encrypted_buffer = self.block_encryptor.encrypt(page.data(), &aad)?;
82
83 Ok(encrypted_buffer)
84 }
85
86 pub fn encrypt_compressed_page(&mut self, page: CompressedPage) -> Result<CompressedPage> {
88 let encrypted_page = self.encrypt_page(&page)?;
89 Ok(page.with_new_compressed_buffer(Bytes::from(encrypted_page)))
90 }
91
92 pub fn encrypt_page_header<W: Write>(
94 &mut self,
95 page_header: &PageHeader,
96 sink: &mut W,
97 ) -> Result<()> {
98 let module_type = match page_header.type_ {
99 PageType::DATA_PAGE => ModuleType::DataPageHeader,
100 PageType::DATA_PAGE_V2 => ModuleType::DataPageHeader,
101 PageType::DICTIONARY_PAGE => ModuleType::DictionaryPageHeader,
102 _ => {
103 return Err(general_err!(
104 "Unsupported page type for page header encryption: {:?}",
105 page_header.type_
106 ))
107 }
108 };
109 let aad = create_module_aad(
110 self.file_encryptor.file_aad(),
111 module_type,
112 self.row_group_index,
113 self.column_index,
114 Some(self.page_index),
115 )?;
116
117 encrypt_object(page_header, &mut self.block_encryptor, sink, &aad)
118 }
119}