1use crate::basic::ColumnOrder;
24use crate::errors::ParquetError;
25use crate::file::metadata::{
26 ColumnChunkMetaData, FileMetaData, PageIndexPolicy, ParquetMetaData, RowGroupMetaData,
27};
28use crate::file::page_index::index::Index;
29use crate::file::page_index::index_reader::{decode_column_index, decode_offset_index};
30use crate::file::page_index::offset_index::OffsetIndexMetaData;
31use crate::schema::types;
32use crate::schema::types::SchemaDescriptor;
33use crate::thrift::TCompactSliceInputProtocol;
34use crate::thrift::TSerializable;
35use bytes::Bytes;
36use std::sync::Arc;
37
38#[cfg(feature = "encryption")]
39use crate::encryption::{
40 decrypt::{FileDecryptionProperties, FileDecryptor},
41 modules::create_footer_aad,
42};
43#[cfg(feature = "encryption")]
44use crate::format::EncryptionAlgorithm;
45
46pub(crate) use inner::MetadataParser;
54
#[cfg(feature = "encryption")]
mod inner {
    use super::*;
    use crate::encryption::decrypt::FileDecryptionProperties;
    use crate::errors::Result;

    /// Decodes Parquet footer metadata, decrypting it when the file is encrypted and
    /// decryption properties have been configured.
    #[derive(Debug, Default)]
    pub(crate) struct MetadataParser {
        /// Properties handed to the decryption logic; `None` when none were configured.
        file_decryption_properties: Option<Arc<FileDecryptionProperties>>,
    }

    impl MetadataParser {
        /// Creates a parser with no decryption properties configured.
        pub(crate) fn new() -> Self {
            Self::default()
        }

        /// Replaces the configured decryption properties (clearing them when `None`) and
        /// returns the updated parser.
        pub(crate) fn with_file_decryption_properties(
            self,
            file_decryption_properties: Option<Arc<FileDecryptionProperties>>,
        ) -> Self {
            let mut parser = self;
            parser.file_decryption_properties = file_decryption_properties;
            parser
        }

        /// Decodes the Thrift-encoded footer in `buf`.
        ///
        /// `encrypted_footer` states whether `buf` holds an encrypted footer. Decoding and
        /// decryption are delegated to `decode_metadata_with_encryption`, and any error it
        /// reports is returned.
        pub(crate) fn decode_metadata(
            &self,
            buf: &[u8],
            encrypted_footer: bool,
        ) -> Result<ParquetMetaData> {
            let properties = self.file_decryption_properties.as_deref();
            decode_metadata_with_encryption(buf, encrypted_footer, properties)
        }
    }
}
94
95#[cfg(not(feature = "encryption"))]
96mod inner {
97 use super::*;
98 use crate::errors::Result;
99 #[derive(Debug, Default)]
103 pub(crate) struct MetadataParser;
104
105 impl MetadataParser {
106 pub(crate) fn new() -> Self {
107 MetadataParser
108 }
109
110 pub(crate) fn decode_metadata(
111 &self,
112 buf: &[u8],
113 encrypted_footer: bool,
114 ) -> Result<ParquetMetaData> {
115 if encrypted_footer {
116 Err(general_err!(
117 "Parquet file has an encrypted footer but the encryption feature is disabled"
118 ))
119 } else {
120 decode_metadata(buf)
121 }
122 }
123 }
124}
125
126pub(crate) fn decode_metadata(buf: &[u8]) -> crate::errors::Result<ParquetMetaData> {
134 let mut prot = TCompactSliceInputProtocol::new(buf);
135
136 let t_file_metadata: crate::format::FileMetaData =
137 crate::format::FileMetaData::read_from_in_protocol(&mut prot)
138 .map_err(|e| general_err!("Could not parse metadata: {}", e))?;
139 let schema = types::from_thrift(&t_file_metadata.schema)?;
140 let schema_descr = Arc::new(SchemaDescriptor::new(schema));
141
142 let mut row_groups = Vec::new();
143 for rg in t_file_metadata.row_groups {
144 row_groups.push(RowGroupMetaData::from_thrift(schema_descr.clone(), rg)?);
145 }
146 let column_orders = parse_column_orders(t_file_metadata.column_orders, &schema_descr)?;
147
148 let file_metadata = FileMetaData::new(
149 t_file_metadata.version,
150 t_file_metadata.num_rows,
151 t_file_metadata.created_by,
152 t_file_metadata.key_value_metadata,
153 schema_descr,
154 column_orders,
155 );
156
157 Ok(ParquetMetaData::new(file_metadata, row_groups))
158}
159
160fn parse_column_orders(
163 t_column_orders: Option<Vec<crate::format::ColumnOrder>>,
164 schema_descr: &SchemaDescriptor,
165) -> crate::errors::Result<Option<Vec<ColumnOrder>>> {
166 match t_column_orders {
167 Some(orders) => {
168 if orders.len() != schema_descr.num_columns() {
170 return Err(general_err!("Column order length mismatch"));
171 };
172 let mut res = Vec::new();
173 for (i, column) in schema_descr.columns().iter().enumerate() {
174 match orders[i] {
175 crate::format::ColumnOrder::TYPEORDER(_) => {
176 let sort_order = ColumnOrder::get_sort_order(
177 column.logical_type(),
178 column.converted_type(),
179 column.physical_type(),
180 );
181 res.push(ColumnOrder::TYPE_DEFINED_ORDER(sort_order));
182 }
183 }
184 }
185 Ok(Some(res))
186 }
187 None => Ok(None),
188 }
189}
190
191pub(crate) fn parse_column_index(
200 metadata: &mut ParquetMetaData,
201 column_index_policy: PageIndexPolicy,
202 bytes: &Bytes,
203 start_offset: u64,
204) -> crate::errors::Result<()> {
205 if column_index_policy == PageIndexPolicy::Skip {
206 return Ok(());
207 }
208 let index = metadata
209 .row_groups()
210 .iter()
211 .enumerate()
212 .map(|(rg_idx, x)| {
213 x.columns()
214 .iter()
215 .enumerate()
216 .map(|(col_idx, c)| match c.column_index_range() {
217 Some(r) => {
218 let r_start = usize::try_from(r.start - start_offset)?;
219 let r_end = usize::try_from(r.end - start_offset)?;
220 parse_single_column_index(
221 &bytes[r_start..r_end],
222 metadata,
223 c,
224 rg_idx,
225 col_idx,
226 )
227 }
228 None => Ok(Index::NONE),
229 })
230 .collect::<crate::errors::Result<Vec<_>>>()
231 })
232 .collect::<crate::errors::Result<Vec<_>>>()?;
233
234 metadata.set_column_index(Some(index));
235 Ok(())
236}
237
#[cfg(feature = "encryption")]
/// Decodes one column chunk's column index from `bytes`, decrypting it first when the column
/// carries crypto metadata.
///
/// Encrypted columns are decrypted with the file decryptor stored on `metadata`. The
/// per-column context for (`row_group_index`, `col_index`) supplies the column index AAD.
///
/// # Errors
///
/// Fails when the column is encrypted but `metadata` has no file decryptor. Also fails when
/// building the crypto context, building the AAD, decryption, or decoding fails.
fn parse_single_column_index(
    bytes: &[u8],
    metadata: &ParquetMetaData,
    column: &ColumnChunkMetaData,
    row_group_index: usize,
    col_index: usize,
) -> crate::errors::Result<Index> {
    use crate::encryption::decrypt::CryptoContext;

    let Some(crypto_metadata) = column.column_crypto_metadata.as_ref() else {
        // Unencrypted column: decode the bytes directly.
        return decode_column_index(bytes, column.column_type());
    };

    let file_decryptor = metadata
        .file_decryptor
        .as_ref()
        .ok_or_else(|| general_err!("Cannot decrypt column index, no file decryptor set"))?;
    let context =
        CryptoContext::for_column(file_decryptor, crypto_metadata, row_group_index, col_index)?;
    let decryptor = context.metadata_decryptor();
    let column_index_aad = context.create_column_index_aad()?;
    let decrypted = decryptor.decrypt(bytes, &column_index_aad)?;
    decode_column_index(&decrypted, column.column_type())
}
266
267#[cfg(not(feature = "encryption"))]
268fn parse_single_column_index(
269 bytes: &[u8],
270 _metadata: &ParquetMetaData,
271 column: &ColumnChunkMetaData,
272 _row_group_index: usize,
273 _col_index: usize,
274) -> crate::errors::Result<Index> {
275 decode_column_index(bytes, column.column_type())
276}
277
278pub(crate) fn parse_offset_index(
279 metadata: &mut ParquetMetaData,
280 offset_index_policy: PageIndexPolicy,
281 bytes: &Bytes,
282 start_offset: u64,
283) -> crate::errors::Result<()> {
284 if offset_index_policy == PageIndexPolicy::Skip {
285 return Ok(());
286 }
287 let row_groups = metadata.row_groups();
288 let mut all_indexes = Vec::with_capacity(row_groups.len());
289 for (rg_idx, x) in row_groups.iter().enumerate() {
290 let mut row_group_indexes = Vec::with_capacity(x.columns().len());
291 for (col_idx, c) in x.columns().iter().enumerate() {
292 let result = match c.offset_index_range() {
293 Some(r) => {
294 let r_start = usize::try_from(r.start - start_offset)?;
295 let r_end = usize::try_from(r.end - start_offset)?;
296 parse_single_offset_index(&bytes[r_start..r_end], metadata, c, rg_idx, col_idx)
297 }
298 None => Err(general_err!("missing offset index")),
299 };
300
301 match result {
302 Ok(index) => row_group_indexes.push(index),
303 Err(e) => {
304 if offset_index_policy == PageIndexPolicy::Required {
305 return Err(e);
306 } else {
307 metadata.set_column_index(None);
309 metadata.set_offset_index(None);
310 return Ok(());
311 }
312 }
313 }
314 }
315 all_indexes.push(row_group_indexes);
316 }
317 metadata.set_offset_index(Some(all_indexes));
318 Ok(())
319}
320
#[cfg(feature = "encryption")]
/// Decodes one column chunk's offset index from `bytes`, decrypting it first when the column
/// carries crypto metadata.
///
/// Encrypted columns are decrypted with the file decryptor stored on `metadata`. The
/// per-column context for (`row_group_index`, `col_index`) supplies the offset index AAD.
///
/// # Errors
///
/// Fails when the column is encrypted but `metadata` has no file decryptor. Also fails when
/// building the crypto context, building the AAD, decryption, or decoding fails.
fn parse_single_offset_index(
    bytes: &[u8],
    metadata: &ParquetMetaData,
    column: &ColumnChunkMetaData,
    row_group_index: usize,
    col_index: usize,
) -> crate::errors::Result<OffsetIndexMetaData> {
    use crate::encryption::decrypt::CryptoContext;

    let Some(crypto_metadata) = column.column_crypto_metadata.as_ref() else {
        // Unencrypted column: decode the bytes directly.
        return decode_offset_index(bytes);
    };

    let file_decryptor = metadata
        .file_decryptor
        .as_ref()
        .ok_or_else(|| general_err!("Cannot decrypt offset index, no file decryptor set"))?;
    let context =
        CryptoContext::for_column(file_decryptor, crypto_metadata, row_group_index, col_index)?;
    let decryptor = context.metadata_decryptor();
    let offset_index_aad = context.create_offset_index_aad()?;
    let decrypted = decryptor.decrypt(bytes, &offset_index_aad)?;
    decode_offset_index(&decrypted)
}
349
350#[cfg(not(feature = "encryption"))]
351fn parse_single_offset_index(
352 bytes: &[u8],
353 _metadata: &ParquetMetaData,
354 _column: &ColumnChunkMetaData,
355 _row_group_index: usize,
356 _col_index: usize,
357) -> crate::errors::Result<OffsetIndexMetaData> {
358 decode_offset_index(bytes)
359}
360
#[cfg(feature = "encryption")]
/// Decodes Parquet footer metadata from `buf`, decrypting it first when the footer is
/// encrypted.
///
/// When `encrypted_footer` is true:
/// - `buf` is read as a Thrift `FileCryptoMetaData` followed by the encrypted `FileMetaData`.
/// - A [`FileDecryptor`] is built from the crypto metadata and `file_decryption_properties`.
/// - That decryptor decrypts the rest of `buf`.
///
/// Next, if the decoded `FileMetaData` declares an `encryption_algorithm` and decryption
/// properties were supplied, a [`FileDecryptor`] is built from that algorithm. It replaces any
/// decryptor created above. For a plaintext footer, its signature is also checked when
/// `check_plaintext_footer_integrity()` is enabled.
///
/// The resulting decryptor, if any, is attached to the returned [`ParquetMetaData`].
///
/// # Errors
///
/// Returns an error in any of these cases:
/// - the footer is encrypted but no decryption properties were provided;
/// - `supply_aad_prefix` is set but the properties provide no AAD prefix;
/// - the crypto metadata or the file metadata cannot be parsed;
/// - the footer cannot be decrypted with the provided key and AAD;
/// - `get_file_decryptor` fails, e.g. for an unsupported algorithm;
/// - `verify_plaintext_footer_signature` reports a failure;
/// - the schema or a row group fails to convert.
fn decode_metadata_with_encryption(
    buf: &[u8],
    encrypted_footer: bool,
    file_decryption_properties: Option<&FileDecryptionProperties>,
) -> crate::errors::Result<ParquetMetaData> {
    let mut prot = TCompactSliceInputProtocol::new(buf);
    let mut file_decryptor = None;
    // Declared at function scope so the decrypted footer bytes outlive `prot`, which is
    // re-pointed at them inside the `if` below.
    let decrypted_fmd_buf;

    if encrypted_footer {
        if let Some(file_decryption_properties) = file_decryption_properties {
            // An encrypted footer starts with a Thrift-encoded `FileCryptoMetaData`.
            let t_file_crypto_metadata: crate::format::FileCryptoMetaData =
                crate::format::FileCryptoMetaData::read_from_in_protocol(&mut prot)
                    .map_err(|e| general_err!("Could not parse crypto metadata: {}", e))?;
            // Only AES_GCM_V1 carries `supply_aad_prefix`; every other case (and an unset
            // field) is treated as `false`.
            let supply_aad_prefix = match &t_file_crypto_metadata.encryption_algorithm {
                EncryptionAlgorithm::AESGCMV1(algo) => algo.supply_aad_prefix,
                _ => Some(false),
            }
            .unwrap_or(false);
            if supply_aad_prefix && file_decryption_properties.aad_prefix().is_none() {
                return Err(general_err!(
                    "Parquet file was encrypted with an AAD prefix that is not stored in the file, \
                     but no AAD prefix was provided in the file decryption properties"
                ));
            }
            let decryptor = get_file_decryptor(
                t_file_crypto_metadata.encryption_algorithm,
                t_file_crypto_metadata.key_metadata.as_deref(),
                file_decryption_properties,
            )?;
            let footer_decryptor = decryptor.get_footer_decryptor();
            let aad_footer = create_footer_aad(decryptor.file_aad())?;

            // Decrypt what follows the crypto metadata. This assumes `as_slice` yields the
            // bytes not yet consumed by `prot` -- NOTE(review): confirm. The underlying error
            // is replaced with a fixed message.
            decrypted_fmd_buf = footer_decryptor?
                .decrypt(prot.as_slice().as_ref(), aad_footer.as_ref())
                .map_err(|_| {
                    general_err!(
                        "Provided footer key and AAD were unable to decrypt parquet footer"
                    )
                })?;
            // From here on, `FileMetaData` is parsed from the decrypted bytes.
            prot = TCompactSliceInputProtocol::new(decrypted_fmd_buf.as_ref());

            file_decryptor = Some(decryptor);
        } else {
            return Err(general_err!(
                "Parquet file has an encrypted footer but decryption properties were not provided"
            ));
        }
    }

    use crate::format::FileMetaData as TFileMetaData;
    let t_file_metadata: TFileMetaData = TFileMetaData::read_from_in_protocol(&mut prot)
        .map_err(|e| general_err!("Could not parse metadata: {}", e))?;
    let schema = types::from_thrift(&t_file_metadata.schema)?;
    let schema_descr = Arc::new(SchemaDescriptor::new(schema));

    // If `FileMetaData` itself names an encryption algorithm and properties were supplied,
    // build the decryptor from it. This overwrites any decryptor built from the crypto
    // metadata above.
    if let (Some(algo), Some(file_decryption_properties)) = (
        t_file_metadata.encryption_algorithm,
        file_decryption_properties,
    ) {
        let file_decryptor_value = get_file_decryptor(
            algo,
            t_file_metadata.footer_signing_key_metadata.as_deref(),
            file_decryption_properties,
        )?;
        // Signature verification is only performed for plaintext footers, and only when the
        // properties request it.
        if file_decryption_properties.check_plaintext_footer_integrity() && !encrypted_footer {
            file_decryptor_value.verify_plaintext_footer_signature(buf)?;
        }
        file_decryptor = Some(file_decryptor_value);
    }

    // Row groups receive the decryptor (if any) so they can handle encrypted column metadata.
    let mut row_groups = Vec::new();
    for rg in t_file_metadata.row_groups {
        let r = RowGroupMetaData::from_encrypted_thrift(
            schema_descr.clone(),
            rg,
            file_decryptor.as_ref(),
        )?;
        row_groups.push(r);
    }
    let column_orders = parse_column_orders(t_file_metadata.column_orders, &schema_descr)?;

    let file_metadata = FileMetaData::new(
        t_file_metadata.version,
        t_file_metadata.num_rows,
        t_file_metadata.created_by,
        t_file_metadata.key_value_metadata,
        schema_descr,
        column_orders,
    );
    let mut metadata = ParquetMetaData::new(file_metadata, row_groups);

    // Keep the decryptor on the metadata; page index decoding reads it back from there.
    metadata.with_file_decryptor(file_decryptor);

    Ok(metadata)
}
468
#[cfg(feature = "encryption")]
/// Builds a [`FileDecryptor`] for `encryption_algorithm`.
///
/// The AAD prefix from `file_decryption_properties` takes precedence. Otherwise the prefix
/// stored in the file is used, and an empty prefix if neither exists.
///
/// # Errors
///
/// Returns an error in any of these cases:
/// - the algorithm has no AAD unique file identifier;
/// - [`FileDecryptor::new`] fails;
/// - the algorithm is `AES_GCM_CTR_V1`, which yields a "not yet implemented" error.
fn get_file_decryptor(
    encryption_algorithm: EncryptionAlgorithm,
    footer_key_metadata: Option<&[u8]>,
    file_decryption_properties: &FileDecryptionProperties,
) -> crate::errors::Result<FileDecryptor> {
    let gcm = match encryption_algorithm {
        EncryptionAlgorithm::AESGCMV1(gcm) => gcm,
        EncryptionAlgorithm::AESGCMCTRV1(_) => {
            return Err(nyi_err!(
                "The AES_GCM_CTR_V1 encryption algorithm is not yet supported"
            ));
        }
    };

    let Some(aad_file_unique) = gcm.aad_file_unique else {
        return Err(general_err!("AAD unique file identifier is not set"));
    };
    let aad_prefix = match file_decryption_properties.aad_prefix() {
        Some(configured_prefix) => configured_prefix.clone(),
        None => gcm.aad_prefix.unwrap_or_default(),
    };

    FileDecryptor::new(
        file_decryption_properties,
        footer_key_metadata,
        aad_file_unique,
        aad_prefix,
    )
}
498
#[cfg(test)]
mod test {
    use super::*;
    use crate::basic::{SortOrder, Type};
    use crate::file::metadata::SchemaType;
    use crate::format::ColumnOrder as TColumnOrder;
    use crate::format::TypeDefinedOrder;

    /// Builds a primitive schema field named `name` with the given physical type.
    fn primitive_field(name: &str, physical_type: Type) -> Arc<SchemaType> {
        let field = SchemaType::primitive_type_builder(name, physical_type)
            .build()
            .unwrap();
        Arc::new(field)
    }

    #[test]
    fn test_metadata_column_orders_parse() {
        let schema = SchemaType::group_type_builder("schema")
            .with_fields(vec![
                primitive_field("col1", Type::INT32),
                primitive_field("col2", Type::FLOAT),
            ])
            .build()
            .unwrap();
        let schema_descr = SchemaDescriptor::new(Arc::new(schema));

        let t_column_orders = Some(vec![
            TColumnOrder::TYPEORDER(TypeDefinedOrder::new()),
            TColumnOrder::TYPEORDER(TypeDefinedOrder::new()),
        ]);
        let expected = Some(vec![
            ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED),
            ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED),
        ]);
        assert_eq!(
            parse_column_orders(t_column_orders, &schema_descr).unwrap(),
            expected
        );

        // A file without column orders produces `None`.
        assert_eq!(parse_column_orders(None, &schema_descr).unwrap(), None);
    }

    #[test]
    fn test_metadata_column_orders_len_mismatch() {
        // The schema has no leaf columns, so a single column order is a length mismatch.
        let schema = SchemaType::group_type_builder("schema").build().unwrap();
        let schema_descr = SchemaDescriptor::new(Arc::new(schema));

        let t_column_orders = Some(vec![TColumnOrder::TYPEORDER(TypeDefinedOrder::new())]);

        let err = parse_column_orders(t_column_orders, &schema_descr)
            .expect_err("mismatched column order length must be rejected");
        assert!(format!("{:?}", err).contains("Column order length mismatch"));
    }
}