parquet/file/metadata/thrift/
encryption.rs1use crate::{
21 encryption::decrypt::{FileDecryptionProperties, FileDecryptor},
22 errors::{ParquetError, Result},
23 file::{
24 column_crypto_metadata::ColumnCryptoMetaData,
25 metadata::{
26 HeapSize, ParquetMetaData, RowGroupMetaData,
27 thrift::{parquet_metadata_from_bytes, read_column_metadata, validate_column_metadata},
28 },
29 },
30 parquet_thrift::{
31 ElementType, FieldType, ReadThrift, ThriftCompactInputProtocol,
32 ThriftCompactOutputProtocol, ThriftSliceInputProtocol, WriteThrift, WriteThriftField,
33 },
34 thrift_struct, thrift_union,
35};
36use std::io::Write;
37use std::sync::Arc;
38
// Thrift description of the AES_GCM_V1 algorithm parameters stored in a Parquet file.
// This is the only algorithm variant `get_file_decryptor` currently builds a decryptor for.
thrift_struct!(
pub(crate) struct AesGcmV1 {
  // AAD prefix stored in the file. `get_file_decryptor` uses it only when the reader's
  // decryption properties do not supply a prefix of their own.
  1: optional binary aad_prefix

  // Unique file identifier handed to `FileDecryptor::new`. `get_file_decryptor` fails
  // with "AAD unique file identifier is not set" when it is absent.
  2: optional binary aad_file_unique

  // When set to true the AAD prefix is not stored in the file, so the reader has to
  // provide it through the file decryption properties.
  3: optional bool supply_aad_prefix
}
);
52
53impl HeapSize for AesGcmV1 {
54 fn heap_size(&self) -> usize {
55 self.aad_prefix.heap_size()
56 + self.aad_file_unique.heap_size()
57 + self.supply_aad_prefix.heap_size()
58 }
59}
60
// Thrift description of the AES_GCM_CTR_V1 algorithm parameters.
// The struct can be parsed, but `get_file_decryptor` rejects this algorithm as
// "not yet supported", so none of these fields are consumed in this file.
thrift_struct!(
pub(crate) struct AesGcmCtrV1 {
  // AAD prefix stored in the file (presumably the same meaning as in `AesGcmV1`).
  1: optional binary aad_prefix

  // Unique file identifier (presumably the same meaning as in `AesGcmV1`).
  2: optional binary aad_file_unique

  // Whether the reader must supply the AAD prefix (presumably the same meaning as in
  // `AesGcmV1`). Note that `parquet_metadata_with_encryption` only inspects this flag
  // for the `AES_GCM_V1` variant.
  3: optional bool supply_aad_prefix
}
);
74
75impl HeapSize for AesGcmCtrV1 {
76 fn heap_size(&self) -> usize {
77 self.aad_prefix.heap_size()
78 + self.aad_file_unique.heap_size()
79 + self.supply_aad_prefix.heap_size()
80 }
81}
82
// Thrift union naming the encryption algorithm used for a file together with its
// parameters. Only `AES_GCM_V1` is accepted by `get_file_decryptor`; `AES_GCM_CTR_V1`
// results in a "not yet supported" error.
thrift_union!(
union EncryptionAlgorithm {
  1: (AesGcmV1) AES_GCM_V1
  2: (AesGcmCtrV1) AES_GCM_CTR_V1
}
);
89
90impl HeapSize for EncryptionAlgorithm {
91 fn heap_size(&self) -> usize {
92 match self {
93 Self::AES_GCM_V1(gcm) => gcm.heap_size(),
94 Self::AES_GCM_CTR_V1(gcm_ctr) => gcm_ctr.heap_size(),
95 }
96 }
97}
98
// Crypto metadata that `parquet_metadata_with_encryption` reads from the front of the
// footer buffer when the footer is encrypted; the bytes that follow it are the
// encrypted footer itself.
thrift_struct!(
pub(crate) struct FileCryptoMetaData<'a> {
  // Algorithm (and its AAD parameters) needed to build the footer decryptor.
  1: required EncryptionAlgorithm encryption_algorithm

  // Opaque key metadata forwarded to `get_file_decryptor` as the footer key metadata.
  // The `'a` lifetime indicates it borrows from the input buffer rather than copying.
  2: optional binary<'a> key_metadata
}
);
112
113fn row_group_from_encrypted_thrift(
114 mut rg: RowGroupMetaData,
115 decryptor: Option<&FileDecryptor>,
116) -> Result<RowGroupMetaData> {
117 let schema_descr = rg.schema_descr;
118
119 if schema_descr.num_columns() != rg.columns.len() {
120 return Err(general_err!(
121 "Column count mismatch. Schema has {} columns while Row Group has {}",
122 schema_descr.num_columns(),
123 rg.columns.len()
124 ));
125 }
126 let total_byte_size = rg.total_byte_size;
127 let num_rows = rg.num_rows;
128 let mut columns = vec![];
129
130 for (i, (mut c, d)) in rg
131 .columns
132 .drain(0..)
133 .zip(schema_descr.columns())
134 .enumerate()
135 {
136 if let (true, Some(decryptor)) = (c.encrypted_column_metadata.is_some(), decryptor) {
138 let column_decryptor = match c.crypto_metadata() {
139 None => {
140 return Err(general_err!(
141 "No crypto_metadata is set for column '{}', which has encrypted metadata",
142 d.path().string()
143 ));
144 }
145 Some(ColumnCryptoMetaData::ENCRYPTION_WITH_COLUMN_KEY(crypto_metadata)) => {
146 let column_name = crypto_metadata.path_in_schema.join(".");
147 decryptor.get_column_metadata_decryptor(
148 column_name.as_str(),
149 crypto_metadata.key_metadata.as_deref(),
150 )?
151 }
152 Some(ColumnCryptoMetaData::ENCRYPTION_WITH_FOOTER_KEY) => {
153 decryptor.get_footer_decryptor()?
154 }
155 };
156
157 let column_aad = crate::encryption::modules::create_module_aad(
158 decryptor.file_aad(),
159 crate::encryption::modules::ModuleType::ColumnMetaData,
160 rg.ordinal.unwrap() as usize,
161 i,
162 None,
163 )?;
164
165 let encrypted_column_metadata = c.encrypted_column_metadata.take();
167 let buf = encrypted_column_metadata.unwrap();
168 let decrypted_cc_buf = column_decryptor
169 .decrypt(&buf, column_aad.as_ref())
170 .map_err(|_| {
171 general_err!(
172 "Unable to decrypt column '{}', perhaps the column key is wrong?",
173 d.path().string()
174 )
175 })?;
176
177 let mut prot = ThriftSliceInputProtocol::new(&decrypted_cc_buf);
179 let mask = read_column_metadata(&mut prot, &mut c)?;
180 validate_column_metadata(mask)?;
181
182 columns.push(c);
183 } else {
184 columns.push(c);
185 }
186 }
187
188 let sorting_columns = rg.sorting_columns;
189 let file_offset = rg.file_offset;
190 let ordinal = rg.ordinal;
191
192 Ok(RowGroupMetaData {
193 columns,
194 num_rows,
195 sorting_columns,
196 total_byte_size,
197 schema_descr,
198 file_offset,
199 ordinal,
200 })
201}
202
203pub(crate) fn parquet_metadata_with_encryption(
213 file_decryption_properties: Option<&Arc<FileDecryptionProperties>>,
214 encrypted_footer: bool,
215 buf: &[u8],
216) -> Result<ParquetMetaData> {
217 use crate::file::metadata::ParquetMetaDataBuilder;
218
219 let mut buf = buf;
220 let mut file_decryptor = None;
221 let decrypted_fmd_buf;
222
223 if encrypted_footer {
224 let mut prot = ThriftSliceInputProtocol::new(buf);
225 if let Some(file_decryption_properties) = file_decryption_properties {
226 let t_file_crypto_metadata: FileCryptoMetaData =
227 FileCryptoMetaData::read_thrift(&mut prot)
228 .map_err(|e| general_err!("Could not parse crypto metadata: {}", e))?;
229 let supply_aad_prefix = match &t_file_crypto_metadata.encryption_algorithm {
230 EncryptionAlgorithm::AES_GCM_V1(algo) => algo.supply_aad_prefix,
231 _ => Some(false),
232 }
233 .unwrap_or(false);
234 if supply_aad_prefix && file_decryption_properties.aad_prefix().is_none() {
235 return Err(general_err!(
236 "Parquet file was encrypted with an AAD prefix that is not stored in the file, \
237 but no AAD prefix was provided in the file decryption properties"
238 ));
239 }
240 let decryptor = get_file_decryptor(
241 t_file_crypto_metadata.encryption_algorithm,
242 t_file_crypto_metadata.key_metadata,
243 file_decryption_properties,
244 )?;
245 let footer_decryptor = decryptor.get_footer_decryptor();
246 let aad_footer = crate::encryption::modules::create_footer_aad(decryptor.file_aad())?;
247
248 decrypted_fmd_buf = footer_decryptor?
249 .decrypt(prot.as_slice().as_ref(), aad_footer.as_ref())
250 .map_err(|_| {
251 general_err!(
252 "Provided footer key and AAD were unable to decrypt parquet footer"
253 )
254 })?;
255
256 buf = &decrypted_fmd_buf;
257 file_decryptor = Some(decryptor);
258 } else {
259 return Err(general_err!(
260 "Parquet file has an encrypted footer but decryption properties were not provided"
261 ));
262 }
263 }
264
265 let parquet_meta = parquet_metadata_from_bytes(buf)
266 .map_err(|e| general_err!("Could not parse metadata: {}", e))?;
267
268 let ParquetMetaData {
269 mut file_metadata,
270 row_groups,
271 column_index: _,
272 offset_index: _,
273 file_decryptor: _,
274 } = parquet_meta;
275
276 if let (Some(algo), Some(file_decryption_properties)) = (
279 file_metadata.encryption_algorithm.take(),
280 file_decryption_properties,
281 ) {
282 let footer_signing_key_metadata = file_metadata.footer_signing_key_metadata.take();
283
284 let file_decryptor_value = get_file_decryptor(
286 *algo,
287 footer_signing_key_metadata.as_deref(),
288 file_decryption_properties,
289 )?;
290 if file_decryption_properties.check_plaintext_footer_integrity() && !encrypted_footer {
291 file_decryptor_value.verify_plaintext_footer_signature(buf)?;
292 }
293 file_decryptor = Some(file_decryptor_value);
294 }
295
296 let row_groups = row_groups
298 .into_iter()
299 .map(|rg| row_group_from_encrypted_thrift(rg, file_decryptor.as_ref()))
300 .collect::<Result<Vec<_>>>()?;
301
302 let metadata = ParquetMetaDataBuilder::new(file_metadata)
303 .set_row_groups(row_groups)
304 .set_file_decryptor(file_decryptor)
305 .build();
306
307 Ok(metadata)
308}
309
310fn get_file_decryptor(
311 encryption_algorithm: EncryptionAlgorithm,
312 footer_key_metadata: Option<&[u8]>,
313 file_decryption_properties: &Arc<FileDecryptionProperties>,
314) -> Result<FileDecryptor> {
315 match encryption_algorithm {
316 EncryptionAlgorithm::AES_GCM_V1(algo) => {
317 let aad_file_unique = algo
318 .aad_file_unique
319 .ok_or_else(|| general_err!("AAD unique file identifier is not set"))?;
320 let aad_prefix = if let Some(aad_prefix) = file_decryption_properties.aad_prefix() {
321 aad_prefix.clone()
322 } else {
323 algo.aad_prefix.map(|v| v.to_vec()).unwrap_or_default()
324 };
325 let aad_file_unique = aad_file_unique.to_vec();
326
327 FileDecryptor::new(
328 file_decryption_properties,
329 footer_key_metadata,
330 aad_file_unique,
331 aad_prefix,
332 )
333 }
334 EncryptionAlgorithm::AES_GCM_CTR_V1(_) => Err(nyi_err!(
335 "The AES_GCM_CTR_V1 encryption algorithm is not yet supported"
336 )),
337 }
338}