parquet/file/metadata/
writer.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::file::metadata::thrift::FileMeta;
19use crate::file::metadata::{
20    ColumnChunkMetaData, ParquetColumnIndex, ParquetOffsetIndex, RowGroupMetaData,
21};
22use crate::schema::types::{SchemaDescPtr, SchemaDescriptor};
23use crate::{
24    basic::ColumnOrder,
25    file::metadata::{FileMetaData, ParquetMetaDataBuilder},
26};
27#[cfg(feature = "encryption")]
28use crate::{
29    encryption::{
30        encrypt::{FileEncryptor, encrypt_thrift_object, write_signed_plaintext_thrift_object},
31        modules::{ModuleType, create_footer_aad, create_module_aad},
32    },
33    file::column_crypto_metadata::ColumnCryptoMetaData,
34    file::metadata::thrift::encryption::{AesGcmV1, EncryptionAlgorithm, FileCryptoMetaData},
35};
36use crate::{errors::Result, file::page_index::column_index::ColumnIndexMetaData};
37
38use crate::{
39    file::writer::{TrackedWrite, get_file_magic},
40    parquet_thrift::WriteThrift,
41};
42use crate::{
43    file::{
44        metadata::{KeyValue, ParquetMetaData},
45        page_index::offset_index::OffsetIndexMetaData,
46    },
47    parquet_thrift::ThriftCompactOutputProtocol,
48};
49use std::io::Write;
50use std::sync::Arc;
51
/// Writes `crate::file::metadata` structures to a thrift encoded byte stream
///
/// A writer is created with [`Self::new`], optionally configured with the
/// `with_*` methods, and consumed by [`Self::finish`], which performs all of
/// the writing.
///
/// See [`ParquetMetaDataWriter`] for background and example.
pub(crate) struct ThriftMetadataWriter<'a, W: Write> {
    /// Destination of the encoded bytes. Its `bytes_written` counter supplies
    /// the offsets that are recorded in the metadata.
    buf: &'a mut TrackedWrite<W>,
    /// Schema of the file; its leaf columns determine the column orders that
    /// are stored in the file metadata.
    schema_descr: &'a SchemaDescPtr,
    /// Row group metadata to write. The page index offsets and lengths of the
    /// column chunks are filled in while the page indexes are serialized.
    row_groups: Vec<RowGroupMetaData>,
    /// Optional column indexes, indexed as `[row_group][column]`. A `None`
    /// entry means there is no column index for that column chunk.
    column_indexes: Option<Vec<Vec<Option<ColumnIndexMetaData>>>>,
    /// Optional offset indexes, indexed as `[row_group][column]`. A `None`
    /// entry means there is no offset index for that column chunk.
    offset_indexes: Option<Vec<Vec<Option<OffsetIndexMetaData>>>>,
    /// Optional key/value metadata passed on to `FileMetaData::new`
    key_value_metadata: Option<Vec<KeyValue>>,
    /// Optional "created by" string passed on to `FileMetaData::new`
    created_by: Option<String>,
    /// Serializes the individual metadata objects (and encrypts them when an
    /// encryptor has been configured)
    object_writer: MetadataObjectWriter,
    /// Version value passed on to `FileMetaData::new`
    writer_version: i32,
}
66
67impl<'a, W: Write> ThriftMetadataWriter<'a, W> {
68    /// Serialize all the offset indexes to `self.buf`,
69    ///
70    /// Note: also updates the `ColumnChunk::offset_index_offset` and
71    /// `ColumnChunk::offset_index_length` to reflect the position and length
72    /// of the serialized offset indexes.
73    fn write_offset_indexes(
74        &mut self,
75        offset_indexes: &[Vec<Option<OffsetIndexMetaData>>],
76    ) -> Result<()> {
77        // iter row group
78        // iter each column
79        // write offset index to the file
80        for (row_group_idx, row_group) in self.row_groups.iter_mut().enumerate() {
81            for (column_idx, column_metadata) in row_group.columns.iter_mut().enumerate() {
82                if let Some(offset_index) = &offset_indexes[row_group_idx][column_idx] {
83                    let start_offset = self.buf.bytes_written();
84                    self.object_writer.write_offset_index(
85                        offset_index,
86                        column_metadata,
87                        row_group_idx,
88                        column_idx,
89                        &mut self.buf,
90                    )?;
91                    let end_offset = self.buf.bytes_written();
92                    // set offset and index for offset index
93                    column_metadata.offset_index_offset = Some(start_offset as i64);
94                    column_metadata.offset_index_length = Some((end_offset - start_offset) as i32);
95                }
96            }
97        }
98        Ok(())
99    }
100
101    /// Serialize all the column indexes to the `self.buf`
102    ///
103    /// Note: also updates the `ColumnChunk::column_index_offset` and
104    /// `ColumnChunk::column_index_length` to reflect the position and length
105    /// of the serialized column indexes.
106    fn write_column_indexes(
107        &mut self,
108        column_indexes: &[Vec<Option<ColumnIndexMetaData>>],
109    ) -> Result<()> {
110        // iter row group
111        // iter each column
112        // write column index to the file
113        for (row_group_idx, row_group) in self.row_groups.iter_mut().enumerate() {
114            for (column_idx, column_metadata) in row_group.columns.iter_mut().enumerate() {
115                if let Some(column_index) = &column_indexes[row_group_idx][column_idx] {
116                    let start_offset = self.buf.bytes_written();
117                    self.object_writer.write_column_index(
118                        column_index,
119                        column_metadata,
120                        row_group_idx,
121                        column_idx,
122                        &mut self.buf,
123                    )?;
124                    let end_offset = self.buf.bytes_written();
125                    // set offset and index for offset index
126                    column_metadata.column_index_offset = Some(start_offset as i64);
127                    column_metadata.column_index_length = Some((end_offset - start_offset) as i32);
128                }
129            }
130        }
131        Ok(())
132    }
133
134    /// Assembles and writes the final metadata to self.buf
135    pub fn finish(mut self) -> Result<ParquetMetaData> {
136        let num_rows = self.row_groups.iter().map(|x| x.num_rows).sum();
137
138        let column_indexes = std::mem::take(&mut self.column_indexes);
139        let offset_indexes = std::mem::take(&mut self.offset_indexes);
140
141        // Write column indexes and offset indexes
142        if let Some(column_indexes) = column_indexes.as_ref() {
143            self.write_column_indexes(column_indexes)?;
144        }
145        if let Some(offset_indexes) = offset_indexes.as_ref() {
146            self.write_offset_indexes(offset_indexes)?;
147        }
148
149        // We only include ColumnOrder for leaf nodes.
150        // Currently only supported ColumnOrder is TypeDefinedOrder so we set this
151        // for all leaf nodes.
152        // Even if the column has an undefined sort order, such as INTERVAL, this
153        // is still technically the defined TYPEORDER so it should still be set.
154        let column_orders = self
155            .schema_descr
156            .columns()
157            .iter()
158            .map(|col| {
159                let sort_order = ColumnOrder::get_sort_order(
160                    col.logical_type(),
161                    col.converted_type(),
162                    col.physical_type(),
163                );
164                ColumnOrder::TYPE_DEFINED_ORDER(sort_order)
165            })
166            .collect();
167
168        // This field is optional, perhaps in cases where no min/max fields are set
169        // in any Statistics or ColumnIndex object in the whole file.
170        // But for simplicity we always set this field.
171        let column_orders = Some(column_orders);
172
173        let (row_groups, unencrypted_row_groups) = self
174            .object_writer
175            .apply_row_group_encryption(self.row_groups)?;
176
177        #[cfg(feature = "encryption")]
178        let (encryption_algorithm, footer_signing_key_metadata) =
179            self.object_writer.get_plaintext_footer_crypto_metadata();
180        #[cfg(feature = "encryption")]
181        let file_metadata = FileMetaData::new(
182            self.writer_version,
183            num_rows,
184            self.created_by,
185            self.key_value_metadata,
186            self.schema_descr.clone(),
187            column_orders,
188        )
189        .with_encryption_algorithm(encryption_algorithm)
190        .with_footer_signing_key_metadata(footer_signing_key_metadata);
191
192        #[cfg(not(feature = "encryption"))]
193        let file_metadata = FileMetaData::new(
194            self.writer_version,
195            num_rows,
196            self.created_by,
197            self.key_value_metadata,
198            self.schema_descr.clone(),
199            column_orders,
200        );
201
202        let file_meta = FileMeta {
203            file_metadata: &file_metadata,
204            row_groups: &row_groups,
205        };
206
207        // Write file metadata
208        let start_pos = self.buf.bytes_written();
209        self.object_writer
210            .write_file_metadata(&file_meta, &mut self.buf)?;
211        let end_pos = self.buf.bytes_written();
212
213        // Write footer
214        let metadata_len = (end_pos - start_pos) as u32;
215
216        self.buf.write_all(&metadata_len.to_le_bytes())?;
217        self.buf.write_all(self.object_writer.get_file_magic())?;
218
219        // If row group metadata was encrypted, we replace the encrypted row groups with
220        // unencrypted metadata before it is returned to users. This allows the metadata
221        // to be usable for retrieving the row group statistics for example, without users
222        // needing to decrypt the metadata.
223        let mut builder = ParquetMetaDataBuilder::new(file_metadata);
224
225        builder = match unencrypted_row_groups {
226            Some(rg) => builder.set_row_groups(rg),
227            None => builder.set_row_groups(row_groups),
228        };
229
230        let column_indexes: Option<ParquetColumnIndex> = column_indexes.map(|ovvi| {
231            ovvi.into_iter()
232                .map(|vi| {
233                    vi.into_iter()
234                        .map(|oi| oi.unwrap_or(ColumnIndexMetaData::NONE))
235                        .collect()
236                })
237                .collect()
238        });
239
240        // FIXME(ets): this will panic if there's a missing index.
241        let offset_indexes: Option<ParquetOffsetIndex> = offset_indexes.map(|ovvi| {
242            ovvi.into_iter()
243                .map(|vi| vi.into_iter().map(|oi| oi.unwrap()).collect())
244                .collect()
245        });
246
247        builder = builder.set_column_index(column_indexes);
248        builder = builder.set_offset_index(offset_indexes);
249
250        Ok(builder.build())
251    }
252
253    pub fn new(
254        buf: &'a mut TrackedWrite<W>,
255        schema_descr: &'a SchemaDescPtr,
256        row_groups: Vec<RowGroupMetaData>,
257        created_by: Option<String>,
258        writer_version: i32,
259    ) -> Self {
260        Self {
261            buf,
262            schema_descr,
263            row_groups,
264            column_indexes: None,
265            offset_indexes: None,
266            key_value_metadata: None,
267            created_by,
268            object_writer: Default::default(),
269            writer_version,
270        }
271    }
272
273    pub fn with_column_indexes(
274        mut self,
275        column_indexes: Vec<Vec<Option<ColumnIndexMetaData>>>,
276    ) -> Self {
277        self.column_indexes = Some(column_indexes);
278        self
279    }
280
281    pub fn with_offset_indexes(
282        mut self,
283        offset_indexes: Vec<Vec<Option<OffsetIndexMetaData>>>,
284    ) -> Self {
285        self.offset_indexes = Some(offset_indexes);
286        self
287    }
288
289    pub fn with_key_value_metadata(mut self, key_value_metadata: Vec<KeyValue>) -> Self {
290        self.key_value_metadata = Some(key_value_metadata);
291        self
292    }
293
294    #[cfg(feature = "encryption")]
295    pub fn with_file_encryptor(mut self, file_encryptor: Option<Arc<FileEncryptor>>) -> Self {
296        self.object_writer = self.object_writer.with_file_encryptor(file_encryptor);
297        self
298    }
299}
300
301/// Writes [`ParquetMetaData`] to a byte stream
302///
303/// This structure handles the details of writing the various parts of Parquet
304/// metadata into a byte stream. It is used to write the metadata into a parquet
305/// file and can also write metadata into other locations (such as a store of
306/// bytes).
307///
308/// # Discussion
309///
310/// The process of writing Parquet metadata is tricky because the
311/// metadata is not stored as a single inline thrift structure. It can have
312/// several "out of band" structures such as the [`OffsetIndex`] and
313/// BloomFilters stored in separate structures whose locations are stored as
314/// offsets from the beginning of the file.
315///
316/// Note: this writer does not directly write BloomFilters. In order to write
317/// BloomFilters, write the bloom filters into the buffer before creating the
318/// metadata writer. Then set the corresponding `bloom_filter_offset` and
319/// `bloom_filter_length` on [`ColumnChunkMetaData`] passed to this writer.
320///
321/// # Output Format
322///
323/// The format of the metadata is as follows:
324///
325/// 1. Optional [`ColumnIndex`] (thrift encoded)
326/// 2. Optional [`OffsetIndex`] (thrift encoded)
327/// 3. [`FileMetaData`] (thrift encoded)
328/// 4. Length of encoded `FileMetaData` (4 bytes, little endian)
329/// 5. Parquet Magic Bytes (4 bytes)
330///
331/// [`FileMetaData`]: https://github.com/apache/parquet-format/tree/master?tab=readme-ov-file#metadata
332/// [`ColumnChunkMetaData`]: crate::file::metadata::ColumnChunkMetaData
333/// [`ColumnIndex`]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
334/// [`OffsetIndex`]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
335///
336/// ```text
337/// ┌──────────────────────┐
338/// │                      │
339/// │         ...          │
340/// │                      │
341/// │┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐ │
342/// │     ColumnIndex     ◀│─ ─ ─
343/// ││    (Optional)     │ │     │
344/// │ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─  │
345/// │┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐ │     │ FileMetadata
346/// │     OffsetIndex      │       contains embedded
347/// ││    (Optional)     │◀┼ ─   │ offsets to
348/// │ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─  │  │    ColumnIndex and
349/// │╔═══════════════════╗ │     │ OffsetIndex
350/// │║                   ║ │  │
351/// │║                   ║ ┼ ─   │
352/// │║   FileMetadata    ║ │
353/// │║                   ║ ┼ ─ ─ ┘
354/// │║                   ║ │
355/// │╚═══════════════════╝ │
356/// │┌───────────────────┐ │
357/// ││  metadata length  │ │ length of FileMetadata  (only)
358/// │└───────────────────┘ │
359/// │┌───────────────────┐ │
360/// ││      'PAR1'       │ │ Parquet Magic Bytes
361/// │└───────────────────┘ │
362/// └──────────────────────┘
363///      Output Buffer
364/// ```
365///
366/// # Example
367/// ```no_run
368/// # use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataWriter};
369/// # fn get_metadata() -> ParquetMetaData { unimplemented!(); }
370/// // write parquet metadata to an in-memory buffer
371/// let mut buffer = vec![];
372/// let metadata: ParquetMetaData = get_metadata();
373/// let writer = ParquetMetaDataWriter::new(&mut buffer, &metadata);
374/// // write the metadata to the buffer
375/// writer.finish().unwrap();
376/// assert!(!buffer.is_empty());
377/// ```
pub struct ParquetMetaDataWriter<'a, W: Write> {
    /// Output sink, wrapped so that the number of bytes written so far is
    /// available when offsets are recorded in the metadata
    buf: TrackedWrite<W>,
    /// The metadata that is serialized when [`Self::finish`] is called
    metadata: &'a ParquetMetaData,
}
382
383impl<'a, W: Write> ParquetMetaDataWriter<'a, W> {
384    /// Create a new `ParquetMetaDataWriter` to write to `buf`
385    ///
386    /// Note any embedded offsets in the metadata will be written assuming the
387    /// metadata is at the start of the buffer. If the metadata is being written
388    /// to a location other than the start of the buffer, see [`Self::new_with_tracked`]
389    ///
390    /// See example on the struct level documentation
391    pub fn new(buf: W, metadata: &'a ParquetMetaData) -> Self {
392        Self::new_with_tracked(TrackedWrite::new(buf), metadata)
393    }
394
395    /// Create a new ParquetMetaDataWriter to write to `buf`
396    ///
397    /// This method is used when the metadata is being written to a location other
398    /// than the start of the buffer.
399    ///
400    /// See example on the struct level documentation
401    pub fn new_with_tracked(buf: TrackedWrite<W>, metadata: &'a ParquetMetaData) -> Self {
402        Self { buf, metadata }
403    }
404
405    /// Write the metadata to the buffer
406    pub fn finish(mut self) -> Result<()> {
407        let file_metadata = self.metadata.file_metadata();
408
409        let schema = Arc::new(file_metadata.schema().clone());
410        let schema_descr = Arc::new(SchemaDescriptor::new(schema.clone()));
411        let created_by = file_metadata.created_by().map(str::to_string);
412
413        let row_groups = self.metadata.row_groups.clone();
414
415        let key_value_metadata = file_metadata.key_value_metadata().cloned();
416
417        let column_indexes = self.convert_column_indexes();
418        let offset_indexes = self.convert_offset_index();
419
420        let mut encoder = ThriftMetadataWriter::new(
421            &mut self.buf,
422            &schema_descr,
423            row_groups,
424            created_by,
425            file_metadata.version(),
426        );
427
428        if let Some(column_indexes) = column_indexes {
429            encoder = encoder.with_column_indexes(column_indexes);
430        }
431
432        if let Some(offset_indexes) = offset_indexes {
433            encoder = encoder.with_offset_indexes(offset_indexes);
434        }
435
436        if let Some(key_value_metadata) = key_value_metadata {
437            encoder = encoder.with_key_value_metadata(key_value_metadata);
438        }
439        encoder.finish()?;
440
441        Ok(())
442    }
443
444    fn convert_column_indexes(&self) -> Option<Vec<Vec<Option<ColumnIndexMetaData>>>> {
445        // TODO(ets): we're converting from ParquetColumnIndex to vec<vec<option>>,
446        // but then converting back to ParquetColumnIndex in the end. need to unify this.
447        self.metadata
448            .column_index()
449            .map(|row_group_column_indexes| {
450                (0..self.metadata.row_groups().len())
451                    .map(|rg_idx| {
452                        let column_indexes = &row_group_column_indexes[rg_idx];
453                        column_indexes
454                            .iter()
455                            .map(|column_index| Some(column_index.clone()))
456                            .collect()
457                    })
458                    .collect()
459            })
460    }
461
462    fn convert_offset_index(&self) -> Option<Vec<Vec<Option<OffsetIndexMetaData>>>> {
463        self.metadata
464            .offset_index()
465            .map(|row_group_offset_indexes| {
466                (0..self.metadata.row_groups().len())
467                    .map(|rg_idx| {
468                        let offset_indexes = &row_group_offset_indexes[rg_idx];
469                        offset_indexes
470                            .iter()
471                            .map(|offset_index| Some(offset_index.clone()))
472                            .collect()
473                    })
474                    .collect()
475            })
476    }
477}
478
/// Serializes individual metadata objects (the file metadata, column indexes
/// and offset indexes) in Thrift format.
///
/// With the `encryption` feature enabled the writer can hold a file encryptor,
/// which is then consulted when objects are written. Without the feature the
/// struct has no fields and every object is written as plain Thrift.
#[derive(Debug, Default)]
struct MetadataObjectWriter {
    /// Encryptor to use when writing; `None` means nothing is encrypted
    #[cfg(feature = "encryption")]
    file_encryptor: Option<Arc<FileEncryptor>>,
}
484
485impl MetadataObjectWriter {
486    #[inline]
487    fn write_thrift_object(object: &impl WriteThrift, sink: impl Write) -> Result<()> {
488        let mut protocol = ThriftCompactOutputProtocol::new(sink);
489        object.write_thrift(&mut protocol)?;
490        Ok(())
491    }
492}
493
/// Implementations of [`MetadataObjectWriter`] methods for when encryption is disabled
///
/// These have the same names as the methods of the encryption-enabled
/// implementation, so callers do not need to distinguish between the two
/// configurations. Everything is written as plain Thrift.
#[cfg(not(feature = "encryption"))]
impl MetadataObjectWriter {
    /// Write [`FileMetaData`] in Thrift format
    ///
    /// [`FileMetaData`]: https://github.com/apache/parquet-format/tree/master?tab=readme-ov-file#metadata
    fn write_file_metadata(&self, file_metadata: &FileMeta, sink: impl Write) -> Result<()> {
        Self::write_thrift_object(file_metadata, sink)
    }

    /// Write a column [`OffsetIndex`] in Thrift format
    ///
    /// The column chunk and the row group / column positions are unused here;
    /// the encryption-enabled implementation of this method does use them.
    ///
    /// [`OffsetIndex`]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
    fn write_offset_index(
        &self,
        offset_index: &OffsetIndexMetaData,
        _column_chunk: &ColumnChunkMetaData,
        _row_group_idx: usize,
        _column_idx: usize,
        sink: impl Write,
    ) -> Result<()> {
        Self::write_thrift_object(offset_index, sink)
    }

    /// Write a column [`ColumnIndex`] in Thrift format
    ///
    /// The column chunk and the row group / column positions are unused here;
    /// the encryption-enabled implementation of this method does use them.
    ///
    /// [`ColumnIndex`]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
    fn write_column_index(
        &self,
        column_index: &ColumnIndexMetaData,
        _column_chunk: &ColumnChunkMetaData,
        _row_group_idx: usize,
        _column_idx: usize,
        sink: impl Write,
    ) -> Result<()> {
        Self::write_thrift_object(column_index, sink)
    }

    /// No-op implementation of row-group metadata encryption
    ///
    /// Returns the row groups unchanged as the metadata to write, and `None`
    /// for the separate unencrypted copy since nothing was encrypted.
    fn apply_row_group_encryption(
        &self,
        row_groups: Vec<RowGroupMetaData>,
    ) -> Result<(Vec<RowGroupMetaData>, Option<Vec<RowGroupMetaData>>)> {
        Ok((row_groups, None))
    }

    /// Get the "magic" bytes identifying the file type
    pub fn get_file_magic(&self) -> &[u8; 4] {
        get_file_magic()
    }
}
539
540/// Implementations of [`MetadataObjectWriter`] methods that rely on encryption being enabled
541#[cfg(feature = "encryption")]
542impl MetadataObjectWriter {
543    /// Set the file encryptor to use
544    fn with_file_encryptor(mut self, encryptor: Option<Arc<FileEncryptor>>) -> Self {
545        self.file_encryptor = encryptor;
546        self
547    }
548
549    /// Write [`FileMetaData`] in Thrift format, possibly encrypting it if required
550    ///
551    /// [`FileMetaData`]: https://github.com/apache/parquet-format/tree/master?tab=readme-ov-file#metadata
552    fn write_file_metadata(&self, file_metadata: &FileMeta, mut sink: impl Write) -> Result<()> {
553        match self.file_encryptor.as_ref() {
554            Some(file_encryptor) if file_encryptor.properties().encrypt_footer() => {
555                // First write FileCryptoMetadata
556                let crypto_metadata = Self::file_crypto_metadata(file_encryptor)?;
557                let mut protocol = ThriftCompactOutputProtocol::new(&mut sink);
558                crypto_metadata.write_thrift(&mut protocol)?;
559
560                // Then write encrypted footer
561                let aad = create_footer_aad(file_encryptor.file_aad())?;
562                let mut encryptor = file_encryptor.get_footer_encryptor()?;
563                encrypt_thrift_object(file_metadata, &mut encryptor, &mut sink, &aad)
564            }
565            Some(file_encryptor) if file_metadata.file_metadata.encryption_algorithm.is_some() => {
566                let aad = create_footer_aad(file_encryptor.file_aad())?;
567                let mut encryptor = file_encryptor.get_footer_encryptor()?;
568                write_signed_plaintext_thrift_object(file_metadata, &mut encryptor, &mut sink, &aad)
569            }
570            _ => Self::write_thrift_object(file_metadata, &mut sink),
571        }
572    }
573
574    /// Write a column [`OffsetIndex`] in Thrift format, possibly encrypting it if required
575    ///
576    /// [`OffsetIndex`]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
577    fn write_offset_index(
578        &self,
579        offset_index: &OffsetIndexMetaData,
580        column_chunk: &ColumnChunkMetaData,
581        row_group_idx: usize,
582        column_idx: usize,
583        sink: impl Write,
584    ) -> Result<()> {
585        match &self.file_encryptor {
586            Some(file_encryptor) => Self::write_thrift_object_with_encryption(
587                offset_index,
588                sink,
589                file_encryptor,
590                column_chunk,
591                ModuleType::OffsetIndex,
592                row_group_idx,
593                column_idx,
594            ),
595            None => Self::write_thrift_object(offset_index, sink),
596        }
597    }
598
599    /// Write a column [`ColumnIndex`] in Thrift format, possibly encrypting it if required
600    ///
601    /// [`ColumnIndex`]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
602    fn write_column_index(
603        &self,
604        column_index: &ColumnIndexMetaData,
605        column_chunk: &ColumnChunkMetaData,
606        row_group_idx: usize,
607        column_idx: usize,
608        sink: impl Write,
609    ) -> Result<()> {
610        match &self.file_encryptor {
611            Some(file_encryptor) => Self::write_thrift_object_with_encryption(
612                column_index,
613                sink,
614                file_encryptor,
615                column_chunk,
616                ModuleType::ColumnIndex,
617                row_group_idx,
618                column_idx,
619            ),
620            None => Self::write_thrift_object(column_index, sink),
621        }
622    }
623
624    /// If encryption is enabled and configured, encrypt row group metadata.
625    /// Returns a tuple of the row group metadata to write,
626    /// and possibly unencrypted metadata to be returned to clients if data was encrypted.
627    fn apply_row_group_encryption(
628        &self,
629        row_groups: Vec<RowGroupMetaData>,
630    ) -> Result<(Vec<RowGroupMetaData>, Option<Vec<RowGroupMetaData>>)> {
631        match &self.file_encryptor {
632            Some(file_encryptor) => {
633                let unencrypted_row_groups = row_groups.clone();
634                let encrypted_row_groups = Self::encrypt_row_groups(row_groups, file_encryptor)?;
635                Ok((encrypted_row_groups, Some(unencrypted_row_groups)))
636            }
637            None => Ok((row_groups, None)),
638        }
639    }
640
641    /// Get the "magic" bytes identifying the file type
642    fn get_file_magic(&self) -> &[u8; 4] {
643        get_file_magic(
644            self.file_encryptor
645                .as_ref()
646                .map(|encryptor| encryptor.properties()),
647        )
648    }
649
650    fn write_thrift_object_with_encryption(
651        object: &impl WriteThrift,
652        mut sink: impl Write,
653        file_encryptor: &FileEncryptor,
654        column_metadata: &ColumnChunkMetaData,
655        module_type: ModuleType,
656        row_group_index: usize,
657        column_index: usize,
658    ) -> Result<()> {
659        let column_path_vec = column_metadata.column_path().as_ref();
660
661        let joined_column_path;
662        let column_path = if column_path_vec.len() == 1 {
663            &column_path_vec[0]
664        } else {
665            joined_column_path = column_path_vec.join(".");
666            &joined_column_path
667        };
668
669        if file_encryptor.is_column_encrypted(column_path) {
670            use crate::encryption::encrypt::encrypt_thrift_object;
671
672            let aad = create_module_aad(
673                file_encryptor.file_aad(),
674                module_type,
675                row_group_index,
676                column_index,
677                None,
678            )?;
679            let mut encryptor = file_encryptor.get_column_encryptor(column_path)?;
680            encrypt_thrift_object(object, &mut encryptor, &mut sink, &aad)
681        } else {
682            Self::write_thrift_object(object, sink)
683        }
684    }
685
686    fn get_plaintext_footer_crypto_metadata(
687        &self,
688    ) -> (Option<EncryptionAlgorithm>, Option<Vec<u8>>) {
689        // Only plaintext footers may contain encryption algorithm and footer key metadata.
690        if let Some(file_encryptor) = self.file_encryptor.as_ref() {
691            let encryption_properties = file_encryptor.properties();
692            if !encryption_properties.encrypt_footer() {
693                return (
694                    Some(Self::encryption_algorithm_from_encryptor(file_encryptor)),
695                    encryption_properties.footer_key_metadata().cloned(),
696                );
697            }
698        }
699        (None, None)
700    }
701
702    fn encryption_algorithm_from_encryptor(file_encryptor: &FileEncryptor) -> EncryptionAlgorithm {
703        let supply_aad_prefix = file_encryptor
704            .properties()
705            .aad_prefix()
706            .map(|_| !file_encryptor.properties().store_aad_prefix());
707        let aad_prefix = if file_encryptor.properties().store_aad_prefix() {
708            file_encryptor.properties().aad_prefix()
709        } else {
710            None
711        };
712        EncryptionAlgorithm::AES_GCM_V1(AesGcmV1 {
713            aad_prefix: aad_prefix.cloned(),
714            aad_file_unique: Some(file_encryptor.aad_file_unique().clone()),
715            supply_aad_prefix,
716        })
717    }
718
719    fn file_crypto_metadata(file_encryptor: &'_ FileEncryptor) -> Result<FileCryptoMetaData<'_>> {
720        let properties = file_encryptor.properties();
721        Ok(FileCryptoMetaData {
722            encryption_algorithm: Self::encryption_algorithm_from_encryptor(file_encryptor),
723            key_metadata: properties.footer_key_metadata().map(|v| v.as_slice()),
724        })
725    }
726
727    fn encrypt_row_groups(
728        row_groups: Vec<RowGroupMetaData>,
729        file_encryptor: &Arc<FileEncryptor>,
730    ) -> Result<Vec<RowGroupMetaData>> {
731        row_groups
732            .into_iter()
733            .enumerate()
734            .map(|(rg_idx, mut rg)| {
735                let cols: Result<Vec<ColumnChunkMetaData>> = rg
736                    .columns
737                    .into_iter()
738                    .enumerate()
739                    .map(|(col_idx, c)| {
740                        Self::encrypt_column_chunk(c, file_encryptor, rg_idx, col_idx)
741                    })
742                    .collect();
743                rg.columns = cols?;
744                Ok(rg)
745            })
746            .collect()
747    }
748
749    /// Apply column encryption to column chunk metadata
750    fn encrypt_column_chunk(
751        mut column_chunk: ColumnChunkMetaData,
752        file_encryptor: &Arc<FileEncryptor>,
753        row_group_index: usize,
754        column_index: usize,
755    ) -> Result<ColumnChunkMetaData> {
756        // Column crypto metadata should have already been set when the column was created.
757        // Here we apply the encryption by encrypting the column metadata if required.
758        match column_chunk.column_crypto_metadata.as_deref() {
759            None => {}
760            Some(ColumnCryptoMetaData::ENCRYPTION_WITH_FOOTER_KEY) => {
761                // When uniform encryption is used the footer is already encrypted,
762                // so the column chunk does not need additional encryption.
763            }
764            Some(ColumnCryptoMetaData::ENCRYPTION_WITH_COLUMN_KEY(col_key)) => {
765                use crate::file::metadata::thrift::serialize_column_meta_data;
766
767                let column_path = col_key.path_in_schema.join(".");
768                let mut column_encryptor = file_encryptor.get_column_encryptor(&column_path)?;
769                let aad = create_module_aad(
770                    file_encryptor.file_aad(),
771                    ModuleType::ColumnMetaData,
772                    row_group_index,
773                    column_index,
774                    None,
775                )?;
776                // create temp ColumnMetaData that we can encrypt
777                let mut buffer: Vec<u8> = vec![];
778                {
779                    let mut prot = ThriftCompactOutputProtocol::new(&mut buffer);
780                    serialize_column_meta_data(&column_chunk, &mut prot)?;
781                }
782                let ciphertext = column_encryptor.encrypt(&buffer, &aad)?;
783
784                column_chunk.encrypted_column_metadata = Some(ciphertext);
785            }
786        }
787
788        Ok(column_chunk)
789    }
790}