parquet/column/
page_encryption.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::column::page::CompressedPage;
19use crate::encryption::ciphers::BlockEncryptor;
20use crate::encryption::encrypt::{encrypt_object, FileEncryptor};
21use crate::encryption::modules::{create_module_aad, ModuleType};
22use crate::errors::ParquetError;
23use crate::errors::Result;
24use crate::format::PageHeader;
25use crate::format::PageType;
26use bytes::Bytes;
27use std::io::Write;
28use std::sync::Arc;
29
30#[derive(Debug)]
31/// Encrypts page headers and page data for columns
32pub(crate) struct PageEncryptor {
33    file_encryptor: Arc<FileEncryptor>,
34    block_encryptor: Box<dyn BlockEncryptor>,
35    row_group_index: usize,
36    column_index: usize,
37    page_index: usize,
38}
39
40impl PageEncryptor {
41    /// Create a [`PageEncryptor`] for a column if it should be encrypted
42    pub fn create_if_column_encrypted(
43        file_encryptor: &Option<Arc<FileEncryptor>>,
44        row_group_index: usize,
45        column_index: usize,
46        column_path: &str,
47    ) -> Result<Option<Self>> {
48        match file_encryptor {
49            Some(file_encryptor) if file_encryptor.is_column_encrypted(column_path) => {
50                let block_encryptor = file_encryptor.get_column_encryptor(column_path)?;
51                Ok(Some(Self {
52                    file_encryptor: file_encryptor.clone(),
53                    block_encryptor,
54                    row_group_index,
55                    column_index,
56                    page_index: 0,
57                }))
58            }
59            _ => Ok(None),
60        }
61    }
62
63    /// Update the page index after a data page has been processed
64    pub fn increment_page(&mut self) {
65        self.page_index += 1;
66    }
67
68    fn encrypt_page(&mut self, page: &CompressedPage) -> Result<Vec<u8>> {
69        let module_type = if page.compressed_page().is_data_page() {
70            ModuleType::DataPage
71        } else {
72            ModuleType::DictionaryPage
73        };
74        let aad = create_module_aad(
75            self.file_encryptor.file_aad(),
76            module_type,
77            self.row_group_index,
78            self.column_index,
79            Some(self.page_index),
80        )?;
81        let encrypted_buffer = self.block_encryptor.encrypt(page.data(), &aad)?;
82
83        Ok(encrypted_buffer)
84    }
85
86    /// Encrypt compressed column page data
87    pub fn encrypt_compressed_page(&mut self, page: CompressedPage) -> Result<CompressedPage> {
88        let encrypted_page = self.encrypt_page(&page)?;
89        Ok(page.with_new_compressed_buffer(Bytes::from(encrypted_page)))
90    }
91
92    /// Encrypt a column page header
93    pub fn encrypt_page_header<W: Write>(
94        &mut self,
95        page_header: &PageHeader,
96        sink: &mut W,
97    ) -> Result<()> {
98        let module_type = match page_header.type_ {
99            PageType::DATA_PAGE => ModuleType::DataPageHeader,
100            PageType::DATA_PAGE_V2 => ModuleType::DataPageHeader,
101            PageType::DICTIONARY_PAGE => ModuleType::DictionaryPageHeader,
102            _ => {
103                return Err(general_err!(
104                    "Unsupported page type for page header encryption: {:?}",
105                    page_header.type_
106                ))
107            }
108        };
109        let aad = create_module_aad(
110            self.file_encryptor.file_aad(),
111            module_type,
112            self.row_group_index,
113            self.column_index,
114            Some(self.page_index),
115        )?;
116
117        encrypt_object(page_header, &mut self.block_encryptor, sink, &aad)
118    }
119}