1use crate::file::metadata::thrift::FileMeta;
19use crate::file::metadata::{
20 ColumnChunkMetaData, ParquetColumnIndex, ParquetOffsetIndex, RowGroupMetaData,
21};
22use crate::schema::types::{SchemaDescPtr, SchemaDescriptor};
23use crate::{
24 basic::ColumnOrder,
25 file::metadata::{FileMetaData, ParquetMetaDataBuilder},
26};
27#[cfg(feature = "encryption")]
28use crate::{
29 encryption::{
30 encrypt::{FileEncryptor, encrypt_thrift_object, write_signed_plaintext_thrift_object},
31 modules::{ModuleType, create_footer_aad, create_module_aad},
32 },
33 file::column_crypto_metadata::ColumnCryptoMetaData,
34 file::metadata::thrift::encryption::{AesGcmV1, EncryptionAlgorithm, FileCryptoMetaData},
35};
36use crate::{errors::Result, file::page_index::column_index::ColumnIndexMetaData};
37
38use crate::{
39 file::writer::{TrackedWrite, get_file_magic},
40 parquet_thrift::WriteThrift,
41};
42use crate::{
43 file::{
44 metadata::{KeyValue, ParquetMetaData},
45 page_index::offset_index::OffsetIndexMetaData,
46 },
47 parquet_thrift::ThriftCompactOutputProtocol,
48};
49use std::io::Write;
50use std::sync::Arc;
51
52pub(crate) struct ThriftMetadataWriter<'a, W: Write> {
56 buf: &'a mut TrackedWrite<W>,
57 schema_descr: &'a SchemaDescPtr,
58 row_groups: Vec<RowGroupMetaData>,
59 column_indexes: Option<Vec<Vec<Option<ColumnIndexMetaData>>>>,
60 offset_indexes: Option<Vec<Vec<Option<OffsetIndexMetaData>>>>,
61 key_value_metadata: Option<Vec<KeyValue>>,
62 created_by: Option<String>,
63 object_writer: MetadataObjectWriter,
64 writer_version: i32,
65}
66
67impl<'a, W: Write> ThriftMetadataWriter<'a, W> {
68 fn write_offset_indexes(
74 &mut self,
75 offset_indexes: &[Vec<Option<OffsetIndexMetaData>>],
76 ) -> Result<()> {
77 for (row_group_idx, row_group) in self.row_groups.iter_mut().enumerate() {
81 for (column_idx, column_metadata) in row_group.columns.iter_mut().enumerate() {
82 if let Some(offset_index) = &offset_indexes[row_group_idx][column_idx] {
83 let start_offset = self.buf.bytes_written();
84 self.object_writer.write_offset_index(
85 offset_index,
86 column_metadata,
87 row_group_idx,
88 column_idx,
89 &mut self.buf,
90 )?;
91 let end_offset = self.buf.bytes_written();
92 column_metadata.offset_index_offset = Some(start_offset as i64);
94 column_metadata.offset_index_length = Some((end_offset - start_offset) as i32);
95 }
96 }
97 }
98 Ok(())
99 }
100
101 fn write_column_indexes(
107 &mut self,
108 column_indexes: &[Vec<Option<ColumnIndexMetaData>>],
109 ) -> Result<()> {
110 for (row_group_idx, row_group) in self.row_groups.iter_mut().enumerate() {
114 for (column_idx, column_metadata) in row_group.columns.iter_mut().enumerate() {
115 if let Some(column_index) = &column_indexes[row_group_idx][column_idx] {
116 let start_offset = self.buf.bytes_written();
117 self.object_writer.write_column_index(
118 column_index,
119 column_metadata,
120 row_group_idx,
121 column_idx,
122 &mut self.buf,
123 )?;
124 let end_offset = self.buf.bytes_written();
125 column_metadata.column_index_offset = Some(start_offset as i64);
127 column_metadata.column_index_length = Some((end_offset - start_offset) as i32);
128 }
129 }
130 }
131 Ok(())
132 }
133
134 pub fn finish(mut self) -> Result<ParquetMetaData> {
136 let num_rows = self.row_groups.iter().map(|x| x.num_rows).sum();
137
138 let column_indexes = std::mem::take(&mut self.column_indexes);
139 let offset_indexes = std::mem::take(&mut self.offset_indexes);
140
141 if let Some(column_indexes) = column_indexes.as_ref() {
143 self.write_column_indexes(column_indexes)?;
144 }
145 if let Some(offset_indexes) = offset_indexes.as_ref() {
146 self.write_offset_indexes(offset_indexes)?;
147 }
148
149 let column_orders = self
155 .schema_descr
156 .columns()
157 .iter()
158 .map(|col| {
159 let sort_order = ColumnOrder::get_sort_order(
160 col.logical_type(),
161 col.converted_type(),
162 col.physical_type(),
163 );
164 ColumnOrder::TYPE_DEFINED_ORDER(sort_order)
165 })
166 .collect();
167
168 let column_orders = Some(column_orders);
172
173 let (row_groups, unencrypted_row_groups) = self
174 .object_writer
175 .apply_row_group_encryption(self.row_groups)?;
176
177 #[cfg(feature = "encryption")]
178 let (encryption_algorithm, footer_signing_key_metadata) =
179 self.object_writer.get_plaintext_footer_crypto_metadata();
180 #[cfg(feature = "encryption")]
181 let file_metadata = FileMetaData::new(
182 self.writer_version,
183 num_rows,
184 self.created_by,
185 self.key_value_metadata,
186 self.schema_descr.clone(),
187 column_orders,
188 )
189 .with_encryption_algorithm(encryption_algorithm)
190 .with_footer_signing_key_metadata(footer_signing_key_metadata);
191
192 #[cfg(not(feature = "encryption"))]
193 let file_metadata = FileMetaData::new(
194 self.writer_version,
195 num_rows,
196 self.created_by,
197 self.key_value_metadata,
198 self.schema_descr.clone(),
199 column_orders,
200 );
201
202 let file_meta = FileMeta {
203 file_metadata: &file_metadata,
204 row_groups: &row_groups,
205 };
206
207 let start_pos = self.buf.bytes_written();
209 self.object_writer
210 .write_file_metadata(&file_meta, &mut self.buf)?;
211 let end_pos = self.buf.bytes_written();
212
213 let metadata_len = (end_pos - start_pos) as u32;
215
216 self.buf.write_all(&metadata_len.to_le_bytes())?;
217 self.buf.write_all(self.object_writer.get_file_magic())?;
218
219 let mut builder = ParquetMetaDataBuilder::new(file_metadata);
224
225 builder = match unencrypted_row_groups {
226 Some(rg) => builder.set_row_groups(rg),
227 None => builder.set_row_groups(row_groups),
228 };
229
230 let column_indexes: Option<ParquetColumnIndex> = column_indexes.map(|ovvi| {
231 ovvi.into_iter()
232 .map(|vi| {
233 vi.into_iter()
234 .map(|oi| oi.unwrap_or(ColumnIndexMetaData::NONE))
235 .collect()
236 })
237 .collect()
238 });
239
240 let offset_indexes: Option<ParquetOffsetIndex> = offset_indexes.map(|ovvi| {
242 ovvi.into_iter()
243 .map(|vi| vi.into_iter().map(|oi| oi.unwrap()).collect())
244 .collect()
245 });
246
247 builder = builder.set_column_index(column_indexes);
248 builder = builder.set_offset_index(offset_indexes);
249
250 Ok(builder.build())
251 }
252
253 pub fn new(
254 buf: &'a mut TrackedWrite<W>,
255 schema_descr: &'a SchemaDescPtr,
256 row_groups: Vec<RowGroupMetaData>,
257 created_by: Option<String>,
258 writer_version: i32,
259 ) -> Self {
260 Self {
261 buf,
262 schema_descr,
263 row_groups,
264 column_indexes: None,
265 offset_indexes: None,
266 key_value_metadata: None,
267 created_by,
268 object_writer: Default::default(),
269 writer_version,
270 }
271 }
272
273 pub fn with_column_indexes(
274 mut self,
275 column_indexes: Vec<Vec<Option<ColumnIndexMetaData>>>,
276 ) -> Self {
277 self.column_indexes = Some(column_indexes);
278 self
279 }
280
281 pub fn with_offset_indexes(
282 mut self,
283 offset_indexes: Vec<Vec<Option<OffsetIndexMetaData>>>,
284 ) -> Self {
285 self.offset_indexes = Some(offset_indexes);
286 self
287 }
288
289 pub fn with_key_value_metadata(mut self, key_value_metadata: Vec<KeyValue>) -> Self {
290 self.key_value_metadata = Some(key_value_metadata);
291 self
292 }
293
294 #[cfg(feature = "encryption")]
295 pub fn with_file_encryptor(mut self, file_encryptor: Option<Arc<FileEncryptor>>) -> Self {
296 self.object_writer = self.object_writer.with_file_encryptor(file_encryptor);
297 self
298 }
299}
300
301pub struct ParquetMetaDataWriter<'a, W: Write> {
379 buf: TrackedWrite<W>,
380 metadata: &'a ParquetMetaData,
381}
382
383impl<'a, W: Write> ParquetMetaDataWriter<'a, W> {
384 pub fn new(buf: W, metadata: &'a ParquetMetaData) -> Self {
392 Self::new_with_tracked(TrackedWrite::new(buf), metadata)
393 }
394
395 pub fn new_with_tracked(buf: TrackedWrite<W>, metadata: &'a ParquetMetaData) -> Self {
402 Self { buf, metadata }
403 }
404
405 pub fn finish(mut self) -> Result<()> {
407 let file_metadata = self.metadata.file_metadata();
408
409 let schema = Arc::new(file_metadata.schema().clone());
410 let schema_descr = Arc::new(SchemaDescriptor::new(schema.clone()));
411 let created_by = file_metadata.created_by().map(str::to_string);
412
413 let row_groups = self.metadata.row_groups.clone();
414
415 let key_value_metadata = file_metadata.key_value_metadata().cloned();
416
417 let column_indexes = self.convert_column_indexes();
418 let offset_indexes = self.convert_offset_index();
419
420 let mut encoder = ThriftMetadataWriter::new(
421 &mut self.buf,
422 &schema_descr,
423 row_groups,
424 created_by,
425 file_metadata.version(),
426 );
427
428 if let Some(column_indexes) = column_indexes {
429 encoder = encoder.with_column_indexes(column_indexes);
430 }
431
432 if let Some(offset_indexes) = offset_indexes {
433 encoder = encoder.with_offset_indexes(offset_indexes);
434 }
435
436 if let Some(key_value_metadata) = key_value_metadata {
437 encoder = encoder.with_key_value_metadata(key_value_metadata);
438 }
439 encoder.finish()?;
440
441 Ok(())
442 }
443
444 fn convert_column_indexes(&self) -> Option<Vec<Vec<Option<ColumnIndexMetaData>>>> {
445 self.metadata
448 .column_index()
449 .map(|row_group_column_indexes| {
450 (0..self.metadata.row_groups().len())
451 .map(|rg_idx| {
452 let column_indexes = &row_group_column_indexes[rg_idx];
453 column_indexes
454 .iter()
455 .map(|column_index| Some(column_index.clone()))
456 .collect()
457 })
458 .collect()
459 })
460 }
461
462 fn convert_offset_index(&self) -> Option<Vec<Vec<Option<OffsetIndexMetaData>>>> {
463 self.metadata
464 .offset_index()
465 .map(|row_group_offset_indexes| {
466 (0..self.metadata.row_groups().len())
467 .map(|rg_idx| {
468 let offset_indexes = &row_group_offset_indexes[rg_idx];
469 offset_indexes
470 .iter()
471 .map(|offset_index| Some(offset_index.clone()))
472 .collect()
473 })
474 .collect()
475 })
476 }
477}
478
/// Serializes individual metadata objects to a sink.
///
/// When the crate is built with the `encryption` feature it can additionally hold
/// a file encryptor, which the encryption-enabled methods consult.
#[derive(Debug, Default)]
struct MetadataObjectWriter {
    /// Optional file encryptor; `None` means objects are written without encryption.
    #[cfg(feature = "encryption")]
    file_encryptor: Option<Arc<FileEncryptor>>,
}
484
485impl MetadataObjectWriter {
486 #[inline]
487 fn write_thrift_object(object: &impl WriteThrift, sink: impl Write) -> Result<()> {
488 let mut protocol = ThriftCompactOutputProtocol::new(sink);
489 object.write_thrift(&mut protocol)?;
490 Ok(())
491 }
492}
493
494#[cfg(not(feature = "encryption"))]
496impl MetadataObjectWriter {
497 fn write_file_metadata(&self, file_metadata: &FileMeta, sink: impl Write) -> Result<()> {
499 Self::write_thrift_object(file_metadata, sink)
500 }
501
502 fn write_offset_index(
504 &self,
505 offset_index: &OffsetIndexMetaData,
506 _column_chunk: &ColumnChunkMetaData,
507 _row_group_idx: usize,
508 _column_idx: usize,
509 sink: impl Write,
510 ) -> Result<()> {
511 Self::write_thrift_object(offset_index, sink)
512 }
513
514 fn write_column_index(
516 &self,
517 column_index: &ColumnIndexMetaData,
518 _column_chunk: &ColumnChunkMetaData,
519 _row_group_idx: usize,
520 _column_idx: usize,
521 sink: impl Write,
522 ) -> Result<()> {
523 Self::write_thrift_object(column_index, sink)
524 }
525
526 fn apply_row_group_encryption(
528 &self,
529 row_groups: Vec<RowGroupMetaData>,
530 ) -> Result<(Vec<RowGroupMetaData>, Option<Vec<RowGroupMetaData>>)> {
531 Ok((row_groups, None))
532 }
533
534 pub fn get_file_magic(&self) -> &[u8; 4] {
536 get_file_magic()
537 }
538}
539
/// Implementations used when the `encryption` feature is enabled.
#[cfg(feature = "encryption")]
impl MetadataObjectWriter {
    /// Sets the file encryptor; with `None`, objects are written without encryption.
    fn with_file_encryptor(mut self, encryptor: Option<Arc<FileEncryptor>>) -> Self {
        self.file_encryptor = encryptor;
        self
    }

    /// Writes the file metadata to `sink` in one of three forms:
    ///
    /// * encryptor configured with footer encryption: the file crypto metadata
    ///   followed by the encrypted file metadata;
    /// * encryptor configured and the file metadata carries an encryption algorithm:
    ///   the file metadata via `write_signed_plaintext_thrift_object`;
    /// * otherwise: the file metadata written as-is.
    fn write_file_metadata(&self, file_metadata: &FileMeta, mut sink: impl Write) -> Result<()> {
        if let Some(file_encryptor) = self.file_encryptor.as_ref() {
            if file_encryptor.properties().encrypt_footer() {
                // The crypto metadata goes first, then the encrypted footer.
                let crypto_metadata = Self::file_crypto_metadata(file_encryptor)?;
                let mut protocol = ThriftCompactOutputProtocol::new(&mut sink);
                crypto_metadata.write_thrift(&mut protocol)?;

                let footer_aad = create_footer_aad(file_encryptor.file_aad())?;
                let mut footer_encryptor = file_encryptor.get_footer_encryptor()?;
                return encrypt_thrift_object(
                    file_metadata,
                    &mut footer_encryptor,
                    &mut sink,
                    &footer_aad,
                );
            }

            if file_metadata.file_metadata.encryption_algorithm.is_some() {
                let footer_aad = create_footer_aad(file_encryptor.file_aad())?;
                let mut footer_encryptor = file_encryptor.get_footer_encryptor()?;
                return write_signed_plaintext_thrift_object(
                    file_metadata,
                    &mut footer_encryptor,
                    &mut sink,
                    &footer_aad,
                );
            }
        }

        Self::write_thrift_object(file_metadata, &mut sink)
    }

    /// Writes `offset_index` to `sink`, going through the encryption-aware path when
    /// a file encryptor is configured.
    fn write_offset_index(
        &self,
        offset_index: &OffsetIndexMetaData,
        column_chunk: &ColumnChunkMetaData,
        row_group_idx: usize,
        column_idx: usize,
        sink: impl Write,
    ) -> Result<()> {
        match self.file_encryptor.as_deref() {
            None => Self::write_thrift_object(offset_index, sink),
            Some(file_encryptor) => Self::write_thrift_object_with_encryption(
                offset_index,
                sink,
                file_encryptor,
                column_chunk,
                ModuleType::OffsetIndex,
                row_group_idx,
                column_idx,
            ),
        }
    }

    /// Writes `column_index` to `sink`, going through the encryption-aware path when
    /// a file encryptor is configured.
    fn write_column_index(
        &self,
        column_index: &ColumnIndexMetaData,
        column_chunk: &ColumnChunkMetaData,
        row_group_idx: usize,
        column_idx: usize,
        sink: impl Write,
    ) -> Result<()> {
        match self.file_encryptor.as_deref() {
            None => Self::write_thrift_object(column_index, sink),
            Some(file_encryptor) => Self::write_thrift_object_with_encryption(
                column_index,
                sink,
                file_encryptor,
                column_chunk,
                ModuleType::ColumnIndex,
                row_group_idx,
                column_idx,
            ),
        }
    }

    /// Applies row group encryption when a file encryptor is configured.
    ///
    /// Returns `(row groups to serialize, unencrypted copy)`. Without an encryptor the
    /// input is returned unchanged and the second element is `None`.
    fn apply_row_group_encryption(
        &self,
        row_groups: Vec<RowGroupMetaData>,
    ) -> Result<(Vec<RowGroupMetaData>, Option<Vec<RowGroupMetaData>>)> {
        if let Some(file_encryptor) = self.file_encryptor.as_ref() {
            // Keep a copy of the input before the row groups are consumed.
            let plaintext = row_groups.clone();
            let encrypted = Self::encrypt_row_groups(row_groups, file_encryptor)?;
            Ok((encrypted, Some(plaintext)))
        } else {
            Ok((row_groups, None))
        }
    }

    /// Returns the magic bytes for the file, selected from the encryptor's properties
    /// (or from `None` when no encryptor is configured).
    fn get_file_magic(&self) -> &[u8; 4] {
        let encryption_properties = self.file_encryptor.as_ref().map(|e| e.properties());
        get_file_magic(encryption_properties)
    }

    /// Writes `object` to `sink`, encrypting it when `file_encryptor` reports the
    /// column as encrypted.
    ///
    /// The column is identified by its path; a multi-part path is joined with `.`.
    /// For encrypted columns the AAD is built from `module_type`, `row_group_index`
    /// and `column_index`.
    fn write_thrift_object_with_encryption(
        object: &impl WriteThrift,
        mut sink: impl Write,
        file_encryptor: &FileEncryptor,
        column_metadata: &ColumnChunkMetaData,
        module_type: ModuleType,
        row_group_index: usize,
        column_index: usize,
    ) -> Result<()> {
        let path_parts = column_metadata.column_path().as_ref();

        // Only allocate a joined string when the path is not a single part.
        let dotted_path;
        let column_path = if path_parts.len() != 1 {
            dotted_path = path_parts.join(".");
            &dotted_path
        } else {
            &path_parts[0]
        };

        if !file_encryptor.is_column_encrypted(column_path) {
            return Self::write_thrift_object(object, sink);
        }

        let module_aad = create_module_aad(
            file_encryptor.file_aad(),
            module_type,
            row_group_index,
            column_index,
            None,
        )?;
        let mut column_encryptor = file_encryptor.get_column_encryptor(column_path)?;
        encrypt_thrift_object(object, &mut column_encryptor, &mut sink, &module_aad)
    }

    /// Returns the encryption algorithm and footer key metadata to store in the file
    /// metadata.
    ///
    /// Both are `None` unless an encryptor is configured and footer encryption is
    /// turned off.
    fn get_plaintext_footer_crypto_metadata(
        &self,
    ) -> (Option<EncryptionAlgorithm>, Option<Vec<u8>>) {
        match self.file_encryptor.as_ref() {
            Some(file_encryptor) if !file_encryptor.properties().encrypt_footer() => {
                let algorithm = Self::encryption_algorithm_from_encryptor(file_encryptor);
                let key_metadata = file_encryptor.properties().footer_key_metadata().cloned();
                (Some(algorithm), key_metadata)
            }
            _ => (None, None),
        }
    }

    /// Builds the `AES_GCM_V1` algorithm description from the encryptor's properties.
    ///
    /// * `aad_prefix` is included only when the properties ask for it to be stored.
    /// * `supply_aad_prefix` is `None` without a prefix, otherwise the negation of
    ///   the store flag.
    fn encryption_algorithm_from_encryptor(file_encryptor: &FileEncryptor) -> EncryptionAlgorithm {
        let properties = file_encryptor.properties();
        let store_prefix = properties.store_aad_prefix();

        let supply_aad_prefix = properties.aad_prefix().map(|_| !store_prefix);
        let aad_prefix = properties.aad_prefix().filter(|_| store_prefix).cloned();

        EncryptionAlgorithm::AES_GCM_V1(AesGcmV1 {
            aad_prefix,
            aad_file_unique: Some(file_encryptor.aad_file_unique().clone()),
            supply_aad_prefix,
        })
    }

    /// Builds the file crypto metadata: the encryption algorithm plus the footer key
    /// metadata borrowed from the encryptor's properties.
    fn file_crypto_metadata(file_encryptor: &'_ FileEncryptor) -> Result<FileCryptoMetaData<'_>> {
        let encryption_algorithm = Self::encryption_algorithm_from_encryptor(file_encryptor);
        let key_metadata = file_encryptor
            .properties()
            .footer_key_metadata()
            .map(Vec::as_slice);
        Ok(FileCryptoMetaData {
            encryption_algorithm,
            key_metadata,
        })
    }

    /// Runs [`Self::encrypt_column_chunk`] over every column chunk of every row
    /// group, stopping at the first error.
    fn encrypt_row_groups(
        row_groups: Vec<RowGroupMetaData>,
        file_encryptor: &Arc<FileEncryptor>,
    ) -> Result<Vec<RowGroupMetaData>> {
        let mut encrypted_row_groups = Vec::with_capacity(row_groups.len());
        for (rg_idx, mut row_group) in row_groups.into_iter().enumerate() {
            let mut encrypted_columns = Vec::with_capacity(row_group.columns.len());
            for (col_idx, column) in row_group.columns.into_iter().enumerate() {
                let encrypted =
                    Self::encrypt_column_chunk(column, file_encryptor, rg_idx, col_idx)?;
                encrypted_columns.push(encrypted);
            }
            row_group.columns = encrypted_columns;
            encrypted_row_groups.push(row_group);
        }
        Ok(encrypted_row_groups)
    }

    /// Encrypts the column metadata of `column_chunk` when its crypto metadata calls
    /// for a column key.
    ///
    /// Chunks without crypto metadata, or marked as using the footer key, are
    /// returned unchanged. Otherwise the serialized column metadata is encrypted with
    /// the column's encryptor and stored in `encrypted_column_metadata`.
    fn encrypt_column_chunk(
        mut column_chunk: ColumnChunkMetaData,
        file_encryptor: &Arc<FileEncryptor>,
        row_group_index: usize,
        column_index: usize,
    ) -> Result<ColumnChunkMetaData> {
        match column_chunk.column_crypto_metadata.as_deref() {
            // Nothing to encrypt at the column level in either of these cases.
            None | Some(ColumnCryptoMetaData::ENCRYPTION_WITH_FOOTER_KEY) => {}
            Some(ColumnCryptoMetaData::ENCRYPTION_WITH_COLUMN_KEY(col_key)) => {
                use crate::file::metadata::thrift::serialize_column_meta_data;

                let dotted_path = col_key.path_in_schema.join(".");
                let mut column_encryptor = file_encryptor.get_column_encryptor(&dotted_path)?;
                let module_aad = create_module_aad(
                    file_encryptor.file_aad(),
                    ModuleType::ColumnMetaData,
                    row_group_index,
                    column_index,
                    None,
                )?;

                // Serialize the column metadata into a scratch buffer, then encrypt it.
                let mut plaintext: Vec<u8> = Vec::new();
                serialize_column_meta_data(
                    &column_chunk,
                    &mut ThriftCompactOutputProtocol::new(&mut plaintext),
                )?;
                let ciphertext = column_encryptor.encrypt(&plaintext, &module_aad)?;

                column_chunk.encrypted_column_metadata = Some(ciphertext);
            }
        }

        Ok(column_chunk)
    }
}