1use crate::file::metadata::thrift::FileMeta;
19use crate::file::metadata::{
20 ColumnChunkMetaData, ParquetColumnIndex, ParquetOffsetIndex, RowGroupMetaData,
21};
22use crate::schema::types::{SchemaDescPtr, SchemaDescriptor};
23use crate::{
24 basic::ColumnOrder,
25 file::metadata::{FileMetaData, ParquetMetaDataBuilder},
26};
27#[cfg(feature = "encryption")]
28use crate::{
29 encryption::{
30 encrypt::{FileEncryptor, encrypt_thrift_object, write_signed_plaintext_thrift_object},
31 modules::{ModuleType, create_footer_aad, create_module_aad},
32 },
33 file::column_crypto_metadata::ColumnCryptoMetaData,
34 file::metadata::thrift::encryption::{AesGcmV1, EncryptionAlgorithm, FileCryptoMetaData},
35};
36use crate::{errors::Result, file::page_index::column_index::ColumnIndexMetaData};
37
38use crate::{
39 file::writer::{TrackedWrite, get_file_magic},
40 parquet_thrift::WriteThrift,
41};
42use crate::{
43 file::{
44 metadata::{KeyValue, ParquetMetaData},
45 page_index::offset_index::OffsetIndexMetaData,
46 },
47 parquet_thrift::ThriftCompactOutputProtocol,
48};
49use std::io::Write;
50use std::sync::Arc;
51
52pub(crate) struct ThriftMetadataWriter<'a, W: Write> {
56 buf: &'a mut TrackedWrite<W>,
57 schema_descr: &'a SchemaDescPtr,
58 row_groups: Vec<RowGroupMetaData>,
59 column_indexes: Option<Vec<Vec<Option<ColumnIndexMetaData>>>>,
60 offset_indexes: Option<Vec<Vec<Option<OffsetIndexMetaData>>>>,
61 key_value_metadata: Option<Vec<KeyValue>>,
62 created_by: Option<String>,
63 object_writer: MetadataObjectWriter,
64 writer_version: i32,
65 write_path_in_schema: bool,
66}
67
68impl<'a, W: Write> ThriftMetadataWriter<'a, W> {
69 fn write_offset_indexes(
75 &mut self,
76 offset_indexes: &[Vec<Option<OffsetIndexMetaData>>],
77 ) -> Result<()> {
78 for (row_group_idx, row_group) in self.row_groups.iter_mut().enumerate() {
82 for (column_idx, column_metadata) in row_group.columns.iter_mut().enumerate() {
83 if let Some(offset_index) = &offset_indexes[row_group_idx][column_idx] {
84 let start_offset = self.buf.bytes_written();
85 self.object_writer.write_offset_index(
86 offset_index,
87 column_metadata,
88 row_group_idx,
89 column_idx,
90 &mut self.buf,
91 )?;
92 let end_offset = self.buf.bytes_written();
93 column_metadata.offset_index_offset = Some(start_offset as i64);
95 column_metadata.offset_index_length = Some((end_offset - start_offset) as i32);
96 }
97 }
98 }
99 Ok(())
100 }
101
102 fn write_column_indexes(
108 &mut self,
109 column_indexes: &[Vec<Option<ColumnIndexMetaData>>],
110 ) -> Result<()> {
111 for (row_group_idx, row_group) in self.row_groups.iter_mut().enumerate() {
115 for (column_idx, column_metadata) in row_group.columns.iter_mut().enumerate() {
116 if let Some(column_index) = &column_indexes[row_group_idx][column_idx] {
117 let start_offset = self.buf.bytes_written();
118 if self.object_writer.write_column_index(
120 column_index,
121 column_metadata,
122 row_group_idx,
123 column_idx,
124 &mut self.buf,
125 )? {
126 let end_offset = self.buf.bytes_written();
127 column_metadata.column_index_offset = Some(start_offset as i64);
129 column_metadata.column_index_length =
130 Some((end_offset - start_offset) as i32);
131 }
132 }
133 }
134 }
135 Ok(())
136 }
137
138 fn finalize_column_indexes(&mut self) -> Result<Option<ParquetColumnIndex>> {
140 let column_indexes = std::mem::take(&mut self.column_indexes);
141
142 if let Some(column_indexes) = column_indexes.as_ref() {
144 self.write_column_indexes(column_indexes)?;
145 }
146
147 let all_none = column_indexes
149 .as_ref()
150 .is_some_and(|ci| ci.iter().all(|cii| cii.iter().all(|idx| idx.is_none())));
151
152 let column_indexes: Option<ParquetColumnIndex> = if all_none {
155 None
156 } else {
157 column_indexes.map(|ovvi| {
158 ovvi.into_iter()
159 .map(|vi| {
160 vi.into_iter()
161 .map(|ci| ci.unwrap_or(ColumnIndexMetaData::NONE))
162 .collect()
163 })
164 .collect()
165 })
166 };
167
168 Ok(column_indexes)
169 }
170
171 fn finalize_offset_indexes(&mut self) -> Result<Option<ParquetOffsetIndex>> {
173 let offset_indexes = std::mem::take(&mut self.offset_indexes);
174
175 if let Some(offset_indexes) = offset_indexes.as_ref() {
177 self.write_offset_indexes(offset_indexes)?;
178 }
179
180 let all_none = offset_indexes
182 .as_ref()
183 .is_some_and(|oi| oi.iter().all(|oii| oii.iter().all(|idx| idx.is_none())));
184
185 let offset_indexes: Option<ParquetOffsetIndex> = if all_none {
186 None
187 } else {
188 offset_indexes.map(|ovvi| {
190 ovvi.into_iter()
191 .map(|vi| vi.into_iter().map(|oi| oi.unwrap()).collect())
192 .collect()
193 })
194 };
195
196 Ok(offset_indexes)
197 }
198
199 pub fn finish(mut self) -> Result<ParquetMetaData> {
201 let num_rows = self.row_groups.iter().map(|x| x.num_rows).sum();
202
203 let column_indexes = self.finalize_column_indexes()?;
205 let offset_indexes = self.finalize_offset_indexes()?;
206
207 let column_orders = self
213 .schema_descr
214 .columns()
215 .iter()
216 .map(|col| {
217 let sort_order = ColumnOrder::sort_order_for_type(
218 col.logical_type_ref(),
219 col.converted_type(),
220 col.physical_type(),
221 );
222 ColumnOrder::TYPE_DEFINED_ORDER(sort_order)
223 })
224 .collect();
225
226 let column_orders = Some(column_orders);
230
231 let (row_groups, unencrypted_row_groups) = self
232 .object_writer
233 .apply_row_group_encryption(self.row_groups)?;
234
235 #[cfg(feature = "encryption")]
236 let (encryption_algorithm, footer_signing_key_metadata) =
237 self.object_writer.get_plaintext_footer_crypto_metadata();
238 #[cfg(feature = "encryption")]
239 let file_metadata = FileMetaData::new(
240 self.writer_version,
241 num_rows,
242 self.created_by,
243 self.key_value_metadata,
244 self.schema_descr.clone(),
245 column_orders,
246 )
247 .with_encryption_algorithm(encryption_algorithm)
248 .with_footer_signing_key_metadata(footer_signing_key_metadata);
249
250 #[cfg(not(feature = "encryption"))]
251 let file_metadata = FileMetaData::new(
252 self.writer_version,
253 num_rows,
254 self.created_by,
255 self.key_value_metadata,
256 self.schema_descr.clone(),
257 column_orders,
258 );
259
260 let file_meta = FileMeta {
261 file_metadata: &file_metadata,
262 row_groups: &row_groups,
263 write_path_in_schema: self.write_path_in_schema,
264 };
265
266 let start_pos = self.buf.bytes_written();
268 self.object_writer
269 .write_file_metadata(&file_meta, &mut self.buf)?;
270 let end_pos = self.buf.bytes_written();
271
272 let metadata_len = (end_pos - start_pos) as u32;
274
275 self.buf.write_all(&metadata_len.to_le_bytes())?;
276 self.buf.write_all(self.object_writer.get_file_magic())?;
277
278 let builder = ParquetMetaDataBuilder::new(file_metadata)
283 .set_column_index(column_indexes)
284 .set_offset_index(offset_indexes);
285
286 Ok(match unencrypted_row_groups {
287 Some(rg) => builder.set_row_groups(rg).build(),
288 None => builder.set_row_groups(row_groups).build(),
289 })
290 }
291
292 pub fn new(
293 buf: &'a mut TrackedWrite<W>,
294 schema_descr: &'a SchemaDescPtr,
295 row_groups: Vec<RowGroupMetaData>,
296 created_by: Option<String>,
297 writer_version: i32,
298 write_path_in_schema: bool,
299 ) -> Self {
300 Self {
301 buf,
302 schema_descr,
303 row_groups,
304 column_indexes: None,
305 offset_indexes: None,
306 key_value_metadata: None,
307 created_by,
308 object_writer: Default::default(),
309 writer_version,
310 write_path_in_schema,
311 }
312 }
313
314 pub fn with_column_indexes(
315 mut self,
316 column_indexes: Vec<Vec<Option<ColumnIndexMetaData>>>,
317 ) -> Self {
318 self.column_indexes = Some(column_indexes);
319 self
320 }
321
322 pub fn with_offset_indexes(
323 mut self,
324 offset_indexes: Vec<Vec<Option<OffsetIndexMetaData>>>,
325 ) -> Self {
326 self.offset_indexes = Some(offset_indexes);
327 self
328 }
329
330 pub fn with_key_value_metadata(mut self, key_value_metadata: Vec<KeyValue>) -> Self {
331 self.key_value_metadata = Some(key_value_metadata);
332 self
333 }
334
335 #[cfg(feature = "encryption")]
336 pub fn with_file_encryptor(mut self, file_encryptor: Option<Arc<FileEncryptor>>) -> Self {
337 self.object_writer = self.object_writer.with_file_encryptor(file_encryptor);
338 self
339 }
340}
341
342pub struct ParquetMetaDataWriter<'a, W: Write> {
420 buf: TrackedWrite<W>,
421 metadata: &'a ParquetMetaData,
422 write_path_in_schema: bool,
423}
424
425impl<'a, W: Write> ParquetMetaDataWriter<'a, W> {
426 pub fn new(buf: W, metadata: &'a ParquetMetaData) -> Self {
434 Self::new_with_tracked(TrackedWrite::new(buf), metadata)
435 }
436
437 pub fn new_with_tracked(buf: TrackedWrite<W>, metadata: &'a ParquetMetaData) -> Self {
444 Self {
445 buf,
446 metadata,
447 write_path_in_schema: true,
448 }
449 }
450
451 pub fn with_write_path_in_schema(self, val: bool) -> Self {
454 Self {
455 write_path_in_schema: val,
456 ..self
457 }
458 }
459
460 pub fn finish(mut self) -> Result<()> {
462 let file_metadata = self.metadata.file_metadata();
463
464 let schema = Arc::new(file_metadata.schema().clone());
465 let schema_descr = Arc::new(SchemaDescriptor::new(schema.clone()));
466 let created_by = file_metadata.created_by().map(str::to_string);
467
468 let row_groups = self.metadata.row_groups.clone();
469
470 let key_value_metadata = file_metadata.key_value_metadata().cloned();
471
472 let column_indexes = self.convert_column_indexes();
473 let offset_indexes = self.convert_offset_index();
474
475 let mut encoder = ThriftMetadataWriter::new(
476 &mut self.buf,
477 &schema_descr,
478 row_groups,
479 created_by,
480 file_metadata.version(),
481 self.write_path_in_schema,
482 );
483
484 if let Some(column_indexes) = column_indexes {
485 encoder = encoder.with_column_indexes(column_indexes);
486 }
487
488 if let Some(offset_indexes) = offset_indexes {
489 encoder = encoder.with_offset_indexes(offset_indexes);
490 }
491
492 if let Some(key_value_metadata) = key_value_metadata {
493 encoder = encoder.with_key_value_metadata(key_value_metadata);
494 }
495 encoder.finish()?;
496
497 Ok(())
498 }
499
500 fn convert_column_indexes(&self) -> Option<Vec<Vec<Option<ColumnIndexMetaData>>>> {
501 self.metadata
504 .column_index()
505 .map(|row_group_column_indexes| {
506 (0..self.metadata.row_groups().len())
507 .map(|rg_idx| {
508 let column_indexes = &row_group_column_indexes[rg_idx];
509 column_indexes
510 .iter()
511 .map(|column_index| Some(column_index.clone()))
512 .collect()
513 })
514 .collect()
515 })
516 }
517
518 fn convert_offset_index(&self) -> Option<Vec<Vec<Option<OffsetIndexMetaData>>>> {
519 self.metadata
520 .offset_index()
521 .map(|row_group_offset_indexes| {
522 (0..self.metadata.row_groups().len())
523 .map(|rg_idx| {
524 let offset_indexes = &row_group_offset_indexes[rg_idx];
525 offset_indexes
526 .iter()
527 .map(|offset_index| Some(offset_index.clone()))
528 .collect()
529 })
530 .collect()
531 })
532 }
533}
534
/// Serializes individual metadata objects (file metadata, page indexes).
///
/// With the `encryption` feature enabled and an encryptor configured, objects
/// are encrypted or signed as appropriate.
#[derive(Debug, Default)]
struct MetadataObjectWriter {
    /// Encryptor for the file. `None` means objects are written in plaintext.
    #[cfg(feature = "encryption")]
    file_encryptor: Option<Arc<FileEncryptor>>,
}
540
541impl MetadataObjectWriter {
542 #[inline]
543 fn write_thrift_object(object: &impl WriteThrift, sink: impl Write) -> Result<()> {
544 let mut protocol = ThriftCompactOutputProtocol::new(sink);
545 object.write_thrift(&mut protocol)?;
546 Ok(())
547 }
548}
549
550#[cfg(not(feature = "encryption"))]
552impl MetadataObjectWriter {
553 fn write_file_metadata(&self, file_metadata: &FileMeta, sink: impl Write) -> Result<()> {
557 Self::write_thrift_object(file_metadata, sink)
558 }
559
560 fn write_offset_index(
564 &self,
565 offset_index: &OffsetIndexMetaData,
566 _column_chunk: &ColumnChunkMetaData,
567 _row_group_idx: usize,
568 _column_idx: usize,
569 sink: impl Write,
570 ) -> Result<()> {
571 Self::write_thrift_object(offset_index, sink)
572 }
573
574 fn write_column_index(
581 &self,
582 column_index: &ColumnIndexMetaData,
583 _column_chunk: &ColumnChunkMetaData,
584 _row_group_idx: usize,
585 _column_idx: usize,
586 sink: impl Write,
587 ) -> Result<bool> {
588 match column_index {
589 ColumnIndexMetaData::NONE => Ok(false),
591 _ => {
592 Self::write_thrift_object(column_index, sink)?;
593 Ok(true)
594 }
595 }
596 }
597
598 fn apply_row_group_encryption(
600 &self,
601 row_groups: Vec<RowGroupMetaData>,
602 ) -> Result<(Vec<RowGroupMetaData>, Option<Vec<RowGroupMetaData>>)> {
603 Ok((row_groups, None))
604 }
605
606 pub fn get_file_magic(&self) -> &[u8; 4] {
608 get_file_magic()
609 }
610}
611
612#[cfg(feature = "encryption")]
614impl MetadataObjectWriter {
615 fn with_file_encryptor(mut self, encryptor: Option<Arc<FileEncryptor>>) -> Self {
617 self.file_encryptor = encryptor;
618 self
619 }
620
621 fn write_file_metadata(&self, file_metadata: &FileMeta, mut sink: impl Write) -> Result<()> {
625 match self.file_encryptor.as_ref() {
626 Some(file_encryptor) if file_encryptor.properties().encrypt_footer() => {
627 let crypto_metadata = Self::file_crypto_metadata(file_encryptor)?;
629 let mut protocol = ThriftCompactOutputProtocol::new(&mut sink);
630 crypto_metadata.write_thrift(&mut protocol)?;
631
632 let aad = create_footer_aad(file_encryptor.file_aad())?;
634 let mut encryptor = file_encryptor.get_footer_encryptor()?;
635 encrypt_thrift_object(file_metadata, &mut encryptor, &mut sink, &aad)
636 }
637 Some(file_encryptor) if file_metadata.file_metadata.encryption_algorithm.is_some() => {
638 let aad = create_footer_aad(file_encryptor.file_aad())?;
639 let mut encryptor = file_encryptor.get_footer_encryptor()?;
640 write_signed_plaintext_thrift_object(file_metadata, &mut encryptor, &mut sink, &aad)
641 }
642 _ => Self::write_thrift_object(file_metadata, &mut sink),
643 }
644 }
645
646 fn write_offset_index(
650 &self,
651 offset_index: &OffsetIndexMetaData,
652 column_chunk: &ColumnChunkMetaData,
653 row_group_idx: usize,
654 column_idx: usize,
655 sink: impl Write,
656 ) -> Result<()> {
657 match &self.file_encryptor {
658 Some(file_encryptor) => Self::write_thrift_object_with_encryption(
659 offset_index,
660 sink,
661 file_encryptor,
662 column_chunk,
663 ModuleType::OffsetIndex,
664 row_group_idx,
665 column_idx,
666 ),
667 None => Self::write_thrift_object(offset_index, sink),
668 }
669 }
670
671 fn write_column_index(
678 &self,
679 column_index: &ColumnIndexMetaData,
680 column_chunk: &ColumnChunkMetaData,
681 row_group_idx: usize,
682 column_idx: usize,
683 sink: impl Write,
684 ) -> Result<bool> {
685 match column_index {
686 ColumnIndexMetaData::NONE => Ok(false),
688 _ => {
689 match &self.file_encryptor {
690 Some(file_encryptor) => Self::write_thrift_object_with_encryption(
691 column_index,
692 sink,
693 file_encryptor,
694 column_chunk,
695 ModuleType::ColumnIndex,
696 row_group_idx,
697 column_idx,
698 )?,
699 None => Self::write_thrift_object(column_index, sink)?,
700 }
701 Ok(true)
702 }
703 }
704 }
705
706 fn apply_row_group_encryption(
710 &self,
711 row_groups: Vec<RowGroupMetaData>,
712 ) -> Result<(Vec<RowGroupMetaData>, Option<Vec<RowGroupMetaData>>)> {
713 match &self.file_encryptor {
714 Some(file_encryptor) => {
715 let unencrypted_row_groups = row_groups.clone();
716 let encrypted_row_groups = Self::encrypt_row_groups(row_groups, file_encryptor)?;
717 Ok((encrypted_row_groups, Some(unencrypted_row_groups)))
718 }
719 None => Ok((row_groups, None)),
720 }
721 }
722
723 fn get_file_magic(&self) -> &[u8; 4] {
725 get_file_magic(
726 self.file_encryptor
727 .as_ref()
728 .map(|encryptor| encryptor.properties()),
729 )
730 }
731
732 fn write_thrift_object_with_encryption(
733 object: &impl WriteThrift,
734 mut sink: impl Write,
735 file_encryptor: &FileEncryptor,
736 column_metadata: &ColumnChunkMetaData,
737 module_type: ModuleType,
738 row_group_index: usize,
739 column_index: usize,
740 ) -> Result<()> {
741 let column_path_vec = column_metadata.column_path().as_ref();
742
743 let joined_column_path;
744 let column_path = if column_path_vec.len() == 1 {
745 &column_path_vec[0]
746 } else {
747 joined_column_path = column_path_vec.join(".");
748 &joined_column_path
749 };
750
751 if file_encryptor.is_column_encrypted(column_path) {
752 use crate::encryption::encrypt::encrypt_thrift_object;
753
754 let aad = create_module_aad(
755 file_encryptor.file_aad(),
756 module_type,
757 row_group_index,
758 column_index,
759 None,
760 )?;
761 let mut encryptor = file_encryptor.get_column_encryptor(column_path)?;
762 encrypt_thrift_object(object, &mut encryptor, &mut sink, &aad)
763 } else {
764 Self::write_thrift_object(object, sink)
765 }
766 }
767
768 fn get_plaintext_footer_crypto_metadata(
769 &self,
770 ) -> (Option<EncryptionAlgorithm>, Option<Vec<u8>>) {
771 if let Some(file_encryptor) = self.file_encryptor.as_ref() {
773 let encryption_properties = file_encryptor.properties();
774 if !encryption_properties.encrypt_footer() {
775 return (
776 Some(Self::encryption_algorithm_from_encryptor(file_encryptor)),
777 encryption_properties.footer_key_metadata().cloned(),
778 );
779 }
780 }
781 (None, None)
782 }
783
784 fn encryption_algorithm_from_encryptor(file_encryptor: &FileEncryptor) -> EncryptionAlgorithm {
785 let supply_aad_prefix = file_encryptor
786 .properties()
787 .aad_prefix()
788 .map(|_| !file_encryptor.properties().store_aad_prefix());
789 let aad_prefix = if file_encryptor.properties().store_aad_prefix() {
790 file_encryptor.properties().aad_prefix()
791 } else {
792 None
793 };
794 EncryptionAlgorithm::AES_GCM_V1(AesGcmV1 {
795 aad_prefix: aad_prefix.cloned(),
796 aad_file_unique: Some(file_encryptor.aad_file_unique().clone()),
797 supply_aad_prefix,
798 })
799 }
800
801 fn file_crypto_metadata(file_encryptor: &'_ FileEncryptor) -> Result<FileCryptoMetaData<'_>> {
802 let properties = file_encryptor.properties();
803 Ok(FileCryptoMetaData {
804 encryption_algorithm: Self::encryption_algorithm_from_encryptor(file_encryptor),
805 key_metadata: properties.footer_key_metadata().map(|v| v.as_slice()),
806 })
807 }
808
809 fn encrypt_row_groups(
810 row_groups: Vec<RowGroupMetaData>,
811 file_encryptor: &Arc<FileEncryptor>,
812 ) -> Result<Vec<RowGroupMetaData>> {
813 row_groups
814 .into_iter()
815 .enumerate()
816 .map(|(rg_idx, mut rg)| {
817 let cols: Result<Vec<ColumnChunkMetaData>> = rg
818 .columns
819 .into_iter()
820 .enumerate()
821 .map(|(col_idx, c)| {
822 Self::encrypt_column_chunk(c, file_encryptor, rg_idx, col_idx)
823 })
824 .collect();
825 rg.columns = cols?;
826 Ok(rg)
827 })
828 .collect()
829 }
830
831 fn encrypt_column_chunk(
833 mut column_chunk: ColumnChunkMetaData,
834 file_encryptor: &Arc<FileEncryptor>,
835 row_group_index: usize,
836 column_index: usize,
837 ) -> Result<ColumnChunkMetaData> {
838 let encryptor = match column_chunk.column_crypto_metadata.as_deref() {
841 None => None,
842 Some(ColumnCryptoMetaData::ENCRYPTION_WITH_FOOTER_KEY) => {
843 let is_footer_encrypted = file_encryptor.properties().encrypt_footer();
844
845 if !is_footer_encrypted {
850 Some(file_encryptor.get_footer_encryptor()?)
851 } else {
852 None
853 }
854 }
855 Some(ColumnCryptoMetaData::ENCRYPTION_WITH_COLUMN_KEY(col_key)) => {
856 let column_path = col_key.path_in_schema.join(".");
857 Some(file_encryptor.get_column_encryptor(&column_path)?)
858 }
859 };
860
861 if let Some(mut encryptor) = encryptor {
862 use crate::file::metadata::thrift::serialize_column_meta_data;
863
864 let aad = create_module_aad(
865 file_encryptor.file_aad(),
866 ModuleType::ColumnMetaData,
867 row_group_index,
868 column_index,
869 None,
870 )?;
871 let mut buffer: Vec<u8> = vec![];
873 {
874 let mut prot = ThriftCompactOutputProtocol::new(&mut buffer);
875 serialize_column_meta_data(&column_chunk, &mut prot)?;
876 }
877 let ciphertext = encryptor.encrypt(&buffer, &aad)?;
878 column_chunk.encrypted_column_metadata = Some(ciphertext);
879 column_chunk.plaintext_footer_mode = !file_encryptor.properties().encrypt_footer();
882 }
883
884 Ok(column_chunk)
885 }
886}