1#[cfg(feature = "encryption")]
19use crate::encryption::encrypt::{encrypt_object, encrypt_object_to_vec, FileEncryptor};
20#[cfg(feature = "encryption")]
21use crate::encryption::modules::{create_footer_aad, create_module_aad, ModuleType};
22#[cfg(feature = "encryption")]
23use crate::errors::ParquetError;
24use crate::errors::Result;
25use crate::file::metadata::{KeyValue, ParquetMetaData};
26use crate::file::page_index::index::Index;
27use crate::file::writer::{get_file_magic, TrackedWrite};
28#[cfg(feature = "encryption")]
29use crate::format::{AesGcmV1, ColumnCryptoMetaData, EncryptionAlgorithm};
30use crate::format::{ColumnChunk, ColumnIndex, FileMetaData, OffsetIndex, RowGroup};
31use crate::schema::types;
32use crate::schema::types::{SchemaDescPtr, SchemaDescriptor, TypePtr};
33use crate::thrift::TSerializable;
34use std::io::Write;
35use std::sync::Arc;
36use thrift::protocol::TCompactOutputProtocol;
37
38pub(crate) struct ThriftMetadataWriter<'a, W: Write> {
42 buf: &'a mut TrackedWrite<W>,
43 schema: &'a TypePtr,
44 schema_descr: &'a SchemaDescPtr,
45 row_groups: Vec<RowGroup>,
46 column_indexes: Option<&'a [Vec<Option<ColumnIndex>>]>,
47 offset_indexes: Option<&'a [Vec<Option<OffsetIndex>>]>,
48 key_value_metadata: Option<Vec<KeyValue>>,
49 created_by: Option<String>,
50 object_writer: MetadataObjectWriter,
51 writer_version: i32,
52}
53
54impl<'a, W: Write> ThriftMetadataWriter<'a, W> {
55 fn write_offset_indexes(&mut self, offset_indexes: &[Vec<Option<OffsetIndex>>]) -> Result<()> {
61 for (row_group_idx, row_group) in self.row_groups.iter_mut().enumerate() {
65 for (column_idx, column_metadata) in row_group.columns.iter_mut().enumerate() {
66 if let Some(offset_index) = &offset_indexes[row_group_idx][column_idx] {
67 let start_offset = self.buf.bytes_written();
68 self.object_writer.write_offset_index(
69 offset_index,
70 column_metadata,
71 row_group_idx,
72 column_idx,
73 &mut self.buf,
74 )?;
75 let end_offset = self.buf.bytes_written();
76 column_metadata.offset_index_offset = Some(start_offset as i64);
78 column_metadata.offset_index_length = Some((end_offset - start_offset) as i32);
79 }
80 }
81 }
82 Ok(())
83 }
84
85 fn write_column_indexes(&mut self, column_indexes: &[Vec<Option<ColumnIndex>>]) -> Result<()> {
91 for (row_group_idx, row_group) in self.row_groups.iter_mut().enumerate() {
95 for (column_idx, column_metadata) in row_group.columns.iter_mut().enumerate() {
96 if let Some(column_index) = &column_indexes[row_group_idx][column_idx] {
97 let start_offset = self.buf.bytes_written();
98 self.object_writer.write_column_index(
99 column_index,
100 column_metadata,
101 row_group_idx,
102 column_idx,
103 &mut self.buf,
104 )?;
105 let end_offset = self.buf.bytes_written();
106 column_metadata.column_index_offset = Some(start_offset as i64);
108 column_metadata.column_index_length = Some((end_offset - start_offset) as i32);
109 }
110 }
111 }
112 Ok(())
113 }
114
115 pub fn finish(mut self) -> Result<crate::format::FileMetaData> {
117 let num_rows = self.row_groups.iter().map(|x| x.num_rows).sum();
118
119 if let Some(column_indexes) = self.column_indexes {
121 self.write_column_indexes(column_indexes)?;
122 }
123 if let Some(offset_indexes) = self.offset_indexes {
124 self.write_offset_indexes(offset_indexes)?;
125 }
126
127 let column_orders = (0..self.schema_descr.num_columns())
133 .map(|_| crate::format::ColumnOrder::TYPEORDER(crate::format::TypeDefinedOrder {}))
134 .collect();
135 let column_orders = Some(column_orders);
139
140 let (row_groups, unencrypted_row_groups) = self
141 .object_writer
142 .apply_row_group_encryption(self.row_groups)?;
143
144 let mut file_metadata = FileMetaData {
145 num_rows,
146 row_groups,
147 key_value_metadata: self.key_value_metadata.clone(),
148 version: self.writer_version,
149 schema: types::to_thrift(self.schema.as_ref())?,
150 created_by: self.created_by.clone(),
151 column_orders,
152 encryption_algorithm: None,
153 footer_signing_key_metadata: None,
154 };
155
156 let start_pos = self.buf.bytes_written();
158 self.object_writer
159 .write_file_metadata(&file_metadata, &mut self.buf)?;
160 let end_pos = self.buf.bytes_written();
161
162 let metadata_len = (end_pos - start_pos) as u32;
164
165 self.buf.write_all(&metadata_len.to_le_bytes())?;
166 self.buf.write_all(self.object_writer.get_file_magic())?;
167
168 if let Some(row_groups) = unencrypted_row_groups {
169 file_metadata.row_groups = row_groups;
174 }
175
176 Ok(file_metadata)
177 }
178
179 pub fn new(
180 buf: &'a mut TrackedWrite<W>,
181 schema: &'a TypePtr,
182 schema_descr: &'a SchemaDescPtr,
183 row_groups: Vec<RowGroup>,
184 created_by: Option<String>,
185 writer_version: i32,
186 ) -> Self {
187 Self {
188 buf,
189 schema,
190 schema_descr,
191 row_groups,
192 column_indexes: None,
193 offset_indexes: None,
194 key_value_metadata: None,
195 created_by,
196 object_writer: Default::default(),
197 writer_version,
198 }
199 }
200
201 pub fn with_column_indexes(mut self, column_indexes: &'a [Vec<Option<ColumnIndex>>]) -> Self {
202 self.column_indexes = Some(column_indexes);
203 self
204 }
205
206 pub fn with_offset_indexes(mut self, offset_indexes: &'a [Vec<Option<OffsetIndex>>]) -> Self {
207 self.offset_indexes = Some(offset_indexes);
208 self
209 }
210
211 pub fn with_key_value_metadata(mut self, key_value_metadata: Vec<KeyValue>) -> Self {
212 self.key_value_metadata = Some(key_value_metadata);
213 self
214 }
215
216 #[cfg(feature = "encryption")]
217 pub fn with_file_encryptor(mut self, file_encryptor: Option<Arc<FileEncryptor>>) -> Self {
218 self.object_writer = self.object_writer.with_file_encryptor(file_encryptor);
219 self
220 }
221}
222
223pub struct ParquetMetaDataWriter<'a, W: Write> {
299 buf: TrackedWrite<W>,
300 metadata: &'a ParquetMetaData,
301}
302
303impl<'a, W: Write> ParquetMetaDataWriter<'a, W> {
304 pub fn new(buf: W, metadata: &'a ParquetMetaData) -> Self {
312 Self::new_with_tracked(TrackedWrite::new(buf), metadata)
313 }
314
315 pub fn new_with_tracked(buf: TrackedWrite<W>, metadata: &'a ParquetMetaData) -> Self {
322 Self { buf, metadata }
323 }
324
325 pub fn finish(mut self) -> Result<()> {
327 let file_metadata = self.metadata.file_metadata();
328
329 let schema = Arc::new(file_metadata.schema().clone());
330 let schema_descr = Arc::new(SchemaDescriptor::new(schema.clone()));
331 let created_by = file_metadata.created_by().map(str::to_string);
332
333 let row_groups = self
334 .metadata
335 .row_groups()
336 .iter()
337 .map(|rg| rg.to_thrift())
338 .collect::<Vec<_>>();
339
340 let key_value_metadata = file_metadata.key_value_metadata().cloned();
341
342 let column_indexes = self.convert_column_indexes();
343 let offset_indexes = self.convert_offset_index();
344
345 let mut encoder = ThriftMetadataWriter::new(
346 &mut self.buf,
347 &schema,
348 &schema_descr,
349 row_groups,
350 created_by,
351 file_metadata.version(),
352 );
353 encoder = encoder.with_column_indexes(&column_indexes);
354 encoder = encoder.with_offset_indexes(&offset_indexes);
355 if let Some(key_value_metadata) = key_value_metadata {
356 encoder = encoder.with_key_value_metadata(key_value_metadata);
357 }
358 encoder.finish()?;
359
360 Ok(())
361 }
362
363 fn convert_column_indexes(&self) -> Vec<Vec<Option<ColumnIndex>>> {
364 if let Some(row_group_column_indexes) = self.metadata.column_index() {
365 (0..self.metadata.row_groups().len())
366 .map(|rg_idx| {
367 let column_indexes = &row_group_column_indexes[rg_idx];
368 column_indexes
369 .iter()
370 .map(|column_index| match column_index {
371 Index::NONE => None,
372 Index::BOOLEAN(column_index) => Some(column_index.to_thrift()),
373 Index::BYTE_ARRAY(column_index) => Some(column_index.to_thrift()),
374 Index::DOUBLE(column_index) => Some(column_index.to_thrift()),
375 Index::FIXED_LEN_BYTE_ARRAY(column_index) => {
376 Some(column_index.to_thrift())
377 }
378 Index::FLOAT(column_index) => Some(column_index.to_thrift()),
379 Index::INT32(column_index) => Some(column_index.to_thrift()),
380 Index::INT64(column_index) => Some(column_index.to_thrift()),
381 Index::INT96(column_index) => Some(column_index.to_thrift()),
382 })
383 .collect()
384 })
385 .collect()
386 } else {
387 self.metadata
389 .row_groups()
390 .iter()
391 .map(|rg| std::iter::repeat(None).take(rg.columns().len()).collect())
392 .collect()
393 }
394 }
395
396 fn convert_offset_index(&self) -> Vec<Vec<Option<OffsetIndex>>> {
397 if let Some(row_group_offset_indexes) = self.metadata.offset_index() {
398 (0..self.metadata.row_groups().len())
399 .map(|rg_idx| {
400 let offset_indexes = &row_group_offset_indexes[rg_idx];
401 offset_indexes
402 .iter()
403 .map(|offset_index| Some(offset_index.to_thrift()))
404 .collect()
405 })
406 .collect()
407 } else {
408 self.metadata
410 .row_groups()
411 .iter()
412 .map(|rg| std::iter::repeat(None).take(rg.columns().len()).collect())
413 .collect()
414 }
415 }
416}
417
/// Serializes individual metadata objects, encrypting them when the
/// `encryption` feature is enabled and a file encryptor has been configured.
#[derive(Debug, Default)]
struct MetadataObjectWriter {
    /// Encryptor consulted when writing; `None` means everything is written
    /// as plain Thrift.
    #[cfg(feature = "encryption")]
    file_encryptor: Option<Arc<FileEncryptor>>,
}
423
424impl MetadataObjectWriter {
425 #[inline]
426 fn write_object(object: &impl TSerializable, sink: impl Write) -> Result<()> {
427 let mut protocol = TCompactOutputProtocol::new(sink);
428 object.write_to_out_protocol(&mut protocol)?;
429 Ok(())
430 }
431}
432
433#[cfg(not(feature = "encryption"))]
435impl MetadataObjectWriter {
436 fn write_file_metadata(&self, file_metadata: &FileMetaData, sink: impl Write) -> Result<()> {
438 Self::write_object(file_metadata, sink)
439 }
440
441 fn write_offset_index(
443 &self,
444 offset_index: &OffsetIndex,
445 _column_chunk: &ColumnChunk,
446 _row_group_idx: usize,
447 _column_idx: usize,
448 sink: impl Write,
449 ) -> Result<()> {
450 Self::write_object(offset_index, sink)
451 }
452
453 fn write_column_index(
455 &self,
456 column_index: &ColumnIndex,
457 _column_chunk: &ColumnChunk,
458 _row_group_idx: usize,
459 _column_idx: usize,
460 sink: impl Write,
461 ) -> Result<()> {
462 Self::write_object(column_index, sink)
463 }
464
465 fn apply_row_group_encryption(
467 &self,
468 row_groups: Vec<RowGroup>,
469 ) -> Result<(Vec<RowGroup>, Option<Vec<RowGroup>>)> {
470 Ok((row_groups, None))
471 }
472
473 pub fn get_file_magic(&self) -> &[u8; 4] {
475 get_file_magic()
476 }
477}
478
/// Implementations used when the `encryption` feature is enabled: objects are
/// encrypted when a file encryptor is configured and requires it.
#[cfg(feature = "encryption")]
impl MetadataObjectWriter {
    /// Sets (or clears, with `None`) the file encryptor.
    fn with_file_encryptor(mut self, encryptor: Option<Arc<FileEncryptor>>) -> Self {
        self.file_encryptor = encryptor;
        self
    }

    /// Writes the file metadata to `sink`.
    ///
    /// When an encryptor is configured and its properties request footer
    /// encryption, the file crypto metadata is written first as plain Thrift,
    /// followed by the encrypted file metadata. Otherwise the file metadata is
    /// written as plain Thrift.
    fn write_file_metadata(
        &self,
        file_metadata: &FileMetaData,
        mut sink: impl Write,
    ) -> Result<()> {
        let footer_encryptor = self
            .file_encryptor
            .as_ref()
            .filter(|candidate| candidate.properties().encrypt_footer());

        if let Some(file_encryptor) = footer_encryptor {
            // Crypto metadata goes first, unencrypted.
            let crypto_metadata = Self::file_crypto_metadata(file_encryptor)?;
            Self::write_object(&crypto_metadata, &mut sink)?;

            // The footer itself follows, encrypted with the footer encryptor.
            let aad = create_footer_aad(file_encryptor.file_aad())?;
            let mut encryptor = file_encryptor.get_footer_encryptor()?;
            encrypt_object(file_metadata, &mut encryptor, &mut sink, &aad)
        } else {
            Self::write_object(file_metadata, &mut sink)
        }
    }

    /// Writes `offset_index` to `sink`, encrypting it if an encryptor is set
    /// and the column is encrypted.
    fn write_offset_index(
        &self,
        offset_index: &OffsetIndex,
        column_chunk: &ColumnChunk,
        row_group_idx: usize,
        column_idx: usize,
        sink: impl Write,
    ) -> Result<()> {
        match self.file_encryptor.as_deref() {
            None => Self::write_object(offset_index, sink),
            Some(file_encryptor) => Self::write_object_with_encryption(
                offset_index,
                sink,
                file_encryptor,
                column_chunk,
                ModuleType::OffsetIndex,
                row_group_idx,
                column_idx,
            ),
        }
    }

    /// Writes `column_index` to `sink`, encrypting it if an encryptor is set
    /// and the column is encrypted.
    fn write_column_index(
        &self,
        column_index: &ColumnIndex,
        column_chunk: &ColumnChunk,
        row_group_idx: usize,
        column_idx: usize,
        sink: impl Write,
    ) -> Result<()> {
        match self.file_encryptor.as_deref() {
            None => Self::write_object(column_index, sink),
            Some(file_encryptor) => Self::write_object_with_encryption(
                column_index,
                sink,
                file_encryptor,
                column_chunk,
                ModuleType::ColumnIndex,
                row_group_idx,
                column_idx,
            ),
        }
    }

    /// Applies column metadata encryption to the row groups.
    ///
    /// Returns `(row_groups_to_write, unencrypted_copy)`. Without an encryptor
    /// the input is returned unchanged and the copy is `None`; with one, the
    /// first element is the encrypted form and the second the original input.
    fn apply_row_group_encryption(
        &self,
        row_groups: Vec<RowGroup>,
    ) -> Result<(Vec<RowGroup>, Option<Vec<RowGroup>>)> {
        match self.file_encryptor.as_ref() {
            None => Ok((row_groups, None)),
            Some(file_encryptor) => {
                // Keep a plaintext copy before the originals are consumed.
                let plaintext_row_groups = row_groups.clone();
                let encrypted_row_groups = Self::encrypt_row_groups(row_groups, file_encryptor)?;
                Ok((encrypted_row_groups, Some(plaintext_row_groups)))
            }
        }
    }

    /// Returns the magic bytes that terminate the file, chosen from the
    /// encryptor's properties when an encryptor is configured.
    fn get_file_magic(&self) -> &[u8; 4] {
        let encryption_properties = self
            .file_encryptor
            .as_ref()
            .map(|file_encryptor| file_encryptor.properties());
        get_file_magic(encryption_properties)
    }

    /// Writes `object` to `sink`, encrypted with the column's encryptor when
    /// the column is encrypted and as plain Thrift otherwise.
    ///
    /// The column is identified by its schema path joined with `"."`.
    ///
    /// # Errors
    /// Fails if `column_metadata.meta_data` is not set, or if building the
    /// AAD, obtaining the encryptor, encrypting or writing fails.
    fn write_object_with_encryption(
        object: &impl TSerializable,
        mut sink: impl Write,
        file_encryptor: &FileEncryptor,
        column_metadata: &ColumnChunk,
        module_type: ModuleType,
        row_group_index: usize,
        column_index: usize,
    ) -> Result<()> {
        let chunk_meta_data = column_metadata.meta_data.as_ref().ok_or_else(|| {
            general_err!(
                "Column metadata not set for column {} when encrypting object",
                column_index
            )
        })?;
        let path_parts = &chunk_meta_data.path_in_schema;

        // Borrow the single component directly; only join when necessary.
        let joined_path;
        let column_path = match path_parts.as_slice() {
            [only_part] => only_part,
            _ => {
                joined_path = path_parts.join(".");
                &joined_path
            }
        };

        if !file_encryptor.is_column_encrypted(column_path) {
            return Self::write_object(object, sink);
        }

        let aad = create_module_aad(
            file_encryptor.file_aad(),
            module_type,
            row_group_index,
            column_index,
            None,
        )?;
        let mut encryptor = file_encryptor.get_column_encryptor(column_path)?;
        encrypt_object(object, &mut encryptor, &mut sink, &aad)
    }

    /// Builds the file crypto metadata written ahead of an encrypted footer.
    ///
    /// The AAD prefix is embedded only when the properties ask for it to be
    /// stored. `supply_aad_prefix` is set only when a prefix exists, and is
    /// then the negation of "store the prefix".
    fn file_crypto_metadata(
        file_encryptor: &FileEncryptor,
    ) -> Result<crate::format::FileCryptoMetaData> {
        let properties = file_encryptor.properties();
        let store_prefix = properties.store_aad_prefix();

        let supply_aad_prefix = properties.aad_prefix().map(|_| !store_prefix);
        let aad_prefix = properties.aad_prefix().filter(|_| store_prefix).cloned();

        let encryption_algorithm = AesGcmV1 {
            aad_prefix,
            aad_file_unique: Some(file_encryptor.aad_file_unique().clone()),
            supply_aad_prefix,
        };

        Ok(crate::format::FileCryptoMetaData {
            encryption_algorithm: EncryptionAlgorithm::AESGCMV1(encryption_algorithm),
            key_metadata: properties.footer_key_metadata().cloned(),
        })
    }

    /// Runs [`Self::encrypt_column_chunk`] over every column chunk of every
    /// row group, stopping at the first error.
    fn encrypt_row_groups(
        row_groups: Vec<RowGroup>,
        file_encryptor: &Arc<FileEncryptor>,
    ) -> Result<Vec<RowGroup>> {
        let mut encrypted_row_groups = Vec::with_capacity(row_groups.len());
        for (rg_idx, mut row_group) in row_groups.into_iter().enumerate() {
            let plaintext_columns = std::mem::take(&mut row_group.columns);
            let mut encrypted_columns = Vec::with_capacity(plaintext_columns.len());
            for (col_idx, chunk) in plaintext_columns.into_iter().enumerate() {
                encrypted_columns.push(Self::encrypt_column_chunk(
                    chunk,
                    file_encryptor,
                    rg_idx,
                    col_idx,
                )?);
            }
            row_group.columns = encrypted_columns;
            encrypted_row_groups.push(row_group);
        }
        Ok(encrypted_row_groups)
    }

    /// Applies column encryption to the metadata of a single column chunk.
    ///
    /// Chunks without crypto metadata, or encrypted with the footer key, are
    /// returned unchanged. For chunks encrypted with a column key, `meta_data`
    /// is removed and its encrypted form is stored in
    /// `encrypted_column_metadata`.
    ///
    /// # Errors
    /// Fails if the column encryptor or AAD cannot be obtained, if `meta_data`
    /// is missing for a column-key chunk, or if encryption fails.
    fn encrypt_column_chunk(
        mut column_chunk: ColumnChunk,
        file_encryptor: &Arc<FileEncryptor>,
        row_group_index: usize,
        column_index: usize,
    ) -> Result<ColumnChunk> {
        match column_chunk.crypto_metadata.as_ref() {
            // Nothing to do for these chunks.
            None | Some(ColumnCryptoMetaData::ENCRYPTIONWITHFOOTERKEY(_)) => {}
            Some(ColumnCryptoMetaData::ENCRYPTIONWITHCOLUMNKEY(col_key)) => {
                let column_path = col_key.path_in_schema.join(".");
                let mut column_encryptor = file_encryptor.get_column_encryptor(&column_path)?;

                // Take the plaintext metadata out so only ciphertext remains.
                let plaintext_meta_data = column_chunk
                    .meta_data
                    .take()
                    .ok_or_else(|| general_err!("Column metadata not set for encryption"))?;
                let aad = create_module_aad(
                    file_encryptor.file_aad(),
                    ModuleType::ColumnMetaData,
                    row_group_index,
                    column_index,
                    None,
                )?;
                let ciphertext =
                    encrypt_object_to_vec(&plaintext_meta_data, &mut column_encryptor, &aad)?;

                column_chunk.encrypted_column_metadata = Some(ciphertext);
                debug_assert!(column_chunk.meta_data.is_none());
            }
        }

        Ok(column_chunk)
    }
}