1#[cfg(feature = "encryption")]
19use crate::encryption::{
20 encrypt::{
21 encrypt_object, encrypt_object_to_vec, write_signed_plaintext_object, FileEncryptor,
22 },
23 modules::{create_footer_aad, create_module_aad, ModuleType},
24};
25#[cfg(feature = "encryption")]
26use crate::errors::ParquetError;
27use crate::errors::Result;
28use crate::file::metadata::{KeyValue, ParquetMetaData};
29use crate::file::page_index::index::Index;
30use crate::file::writer::{get_file_magic, TrackedWrite};
31use crate::format::EncryptionAlgorithm;
32#[cfg(feature = "encryption")]
33use crate::format::{AesGcmV1, ColumnCryptoMetaData};
34use crate::format::{ColumnChunk, ColumnIndex, FileMetaData, OffsetIndex, RowGroup};
35use crate::schema::types;
36use crate::schema::types::{SchemaDescPtr, SchemaDescriptor, TypePtr};
37use crate::thrift::TSerializable;
38use std::io::Write;
39use std::sync::Arc;
40use thrift::protocol::TCompactOutputProtocol;
41
/// Writes the page indexes and the Thrift `FileMetaData` footer for a Parquet file.
///
/// Built with [`ThriftMetadataWriter::new`] plus the optional `with_*` setters and
/// consumed by [`ThriftMetadataWriter::finish`].
pub(crate) struct ThriftMetadataWriter<'a, W: Write> {
    /// Output sink; its `bytes_written()` count supplies the offsets recorded for
    /// page indexes and bounds the serialized footer length.
    buf: &'a mut TrackedWrite<W>,
    /// Root schema type, converted with `types::to_thrift` into `FileMetaData.schema`.
    schema: &'a TypePtr,
    /// Schema descriptor; its leaf-column count sizes `column_orders`.
    schema_descr: &'a SchemaDescPtr,
    /// Thrift row-group metadata; index offsets/lengths are filled in before the
    /// footer is serialized.
    row_groups: Vec<RowGroup>,
    /// Optional column indexes, one `Vec` per row group with one entry per column.
    column_indexes: Option<&'a [Vec<Option<ColumnIndex>>]>,
    /// Optional offset indexes, one `Vec` per row group with one entry per column.
    offset_indexes: Option<&'a [Vec<Option<OffsetIndex>>]>,
    /// Optional user key/value metadata stored in the footer.
    key_value_metadata: Option<Vec<KeyValue>>,
    /// Optional `created_by` string stored in the footer.
    created_by: Option<String>,
    /// Serializes (and, with the `encryption` feature, may encrypt) metadata objects.
    object_writer: MetadataObjectWriter,
    /// Value written to `FileMetaData.version`.
    writer_version: i32,
}
57
58impl<'a, W: Write> ThriftMetadataWriter<'a, W> {
59 fn write_offset_indexes(&mut self, offset_indexes: &[Vec<Option<OffsetIndex>>]) -> Result<()> {
65 for (row_group_idx, row_group) in self.row_groups.iter_mut().enumerate() {
69 for (column_idx, column_metadata) in row_group.columns.iter_mut().enumerate() {
70 if let Some(offset_index) = &offset_indexes[row_group_idx][column_idx] {
71 let start_offset = self.buf.bytes_written();
72 self.object_writer.write_offset_index(
73 offset_index,
74 column_metadata,
75 row_group_idx,
76 column_idx,
77 &mut self.buf,
78 )?;
79 let end_offset = self.buf.bytes_written();
80 column_metadata.offset_index_offset = Some(start_offset as i64);
82 column_metadata.offset_index_length = Some((end_offset - start_offset) as i32);
83 }
84 }
85 }
86 Ok(())
87 }
88
89 fn write_column_indexes(&mut self, column_indexes: &[Vec<Option<ColumnIndex>>]) -> Result<()> {
95 for (row_group_idx, row_group) in self.row_groups.iter_mut().enumerate() {
99 for (column_idx, column_metadata) in row_group.columns.iter_mut().enumerate() {
100 if let Some(column_index) = &column_indexes[row_group_idx][column_idx] {
101 let start_offset = self.buf.bytes_written();
102 self.object_writer.write_column_index(
103 column_index,
104 column_metadata,
105 row_group_idx,
106 column_idx,
107 &mut self.buf,
108 )?;
109 let end_offset = self.buf.bytes_written();
110 column_metadata.column_index_offset = Some(start_offset as i64);
112 column_metadata.column_index_length = Some((end_offset - start_offset) as i32);
113 }
114 }
115 }
116 Ok(())
117 }
118
119 pub fn finish(mut self) -> Result<crate::format::FileMetaData> {
121 let num_rows = self.row_groups.iter().map(|x| x.num_rows).sum();
122
123 if let Some(column_indexes) = self.column_indexes {
125 self.write_column_indexes(column_indexes)?;
126 }
127 if let Some(offset_indexes) = self.offset_indexes {
128 self.write_offset_indexes(offset_indexes)?;
129 }
130
131 let column_orders = (0..self.schema_descr.num_columns())
137 .map(|_| crate::format::ColumnOrder::TYPEORDER(crate::format::TypeDefinedOrder {}))
138 .collect();
139 let column_orders = Some(column_orders);
143 let (row_groups, unencrypted_row_groups) = self
144 .object_writer
145 .apply_row_group_encryption(self.row_groups)?;
146
147 let (encryption_algorithm, footer_signing_key_metadata) =
148 self.object_writer.get_plaintext_footer_crypto_metadata();
149 let mut file_metadata = FileMetaData {
150 num_rows,
151 row_groups,
152 key_value_metadata: self.key_value_metadata.clone(),
153 version: self.writer_version,
154 schema: types::to_thrift(self.schema.as_ref())?,
155 created_by: self.created_by.clone(),
156 column_orders,
157 encryption_algorithm,
158 footer_signing_key_metadata,
159 };
160
161 let start_pos = self.buf.bytes_written();
163 self.object_writer
164 .write_file_metadata(&file_metadata, &mut self.buf)?;
165 let end_pos = self.buf.bytes_written();
166
167 let metadata_len = (end_pos - start_pos) as u32;
169
170 self.buf.write_all(&metadata_len.to_le_bytes())?;
171 self.buf.write_all(self.object_writer.get_file_magic())?;
172
173 if let Some(row_groups) = unencrypted_row_groups {
174 file_metadata.row_groups = row_groups;
179 }
180
181 Ok(file_metadata)
182 }
183
184 pub fn new(
185 buf: &'a mut TrackedWrite<W>,
186 schema: &'a TypePtr,
187 schema_descr: &'a SchemaDescPtr,
188 row_groups: Vec<RowGroup>,
189 created_by: Option<String>,
190 writer_version: i32,
191 ) -> Self {
192 Self {
193 buf,
194 schema,
195 schema_descr,
196 row_groups,
197 column_indexes: None,
198 offset_indexes: None,
199 key_value_metadata: None,
200 created_by,
201 object_writer: Default::default(),
202 writer_version,
203 }
204 }
205
206 pub fn with_column_indexes(mut self, column_indexes: &'a [Vec<Option<ColumnIndex>>]) -> Self {
207 self.column_indexes = Some(column_indexes);
208 self
209 }
210
211 pub fn with_offset_indexes(mut self, offset_indexes: &'a [Vec<Option<OffsetIndex>>]) -> Self {
212 self.offset_indexes = Some(offset_indexes);
213 self
214 }
215
216 pub fn with_key_value_metadata(mut self, key_value_metadata: Vec<KeyValue>) -> Self {
217 self.key_value_metadata = Some(key_value_metadata);
218 self
219 }
220
221 #[cfg(feature = "encryption")]
222 pub fn with_file_encryptor(mut self, file_encryptor: Option<Arc<FileEncryptor>>) -> Self {
223 self.object_writer = self.object_writer.with_file_encryptor(file_encryptor);
224 self
225 }
226}
227
/// Serializes [`ParquetMetaData`] as a Parquet footer.
///
/// [`ParquetMetaDataWriter::finish`] writes, in order: any column indexes and offset
/// indexes held by the metadata, the Thrift compact-protocol `FileMetaData`, its
/// length as 4 little-endian bytes, and the trailing file magic. Page data and the
/// leading file magic are not written.
pub struct ParquetMetaDataWriter<'a, W: Write> {
    /// Output sink; its byte count supplies the page-index offsets stored in the footer.
    buf: TrackedWrite<W>,
    /// Metadata to serialize.
    metadata: &'a ParquetMetaData,
}
307
308impl<'a, W: Write> ParquetMetaDataWriter<'a, W> {
309 pub fn new(buf: W, metadata: &'a ParquetMetaData) -> Self {
317 Self::new_with_tracked(TrackedWrite::new(buf), metadata)
318 }
319
320 pub fn new_with_tracked(buf: TrackedWrite<W>, metadata: &'a ParquetMetaData) -> Self {
327 Self { buf, metadata }
328 }
329
330 pub fn finish(mut self) -> Result<()> {
332 let file_metadata = self.metadata.file_metadata();
333
334 let schema = Arc::new(file_metadata.schema().clone());
335 let schema_descr = Arc::new(SchemaDescriptor::new(schema.clone()));
336 let created_by = file_metadata.created_by().map(str::to_string);
337
338 let row_groups = self
339 .metadata
340 .row_groups()
341 .iter()
342 .map(|rg| rg.to_thrift())
343 .collect::<Vec<_>>();
344
345 let key_value_metadata = file_metadata.key_value_metadata().cloned();
346
347 let column_indexes = self.convert_column_indexes();
348 let offset_indexes = self.convert_offset_index();
349
350 let mut encoder = ThriftMetadataWriter::new(
351 &mut self.buf,
352 &schema,
353 &schema_descr,
354 row_groups,
355 created_by,
356 file_metadata.version(),
357 );
358 encoder = encoder.with_column_indexes(&column_indexes);
359 encoder = encoder.with_offset_indexes(&offset_indexes);
360 if let Some(key_value_metadata) = key_value_metadata {
361 encoder = encoder.with_key_value_metadata(key_value_metadata);
362 }
363 encoder.finish()?;
364
365 Ok(())
366 }
367
368 fn convert_column_indexes(&self) -> Vec<Vec<Option<ColumnIndex>>> {
369 if let Some(row_group_column_indexes) = self.metadata.column_index() {
370 (0..self.metadata.row_groups().len())
371 .map(|rg_idx| {
372 let column_indexes = &row_group_column_indexes[rg_idx];
373 column_indexes
374 .iter()
375 .map(|column_index| match column_index {
376 Index::NONE => None,
377 Index::BOOLEAN(column_index) => Some(column_index.to_thrift()),
378 Index::BYTE_ARRAY(column_index) => Some(column_index.to_thrift()),
379 Index::DOUBLE(column_index) => Some(column_index.to_thrift()),
380 Index::FIXED_LEN_BYTE_ARRAY(column_index) => {
381 Some(column_index.to_thrift())
382 }
383 Index::FLOAT(column_index) => Some(column_index.to_thrift()),
384 Index::INT32(column_index) => Some(column_index.to_thrift()),
385 Index::INT64(column_index) => Some(column_index.to_thrift()),
386 Index::INT96(column_index) => Some(column_index.to_thrift()),
387 })
388 .collect()
389 })
390 .collect()
391 } else {
392 self.metadata
394 .row_groups()
395 .iter()
396 .map(|rg| std::iter::repeat(None).take(rg.columns().len()).collect())
397 .collect()
398 }
399 }
400
401 fn convert_offset_index(&self) -> Vec<Vec<Option<OffsetIndex>>> {
402 if let Some(row_group_offset_indexes) = self.metadata.offset_index() {
403 (0..self.metadata.row_groups().len())
404 .map(|rg_idx| {
405 let offset_indexes = &row_group_offset_indexes[rg_idx];
406 offset_indexes
407 .iter()
408 .map(|offset_index| Some(offset_index.to_thrift()))
409 .collect()
410 })
411 .collect()
412 } else {
413 self.metadata
415 .row_groups()
416 .iter()
417 .map(|rg| std::iter::repeat(None).take(rg.columns().len()).collect())
418 .collect()
419 }
420 }
421}
422
/// Serializes Thrift metadata objects (footer, column indexes, offset indexes).
///
/// With the `encryption` feature enabled it may hold a `FileEncryptor`; when one is
/// set, objects are encrypted or signed according to its properties. Otherwise
/// objects are written with the Thrift compact protocol unchanged.
#[derive(Debug, Default)]
struct MetadataObjectWriter {
    /// Encryptor for the file being written; `None` (the default) disables encryption.
    #[cfg(feature = "encryption")]
    file_encryptor: Option<Arc<FileEncryptor>>,
}
428
429impl MetadataObjectWriter {
430 #[inline]
431 fn write_object(object: &impl TSerializable, sink: impl Write) -> Result<()> {
432 let mut protocol = TCompactOutputProtocol::new(sink);
433 object.write_to_out_protocol(&mut protocol)?;
434 Ok(())
435 }
436}
437
438#[cfg(not(feature = "encryption"))]
440impl MetadataObjectWriter {
441 fn write_file_metadata(&self, file_metadata: &FileMetaData, sink: impl Write) -> Result<()> {
443 Self::write_object(file_metadata, sink)
444 }
445
446 fn write_offset_index(
448 &self,
449 offset_index: &OffsetIndex,
450 _column_chunk: &ColumnChunk,
451 _row_group_idx: usize,
452 _column_idx: usize,
453 sink: impl Write,
454 ) -> Result<()> {
455 Self::write_object(offset_index, sink)
456 }
457
458 fn write_column_index(
460 &self,
461 column_index: &ColumnIndex,
462 _column_chunk: &ColumnChunk,
463 _row_group_idx: usize,
464 _column_idx: usize,
465 sink: impl Write,
466 ) -> Result<()> {
467 Self::write_object(column_index, sink)
468 }
469
470 fn apply_row_group_encryption(
472 &self,
473 row_groups: Vec<RowGroup>,
474 ) -> Result<(Vec<RowGroup>, Option<Vec<RowGroup>>)> {
475 Ok((row_groups, None))
476 }
477
478 pub fn get_file_magic(&self) -> &[u8; 4] {
480 get_file_magic()
481 }
482
483 fn get_plaintext_footer_crypto_metadata(
484 &self,
485 ) -> (Option<EncryptionAlgorithm>, Option<Vec<u8>>) {
486 (None, None)
487 }
488}
489
/// [`MetadataObjectWriter`] behaviour when the `encryption` feature is enabled.
///
/// With a `FileEncryptor` configured, the footer, page indexes and column metadata
/// are encrypted (or the footer signed) as dictated by its properties. Without one,
/// every object is written as plain Thrift compact-protocol bytes.
#[cfg(feature = "encryption")]
impl MetadataObjectWriter {
    /// Sets the encryptor used for subsequent writes; `None` disables encryption.
    fn with_file_encryptor(mut self, encryptor: Option<Arc<FileEncryptor>>) -> Self {
        self.file_encryptor = encryptor;
        self
    }

    /// Writes `file_metadata` to `sink`.
    ///
    /// - Encryptor set and `encrypt_footer()` true: writes the `FileCryptoMetaData`
    ///   in plaintext, then the footer encrypted with the footer encryptor and footer AAD.
    /// - Encryptor set and the footer carries an `encryption_algorithm` (plaintext
    ///   footer mode): writes it via `write_signed_plaintext_object` with the footer
    ///   encryptor and footer AAD.
    /// - Otherwise: writes the footer as plain Thrift.
    fn write_file_metadata(
        &self,
        file_metadata: &FileMetaData,
        mut sink: impl Write,
    ) -> Result<()> {
        match self.file_encryptor.as_ref() {
            Some(file_encryptor) if file_encryptor.properties().encrypt_footer() => {
                // Plaintext FileCryptoMetaData precedes the encrypted footer.
                let crypto_metadata = Self::file_crypto_metadata(file_encryptor)?;
                let mut protocol = TCompactOutputProtocol::new(&mut sink);
                crypto_metadata.write_to_out_protocol(&mut protocol)?;

                // Then the footer itself, encrypted with the footer key/AAD.
                let aad = create_footer_aad(file_encryptor.file_aad())?;
                let mut encryptor = file_encryptor.get_footer_encryptor()?;
                encrypt_object(file_metadata, &mut encryptor, &mut sink, &aad)
            }
            Some(file_encryptor) if file_metadata.encryption_algorithm.is_some() => {
                let aad = create_footer_aad(file_encryptor.file_aad())?;
                let mut encryptor = file_encryptor.get_footer_encryptor()?;
                write_signed_plaintext_object(file_metadata, &mut encryptor, &mut sink, &aad)
            }
            _ => Self::write_object(file_metadata, &mut sink),
        }
    }

    /// Writes an offset index, encrypting it with the column's key when an encryptor
    /// is configured and the column is encrypted.
    fn write_offset_index(
        &self,
        offset_index: &OffsetIndex,
        column_chunk: &ColumnChunk,
        row_group_idx: usize,
        column_idx: usize,
        sink: impl Write,
    ) -> Result<()> {
        match &self.file_encryptor {
            Some(file_encryptor) => Self::write_object_with_encryption(
                offset_index,
                sink,
                file_encryptor,
                column_chunk,
                ModuleType::OffsetIndex,
                row_group_idx,
                column_idx,
            ),
            None => Self::write_object(offset_index, sink),
        }
    }

    /// Writes a column index, encrypting it with the column's key when an encryptor
    /// is configured and the column is encrypted.
    fn write_column_index(
        &self,
        column_index: &ColumnIndex,
        column_chunk: &ColumnChunk,
        row_group_idx: usize,
        column_idx: usize,
        sink: impl Write,
    ) -> Result<()> {
        match &self.file_encryptor {
            Some(file_encryptor) => Self::write_object_with_encryption(
                column_index,
                sink,
                file_encryptor,
                column_chunk,
                ModuleType::ColumnIndex,
                row_group_idx,
                column_idx,
            ),
            None => Self::write_object(column_index, sink),
        }
    }

    /// Encrypts column metadata in `row_groups` when an encryptor is configured.
    ///
    /// Returns `(row groups to serialize, original row groups)`: the second element is
    /// a pre-encryption clone when encryption was applied, and `None` otherwise.
    fn apply_row_group_encryption(
        &self,
        row_groups: Vec<RowGroup>,
    ) -> Result<(Vec<RowGroup>, Option<Vec<RowGroup>>)> {
        match &self.file_encryptor {
            Some(file_encryptor) => {
                let unencrypted_row_groups = row_groups.clone();
                let encrypted_row_groups = Self::encrypt_row_groups(row_groups, file_encryptor)?;
                Ok((encrypted_row_groups, Some(unencrypted_row_groups)))
            }
            None => Ok((row_groups, None)),
        }
    }

    /// Returns the trailing file magic, passing the encryptor's properties (if any) to
    /// `get_file_magic`. NOTE(review): presumably this selects the encrypted-footer
    /// magic when required — see `crate::file::writer::get_file_magic`.
    fn get_file_magic(&self) -> &[u8; 4] {
        get_file_magic(
            self.file_encryptor
                .as_ref()
                .map(|encryptor| encryptor.properties()),
        )
    }

    /// Writes `object` for the given column, encrypting it when the column is
    /// encrypted and writing plain Thrift otherwise.
    ///
    /// The column path is taken from `column_metadata.meta_data.path_in_schema`
    /// (joined with `.` when it has more than one element). The AAD is built from the
    /// file AAD, `module_type`, row-group index and column index.
    ///
    /// # Errors
    /// Fails if `column_metadata.meta_data` is `None`, or if AAD construction,
    /// encryptor lookup or serialization fails.
    fn write_object_with_encryption(
        object: &impl TSerializable,
        mut sink: impl Write,
        file_encryptor: &FileEncryptor,
        column_metadata: &ColumnChunk,
        module_type: ModuleType,
        row_group_index: usize,
        column_index: usize,
    ) -> Result<()> {
        let column_path_vec = &column_metadata
            .meta_data
            .as_ref()
            .ok_or_else(|| {
                general_err!(
                    "Column metadata not set for column {} when encrypting object",
                    column_index
                )
            })?
            .path_in_schema;

        // Borrow a single-element path directly; only join (allocate) when nested.
        let joined_column_path;
        let column_path = if column_path_vec.len() == 1 {
            &column_path_vec[0]
        } else {
            joined_column_path = column_path_vec.join(".");
            &joined_column_path
        };

        if file_encryptor.is_column_encrypted(column_path) {
            let aad = create_module_aad(
                file_encryptor.file_aad(),
                module_type,
                row_group_index,
                column_index,
                None,
            )?;
            let mut encryptor = file_encryptor.get_column_encryptor(column_path)?;
            encrypt_object(object, &mut encryptor, &mut sink, &aad)
        } else {
            Self::write_object(object, sink)
        }
    }

    /// Returns `(encryption_algorithm, footer_signing_key_metadata)` for the footer.
    ///
    /// Both are set only when an encryptor is configured and the footer is NOT
    /// encrypted; otherwise `(None, None)`.
    fn get_plaintext_footer_crypto_metadata(
        &self,
    ) -> (Option<EncryptionAlgorithm>, Option<Vec<u8>>) {
        if let Some(file_encryptor) = self.file_encryptor.as_ref() {
            let encryption_properties = file_encryptor.properties();
            if !encryption_properties.encrypt_footer() {
                return (
                    Some(Self::encryption_algorithm_from_encryptor(file_encryptor)),
                    encryption_properties.footer_key_metadata().cloned(),
                );
            }
        }
        (None, None)
    }

    /// Builds the `AES_GCM_V1` algorithm descriptor for this file.
    ///
    /// The AAD prefix is embedded only when `store_aad_prefix()` is set.
    /// `supply_aad_prefix` is set only when an AAD prefix exists, and is `true` when
    /// that prefix is not stored in the file.
    fn encryption_algorithm_from_encryptor(file_encryptor: &FileEncryptor) -> EncryptionAlgorithm {
        let supply_aad_prefix = file_encryptor
            .properties()
            .aad_prefix()
            .map(|_| !file_encryptor.properties().store_aad_prefix());
        let aad_prefix = if file_encryptor.properties().store_aad_prefix() {
            file_encryptor.properties().aad_prefix().cloned()
        } else {
            None
        };
        EncryptionAlgorithm::AESGCMV1(AesGcmV1 {
            aad_prefix,
            aad_file_unique: Some(file_encryptor.aad_file_unique().clone()),
            supply_aad_prefix,
        })
    }

    /// Builds the plaintext `FileCryptoMetaData` written ahead of an encrypted footer.
    fn file_crypto_metadata(
        file_encryptor: &FileEncryptor,
    ) -> Result<crate::format::FileCryptoMetaData> {
        let properties = file_encryptor.properties();
        Ok(crate::format::FileCryptoMetaData {
            encryption_algorithm: Self::encryption_algorithm_from_encryptor(file_encryptor),
            key_metadata: properties.footer_key_metadata().cloned(),
        })
    }

    /// Applies `encrypt_column_chunk` to every column chunk, passing each chunk's
    /// row-group and column position (used for the module AAD). Stops at the first error.
    fn encrypt_row_groups(
        row_groups: Vec<RowGroup>,
        file_encryptor: &Arc<FileEncryptor>,
    ) -> Result<Vec<RowGroup>> {
        row_groups
            .into_iter()
            .enumerate()
            .map(|(rg_idx, mut rg)| {
                let cols: Result<Vec<ColumnChunk>> = rg
                    .columns
                    .into_iter()
                    .enumerate()
                    .map(|(col_idx, c)| {
                        Self::encrypt_column_chunk(c, file_encryptor, rg_idx, col_idx)
                    })
                    .collect();
                rg.columns = cols?;
                Ok(rg)
            })
            .collect()
    }

    /// Encrypts a column chunk's metadata according to its existing `crypto_metadata`.
    ///
    /// - No crypto metadata, or footer-key encryption: the chunk is returned unchanged.
    /// - Column-key encryption: `meta_data` is taken out, encrypted with the column's
    ///   encryptor and a `ColumnMetaData` module AAD, and stored in
    ///   `encrypted_column_metadata`; `meta_data` is left `None`.
    fn encrypt_column_chunk(
        mut column_chunk: ColumnChunk,
        file_encryptor: &Arc<FileEncryptor>,
        row_group_index: usize,
        column_index: usize,
    ) -> Result<ColumnChunk> {
        // `crypto_metadata` is expected to have been set when the column was written;
        // this only applies the metadata encryption it calls for.
        match column_chunk.crypto_metadata.as_ref() {
            None => {}
            Some(ColumnCryptoMetaData::ENCRYPTIONWITHFOOTERKEY(_)) => {
                // Metadata is left as-is here. NOTE(review): this relies on the footer
                // itself being encrypted — confirm the intended handling when
                // `encrypt_footer()` is false (plaintext footer mode).
            }
            Some(ColumnCryptoMetaData::ENCRYPTIONWITHCOLUMNKEY(col_key)) => {
                let column_path = col_key.path_in_schema.join(".");
                let mut column_encryptor = file_encryptor.get_column_encryptor(&column_path)?;
                let meta_data = column_chunk
                    .meta_data
                    .take()
                    .ok_or_else(|| general_err!("Column metadata not set for encryption"))?;
                let aad = create_module_aad(
                    file_encryptor.file_aad(),
                    ModuleType::ColumnMetaData,
                    row_group_index,
                    column_index,
                    None,
                )?;
                let ciphertext = encrypt_object_to_vec(&meta_data, &mut column_encryptor, &aad)?;

                column_chunk.encrypted_column_metadata = Some(ciphertext);
                debug_assert!(column_chunk.meta_data.is_none());
            }
        }

        Ok(column_chunk)
    }
}