parquet/file/metadata/
writer.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18#[cfg(feature = "encryption")]
19use crate::encryption::{
20    encrypt::{
21        encrypt_object, encrypt_object_to_vec, write_signed_plaintext_object, FileEncryptor,
22    },
23    modules::{create_footer_aad, create_module_aad, ModuleType},
24};
25#[cfg(feature = "encryption")]
26use crate::errors::ParquetError;
27use crate::errors::Result;
28use crate::file::metadata::{KeyValue, ParquetMetaData};
29use crate::file::page_index::index::Index;
30use crate::file::writer::{get_file_magic, TrackedWrite};
31use crate::format::EncryptionAlgorithm;
32#[cfg(feature = "encryption")]
33use crate::format::{AesGcmV1, ColumnCryptoMetaData};
34use crate::format::{ColumnChunk, ColumnIndex, FileMetaData, OffsetIndex, RowGroup};
35use crate::schema::types;
36use crate::schema::types::{SchemaDescPtr, SchemaDescriptor, TypePtr};
37use crate::thrift::TSerializable;
38use std::io::Write;
39use std::sync::Arc;
40use thrift::protocol::TCompactOutputProtocol;
41
42/// Writes `crate::file::metadata` structures to a thrift encoded byte stream
43///
44/// See [`ParquetMetaDataWriter`] for background and example.
45pub(crate) struct ThriftMetadataWriter<'a, W: Write> {
46    buf: &'a mut TrackedWrite<W>,
47    schema: &'a TypePtr,
48    schema_descr: &'a SchemaDescPtr,
49    row_groups: Vec<RowGroup>,
50    column_indexes: Option<&'a [Vec<Option<ColumnIndex>>]>,
51    offset_indexes: Option<&'a [Vec<Option<OffsetIndex>>]>,
52    key_value_metadata: Option<Vec<KeyValue>>,
53    created_by: Option<String>,
54    object_writer: MetadataObjectWriter,
55    writer_version: i32,
56}
57
58impl<'a, W: Write> ThriftMetadataWriter<'a, W> {
59    /// Serialize all the offset indexes to `self.buf`,
60    ///
61    /// Note: also updates the `ColumnChunk::offset_index_offset` and
62    /// `ColumnChunk::offset_index_length` to reflect the position and length
63    /// of the serialized offset indexes.
64    fn write_offset_indexes(&mut self, offset_indexes: &[Vec<Option<OffsetIndex>>]) -> Result<()> {
65        // iter row group
66        // iter each column
67        // write offset index to the file
68        for (row_group_idx, row_group) in self.row_groups.iter_mut().enumerate() {
69            for (column_idx, column_metadata) in row_group.columns.iter_mut().enumerate() {
70                if let Some(offset_index) = &offset_indexes[row_group_idx][column_idx] {
71                    let start_offset = self.buf.bytes_written();
72                    self.object_writer.write_offset_index(
73                        offset_index,
74                        column_metadata,
75                        row_group_idx,
76                        column_idx,
77                        &mut self.buf,
78                    )?;
79                    let end_offset = self.buf.bytes_written();
80                    // set offset and index for offset index
81                    column_metadata.offset_index_offset = Some(start_offset as i64);
82                    column_metadata.offset_index_length = Some((end_offset - start_offset) as i32);
83                }
84            }
85        }
86        Ok(())
87    }
88
89    /// Serialize all the column indexes to the `self.buf`
90    ///
91    /// Note: also updates the `ColumnChunk::column_index_offset` and
92    /// `ColumnChunk::column_index_length` to reflect the position and length
93    /// of the serialized column indexes.
94    fn write_column_indexes(&mut self, column_indexes: &[Vec<Option<ColumnIndex>>]) -> Result<()> {
95        // iter row group
96        // iter each column
97        // write column index to the file
98        for (row_group_idx, row_group) in self.row_groups.iter_mut().enumerate() {
99            for (column_idx, column_metadata) in row_group.columns.iter_mut().enumerate() {
100                if let Some(column_index) = &column_indexes[row_group_idx][column_idx] {
101                    let start_offset = self.buf.bytes_written();
102                    self.object_writer.write_column_index(
103                        column_index,
104                        column_metadata,
105                        row_group_idx,
106                        column_idx,
107                        &mut self.buf,
108                    )?;
109                    let end_offset = self.buf.bytes_written();
110                    // set offset and index for offset index
111                    column_metadata.column_index_offset = Some(start_offset as i64);
112                    column_metadata.column_index_length = Some((end_offset - start_offset) as i32);
113                }
114            }
115        }
116        Ok(())
117    }
118
119    /// Assembles and writes the final metadata to self.buf
120    pub fn finish(mut self) -> Result<crate::format::FileMetaData> {
121        let num_rows = self.row_groups.iter().map(|x| x.num_rows).sum();
122
123        // Write column indexes and offset indexes
124        if let Some(column_indexes) = self.column_indexes {
125            self.write_column_indexes(column_indexes)?;
126        }
127        if let Some(offset_indexes) = self.offset_indexes {
128            self.write_offset_indexes(offset_indexes)?;
129        }
130
131        // We only include ColumnOrder for leaf nodes.
132        // Currently only supported ColumnOrder is TypeDefinedOrder so we set this
133        // for all leaf nodes.
134        // Even if the column has an undefined sort order, such as INTERVAL, this
135        // is still technically the defined TYPEORDER so it should still be set.
136        let column_orders = (0..self.schema_descr.num_columns())
137            .map(|_| crate::format::ColumnOrder::TYPEORDER(crate::format::TypeDefinedOrder {}))
138            .collect();
139        // This field is optional, perhaps in cases where no min/max fields are set
140        // in any Statistics or ColumnIndex object in the whole file.
141        // But for simplicity we always set this field.
142        let column_orders = Some(column_orders);
143
144        let (row_groups, unencrypted_row_groups) = self
145            .object_writer
146            .apply_row_group_encryption(self.row_groups)?;
147
148        let mut file_metadata = FileMetaData {
149            num_rows,
150            row_groups,
151            key_value_metadata: self.key_value_metadata.clone(),
152            version: self.writer_version,
153            schema: types::to_thrift(self.schema.as_ref())?,
154            created_by: self.created_by.clone(),
155            column_orders,
156            encryption_algorithm: self.object_writer.get_footer_encryption_algorithm(),
157            footer_signing_key_metadata: None,
158        };
159
160        // Write file metadata
161        let start_pos = self.buf.bytes_written();
162        self.object_writer
163            .write_file_metadata(&file_metadata, &mut self.buf)?;
164        let end_pos = self.buf.bytes_written();
165
166        // Write footer
167        let metadata_len = (end_pos - start_pos) as u32;
168
169        self.buf.write_all(&metadata_len.to_le_bytes())?;
170        self.buf.write_all(self.object_writer.get_file_magic())?;
171
172        if let Some(row_groups) = unencrypted_row_groups {
173            // If row group metadata was encrypted, we replace the encrypted row groups with
174            // unencrypted metadata before it is returned to users. This allows the metadata
175            // to be usable for retrieving the row group statistics for example, without users
176            // needing to decrypt the metadata.
177            file_metadata.row_groups = row_groups;
178        }
179
180        Ok(file_metadata)
181    }
182
183    pub fn new(
184        buf: &'a mut TrackedWrite<W>,
185        schema: &'a TypePtr,
186        schema_descr: &'a SchemaDescPtr,
187        row_groups: Vec<RowGroup>,
188        created_by: Option<String>,
189        writer_version: i32,
190    ) -> Self {
191        Self {
192            buf,
193            schema,
194            schema_descr,
195            row_groups,
196            column_indexes: None,
197            offset_indexes: None,
198            key_value_metadata: None,
199            created_by,
200            object_writer: Default::default(),
201            writer_version,
202        }
203    }
204
205    pub fn with_column_indexes(mut self, column_indexes: &'a [Vec<Option<ColumnIndex>>]) -> Self {
206        self.column_indexes = Some(column_indexes);
207        self
208    }
209
210    pub fn with_offset_indexes(mut self, offset_indexes: &'a [Vec<Option<OffsetIndex>>]) -> Self {
211        self.offset_indexes = Some(offset_indexes);
212        self
213    }
214
215    pub fn with_key_value_metadata(mut self, key_value_metadata: Vec<KeyValue>) -> Self {
216        self.key_value_metadata = Some(key_value_metadata);
217        self
218    }
219
220    #[cfg(feature = "encryption")]
221    pub fn with_file_encryptor(mut self, file_encryptor: Option<Arc<FileEncryptor>>) -> Self {
222        self.object_writer = self.object_writer.with_file_encryptor(file_encryptor);
223        self
224    }
225}
226
227/// Writes [`ParquetMetaData`] to a byte stream
228///
229/// This structure handles the details of writing the various parts of Parquet
230/// metadata into a byte stream. It is used to write the metadata into a parquet
231/// file and can also write metadata into other locations (such as a store of
232/// bytes).
233///
234/// # Discussion
235///
236/// The process of writing Parquet metadata is tricky because the
237/// metadata is not stored as a single inline thrift structure. It can have
238/// several "out of band" structures such as the [`OffsetIndex`] and
239/// BloomFilters stored in separate structures whose locations are stored as
240/// offsets from the beginning of the file.
241///
242/// Note: this writer does not directly write BloomFilters. In order to write
243/// BloomFilters, write the bloom filters into the buffer before creating the
244/// metadata writer. Then set the corresponding `bloom_filter_offset` and
245/// `bloom_filter_length` on [`ColumnChunkMetaData`] passed to this writer.
246///
247/// # Output Format
248///
249/// The format of the metadata is as follows:
250///
251/// 1. Optional [`ColumnIndex`] (thrift encoded)
252/// 2. Optional [`OffsetIndex`] (thrift encoded)
253/// 3. [`FileMetaData`] (thrift encoded)
254/// 4. Length of encoded `FileMetaData` (4 bytes, little endian)
255/// 5. Parquet Magic Bytes (4 bytes)
256///
257/// [`FileMetaData`]: crate::format::FileMetaData
258/// [`ColumnChunkMetaData`]: crate::file::metadata::ColumnChunkMetaData
259///
260/// ```text
261/// ┌──────────────────────┐
262/// │                      │
263/// │         ...          │
264/// │                      │
265/// │┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐ │
266/// │     ColumnIndex     ◀│─ ─ ─
267/// ││    (Optional)     │ │     │
268/// │ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─  │
269/// │┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐ │     │ FileMetadata
270/// │     OffsetIndex      │       contains embedded
271/// ││    (Optional)     │◀┼ ─   │ offsets to
272/// │ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─  │  │    ColumnIndex and
273/// │╔═══════════════════╗ │     │ OffsetIndex
274/// │║                   ║ │  │
275/// │║                   ║ ┼ ─   │
276/// │║   FileMetadata    ║ │
277/// │║                   ║ ┼ ─ ─ ┘
278/// │║                   ║ │
279/// │╚═══════════════════╝ │
280/// │┌───────────────────┐ │
281/// ││  metadata length  │ │ length of FileMetadata  (only)
282/// │└───────────────────┘ │
283/// │┌───────────────────┐ │
284/// ││      'PAR1'       │ │ Parquet Magic Bytes
285/// │└───────────────────┘ │
286/// └──────────────────────┘
287///      Output Buffer
288/// ```
289///
290/// # Example
291/// ```no_run
292/// # use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataWriter};
293/// # fn get_metadata() -> ParquetMetaData { unimplemented!(); }
294/// // write parquet metadata to an in-memory buffer
295/// let mut buffer = vec![];
296/// let metadata: ParquetMetaData = get_metadata();
297/// let writer = ParquetMetaDataWriter::new(&mut buffer, &metadata);
298/// // write the metadata to the buffer
299/// writer.finish().unwrap();
300/// assert!(!buffer.is_empty());
301/// ```
302pub struct ParquetMetaDataWriter<'a, W: Write> {
303    buf: TrackedWrite<W>,
304    metadata: &'a ParquetMetaData,
305}
306
307impl<'a, W: Write> ParquetMetaDataWriter<'a, W> {
308    /// Create a new `ParquetMetaDataWriter` to write to `buf`
309    ///
310    /// Note any embedded offsets in the metadata will be written assuming the
311    /// metadata is at the start of the buffer. If the metadata is being written
312    /// to a location other than the start of the buffer, see [`Self::new_with_tracked`]
313    ///
314    /// See example on the struct level documentation
315    pub fn new(buf: W, metadata: &'a ParquetMetaData) -> Self {
316        Self::new_with_tracked(TrackedWrite::new(buf), metadata)
317    }
318
319    /// Create a new ParquetMetaDataWriter to write to `buf`
320    ///
321    /// This method is used when the metadata is being written to a location other
322    /// than the start of the buffer.
323    ///
324    /// See example on the struct level documentation
325    pub fn new_with_tracked(buf: TrackedWrite<W>, metadata: &'a ParquetMetaData) -> Self {
326        Self { buf, metadata }
327    }
328
329    /// Write the metadata to the buffer
330    pub fn finish(mut self) -> Result<()> {
331        let file_metadata = self.metadata.file_metadata();
332
333        let schema = Arc::new(file_metadata.schema().clone());
334        let schema_descr = Arc::new(SchemaDescriptor::new(schema.clone()));
335        let created_by = file_metadata.created_by().map(str::to_string);
336
337        let row_groups = self
338            .metadata
339            .row_groups()
340            .iter()
341            .map(|rg| rg.to_thrift())
342            .collect::<Vec<_>>();
343
344        let key_value_metadata = file_metadata.key_value_metadata().cloned();
345
346        let column_indexes = self.convert_column_indexes();
347        let offset_indexes = self.convert_offset_index();
348
349        let mut encoder = ThriftMetadataWriter::new(
350            &mut self.buf,
351            &schema,
352            &schema_descr,
353            row_groups,
354            created_by,
355            file_metadata.version(),
356        );
357        encoder = encoder.with_column_indexes(&column_indexes);
358        encoder = encoder.with_offset_indexes(&offset_indexes);
359        if let Some(key_value_metadata) = key_value_metadata {
360            encoder = encoder.with_key_value_metadata(key_value_metadata);
361        }
362        encoder.finish()?;
363
364        Ok(())
365    }
366
367    fn convert_column_indexes(&self) -> Vec<Vec<Option<ColumnIndex>>> {
368        if let Some(row_group_column_indexes) = self.metadata.column_index() {
369            (0..self.metadata.row_groups().len())
370                .map(|rg_idx| {
371                    let column_indexes = &row_group_column_indexes[rg_idx];
372                    column_indexes
373                        .iter()
374                        .map(|column_index| match column_index {
375                            Index::NONE => None,
376                            Index::BOOLEAN(column_index) => Some(column_index.to_thrift()),
377                            Index::BYTE_ARRAY(column_index) => Some(column_index.to_thrift()),
378                            Index::DOUBLE(column_index) => Some(column_index.to_thrift()),
379                            Index::FIXED_LEN_BYTE_ARRAY(column_index) => {
380                                Some(column_index.to_thrift())
381                            }
382                            Index::FLOAT(column_index) => Some(column_index.to_thrift()),
383                            Index::INT32(column_index) => Some(column_index.to_thrift()),
384                            Index::INT64(column_index) => Some(column_index.to_thrift()),
385                            Index::INT96(column_index) => Some(column_index.to_thrift()),
386                        })
387                        .collect()
388                })
389                .collect()
390        } else {
391            // make a None for each row group, for each column
392            self.metadata
393                .row_groups()
394                .iter()
395                .map(|rg| std::iter::repeat(None).take(rg.columns().len()).collect())
396                .collect()
397        }
398    }
399
400    fn convert_offset_index(&self) -> Vec<Vec<Option<OffsetIndex>>> {
401        if let Some(row_group_offset_indexes) = self.metadata.offset_index() {
402            (0..self.metadata.row_groups().len())
403                .map(|rg_idx| {
404                    let offset_indexes = &row_group_offset_indexes[rg_idx];
405                    offset_indexes
406                        .iter()
407                        .map(|offset_index| Some(offset_index.to_thrift()))
408                        .collect()
409                })
410                .collect()
411        } else {
412            // make a None for each row group, for each column
413            self.metadata
414                .row_groups()
415                .iter()
416                .map(|rg| std::iter::repeat(None).take(rg.columns().len()).collect())
417                .collect()
418        }
419    }
420}
421
/// Serializes individual thrift metadata objects, encrypting them when an
/// encryptor is configured.
///
/// Without the `encryption` feature this type has no fields.
#[derive(Default, Debug)]
struct MetadataObjectWriter {
    /// Encryptor applied to the footer, page indexes and column metadata, if set.
    #[cfg(feature = "encryption")]
    file_encryptor: Option<Arc<FileEncryptor>>,
}
427
428impl MetadataObjectWriter {
429    #[inline]
430    fn write_object(object: &impl TSerializable, sink: impl Write) -> Result<()> {
431        let mut protocol = TCompactOutputProtocol::new(sink);
432        object.write_to_out_protocol(&mut protocol)?;
433        Ok(())
434    }
435}
436
437/// Implementations of [`MetadataObjectWriter`] methods for when encryption is disabled
438#[cfg(not(feature = "encryption"))]
439impl MetadataObjectWriter {
440    /// Write [`FileMetaData`] in Thrift format
441    fn write_file_metadata(&self, file_metadata: &FileMetaData, sink: impl Write) -> Result<()> {
442        Self::write_object(file_metadata, sink)
443    }
444
445    /// Write a column [`OffsetIndex`] in Thrift format
446    fn write_offset_index(
447        &self,
448        offset_index: &OffsetIndex,
449        _column_chunk: &ColumnChunk,
450        _row_group_idx: usize,
451        _column_idx: usize,
452        sink: impl Write,
453    ) -> Result<()> {
454        Self::write_object(offset_index, sink)
455    }
456
457    /// Write a column [`ColumnIndex`] in Thrift format
458    fn write_column_index(
459        &self,
460        column_index: &ColumnIndex,
461        _column_chunk: &ColumnChunk,
462        _row_group_idx: usize,
463        _column_idx: usize,
464        sink: impl Write,
465    ) -> Result<()> {
466        Self::write_object(column_index, sink)
467    }
468
469    /// No-op implementation of row-group metadata encryption
470    fn apply_row_group_encryption(
471        &self,
472        row_groups: Vec<RowGroup>,
473    ) -> Result<(Vec<RowGroup>, Option<Vec<RowGroup>>)> {
474        Ok((row_groups, None))
475    }
476
477    /// Get the "magic" bytes identifying the file type
478    pub fn get_file_magic(&self) -> &[u8; 4] {
479        get_file_magic()
480    }
481
482    fn get_footer_encryption_algorithm(&self) -> Option<EncryptionAlgorithm> {
483        None
484    }
485}
486
/// Implementations of [`MetadataObjectWriter`] methods that rely on encryption being enabled
///
/// When no file encryptor has been set, the object-writing methods fall back to
/// `Self::write_object` (plain thrift) and row groups are returned unencrypted.
#[cfg(feature = "encryption")]
impl MetadataObjectWriter {
    /// Set the file encryptor to use
    ///
    /// Passing `None` leaves this writer without an encryptor.
    fn with_file_encryptor(mut self, encryptor: Option<Arc<FileEncryptor>>) -> Self {
        self.file_encryptor = encryptor;
        self
    }

    /// Write [`FileMetaData`] in Thrift format, possibly encrypting it if required
    ///
    /// * If the encryptor's properties request an encrypted footer, a
    ///   `FileCryptoMetaData` structure is written as plain thrift, immediately
    ///   followed by `file_metadata` encrypted with the footer encryptor and the
    ///   footer AAD.
    /// * Otherwise, if an encryptor is set and `file_metadata.encryption_algorithm`
    ///   is `Some` (plaintext footer), the metadata is written with
    ///   `write_signed_plaintext_object`, using the footer encryptor and footer AAD.
    /// * In every other case the metadata is written as plain thrift.
    ///
    /// NOTE(review): the parquet-format spec describes `FileMetaData.encryption_algorithm`
    /// as set only in files with a plaintext footer, but `get_footer_encryption_algorithm`
    /// returns `Some` whenever an encryptor is configured, so `ThriftMetadataWriter::finish`
    /// also populates it inside encrypted footers — confirm this is intended.
    fn write_file_metadata(
        &self,
        file_metadata: &FileMetaData,
        mut sink: impl Write,
    ) -> Result<()> {
        match self.file_encryptor.as_ref() {
            Some(file_encryptor) if file_encryptor.properties().encrypt_footer() => {
                // First write FileCryptoMetadata
                // (as plain, unencrypted thrift). `protocol` mutably borrows `sink`
                // but is not used after this, so `sink` is free for the write below.
                let crypto_metadata = Self::file_crypto_metadata(file_encryptor)?;
                let mut protocol = TCompactOutputProtocol::new(&mut sink);
                crypto_metadata.write_to_out_protocol(&mut protocol)?;

                // Then write encrypted footer
                let aad = create_footer_aad(file_encryptor.file_aad())?;
                let mut encryptor = file_encryptor.get_footer_encryptor()?;
                encrypt_object(file_metadata, &mut encryptor, &mut sink, &aad)
            }
            Some(file_encryptor) if file_metadata.encryption_algorithm.is_some() => {
                // Plaintext footer: written via `write_signed_plaintext_object` with the
                // footer encryptor and footer AAD (i.e. not encrypted, but signed).
                let aad = create_footer_aad(file_encryptor.file_aad())?;
                let mut encryptor = file_encryptor.get_footer_encryptor()?;
                write_signed_plaintext_object(file_metadata, &mut encryptor, &mut sink, &aad)
            }
            _ => Self::write_object(file_metadata, &mut sink),
        }
    }

    /// Write a column [`OffsetIndex`] in Thrift format, possibly encrypting it if required
    ///
    /// With an encryptor set, delegates to `write_object_with_encryption`, which
    /// only encrypts when this particular column is configured for encryption.
    fn write_offset_index(
        &self,
        offset_index: &OffsetIndex,
        column_chunk: &ColumnChunk,
        row_group_idx: usize,
        column_idx: usize,
        sink: impl Write,
    ) -> Result<()> {
        match &self.file_encryptor {
            Some(file_encryptor) => Self::write_object_with_encryption(
                offset_index,
                sink,
                file_encryptor,
                column_chunk,
                ModuleType::OffsetIndex,
                row_group_idx,
                column_idx,
            ),
            None => Self::write_object(offset_index, sink),
        }
    }

    /// Write a column [`ColumnIndex`] in Thrift format, possibly encrypting it if required
    ///
    /// With an encryptor set, delegates to `write_object_with_encryption`, which
    /// only encrypts when this particular column is configured for encryption.
    fn write_column_index(
        &self,
        column_index: &ColumnIndex,
        column_chunk: &ColumnChunk,
        row_group_idx: usize,
        column_idx: usize,
        sink: impl Write,
    ) -> Result<()> {
        match &self.file_encryptor {
            Some(file_encryptor) => Self::write_object_with_encryption(
                column_index,
                sink,
                file_encryptor,
                column_chunk,
                ModuleType::ColumnIndex,
                row_group_idx,
                column_idx,
            ),
            None => Self::write_object(column_index, sink),
        }
    }

    /// If encryption is enabled and configured, encrypt row group metadata.
    /// Returns a tuple of the row group metadata to write,
    /// and possibly unencrypted metadata to be returned to clients if data was encrypted.
    ///
    /// Note: whenever an encryptor is set, the row groups are cloned before encryption
    /// so the unencrypted copy can be returned alongside the encrypted ones.
    fn apply_row_group_encryption(
        &self,
        row_groups: Vec<RowGroup>,
    ) -> Result<(Vec<RowGroup>, Option<Vec<RowGroup>>)> {
        match &self.file_encryptor {
            Some(file_encryptor) => {
                let unencrypted_row_groups = row_groups.clone();
                let encrypted_row_groups = Self::encrypt_row_groups(row_groups, file_encryptor)?;
                Ok((encrypted_row_groups, Some(unencrypted_row_groups)))
            }
            None => Ok((row_groups, None)),
        }
    }

    /// Get the "magic" bytes identifying the file type
    ///
    /// The encryption properties (if any) are passed to `get_file_magic`, which
    /// presumably selects a different magic for encrypted-footer files — confirm
    /// in `crate::file::writer`.
    fn get_file_magic(&self) -> &[u8; 4] {
        get_file_magic(
            self.file_encryptor
                .as_ref()
                .map(|encryptor| encryptor.properties()),
        )
    }

    /// Write `object` for the given column chunk, encrypting it with the column's
    /// key only if `file_encryptor.is_column_encrypted` reports that column as
    /// encrypted; otherwise it is written as plain thrift.
    ///
    /// The column is identified by its `path_in_schema` joined with `.`. The module
    /// AAD is built from the file AAD, `module_type`, `row_group_index` and
    /// `column_index`, with `None` as the last argument (presumably the page
    /// ordinal, not applicable here — TODO confirm).
    ///
    /// # Errors
    ///
    /// Returns an error if `column_metadata.meta_data` is not set, or if AAD
    /// creation, encryptor lookup, or writing fails.
    fn write_object_with_encryption(
        object: &impl TSerializable,
        mut sink: impl Write,
        file_encryptor: &FileEncryptor,
        column_metadata: &ColumnChunk,
        module_type: ModuleType,
        row_group_index: usize,
        column_index: usize,
    ) -> Result<()> {
        let column_path_vec = &column_metadata
            .meta_data
            .as_ref()
            .ok_or_else(|| {
                general_err!(
                    "Column metadata not set for column {} when encrypting object",
                    column_index
                )
            })?
            .path_in_schema;

        // Single-element paths are used directly, avoiding an allocation; longer
        // paths are joined with "." into `joined_column_path`, which outlives the borrow.
        let joined_column_path;
        let column_path = if column_path_vec.len() == 1 {
            &column_path_vec[0]
        } else {
            joined_column_path = column_path_vec.join(".");
            &joined_column_path
        };

        if file_encryptor.is_column_encrypted(column_path) {
            let aad = create_module_aad(
                file_encryptor.file_aad(),
                module_type,
                row_group_index,
                column_index,
                None,
            )?;
            let mut encryptor = file_encryptor.get_column_encryptor(column_path)?;
            encrypt_object(object, &mut encryptor, &mut sink, &aad)
        } else {
            Self::write_object(object, sink)
        }
    }

    /// Encryption algorithm recorded in the footer's `FileMetaData`:
    /// `Some` whenever a file encryptor is configured, `None` otherwise.
    fn get_footer_encryption_algorithm(&self) -> Option<EncryptionAlgorithm> {
        if let Some(file_encryptor) = &self.file_encryptor {
            return Some(Self::encryption_algorithm_from_encryptor(file_encryptor));
        }
        None
    }

    /// Build the `AESGCMV1` algorithm description for `file_encryptor`.
    ///
    /// The AAD prefix is embedded only when `store_aad_prefix()` is true.
    /// `supply_aad_prefix` is `None` when no prefix is configured, `Some(true)` when
    /// a prefix is configured but not stored, and `Some(false)` when it is stored.
    fn encryption_algorithm_from_encryptor(file_encryptor: &FileEncryptor) -> EncryptionAlgorithm {
        let supply_aad_prefix = file_encryptor
            .properties()
            .aad_prefix()
            .map(|_| !file_encryptor.properties().store_aad_prefix());
        let aad_prefix = if file_encryptor.properties().store_aad_prefix() {
            file_encryptor.properties().aad_prefix().cloned()
        } else {
            None
        };
        EncryptionAlgorithm::AESGCMV1(AesGcmV1 {
            aad_prefix,
            aad_file_unique: Some(file_encryptor.aad_file_unique().clone()),
            supply_aad_prefix,
        })
    }

    /// Build the `FileCryptoMetaData` written ahead of an encrypted footer: the
    /// encryption algorithm plus the footer key metadata from the properties.
    /// Currently this never returns an error.
    fn file_crypto_metadata(
        file_encryptor: &FileEncryptor,
    ) -> Result<crate::format::FileCryptoMetaData> {
        let properties = file_encryptor.properties();
        Ok(crate::format::FileCryptoMetaData {
            encryption_algorithm: Self::encryption_algorithm_from_encryptor(file_encryptor),
            key_metadata: properties.footer_key_metadata().cloned(),
        })
    }

    /// Apply `encrypt_column_chunk` to every column chunk of every row group,
    /// passing the row group and column ordinals used in the module AAD.
    /// Stops at the first error.
    fn encrypt_row_groups(
        row_groups: Vec<RowGroup>,
        file_encryptor: &Arc<FileEncryptor>,
    ) -> Result<Vec<RowGroup>> {
        row_groups
            .into_iter()
            .enumerate()
            .map(|(rg_idx, mut rg)| {
                let cols: Result<Vec<ColumnChunk>> = rg
                    .columns
                    .into_iter()
                    .enumerate()
                    .map(|(col_idx, c)| {
                        Self::encrypt_column_chunk(c, file_encryptor, rg_idx, col_idx)
                    })
                    .collect();
                rg.columns = cols?;
                Ok(rg)
            })
            .collect()
    }

    /// Apply column encryption to column chunk metadata
    ///
    /// For columns encrypted with their own key, `meta_data` is taken out and
    /// replaced by `encrypted_column_metadata` (the thrift metadata encrypted with
    /// the column encryptor). Other column chunks are returned unchanged.
    fn encrypt_column_chunk(
        mut column_chunk: ColumnChunk,
        file_encryptor: &Arc<FileEncryptor>,
        row_group_index: usize,
        column_index: usize,
    ) -> Result<ColumnChunk> {
        // Column crypto metadata should have already been set when the column was created.
        // Here we apply the encryption by encrypting the column metadata if required.
        match column_chunk.crypto_metadata.as_ref() {
            None => {}
            Some(ColumnCryptoMetaData::ENCRYPTIONWITHFOOTERKEY(_)) => {
                // When uniform encryption is used the footer is already encrypted,
                // so the column chunk does not need additional encryption.
            }
            Some(ColumnCryptoMetaData::ENCRYPTIONWITHCOLUMNKEY(col_key)) => {
                let column_path = col_key.path_in_schema.join(".");
                let mut column_encryptor = file_encryptor.get_column_encryptor(&column_path)?;
                // `take()` removes the plaintext metadata so it is not written alongside
                // its encrypted form.
                let meta_data = column_chunk
                    .meta_data
                    .take()
                    .ok_or_else(|| general_err!("Column metadata not set for encryption"))?;
                let aad = create_module_aad(
                    file_encryptor.file_aad(),
                    ModuleType::ColumnMetaData,
                    row_group_index,
                    column_index,
                    None,
                )?;
                let ciphertext = encrypt_object_to_vec(&meta_data, &mut column_encryptor, &aad)?;

                column_chunk.encrypted_column_metadata = Some(ciphertext);
                debug_assert!(column_chunk.meta_data.is_none());
            }
        }

        Ok(column_chunk)
    }
}