1#[cfg(feature = "encryption")]
19use crate::encryption::{
20 encrypt::{
21 encrypt_object, encrypt_object_to_vec, write_signed_plaintext_object, FileEncryptor,
22 },
23 modules::{create_footer_aad, create_module_aad, ModuleType},
24};
25#[cfg(feature = "encryption")]
26use crate::errors::ParquetError;
27use crate::errors::Result;
28use crate::file::metadata::{KeyValue, ParquetMetaData};
29use crate::file::page_index::index::Index;
30use crate::file::writer::{get_file_magic, TrackedWrite};
31use crate::format::EncryptionAlgorithm;
32#[cfg(feature = "encryption")]
33use crate::format::{AesGcmV1, ColumnCryptoMetaData};
34use crate::format::{ColumnChunk, ColumnIndex, FileMetaData, OffsetIndex, RowGroup};
35use crate::schema::types;
36use crate::schema::types::{SchemaDescPtr, SchemaDescriptor, TypePtr};
37use crate::thrift::TSerializable;
38use std::io::Write;
39use std::sync::Arc;
40use thrift::protocol::TCompactOutputProtocol;
41
42pub(crate) struct ThriftMetadataWriter<'a, W: Write> {
46 buf: &'a mut TrackedWrite<W>,
47 schema: &'a TypePtr,
48 schema_descr: &'a SchemaDescPtr,
49 row_groups: Vec<RowGroup>,
50 column_indexes: Option<&'a [Vec<Option<ColumnIndex>>]>,
51 offset_indexes: Option<&'a [Vec<Option<OffsetIndex>>]>,
52 key_value_metadata: Option<Vec<KeyValue>>,
53 created_by: Option<String>,
54 object_writer: MetadataObjectWriter,
55 writer_version: i32,
56}
57
58impl<'a, W: Write> ThriftMetadataWriter<'a, W> {
59 fn write_offset_indexes(&mut self, offset_indexes: &[Vec<Option<OffsetIndex>>]) -> Result<()> {
65 for (row_group_idx, row_group) in self.row_groups.iter_mut().enumerate() {
69 for (column_idx, column_metadata) in row_group.columns.iter_mut().enumerate() {
70 if let Some(offset_index) = &offset_indexes[row_group_idx][column_idx] {
71 let start_offset = self.buf.bytes_written();
72 self.object_writer.write_offset_index(
73 offset_index,
74 column_metadata,
75 row_group_idx,
76 column_idx,
77 &mut self.buf,
78 )?;
79 let end_offset = self.buf.bytes_written();
80 column_metadata.offset_index_offset = Some(start_offset as i64);
82 column_metadata.offset_index_length = Some((end_offset - start_offset) as i32);
83 }
84 }
85 }
86 Ok(())
87 }
88
89 fn write_column_indexes(&mut self, column_indexes: &[Vec<Option<ColumnIndex>>]) -> Result<()> {
95 for (row_group_idx, row_group) in self.row_groups.iter_mut().enumerate() {
99 for (column_idx, column_metadata) in row_group.columns.iter_mut().enumerate() {
100 if let Some(column_index) = &column_indexes[row_group_idx][column_idx] {
101 let start_offset = self.buf.bytes_written();
102 self.object_writer.write_column_index(
103 column_index,
104 column_metadata,
105 row_group_idx,
106 column_idx,
107 &mut self.buf,
108 )?;
109 let end_offset = self.buf.bytes_written();
110 column_metadata.column_index_offset = Some(start_offset as i64);
112 column_metadata.column_index_length = Some((end_offset - start_offset) as i32);
113 }
114 }
115 }
116 Ok(())
117 }
118
119 pub fn finish(mut self) -> Result<crate::format::FileMetaData> {
121 let num_rows = self.row_groups.iter().map(|x| x.num_rows).sum();
122
123 if let Some(column_indexes) = self.column_indexes {
125 self.write_column_indexes(column_indexes)?;
126 }
127 if let Some(offset_indexes) = self.offset_indexes {
128 self.write_offset_indexes(offset_indexes)?;
129 }
130
131 let column_orders = (0..self.schema_descr.num_columns())
137 .map(|_| crate::format::ColumnOrder::TYPEORDER(crate::format::TypeDefinedOrder {}))
138 .collect();
139 let column_orders = Some(column_orders);
143
144 let (row_groups, unencrypted_row_groups) = self
145 .object_writer
146 .apply_row_group_encryption(self.row_groups)?;
147
148 let mut file_metadata = FileMetaData {
149 num_rows,
150 row_groups,
151 key_value_metadata: self.key_value_metadata.clone(),
152 version: self.writer_version,
153 schema: types::to_thrift(self.schema.as_ref())?,
154 created_by: self.created_by.clone(),
155 column_orders,
156 encryption_algorithm: self.object_writer.get_footer_encryption_algorithm(),
157 footer_signing_key_metadata: None,
158 };
159
160 let start_pos = self.buf.bytes_written();
162 self.object_writer
163 .write_file_metadata(&file_metadata, &mut self.buf)?;
164 let end_pos = self.buf.bytes_written();
165
166 let metadata_len = (end_pos - start_pos) as u32;
168
169 self.buf.write_all(&metadata_len.to_le_bytes())?;
170 self.buf.write_all(self.object_writer.get_file_magic())?;
171
172 if let Some(row_groups) = unencrypted_row_groups {
173 file_metadata.row_groups = row_groups;
178 }
179
180 Ok(file_metadata)
181 }
182
183 pub fn new(
184 buf: &'a mut TrackedWrite<W>,
185 schema: &'a TypePtr,
186 schema_descr: &'a SchemaDescPtr,
187 row_groups: Vec<RowGroup>,
188 created_by: Option<String>,
189 writer_version: i32,
190 ) -> Self {
191 Self {
192 buf,
193 schema,
194 schema_descr,
195 row_groups,
196 column_indexes: None,
197 offset_indexes: None,
198 key_value_metadata: None,
199 created_by,
200 object_writer: Default::default(),
201 writer_version,
202 }
203 }
204
205 pub fn with_column_indexes(mut self, column_indexes: &'a [Vec<Option<ColumnIndex>>]) -> Self {
206 self.column_indexes = Some(column_indexes);
207 self
208 }
209
210 pub fn with_offset_indexes(mut self, offset_indexes: &'a [Vec<Option<OffsetIndex>>]) -> Self {
211 self.offset_indexes = Some(offset_indexes);
212 self
213 }
214
215 pub fn with_key_value_metadata(mut self, key_value_metadata: Vec<KeyValue>) -> Self {
216 self.key_value_metadata = Some(key_value_metadata);
217 self
218 }
219
220 #[cfg(feature = "encryption")]
221 pub fn with_file_encryptor(mut self, file_encryptor: Option<Arc<FileEncryptor>>) -> Self {
222 self.object_writer = self.object_writer.with_file_encryptor(file_encryptor);
223 self
224 }
225}
226
227pub struct ParquetMetaDataWriter<'a, W: Write> {
303 buf: TrackedWrite<W>,
304 metadata: &'a ParquetMetaData,
305}
306
307impl<'a, W: Write> ParquetMetaDataWriter<'a, W> {
308 pub fn new(buf: W, metadata: &'a ParquetMetaData) -> Self {
316 Self::new_with_tracked(TrackedWrite::new(buf), metadata)
317 }
318
319 pub fn new_with_tracked(buf: TrackedWrite<W>, metadata: &'a ParquetMetaData) -> Self {
326 Self { buf, metadata }
327 }
328
329 pub fn finish(mut self) -> Result<()> {
331 let file_metadata = self.metadata.file_metadata();
332
333 let schema = Arc::new(file_metadata.schema().clone());
334 let schema_descr = Arc::new(SchemaDescriptor::new(schema.clone()));
335 let created_by = file_metadata.created_by().map(str::to_string);
336
337 let row_groups = self
338 .metadata
339 .row_groups()
340 .iter()
341 .map(|rg| rg.to_thrift())
342 .collect::<Vec<_>>();
343
344 let key_value_metadata = file_metadata.key_value_metadata().cloned();
345
346 let column_indexes = self.convert_column_indexes();
347 let offset_indexes = self.convert_offset_index();
348
349 let mut encoder = ThriftMetadataWriter::new(
350 &mut self.buf,
351 &schema,
352 &schema_descr,
353 row_groups,
354 created_by,
355 file_metadata.version(),
356 );
357 encoder = encoder.with_column_indexes(&column_indexes);
358 encoder = encoder.with_offset_indexes(&offset_indexes);
359 if let Some(key_value_metadata) = key_value_metadata {
360 encoder = encoder.with_key_value_metadata(key_value_metadata);
361 }
362 encoder.finish()?;
363
364 Ok(())
365 }
366
367 fn convert_column_indexes(&self) -> Vec<Vec<Option<ColumnIndex>>> {
368 if let Some(row_group_column_indexes) = self.metadata.column_index() {
369 (0..self.metadata.row_groups().len())
370 .map(|rg_idx| {
371 let column_indexes = &row_group_column_indexes[rg_idx];
372 column_indexes
373 .iter()
374 .map(|column_index| match column_index {
375 Index::NONE => None,
376 Index::BOOLEAN(column_index) => Some(column_index.to_thrift()),
377 Index::BYTE_ARRAY(column_index) => Some(column_index.to_thrift()),
378 Index::DOUBLE(column_index) => Some(column_index.to_thrift()),
379 Index::FIXED_LEN_BYTE_ARRAY(column_index) => {
380 Some(column_index.to_thrift())
381 }
382 Index::FLOAT(column_index) => Some(column_index.to_thrift()),
383 Index::INT32(column_index) => Some(column_index.to_thrift()),
384 Index::INT64(column_index) => Some(column_index.to_thrift()),
385 Index::INT96(column_index) => Some(column_index.to_thrift()),
386 })
387 .collect()
388 })
389 .collect()
390 } else {
391 self.metadata
393 .row_groups()
394 .iter()
395 .map(|rg| std::iter::repeat(None).take(rg.columns().len()).collect())
396 .collect()
397 }
398 }
399
400 fn convert_offset_index(&self) -> Vec<Vec<Option<OffsetIndex>>> {
401 if let Some(row_group_offset_indexes) = self.metadata.offset_index() {
402 (0..self.metadata.row_groups().len())
403 .map(|rg_idx| {
404 let offset_indexes = &row_group_offset_indexes[rg_idx];
405 offset_indexes
406 .iter()
407 .map(|offset_index| Some(offset_index.to_thrift()))
408 .collect()
409 })
410 .collect()
411 } else {
412 self.metadata
414 .row_groups()
415 .iter()
416 .map(|rg| std::iter::repeat(None).take(rg.columns().len()).collect())
417 .collect()
418 }
419 }
420}
421
/// Serializes Thrift metadata objects, encrypting them when configured to.
///
/// With the `encryption` feature, `file_encryptor` decides whether the footer,
/// page indexes and column metadata are encrypted; without it, every object is
/// written as plain Thrift compact-protocol bytes.
#[derive(Debug, Default)]
struct MetadataObjectWriter {
    /// File-level encryptor; `None` means objects are written unencrypted.
    #[cfg(feature = "encryption")]
    file_encryptor: Option<Arc<FileEncryptor>>,
}
427
428impl MetadataObjectWriter {
429 #[inline]
430 fn write_object(object: &impl TSerializable, sink: impl Write) -> Result<()> {
431 let mut protocol = TCompactOutputProtocol::new(sink);
432 object.write_to_out_protocol(&mut protocol)?;
433 Ok(())
434 }
435}
436
437#[cfg(not(feature = "encryption"))]
439impl MetadataObjectWriter {
440 fn write_file_metadata(&self, file_metadata: &FileMetaData, sink: impl Write) -> Result<()> {
442 Self::write_object(file_metadata, sink)
443 }
444
445 fn write_offset_index(
447 &self,
448 offset_index: &OffsetIndex,
449 _column_chunk: &ColumnChunk,
450 _row_group_idx: usize,
451 _column_idx: usize,
452 sink: impl Write,
453 ) -> Result<()> {
454 Self::write_object(offset_index, sink)
455 }
456
457 fn write_column_index(
459 &self,
460 column_index: &ColumnIndex,
461 _column_chunk: &ColumnChunk,
462 _row_group_idx: usize,
463 _column_idx: usize,
464 sink: impl Write,
465 ) -> Result<()> {
466 Self::write_object(column_index, sink)
467 }
468
469 fn apply_row_group_encryption(
471 &self,
472 row_groups: Vec<RowGroup>,
473 ) -> Result<(Vec<RowGroup>, Option<Vec<RowGroup>>)> {
474 Ok((row_groups, None))
475 }
476
477 pub fn get_file_magic(&self) -> &[u8; 4] {
479 get_file_magic()
480 }
481
482 fn get_footer_encryption_algorithm(&self) -> Option<EncryptionAlgorithm> {
483 None
484 }
485}
486
#[cfg(feature = "encryption")]
impl MetadataObjectWriter {
    /// Returns this writer with its file encryptor replaced by `encryptor`;
    /// `None` disables encryption.
    fn with_file_encryptor(mut self, encryptor: Option<Arc<FileEncryptor>>) -> Self {
        self.file_encryptor = encryptor;
        self
    }

    /// Writes the footer `FileMetaData`.
    ///
    /// - Encryptor configured with an encrypted footer: writes the plaintext
    ///   `FileCryptoMetaData`, then the `FileMetaData` encrypted with the footer key.
    /// - Encryptor configured and `file_metadata.encryption_algorithm` set: writes a
    ///   signed plaintext `FileMetaData`.
    /// - Otherwise: writes plain Thrift.
    fn write_file_metadata(
        &self,
        file_metadata: &FileMetaData,
        mut sink: impl Write,
    ) -> Result<()> {
        let encryptor = match self.file_encryptor.as_ref() {
            Some(encryptor) => encryptor,
            None => return Self::write_object(file_metadata, &mut sink),
        };

        if encryptor.properties().encrypt_footer() {
            let crypto_metadata = Self::file_crypto_metadata(encryptor)?;
            {
                let mut crypto_protocol = TCompactOutputProtocol::new(&mut sink);
                crypto_metadata.write_to_out_protocol(&mut crypto_protocol)?;
            }

            let footer_aad = create_footer_aad(encryptor.file_aad())?;
            let mut footer_encryptor = encryptor.get_footer_encryptor()?;
            encrypt_object(file_metadata, &mut footer_encryptor, &mut sink, &footer_aad)
        } else if file_metadata.encryption_algorithm.is_some() {
            let footer_aad = create_footer_aad(encryptor.file_aad())?;
            let mut footer_encryptor = encryptor.get_footer_encryptor()?;
            write_signed_plaintext_object(file_metadata, &mut footer_encryptor, &mut sink, &footer_aad)
        } else {
            Self::write_object(file_metadata, &mut sink)
        }
    }

    /// Writes an offset index, encrypting it when its column is encrypted.
    fn write_offset_index(
        &self,
        offset_index: &OffsetIndex,
        column_chunk: &ColumnChunk,
        row_group_idx: usize,
        column_idx: usize,
        sink: impl Write,
    ) -> Result<()> {
        self.write_page_index_object(
            offset_index,
            column_chunk,
            ModuleType::OffsetIndex,
            row_group_idx,
            column_idx,
            sink,
        )
    }

    /// Writes a column index, encrypting it when its column is encrypted.
    fn write_column_index(
        &self,
        column_index: &ColumnIndex,
        column_chunk: &ColumnChunk,
        row_group_idx: usize,
        column_idx: usize,
        sink: impl Write,
    ) -> Result<()> {
        self.write_page_index_object(
            column_index,
            column_chunk,
            ModuleType::ColumnIndex,
            row_group_idx,
            column_idx,
            sink,
        )
    }

    /// Shared path for page index structures: plain Thrift without an encryptor,
    /// otherwise defer to `write_object_with_encryption` with the given module type.
    fn write_page_index_object(
        &self,
        object: &impl TSerializable,
        column_chunk: &ColumnChunk,
        module_type: ModuleType,
        row_group_idx: usize,
        column_idx: usize,
        sink: impl Write,
    ) -> Result<()> {
        if let Some(encryptor) = &self.file_encryptor {
            Self::write_object_with_encryption(
                object,
                sink,
                encryptor,
                column_chunk,
                module_type,
                row_group_idx,
                column_idx,
            )
        } else {
            Self::write_object(object, sink)
        }
    }

    /// Encrypts row group column metadata when an encryptor is configured.
    ///
    /// Returns `(row groups to write, Some(original row groups))` when encrypting,
    /// or `(row groups unchanged, None)` otherwise.
    fn apply_row_group_encryption(
        &self,
        row_groups: Vec<RowGroup>,
    ) -> Result<(Vec<RowGroup>, Option<Vec<RowGroup>>)> {
        if let Some(encryptor) = &self.file_encryptor {
            let plaintext_copy = row_groups.clone();
            let encrypted = Self::encrypt_row_groups(row_groups, encryptor)?;
            Ok((encrypted, Some(plaintext_copy)))
        } else {
            Ok((row_groups, None))
        }
    }

    /// Returns the magic bytes written after the footer length, chosen from the
    /// encryption properties (or their absence).
    fn get_file_magic(&self) -> &[u8; 4] {
        let properties = self
            .file_encryptor
            .as_deref()
            .map(|encryptor| encryptor.properties());
        get_file_magic(properties)
    }

    /// Writes `object`, encrypting it with the column's key when that column is
    /// encrypted; otherwise writes plain Thrift.
    ///
    /// The column is identified by its `path_in_schema` joined with `.`.
    ///
    /// # Errors
    ///
    /// Fails if `column_metadata.meta_data` is unset, or if AAD creation,
    /// encryptor lookup or serialization fails.
    fn write_object_with_encryption(
        object: &impl TSerializable,
        mut sink: impl Write,
        file_encryptor: &FileEncryptor,
        column_metadata: &ColumnChunk,
        module_type: ModuleType,
        row_group_index: usize,
        column_index: usize,
    ) -> Result<()> {
        let path_parts = &column_metadata
            .meta_data
            .as_ref()
            .ok_or_else(|| {
                general_err!(
                    "Column metadata not set for column {} when encrypting object",
                    column_index
                )
            })?
            .path_in_schema;

        // Borrow the single path component directly; only allocate when joining.
        let joined;
        let column_path = match path_parts.as_slice() {
            [only] => only,
            parts => {
                joined = parts.join(".");
                &joined
            }
        };

        if !file_encryptor.is_column_encrypted(column_path) {
            return Self::write_object(object, sink);
        }

        let module_aad = create_module_aad(
            file_encryptor.file_aad(),
            module_type,
            row_group_index,
            column_index,
            None,
        )?;
        let mut column_encryptor = file_encryptor.get_column_encryptor(column_path)?;
        encrypt_object(object, &mut column_encryptor, &mut sink, &module_aad)
    }

    /// Returns the encryption algorithm to record in the footer whenever an
    /// encryptor is configured, and `None` otherwise.
    fn get_footer_encryption_algorithm(&self) -> Option<EncryptionAlgorithm> {
        self.file_encryptor
            .as_deref()
            .map(Self::encryption_algorithm_from_encryptor)
    }

    /// Builds the AES-GCM-V1 algorithm description for `file_encryptor`.
    ///
    /// The AAD prefix is embedded only when it is configured to be stored; when a
    /// prefix exists but is not stored, `supply_aad_prefix` is set to `true`.
    fn encryption_algorithm_from_encryptor(file_encryptor: &FileEncryptor) -> EncryptionAlgorithm {
        let properties = file_encryptor.properties();
        let store_prefix = properties.store_aad_prefix();
        EncryptionAlgorithm::AESGCMV1(AesGcmV1 {
            aad_prefix: properties.aad_prefix().filter(|_| store_prefix).cloned(),
            aad_file_unique: Some(file_encryptor.aad_file_unique().clone()),
            supply_aad_prefix: properties.aad_prefix().map(|_| !store_prefix),
        })
    }

    /// Builds the plaintext `FileCryptoMetaData` that precedes an encrypted footer.
    fn file_crypto_metadata(
        file_encryptor: &FileEncryptor,
    ) -> Result<crate::format::FileCryptoMetaData> {
        let encryption_algorithm = Self::encryption_algorithm_from_encryptor(file_encryptor);
        let key_metadata = file_encryptor.properties().footer_key_metadata().cloned();
        Ok(crate::format::FileCryptoMetaData {
            encryption_algorithm,
            key_metadata,
        })
    }

    /// Encrypts the column metadata of every column chunk in every row group,
    /// stopping at the first error.
    fn encrypt_row_groups(
        row_groups: Vec<RowGroup>,
        file_encryptor: &Arc<FileEncryptor>,
    ) -> Result<Vec<RowGroup>> {
        let mut encrypted_groups = Vec::with_capacity(row_groups.len());
        for (rg_idx, mut row_group) in row_groups.into_iter().enumerate() {
            let mut encrypted_columns = Vec::with_capacity(row_group.columns.len());
            for (col_idx, chunk) in row_group.columns.into_iter().enumerate() {
                encrypted_columns.push(Self::encrypt_column_chunk(
                    chunk,
                    file_encryptor,
                    rg_idx,
                    col_idx,
                )?);
            }
            row_group.columns = encrypted_columns;
            encrypted_groups.push(row_group);
        }
        Ok(encrypted_groups)
    }

    /// Encrypts one column chunk's metadata when it uses a column-specific key.
    ///
    /// For column-key encryption, `meta_data` is moved into
    /// `encrypted_column_metadata` (as ciphertext) and left as `None`. Chunks with no
    /// crypto metadata, or encrypted with the footer key, are returned unchanged.
    fn encrypt_column_chunk(
        mut column_chunk: ColumnChunk,
        file_encryptor: &Arc<FileEncryptor>,
        row_group_index: usize,
        column_index: usize,
    ) -> Result<ColumnChunk> {
        match column_chunk.crypto_metadata.as_ref() {
            Some(ColumnCryptoMetaData::ENCRYPTIONWITHCOLUMNKEY(col_key)) => {
                let column_path = col_key.path_in_schema.join(".");
                let mut column_encryptor = file_encryptor.get_column_encryptor(&column_path)?;
                let plaintext_meta = column_chunk
                    .meta_data
                    .take()
                    .ok_or_else(|| general_err!("Column metadata not set for encryption"))?;
                let module_aad = create_module_aad(
                    file_encryptor.file_aad(),
                    ModuleType::ColumnMetaData,
                    row_group_index,
                    column_index,
                    None,
                )?;
                let ciphertext =
                    encrypt_object_to_vec(&plaintext_meta, &mut column_encryptor, &module_aad)?;

                column_chunk.encrypted_column_metadata = Some(ciphertext);
                debug_assert!(column_chunk.meta_data.is_none());
            }
            Some(ColumnCryptoMetaData::ENCRYPTIONWITHFOOTERKEY(_)) | None => {}
        }

        Ok(column_chunk)
    }
}