parquet/file/metadata/thrift/
encryption.rs1use crate::{
21 encryption::decrypt::{FileDecryptionProperties, FileDecryptor},
22 errors::{ParquetError, Result},
23 file::{
24 column_crypto_metadata::ColumnCryptoMetaData,
25 metadata::{
26 HeapSize, ParquetMetaData, ParquetMetaDataOptions, RowGroupMetaData,
27 thrift::{parquet_metadata_from_bytes, read_column_metadata, validate_column_metadata},
28 },
29 },
30 parquet_thrift::{
31 ElementType, FieldType, ReadThrift, ThriftCompactInputProtocol,
32 ThriftCompactOutputProtocol, ThriftSliceInputProtocol, WriteThrift, WriteThriftField,
33 },
34 thrift_struct, thrift_union,
35};
36use std::io::Write;
37use std::sync::Arc;
38
39thrift_struct!(
40pub(crate) struct AesGcmV1 {
41 1: optional binary aad_prefix
43
44 2: optional binary aad_file_unique
46
47 3: optional bool supply_aad_prefix
50}
51);
52
53impl HeapSize for AesGcmV1 {
54 fn heap_size(&self) -> usize {
55 self.aad_prefix.heap_size()
56 + self.aad_file_unique.heap_size()
57 + self.supply_aad_prefix.heap_size()
58 }
59}
60
61thrift_struct!(
62pub(crate) struct AesGcmCtrV1 {
63 1: optional binary aad_prefix
65
66 2: optional binary aad_file_unique
68
69 3: optional bool supply_aad_prefix
72}
73);
74
75impl HeapSize for AesGcmCtrV1 {
76 fn heap_size(&self) -> usize {
77 self.aad_prefix.heap_size()
78 + self.aad_file_unique.heap_size()
79 + self.supply_aad_prefix.heap_size()
80 }
81}
82
83thrift_union!(
84union EncryptionAlgorithm {
85 1: (AesGcmV1) AES_GCM_V1
86 2: (AesGcmCtrV1) AES_GCM_CTR_V1
87}
88);
89
90impl HeapSize for EncryptionAlgorithm {
91 fn heap_size(&self) -> usize {
92 match self {
93 Self::AES_GCM_V1(gcm) => gcm.heap_size(),
94 Self::AES_GCM_CTR_V1(gcm_ctr) => gcm_ctr.heap_size(),
95 }
96 }
97}
98
99thrift_struct!(
100pub(crate) struct FileCryptoMetaData<'a> {
102 1: required EncryptionAlgorithm encryption_algorithm
106
107 2: optional binary<'a> key_metadata
110}
111);
112
113fn row_group_from_encrypted_thrift(
114 mut rg: RowGroupMetaData,
115 decryptor: Option<&FileDecryptor>,
116 options: Option<&ParquetMetaDataOptions>,
117) -> Result<RowGroupMetaData> {
118 let schema_descr = rg.schema_descr;
119
120 if schema_descr.num_columns() != rg.columns.len() {
121 return Err(general_err!(
122 "Column count mismatch. Schema has {} columns while Row Group has {}",
123 schema_descr.num_columns(),
124 rg.columns.len()
125 ));
126 }
127 let total_byte_size = rg.total_byte_size;
128 let num_rows = rg.num_rows;
129 let mut columns = vec![];
130
131 for (i, (mut c, d)) in rg
132 .columns
133 .drain(0..)
134 .zip(schema_descr.columns())
135 .enumerate()
136 {
137 if let (true, Some(decryptor)) = (c.encrypted_column_metadata.is_some(), decryptor) {
139 let column_decryptor = match c.crypto_metadata() {
140 None => {
141 return Err(general_err!(
142 "No crypto_metadata is set for column '{}', which has encrypted metadata",
143 d.path().string()
144 ));
145 }
146 Some(ColumnCryptoMetaData::ENCRYPTION_WITH_COLUMN_KEY(crypto_metadata)) => {
147 let column_name = crypto_metadata.path_in_schema.join(".");
148 match decryptor.get_column_metadata_decryptor(
150 column_name.as_str(),
151 crypto_metadata.key_metadata.as_deref(),
152 ) {
153 Ok(dec) => dec,
154 Err(_) => {
155 columns.push(c);
157 continue;
158 }
159 }
160 }
161 Some(ColumnCryptoMetaData::ENCRYPTION_WITH_FOOTER_KEY) => {
162 decryptor.get_footer_decryptor()?
163 }
164 };
165
166 let column_aad = crate::encryption::modules::create_module_aad(
167 decryptor.file_aad(),
168 crate::encryption::modules::ModuleType::ColumnMetaData,
169 rg.ordinal.unwrap() as usize,
170 i,
171 None,
172 )?;
173
174 let encrypted_column_metadata = c.encrypted_column_metadata.take();
176 let buf = encrypted_column_metadata.unwrap();
177 let decrypted_cc_buf = column_decryptor
178 .decrypt(&buf, column_aad.as_ref())
179 .map_err(|_| {
180 general_err!(
181 "Unable to decrypt column '{}', perhaps the column key is wrong?",
182 d.path().string()
183 )
184 })?;
185
186 let mut prot = ThriftSliceInputProtocol::new(&decrypted_cc_buf);
188 let mask = read_column_metadata(&mut prot, &mut c, i, options)?;
189 validate_column_metadata(mask)?;
190
191 columns.push(c);
192 } else {
193 columns.push(c);
194 }
195 }
196
197 let sorting_columns = rg.sorting_columns;
198 let file_offset = rg.file_offset;
199 let ordinal = rg.ordinal;
200
201 Ok(RowGroupMetaData {
202 columns,
203 num_rows,
204 sorting_columns,
205 total_byte_size,
206 schema_descr,
207 file_offset,
208 ordinal,
209 })
210}
211
212pub(crate) fn parquet_metadata_with_encryption(
222 file_decryption_properties: Option<&Arc<FileDecryptionProperties>>,
223 encrypted_footer: bool,
224 buf: &[u8],
225 options: Option<&ParquetMetaDataOptions>,
226) -> Result<ParquetMetaData> {
227 use crate::file::metadata::ParquetMetaDataBuilder;
228
229 let mut buf = buf;
230 let mut file_decryptor = None;
231 let decrypted_fmd_buf;
232
233 if encrypted_footer {
234 let mut prot = ThriftSliceInputProtocol::new(buf);
235 if let Some(file_decryption_properties) = file_decryption_properties {
236 let t_file_crypto_metadata: FileCryptoMetaData =
237 FileCryptoMetaData::read_thrift(&mut prot)
238 .map_err(|e| general_err!("Could not parse crypto metadata: {}", e))?;
239 let supply_aad_prefix = match &t_file_crypto_metadata.encryption_algorithm {
240 EncryptionAlgorithm::AES_GCM_V1(algo) => algo.supply_aad_prefix,
241 _ => Some(false),
242 }
243 .unwrap_or(false);
244 if supply_aad_prefix && file_decryption_properties.aad_prefix().is_none() {
245 return Err(general_err!(
246 "Parquet file was encrypted with an AAD prefix that is not stored in the file, \
247 but no AAD prefix was provided in the file decryption properties"
248 ));
249 }
250 let decryptor = get_file_decryptor(
251 t_file_crypto_metadata.encryption_algorithm,
252 t_file_crypto_metadata.key_metadata,
253 file_decryption_properties,
254 )?;
255 let footer_decryptor = decryptor.get_footer_decryptor();
256 let aad_footer = crate::encryption::modules::create_footer_aad(decryptor.file_aad())?;
257
258 decrypted_fmd_buf = footer_decryptor?
259 .decrypt(prot.as_slice().as_ref(), aad_footer.as_ref())
260 .map_err(|_| {
261 general_err!(
262 "Provided footer key and AAD were unable to decrypt parquet footer"
263 )
264 })?;
265
266 buf = &decrypted_fmd_buf;
267 file_decryptor = Some(decryptor);
268 } else {
269 return Err(general_err!(
270 "Parquet file has an encrypted footer but decryption properties were not provided"
271 ));
272 }
273 }
274
275 let parquet_meta = parquet_metadata_from_bytes(buf, options)
276 .map_err(|e| general_err!("Could not parse metadata: {}", e))?;
277
278 let ParquetMetaData {
279 mut file_metadata,
280 row_groups,
281 column_index: _,
282 offset_index: _,
283 file_decryptor: _,
284 } = parquet_meta;
285
286 if let (Some(algo), Some(file_decryption_properties)) = (
289 file_metadata.encryption_algorithm.take(),
290 file_decryption_properties,
291 ) {
292 let footer_signing_key_metadata = file_metadata.footer_signing_key_metadata.take();
293
294 let file_decryptor_value = get_file_decryptor(
296 *algo,
297 footer_signing_key_metadata.as_deref(),
298 file_decryption_properties,
299 )?;
300 if file_decryption_properties.check_plaintext_footer_integrity() && !encrypted_footer {
301 file_decryptor_value.verify_plaintext_footer_signature(buf)?;
302 }
303 file_decryptor = Some(file_decryptor_value);
304 }
305
306 let row_groups = row_groups
308 .into_iter()
309 .map(|rg| row_group_from_encrypted_thrift(rg, file_decryptor.as_ref(), options))
310 .collect::<Result<Vec<_>>>()?;
311
312 let metadata = ParquetMetaDataBuilder::new(file_metadata)
313 .set_row_groups(row_groups)
314 .set_file_decryptor(file_decryptor)
315 .build();
316
317 Ok(metadata)
318}
319
320fn get_file_decryptor(
321 encryption_algorithm: EncryptionAlgorithm,
322 footer_key_metadata: Option<&[u8]>,
323 file_decryption_properties: &Arc<FileDecryptionProperties>,
324) -> Result<FileDecryptor> {
325 match encryption_algorithm {
326 EncryptionAlgorithm::AES_GCM_V1(algo) => {
327 let aad_file_unique = algo
328 .aad_file_unique
329 .ok_or_else(|| general_err!("AAD unique file identifier is not set"))?;
330 let aad_prefix = if let Some(aad_prefix) = file_decryption_properties.aad_prefix() {
331 aad_prefix.clone()
332 } else {
333 algo.aad_prefix.map(|v| v.to_vec()).unwrap_or_default()
334 };
335 let aad_file_unique = aad_file_unique.to_vec();
336
337 FileDecryptor::new(
338 file_decryption_properties,
339 footer_key_metadata,
340 aad_file_unique,
341 aad_prefix,
342 )
343 }
344 EncryptionAlgorithm::AES_GCM_CTR_V1(_) => Err(nyi_err!(
345 "The AES_GCM_CTR_V1 encryption algorithm is not yet supported"
346 )),
347 }
348}