Skip to main content

parquet/file/metadata/
writer.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::file::metadata::thrift::FileMeta;
19use crate::file::metadata::{
20    ColumnChunkMetaData, ParquetColumnIndex, ParquetOffsetIndex, RowGroupMetaData,
21};
22use crate::schema::types::{SchemaDescPtr, SchemaDescriptor};
23use crate::{
24    basic::ColumnOrder,
25    file::metadata::{FileMetaData, ParquetMetaDataBuilder},
26};
27#[cfg(feature = "encryption")]
28use crate::{
29    encryption::{
30        encrypt::{FileEncryptor, encrypt_thrift_object, write_signed_plaintext_thrift_object},
31        modules::{ModuleType, create_footer_aad, create_module_aad},
32    },
33    file::column_crypto_metadata::ColumnCryptoMetaData,
34    file::metadata::thrift::encryption::{AesGcmV1, EncryptionAlgorithm, FileCryptoMetaData},
35};
36use crate::{errors::Result, file::page_index::column_index::ColumnIndexMetaData};
37
38use crate::{
39    file::writer::{TrackedWrite, get_file_magic},
40    parquet_thrift::WriteThrift,
41};
42use crate::{
43    file::{
44        metadata::{KeyValue, ParquetMetaData},
45        page_index::offset_index::OffsetIndexMetaData,
46    },
47    parquet_thrift::ThriftCompactOutputProtocol,
48};
49use std::io::Write;
50use std::sync::Arc;
51
52/// Writes `crate::file::metadata` structures to a thrift encoded byte stream
53///
54/// See [`ParquetMetaDataWriter`] for background and example.
55pub(crate) struct ThriftMetadataWriter<'a, W: Write> {
56    buf: &'a mut TrackedWrite<W>,
57    schema_descr: &'a SchemaDescPtr,
58    row_groups: Vec<RowGroupMetaData>,
59    column_indexes: Option<Vec<Vec<Option<ColumnIndexMetaData>>>>,
60    offset_indexes: Option<Vec<Vec<Option<OffsetIndexMetaData>>>>,
61    key_value_metadata: Option<Vec<KeyValue>>,
62    created_by: Option<String>,
63    object_writer: MetadataObjectWriter,
64    writer_version: i32,
65    write_path_in_schema: bool,
66}
67
68impl<'a, W: Write> ThriftMetadataWriter<'a, W> {
69    /// Serialize all the offset indexes to `self.buf`,
70    ///
71    /// Note: also updates the `ColumnChunk::offset_index_offset` and
72    /// `ColumnChunk::offset_index_length` to reflect the position and length
73    /// of the serialized offset indexes.
74    fn write_offset_indexes(
75        &mut self,
76        offset_indexes: &[Vec<Option<OffsetIndexMetaData>>],
77    ) -> Result<()> {
78        // iter row group
79        // iter each column
80        // write offset index to the file
81        for (row_group_idx, row_group) in self.row_groups.iter_mut().enumerate() {
82            for (column_idx, column_metadata) in row_group.columns.iter_mut().enumerate() {
83                if let Some(offset_index) = &offset_indexes[row_group_idx][column_idx] {
84                    let start_offset = self.buf.bytes_written();
85                    self.object_writer.write_offset_index(
86                        offset_index,
87                        column_metadata,
88                        row_group_idx,
89                        column_idx,
90                        &mut self.buf,
91                    )?;
92                    let end_offset = self.buf.bytes_written();
93                    // set offset and index for offset index
94                    column_metadata.offset_index_offset = Some(start_offset as i64);
95                    column_metadata.offset_index_length = Some((end_offset - start_offset) as i32);
96                }
97            }
98        }
99        Ok(())
100    }
101
102    /// Serialize all the column indexes to the `self.buf`
103    ///
104    /// Note: also updates the `ColumnChunk::column_index_offset` and
105    /// `ColumnChunk::column_index_length` to reflect the position and length
106    /// of the serialized column indexes.
107    fn write_column_indexes(
108        &mut self,
109        column_indexes: &[Vec<Option<ColumnIndexMetaData>>],
110    ) -> Result<()> {
111        // iter row group
112        // iter each column
113        // write column index to the file
114        for (row_group_idx, row_group) in self.row_groups.iter_mut().enumerate() {
115            for (column_idx, column_metadata) in row_group.columns.iter_mut().enumerate() {
116                if let Some(column_index) = &column_indexes[row_group_idx][column_idx] {
117                    let start_offset = self.buf.bytes_written();
118                    // only update column_metadata if the write succeeds
119                    if self.object_writer.write_column_index(
120                        column_index,
121                        column_metadata,
122                        row_group_idx,
123                        column_idx,
124                        &mut self.buf,
125                    )? {
126                        let end_offset = self.buf.bytes_written();
127                        // set offset and index for offset index
128                        column_metadata.column_index_offset = Some(start_offset as i64);
129                        column_metadata.column_index_length =
130                            Some((end_offset - start_offset) as i32);
131                    }
132                }
133            }
134        }
135        Ok(())
136    }
137
138    /// Serialize the column indexes and transform to `Option<ParquetColumnIndex>`
139    fn finalize_column_indexes(&mut self) -> Result<Option<ParquetColumnIndex>> {
140        let column_indexes = std::mem::take(&mut self.column_indexes);
141
142        // Write column indexes to file
143        if let Some(column_indexes) = column_indexes.as_ref() {
144            self.write_column_indexes(column_indexes)?;
145        }
146
147        // check to see if the index is `None` for every row group and column chunk
148        let all_none = column_indexes
149            .as_ref()
150            .is_some_and(|ci| ci.iter().all(|cii| cii.iter().all(|idx| idx.is_none())));
151
152        // transform from Option<Vec<Vec<Option<ColumnIndexMetaData>>>> to
153        // Option<Vec<Vec<ColumnIndexMetaData>>>
154        let column_indexes: Option<ParquetColumnIndex> = if all_none {
155            None
156        } else {
157            column_indexes.map(|ovvi| {
158                ovvi.into_iter()
159                    .map(|vi| {
160                        vi.into_iter()
161                            .map(|ci| ci.unwrap_or(ColumnIndexMetaData::NONE))
162                            .collect()
163                    })
164                    .collect()
165            })
166        };
167
168        Ok(column_indexes)
169    }
170
171    /// Serialize the offset indexes and transform to `Option<ParquetOffsetIndex>`
172    fn finalize_offset_indexes(&mut self) -> Result<Option<ParquetOffsetIndex>> {
173        let offset_indexes = std::mem::take(&mut self.offset_indexes);
174
175        // Write offset indexes to file
176        if let Some(offset_indexes) = offset_indexes.as_ref() {
177            self.write_offset_indexes(offset_indexes)?;
178        }
179
180        // check to see if the index is `None` for every row group and column chunk
181        let all_none = offset_indexes
182            .as_ref()
183            .is_some_and(|oi| oi.iter().all(|oii| oii.iter().all(|idx| idx.is_none())));
184
185        let offset_indexes: Option<ParquetOffsetIndex> = if all_none {
186            None
187        } else {
188            // FIXME(ets): this will panic if there's a missing index.
189            offset_indexes.map(|ovvi| {
190                ovvi.into_iter()
191                    .map(|vi| vi.into_iter().map(|oi| oi.unwrap()).collect())
192                    .collect()
193            })
194        };
195
196        Ok(offset_indexes)
197    }
198
199    /// Assembles and writes the final metadata to self.buf
200    pub fn finish(mut self) -> Result<ParquetMetaData> {
201        let num_rows = self.row_groups.iter().map(|x| x.num_rows).sum();
202
203        // serialize page indexes and transform to the proper form for use in ParquetMetaData
204        let column_indexes = self.finalize_column_indexes()?;
205        let offset_indexes = self.finalize_offset_indexes()?;
206
207        // We only include ColumnOrder for leaf nodes.
208        // Currently only supported ColumnOrder is TypeDefinedOrder so we set this
209        // for all leaf nodes.
210        // Even if the column has an undefined sort order, such as INTERVAL, this
211        // is still technically the defined TYPEORDER so it should still be set.
212        let column_orders = self
213            .schema_descr
214            .columns()
215            .iter()
216            .map(|col| {
217                let sort_order = ColumnOrder::sort_order_for_type(
218                    col.logical_type_ref(),
219                    col.converted_type(),
220                    col.physical_type(),
221                );
222                ColumnOrder::TYPE_DEFINED_ORDER(sort_order)
223            })
224            .collect();
225
226        // This field is optional, perhaps in cases where no min/max fields are set
227        // in any Statistics or ColumnIndex object in the whole file.
228        // But for simplicity we always set this field.
229        let column_orders = Some(column_orders);
230
231        let (row_groups, unencrypted_row_groups) = self
232            .object_writer
233            .apply_row_group_encryption(self.row_groups)?;
234
235        #[cfg(feature = "encryption")]
236        let (encryption_algorithm, footer_signing_key_metadata) =
237            self.object_writer.get_plaintext_footer_crypto_metadata();
238        #[cfg(feature = "encryption")]
239        let file_metadata = FileMetaData::new(
240            self.writer_version,
241            num_rows,
242            self.created_by,
243            self.key_value_metadata,
244            self.schema_descr.clone(),
245            column_orders,
246        )
247        .with_encryption_algorithm(encryption_algorithm)
248        .with_footer_signing_key_metadata(footer_signing_key_metadata);
249
250        #[cfg(not(feature = "encryption"))]
251        let file_metadata = FileMetaData::new(
252            self.writer_version,
253            num_rows,
254            self.created_by,
255            self.key_value_metadata,
256            self.schema_descr.clone(),
257            column_orders,
258        );
259
260        let file_meta = FileMeta {
261            file_metadata: &file_metadata,
262            row_groups: &row_groups,
263            write_path_in_schema: self.write_path_in_schema,
264        };
265
266        // Write file metadata
267        let start_pos = self.buf.bytes_written();
268        self.object_writer
269            .write_file_metadata(&file_meta, &mut self.buf)?;
270        let end_pos = self.buf.bytes_written();
271
272        // Write footer
273        let metadata_len = (end_pos - start_pos) as u32;
274
275        self.buf.write_all(&metadata_len.to_le_bytes())?;
276        self.buf.write_all(self.object_writer.get_file_magic())?;
277
278        // If row group metadata was encrypted, we replace the encrypted row groups with
279        // unencrypted metadata before it is returned to users. This allows the metadata
280        // to be usable for retrieving the row group statistics for example, without users
281        // needing to decrypt the metadata.
282        let builder = ParquetMetaDataBuilder::new(file_metadata)
283            .set_column_index(column_indexes)
284            .set_offset_index(offset_indexes);
285
286        Ok(match unencrypted_row_groups {
287            Some(rg) => builder.set_row_groups(rg).build(),
288            None => builder.set_row_groups(row_groups).build(),
289        })
290    }
291
292    pub fn new(
293        buf: &'a mut TrackedWrite<W>,
294        schema_descr: &'a SchemaDescPtr,
295        row_groups: Vec<RowGroupMetaData>,
296        created_by: Option<String>,
297        writer_version: i32,
298        write_path_in_schema: bool,
299    ) -> Self {
300        Self {
301            buf,
302            schema_descr,
303            row_groups,
304            column_indexes: None,
305            offset_indexes: None,
306            key_value_metadata: None,
307            created_by,
308            object_writer: Default::default(),
309            writer_version,
310            write_path_in_schema,
311        }
312    }
313
314    pub fn with_column_indexes(
315        mut self,
316        column_indexes: Vec<Vec<Option<ColumnIndexMetaData>>>,
317    ) -> Self {
318        self.column_indexes = Some(column_indexes);
319        self
320    }
321
322    pub fn with_offset_indexes(
323        mut self,
324        offset_indexes: Vec<Vec<Option<OffsetIndexMetaData>>>,
325    ) -> Self {
326        self.offset_indexes = Some(offset_indexes);
327        self
328    }
329
330    pub fn with_key_value_metadata(mut self, key_value_metadata: Vec<KeyValue>) -> Self {
331        self.key_value_metadata = Some(key_value_metadata);
332        self
333    }
334
335    #[cfg(feature = "encryption")]
336    pub fn with_file_encryptor(mut self, file_encryptor: Option<Arc<FileEncryptor>>) -> Self {
337        self.object_writer = self.object_writer.with_file_encryptor(file_encryptor);
338        self
339    }
340}
341
342/// Writes [`ParquetMetaData`] to a byte stream
343///
344/// This structure handles the details of writing the various parts of Parquet
345/// metadata into a byte stream. It is used to write the metadata into a parquet
346/// file and can also write metadata into other locations (such as a store of
347/// bytes).
348///
349/// # Discussion
350///
351/// The process of writing Parquet metadata is tricky because the
352/// metadata is not stored as a single inline thrift structure. It can have
353/// several "out of band" structures such as the [`OffsetIndex`] and
354/// BloomFilters stored in separate structures whose locations are stored as
355/// offsets from the beginning of the file.
356///
357/// Note: this writer does not directly write BloomFilters. In order to write
358/// BloomFilters, write the bloom filters into the buffer before creating the
359/// metadata writer. Then set the corresponding `bloom_filter_offset` and
360/// `bloom_filter_length` on [`ColumnChunkMetaData`] passed to this writer.
361///
362/// # Output Format
363///
364/// The format of the metadata is as follows:
365///
366/// 1. Optional [`ColumnIndex`] (thrift encoded)
367/// 2. Optional [`OffsetIndex`] (thrift encoded)
368/// 3. [`FileMetaData`] (thrift encoded)
369/// 4. Length of encoded `FileMetaData` (4 bytes, little endian)
370/// 5. Parquet Magic Bytes (4 bytes)
371///
372/// [`FileMetaData`]: https://github.com/apache/parquet-format/tree/master?tab=readme-ov-file#metadata
373/// [`ColumnChunkMetaData`]: crate::file::metadata::ColumnChunkMetaData
374/// [`ColumnIndex`]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
375/// [`OffsetIndex`]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
376///
377/// ```text
378/// ┌──────────────────────┐
379/// │                      │
380/// │         ...          │
381/// │                      │
382/// │┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐ │
383/// │     ColumnIndex     ◀│─ ─ ─
384/// ││    (Optional)     │ │     │
385/// │ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─  │
386/// │┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐ │     │ FileMetadata
387/// │     OffsetIndex      │       contains embedded
388/// ││    (Optional)     │◀┼ ─   │ offsets to
389/// │ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─  │  │    ColumnIndex and
390/// │╔═══════════════════╗ │     │ OffsetIndex
391/// │║                   ║ │  │
392/// │║                   ║ ┼ ─   │
393/// │║   FileMetadata    ║ │
394/// │║                   ║ ┼ ─ ─ ┘
395/// │║                   ║ │
396/// │╚═══════════════════╝ │
397/// │┌───────────────────┐ │
398/// ││  metadata length  │ │ length of FileMetadata  (only)
399/// │└───────────────────┘ │
400/// │┌───────────────────┐ │
401/// ││      'PAR1'       │ │ Parquet Magic Bytes
402/// │└───────────────────┘ │
403/// └──────────────────────┘
404///      Output Buffer
405/// ```
406///
407/// # Example
408/// ```no_run
409/// # use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataWriter};
410/// # fn get_metadata() -> ParquetMetaData { unimplemented!(); }
411/// // write parquet metadata to an in-memory buffer
412/// let mut buffer = vec![];
413/// let metadata: ParquetMetaData = get_metadata();
414/// let writer = ParquetMetaDataWriter::new(&mut buffer, &metadata);
415/// // write the metadata to the buffer
416/// writer.finish().unwrap();
417/// assert!(!buffer.is_empty());
418/// ```
419pub struct ParquetMetaDataWriter<'a, W: Write> {
420    buf: TrackedWrite<W>,
421    metadata: &'a ParquetMetaData,
422    write_path_in_schema: bool,
423}
424
425impl<'a, W: Write> ParquetMetaDataWriter<'a, W> {
426    /// Create a new `ParquetMetaDataWriter` to write to `buf`
427    ///
428    /// Note any embedded offsets in the metadata will be written assuming the
429    /// metadata is at the start of the buffer. If the metadata is being written
430    /// to a location other than the start of the buffer, see [`Self::new_with_tracked`]
431    ///
432    /// See example on the struct level documentation
433    pub fn new(buf: W, metadata: &'a ParquetMetaData) -> Self {
434        Self::new_with_tracked(TrackedWrite::new(buf), metadata)
435    }
436
437    /// Create a new ParquetMetaDataWriter to write to `buf`
438    ///
439    /// This method is used when the metadata is being written to a location other
440    /// than the start of the buffer.
441    ///
442    /// See example on the struct level documentation
443    pub fn new_with_tracked(buf: TrackedWrite<W>, metadata: &'a ParquetMetaData) -> Self {
444        Self {
445            buf,
446            metadata,
447            write_path_in_schema: true,
448        }
449    }
450
451    /// Set whether or not to write the `path_in_schema` field in the Thrift `ColumnMetaData`
452    /// struct.
453    pub fn with_write_path_in_schema(self, val: bool) -> Self {
454        Self {
455            write_path_in_schema: val,
456            ..self
457        }
458    }
459
460    /// Write the metadata to the buffer
461    pub fn finish(mut self) -> Result<()> {
462        let file_metadata = self.metadata.file_metadata();
463
464        let schema = Arc::new(file_metadata.schema().clone());
465        let schema_descr = Arc::new(SchemaDescriptor::new(schema.clone()));
466        let created_by = file_metadata.created_by().map(str::to_string);
467
468        let row_groups = self.metadata.row_groups.clone();
469
470        let key_value_metadata = file_metadata.key_value_metadata().cloned();
471
472        let column_indexes = self.convert_column_indexes();
473        let offset_indexes = self.convert_offset_index();
474
475        let mut encoder = ThriftMetadataWriter::new(
476            &mut self.buf,
477            &schema_descr,
478            row_groups,
479            created_by,
480            file_metadata.version(),
481            self.write_path_in_schema,
482        );
483
484        if let Some(column_indexes) = column_indexes {
485            encoder = encoder.with_column_indexes(column_indexes);
486        }
487
488        if let Some(offset_indexes) = offset_indexes {
489            encoder = encoder.with_offset_indexes(offset_indexes);
490        }
491
492        if let Some(key_value_metadata) = key_value_metadata {
493            encoder = encoder.with_key_value_metadata(key_value_metadata);
494        }
495        encoder.finish()?;
496
497        Ok(())
498    }
499
500    fn convert_column_indexes(&self) -> Option<Vec<Vec<Option<ColumnIndexMetaData>>>> {
501        // TODO(ets): we're converting from ParquetColumnIndex to vec<vec<option>>,
502        // but then converting back to ParquetColumnIndex in the end. need to unify this.
503        self.metadata
504            .column_index()
505            .map(|row_group_column_indexes| {
506                (0..self.metadata.row_groups().len())
507                    .map(|rg_idx| {
508                        let column_indexes = &row_group_column_indexes[rg_idx];
509                        column_indexes
510                            .iter()
511                            .map(|column_index| Some(column_index.clone()))
512                            .collect()
513                    })
514                    .collect()
515            })
516    }
517
518    fn convert_offset_index(&self) -> Option<Vec<Vec<Option<OffsetIndexMetaData>>>> {
519        self.metadata
520            .offset_index()
521            .map(|row_group_offset_indexes| {
522                (0..self.metadata.row_groups().len())
523                    .map(|rg_idx| {
524                        let offset_indexes = &row_group_offset_indexes[rg_idx];
525                        offset_indexes
526                            .iter()
527                            .map(|offset_index| Some(offset_index.clone()))
528                            .collect()
529                    })
530                    .collect()
531            })
532    }
533}
534
/// Writes individual metadata objects in Thrift format
///
/// When the `encryption` feature is enabled it also carries the optional file
/// encryptor used to encrypt those objects.
#[derive(Default, Debug)]
struct MetadataObjectWriter {
    /// Encryptor for the file, if any
    #[cfg(feature = "encryption")]
    file_encryptor: Option<Arc<FileEncryptor>>,
}
540
541impl MetadataObjectWriter {
542    #[inline]
543    fn write_thrift_object(object: &impl WriteThrift, sink: impl Write) -> Result<()> {
544        let mut protocol = ThriftCompactOutputProtocol::new(sink);
545        object.write_thrift(&mut protocol)?;
546        Ok(())
547    }
548}
549
550/// Implementations of [`MetadataObjectWriter`] methods for when encryption is disabled
551#[cfg(not(feature = "encryption"))]
552impl MetadataObjectWriter {
553    /// Write [`FileMetaData`] in Thrift format
554    ///
555    /// [`FileMetaData`]: https://github.com/apache/parquet-format/tree/master?tab=readme-ov-file#metadata
556    fn write_file_metadata(&self, file_metadata: &FileMeta, sink: impl Write) -> Result<()> {
557        Self::write_thrift_object(file_metadata, sink)
558    }
559
560    /// Write a column [`OffsetIndex`] in Thrift format
561    ///
562    /// [`OffsetIndex`]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
563    fn write_offset_index(
564        &self,
565        offset_index: &OffsetIndexMetaData,
566        _column_chunk: &ColumnChunkMetaData,
567        _row_group_idx: usize,
568        _column_idx: usize,
569        sink: impl Write,
570    ) -> Result<()> {
571        Self::write_thrift_object(offset_index, sink)
572    }
573
574    /// Write a column [`ColumnIndex`] in Thrift format
575    ///
576    /// If `column_index` is [`ColumnIndexMetaData::NONE`] the index will not be written and
577    /// this will return `false`. Returns `true` otherwise.
578    ///
579    /// [`ColumnIndex`]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
580    fn write_column_index(
581        &self,
582        column_index: &ColumnIndexMetaData,
583        _column_chunk: &ColumnChunkMetaData,
584        _row_group_idx: usize,
585        _column_idx: usize,
586        sink: impl Write,
587    ) -> Result<bool> {
588        match column_index {
589            // Missing indexes may also have the placeholder ColumnIndexMetaData::NONE
590            ColumnIndexMetaData::NONE => Ok(false),
591            _ => {
592                Self::write_thrift_object(column_index, sink)?;
593                Ok(true)
594            }
595        }
596    }
597
598    /// No-op implementation of row-group metadata encryption
599    fn apply_row_group_encryption(
600        &self,
601        row_groups: Vec<RowGroupMetaData>,
602    ) -> Result<(Vec<RowGroupMetaData>, Option<Vec<RowGroupMetaData>>)> {
603        Ok((row_groups, None))
604    }
605
606    /// Get the "magic" bytes identifying the file type
607    pub fn get_file_magic(&self) -> &[u8; 4] {
608        get_file_magic()
609    }
610}
611
612/// Implementations of [`MetadataObjectWriter`] methods that rely on encryption being enabled
613#[cfg(feature = "encryption")]
614impl MetadataObjectWriter {
615    /// Set the file encryptor to use
616    fn with_file_encryptor(mut self, encryptor: Option<Arc<FileEncryptor>>) -> Self {
617        self.file_encryptor = encryptor;
618        self
619    }
620
621    /// Write [`FileMetaData`] in Thrift format, possibly encrypting it if required
622    ///
623    /// [`FileMetaData`]: https://github.com/apache/parquet-format/tree/master?tab=readme-ov-file#metadata
624    fn write_file_metadata(&self, file_metadata: &FileMeta, mut sink: impl Write) -> Result<()> {
625        match self.file_encryptor.as_ref() {
626            Some(file_encryptor) if file_encryptor.properties().encrypt_footer() => {
627                // First write FileCryptoMetadata
628                let crypto_metadata = Self::file_crypto_metadata(file_encryptor)?;
629                let mut protocol = ThriftCompactOutputProtocol::new(&mut sink);
630                crypto_metadata.write_thrift(&mut protocol)?;
631
632                // Then write encrypted footer
633                let aad = create_footer_aad(file_encryptor.file_aad())?;
634                let mut encryptor = file_encryptor.get_footer_encryptor()?;
635                encrypt_thrift_object(file_metadata, &mut encryptor, &mut sink, &aad)
636            }
637            Some(file_encryptor) if file_metadata.file_metadata.encryption_algorithm.is_some() => {
638                let aad = create_footer_aad(file_encryptor.file_aad())?;
639                let mut encryptor = file_encryptor.get_footer_encryptor()?;
640                write_signed_plaintext_thrift_object(file_metadata, &mut encryptor, &mut sink, &aad)
641            }
642            _ => Self::write_thrift_object(file_metadata, &mut sink),
643        }
644    }
645
646    /// Write a column [`OffsetIndex`] in Thrift format, possibly encrypting it if required
647    ///
648    /// [`OffsetIndex`]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
649    fn write_offset_index(
650        &self,
651        offset_index: &OffsetIndexMetaData,
652        column_chunk: &ColumnChunkMetaData,
653        row_group_idx: usize,
654        column_idx: usize,
655        sink: impl Write,
656    ) -> Result<()> {
657        match &self.file_encryptor {
658            Some(file_encryptor) => Self::write_thrift_object_with_encryption(
659                offset_index,
660                sink,
661                file_encryptor,
662                column_chunk,
663                ModuleType::OffsetIndex,
664                row_group_idx,
665                column_idx,
666            ),
667            None => Self::write_thrift_object(offset_index, sink),
668        }
669    }
670
671    /// Write a column [`ColumnIndex`] in Thrift format, possibly encrypting it if required
672    ///
673    /// If `column_index` is [`ColumnIndexMetaData::NONE`] the index will not be written and
674    /// this will return `false`. Returns `true` otherwise.
675    ///
676    /// [`ColumnIndex`]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
677    fn write_column_index(
678        &self,
679        column_index: &ColumnIndexMetaData,
680        column_chunk: &ColumnChunkMetaData,
681        row_group_idx: usize,
682        column_idx: usize,
683        sink: impl Write,
684    ) -> Result<bool> {
685        match column_index {
686            // Missing indexes may also have the placeholder ColumnIndexMetaData::NONE
687            ColumnIndexMetaData::NONE => Ok(false),
688            _ => {
689                match &self.file_encryptor {
690                    Some(file_encryptor) => Self::write_thrift_object_with_encryption(
691                        column_index,
692                        sink,
693                        file_encryptor,
694                        column_chunk,
695                        ModuleType::ColumnIndex,
696                        row_group_idx,
697                        column_idx,
698                    )?,
699                    None => Self::write_thrift_object(column_index, sink)?,
700                }
701                Ok(true)
702            }
703        }
704    }
705
706    /// If encryption is enabled and configured, encrypt row group metadata.
707    /// Returns a tuple of the row group metadata to write,
708    /// and possibly unencrypted metadata to be returned to clients if data was encrypted.
709    fn apply_row_group_encryption(
710        &self,
711        row_groups: Vec<RowGroupMetaData>,
712    ) -> Result<(Vec<RowGroupMetaData>, Option<Vec<RowGroupMetaData>>)> {
713        match &self.file_encryptor {
714            Some(file_encryptor) => {
715                let unencrypted_row_groups = row_groups.clone();
716                let encrypted_row_groups = Self::encrypt_row_groups(row_groups, file_encryptor)?;
717                Ok((encrypted_row_groups, Some(unencrypted_row_groups)))
718            }
719            None => Ok((row_groups, None)),
720        }
721    }
722
723    /// Get the "magic" bytes identifying the file type
724    fn get_file_magic(&self) -> &[u8; 4] {
725        get_file_magic(
726            self.file_encryptor
727                .as_ref()
728                .map(|encryptor| encryptor.properties()),
729        )
730    }
731
732    fn write_thrift_object_with_encryption(
733        object: &impl WriteThrift,
734        mut sink: impl Write,
735        file_encryptor: &FileEncryptor,
736        column_metadata: &ColumnChunkMetaData,
737        module_type: ModuleType,
738        row_group_index: usize,
739        column_index: usize,
740    ) -> Result<()> {
741        let column_path_vec = column_metadata.column_path().as_ref();
742
743        let joined_column_path;
744        let column_path = if column_path_vec.len() == 1 {
745            &column_path_vec[0]
746        } else {
747            joined_column_path = column_path_vec.join(".");
748            &joined_column_path
749        };
750
751        if file_encryptor.is_column_encrypted(column_path) {
752            use crate::encryption::encrypt::encrypt_thrift_object;
753
754            let aad = create_module_aad(
755                file_encryptor.file_aad(),
756                module_type,
757                row_group_index,
758                column_index,
759                None,
760            )?;
761            let mut encryptor = file_encryptor.get_column_encryptor(column_path)?;
762            encrypt_thrift_object(object, &mut encryptor, &mut sink, &aad)
763        } else {
764            Self::write_thrift_object(object, sink)
765        }
766    }
767
768    fn get_plaintext_footer_crypto_metadata(
769        &self,
770    ) -> (Option<EncryptionAlgorithm>, Option<Vec<u8>>) {
771        // Only plaintext footers may contain encryption algorithm and footer key metadata.
772        if let Some(file_encryptor) = self.file_encryptor.as_ref() {
773            let encryption_properties = file_encryptor.properties();
774            if !encryption_properties.encrypt_footer() {
775                return (
776                    Some(Self::encryption_algorithm_from_encryptor(file_encryptor)),
777                    encryption_properties.footer_key_metadata().cloned(),
778                );
779            }
780        }
781        (None, None)
782    }
783
784    fn encryption_algorithm_from_encryptor(file_encryptor: &FileEncryptor) -> EncryptionAlgorithm {
785        let supply_aad_prefix = file_encryptor
786            .properties()
787            .aad_prefix()
788            .map(|_| !file_encryptor.properties().store_aad_prefix());
789        let aad_prefix = if file_encryptor.properties().store_aad_prefix() {
790            file_encryptor.properties().aad_prefix()
791        } else {
792            None
793        };
794        EncryptionAlgorithm::AES_GCM_V1(AesGcmV1 {
795            aad_prefix: aad_prefix.cloned(),
796            aad_file_unique: Some(file_encryptor.aad_file_unique().clone()),
797            supply_aad_prefix,
798        })
799    }
800
801    fn file_crypto_metadata(file_encryptor: &'_ FileEncryptor) -> Result<FileCryptoMetaData<'_>> {
802        let properties = file_encryptor.properties();
803        Ok(FileCryptoMetaData {
804            encryption_algorithm: Self::encryption_algorithm_from_encryptor(file_encryptor),
805            key_metadata: properties.footer_key_metadata().map(|v| v.as_slice()),
806        })
807    }
808
809    fn encrypt_row_groups(
810        row_groups: Vec<RowGroupMetaData>,
811        file_encryptor: &Arc<FileEncryptor>,
812    ) -> Result<Vec<RowGroupMetaData>> {
813        row_groups
814            .into_iter()
815            .enumerate()
816            .map(|(rg_idx, mut rg)| {
817                let cols: Result<Vec<ColumnChunkMetaData>> = rg
818                    .columns
819                    .into_iter()
820                    .enumerate()
821                    .map(|(col_idx, c)| {
822                        Self::encrypt_column_chunk(c, file_encryptor, rg_idx, col_idx)
823                    })
824                    .collect();
825                rg.columns = cols?;
826                Ok(rg)
827            })
828            .collect()
829    }
830
831    /// Apply column encryption to column chunk metadata
832    fn encrypt_column_chunk(
833        mut column_chunk: ColumnChunkMetaData,
834        file_encryptor: &Arc<FileEncryptor>,
835        row_group_index: usize,
836        column_index: usize,
837    ) -> Result<ColumnChunkMetaData> {
838        // Column crypto metadata should have already been set when the column was created.
839        // Here we apply the encryption by encrypting the column metadata if required.
840        let encryptor = match column_chunk.column_crypto_metadata.as_deref() {
841            None => None,
842            Some(ColumnCryptoMetaData::ENCRYPTION_WITH_FOOTER_KEY) => {
843                let is_footer_encrypted = file_encryptor.properties().encrypt_footer();
844
845                // When uniform encryption is used the footer is already encrypted,
846                // so the column chunk does not need additional encryption.
847                // Except if we're in plaintext footer mode, then we need to encrypt
848                // the column metadata here.
849                if !is_footer_encrypted {
850                    Some(file_encryptor.get_footer_encryptor()?)
851                } else {
852                    None
853                }
854            }
855            Some(ColumnCryptoMetaData::ENCRYPTION_WITH_COLUMN_KEY(col_key)) => {
856                let column_path = col_key.path_in_schema.join(".");
857                Some(file_encryptor.get_column_encryptor(&column_path)?)
858            }
859        };
860
861        if let Some(mut encryptor) = encryptor {
862            use crate::file::metadata::thrift::serialize_column_meta_data;
863
864            let aad = create_module_aad(
865                file_encryptor.file_aad(),
866                ModuleType::ColumnMetaData,
867                row_group_index,
868                column_index,
869                None,
870            )?;
871            // create temp ColumnMetaData that we can encrypt
872            let mut buffer: Vec<u8> = vec![];
873            {
874                let mut prot = ThriftCompactOutputProtocol::new(&mut buffer);
875                serialize_column_meta_data(&column_chunk, &mut prot)?;
876            }
877            let ciphertext = encryptor.encrypt(&buffer, &aad)?;
878            column_chunk.encrypted_column_metadata = Some(ciphertext);
879            // Track whether the footer is plaintext, which affects how we serialize
880            // the column metadata (we need to write stripped metadata for backward compatibility)
881            column_chunk.plaintext_footer_mode = !file_encryptor.properties().encrypt_footer();
882        }
883
884        Ok(column_chunk)
885    }
886}