1use crate::file::metadata::thrift_gen::{EncryptionAlgorithm, FileMeta};
19use crate::file::metadata::{
20 ColumnChunkMetaData, ParquetColumnIndex, ParquetOffsetIndex, RowGroupMetaData,
21};
22use crate::schema::types::{SchemaDescPtr, SchemaDescriptor};
23use crate::{
24 basic::ColumnOrder,
25 file::metadata::{FileMetaData, ParquetMetaDataBuilder},
26};
27#[cfg(feature = "encryption")]
28use crate::{
29 encryption::{
30 encrypt::{FileEncryptor, encrypt_thrift_object, write_signed_plaintext_thrift_object},
31 modules::{ModuleType, create_footer_aad, create_module_aad},
32 },
33 file::column_crypto_metadata::ColumnCryptoMetaData,
34 file::metadata::thrift_gen::{AesGcmV1, FileCryptoMetaData},
35};
36use crate::{errors::Result, file::page_index::column_index::ColumnIndexMetaData};
37
38use crate::{
39 file::writer::{TrackedWrite, get_file_magic},
40 parquet_thrift::WriteThrift,
41};
42use crate::{
43 file::{
44 metadata::{KeyValue, ParquetMetaData},
45 page_index::offset_index::OffsetIndexMetaData,
46 },
47 parquet_thrift::ThriftCompactOutputProtocol,
48};
49use std::io::Write;
50use std::sync::Arc;
51
52pub(crate) struct ThriftMetadataWriter<'a, W: Write> {
56 buf: &'a mut TrackedWrite<W>,
57 schema_descr: &'a SchemaDescPtr,
58 row_groups: Vec<RowGroupMetaData>,
59 column_indexes: Option<Vec<Vec<Option<ColumnIndexMetaData>>>>,
60 offset_indexes: Option<Vec<Vec<Option<OffsetIndexMetaData>>>>,
61 key_value_metadata: Option<Vec<KeyValue>>,
62 created_by: Option<String>,
63 object_writer: MetadataObjectWriter,
64 writer_version: i32,
65}
66
67impl<'a, W: Write> ThriftMetadataWriter<'a, W> {
68 fn write_offset_indexes(
74 &mut self,
75 offset_indexes: &[Vec<Option<OffsetIndexMetaData>>],
76 ) -> Result<()> {
77 for (row_group_idx, row_group) in self.row_groups.iter_mut().enumerate() {
81 for (column_idx, column_metadata) in row_group.columns.iter_mut().enumerate() {
82 if let Some(offset_index) = &offset_indexes[row_group_idx][column_idx] {
83 let start_offset = self.buf.bytes_written();
84 self.object_writer.write_offset_index(
85 offset_index,
86 column_metadata,
87 row_group_idx,
88 column_idx,
89 &mut self.buf,
90 )?;
91 let end_offset = self.buf.bytes_written();
92 column_metadata.offset_index_offset = Some(start_offset as i64);
94 column_metadata.offset_index_length = Some((end_offset - start_offset) as i32);
95 }
96 }
97 }
98 Ok(())
99 }
100
101 fn write_column_indexes(
107 &mut self,
108 column_indexes: &[Vec<Option<ColumnIndexMetaData>>],
109 ) -> Result<()> {
110 for (row_group_idx, row_group) in self.row_groups.iter_mut().enumerate() {
114 for (column_idx, column_metadata) in row_group.columns.iter_mut().enumerate() {
115 if let Some(column_index) = &column_indexes[row_group_idx][column_idx] {
116 let start_offset = self.buf.bytes_written();
117 self.object_writer.write_column_index(
118 column_index,
119 column_metadata,
120 row_group_idx,
121 column_idx,
122 &mut self.buf,
123 )?;
124 let end_offset = self.buf.bytes_written();
125 column_metadata.column_index_offset = Some(start_offset as i64);
127 column_metadata.column_index_length = Some((end_offset - start_offset) as i32);
128 }
129 }
130 }
131 Ok(())
132 }
133
134 pub fn finish(mut self) -> Result<ParquetMetaData> {
136 let num_rows = self.row_groups.iter().map(|x| x.num_rows).sum();
137
138 let column_indexes = std::mem::take(&mut self.column_indexes);
139 let offset_indexes = std::mem::take(&mut self.offset_indexes);
140
141 if let Some(column_indexes) = column_indexes.as_ref() {
143 self.write_column_indexes(column_indexes)?;
144 }
145 if let Some(offset_indexes) = offset_indexes.as_ref() {
146 self.write_offset_indexes(offset_indexes)?;
147 }
148
149 let column_orders = self
155 .schema_descr
156 .columns()
157 .iter()
158 .map(|col| {
159 let sort_order = ColumnOrder::get_sort_order(
160 col.logical_type(),
161 col.converted_type(),
162 col.physical_type(),
163 );
164 ColumnOrder::TYPE_DEFINED_ORDER(sort_order)
165 })
166 .collect();
167
168 let column_orders = Some(column_orders);
172
173 let (row_groups, unencrypted_row_groups) = self
174 .object_writer
175 .apply_row_group_encryption(self.row_groups)?;
176
177 let (encryption_algorithm, footer_signing_key_metadata) =
178 self.object_writer.get_plaintext_footer_crypto_metadata();
179
180 let file_metadata = FileMetaData::new(
181 self.writer_version,
182 num_rows,
183 self.created_by,
184 self.key_value_metadata,
185 self.schema_descr.clone(),
186 column_orders,
187 );
188
189 let file_meta = FileMeta {
190 file_metadata: &file_metadata,
191 row_groups: &row_groups,
192 encryption_algorithm,
193 footer_signing_key_metadata,
194 };
195
196 let start_pos = self.buf.bytes_written();
198 self.object_writer
199 .write_file_metadata(&file_meta, &mut self.buf)?;
200 let end_pos = self.buf.bytes_written();
201
202 let metadata_len = (end_pos - start_pos) as u32;
204
205 self.buf.write_all(&metadata_len.to_le_bytes())?;
206 self.buf.write_all(self.object_writer.get_file_magic())?;
207
208 let mut builder = ParquetMetaDataBuilder::new(file_metadata);
213
214 builder = match unencrypted_row_groups {
215 Some(rg) => builder.set_row_groups(rg),
216 None => builder.set_row_groups(row_groups),
217 };
218
219 let column_indexes: Option<ParquetColumnIndex> = column_indexes.map(|ovvi| {
220 ovvi.into_iter()
221 .map(|vi| {
222 vi.into_iter()
223 .map(|oi| oi.unwrap_or(ColumnIndexMetaData::NONE))
224 .collect()
225 })
226 .collect()
227 });
228
229 let offset_indexes: Option<ParquetOffsetIndex> = offset_indexes.map(|ovvi| {
231 ovvi.into_iter()
232 .map(|vi| vi.into_iter().map(|oi| oi.unwrap()).collect())
233 .collect()
234 });
235
236 builder = builder.set_column_index(column_indexes);
237 builder = builder.set_offset_index(offset_indexes);
238
239 Ok(builder.build())
240 }
241
242 pub fn new(
243 buf: &'a mut TrackedWrite<W>,
244 schema_descr: &'a SchemaDescPtr,
245 row_groups: Vec<RowGroupMetaData>,
246 created_by: Option<String>,
247 writer_version: i32,
248 ) -> Self {
249 Self {
250 buf,
251 schema_descr,
252 row_groups,
253 column_indexes: None,
254 offset_indexes: None,
255 key_value_metadata: None,
256 created_by,
257 object_writer: Default::default(),
258 writer_version,
259 }
260 }
261
262 pub fn with_column_indexes(
263 mut self,
264 column_indexes: Vec<Vec<Option<ColumnIndexMetaData>>>,
265 ) -> Self {
266 self.column_indexes = Some(column_indexes);
267 self
268 }
269
270 pub fn with_offset_indexes(
271 mut self,
272 offset_indexes: Vec<Vec<Option<OffsetIndexMetaData>>>,
273 ) -> Self {
274 self.offset_indexes = Some(offset_indexes);
275 self
276 }
277
278 pub fn with_key_value_metadata(mut self, key_value_metadata: Vec<KeyValue>) -> Self {
279 self.key_value_metadata = Some(key_value_metadata);
280 self
281 }
282
283 #[cfg(feature = "encryption")]
284 pub fn with_file_encryptor(mut self, file_encryptor: Option<Arc<FileEncryptor>>) -> Self {
285 self.object_writer = self.object_writer.with_file_encryptor(file_encryptor);
286 self
287 }
288}
289
290pub struct ParquetMetaDataWriter<'a, W: Write> {
368 buf: TrackedWrite<W>,
369 metadata: &'a ParquetMetaData,
370}
371
372impl<'a, W: Write> ParquetMetaDataWriter<'a, W> {
373 pub fn new(buf: W, metadata: &'a ParquetMetaData) -> Self {
381 Self::new_with_tracked(TrackedWrite::new(buf), metadata)
382 }
383
384 pub fn new_with_tracked(buf: TrackedWrite<W>, metadata: &'a ParquetMetaData) -> Self {
391 Self { buf, metadata }
392 }
393
394 pub fn finish(mut self) -> Result<()> {
396 let file_metadata = self.metadata.file_metadata();
397
398 let schema = Arc::new(file_metadata.schema().clone());
399 let schema_descr = Arc::new(SchemaDescriptor::new(schema.clone()));
400 let created_by = file_metadata.created_by().map(str::to_string);
401
402 let row_groups = self.metadata.row_groups.clone();
403
404 let key_value_metadata = file_metadata.key_value_metadata().cloned();
405
406 let column_indexes = self.convert_column_indexes();
407 let offset_indexes = self.convert_offset_index();
408
409 let mut encoder = ThriftMetadataWriter::new(
410 &mut self.buf,
411 &schema_descr,
412 row_groups,
413 created_by,
414 file_metadata.version(),
415 );
416
417 if let Some(column_indexes) = column_indexes {
418 encoder = encoder.with_column_indexes(column_indexes);
419 }
420
421 if let Some(offset_indexes) = offset_indexes {
422 encoder = encoder.with_offset_indexes(offset_indexes);
423 }
424
425 if let Some(key_value_metadata) = key_value_metadata {
426 encoder = encoder.with_key_value_metadata(key_value_metadata);
427 }
428 encoder.finish()?;
429
430 Ok(())
431 }
432
433 fn convert_column_indexes(&self) -> Option<Vec<Vec<Option<ColumnIndexMetaData>>>> {
434 self.metadata
437 .column_index()
438 .map(|row_group_column_indexes| {
439 (0..self.metadata.row_groups().len())
440 .map(|rg_idx| {
441 let column_indexes = &row_group_column_indexes[rg_idx];
442 column_indexes
443 .iter()
444 .map(|column_index| Some(column_index.clone()))
445 .collect()
446 })
447 .collect()
448 })
449 }
450
451 fn convert_offset_index(&self) -> Option<Vec<Vec<Option<OffsetIndexMetaData>>>> {
452 self.metadata
453 .offset_index()
454 .map(|row_group_offset_indexes| {
455 (0..self.metadata.row_groups().len())
456 .map(|rg_idx| {
457 let offset_indexes = &row_group_offset_indexes[rg_idx];
458 offset_indexes
459 .iter()
460 .map(|offset_index| Some(offset_index.clone()))
461 .collect()
462 })
463 .collect()
464 })
465 }
466}
467
468#[derive(Debug, Default)]
469struct MetadataObjectWriter {
470 #[cfg(feature = "encryption")]
471 file_encryptor: Option<Arc<FileEncryptor>>,
472}
473
474impl MetadataObjectWriter {
475 #[inline]
476 fn write_thrift_object(object: &impl WriteThrift, sink: impl Write) -> Result<()> {
477 let mut protocol = ThriftCompactOutputProtocol::new(sink);
478 object.write_thrift(&mut protocol)?;
479 Ok(())
480 }
481}
482
483#[cfg(not(feature = "encryption"))]
485impl MetadataObjectWriter {
486 fn write_file_metadata(&self, file_metadata: &FileMeta, sink: impl Write) -> Result<()> {
488 Self::write_thrift_object(file_metadata, sink)
489 }
490
491 fn write_offset_index(
493 &self,
494 offset_index: &OffsetIndexMetaData,
495 _column_chunk: &ColumnChunkMetaData,
496 _row_group_idx: usize,
497 _column_idx: usize,
498 sink: impl Write,
499 ) -> Result<()> {
500 Self::write_thrift_object(offset_index, sink)
501 }
502
503 fn write_column_index(
505 &self,
506 column_index: &ColumnIndexMetaData,
507 _column_chunk: &ColumnChunkMetaData,
508 _row_group_idx: usize,
509 _column_idx: usize,
510 sink: impl Write,
511 ) -> Result<()> {
512 Self::write_thrift_object(column_index, sink)
513 }
514
515 fn apply_row_group_encryption(
517 &self,
518 row_groups: Vec<RowGroupMetaData>,
519 ) -> Result<(Vec<RowGroupMetaData>, Option<Vec<RowGroupMetaData>>)> {
520 Ok((row_groups, None))
521 }
522
523 pub fn get_file_magic(&self) -> &[u8; 4] {
525 get_file_magic()
526 }
527
528 fn get_plaintext_footer_crypto_metadata(
529 &self,
530 ) -> (Option<EncryptionAlgorithm>, Option<Vec<u8>>) {
531 (None, None)
532 }
533}
534
/// Encryption-aware implementation, compiled when the `encryption` feature is
/// enabled. Without a configured encryptor every method falls back to
/// plaintext output.
#[cfg(feature = "encryption")]
impl MetadataObjectWriter {
    /// Replaces the file encryptor; `None` disables encryption.
    fn with_file_encryptor(mut self, encryptor: Option<Arc<FileEncryptor>>) -> Self {
        self.file_encryptor = encryptor;
        self
    }

    /// Writes the file metadata to `sink`.
    ///
    /// * No encryptor: plain Thrift.
    /// * Encryptor with `encrypt_footer()`: the file crypto metadata is
    ///   written first as plain Thrift, then the file metadata is written
    ///   through `encrypt_thrift_object` using the footer encryptor and
    ///   footer AAD.
    /// * Encryptor without footer encryption, and `file_metadata` carries an
    ///   encryption algorithm: written through
    ///   `write_signed_plaintext_thrift_object`.
    /// * Otherwise: plain Thrift.
    fn write_file_metadata(&self, file_metadata: &FileMeta, mut sink: impl Write) -> Result<()> {
        let file_encryptor = match self.file_encryptor.as_ref() {
            Some(encryptor) => encryptor,
            None => return Self::write_thrift_object(file_metadata, &mut sink),
        };

        if file_encryptor.properties().encrypt_footer() {
            // The crypto metadata precedes the encrypted footer in the output.
            {
                let crypto_metadata = Self::file_crypto_metadata(file_encryptor)?;
                let mut protocol = ThriftCompactOutputProtocol::new(&mut sink);
                crypto_metadata.write_thrift(&mut protocol)?;
            }

            let footer_aad = create_footer_aad(file_encryptor.file_aad())?;
            let mut footer_encryptor = file_encryptor.get_footer_encryptor()?;
            encrypt_thrift_object(file_metadata, &mut footer_encryptor, &mut sink, &footer_aad)
        } else if file_metadata.encryption_algorithm.is_some() {
            let footer_aad = create_footer_aad(file_encryptor.file_aad())?;
            let mut footer_encryptor = file_encryptor.get_footer_encryptor()?;
            write_signed_plaintext_thrift_object(
                file_metadata,
                &mut footer_encryptor,
                &mut sink,
                &footer_aad,
            )
        } else {
            Self::write_thrift_object(file_metadata, &mut sink)
        }
    }

    /// Writes an offset index to `sink`, going through
    /// `write_thrift_object_with_encryption` when an encryptor is configured
    /// and writing plain Thrift otherwise.
    fn write_offset_index(
        &self,
        offset_index: &OffsetIndexMetaData,
        column_chunk: &ColumnChunkMetaData,
        row_group_idx: usize,
        column_idx: usize,
        sink: impl Write,
    ) -> Result<()> {
        if let Some(file_encryptor) = &self.file_encryptor {
            Self::write_thrift_object_with_encryption(
                offset_index,
                sink,
                file_encryptor,
                column_chunk,
                ModuleType::OffsetIndex,
                row_group_idx,
                column_idx,
            )
        } else {
            Self::write_thrift_object(offset_index, sink)
        }
    }

    /// Writes a column index to `sink`, going through
    /// `write_thrift_object_with_encryption` when an encryptor is configured
    /// and writing plain Thrift otherwise.
    fn write_column_index(
        &self,
        column_index: &ColumnIndexMetaData,
        column_chunk: &ColumnChunkMetaData,
        row_group_idx: usize,
        column_idx: usize,
        sink: impl Write,
    ) -> Result<()> {
        if let Some(file_encryptor) = &self.file_encryptor {
            Self::write_thrift_object_with_encryption(
                column_index,
                sink,
                file_encryptor,
                column_chunk,
                ModuleType::ColumnIndex,
                row_group_idx,
                column_idx,
            )
        } else {
            Self::write_thrift_object(column_index, sink)
        }
    }

    /// Returns `(row groups to serialize, optional unencrypted copy)`.
    ///
    /// With an encryptor, the input is cloned before being passed through
    /// `encrypt_row_groups`, and the clone is returned in the second slot.
    /// Without one, the input is returned unchanged with `None`.
    fn apply_row_group_encryption(
        &self,
        row_groups: Vec<RowGroupMetaData>,
    ) -> Result<(Vec<RowGroupMetaData>, Option<Vec<RowGroupMetaData>>)> {
        if let Some(file_encryptor) = &self.file_encryptor {
            let plaintext = row_groups.clone();
            let encrypted = Self::encrypt_row_groups(row_groups, file_encryptor)?;
            return Ok((encrypted, Some(plaintext)));
        }
        Ok((row_groups, None))
    }

    /// Returns the magic bytes written at the end of the file, chosen by the
    /// free function `get_file_magic` from the encryptor's properties (if any).
    fn get_file_magic(&self) -> &[u8; 4] {
        let properties = self
            .file_encryptor
            .as_ref()
            .map(|encryptor| encryptor.properties());
        get_file_magic(properties)
    }

    /// Writes `object` to `sink`, encrypting it when the column that
    /// `column_metadata` belongs to is encrypted.
    ///
    /// The column is looked up by its path, with multi-part paths joined by
    /// `"."`. Encrypted output uses the column's encryptor and a module AAD
    /// built from `module_type`, `row_group_index` and `column_index`;
    /// unencrypted columns are written as plain Thrift.
    fn write_thrift_object_with_encryption(
        object: &impl WriteThrift,
        mut sink: impl Write,
        file_encryptor: &FileEncryptor,
        column_metadata: &ColumnChunkMetaData,
        module_type: ModuleType,
        row_group_index: usize,
        column_index: usize,
    ) -> Result<()> {
        let path_parts = column_metadata.column_path().as_ref();

        // Only allocate a joined string when the path has several parts.
        let dotted_path;
        let column_path = if path_parts.len() == 1 {
            &path_parts[0]
        } else {
            dotted_path = path_parts.join(".");
            &dotted_path
        };

        if !file_encryptor.is_column_encrypted(column_path) {
            return Self::write_thrift_object(object, sink);
        }

        let module_aad = create_module_aad(
            file_encryptor.file_aad(),
            module_type,
            row_group_index,
            column_index,
            None,
        )?;
        let mut column_encryptor = file_encryptor.get_column_encryptor(column_path)?;
        encrypt_thrift_object(object, &mut column_encryptor, &mut sink, &module_aad)
    }

    /// Returns the encryption algorithm and a copy of the footer key metadata
    /// when an encryptor is configured and the footer is *not* encrypted;
    /// returns `(None, None)` in every other case.
    fn get_plaintext_footer_crypto_metadata(
        &self,
    ) -> (Option<EncryptionAlgorithm>, Option<Vec<u8>>) {
        match self.file_encryptor.as_ref() {
            Some(file_encryptor) if !file_encryptor.properties().encrypt_footer() => (
                Some(Self::encryption_algorithm_from_encryptor(file_encryptor)),
                file_encryptor.properties().footer_key_metadata().cloned(),
            ),
            _ => (None, None),
        }
    }

    /// Builds the `AES_GCM_V1` algorithm description for `file_encryptor`.
    ///
    /// * `aad_prefix` is included only when the properties ask for it to be
    ///   stored.
    /// * `supply_aad_prefix` is set only when an AAD prefix exists, and is
    ///   then the negation of `store_aad_prefix()`.
    fn encryption_algorithm_from_encryptor(file_encryptor: &FileEncryptor) -> EncryptionAlgorithm {
        let properties = file_encryptor.properties();
        let store_prefix = properties.store_aad_prefix();

        let supply_aad_prefix = properties.aad_prefix().map(|_| !store_prefix);
        let stored_prefix = properties.aad_prefix().filter(|_| store_prefix);

        EncryptionAlgorithm::AES_GCM_V1(AesGcmV1 {
            aad_prefix: stored_prefix.cloned(),
            aad_file_unique: Some(file_encryptor.aad_file_unique().clone()),
            supply_aad_prefix,
        })
    }

    /// Builds the file crypto metadata: the encryption algorithm plus a
    /// borrowed view of the footer key metadata, if any.
    fn file_crypto_metadata(file_encryptor: &'_ FileEncryptor) -> Result<FileCryptoMetaData<'_>> {
        let encryption_algorithm = Self::encryption_algorithm_from_encryptor(file_encryptor);
        let key_metadata = file_encryptor
            .properties()
            .footer_key_metadata()
            .map(|bytes| bytes.as_slice());
        Ok(FileCryptoMetaData {
            encryption_algorithm,
            key_metadata,
        })
    }

    /// Runs every column chunk of every row group through
    /// `encrypt_column_chunk`, stopping at the first error.
    fn encrypt_row_groups(
        row_groups: Vec<RowGroupMetaData>,
        file_encryptor: &Arc<FileEncryptor>,
    ) -> Result<Vec<RowGroupMetaData>> {
        let mut encrypted_row_groups = Vec::with_capacity(row_groups.len());
        for (rg_idx, mut row_group) in row_groups.into_iter().enumerate() {
            let mut encrypted_columns: Vec<ColumnChunkMetaData> =
                Vec::with_capacity(row_group.columns.len());
            for (col_idx, column) in row_group.columns.into_iter().enumerate() {
                encrypted_columns.push(Self::encrypt_column_chunk(
                    column,
                    file_encryptor,
                    rg_idx,
                    col_idx,
                )?);
            }
            row_group.columns = encrypted_columns;
            encrypted_row_groups.push(row_group);
        }
        Ok(encrypted_row_groups)
    }

    /// Attaches encrypted column metadata to `column_chunk` when it is
    /// encrypted with a column key.
    ///
    /// Chunks without crypto metadata, or marked as encrypted with the footer
    /// key, are returned unchanged. For column-key chunks the column metadata
    /// is serialized, encrypted with the column's encryptor and a
    /// `ColumnMetaData` module AAD, and stored in `encrypted_column_metadata`.
    fn encrypt_column_chunk(
        mut column_chunk: ColumnChunkMetaData,
        file_encryptor: &Arc<FileEncryptor>,
        row_group_index: usize,
        column_index: usize,
    ) -> Result<ColumnChunkMetaData> {
        match column_chunk.column_crypto_metadata.as_ref() {
            // Nothing to attach in either of these cases.
            None | Some(ColumnCryptoMetaData::ENCRYPTION_WITH_FOOTER_KEY) => {}
            Some(ColumnCryptoMetaData::ENCRYPTION_WITH_COLUMN_KEY(col_key)) => {
                let dotted_path = col_key.path_in_schema.join(".");
                let mut column_encryptor = file_encryptor.get_column_encryptor(&dotted_path)?;
                let module_aad = create_module_aad(
                    file_encryptor.file_aad(),
                    ModuleType::ColumnMetaData,
                    row_group_index,
                    column_index,
                    None,
                )?;

                // Serialize into a scratch buffer, then encrypt the bytes.
                let mut plaintext = Vec::<u8>::new();
                crate::file::metadata::thrift_gen::serialize_column_meta_data(
                    &column_chunk,
                    &mut ThriftCompactOutputProtocol::new(&mut plaintext),
                )?;
                let ciphertext = column_encryptor.encrypt(&plaintext, &module_aad)?;

                column_chunk.encrypted_column_metadata = Some(ciphertext);
            }
        }

        Ok(column_chunk)
    }
}