parquet/file/metadata/
writer.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18#[cfg(feature = "encryption")]
19use crate::encryption::encrypt::{encrypt_object, encrypt_object_to_vec, FileEncryptor};
20#[cfg(feature = "encryption")]
21use crate::encryption::modules::{create_footer_aad, create_module_aad, ModuleType};
22#[cfg(feature = "encryption")]
23use crate::errors::ParquetError;
24use crate::errors::Result;
25use crate::file::metadata::{KeyValue, ParquetMetaData};
26use crate::file::page_index::index::Index;
27use crate::file::writer::{get_file_magic, TrackedWrite};
28#[cfg(feature = "encryption")]
29use crate::format::{AesGcmV1, ColumnCryptoMetaData, EncryptionAlgorithm};
30use crate::format::{ColumnChunk, ColumnIndex, FileMetaData, OffsetIndex, RowGroup};
31use crate::schema::types;
32use crate::schema::types::{SchemaDescPtr, SchemaDescriptor, TypePtr};
33use crate::thrift::TSerializable;
34use std::io::Write;
35use std::sync::Arc;
36use thrift::protocol::TCompactOutputProtocol;
37
38/// Writes `crate::file::metadata` structures to a thrift encoded byte stream
39///
40/// See [`ParquetMetaDataWriter`] for background and example.
pub(crate) struct ThriftMetadataWriter<'a, W: Write> {
    /// Destination for all serialized bytes; its running byte count is used
    /// to compute the offsets recorded for the page indexes and the footer length.
    buf: &'a mut TrackedWrite<W>,
    /// Root of the schema, converted to thrift when the footer is assembled.
    schema: &'a TypePtr,
    /// Schema descriptor; its column count determines how many `ColumnOrder`
    /// entries are written.
    schema_descr: &'a SchemaDescPtr,
    /// Row group metadata embedded in the footer. The column chunks are
    /// updated in place with the location of their page indexes.
    row_groups: Vec<RowGroup>,
    /// Optional column indexes, addressed as `[row_group][column]`.
    column_indexes: Option<&'a [Vec<Option<ColumnIndex>>]>,
    /// Optional offset indexes, addressed as `[row_group][column]`.
    offset_indexes: Option<&'a [Vec<Option<OffsetIndex>>]>,
    /// Optional key/value pairs stored in the footer.
    key_value_metadata: Option<Vec<KeyValue>>,
    /// Optional "created by" string stored in the footer.
    created_by: Option<String>,
    /// Serializes the individual metadata objects (and, when the `encryption`
    /// feature is enabled, may encrypt them).
    object_writer: MetadataObjectWriter,
    /// Value written to the `version` field of the footer.
    writer_version: i32,
}
53
54impl<'a, W: Write> ThriftMetadataWriter<'a, W> {
55    /// Serialize all the offset indexes to `self.buf`,
56    ///
57    /// Note: also updates the `ColumnChunk::offset_index_offset` and
58    /// `ColumnChunk::offset_index_length` to reflect the position and length
59    /// of the serialized offset indexes.
60    fn write_offset_indexes(&mut self, offset_indexes: &[Vec<Option<OffsetIndex>>]) -> Result<()> {
61        // iter row group
62        // iter each column
63        // write offset index to the file
64        for (row_group_idx, row_group) in self.row_groups.iter_mut().enumerate() {
65            for (column_idx, column_metadata) in row_group.columns.iter_mut().enumerate() {
66                if let Some(offset_index) = &offset_indexes[row_group_idx][column_idx] {
67                    let start_offset = self.buf.bytes_written();
68                    self.object_writer.write_offset_index(
69                        offset_index,
70                        column_metadata,
71                        row_group_idx,
72                        column_idx,
73                        &mut self.buf,
74                    )?;
75                    let end_offset = self.buf.bytes_written();
76                    // set offset and index for offset index
77                    column_metadata.offset_index_offset = Some(start_offset as i64);
78                    column_metadata.offset_index_length = Some((end_offset - start_offset) as i32);
79                }
80            }
81        }
82        Ok(())
83    }
84
85    /// Serialize all the column indexes to the `self.buf`
86    ///
87    /// Note: also updates the `ColumnChunk::column_index_offset` and
88    /// `ColumnChunk::column_index_length` to reflect the position and length
89    /// of the serialized column indexes.
90    fn write_column_indexes(&mut self, column_indexes: &[Vec<Option<ColumnIndex>>]) -> Result<()> {
91        // iter row group
92        // iter each column
93        // write column index to the file
94        for (row_group_idx, row_group) in self.row_groups.iter_mut().enumerate() {
95            for (column_idx, column_metadata) in row_group.columns.iter_mut().enumerate() {
96                if let Some(column_index) = &column_indexes[row_group_idx][column_idx] {
97                    let start_offset = self.buf.bytes_written();
98                    self.object_writer.write_column_index(
99                        column_index,
100                        column_metadata,
101                        row_group_idx,
102                        column_idx,
103                        &mut self.buf,
104                    )?;
105                    let end_offset = self.buf.bytes_written();
106                    // set offset and index for offset index
107                    column_metadata.column_index_offset = Some(start_offset as i64);
108                    column_metadata.column_index_length = Some((end_offset - start_offset) as i32);
109                }
110            }
111        }
112        Ok(())
113    }
114
115    /// Assembles and writes the final metadata to self.buf
116    pub fn finish(mut self) -> Result<crate::format::FileMetaData> {
117        let num_rows = self.row_groups.iter().map(|x| x.num_rows).sum();
118
119        // Write column indexes and offset indexes
120        if let Some(column_indexes) = self.column_indexes {
121            self.write_column_indexes(column_indexes)?;
122        }
123        if let Some(offset_indexes) = self.offset_indexes {
124            self.write_offset_indexes(offset_indexes)?;
125        }
126
127        // We only include ColumnOrder for leaf nodes.
128        // Currently only supported ColumnOrder is TypeDefinedOrder so we set this
129        // for all leaf nodes.
130        // Even if the column has an undefined sort order, such as INTERVAL, this
131        // is still technically the defined TYPEORDER so it should still be set.
132        let column_orders = (0..self.schema_descr.num_columns())
133            .map(|_| crate::format::ColumnOrder::TYPEORDER(crate::format::TypeDefinedOrder {}))
134            .collect();
135        // This field is optional, perhaps in cases where no min/max fields are set
136        // in any Statistics or ColumnIndex object in the whole file.
137        // But for simplicity we always set this field.
138        let column_orders = Some(column_orders);
139
140        let (row_groups, unencrypted_row_groups) = self
141            .object_writer
142            .apply_row_group_encryption(self.row_groups)?;
143
144        let mut file_metadata = FileMetaData {
145            num_rows,
146            row_groups,
147            key_value_metadata: self.key_value_metadata.clone(),
148            version: self.writer_version,
149            schema: types::to_thrift(self.schema.as_ref())?,
150            created_by: self.created_by.clone(),
151            column_orders,
152            encryption_algorithm: None,
153            footer_signing_key_metadata: None,
154        };
155
156        // Write file metadata
157        let start_pos = self.buf.bytes_written();
158        self.object_writer
159            .write_file_metadata(&file_metadata, &mut self.buf)?;
160        let end_pos = self.buf.bytes_written();
161
162        // Write footer
163        let metadata_len = (end_pos - start_pos) as u32;
164
165        self.buf.write_all(&metadata_len.to_le_bytes())?;
166        self.buf.write_all(self.object_writer.get_file_magic())?;
167
168        if let Some(row_groups) = unencrypted_row_groups {
169            // If row group metadata was encrypted, we replace the encrypted row groups with
170            // unencrypted metadata before it is returned to users. This allows the metadata
171            // to be usable for retrieving the row group statistics for example, without users
172            // needing to decrypt the metadata.
173            file_metadata.row_groups = row_groups;
174        }
175
176        Ok(file_metadata)
177    }
178
179    pub fn new(
180        buf: &'a mut TrackedWrite<W>,
181        schema: &'a TypePtr,
182        schema_descr: &'a SchemaDescPtr,
183        row_groups: Vec<RowGroup>,
184        created_by: Option<String>,
185        writer_version: i32,
186    ) -> Self {
187        Self {
188            buf,
189            schema,
190            schema_descr,
191            row_groups,
192            column_indexes: None,
193            offset_indexes: None,
194            key_value_metadata: None,
195            created_by,
196            object_writer: Default::default(),
197            writer_version,
198        }
199    }
200
201    pub fn with_column_indexes(mut self, column_indexes: &'a [Vec<Option<ColumnIndex>>]) -> Self {
202        self.column_indexes = Some(column_indexes);
203        self
204    }
205
206    pub fn with_offset_indexes(mut self, offset_indexes: &'a [Vec<Option<OffsetIndex>>]) -> Self {
207        self.offset_indexes = Some(offset_indexes);
208        self
209    }
210
211    pub fn with_key_value_metadata(mut self, key_value_metadata: Vec<KeyValue>) -> Self {
212        self.key_value_metadata = Some(key_value_metadata);
213        self
214    }
215
216    #[cfg(feature = "encryption")]
217    pub fn with_file_encryptor(mut self, file_encryptor: Option<Arc<FileEncryptor>>) -> Self {
218        self.object_writer = self.object_writer.with_file_encryptor(file_encryptor);
219        self
220    }
221}
222
223/// Writes [`ParquetMetaData`] to a byte stream
224///
225/// This structure handles the details of writing the various parts of Parquet
226/// metadata into a byte stream. It is used to write the metadata into a parquet
227/// file and can also write metadata into other locations (such as a store of
228/// bytes).
229///
230/// # Discussion
231///
232/// The process of writing Parquet metadata is tricky because the
233/// metadata is not stored as a single inline thrift structure. It can have
234/// several "out of band" structures such as the [`OffsetIndex`] and
235/// BloomFilters stored in separate structures whose locations are stored as
236/// offsets from the beginning of the file.
237///
238/// Note: this writer does not directly write BloomFilters. In order to write
239/// BloomFilters, write the bloom filters into the buffer before creating the
240/// metadata writer. Then set the corresponding `bloom_filter_offset` and
241/// `bloom_filter_length` on [`ColumnChunkMetaData`] passed to this writer.
242///
243/// # Output Format
244///
245/// The format of the metadata is as follows:
246///
247/// 1. Optional [`ColumnIndex`] (thrift encoded)
248/// 2. Optional [`OffsetIndex`] (thrift encoded)
249/// 3. [`FileMetaData`] (thrift encoded)
250/// 4. Length of encoded `FileMetaData` (4 bytes, little endian)
251/// 5. Parquet Magic Bytes (4 bytes)
252///
253/// [`FileMetaData`]: crate::format::FileMetaData
254/// [`ColumnChunkMetaData`]: crate::file::metadata::ColumnChunkMetaData
255///
256/// ```text
257/// ┌──────────────────────┐
258/// │                      │
259/// │         ...          │
260/// │                      │
261/// │┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐ │
262/// │     ColumnIndex     ◀│─ ─ ─
263/// ││    (Optional)     │ │     │
264/// │ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─  │
265/// │┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐ │     │ FileMetadata
266/// │     OffsetIndex      │       contains embedded
267/// ││    (Optional)     │◀┼ ─   │ offsets to
268/// │ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─  │  │    ColumnIndex and
269/// │╔═══════════════════╗ │     │ OffsetIndex
270/// │║                   ║ │  │
271/// │║                   ║ ┼ ─   │
272/// │║   FileMetadata    ║ │
273/// │║                   ║ ┼ ─ ─ ┘
274/// │║                   ║ │
275/// │╚═══════════════════╝ │
276/// │┌───────────────────┐ │
277/// ││  metadata length  │ │ length of FileMetadata  (only)
278/// │└───────────────────┘ │
279/// │┌───────────────────┐ │
280/// ││      'PAR1'       │ │ Parquet Magic Bytes
281/// │└───────────────────┘ │
282/// └──────────────────────┘
283///      Output Buffer
284/// ```
285///
286/// # Example
287/// ```no_run
288/// # use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataWriter};
289/// # fn get_metadata() -> ParquetMetaData { unimplemented!(); }
290/// // write parquet metadata to an in-memory buffer
291/// let mut buffer = vec![];
292/// let metadata: ParquetMetaData = get_metadata();
293/// let writer = ParquetMetaDataWriter::new(&mut buffer, &metadata);
294/// // write the metadata to the buffer
295/// writer.finish().unwrap();
296/// assert!(!buffer.is_empty());
297/// ```
pub struct ParquetMetaDataWriter<'a, W: Write> {
    /// Output sink, wrapped so the number of bytes written so far is known
    buf: TrackedWrite<W>,
    /// The metadata that will be serialized
    metadata: &'a ParquetMetaData,
}
302
303impl<'a, W: Write> ParquetMetaDataWriter<'a, W> {
304    /// Create a new `ParquetMetaDataWriter` to write to `buf`
305    ///
306    /// Note any embedded offsets in the metadata will be written assuming the
307    /// metadata is at the start of the buffer. If the metadata is being written
308    /// to a location other than the start of the buffer, see [`Self::new_with_tracked`]
309    ///
310    /// See example on the struct level documentation
311    pub fn new(buf: W, metadata: &'a ParquetMetaData) -> Self {
312        Self::new_with_tracked(TrackedWrite::new(buf), metadata)
313    }
314
315    /// Create a new ParquetMetaDataWriter to write to `buf`
316    ///
317    /// This method is used when the metadata is being written to a location other
318    /// than the start of the buffer.
319    ///
320    /// See example on the struct level documentation
321    pub fn new_with_tracked(buf: TrackedWrite<W>, metadata: &'a ParquetMetaData) -> Self {
322        Self { buf, metadata }
323    }
324
325    /// Write the metadata to the buffer
326    pub fn finish(mut self) -> Result<()> {
327        let file_metadata = self.metadata.file_metadata();
328
329        let schema = Arc::new(file_metadata.schema().clone());
330        let schema_descr = Arc::new(SchemaDescriptor::new(schema.clone()));
331        let created_by = file_metadata.created_by().map(str::to_string);
332
333        let row_groups = self
334            .metadata
335            .row_groups()
336            .iter()
337            .map(|rg| rg.to_thrift())
338            .collect::<Vec<_>>();
339
340        let key_value_metadata = file_metadata.key_value_metadata().cloned();
341
342        let column_indexes = self.convert_column_indexes();
343        let offset_indexes = self.convert_offset_index();
344
345        let mut encoder = ThriftMetadataWriter::new(
346            &mut self.buf,
347            &schema,
348            &schema_descr,
349            row_groups,
350            created_by,
351            file_metadata.version(),
352        );
353        encoder = encoder.with_column_indexes(&column_indexes);
354        encoder = encoder.with_offset_indexes(&offset_indexes);
355        if let Some(key_value_metadata) = key_value_metadata {
356            encoder = encoder.with_key_value_metadata(key_value_metadata);
357        }
358        encoder.finish()?;
359
360        Ok(())
361    }
362
363    fn convert_column_indexes(&self) -> Vec<Vec<Option<ColumnIndex>>> {
364        if let Some(row_group_column_indexes) = self.metadata.column_index() {
365            (0..self.metadata.row_groups().len())
366                .map(|rg_idx| {
367                    let column_indexes = &row_group_column_indexes[rg_idx];
368                    column_indexes
369                        .iter()
370                        .map(|column_index| match column_index {
371                            Index::NONE => None,
372                            Index::BOOLEAN(column_index) => Some(column_index.to_thrift()),
373                            Index::BYTE_ARRAY(column_index) => Some(column_index.to_thrift()),
374                            Index::DOUBLE(column_index) => Some(column_index.to_thrift()),
375                            Index::FIXED_LEN_BYTE_ARRAY(column_index) => {
376                                Some(column_index.to_thrift())
377                            }
378                            Index::FLOAT(column_index) => Some(column_index.to_thrift()),
379                            Index::INT32(column_index) => Some(column_index.to_thrift()),
380                            Index::INT64(column_index) => Some(column_index.to_thrift()),
381                            Index::INT96(column_index) => Some(column_index.to_thrift()),
382                        })
383                        .collect()
384                })
385                .collect()
386        } else {
387            // make a None for each row group, for each column
388            self.metadata
389                .row_groups()
390                .iter()
391                .map(|rg| std::iter::repeat(None).take(rg.columns().len()).collect())
392                .collect()
393        }
394    }
395
396    fn convert_offset_index(&self) -> Vec<Vec<Option<OffsetIndex>>> {
397        if let Some(row_group_offset_indexes) = self.metadata.offset_index() {
398            (0..self.metadata.row_groups().len())
399                .map(|rg_idx| {
400                    let offset_indexes = &row_group_offset_indexes[rg_idx];
401                    offset_indexes
402                        .iter()
403                        .map(|offset_index| Some(offset_index.to_thrift()))
404                        .collect()
405                })
406                .collect()
407        } else {
408            // make a None for each row group, for each column
409            self.metadata
410                .row_groups()
411                .iter()
412                .map(|rg| std::iter::repeat(None).take(rg.columns().len()).collect())
413                .collect()
414        }
415    }
416}
417
/// Serializes individual metadata objects in Thrift format.
///
/// When the `encryption` feature is enabled it also carries an optional file
/// encryptor, which the encryption-aware methods use to decide whether an
/// object should be encrypted before it is written.
#[derive(Debug, Default)]
struct MetadataObjectWriter {
    /// Encryptor to use, or `None` to write everything unencrypted
    #[cfg(feature = "encryption")]
    file_encryptor: Option<Arc<FileEncryptor>>,
}
423
424impl MetadataObjectWriter {
425    #[inline]
426    fn write_object(object: &impl TSerializable, sink: impl Write) -> Result<()> {
427        let mut protocol = TCompactOutputProtocol::new(sink);
428        object.write_to_out_protocol(&mut protocol)?;
429        Ok(())
430    }
431}
432
433/// Implementations of [`MetadataObjectWriter`] methods for when encryption is disabled
434#[cfg(not(feature = "encryption"))]
435impl MetadataObjectWriter {
436    /// Write [`FileMetaData`] in Thrift format
437    fn write_file_metadata(&self, file_metadata: &FileMetaData, sink: impl Write) -> Result<()> {
438        Self::write_object(file_metadata, sink)
439    }
440
441    /// Write a column [`OffsetIndex`] in Thrift format
442    fn write_offset_index(
443        &self,
444        offset_index: &OffsetIndex,
445        _column_chunk: &ColumnChunk,
446        _row_group_idx: usize,
447        _column_idx: usize,
448        sink: impl Write,
449    ) -> Result<()> {
450        Self::write_object(offset_index, sink)
451    }
452
453    /// Write a column [`ColumnIndex`] in Thrift format
454    fn write_column_index(
455        &self,
456        column_index: &ColumnIndex,
457        _column_chunk: &ColumnChunk,
458        _row_group_idx: usize,
459        _column_idx: usize,
460        sink: impl Write,
461    ) -> Result<()> {
462        Self::write_object(column_index, sink)
463    }
464
465    /// No-op implementation of row-group metadata encryption
466    fn apply_row_group_encryption(
467        &self,
468        row_groups: Vec<RowGroup>,
469    ) -> Result<(Vec<RowGroup>, Option<Vec<RowGroup>>)> {
470        Ok((row_groups, None))
471    }
472
473    /// Get the "magic" bytes identifying the file type
474    pub fn get_file_magic(&self) -> &[u8; 4] {
475        get_file_magic()
476    }
477}
478
479/// Implementations of [`MetadataObjectWriter`] methods that rely on encryption being enabled
480#[cfg(feature = "encryption")]
481impl MetadataObjectWriter {
482    /// Set the file encryptor to use
483    fn with_file_encryptor(mut self, encryptor: Option<Arc<FileEncryptor>>) -> Self {
484        self.file_encryptor = encryptor;
485        self
486    }
487
488    /// Write [`FileMetaData`] in Thrift format, possibly encrypting it if required
489    fn write_file_metadata(
490        &self,
491        file_metadata: &FileMetaData,
492        mut sink: impl Write,
493    ) -> Result<()> {
494        match self.file_encryptor.as_ref() {
495            Some(file_encryptor) if file_encryptor.properties().encrypt_footer() => {
496                // First write FileCryptoMetadata
497                let crypto_metadata = Self::file_crypto_metadata(file_encryptor)?;
498                let mut protocol = TCompactOutputProtocol::new(&mut sink);
499                crypto_metadata.write_to_out_protocol(&mut protocol)?;
500
501                // Then write encrypted footer
502                let aad = create_footer_aad(file_encryptor.file_aad())?;
503                let mut encryptor = file_encryptor.get_footer_encryptor()?;
504                encrypt_object(file_metadata, &mut encryptor, &mut sink, &aad)
505            }
506            _ => Self::write_object(file_metadata, &mut sink),
507        }
508    }
509
510    /// Write a column [`OffsetIndex`] in Thrift format, possibly encrypting it if required
511    fn write_offset_index(
512        &self,
513        offset_index: &OffsetIndex,
514        column_chunk: &ColumnChunk,
515        row_group_idx: usize,
516        column_idx: usize,
517        sink: impl Write,
518    ) -> Result<()> {
519        match &self.file_encryptor {
520            Some(file_encryptor) => Self::write_object_with_encryption(
521                offset_index,
522                sink,
523                file_encryptor,
524                column_chunk,
525                ModuleType::OffsetIndex,
526                row_group_idx,
527                column_idx,
528            ),
529            None => Self::write_object(offset_index, sink),
530        }
531    }
532
533    /// Write a column [`ColumnIndex`] in Thrift format, possibly encrypting it if required
534    fn write_column_index(
535        &self,
536        column_index: &ColumnIndex,
537        column_chunk: &ColumnChunk,
538        row_group_idx: usize,
539        column_idx: usize,
540        sink: impl Write,
541    ) -> Result<()> {
542        match &self.file_encryptor {
543            Some(file_encryptor) => Self::write_object_with_encryption(
544                column_index,
545                sink,
546                file_encryptor,
547                column_chunk,
548                ModuleType::ColumnIndex,
549                row_group_idx,
550                column_idx,
551            ),
552            None => Self::write_object(column_index, sink),
553        }
554    }
555
556    /// If encryption is enabled and configured, encrypt row group metadata.
557    /// Returns a tuple of the row group metadata to write,
558    /// and possibly unencrypted metadata to be returned to clients if data was encrypted.
559    fn apply_row_group_encryption(
560        &self,
561        row_groups: Vec<RowGroup>,
562    ) -> Result<(Vec<RowGroup>, Option<Vec<RowGroup>>)> {
563        match &self.file_encryptor {
564            Some(file_encryptor) => {
565                let unencrypted_row_groups = row_groups.clone();
566                let encrypted_row_groups = Self::encrypt_row_groups(row_groups, file_encryptor)?;
567                Ok((encrypted_row_groups, Some(unencrypted_row_groups)))
568            }
569            None => Ok((row_groups, None)),
570        }
571    }
572
573    /// Get the "magic" bytes identifying the file type
574    fn get_file_magic(&self) -> &[u8; 4] {
575        get_file_magic(
576            self.file_encryptor
577                .as_ref()
578                .map(|encryptor| encryptor.properties()),
579        )
580    }
581
582    fn write_object_with_encryption(
583        object: &impl TSerializable,
584        mut sink: impl Write,
585        file_encryptor: &FileEncryptor,
586        column_metadata: &ColumnChunk,
587        module_type: ModuleType,
588        row_group_index: usize,
589        column_index: usize,
590    ) -> Result<()> {
591        let column_path_vec = &column_metadata
592            .meta_data
593            .as_ref()
594            .ok_or_else(|| {
595                general_err!(
596                    "Column metadata not set for column {} when encrypting object",
597                    column_index
598                )
599            })?
600            .path_in_schema;
601
602        let joined_column_path;
603        let column_path = if column_path_vec.len() == 1 {
604            &column_path_vec[0]
605        } else {
606            joined_column_path = column_path_vec.join(".");
607            &joined_column_path
608        };
609
610        if file_encryptor.is_column_encrypted(column_path) {
611            let aad = create_module_aad(
612                file_encryptor.file_aad(),
613                module_type,
614                row_group_index,
615                column_index,
616                None,
617            )?;
618            let mut encryptor = file_encryptor.get_column_encryptor(column_path)?;
619            encrypt_object(object, &mut encryptor, &mut sink, &aad)
620        } else {
621            Self::write_object(object, sink)
622        }
623    }
624
625    fn file_crypto_metadata(
626        file_encryptor: &FileEncryptor,
627    ) -> Result<crate::format::FileCryptoMetaData> {
628        let properties = file_encryptor.properties();
629        let supply_aad_prefix = properties
630            .aad_prefix()
631            .map(|_| !properties.store_aad_prefix());
632        let encryption_algorithm = AesGcmV1 {
633            aad_prefix: if properties.store_aad_prefix() {
634                properties.aad_prefix().cloned()
635            } else {
636                None
637            },
638            aad_file_unique: Some(file_encryptor.aad_file_unique().clone()),
639            supply_aad_prefix,
640        };
641
642        Ok(crate::format::FileCryptoMetaData {
643            encryption_algorithm: EncryptionAlgorithm::AESGCMV1(encryption_algorithm),
644            key_metadata: properties.footer_key_metadata().cloned(),
645        })
646    }
647
648    fn encrypt_row_groups(
649        row_groups: Vec<RowGroup>,
650        file_encryptor: &Arc<FileEncryptor>,
651    ) -> Result<Vec<RowGroup>> {
652        row_groups
653            .into_iter()
654            .enumerate()
655            .map(|(rg_idx, mut rg)| {
656                let cols: Result<Vec<ColumnChunk>> = rg
657                    .columns
658                    .into_iter()
659                    .enumerate()
660                    .map(|(col_idx, c)| {
661                        Self::encrypt_column_chunk(c, file_encryptor, rg_idx, col_idx)
662                    })
663                    .collect();
664                rg.columns = cols?;
665                Ok(rg)
666            })
667            .collect()
668    }
669
670    /// Apply column encryption to column chunk metadata
671    fn encrypt_column_chunk(
672        mut column_chunk: ColumnChunk,
673        file_encryptor: &Arc<FileEncryptor>,
674        row_group_index: usize,
675        column_index: usize,
676    ) -> Result<ColumnChunk> {
677        // Column crypto metadata should have already been set when the column was created.
678        // Here we apply the encryption by encrypting the column metadata if required.
679        match column_chunk.crypto_metadata.as_ref() {
680            None => {}
681            Some(ColumnCryptoMetaData::ENCRYPTIONWITHFOOTERKEY(_)) => {
682                // When uniform encryption is used the footer is already encrypted,
683                // so the column chunk does not need additional encryption.
684            }
685            Some(ColumnCryptoMetaData::ENCRYPTIONWITHCOLUMNKEY(col_key)) => {
686                let column_path = col_key.path_in_schema.join(".");
687                let mut column_encryptor = file_encryptor.get_column_encryptor(&column_path)?;
688                let meta_data = column_chunk
689                    .meta_data
690                    .take()
691                    .ok_or_else(|| general_err!("Column metadata not set for encryption"))?;
692                let aad = create_module_aad(
693                    file_encryptor.file_aad(),
694                    ModuleType::ColumnMetaData,
695                    row_group_index,
696                    column_index,
697                    None,
698                )?;
699                let ciphertext = encrypt_object_to_vec(&meta_data, &mut column_encryptor, &aad)?;
700
701                column_chunk.encrypted_column_metadata = Some(ciphertext);
702                debug_assert!(column_chunk.meta_data.is_none());
703            }
704        }
705
706        Ok(column_chunk)
707    }
708}