parquet/file/metadata/
writer.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18#[cfg(feature = "encryption")]
19use crate::encryption::{
20    encrypt::{
21        encrypt_object, encrypt_object_to_vec, write_signed_plaintext_object, FileEncryptor,
22    },
23    modules::{create_footer_aad, create_module_aad, ModuleType},
24};
25#[cfg(feature = "encryption")]
26use crate::errors::ParquetError;
27use crate::errors::Result;
28use crate::file::metadata::{KeyValue, ParquetMetaData};
29use crate::file::page_index::index::Index;
30use crate::file::writer::{get_file_magic, TrackedWrite};
31use crate::format::EncryptionAlgorithm;
32#[cfg(feature = "encryption")]
33use crate::format::{AesGcmV1, ColumnCryptoMetaData};
34use crate::format::{ColumnChunk, ColumnIndex, FileMetaData, OffsetIndex, RowGroup};
35use crate::schema::types;
36use crate::schema::types::{SchemaDescPtr, SchemaDescriptor, TypePtr};
37use crate::thrift::TSerializable;
38use std::io::Write;
39use std::sync::Arc;
40use thrift::protocol::TCompactOutputProtocol;
41
42/// Writes `crate::file::metadata` structures to a thrift encoded byte stream
43///
44/// See [`ParquetMetaDataWriter`] for background and example.
45pub(crate) struct ThriftMetadataWriter<'a, W: Write> {
46    buf: &'a mut TrackedWrite<W>,
47    schema: &'a TypePtr,
48    schema_descr: &'a SchemaDescPtr,
49    row_groups: Vec<RowGroup>,
50    column_indexes: Option<&'a [Vec<Option<ColumnIndex>>]>,
51    offset_indexes: Option<&'a [Vec<Option<OffsetIndex>>]>,
52    key_value_metadata: Option<Vec<KeyValue>>,
53    created_by: Option<String>,
54    object_writer: MetadataObjectWriter,
55    writer_version: i32,
56}
57
58impl<'a, W: Write> ThriftMetadataWriter<'a, W> {
59    /// Serialize all the offset indexes to `self.buf`,
60    ///
61    /// Note: also updates the `ColumnChunk::offset_index_offset` and
62    /// `ColumnChunk::offset_index_length` to reflect the position and length
63    /// of the serialized offset indexes.
64    fn write_offset_indexes(&mut self, offset_indexes: &[Vec<Option<OffsetIndex>>]) -> Result<()> {
65        // iter row group
66        // iter each column
67        // write offset index to the file
68        for (row_group_idx, row_group) in self.row_groups.iter_mut().enumerate() {
69            for (column_idx, column_metadata) in row_group.columns.iter_mut().enumerate() {
70                if let Some(offset_index) = &offset_indexes[row_group_idx][column_idx] {
71                    let start_offset = self.buf.bytes_written();
72                    self.object_writer.write_offset_index(
73                        offset_index,
74                        column_metadata,
75                        row_group_idx,
76                        column_idx,
77                        &mut self.buf,
78                    )?;
79                    let end_offset = self.buf.bytes_written();
80                    // set offset and index for offset index
81                    column_metadata.offset_index_offset = Some(start_offset as i64);
82                    column_metadata.offset_index_length = Some((end_offset - start_offset) as i32);
83                }
84            }
85        }
86        Ok(())
87    }
88
89    /// Serialize all the column indexes to the `self.buf`
90    ///
91    /// Note: also updates the `ColumnChunk::column_index_offset` and
92    /// `ColumnChunk::column_index_length` to reflect the position and length
93    /// of the serialized column indexes.
94    fn write_column_indexes(&mut self, column_indexes: &[Vec<Option<ColumnIndex>>]) -> Result<()> {
95        // iter row group
96        // iter each column
97        // write column index to the file
98        for (row_group_idx, row_group) in self.row_groups.iter_mut().enumerate() {
99            for (column_idx, column_metadata) in row_group.columns.iter_mut().enumerate() {
100                if let Some(column_index) = &column_indexes[row_group_idx][column_idx] {
101                    let start_offset = self.buf.bytes_written();
102                    self.object_writer.write_column_index(
103                        column_index,
104                        column_metadata,
105                        row_group_idx,
106                        column_idx,
107                        &mut self.buf,
108                    )?;
109                    let end_offset = self.buf.bytes_written();
110                    // set offset and index for offset index
111                    column_metadata.column_index_offset = Some(start_offset as i64);
112                    column_metadata.column_index_length = Some((end_offset - start_offset) as i32);
113                }
114            }
115        }
116        Ok(())
117    }
118
119    /// Assembles and writes the final metadata to self.buf
120    pub fn finish(mut self) -> Result<crate::format::FileMetaData> {
121        let num_rows = self.row_groups.iter().map(|x| x.num_rows).sum();
122
123        // Write column indexes and offset indexes
124        if let Some(column_indexes) = self.column_indexes {
125            self.write_column_indexes(column_indexes)?;
126        }
127        if let Some(offset_indexes) = self.offset_indexes {
128            self.write_offset_indexes(offset_indexes)?;
129        }
130
131        // We only include ColumnOrder for leaf nodes.
132        // Currently only supported ColumnOrder is TypeDefinedOrder so we set this
133        // for all leaf nodes.
134        // Even if the column has an undefined sort order, such as INTERVAL, this
135        // is still technically the defined TYPEORDER so it should still be set.
136        let column_orders = (0..self.schema_descr.num_columns())
137            .map(|_| crate::format::ColumnOrder::TYPEORDER(crate::format::TypeDefinedOrder {}))
138            .collect();
139        // This field is optional, perhaps in cases where no min/max fields are set
140        // in any Statistics or ColumnIndex object in the whole file.
141        // But for simplicity we always set this field.
142        let column_orders = Some(column_orders);
143        let (row_groups, unencrypted_row_groups) = self
144            .object_writer
145            .apply_row_group_encryption(self.row_groups)?;
146
147        let (encryption_algorithm, footer_signing_key_metadata) =
148            self.object_writer.get_plaintext_footer_crypto_metadata();
149        let mut file_metadata = FileMetaData {
150            num_rows,
151            row_groups,
152            key_value_metadata: self.key_value_metadata.clone(),
153            version: self.writer_version,
154            schema: types::to_thrift(self.schema.as_ref())?,
155            created_by: self.created_by.clone(),
156            column_orders,
157            encryption_algorithm,
158            footer_signing_key_metadata,
159        };
160
161        // Write file metadata
162        let start_pos = self.buf.bytes_written();
163        self.object_writer
164            .write_file_metadata(&file_metadata, &mut self.buf)?;
165        let end_pos = self.buf.bytes_written();
166
167        // Write footer
168        let metadata_len = (end_pos - start_pos) as u32;
169
170        self.buf.write_all(&metadata_len.to_le_bytes())?;
171        self.buf.write_all(self.object_writer.get_file_magic())?;
172
173        if let Some(row_groups) = unencrypted_row_groups {
174            // If row group metadata was encrypted, we replace the encrypted row groups with
175            // unencrypted metadata before it is returned to users. This allows the metadata
176            // to be usable for retrieving the row group statistics for example, without users
177            // needing to decrypt the metadata.
178            file_metadata.row_groups = row_groups;
179        }
180
181        Ok(file_metadata)
182    }
183
184    pub fn new(
185        buf: &'a mut TrackedWrite<W>,
186        schema: &'a TypePtr,
187        schema_descr: &'a SchemaDescPtr,
188        row_groups: Vec<RowGroup>,
189        created_by: Option<String>,
190        writer_version: i32,
191    ) -> Self {
192        Self {
193            buf,
194            schema,
195            schema_descr,
196            row_groups,
197            column_indexes: None,
198            offset_indexes: None,
199            key_value_metadata: None,
200            created_by,
201            object_writer: Default::default(),
202            writer_version,
203        }
204    }
205
206    pub fn with_column_indexes(mut self, column_indexes: &'a [Vec<Option<ColumnIndex>>]) -> Self {
207        self.column_indexes = Some(column_indexes);
208        self
209    }
210
211    pub fn with_offset_indexes(mut self, offset_indexes: &'a [Vec<Option<OffsetIndex>>]) -> Self {
212        self.offset_indexes = Some(offset_indexes);
213        self
214    }
215
216    pub fn with_key_value_metadata(mut self, key_value_metadata: Vec<KeyValue>) -> Self {
217        self.key_value_metadata = Some(key_value_metadata);
218        self
219    }
220
221    #[cfg(feature = "encryption")]
222    pub fn with_file_encryptor(mut self, file_encryptor: Option<Arc<FileEncryptor>>) -> Self {
223        self.object_writer = self.object_writer.with_file_encryptor(file_encryptor);
224        self
225    }
226}
227
228/// Writes [`ParquetMetaData`] to a byte stream
229///
230/// This structure handles the details of writing the various parts of Parquet
231/// metadata into a byte stream. It is used to write the metadata into a parquet
232/// file and can also write metadata into other locations (such as a store of
233/// bytes).
234///
235/// # Discussion
236///
237/// The process of writing Parquet metadata is tricky because the
238/// metadata is not stored as a single inline thrift structure. It can have
239/// several "out of band" structures such as the [`OffsetIndex`] and
240/// BloomFilters stored in separate structures whose locations are stored as
241/// offsets from the beginning of the file.
242///
243/// Note: this writer does not directly write BloomFilters. In order to write
244/// BloomFilters, write the bloom filters into the buffer before creating the
245/// metadata writer. Then set the corresponding `bloom_filter_offset` and
246/// `bloom_filter_length` on [`ColumnChunkMetaData`] passed to this writer.
247///
248/// # Output Format
249///
250/// The format of the metadata is as follows:
251///
252/// 1. Optional [`ColumnIndex`] (thrift encoded)
253/// 2. Optional [`OffsetIndex`] (thrift encoded)
254/// 3. [`FileMetaData`] (thrift encoded)
255/// 4. Length of encoded `FileMetaData` (4 bytes, little endian)
256/// 5. Parquet Magic Bytes (4 bytes)
257///
258/// [`FileMetaData`]: crate::format::FileMetaData
259/// [`ColumnChunkMetaData`]: crate::file::metadata::ColumnChunkMetaData
260///
261/// ```text
262/// ┌──────────────────────┐
263/// │                      │
264/// │         ...          │
265/// │                      │
266/// │┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐ │
267/// │     ColumnIndex     ◀│─ ─ ─
268/// ││    (Optional)     │ │     │
269/// │ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─  │
270/// │┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐ │     │ FileMetadata
271/// │     OffsetIndex      │       contains embedded
272/// ││    (Optional)     │◀┼ ─   │ offsets to
273/// │ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─  │  │    ColumnIndex and
274/// │╔═══════════════════╗ │     │ OffsetIndex
275/// │║                   ║ │  │
276/// │║                   ║ ┼ ─   │
277/// │║   FileMetadata    ║ │
278/// │║                   ║ ┼ ─ ─ ┘
279/// │║                   ║ │
280/// │╚═══════════════════╝ │
281/// │┌───────────────────┐ │
282/// ││  metadata length  │ │ length of FileMetadata  (only)
283/// │└───────────────────┘ │
284/// │┌───────────────────┐ │
285/// ││      'PAR1'       │ │ Parquet Magic Bytes
286/// │└───────────────────┘ │
287/// └──────────────────────┘
288///      Output Buffer
289/// ```
290///
291/// # Example
292/// ```no_run
293/// # use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataWriter};
294/// # fn get_metadata() -> ParquetMetaData { unimplemented!(); }
295/// // write parquet metadata to an in-memory buffer
296/// let mut buffer = vec![];
297/// let metadata: ParquetMetaData = get_metadata();
298/// let writer = ParquetMetaDataWriter::new(&mut buffer, &metadata);
299/// // write the metadata to the buffer
300/// writer.finish().unwrap();
301/// assert!(!buffer.is_empty());
302/// ```
303pub struct ParquetMetaDataWriter<'a, W: Write> {
304    buf: TrackedWrite<W>,
305    metadata: &'a ParquetMetaData,
306}
307
308impl<'a, W: Write> ParquetMetaDataWriter<'a, W> {
309    /// Create a new `ParquetMetaDataWriter` to write to `buf`
310    ///
311    /// Note any embedded offsets in the metadata will be written assuming the
312    /// metadata is at the start of the buffer. If the metadata is being written
313    /// to a location other than the start of the buffer, see [`Self::new_with_tracked`]
314    ///
315    /// See example on the struct level documentation
316    pub fn new(buf: W, metadata: &'a ParquetMetaData) -> Self {
317        Self::new_with_tracked(TrackedWrite::new(buf), metadata)
318    }
319
320    /// Create a new ParquetMetaDataWriter to write to `buf`
321    ///
322    /// This method is used when the metadata is being written to a location other
323    /// than the start of the buffer.
324    ///
325    /// See example on the struct level documentation
326    pub fn new_with_tracked(buf: TrackedWrite<W>, metadata: &'a ParquetMetaData) -> Self {
327        Self { buf, metadata }
328    }
329
330    /// Write the metadata to the buffer
331    pub fn finish(mut self) -> Result<()> {
332        let file_metadata = self.metadata.file_metadata();
333
334        let schema = Arc::new(file_metadata.schema().clone());
335        let schema_descr = Arc::new(SchemaDescriptor::new(schema.clone()));
336        let created_by = file_metadata.created_by().map(str::to_string);
337
338        let row_groups = self
339            .metadata
340            .row_groups()
341            .iter()
342            .map(|rg| rg.to_thrift())
343            .collect::<Vec<_>>();
344
345        let key_value_metadata = file_metadata.key_value_metadata().cloned();
346
347        let column_indexes = self.convert_column_indexes();
348        let offset_indexes = self.convert_offset_index();
349
350        let mut encoder = ThriftMetadataWriter::new(
351            &mut self.buf,
352            &schema,
353            &schema_descr,
354            row_groups,
355            created_by,
356            file_metadata.version(),
357        );
358        encoder = encoder.with_column_indexes(&column_indexes);
359        encoder = encoder.with_offset_indexes(&offset_indexes);
360        if let Some(key_value_metadata) = key_value_metadata {
361            encoder = encoder.with_key_value_metadata(key_value_metadata);
362        }
363        encoder.finish()?;
364
365        Ok(())
366    }
367
368    fn convert_column_indexes(&self) -> Vec<Vec<Option<ColumnIndex>>> {
369        if let Some(row_group_column_indexes) = self.metadata.column_index() {
370            (0..self.metadata.row_groups().len())
371                .map(|rg_idx| {
372                    let column_indexes = &row_group_column_indexes[rg_idx];
373                    column_indexes
374                        .iter()
375                        .map(|column_index| match column_index {
376                            Index::NONE => None,
377                            Index::BOOLEAN(column_index) => Some(column_index.to_thrift()),
378                            Index::BYTE_ARRAY(column_index) => Some(column_index.to_thrift()),
379                            Index::DOUBLE(column_index) => Some(column_index.to_thrift()),
380                            Index::FIXED_LEN_BYTE_ARRAY(column_index) => {
381                                Some(column_index.to_thrift())
382                            }
383                            Index::FLOAT(column_index) => Some(column_index.to_thrift()),
384                            Index::INT32(column_index) => Some(column_index.to_thrift()),
385                            Index::INT64(column_index) => Some(column_index.to_thrift()),
386                            Index::INT96(column_index) => Some(column_index.to_thrift()),
387                        })
388                        .collect()
389                })
390                .collect()
391        } else {
392            // make a None for each row group, for each column
393            self.metadata
394                .row_groups()
395                .iter()
396                .map(|rg| std::iter::repeat(None).take(rg.columns().len()).collect())
397                .collect()
398        }
399    }
400
401    fn convert_offset_index(&self) -> Vec<Vec<Option<OffsetIndex>>> {
402        if let Some(row_group_offset_indexes) = self.metadata.offset_index() {
403            (0..self.metadata.row_groups().len())
404                .map(|rg_idx| {
405                    let offset_indexes = &row_group_offset_indexes[rg_idx];
406                    offset_indexes
407                        .iter()
408                        .map(|offset_index| Some(offset_index.to_thrift()))
409                        .collect()
410                })
411                .collect()
412        } else {
413            // make a None for each row group, for each column
414            self.metadata
415                .row_groups()
416                .iter()
417                .map(|rg| std::iter::repeat(None).take(rg.columns().len()).collect())
418                .collect()
419        }
420    }
421}
422
/// Serializes individual metadata objects in Thrift compact format.
///
/// With the `encryption` feature enabled it also holds the optional file
/// encryptor that is consulted when objects are written.
#[derive(Default, Debug)]
struct MetadataObjectWriter {
    /// File encryptor, if any; `None` by default.
    #[cfg(feature = "encryption")]
    file_encryptor: Option<Arc<FileEncryptor>>,
}
428
429impl MetadataObjectWriter {
430    #[inline]
431    fn write_object(object: &impl TSerializable, sink: impl Write) -> Result<()> {
432        let mut protocol = TCompactOutputProtocol::new(sink);
433        object.write_to_out_protocol(&mut protocol)?;
434        Ok(())
435    }
436}
437
438/// Implementations of [`MetadataObjectWriter`] methods for when encryption is disabled
439#[cfg(not(feature = "encryption"))]
440impl MetadataObjectWriter {
441    /// Write [`FileMetaData`] in Thrift format
442    fn write_file_metadata(&self, file_metadata: &FileMetaData, sink: impl Write) -> Result<()> {
443        Self::write_object(file_metadata, sink)
444    }
445
446    /// Write a column [`OffsetIndex`] in Thrift format
447    fn write_offset_index(
448        &self,
449        offset_index: &OffsetIndex,
450        _column_chunk: &ColumnChunk,
451        _row_group_idx: usize,
452        _column_idx: usize,
453        sink: impl Write,
454    ) -> Result<()> {
455        Self::write_object(offset_index, sink)
456    }
457
458    /// Write a column [`ColumnIndex`] in Thrift format
459    fn write_column_index(
460        &self,
461        column_index: &ColumnIndex,
462        _column_chunk: &ColumnChunk,
463        _row_group_idx: usize,
464        _column_idx: usize,
465        sink: impl Write,
466    ) -> Result<()> {
467        Self::write_object(column_index, sink)
468    }
469
470    /// No-op implementation of row-group metadata encryption
471    fn apply_row_group_encryption(
472        &self,
473        row_groups: Vec<RowGroup>,
474    ) -> Result<(Vec<RowGroup>, Option<Vec<RowGroup>>)> {
475        Ok((row_groups, None))
476    }
477
478    /// Get the "magic" bytes identifying the file type
479    pub fn get_file_magic(&self) -> &[u8; 4] {
480        get_file_magic()
481    }
482
483    fn get_plaintext_footer_crypto_metadata(
484        &self,
485    ) -> (Option<EncryptionAlgorithm>, Option<Vec<u8>>) {
486        (None, None)
487    }
488}
489
490/// Implementations of [`MetadataObjectWriter`] methods that rely on encryption being enabled
491#[cfg(feature = "encryption")]
492impl MetadataObjectWriter {
493    /// Set the file encryptor to use
494    fn with_file_encryptor(mut self, encryptor: Option<Arc<FileEncryptor>>) -> Self {
495        self.file_encryptor = encryptor;
496        self
497    }
498
499    /// Write [`FileMetaData`] in Thrift format, possibly encrypting it if required
500    fn write_file_metadata(
501        &self,
502        file_metadata: &FileMetaData,
503        mut sink: impl Write,
504    ) -> Result<()> {
505        match self.file_encryptor.as_ref() {
506            Some(file_encryptor) if file_encryptor.properties().encrypt_footer() => {
507                // First write FileCryptoMetadata
508                let crypto_metadata = Self::file_crypto_metadata(file_encryptor)?;
509                let mut protocol = TCompactOutputProtocol::new(&mut sink);
510                crypto_metadata.write_to_out_protocol(&mut protocol)?;
511
512                // Then write encrypted footer
513                let aad = create_footer_aad(file_encryptor.file_aad())?;
514                let mut encryptor = file_encryptor.get_footer_encryptor()?;
515                encrypt_object(file_metadata, &mut encryptor, &mut sink, &aad)
516            }
517            Some(file_encryptor) if file_metadata.encryption_algorithm.is_some() => {
518                let aad = create_footer_aad(file_encryptor.file_aad())?;
519                let mut encryptor = file_encryptor.get_footer_encryptor()?;
520                write_signed_plaintext_object(file_metadata, &mut encryptor, &mut sink, &aad)
521            }
522            _ => Self::write_object(file_metadata, &mut sink),
523        }
524    }
525
526    /// Write a column [`OffsetIndex`] in Thrift format, possibly encrypting it if required
527    fn write_offset_index(
528        &self,
529        offset_index: &OffsetIndex,
530        column_chunk: &ColumnChunk,
531        row_group_idx: usize,
532        column_idx: usize,
533        sink: impl Write,
534    ) -> Result<()> {
535        match &self.file_encryptor {
536            Some(file_encryptor) => Self::write_object_with_encryption(
537                offset_index,
538                sink,
539                file_encryptor,
540                column_chunk,
541                ModuleType::OffsetIndex,
542                row_group_idx,
543                column_idx,
544            ),
545            None => Self::write_object(offset_index, sink),
546        }
547    }
548
549    /// Write a column [`ColumnIndex`] in Thrift format, possibly encrypting it if required
550    fn write_column_index(
551        &self,
552        column_index: &ColumnIndex,
553        column_chunk: &ColumnChunk,
554        row_group_idx: usize,
555        column_idx: usize,
556        sink: impl Write,
557    ) -> Result<()> {
558        match &self.file_encryptor {
559            Some(file_encryptor) => Self::write_object_with_encryption(
560                column_index,
561                sink,
562                file_encryptor,
563                column_chunk,
564                ModuleType::ColumnIndex,
565                row_group_idx,
566                column_idx,
567            ),
568            None => Self::write_object(column_index, sink),
569        }
570    }
571
572    /// If encryption is enabled and configured, encrypt row group metadata.
573    /// Returns a tuple of the row group metadata to write,
574    /// and possibly unencrypted metadata to be returned to clients if data was encrypted.
575    fn apply_row_group_encryption(
576        &self,
577        row_groups: Vec<RowGroup>,
578    ) -> Result<(Vec<RowGroup>, Option<Vec<RowGroup>>)> {
579        match &self.file_encryptor {
580            Some(file_encryptor) => {
581                let unencrypted_row_groups = row_groups.clone();
582                let encrypted_row_groups = Self::encrypt_row_groups(row_groups, file_encryptor)?;
583                Ok((encrypted_row_groups, Some(unencrypted_row_groups)))
584            }
585            None => Ok((row_groups, None)),
586        }
587    }
588
589    /// Get the "magic" bytes identifying the file type
590    fn get_file_magic(&self) -> &[u8; 4] {
591        get_file_magic(
592            self.file_encryptor
593                .as_ref()
594                .map(|encryptor| encryptor.properties()),
595        )
596    }
597
598    fn write_object_with_encryption(
599        object: &impl TSerializable,
600        mut sink: impl Write,
601        file_encryptor: &FileEncryptor,
602        column_metadata: &ColumnChunk,
603        module_type: ModuleType,
604        row_group_index: usize,
605        column_index: usize,
606    ) -> Result<()> {
607        let column_path_vec = &column_metadata
608            .meta_data
609            .as_ref()
610            .ok_or_else(|| {
611                general_err!(
612                    "Column metadata not set for column {} when encrypting object",
613                    column_index
614                )
615            })?
616            .path_in_schema;
617
618        let joined_column_path;
619        let column_path = if column_path_vec.len() == 1 {
620            &column_path_vec[0]
621        } else {
622            joined_column_path = column_path_vec.join(".");
623            &joined_column_path
624        };
625
626        if file_encryptor.is_column_encrypted(column_path) {
627            let aad = create_module_aad(
628                file_encryptor.file_aad(),
629                module_type,
630                row_group_index,
631                column_index,
632                None,
633            )?;
634            let mut encryptor = file_encryptor.get_column_encryptor(column_path)?;
635            encrypt_object(object, &mut encryptor, &mut sink, &aad)
636        } else {
637            Self::write_object(object, sink)
638        }
639    }
640
641    fn get_plaintext_footer_crypto_metadata(
642        &self,
643    ) -> (Option<EncryptionAlgorithm>, Option<Vec<u8>>) {
644        // Only plaintext footers may contain encryption algorithm and footer key metadata.
645        if let Some(file_encryptor) = self.file_encryptor.as_ref() {
646            let encryption_properties = file_encryptor.properties();
647            if !encryption_properties.encrypt_footer() {
648                return (
649                    Some(Self::encryption_algorithm_from_encryptor(file_encryptor)),
650                    encryption_properties.footer_key_metadata().cloned(),
651                );
652            }
653        }
654        (None, None)
655    }
656
657    fn encryption_algorithm_from_encryptor(file_encryptor: &FileEncryptor) -> EncryptionAlgorithm {
658        let supply_aad_prefix = file_encryptor
659            .properties()
660            .aad_prefix()
661            .map(|_| !file_encryptor.properties().store_aad_prefix());
662        let aad_prefix = if file_encryptor.properties().store_aad_prefix() {
663            file_encryptor.properties().aad_prefix().cloned()
664        } else {
665            None
666        };
667        EncryptionAlgorithm::AESGCMV1(AesGcmV1 {
668            aad_prefix,
669            aad_file_unique: Some(file_encryptor.aad_file_unique().clone()),
670            supply_aad_prefix,
671        })
672    }
673
674    fn file_crypto_metadata(
675        file_encryptor: &FileEncryptor,
676    ) -> Result<crate::format::FileCryptoMetaData> {
677        let properties = file_encryptor.properties();
678        Ok(crate::format::FileCryptoMetaData {
679            encryption_algorithm: Self::encryption_algorithm_from_encryptor(file_encryptor),
680            key_metadata: properties.footer_key_metadata().cloned(),
681        })
682    }
683
684    fn encrypt_row_groups(
685        row_groups: Vec<RowGroup>,
686        file_encryptor: &Arc<FileEncryptor>,
687    ) -> Result<Vec<RowGroup>> {
688        row_groups
689            .into_iter()
690            .enumerate()
691            .map(|(rg_idx, mut rg)| {
692                let cols: Result<Vec<ColumnChunk>> = rg
693                    .columns
694                    .into_iter()
695                    .enumerate()
696                    .map(|(col_idx, c)| {
697                        Self::encrypt_column_chunk(c, file_encryptor, rg_idx, col_idx)
698                    })
699                    .collect();
700                rg.columns = cols?;
701                Ok(rg)
702            })
703            .collect()
704    }
705
706    /// Apply column encryption to column chunk metadata
707    fn encrypt_column_chunk(
708        mut column_chunk: ColumnChunk,
709        file_encryptor: &Arc<FileEncryptor>,
710        row_group_index: usize,
711        column_index: usize,
712    ) -> Result<ColumnChunk> {
713        // Column crypto metadata should have already been set when the column was created.
714        // Here we apply the encryption by encrypting the column metadata if required.
715        match column_chunk.crypto_metadata.as_ref() {
716            None => {}
717            Some(ColumnCryptoMetaData::ENCRYPTIONWITHFOOTERKEY(_)) => {
718                // When uniform encryption is used the footer is already encrypted,
719                // so the column chunk does not need additional encryption.
720            }
721            Some(ColumnCryptoMetaData::ENCRYPTIONWITHCOLUMNKEY(col_key)) => {
722                let column_path = col_key.path_in_schema.join(".");
723                let mut column_encryptor = file_encryptor.get_column_encryptor(&column_path)?;
724                let meta_data = column_chunk
725                    .meta_data
726                    .take()
727                    .ok_or_else(|| general_err!("Column metadata not set for encryption"))?;
728                let aad = create_module_aad(
729                    file_encryptor.file_aad(),
730                    ModuleType::ColumnMetaData,
731                    row_group_index,
732                    column_index,
733                    None,
734                )?;
735                let ciphertext = encrypt_object_to_vec(&meta_data, &mut column_encryptor, &aad)?;
736
737                column_chunk.encrypted_column_metadata = Some(ciphertext);
738                debug_assert!(column_chunk.meta_data.is_none());
739            }
740        }
741
742        Ok(column_chunk)
743    }
744}