parquet/file/metadata/
writer.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::file::metadata::thrift_gen::{EncryptionAlgorithm, FileMeta};
19use crate::file::metadata::{
20    ColumnChunkMetaData, ParquetColumnIndex, ParquetOffsetIndex, RowGroupMetaData,
21};
22use crate::schema::types::{SchemaDescPtr, SchemaDescriptor};
23use crate::{
24    basic::ColumnOrder,
25    file::metadata::{FileMetaData, ParquetMetaDataBuilder},
26};
27#[cfg(feature = "encryption")]
28use crate::{
29    encryption::{
30        encrypt::{FileEncryptor, encrypt_thrift_object, write_signed_plaintext_thrift_object},
31        modules::{ModuleType, create_footer_aad, create_module_aad},
32    },
33    file::column_crypto_metadata::ColumnCryptoMetaData,
34    file::metadata::thrift_gen::{AesGcmV1, FileCryptoMetaData},
35};
36use crate::{errors::Result, file::page_index::column_index::ColumnIndexMetaData};
37
38use crate::{
39    file::writer::{TrackedWrite, get_file_magic},
40    parquet_thrift::WriteThrift,
41};
42use crate::{
43    file::{
44        metadata::{KeyValue, ParquetMetaData},
45        page_index::offset_index::OffsetIndexMetaData,
46    },
47    parquet_thrift::ThriftCompactOutputProtocol,
48};
49use std::io::Write;
50use std::sync::Arc;
51
52/// Writes `crate::file::metadata` structures to a thrift encoded byte stream
53///
54/// See [`ParquetMetaDataWriter`] for background and example.
55pub(crate) struct ThriftMetadataWriter<'a, W: Write> {
56    buf: &'a mut TrackedWrite<W>,
57    schema_descr: &'a SchemaDescPtr,
58    row_groups: Vec<RowGroupMetaData>,
59    column_indexes: Option<Vec<Vec<Option<ColumnIndexMetaData>>>>,
60    offset_indexes: Option<Vec<Vec<Option<OffsetIndexMetaData>>>>,
61    key_value_metadata: Option<Vec<KeyValue>>,
62    created_by: Option<String>,
63    object_writer: MetadataObjectWriter,
64    writer_version: i32,
65}
66
67impl<'a, W: Write> ThriftMetadataWriter<'a, W> {
68    /// Serialize all the offset indexes to `self.buf`,
69    ///
70    /// Note: also updates the `ColumnChunk::offset_index_offset` and
71    /// `ColumnChunk::offset_index_length` to reflect the position and length
72    /// of the serialized offset indexes.
73    fn write_offset_indexes(
74        &mut self,
75        offset_indexes: &[Vec<Option<OffsetIndexMetaData>>],
76    ) -> Result<()> {
77        // iter row group
78        // iter each column
79        // write offset index to the file
80        for (row_group_idx, row_group) in self.row_groups.iter_mut().enumerate() {
81            for (column_idx, column_metadata) in row_group.columns.iter_mut().enumerate() {
82                if let Some(offset_index) = &offset_indexes[row_group_idx][column_idx] {
83                    let start_offset = self.buf.bytes_written();
84                    self.object_writer.write_offset_index(
85                        offset_index,
86                        column_metadata,
87                        row_group_idx,
88                        column_idx,
89                        &mut self.buf,
90                    )?;
91                    let end_offset = self.buf.bytes_written();
92                    // set offset and index for offset index
93                    column_metadata.offset_index_offset = Some(start_offset as i64);
94                    column_metadata.offset_index_length = Some((end_offset - start_offset) as i32);
95                }
96            }
97        }
98        Ok(())
99    }
100
101    /// Serialize all the column indexes to the `self.buf`
102    ///
103    /// Note: also updates the `ColumnChunk::column_index_offset` and
104    /// `ColumnChunk::column_index_length` to reflect the position and length
105    /// of the serialized column indexes.
106    fn write_column_indexes(
107        &mut self,
108        column_indexes: &[Vec<Option<ColumnIndexMetaData>>],
109    ) -> Result<()> {
110        // iter row group
111        // iter each column
112        // write column index to the file
113        for (row_group_idx, row_group) in self.row_groups.iter_mut().enumerate() {
114            for (column_idx, column_metadata) in row_group.columns.iter_mut().enumerate() {
115                if let Some(column_index) = &column_indexes[row_group_idx][column_idx] {
116                    let start_offset = self.buf.bytes_written();
117                    self.object_writer.write_column_index(
118                        column_index,
119                        column_metadata,
120                        row_group_idx,
121                        column_idx,
122                        &mut self.buf,
123                    )?;
124                    let end_offset = self.buf.bytes_written();
125                    // set offset and index for offset index
126                    column_metadata.column_index_offset = Some(start_offset as i64);
127                    column_metadata.column_index_length = Some((end_offset - start_offset) as i32);
128                }
129            }
130        }
131        Ok(())
132    }
133
134    /// Assembles and writes the final metadata to self.buf
135    pub fn finish(mut self) -> Result<ParquetMetaData> {
136        let num_rows = self.row_groups.iter().map(|x| x.num_rows).sum();
137
138        let column_indexes = std::mem::take(&mut self.column_indexes);
139        let offset_indexes = std::mem::take(&mut self.offset_indexes);
140
141        // Write column indexes and offset indexes
142        if let Some(column_indexes) = column_indexes.as_ref() {
143            self.write_column_indexes(column_indexes)?;
144        }
145        if let Some(offset_indexes) = offset_indexes.as_ref() {
146            self.write_offset_indexes(offset_indexes)?;
147        }
148
149        // We only include ColumnOrder for leaf nodes.
150        // Currently only supported ColumnOrder is TypeDefinedOrder so we set this
151        // for all leaf nodes.
152        // Even if the column has an undefined sort order, such as INTERVAL, this
153        // is still technically the defined TYPEORDER so it should still be set.
154        let column_orders = self
155            .schema_descr
156            .columns()
157            .iter()
158            .map(|col| {
159                let sort_order = ColumnOrder::get_sort_order(
160                    col.logical_type(),
161                    col.converted_type(),
162                    col.physical_type(),
163                );
164                ColumnOrder::TYPE_DEFINED_ORDER(sort_order)
165            })
166            .collect();
167
168        // This field is optional, perhaps in cases where no min/max fields are set
169        // in any Statistics or ColumnIndex object in the whole file.
170        // But for simplicity we always set this field.
171        let column_orders = Some(column_orders);
172
173        let (row_groups, unencrypted_row_groups) = self
174            .object_writer
175            .apply_row_group_encryption(self.row_groups)?;
176
177        let (encryption_algorithm, footer_signing_key_metadata) =
178            self.object_writer.get_plaintext_footer_crypto_metadata();
179
180        let file_metadata = FileMetaData::new(
181            self.writer_version,
182            num_rows,
183            self.created_by,
184            self.key_value_metadata,
185            self.schema_descr.clone(),
186            column_orders,
187        );
188
189        let file_meta = FileMeta {
190            file_metadata: &file_metadata,
191            row_groups: &row_groups,
192            encryption_algorithm,
193            footer_signing_key_metadata,
194        };
195
196        // Write file metadata
197        let start_pos = self.buf.bytes_written();
198        self.object_writer
199            .write_file_metadata(&file_meta, &mut self.buf)?;
200        let end_pos = self.buf.bytes_written();
201
202        // Write footer
203        let metadata_len = (end_pos - start_pos) as u32;
204
205        self.buf.write_all(&metadata_len.to_le_bytes())?;
206        self.buf.write_all(self.object_writer.get_file_magic())?;
207
208        // If row group metadata was encrypted, we replace the encrypted row groups with
209        // unencrypted metadata before it is returned to users. This allows the metadata
210        // to be usable for retrieving the row group statistics for example, without users
211        // needing to decrypt the metadata.
212        let mut builder = ParquetMetaDataBuilder::new(file_metadata);
213
214        builder = match unencrypted_row_groups {
215            Some(rg) => builder.set_row_groups(rg),
216            None => builder.set_row_groups(row_groups),
217        };
218
219        let column_indexes: Option<ParquetColumnIndex> = column_indexes.map(|ovvi| {
220            ovvi.into_iter()
221                .map(|vi| {
222                    vi.into_iter()
223                        .map(|oi| oi.unwrap_or(ColumnIndexMetaData::NONE))
224                        .collect()
225                })
226                .collect()
227        });
228
229        // FIXME(ets): this will panic if there's a missing index.
230        let offset_indexes: Option<ParquetOffsetIndex> = offset_indexes.map(|ovvi| {
231            ovvi.into_iter()
232                .map(|vi| vi.into_iter().map(|oi| oi.unwrap()).collect())
233                .collect()
234        });
235
236        builder = builder.set_column_index(column_indexes);
237        builder = builder.set_offset_index(offset_indexes);
238
239        Ok(builder.build())
240    }
241
242    pub fn new(
243        buf: &'a mut TrackedWrite<W>,
244        schema_descr: &'a SchemaDescPtr,
245        row_groups: Vec<RowGroupMetaData>,
246        created_by: Option<String>,
247        writer_version: i32,
248    ) -> Self {
249        Self {
250            buf,
251            schema_descr,
252            row_groups,
253            column_indexes: None,
254            offset_indexes: None,
255            key_value_metadata: None,
256            created_by,
257            object_writer: Default::default(),
258            writer_version,
259        }
260    }
261
262    pub fn with_column_indexes(
263        mut self,
264        column_indexes: Vec<Vec<Option<ColumnIndexMetaData>>>,
265    ) -> Self {
266        self.column_indexes = Some(column_indexes);
267        self
268    }
269
270    pub fn with_offset_indexes(
271        mut self,
272        offset_indexes: Vec<Vec<Option<OffsetIndexMetaData>>>,
273    ) -> Self {
274        self.offset_indexes = Some(offset_indexes);
275        self
276    }
277
278    pub fn with_key_value_metadata(mut self, key_value_metadata: Vec<KeyValue>) -> Self {
279        self.key_value_metadata = Some(key_value_metadata);
280        self
281    }
282
283    #[cfg(feature = "encryption")]
284    pub fn with_file_encryptor(mut self, file_encryptor: Option<Arc<FileEncryptor>>) -> Self {
285        self.object_writer = self.object_writer.with_file_encryptor(file_encryptor);
286        self
287    }
288}
289
290/// Writes [`ParquetMetaData`] to a byte stream
291///
292/// This structure handles the details of writing the various parts of Parquet
293/// metadata into a byte stream. It is used to write the metadata into a parquet
294/// file and can also write metadata into other locations (such as a store of
295/// bytes).
296///
297/// # Discussion
298///
299/// The process of writing Parquet metadata is tricky because the
300/// metadata is not stored as a single inline thrift structure. It can have
301/// several "out of band" structures such as the [`OffsetIndex`] and
302/// BloomFilters stored in separate structures whose locations are stored as
303/// offsets from the beginning of the file.
304///
305/// Note: this writer does not directly write BloomFilters. In order to write
306/// BloomFilters, write the bloom filters into the buffer before creating the
307/// metadata writer. Then set the corresponding `bloom_filter_offset` and
308/// `bloom_filter_length` on [`ColumnChunkMetaData`] passed to this writer.
309///
310/// # Output Format
311///
312/// The format of the metadata is as follows:
313///
314/// 1. Optional [`ColumnIndex`] (thrift encoded)
315/// 2. Optional [`OffsetIndex`] (thrift encoded)
316/// 3. [`FileMetaData`] (thrift encoded)
317/// 4. Length of encoded `FileMetaData` (4 bytes, little endian)
318/// 5. Parquet Magic Bytes (4 bytes)
319///
320/// [`FileMetaData`]: https://github.com/apache/parquet-format/tree/master?tab=readme-ov-file#metadata
321/// [`ColumnChunkMetaData`]: crate::file::metadata::ColumnChunkMetaData
322/// [`ColumnIndex`]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
323/// [`OffsetIndex`]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
324///
325/// ```text
326/// ┌──────────────────────┐
327/// │                      │
328/// │         ...          │
329/// │                      │
330/// │┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐ │
331/// │     ColumnIndex     ◀│─ ─ ─
332/// ││    (Optional)     │ │     │
333/// │ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─  │
334/// │┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐ │     │ FileMetadata
335/// │     OffsetIndex      │       contains embedded
336/// ││    (Optional)     │◀┼ ─   │ offsets to
337/// │ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─  │  │    ColumnIndex and
338/// │╔═══════════════════╗ │     │ OffsetIndex
339/// │║                   ║ │  │
340/// │║                   ║ ┼ ─   │
341/// │║   FileMetadata    ║ │
342/// │║                   ║ ┼ ─ ─ ┘
343/// │║                   ║ │
344/// │╚═══════════════════╝ │
345/// │┌───────────────────┐ │
346/// ││  metadata length  │ │ length of FileMetadata  (only)
347/// │└───────────────────┘ │
348/// │┌───────────────────┐ │
349/// ││      'PAR1'       │ │ Parquet Magic Bytes
350/// │└───────────────────┘ │
351/// └──────────────────────┘
352///      Output Buffer
353/// ```
354///
355/// # Example
356/// ```no_run
357/// # use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataWriter};
358/// # fn get_metadata() -> ParquetMetaData { unimplemented!(); }
359/// // write parquet metadata to an in-memory buffer
360/// let mut buffer = vec![];
361/// let metadata: ParquetMetaData = get_metadata();
362/// let writer = ParquetMetaDataWriter::new(&mut buffer, &metadata);
363/// // write the metadata to the buffer
364/// writer.finish().unwrap();
365/// assert!(!buffer.is_empty());
366/// ```
367pub struct ParquetMetaDataWriter<'a, W: Write> {
368    buf: TrackedWrite<W>,
369    metadata: &'a ParquetMetaData,
370}
371
372impl<'a, W: Write> ParquetMetaDataWriter<'a, W> {
373    /// Create a new `ParquetMetaDataWriter` to write to `buf`
374    ///
375    /// Note any embedded offsets in the metadata will be written assuming the
376    /// metadata is at the start of the buffer. If the metadata is being written
377    /// to a location other than the start of the buffer, see [`Self::new_with_tracked`]
378    ///
379    /// See example on the struct level documentation
380    pub fn new(buf: W, metadata: &'a ParquetMetaData) -> Self {
381        Self::new_with_tracked(TrackedWrite::new(buf), metadata)
382    }
383
384    /// Create a new ParquetMetaDataWriter to write to `buf`
385    ///
386    /// This method is used when the metadata is being written to a location other
387    /// than the start of the buffer.
388    ///
389    /// See example on the struct level documentation
390    pub fn new_with_tracked(buf: TrackedWrite<W>, metadata: &'a ParquetMetaData) -> Self {
391        Self { buf, metadata }
392    }
393
394    /// Write the metadata to the buffer
395    pub fn finish(mut self) -> Result<()> {
396        let file_metadata = self.metadata.file_metadata();
397
398        let schema = Arc::new(file_metadata.schema().clone());
399        let schema_descr = Arc::new(SchemaDescriptor::new(schema.clone()));
400        let created_by = file_metadata.created_by().map(str::to_string);
401
402        let row_groups = self.metadata.row_groups.clone();
403
404        let key_value_metadata = file_metadata.key_value_metadata().cloned();
405
406        let column_indexes = self.convert_column_indexes();
407        let offset_indexes = self.convert_offset_index();
408
409        let mut encoder = ThriftMetadataWriter::new(
410            &mut self.buf,
411            &schema_descr,
412            row_groups,
413            created_by,
414            file_metadata.version(),
415        );
416
417        if let Some(column_indexes) = column_indexes {
418            encoder = encoder.with_column_indexes(column_indexes);
419        }
420
421        if let Some(offset_indexes) = offset_indexes {
422            encoder = encoder.with_offset_indexes(offset_indexes);
423        }
424
425        if let Some(key_value_metadata) = key_value_metadata {
426            encoder = encoder.with_key_value_metadata(key_value_metadata);
427        }
428        encoder.finish()?;
429
430        Ok(())
431    }
432
433    fn convert_column_indexes(&self) -> Option<Vec<Vec<Option<ColumnIndexMetaData>>>> {
434        // TODO(ets): we're converting from ParquetColumnIndex to vec<vec<option>>,
435        // but then converting back to ParquetColumnIndex in the end. need to unify this.
436        self.metadata
437            .column_index()
438            .map(|row_group_column_indexes| {
439                (0..self.metadata.row_groups().len())
440                    .map(|rg_idx| {
441                        let column_indexes = &row_group_column_indexes[rg_idx];
442                        column_indexes
443                            .iter()
444                            .map(|column_index| Some(column_index.clone()))
445                            .collect()
446                    })
447                    .collect()
448            })
449    }
450
451    fn convert_offset_index(&self) -> Option<Vec<Vec<Option<OffsetIndexMetaData>>>> {
452        self.metadata
453            .offset_index()
454            .map(|row_group_offset_indexes| {
455                (0..self.metadata.row_groups().len())
456                    .map(|rg_idx| {
457                        let offset_indexes = &row_group_offset_indexes[rg_idx];
458                        offset_indexes
459                            .iter()
460                            .map(|offset_index| Some(offset_index.clone()))
461                            .collect()
462                    })
463                    .collect()
464            })
465    }
466}
467
/// Writes individual thrift metadata objects (footer and page indexes),
/// applying encryption when the `encryption` feature is enabled and an
/// encryptor has been configured.
#[derive(Debug, Default)]
struct MetadataObjectWriter {
    /// Encryptor for footer, page index and column metadata; `None` means
    /// objects are written as plain thrift
    #[cfg(feature = "encryption")]
    file_encryptor: Option<Arc<FileEncryptor>>,
}
473
474impl MetadataObjectWriter {
475    #[inline]
476    fn write_thrift_object(object: &impl WriteThrift, sink: impl Write) -> Result<()> {
477        let mut protocol = ThriftCompactOutputProtocol::new(sink);
478        object.write_thrift(&mut protocol)?;
479        Ok(())
480    }
481}
482
483/// Implementations of [`MetadataObjectWriter`] methods for when encryption is disabled
484#[cfg(not(feature = "encryption"))]
485impl MetadataObjectWriter {
486    /// Write [`FileMetaData`] in Thrift format
487    fn write_file_metadata(&self, file_metadata: &FileMeta, sink: impl Write) -> Result<()> {
488        Self::write_thrift_object(file_metadata, sink)
489    }
490
491    /// Write a column [`OffsetIndex`] in Thrift format
492    fn write_offset_index(
493        &self,
494        offset_index: &OffsetIndexMetaData,
495        _column_chunk: &ColumnChunkMetaData,
496        _row_group_idx: usize,
497        _column_idx: usize,
498        sink: impl Write,
499    ) -> Result<()> {
500        Self::write_thrift_object(offset_index, sink)
501    }
502
503    /// Write a column [`ColumnIndex`] in Thrift format
504    fn write_column_index(
505        &self,
506        column_index: &ColumnIndexMetaData,
507        _column_chunk: &ColumnChunkMetaData,
508        _row_group_idx: usize,
509        _column_idx: usize,
510        sink: impl Write,
511    ) -> Result<()> {
512        Self::write_thrift_object(column_index, sink)
513    }
514
515    /// No-op implementation of row-group metadata encryption
516    fn apply_row_group_encryption(
517        &self,
518        row_groups: Vec<RowGroupMetaData>,
519    ) -> Result<(Vec<RowGroupMetaData>, Option<Vec<RowGroupMetaData>>)> {
520        Ok((row_groups, None))
521    }
522
523    /// Get the "magic" bytes identifying the file type
524    pub fn get_file_magic(&self) -> &[u8; 4] {
525        get_file_magic()
526    }
527
528    fn get_plaintext_footer_crypto_metadata(
529        &self,
530    ) -> (Option<EncryptionAlgorithm>, Option<Vec<u8>>) {
531        (None, None)
532    }
533}
534
/// Implementations of [`MetadataObjectWriter`] methods that rely on encryption being enabled
#[cfg(feature = "encryption")]
impl MetadataObjectWriter {
    /// Set the file encryptor to use; `None` disables encryption
    fn with_file_encryptor(mut self, encryptor: Option<Arc<FileEncryptor>>) -> Self {
        self.file_encryptor = encryptor;
        self
    }

    /// Write [`FileMetaData`] in Thrift format, possibly encrypting it if required
    ///
    /// - Encrypted footer: a `FileCryptoMetaData` structure is written first,
    ///   followed by the encrypted `FileMetaData`.
    /// - Plaintext footer with an encryption algorithm set: the `FileMetaData`
    ///   is written with `write_signed_plaintext_thrift_object` using the footer
    ///   encryptor.
    /// - Otherwise: the `FileMetaData` is written as plain thrift.
    ///
    /// [`FileMetaData`]: https://github.com/apache/parquet-format/tree/master?tab=readme-ov-file#metadata
    fn write_file_metadata(&self, file_metadata: &FileMeta, mut sink: impl Write) -> Result<()> {
        match self.file_encryptor.as_ref() {
            Some(file_encryptor) if file_encryptor.properties().encrypt_footer() => {
                // First write FileCryptoMetadata
                let crypto_metadata = Self::file_crypto_metadata(file_encryptor)?;
                let mut protocol = ThriftCompactOutputProtocol::new(&mut sink);
                crypto_metadata.write_thrift(&mut protocol)?;

                // Then write encrypted footer
                let aad = create_footer_aad(file_encryptor.file_aad())?;
                let mut encryptor = file_encryptor.get_footer_encryptor()?;
                encrypt_thrift_object(file_metadata, &mut encryptor, &mut sink, &aad)
            }
            Some(file_encryptor) if file_metadata.encryption_algorithm.is_some() => {
                let aad = create_footer_aad(file_encryptor.file_aad())?;
                let mut encryptor = file_encryptor.get_footer_encryptor()?;
                write_signed_plaintext_thrift_object(file_metadata, &mut encryptor, &mut sink, &aad)
            }
            _ => Self::write_thrift_object(file_metadata, &mut sink),
        }
    }

    /// Write a column [`OffsetIndex`] in Thrift format, possibly encrypting it if required
    ///
    /// [`OffsetIndex`]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
    fn write_offset_index(
        &self,
        offset_index: &OffsetIndexMetaData,
        column_chunk: &ColumnChunkMetaData,
        row_group_idx: usize,
        column_idx: usize,
        sink: impl Write,
    ) -> Result<()> {
        match &self.file_encryptor {
            Some(file_encryptor) => Self::write_thrift_object_with_encryption(
                offset_index,
                sink,
                file_encryptor,
                column_chunk,
                ModuleType::OffsetIndex,
                row_group_idx,
                column_idx,
            ),
            None => Self::write_thrift_object(offset_index, sink),
        }
    }

    /// Write a column [`ColumnIndex`] in Thrift format, possibly encrypting it if required
    ///
    /// [`ColumnIndex`]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
    fn write_column_index(
        &self,
        column_index: &ColumnIndexMetaData,
        column_chunk: &ColumnChunkMetaData,
        row_group_idx: usize,
        column_idx: usize,
        sink: impl Write,
    ) -> Result<()> {
        match &self.file_encryptor {
            Some(file_encryptor) => Self::write_thrift_object_with_encryption(
                column_index,
                sink,
                file_encryptor,
                column_chunk,
                ModuleType::ColumnIndex,
                row_group_idx,
                column_idx,
            ),
            None => Self::write_thrift_object(column_index, sink),
        }
    }

    /// If encryption is enabled and configured, encrypt row group metadata.
    ///
    /// Returns a tuple of the row group metadata to write, and, if an encryptor
    /// is configured, a copy of the unencrypted metadata to return to clients.
    fn apply_row_group_encryption(
        &self,
        row_groups: Vec<RowGroupMetaData>,
    ) -> Result<(Vec<RowGroupMetaData>, Option<Vec<RowGroupMetaData>>)> {
        match &self.file_encryptor {
            Some(file_encryptor) => {
                let unencrypted_row_groups = row_groups.clone();
                let encrypted_row_groups = Self::encrypt_row_groups(row_groups, file_encryptor)?;
                Ok((encrypted_row_groups, Some(unencrypted_row_groups)))
            }
            None => Ok((row_groups, None)),
        }
    }

    /// Get the "magic" bytes identifying the file type, which depend on the
    /// encryption properties when an encryptor is configured
    fn get_file_magic(&self) -> &[u8; 4] {
        get_file_magic(
            self.file_encryptor
                .as_ref()
                .map(|encryptor| encryptor.properties()),
        )
    }

    /// Write `object`, encrypting it with the column's encryptor when the column
    /// is configured for encryption; otherwise write it as plain thrift.
    ///
    /// The column is identified by its dot-joined path in the schema.
    fn write_thrift_object_with_encryption(
        object: &impl WriteThrift,
        mut sink: impl Write,
        file_encryptor: &FileEncryptor,
        column_metadata: &ColumnChunkMetaData,
        module_type: ModuleType,
        row_group_index: usize,
        column_index: usize,
    ) -> Result<()> {
        let column_path_vec = column_metadata.column_path().as_ref();

        // Avoid allocating a joined path for the common single-segment case.
        let joined_column_path;
        let column_path = if column_path_vec.len() == 1 {
            &column_path_vec[0]
        } else {
            joined_column_path = column_path_vec.join(".");
            &joined_column_path
        };

        if file_encryptor.is_column_encrypted(column_path) {
            let aad = create_module_aad(
                file_encryptor.file_aad(),
                module_type,
                row_group_index,
                column_index,
                None,
            )?;
            let mut encryptor = file_encryptor.get_column_encryptor(column_path)?;
            encrypt_thrift_object(object, &mut encryptor, &mut sink, &aad)
        } else {
            Self::write_thrift_object(object, sink)
        }
    }

    /// Returns the encryption algorithm and footer key metadata to embed in the
    /// footer, which is only done for plaintext footers; otherwise `(None, None)`.
    fn get_plaintext_footer_crypto_metadata(
        &self,
    ) -> (Option<EncryptionAlgorithm>, Option<Vec<u8>>) {
        // Only plaintext footers may contain encryption algorithm and footer key metadata.
        if let Some(file_encryptor) = self.file_encryptor.as_ref() {
            let encryption_properties = file_encryptor.properties();
            if !encryption_properties.encrypt_footer() {
                return (
                    Some(Self::encryption_algorithm_from_encryptor(file_encryptor)),
                    encryption_properties.footer_key_metadata().cloned(),
                );
            }
        }
        (None, None)
    }

    /// Build the `AES_GCM_V1` encryption algorithm description for this file.
    fn encryption_algorithm_from_encryptor(file_encryptor: &FileEncryptor) -> EncryptionAlgorithm {
        let properties = file_encryptor.properties();
        let store_aad_prefix = properties.store_aad_prefix();
        // Set only when an AAD prefix is configured; true when the prefix is not
        // stored in the file.
        let supply_aad_prefix = properties.aad_prefix().map(|_| !store_aad_prefix);
        let aad_prefix = if store_aad_prefix {
            properties.aad_prefix()
        } else {
            None
        };
        EncryptionAlgorithm::AES_GCM_V1(AesGcmV1 {
            aad_prefix: aad_prefix.cloned(),
            aad_file_unique: Some(file_encryptor.aad_file_unique().clone()),
            supply_aad_prefix,
        })
    }

    /// Build the `FileCryptoMetaData` written ahead of an encrypted footer.
    fn file_crypto_metadata(file_encryptor: &'_ FileEncryptor) -> Result<FileCryptoMetaData<'_>> {
        let properties = file_encryptor.properties();
        Ok(FileCryptoMetaData {
            encryption_algorithm: Self::encryption_algorithm_from_encryptor(file_encryptor),
            key_metadata: properties.footer_key_metadata().map(|v| v.as_slice()),
        })
    }

    /// Apply column metadata encryption to every column chunk of every row group.
    fn encrypt_row_groups(
        row_groups: Vec<RowGroupMetaData>,
        file_encryptor: &FileEncryptor,
    ) -> Result<Vec<RowGroupMetaData>> {
        row_groups
            .into_iter()
            .enumerate()
            .map(|(rg_idx, mut rg)| {
                let cols: Result<Vec<ColumnChunkMetaData>> = rg
                    .columns
                    .into_iter()
                    .enumerate()
                    .map(|(col_idx, c)| {
                        Self::encrypt_column_chunk(c, file_encryptor, rg_idx, col_idx)
                    })
                    .collect();
                rg.columns = cols?;
                Ok(rg)
            })
            .collect()
    }

    /// Apply column encryption to column chunk metadata
    ///
    /// For columns encrypted with their own key, the serialized column metadata
    /// is encrypted and stored in `encrypted_column_metadata`.
    fn encrypt_column_chunk(
        mut column_chunk: ColumnChunkMetaData,
        file_encryptor: &FileEncryptor,
        row_group_index: usize,
        column_index: usize,
    ) -> Result<ColumnChunkMetaData> {
        // Column crypto metadata should have already been set when the column was created.
        // Here we apply the encryption by encrypting the column metadata if required.
        match column_chunk.column_crypto_metadata.as_ref() {
            None => {}
            Some(ColumnCryptoMetaData::ENCRYPTION_WITH_FOOTER_KEY) => {
                // When uniform encryption is used the footer is already encrypted,
                // so the column chunk does not need additional encryption.
            }
            Some(ColumnCryptoMetaData::ENCRYPTION_WITH_COLUMN_KEY(col_key)) => {
                use crate::file::metadata::thrift_gen::serialize_column_meta_data;

                let column_path = col_key.path_in_schema.join(".");
                let mut column_encryptor = file_encryptor.get_column_encryptor(&column_path)?;
                let aad = create_module_aad(
                    file_encryptor.file_aad(),
                    ModuleType::ColumnMetaData,
                    row_group_index,
                    column_index,
                    None,
                )?;
                // create temp ColumnMetaData that we can encrypt
                let mut buffer: Vec<u8> = vec![];
                {
                    let mut prot = ThriftCompactOutputProtocol::new(&mut buffer);
                    serialize_column_meta_data(&column_chunk, &mut prot)?;
                }
                let ciphertext = column_encryptor.encrypt(&buffer, &aad)?;

                column_chunk.encrypted_column_metadata = Some(ciphertext);
            }
        }

        Ok(column_chunk)
    }
}