1use crate::file::metadata::thrift::FileMeta;
19use crate::file::metadata::{
20 ColumnChunkMetaData, ParquetColumnIndex, ParquetOffsetIndex, RowGroupMetaData,
21};
22use crate::schema::types::{SchemaDescPtr, SchemaDescriptor};
23use crate::{
24 basic::ColumnOrder,
25 file::metadata::{FileMetaData, ParquetMetaDataBuilder},
26};
27#[cfg(feature = "encryption")]
28use crate::{
29 encryption::{
30 encrypt::{FileEncryptor, encrypt_thrift_object, write_signed_plaintext_thrift_object},
31 modules::{ModuleType, create_footer_aad, create_module_aad},
32 },
33 file::column_crypto_metadata::ColumnCryptoMetaData,
34 file::metadata::thrift::encryption::{AesGcmV1, EncryptionAlgorithm, FileCryptoMetaData},
35};
36use crate::{errors::Result, file::page_index::column_index::ColumnIndexMetaData};
37
38use crate::{
39 file::writer::{TrackedWrite, get_file_magic},
40 parquet_thrift::WriteThrift,
41};
42use crate::{
43 file::{
44 metadata::{KeyValue, ParquetMetaData},
45 page_index::offset_index::OffsetIndexMetaData,
46 },
47 parquet_thrift::ThriftCompactOutputProtocol,
48};
49use std::io::Write;
50use std::sync::Arc;
51
/// Serializes page indexes and the Thrift `FileMetaData` footer for a set of
/// row groups into a [`TrackedWrite`] sink.
///
/// Build it with [`ThriftMetadataWriter::new`], optionally attach column/offset
/// indexes and key/value metadata (and, with the `encryption` feature, a file
/// encryptor) via the `with_*` methods, then call
/// [`ThriftMetadataWriter::finish`].
pub(crate) struct ThriftMetadataWriter<'a, W: Write> {
    /// Destination sink; its `bytes_written()` count provides the offsets
    /// recorded for each written page index.
    buf: &'a mut TrackedWrite<W>,
    /// Schema used to derive the per-column sort orders stored in the footer.
    schema_descr: &'a SchemaDescPtr,
    /// Row group metadata; page index offsets/lengths are filled in while
    /// finishing.
    row_groups: Vec<RowGroupMetaData>,
    /// Column indexes indexed `[row_group][column]`; `None` entries are skipped.
    column_indexes: Option<Vec<Vec<Option<ColumnIndexMetaData>>>>,
    /// Offset indexes indexed `[row_group][column]`; `None` entries are skipped.
    offset_indexes: Option<Vec<Vec<Option<OffsetIndexMetaData>>>>,
    /// Optional user key/value metadata stored in the footer.
    key_value_metadata: Option<Vec<KeyValue>>,
    /// Optional `created_by` string stored in the footer.
    created_by: Option<String>,
    /// Serializes each metadata object (possibly encrypting it).
    object_writer: MetadataObjectWriter,
    /// Version number stored in the footer's `FileMetaData`.
    writer_version: i32,
}
66
67impl<'a, W: Write> ThriftMetadataWriter<'a, W> {
68 fn write_offset_indexes(
74 &mut self,
75 offset_indexes: &[Vec<Option<OffsetIndexMetaData>>],
76 ) -> Result<()> {
77 for (row_group_idx, row_group) in self.row_groups.iter_mut().enumerate() {
81 for (column_idx, column_metadata) in row_group.columns.iter_mut().enumerate() {
82 if let Some(offset_index) = &offset_indexes[row_group_idx][column_idx] {
83 let start_offset = self.buf.bytes_written();
84 self.object_writer.write_offset_index(
85 offset_index,
86 column_metadata,
87 row_group_idx,
88 column_idx,
89 &mut self.buf,
90 )?;
91 let end_offset = self.buf.bytes_written();
92 column_metadata.offset_index_offset = Some(start_offset as i64);
94 column_metadata.offset_index_length = Some((end_offset - start_offset) as i32);
95 }
96 }
97 }
98 Ok(())
99 }
100
101 fn write_column_indexes(
107 &mut self,
108 column_indexes: &[Vec<Option<ColumnIndexMetaData>>],
109 ) -> Result<()> {
110 for (row_group_idx, row_group) in self.row_groups.iter_mut().enumerate() {
114 for (column_idx, column_metadata) in row_group.columns.iter_mut().enumerate() {
115 if let Some(column_index) = &column_indexes[row_group_idx][column_idx] {
116 let start_offset = self.buf.bytes_written();
117 if self.object_writer.write_column_index(
119 column_index,
120 column_metadata,
121 row_group_idx,
122 column_idx,
123 &mut self.buf,
124 )? {
125 let end_offset = self.buf.bytes_written();
126 column_metadata.column_index_offset = Some(start_offset as i64);
128 column_metadata.column_index_length =
129 Some((end_offset - start_offset) as i32);
130 }
131 }
132 }
133 }
134 Ok(())
135 }
136
137 fn finalize_column_indexes(&mut self) -> Result<Option<ParquetColumnIndex>> {
139 let column_indexes = std::mem::take(&mut self.column_indexes);
140
141 if let Some(column_indexes) = column_indexes.as_ref() {
143 self.write_column_indexes(column_indexes)?;
144 }
145
146 let all_none = column_indexes
148 .as_ref()
149 .is_some_and(|ci| ci.iter().all(|cii| cii.iter().all(|idx| idx.is_none())));
150
151 let column_indexes: Option<ParquetColumnIndex> = if all_none {
154 None
155 } else {
156 column_indexes.map(|ovvi| {
157 ovvi.into_iter()
158 .map(|vi| {
159 vi.into_iter()
160 .map(|ci| ci.unwrap_or(ColumnIndexMetaData::NONE))
161 .collect()
162 })
163 .collect()
164 })
165 };
166
167 Ok(column_indexes)
168 }
169
170 fn finalize_offset_indexes(&mut self) -> Result<Option<ParquetOffsetIndex>> {
172 let offset_indexes = std::mem::take(&mut self.offset_indexes);
173
174 if let Some(offset_indexes) = offset_indexes.as_ref() {
176 self.write_offset_indexes(offset_indexes)?;
177 }
178
179 let all_none = offset_indexes
181 .as_ref()
182 .is_some_and(|oi| oi.iter().all(|oii| oii.iter().all(|idx| idx.is_none())));
183
184 let offset_indexes: Option<ParquetOffsetIndex> = if all_none {
185 None
186 } else {
187 offset_indexes.map(|ovvi| {
189 ovvi.into_iter()
190 .map(|vi| vi.into_iter().map(|oi| oi.unwrap()).collect())
191 .collect()
192 })
193 };
194
195 Ok(offset_indexes)
196 }
197
198 pub fn finish(mut self) -> Result<ParquetMetaData> {
200 let num_rows = self.row_groups.iter().map(|x| x.num_rows).sum();
201
202 let column_indexes = self.finalize_column_indexes()?;
204 let offset_indexes = self.finalize_offset_indexes()?;
205
206 let column_orders = self
212 .schema_descr
213 .columns()
214 .iter()
215 .map(|col| {
216 let sort_order = ColumnOrder::sort_order_for_type(
217 col.logical_type_ref(),
218 col.converted_type(),
219 col.physical_type(),
220 );
221 ColumnOrder::TYPE_DEFINED_ORDER(sort_order)
222 })
223 .collect();
224
225 let column_orders = Some(column_orders);
229
230 let (row_groups, unencrypted_row_groups) = self
231 .object_writer
232 .apply_row_group_encryption(self.row_groups)?;
233
234 #[cfg(feature = "encryption")]
235 let (encryption_algorithm, footer_signing_key_metadata) =
236 self.object_writer.get_plaintext_footer_crypto_metadata();
237 #[cfg(feature = "encryption")]
238 let file_metadata = FileMetaData::new(
239 self.writer_version,
240 num_rows,
241 self.created_by,
242 self.key_value_metadata,
243 self.schema_descr.clone(),
244 column_orders,
245 )
246 .with_encryption_algorithm(encryption_algorithm)
247 .with_footer_signing_key_metadata(footer_signing_key_metadata);
248
249 #[cfg(not(feature = "encryption"))]
250 let file_metadata = FileMetaData::new(
251 self.writer_version,
252 num_rows,
253 self.created_by,
254 self.key_value_metadata,
255 self.schema_descr.clone(),
256 column_orders,
257 );
258
259 let file_meta = FileMeta {
260 file_metadata: &file_metadata,
261 row_groups: &row_groups,
262 };
263
264 let start_pos = self.buf.bytes_written();
266 self.object_writer
267 .write_file_metadata(&file_meta, &mut self.buf)?;
268 let end_pos = self.buf.bytes_written();
269
270 let metadata_len = (end_pos - start_pos) as u32;
272
273 self.buf.write_all(&metadata_len.to_le_bytes())?;
274 self.buf.write_all(self.object_writer.get_file_magic())?;
275
276 let builder = ParquetMetaDataBuilder::new(file_metadata)
281 .set_column_index(column_indexes)
282 .set_offset_index(offset_indexes);
283
284 Ok(match unencrypted_row_groups {
285 Some(rg) => builder.set_row_groups(rg).build(),
286 None => builder.set_row_groups(row_groups).build(),
287 })
288 }
289
290 pub fn new(
291 buf: &'a mut TrackedWrite<W>,
292 schema_descr: &'a SchemaDescPtr,
293 row_groups: Vec<RowGroupMetaData>,
294 created_by: Option<String>,
295 writer_version: i32,
296 ) -> Self {
297 Self {
298 buf,
299 schema_descr,
300 row_groups,
301 column_indexes: None,
302 offset_indexes: None,
303 key_value_metadata: None,
304 created_by,
305 object_writer: Default::default(),
306 writer_version,
307 }
308 }
309
310 pub fn with_column_indexes(
311 mut self,
312 column_indexes: Vec<Vec<Option<ColumnIndexMetaData>>>,
313 ) -> Self {
314 self.column_indexes = Some(column_indexes);
315 self
316 }
317
318 pub fn with_offset_indexes(
319 mut self,
320 offset_indexes: Vec<Vec<Option<OffsetIndexMetaData>>>,
321 ) -> Self {
322 self.offset_indexes = Some(offset_indexes);
323 self
324 }
325
326 pub fn with_key_value_metadata(mut self, key_value_metadata: Vec<KeyValue>) -> Self {
327 self.key_value_metadata = Some(key_value_metadata);
328 self
329 }
330
331 #[cfg(feature = "encryption")]
332 pub fn with_file_encryptor(mut self, file_encryptor: Option<Arc<FileEncryptor>>) -> Self {
333 self.object_writer = self.object_writer.with_file_encryptor(file_encryptor);
334 self
335 }
336}
337
/// Writes an existing [`ParquetMetaData`] to a [`Write`] sink in the Parquet
/// footer layout.
///
/// Output, in order: any column indexes, any offset indexes, the
/// Thrift-encoded file metadata, its 4-byte little-endian length, and the file
/// magic. Page index offsets recorded in the written row group metadata are
/// taken from the wrapped [`TrackedWrite`]'s byte count.
pub struct ParquetMetaDataWriter<'a, W: Write> {
    /// Sink, wrapped so the writer can record byte offsets of what it writes.
    buf: TrackedWrite<W>,
    /// The metadata to serialize.
    metadata: &'a ParquetMetaData,
}
419
420impl<'a, W: Write> ParquetMetaDataWriter<'a, W> {
421 pub fn new(buf: W, metadata: &'a ParquetMetaData) -> Self {
429 Self::new_with_tracked(TrackedWrite::new(buf), metadata)
430 }
431
432 pub fn new_with_tracked(buf: TrackedWrite<W>, metadata: &'a ParquetMetaData) -> Self {
439 Self { buf, metadata }
440 }
441
442 pub fn finish(mut self) -> Result<()> {
444 let file_metadata = self.metadata.file_metadata();
445
446 let schema = Arc::new(file_metadata.schema().clone());
447 let schema_descr = Arc::new(SchemaDescriptor::new(schema.clone()));
448 let created_by = file_metadata.created_by().map(str::to_string);
449
450 let row_groups = self.metadata.row_groups.clone();
451
452 let key_value_metadata = file_metadata.key_value_metadata().cloned();
453
454 let column_indexes = self.convert_column_indexes();
455 let offset_indexes = self.convert_offset_index();
456
457 let mut encoder = ThriftMetadataWriter::new(
458 &mut self.buf,
459 &schema_descr,
460 row_groups,
461 created_by,
462 file_metadata.version(),
463 );
464
465 if let Some(column_indexes) = column_indexes {
466 encoder = encoder.with_column_indexes(column_indexes);
467 }
468
469 if let Some(offset_indexes) = offset_indexes {
470 encoder = encoder.with_offset_indexes(offset_indexes);
471 }
472
473 if let Some(key_value_metadata) = key_value_metadata {
474 encoder = encoder.with_key_value_metadata(key_value_metadata);
475 }
476 encoder.finish()?;
477
478 Ok(())
479 }
480
481 fn convert_column_indexes(&self) -> Option<Vec<Vec<Option<ColumnIndexMetaData>>>> {
482 self.metadata
485 .column_index()
486 .map(|row_group_column_indexes| {
487 (0..self.metadata.row_groups().len())
488 .map(|rg_idx| {
489 let column_indexes = &row_group_column_indexes[rg_idx];
490 column_indexes
491 .iter()
492 .map(|column_index| Some(column_index.clone()))
493 .collect()
494 })
495 .collect()
496 })
497 }
498
499 fn convert_offset_index(&self) -> Option<Vec<Vec<Option<OffsetIndexMetaData>>>> {
500 self.metadata
501 .offset_index()
502 .map(|row_group_offset_indexes| {
503 (0..self.metadata.row_groups().len())
504 .map(|rg_idx| {
505 let offset_indexes = &row_group_offset_indexes[rg_idx];
506 offset_indexes
507 .iter()
508 .map(|offset_index| Some(offset_index.clone()))
509 .collect()
510 })
511 .collect()
512 })
513 }
514}
515
/// Serializes individual metadata objects (file footer, column/offset indexes)
/// with the Thrift compact protocol.
///
/// With the `encryption` feature, an optional file encryptor decides whether
/// each object is written encrypted, signed, or in plaintext.
#[derive(Debug, Default)]
struct MetadataObjectWriter {
    /// Encryptor for the file; `None` writes everything in plaintext.
    #[cfg(feature = "encryption")]
    file_encryptor: Option<Arc<FileEncryptor>>,
}
521
522impl MetadataObjectWriter {
523 #[inline]
524 fn write_thrift_object(object: &impl WriteThrift, sink: impl Write) -> Result<()> {
525 let mut protocol = ThriftCompactOutputProtocol::new(sink);
526 object.write_thrift(&mut protocol)?;
527 Ok(())
528 }
529}
530
531#[cfg(not(feature = "encryption"))]
533impl MetadataObjectWriter {
534 fn write_file_metadata(&self, file_metadata: &FileMeta, sink: impl Write) -> Result<()> {
538 Self::write_thrift_object(file_metadata, sink)
539 }
540
541 fn write_offset_index(
545 &self,
546 offset_index: &OffsetIndexMetaData,
547 _column_chunk: &ColumnChunkMetaData,
548 _row_group_idx: usize,
549 _column_idx: usize,
550 sink: impl Write,
551 ) -> Result<()> {
552 Self::write_thrift_object(offset_index, sink)
553 }
554
555 fn write_column_index(
562 &self,
563 column_index: &ColumnIndexMetaData,
564 _column_chunk: &ColumnChunkMetaData,
565 _row_group_idx: usize,
566 _column_idx: usize,
567 sink: impl Write,
568 ) -> Result<bool> {
569 match column_index {
570 ColumnIndexMetaData::NONE => Ok(false),
572 _ => {
573 Self::write_thrift_object(column_index, sink)?;
574 Ok(true)
575 }
576 }
577 }
578
579 fn apply_row_group_encryption(
581 &self,
582 row_groups: Vec<RowGroupMetaData>,
583 ) -> Result<(Vec<RowGroupMetaData>, Option<Vec<RowGroupMetaData>>)> {
584 Ok((row_groups, None))
585 }
586
587 pub fn get_file_magic(&self) -> &[u8; 4] {
589 get_file_magic()
590 }
591}
592
/// Object-writer operations for builds with the `encryption` feature.
///
/// With no [`FileEncryptor`] configured, every method falls back to plain
/// Thrift serialization. With one configured, the footer, page indexes and
/// column metadata are encrypted or signed according to its properties.
#[cfg(feature = "encryption")]
impl MetadataObjectWriter {
    /// Replaces the file encryptor; `None` disables encryption.
    fn with_file_encryptor(mut self, encryptor: Option<Arc<FileEncryptor>>) -> Self {
        self.file_encryptor = encryptor;
        self
    }

    /// Writes the footer `FileMeta` to `sink` in one of three layouts:
    ///
    /// * encrypted footer: a plaintext `FileCryptoMetaData`, then the footer
    ///   encrypted with the footer encryptor and footer AAD;
    /// * plaintext footer whose `FileMetaData` records an encryption algorithm:
    ///   the footer written by `write_signed_plaintext_thrift_object` with the
    ///   footer encryptor and footer AAD;
    /// * otherwise: plain Thrift.
    fn write_file_metadata(&self, file_metadata: &FileMeta, mut sink: impl Write) -> Result<()> {
        match self.file_encryptor.as_ref() {
            Some(file_encryptor) if file_encryptor.properties().encrypt_footer() => {
                // The plaintext crypto metadata must precede the encrypted footer.
                let crypto_metadata = Self::file_crypto_metadata(file_encryptor)?;
                let mut protocol = ThriftCompactOutputProtocol::new(&mut sink);
                crypto_metadata.write_thrift(&mut protocol)?;

                let aad = create_footer_aad(file_encryptor.file_aad())?;
                let mut encryptor = file_encryptor.get_footer_encryptor()?;
                encrypt_thrift_object(file_metadata, &mut encryptor, &mut sink, &aad)
            }
            Some(file_encryptor) if file_metadata.file_metadata.encryption_algorithm.is_some() => {
                let aad = create_footer_aad(file_encryptor.file_aad())?;
                let mut encryptor = file_encryptor.get_footer_encryptor()?;
                write_signed_plaintext_thrift_object(file_metadata, &mut encryptor, &mut sink, &aad)
            }
            _ => Self::write_thrift_object(file_metadata, &mut sink),
        }
    }

    /// Writes an offset index, encrypting it when an encryptor is configured
    /// and the column is encrypted.
    fn write_offset_index(
        &self,
        offset_index: &OffsetIndexMetaData,
        column_chunk: &ColumnChunkMetaData,
        row_group_idx: usize,
        column_idx: usize,
        sink: impl Write,
    ) -> Result<()> {
        match &self.file_encryptor {
            Some(file_encryptor) => Self::write_thrift_object_with_encryption(
                offset_index,
                sink,
                file_encryptor,
                column_chunk,
                ModuleType::OffsetIndex,
                row_group_idx,
                column_idx,
            ),
            None => Self::write_thrift_object(offset_index, sink),
        }
    }

    /// Writes a column index, encrypting it when an encryptor is configured
    /// and the column is encrypted.
    ///
    /// Returns `Ok(false)` without writing anything for the
    /// `ColumnIndexMetaData::NONE` placeholder, and `Ok(true)` otherwise.
    fn write_column_index(
        &self,
        column_index: &ColumnIndexMetaData,
        column_chunk: &ColumnChunkMetaData,
        row_group_idx: usize,
        column_idx: usize,
        sink: impl Write,
    ) -> Result<bool> {
        if matches!(column_index, ColumnIndexMetaData::NONE) {
            return Ok(false);
        }
        match &self.file_encryptor {
            Some(file_encryptor) => Self::write_thrift_object_with_encryption(
                column_index,
                sink,
                file_encryptor,
                column_chunk,
                ModuleType::ColumnIndex,
                row_group_idx,
                column_idx,
            )?,
            None => Self::write_thrift_object(column_index, sink)?,
        }
        Ok(true)
    }

    /// Prepares row group metadata for the footer.
    ///
    /// With an encryptor, returns the encrypted row groups together with a
    /// clone of the originals (for the in-memory result). Without one, returns
    /// the input unchanged and `None`.
    fn apply_row_group_encryption(
        &self,
        row_groups: Vec<RowGroupMetaData>,
    ) -> Result<(Vec<RowGroupMetaData>, Option<Vec<RowGroupMetaData>>)> {
        match &self.file_encryptor {
            Some(file_encryptor) => {
                let unencrypted_row_groups = row_groups.clone();
                let encrypted_row_groups = Self::encrypt_row_groups(row_groups, file_encryptor)?;
                Ok((encrypted_row_groups, Some(unencrypted_row_groups)))
            }
            None => Ok((row_groups, None)),
        }
    }

    /// Returns the 4-byte magic written after the footer length.
    ///
    /// The encryption properties (if any) are passed to `get_file_magic`,
    /// which picks the magic.
    fn get_file_magic(&self) -> &[u8; 4] {
        get_file_magic(
            self.file_encryptor
                .as_ref()
                .map(|encryptor| encryptor.properties()),
        )
    }

    /// Writes a per-column metadata object, encrypting it if its column is
    /// configured for encryption.
    ///
    /// The column is identified by its path joined with `.`. Encrypted objects
    /// use the column's encryptor and a module AAD built from `module_type`,
    /// `row_group_index` and `column_index`; other columns are written in
    /// plaintext.
    fn write_thrift_object_with_encryption(
        object: &impl WriteThrift,
        mut sink: impl Write,
        file_encryptor: &FileEncryptor,
        column_metadata: &ColumnChunkMetaData,
        module_type: ModuleType,
        row_group_index: usize,
        column_index: usize,
    ) -> Result<()> {
        let column_path_vec = column_metadata.column_path().as_ref();

        // Avoid allocating a joined string for the common single-segment path.
        let joined_column_path;
        let column_path = if column_path_vec.len() == 1 {
            &column_path_vec[0]
        } else {
            joined_column_path = column_path_vec.join(".");
            &joined_column_path
        };

        if file_encryptor.is_column_encrypted(column_path) {
            let aad = create_module_aad(
                file_encryptor.file_aad(),
                module_type,
                row_group_index,
                column_index,
                None,
            )?;
            let mut encryptor = file_encryptor.get_column_encryptor(column_path)?;
            encrypt_thrift_object(object, &mut encryptor, &mut sink, &aad)
        } else {
            Self::write_thrift_object(object, sink)
        }
    }

    /// Returns the encryption algorithm and footer key metadata to embed in a
    /// plaintext footer.
    ///
    /// Both are `None` unless an encryptor is configured with footer
    /// encryption disabled.
    fn get_plaintext_footer_crypto_metadata(
        &self,
    ) -> (Option<EncryptionAlgorithm>, Option<Vec<u8>>) {
        match self.file_encryptor.as_ref() {
            Some(file_encryptor) if !file_encryptor.properties().encrypt_footer() => (
                Some(Self::encryption_algorithm_from_encryptor(file_encryptor)),
                file_encryptor.properties().footer_key_metadata().cloned(),
            ),
            _ => (None, None),
        }
    }

    /// Builds the `AES_GCM_V1` algorithm description for `file_encryptor`.
    ///
    /// The AAD prefix is stored only when the properties request it.
    /// `supply_aad_prefix` is set (to "not stored") only when a prefix exists.
    fn encryption_algorithm_from_encryptor(file_encryptor: &FileEncryptor) -> EncryptionAlgorithm {
        let properties = file_encryptor.properties();
        let supply_aad_prefix = properties
            .aad_prefix()
            .map(|_| !properties.store_aad_prefix());
        let aad_prefix = if properties.store_aad_prefix() {
            properties.aad_prefix()
        } else {
            None
        };
        EncryptionAlgorithm::AES_GCM_V1(AesGcmV1 {
            aad_prefix: aad_prefix.cloned(),
            aad_file_unique: Some(file_encryptor.aad_file_unique().clone()),
            supply_aad_prefix,
        })
    }

    /// Builds the `FileCryptoMetaData` written ahead of an encrypted footer.
    fn file_crypto_metadata(file_encryptor: &'_ FileEncryptor) -> Result<FileCryptoMetaData<'_>> {
        let properties = file_encryptor.properties();
        Ok(FileCryptoMetaData {
            encryption_algorithm: Self::encryption_algorithm_from_encryptor(file_encryptor),
            key_metadata: properties.footer_key_metadata().map(|v| v.as_slice()),
        })
    }

    /// Applies [`Self::encrypt_column_chunk`] to every column chunk of every
    /// row group, stopping at the first error.
    fn encrypt_row_groups(
        row_groups: Vec<RowGroupMetaData>,
        file_encryptor: &Arc<FileEncryptor>,
    ) -> Result<Vec<RowGroupMetaData>> {
        row_groups
            .into_iter()
            .enumerate()
            .map(|(rg_idx, mut rg)| {
                rg.columns = rg
                    .columns
                    .into_iter()
                    .enumerate()
                    .map(|(col_idx, c)| {
                        Self::encrypt_column_chunk(c, file_encryptor, rg_idx, col_idx)
                    })
                    .collect::<Result<Vec<_>>>()?;
                Ok(rg)
            })
            .collect()
    }

    /// Adds `encrypted_column_metadata` to a column chunk encrypted with its
    /// own column key.
    ///
    /// Unencrypted chunks and chunks encrypted with the footer key are returned
    /// unchanged (for the latter, presumably covered by footer encryption;
    /// confirm against the reader).
    fn encrypt_column_chunk(
        mut column_chunk: ColumnChunkMetaData,
        file_encryptor: &Arc<FileEncryptor>,
        row_group_index: usize,
        column_index: usize,
    ) -> Result<ColumnChunkMetaData> {
        match column_chunk.column_crypto_metadata.as_deref() {
            None => {}
            Some(ColumnCryptoMetaData::ENCRYPTION_WITH_FOOTER_KEY) => {}
            Some(ColumnCryptoMetaData::ENCRYPTION_WITH_COLUMN_KEY(col_key)) => {
                use crate::file::metadata::thrift::serialize_column_meta_data;

                let column_path = col_key.path_in_schema.join(".");
                let mut column_encryptor = file_encryptor.get_column_encryptor(&column_path)?;
                let aad = create_module_aad(
                    file_encryptor.file_aad(),
                    ModuleType::ColumnMetaData,
                    row_group_index,
                    column_index,
                    None,
                )?;
                // Serialize the plaintext column metadata, then encrypt it.
                let mut buffer: Vec<u8> = vec![];
                {
                    let mut prot = ThriftCompactOutputProtocol::new(&mut buffer);
                    serialize_column_meta_data(&column_chunk, &mut prot)?;
                }
                let ciphertext = column_encryptor.encrypt(&buffer, &aad)?;

                column_chunk.encrypted_column_metadata = Some(ciphertext);
            }
        }

        Ok(column_chunk)
    }
}