parquet/file/metadata/thrift/
encryption.rs1use crate::{
21 encryption::decrypt::{FileDecryptionProperties, FileDecryptor},
22 errors::{ParquetError, Result},
23 file::{
24 column_crypto_metadata::ColumnCryptoMetaData,
25 metadata::{
26 HeapSize, ParquetMetaData, ParquetMetaDataOptions, RowGroupMetaData,
27 thrift::{parquet_metadata_from_bytes, read_column_metadata, validate_column_metadata},
28 },
29 },
30 parquet_thrift::{
31 ElementType, FieldType, ReadThrift, ThriftCompactInputProtocol,
32 ThriftCompactOutputProtocol, ThriftSliceInputProtocol, WriteThrift, WriteThriftField,
33 },
34 thrift_struct, thrift_union,
35};
36use std::io::Write;
37use std::sync::Arc;
38
// Thrift-declared parameters of the AES_GCM_V1 encryption algorithm.
// All three fields are declared optional.
39thrift_struct!(
40pub(crate) struct AesGcmV1 {
// AAD prefix stored in the file. `get_file_decryptor` only falls back to this
// value when the decryption properties do not carry their own prefix.
41 1: optional binary aad_prefix
43
// Unique file identifier used when building AADs. `get_file_decryptor`
// returns an error if it is not set.
44 2: optional binary aad_file_unique
46
// When set to true, `parquet_metadata_with_encryption` requires the caller to
// provide an AAD prefix through the decryption properties (the prefix is
// not stored in the file).
47 3: optional bool supply_aad_prefix
50}
51);
52
53impl HeapSize for AesGcmV1 {
54 fn heap_size(&self) -> usize {
55 self.aad_prefix.heap_size()
56 + self.aad_file_unique.heap_size()
57 + self.supply_aad_prefix.heap_size()
58 }
59}
60
// Thrift-declared parameters of the AES_GCM_CTR_V1 encryption algorithm.
// The field layout matches `AesGcmV1`. Note that `get_file_decryptor`
// currently rejects this algorithm with a "not yet supported" error.
61thrift_struct!(
62pub(crate) struct AesGcmCtrV1 {
// AAD prefix stored in the file.
63 1: optional binary aad_prefix
65
// Unique file identifier part of the AAD.
66 2: optional binary aad_file_unique
68
// Flag declared alongside the prefix; NOTE(review): presumably means readers
// must supply the prefix themselves, as for `AesGcmV1` - it is not consulted
// for this algorithm anywhere in this file.
69 3: optional bool supply_aad_prefix
72}
73);
74
75impl HeapSize for AesGcmCtrV1 {
76 fn heap_size(&self) -> usize {
77 self.aad_prefix.heap_size()
78 + self.aad_file_unique.heap_size()
79 + self.supply_aad_prefix.heap_size()
80 }
81}
82
// Thrift union selecting the encryption algorithm of a file, together with
// that algorithm's parameters.
83thrift_union!(
84union EncryptionAlgorithm {
// The only variant `get_file_decryptor` can build a decryptor for.
85 1: (AesGcmV1) AES_GCM_V1
// Parsed, but rejected by `get_file_decryptor` as not yet supported.
86 2: (AesGcmCtrV1) AES_GCM_CTR_V1
87}
88);
89
90impl HeapSize for EncryptionAlgorithm {
91 fn heap_size(&self) -> usize {
92 match self {
93 Self::AES_GCM_V1(gcm) => gcm.heap_size(),
94 Self::AES_GCM_CTR_V1(gcm_ctr) => gcm_ctr.heap_size(),
95 }
96 }
97}
98
// Thrift-declared crypto metadata of a file with an encrypted footer.
// `parquet_metadata_with_encryption` reads it from the front of the footer
// buffer before decrypting the footer itself.
99thrift_struct!(
100pub(crate) struct FileCryptoMetaData<'a> {
// Algorithm (and its AAD parameters) handed to `get_file_decryptor`.
102 1: required EncryptionAlgorithm encryption_algorithm
106
// Optional key metadata, borrowed from the input buffer and passed to
// `get_file_decryptor` as the footer key metadata.
107 2: optional binary<'a> key_metadata
110}
111);
112
113fn row_group_from_encrypted_thrift(
114 mut rg: RowGroupMetaData,
115 decryptor: Option<&FileDecryptor>,
116 options: Option<&ParquetMetaDataOptions>,
117) -> Result<RowGroupMetaData> {
118 let schema_descr = rg.schema_descr;
119
120 if schema_descr.num_columns() != rg.columns.len() {
121 return Err(general_err!(
122 "Column count mismatch. Schema has {} columns while Row Group has {}",
123 schema_descr.num_columns(),
124 rg.columns.len()
125 ));
126 }
127 let total_byte_size = rg.total_byte_size;
128 let num_rows = rg.num_rows;
129 let mut columns = vec![];
130
131 for (i, (mut c, d)) in rg
132 .columns
133 .drain(0..)
134 .zip(schema_descr.columns())
135 .enumerate()
136 {
137 if let (true, Some(decryptor)) = (c.encrypted_column_metadata.is_some(), decryptor) {
139 let column_decryptor = match c.crypto_metadata() {
140 None => {
141 return Err(general_err!(
142 "No crypto_metadata is set for column '{}', which has encrypted metadata",
143 d.path().string()
144 ));
145 }
146 Some(ColumnCryptoMetaData::ENCRYPTION_WITH_COLUMN_KEY(crypto_metadata)) => {
147 let column_name = crypto_metadata.path_in_schema.join(".");
148 decryptor.get_column_metadata_decryptor(
149 column_name.as_str(),
150 crypto_metadata.key_metadata.as_deref(),
151 )?
152 }
153 Some(ColumnCryptoMetaData::ENCRYPTION_WITH_FOOTER_KEY) => {
154 decryptor.get_footer_decryptor()?
155 }
156 };
157
158 let column_aad = crate::encryption::modules::create_module_aad(
159 decryptor.file_aad(),
160 crate::encryption::modules::ModuleType::ColumnMetaData,
161 rg.ordinal.unwrap() as usize,
162 i,
163 None,
164 )?;
165
166 let encrypted_column_metadata = c.encrypted_column_metadata.take();
168 let buf = encrypted_column_metadata.unwrap();
169 let decrypted_cc_buf = column_decryptor
170 .decrypt(&buf, column_aad.as_ref())
171 .map_err(|_| {
172 general_err!(
173 "Unable to decrypt column '{}', perhaps the column key is wrong?",
174 d.path().string()
175 )
176 })?;
177
178 let mut prot = ThriftSliceInputProtocol::new(&decrypted_cc_buf);
180 let mask = read_column_metadata(&mut prot, &mut c, i, options)?;
181 validate_column_metadata(mask)?;
182
183 columns.push(c);
184 } else {
185 columns.push(c);
186 }
187 }
188
189 let sorting_columns = rg.sorting_columns;
190 let file_offset = rg.file_offset;
191 let ordinal = rg.ordinal;
192
193 Ok(RowGroupMetaData {
194 columns,
195 num_rows,
196 sorting_columns,
197 total_byte_size,
198 schema_descr,
199 file_offset,
200 ordinal,
201 })
202}
203
204pub(crate) fn parquet_metadata_with_encryption(
214 file_decryption_properties: Option<&Arc<FileDecryptionProperties>>,
215 encrypted_footer: bool,
216 buf: &[u8],
217 options: Option<&ParquetMetaDataOptions>,
218) -> Result<ParquetMetaData> {
219 use crate::file::metadata::ParquetMetaDataBuilder;
220
221 let mut buf = buf;
222 let mut file_decryptor = None;
223 let decrypted_fmd_buf;
224
225 if encrypted_footer {
226 let mut prot = ThriftSliceInputProtocol::new(buf);
227 if let Some(file_decryption_properties) = file_decryption_properties {
228 let t_file_crypto_metadata: FileCryptoMetaData =
229 FileCryptoMetaData::read_thrift(&mut prot)
230 .map_err(|e| general_err!("Could not parse crypto metadata: {}", e))?;
231 let supply_aad_prefix = match &t_file_crypto_metadata.encryption_algorithm {
232 EncryptionAlgorithm::AES_GCM_V1(algo) => algo.supply_aad_prefix,
233 _ => Some(false),
234 }
235 .unwrap_or(false);
236 if supply_aad_prefix && file_decryption_properties.aad_prefix().is_none() {
237 return Err(general_err!(
238 "Parquet file was encrypted with an AAD prefix that is not stored in the file, \
239 but no AAD prefix was provided in the file decryption properties"
240 ));
241 }
242 let decryptor = get_file_decryptor(
243 t_file_crypto_metadata.encryption_algorithm,
244 t_file_crypto_metadata.key_metadata,
245 file_decryption_properties,
246 )?;
247 let footer_decryptor = decryptor.get_footer_decryptor();
248 let aad_footer = crate::encryption::modules::create_footer_aad(decryptor.file_aad())?;
249
250 decrypted_fmd_buf = footer_decryptor?
251 .decrypt(prot.as_slice().as_ref(), aad_footer.as_ref())
252 .map_err(|_| {
253 general_err!(
254 "Provided footer key and AAD were unable to decrypt parquet footer"
255 )
256 })?;
257
258 buf = &decrypted_fmd_buf;
259 file_decryptor = Some(decryptor);
260 } else {
261 return Err(general_err!(
262 "Parquet file has an encrypted footer but decryption properties were not provided"
263 ));
264 }
265 }
266
267 let parquet_meta = parquet_metadata_from_bytes(buf, options)
268 .map_err(|e| general_err!("Could not parse metadata: {}", e))?;
269
270 let ParquetMetaData {
271 mut file_metadata,
272 row_groups,
273 column_index: _,
274 offset_index: _,
275 file_decryptor: _,
276 } = parquet_meta;
277
278 if let (Some(algo), Some(file_decryption_properties)) = (
281 file_metadata.encryption_algorithm.take(),
282 file_decryption_properties,
283 ) {
284 let footer_signing_key_metadata = file_metadata.footer_signing_key_metadata.take();
285
286 let file_decryptor_value = get_file_decryptor(
288 *algo,
289 footer_signing_key_metadata.as_deref(),
290 file_decryption_properties,
291 )?;
292 if file_decryption_properties.check_plaintext_footer_integrity() && !encrypted_footer {
293 file_decryptor_value.verify_plaintext_footer_signature(buf)?;
294 }
295 file_decryptor = Some(file_decryptor_value);
296 }
297
298 let row_groups = row_groups
300 .into_iter()
301 .map(|rg| row_group_from_encrypted_thrift(rg, file_decryptor.as_ref(), options))
302 .collect::<Result<Vec<_>>>()?;
303
304 let metadata = ParquetMetaDataBuilder::new(file_metadata)
305 .set_row_groups(row_groups)
306 .set_file_decryptor(file_decryptor)
307 .build();
308
309 Ok(metadata)
310}
311
312fn get_file_decryptor(
313 encryption_algorithm: EncryptionAlgorithm,
314 footer_key_metadata: Option<&[u8]>,
315 file_decryption_properties: &Arc<FileDecryptionProperties>,
316) -> Result<FileDecryptor> {
317 match encryption_algorithm {
318 EncryptionAlgorithm::AES_GCM_V1(algo) => {
319 let aad_file_unique = algo
320 .aad_file_unique
321 .ok_or_else(|| general_err!("AAD unique file identifier is not set"))?;
322 let aad_prefix = if let Some(aad_prefix) = file_decryption_properties.aad_prefix() {
323 aad_prefix.clone()
324 } else {
325 algo.aad_prefix.map(|v| v.to_vec()).unwrap_or_default()
326 };
327 let aad_file_unique = aad_file_unique.to_vec();
328
329 FileDecryptor::new(
330 file_decryption_properties,
331 footer_key_metadata,
332 aad_file_unique,
333 aad_prefix,
334 )
335 }
336 EncryptionAlgorithm::AES_GCM_CTR_V1(_) => Err(nyi_err!(
337 "The AES_GCM_CTR_V1 encryption algorithm is not yet supported"
338 )),
339 }
340}